Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add cubejs tasks #5280

Merged
merged 75 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
379270d
Merge pull request #1 from PrefectHQ/master
AlessandroLollo Mar 27, 2021
a621fa7
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Oct 25, 2021
f56917a
Update click requirement from <8.0,>=7.0 to >=7.0,<9.0
dependabot[bot] Oct 25, 2021
768c500
Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911
dependabot[bot] Oct 25, 2021
8ec54c9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Nov 18, 2021
4e8a10c
Merge pull request #3 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo Nov 18, 2021
a7980c2
Merge pull request #2 from AlessandroLollo/dependabot/pip/click-gte-7…
AlessandroLollo Nov 18, 2021
17b2856
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 19, 2021
d8a27b9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 21, 2021
f82cd1a
Added tests for Mixpanel tasks
alollo-ca Dec 22, 2021
4fea3aa
Added handling of Mixpanel error response
alollo-ca Dec 23, 2021
05c3d82
Small improvements
alollo-ca Dec 24, 2021
8aec26d
Added change file and updated outline
alollo-ca Dec 24, 2021
490a6d3
Fixed typo in change file
alollo-ca Dec 24, 2021
0232294
Fixed quotes in outline.toml
alollo-ca Dec 24, 2021
03a2790
Added group_events option
alollo-ca Dec 25, 2021
2764172
Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931
dependabot[bot] Dec 25, 2021
3b5b85f
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 25, 2021
6e3b9e1
Merge pull request #5 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo Dec 25, 2021
9c573be
Refactored Mixpanel tasks __init__.py
alollo-ca Dec 25, 2021
d6c7fb0
Small refactor of import statement
alollo-ca Dec 25, 2021
d887f19
CubeJS Query Task added along with tests
alollo-ca Dec 27, 2021
2382d4c
Added outline
alollo-ca Dec 27, 2021
974f92a
Added change file
alollo-ca Dec 27, 2021
bef5d58
Fixed typo in outline
alollo-ca Dec 27, 2021
4cd192b
Moved var init after docstring
alollo-ca Dec 27, 2021
955bc26
Added extra cubejs
alollo-ca Dec 27, 2021
ec71072
Run black
alollo-ca Dec 27, 2021
081579e
Added PyJWT to test requirements
alollo-ca Dec 27, 2021
962cc0b
Fixedo typo in test-requirements file
alollo-ca Dec 27, 2021
e7a8df6
Remove upper bound version for PyJWT
alollo-ca Dec 27, 2021
eb942f1
Run Black
alollo-ca Dec 27, 2021
02c226c
Update src/prefect/tasks/mixpanel/__init__.py
AlessandroLollo Dec 27, 2021
2592fd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
cdbcc24
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
4d1ac7b
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
cc174ac
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
f058cff
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
de8cafe
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
8f7cdd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
147d73c
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aca74e5
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aa507e2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
1b188eb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
98d15fb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
b4a38f2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
33d1e79
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aaa6f43
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
a4ac00d
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
14e94ab
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
fb0fb66
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aa560a2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
611a8aa
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
0cd78c4
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
10c9c07
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
2542610
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
ffcd771
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
0a1f088
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
bd2dba3
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
8da7d50
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
59bfa96
Merge branch 'PrefectHQ:master' into feature/add-mixpanel-tasks
AlessandroLollo Dec 27, 2021
8209521
Merge pull request #6 from AlessandroLollo/feature/add-mixpanel-tasks
AlessandroLollo Dec 27, 2021
27b03e4
Merge branch 'master' into feature/add-cubejs-tasks
alollo-ca Dec 29, 2021
f5179ff
Applied suggestions from reviewers
alollo-ca Dec 29, 2021
5cb6318
Fixed failing test
alollo-ca Dec 29, 2021
d8f613d
Run Black
alollo-ca Dec 29, 2021
cf0b263
Fixed flake8 warnings
alollo-ca Dec 29, 2021
da59d3d
Run Black
alollo-ca Dec 29, 2021
72b8a5b
Removed Mixpanel stuff
alollo-ca Dec 29, 2021
56c4ad1
Removed mixpanel entry from outline to fix docs tests
alollo-ca Dec 29, 2021
100fa87
Severale adjustments based on reviewers' suggestions
alollo-ca Dec 29, 2021
335a407
Test refactoring following suggestions
alollo-ca Jan 3, 2022
92187f8
Run Black
alollo-ca Jan 3, 2022
f55aa84
Merge branch 'master' into feature/add-cubejs-tasks
zanieb Jan 10, 2022
944045e
Merge branch 'master' into feature/add-cubejs-tasks
zanieb Jan 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
black
graphviz >= 0.8
jinja2 >= 2.0, < 4.0
mypy >= 0.600, < 0.931
mypy >= 0.600, < 0.813
Pygments >= 2.2, < 3.0
30 changes: 22 additions & 8 deletions src/prefect/tasks/cubejs/cubejs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class CubeJSQueryTask(Task):
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at: https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
- **kwargs (optional): Additional keyword arguments to pass to the
Expand All @@ -57,6 +60,7 @@ def __init__(
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
**kwargs,
):
Expand All @@ -66,6 +70,7 @@ def __init__(
self.api_secret_env_var = api_secret_env_var
self.query = query
self.security_context = security_context
self.wait_time_between_api_calls = wait_time_between_api_calls
self.max_wait_time = max_wait_time
super().__init__(**kwargs)

Expand All @@ -76,6 +81,7 @@ def __init__(
"api_secret_env_var",
"query",
"security_context",
"wait_time_between_api_calls",
"max_wait_time",
)
def run(
Expand All @@ -86,6 +92,7 @@ def run(
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
):
"""
Expand Down Expand Up @@ -114,6 +121,9 @@ def run(
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.

Expand Down Expand Up @@ -154,7 +164,7 @@ def run(

extended_context = security_context
if "exp" not in security_context and "expiresIn" not in security_context:
extended_context["epxiresIn"] = "7d"
extended_context["expiresIn"] = "7d"
api_token = jwt.encode(
payload=extended_context, key=secret, algorithm="HS256"
)
Expand All @@ -172,7 +182,9 @@ def run(

params = {"query": json.dumps(query)}

wait_time_between_api_calls = 10
wait_api_call_secs = (
wait_time_between_api_calls if wait_time_between_api_calls > 0 else 10
)
elapsed_wait_time = 0
while not max_wait_time or elapsed_wait_time <= max_wait_time:

Expand All @@ -185,19 +197,21 @@ def run(
if "error" in data.keys() and "Continue wait" in data["error"]:
msg = (
"Cube.js load API still running."
"Waiting {wait_time_between_api_calls} seconds before retrying"
"Waiting {wait_api_call_secs} seconds before retrying"
)
self.logger.warning(msg)
time.sleep(wait_time_between_api_calls)
elapsed_wait_time += wait_time_between_api_calls
self.logger.info(msg)
time.sleep(wait_api_call_secs)
elapsed_wait_time += wait_api_call_secs
continue

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could add a parameter to tune the number of subsequent retries and raise in case of counted failures reach the ceil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a max_wait_time parameter, similarly to what has been implemented in DbtCloudRunJob task.


else:
return data

else:
raise FAIL(
message=f"Cube.js load API failed!. Error is: {response.reason}"
message=f"Cube.js load API failed! Error is: {response.reason}"
)

raise FAIL(message="Cube.js load API took too long to provide a response.")
raise FAIL(
message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response."
)
18 changes: 14 additions & 4 deletions tests/tasks/cubejs/test_cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import jwt
import pytest
from prefect.engine.signals import FAIL
from prefect.tasks.cubejs import CubeJSQueryTask
Expand Down Expand Up @@ -65,7 +66,7 @@ def test_run_with_failing_api_raises(self):
with pytest.raises(FAIL) as exc:
cubejs_task.run(subdomain="test", api_secret="foo", query="query")

assert "Cube.js load API failed!." in str(exc)
assert "Cube.js load API failed!" in str(exc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the match parameter of pytest.rasises for brevity:

with pytest.raises(FAIL, match="Cube.js load API failed!") as exc:
            cubejs_task.run(subdomain="test", api_secret="foo", query="query")


@responses.activate
def test_run_with_continue_waiting(self, caplog):
Expand Down Expand Up @@ -110,7 +111,12 @@ def test_run_with_security_context(self, caplog):
security_context={"foo": "bar"},
)

assert "JWT token generated with security context." in caplog.text
expected_jwt = jwt.encode(
payload={"foo": "bar", "expiresIn": "7d"}, key="foo", algorithm="HS256"
)

# assert "JWT token generated with security context." in caplog.text
assert responses.calls[0].request.headers["Authorization"] == expected_jwt

@responses.activate
def test_run_with_max_wait_time_raises(self):
Expand All @@ -129,7 +135,11 @@ def test_run_with_max_wait_time_raises(self):
api_secret="foo",
query="query",
security_context={"foo": "bar"},
max_wait_time=15,
wait_time_between_api_calls=1,
max_wait_time=3,
)

assert "Cube.js load API took too long to provide a response." in str(exc)
assert (
"Cube.js load API took longer than 3 seconds to provide a response."
in str(exc)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the match parameter of pytest.rasises for to eliminate these lines:

with pytest.raises(FAIL, match="Cube.js load API took longer than 3 seconds to provide a response.") as exc: