Feature/add cubejs tasks #5280
Conversation
Sync to Prefect main repo
Updates the requirements on [click](https://github.com/pallets/click) to permit the latest version. - [Release notes](https://github.com/pallets/click/releases) - [Changelog](https://github.com/pallets/click/blob/main/CHANGES.rst) - [Commits](pallets/click@7.0...8.0.3) --- updated-dependencies: - dependency-name: click dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com>
Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version. - [Release notes](https://github.com/python/mypy/releases) - [Commits](python/mypy@v0.600...v0.910) --- updated-dependencies: - dependency-name: mypy dependency-type: direct:development ... Signed-off-by: dependabot[bot] <support@github.com>
…600-and-lt-0.911 Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911
….0-and-lt-9.0 Update click requirement from <8.0,>=7.0 to >=7.0,<9.0
Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version. - [Release notes](https://github.com/python/mypy/releases) - [Commits](python/mypy@v0.600...v0.930) --- updated-dependencies: - dependency-name: mypy dependency-type: direct:development ... Signed-off-by: dependabot[bot] <support@github.com>
…600-and-lt-0.931 Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931
) | ||
time.sleep(secs) | ||
secs *= 2 if secs < 60 else 2 | ||
continue |
You could add a parameter to tune the number of consecutive retries, and raise once the failure count reaches that ceiling.
I've added a `max_wait_time` parameter, similar to what has been implemented in the `DbtCloudRunJob` task.
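For reference, the retry ceiling suggested above could look roughly like the sketch below. It is not the PR's code: `call_with_backoff`, `RetriesExhausted`, `max_retries`, `initial_wait`, `max_wait` and the injectable `sleep` are all hypothetical names, and `ConnectionError` is only an assumed stand-in for whatever transient failure the task retries on.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetriesExhausted(Exception):
    """Hypothetical error raised when the retry ceiling is reached."""


def call_with_backoff(
    call: Callable[[], T],
    max_retries: int = 5,
    initial_wait: float = 1.0,
    max_wait: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` until it succeeds, doubling the wait after each failure."""
    secs = initial_wait
    for attempt in range(1, max_retries + 1):
        try:
            return call()
        except ConnectionError as exc:
            # Stop once the configured number of attempts has been used up.
            if attempt == max_retries:
                raise RetriesExhausted(
                    f"Gave up after {max_retries} failed attempts."
                ) from exc
            sleep(secs)
            # Double the wait, but never exceed the cap.
            secs = min(secs * 2, max_wait)
    # Only reached when max_retries is smaller than 1.
    raise RetriesExhausted("max_retries must be at least 1.")
```

Passing `sleep` as an argument is a design choice for testability: a test can hand in `list.append` and assert on the recorded waits instead of actually sleeping.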
Looks good. Some minor edits for consistency and punctuation.
changes/pr5280.yaml
Outdated
@@ -0,0 +1,5 @@ | |||
feature: | |||
- "Added CubeJS Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)" |
- "Added CubeJS Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)" | |
- "Added Cube.js Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)" |
docs/outline.toml
Outdated
@@ -765,3 +765,8 @@ title = "Airbyte Tasks" | |||
module = "prefect.tasks.airbyte" | |||
classes = ["AirbyteConnectionTask"] | |||
verified_task = true | |||
|
|||
[pages.tasks.cubejs] | |||
title = "CubeJS Tasks" |
title = "CubeJS Tasks" | |
title = "Cube.js Tasks" |
src/prefect/tasks/cubejs/__init__.py
Outdated
@@ -0,0 +1,10 @@ | |||
""" | |||
This is a collection of tasks to interact with a CubeJS/Cube Cloud environment. |
This is a collection of tasks to interact with a CubeJS/Cube Cloud environment. | |
This is a collection of tasks to interact with a Cube.js or Cube Cloud environment. |
For consistency with Cube's own usage, in user-facing docstrings let's use "Cube.js" to refer to the open-source platform and "Cube Cloud" to refer to the cloud-hosted offering. In code, "cubejs" as you've been using it is fine.
class CubeJSQueryTask(Task): | ||
|
||
""" | ||
This task calls CuebJS Quuery API and returns the result |
This task calls CuebJS Quuery API and returns the result | |
This task calls the Cube.js load API and returns the result |
Let's make sure the terminology is aligned with Cube docs.
""" | ||
This task calls CuebJS Quuery API and returns the result | ||
as a JSON object. | ||
More info about CubeJS Query API at |
More info about CubeJS Query API at | |
More info about Cube.js load API at |
If you pass multiple queries, then be aware of CubeJS Data Blending. | ||
More info at: https://cube.dev/docs/rest-api#api-reference-v-1-load | ||
and at: https://cube.dev/docs/schema/advanced/data-blending | ||
Query format can be found at: https://cube.dev/docs/query-format |
Query format can be found at: https://cube.dev/docs/query-format | |
Query format can be found at: https://cube.dev/docs/query-format. |
- security_context (str, dict, optional): The security context to use | ||
during authentication. | ||
If the security context does not contain an expiration period, | ||
then a 7 days expiration period is added automatically. |
then a 7 days expiration period is added automatically. | |
then a 7-day expiration period is added automatically. |
during authentication. | ||
If the security context does not contain an expiration period, | ||
then a 7 days expiration period is added automatically. | ||
More info at: https://cube.dev/docs/security/context |
More info at: https://cube.dev/docs/security/context | |
More info at: https://cube.dev/docs/security/context. |
- ValueError if both `subdomain` and `url` are missing. | ||
- ValueError if `api_token` is missing and `api_token_env_var` cannot be found. | ||
- ValueError if `query` is missing. | ||
- `prefect.engine.signals.FAIL` if the CubeJS Query API fails. |
- `prefect.engine.signals.FAIL` if the CubeJS Query API fails. | |
- `prefect.engine.signals.FAIL` if the Cube.js load API fails. |
- `prefect.engine.signals.FAIL` if the CubeJS Query API fails. | ||
|
||
Returns: | ||
- The CubeJS JSON response |
- The CubeJS JSON response | |
- The Cube.js JSON response. |
Looks good from my docs perspective. Thank you!
This looks great! I had a couple of small suggestions. The most important one is to add the ability to configure the wait time between calls to the Cube API.
|
||
params = {"query": json.dumps(query)} | ||
|
||
wait_time_between_api_calls = 10 |
Can we make this configurable via a task parameter and default it to 10?
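A minimal sketch of what a configurable polling interval might look like, assuming the load API keeps answering `{"error": "Continue wait"}` while the query is still running (as in the test fixture in this PR). The function name `poll_load_api`, the `fetch` and `sleep` callables, and the `LoadTimeout` exception are hypothetical; the real task raises `prefect.engine.signals.FAIL` and performs the HTTP call itself.

```python
import time
from typing import Any, Callable, Dict, Optional


class LoadTimeout(Exception):
    """Hypothetical stand-in for `prefect.engine.signals.FAIL`."""


def poll_load_api(
    fetch: Callable[[], Dict[str, Any]],
    max_wait_time: Optional[float] = None,
    wait_time_between_api_calls: float = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call `fetch` until it stops answering "Continue wait"."""
    elapsed = 0.0
    # With max_wait_time=None the loop polls until a result arrives.
    while max_wait_time is None or elapsed <= max_wait_time:
        data = fetch()
        if data.get("error") != "Continue wait":
            return data
        sleep(wait_time_between_api_calls)
        # Track the waited time explicitly so the loop stays deterministic.
        elapsed += wait_time_between_api_calls
    raise LoadTimeout(
        f"Cube.js load API took longer than {max_wait_time} seconds "
        "to provide a response."
    )
```

With the interval exposed as a parameter defaulting to 10, a test can pass a much smaller value (or a fake `sleep`) and hit the timeout path almost immediately.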
"Cube.js load API still running." | ||
"Waiting {wait_time_between_api_calls} seconds before retrying" | ||
) | ||
self.logger.warning(msg) |
self.logger.warning(msg) | |
self.logger.info(msg) |
message=f"Cube.js load API failed!. Error is: {response.reason}" | ||
) | ||
|
||
raise FAIL(message="Cube.js load API took too long to provide a response.") |
raise FAIL(message="Cube.js load API took too long to provide a response.") | |
raise FAIL(message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response.") |
|
||
else: | ||
raise FAIL( | ||
message=f"Cube.js load API failed!. Error is: {response.reason}" |
message=f"Cube.js load API failed!. Error is: {response.reason}" | |
message=f"Cube.js load API failed! Error is: {response.reason}" |
with pytest.raises(FAIL) as exc: | ||
cubejs_task.run(subdomain="test", api_secret="foo", query="query") | ||
|
||
assert "Cube.js load API failed!." in str(exc) |
assert "Cube.js load API failed!." in str(exc) | |
assert "Cube.js load API failed!" in str(exc) |
def test_run_with_max_wait_time_raises(self): | ||
cubejs_task = CubeJSQueryTask() | ||
|
||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"error": "Continue wait"}, | ||
) | ||
|
||
with pytest.raises(FAIL) as exc: | ||
cubejs_task.run( | ||
subdomain="test", | ||
api_secret="foo", | ||
query="query", | ||
security_context={"foo": "bar"}, | ||
max_wait_time=15, | ||
) | ||
|
||
assert "Cube.js load API took too long to provide a response." in str(exc) |
Having a configurable time between calls would be useful for this test since it could spend less time waiting.
dev-requirements.txt
Outdated
@@ -1,5 +1,5 @@ | |||
black | |||
graphviz >= 0.8 | |||
jinja2 >= 2.0, < 4.0 | |||
mypy >= 0.600, < 0.813 |
We're pinning below this version because we don't have separate typeshed installations. Is there a reason you bumped this?
I'm not sure why I did this.
Probably not intentional.
Anyway, I should have reverted back to mypy >= 0.600, < 0.813.
@responses.activate | ||
def test_run_with_security_context(self, caplog): | ||
caplog.set_level(logging.DEBUG) | ||
cubejs_task = CubeJSQueryTask() | ||
|
||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"data": "result"}, | ||
) | ||
|
||
cubejs_task.run( | ||
subdomain="test", | ||
api_secret="foo", | ||
query="query", | ||
security_context={"foo": "bar"}, | ||
) | ||
|
||
assert "JWT token generated with security context." in caplog.text |
For this test I would recommend inspecting the JWT that was included in the request made by the task, so that inspecting the logs isn't necessary. You should be able to access the request headers via `responses.calls[0].request.headers`.
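As an illustration of that suggestion, the sketch below decodes the payload of a token taken from a headers mapping and asserts on its claims. It uses only the standard library so it can run on its own; `make_hs256_token` and `decode_jwt_payload` are hypothetical helpers, the `headers` dict stands in for `responses.calls[0].request.headers`, and the assumption that the token travels in the `Authorization` header is not confirmed by this PR.

```python
import base64
import hashlib
import hmac
import json
from typing import Any, Dict


def _b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def make_hs256_token(payload: Dict[str, Any], secret: str) -> str:
    """Build a minimal HS256 JWT so the example has something to inspect."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(signature)}"


def decode_jwt_payload(token: str) -> Dict[str, Any]:
    """Decode the payload segment of a JWT without verifying the signature."""
    payload_segment = token.split(".")[1]
    # Restore the base64 padding that JWTs strip.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Stand-in for `responses.calls[0].request.headers` in the real test.
headers = {"Authorization": make_hs256_token({"foo": "bar"}, secret="foo")}
claims = decode_jwt_payload(headers["Authorization"])
assert claims["foo"] == "bar"
```

In the actual test suite, which already depends on PyJWT, calling `jwt.decode(token, "foo", algorithms=["HS256"])` would also verify the signature, which is probably preferable to decoding by hand.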
These changes look good! I added a couple of comments to improve the readability of a couple of tests, but the changes are very minor.
with pytest.raises(FAIL) as exc: | ||
cubejs_task.run(subdomain="test", api_secret="foo", query="query") | ||
|
||
assert "Cube.js load API failed!." in str(exc) | ||
assert "Cube.js load API failed!" in str(exc) |
You can use the `match` parameter of `pytest.raises` for brevity:
with pytest.raises(FAIL, match="Cube.js load API failed!") as exc:
    cubejs_task.run(subdomain="test", api_secret="foo", query="query")
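One detail worth keeping in mind: `match` is treated as a regular expression and tested with `re.search` against the string form of the exception, so the `.` in "Cube.js" matches any character. The sketch below shows the difference with plain `re`, using made-up message strings; wrapping the expected text in `re.escape` is an optional tightening, not something the PR requires.

```python
import re

expected = "Cube.js load API failed!"

# Unescaped, "." matches any character, so a slightly different message passes.
assert re.search(expected, "CubeXjs load API failed!")

# re.escape makes every character literal, so only the exact text matches.
strict = re.escape(expected)
assert re.search(strict, "Cube.js load API failed! Error is: Bad Request")
assert re.search(strict, "CubeXjs load API failed!") is None
```

In a test this would read `pytest.raises(FAIL, match=re.escape("Cube.js load API failed!"))`.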
assert ( | ||
"Cube.js load API took longer than 3 seconds to provide a response." | ||
in str(exc) | ||
) |
You can use the `match` parameter of `pytest.raises` to eliminate these lines:
with pytest.raises(FAIL, match="Cube.js load API took longer than 3 seconds to provide a response.") as exc:
* Update click requirement from <8.0,>=7.0 to >=7.0,<9.0 Updates the requirements on [click](https://github.com/pallets/click) to permit the latest version. - [Release notes](https://github.com/pallets/click/releases) - [Changelog](https://github.com/pallets/click/blob/main/CHANGES.rst) - [Commits](pallets/click@7.0...8.0.3) --- updated-dependencies: - dependency-name: click dependency-type: direct:production ... Signed-off-by: dependabot[bot] <support@github.com> * Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911 Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version. - [Release notes](https://github.com/python/mypy/releases) - [Commits](python/mypy@v0.600...v0.910) --- updated-dependencies: - dependency-name: mypy dependency-type: direct:development ... Signed-off-by: dependabot[bot] <support@github.com> * Added tests for Mixpanel tasks * Added handling of Mixpanel error response * Small improvements * Added change file and updated outline * Fixed typo in change file * Fixed quotes in outline.toml * Added group_events option * Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931 Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version. - [Release notes](https://github.com/python/mypy/releases) - [Commits](python/mypy@v0.600...v0.930) --- updated-dependencies: - dependency-name: mypy dependency-type: direct:development ... 
Signed-off-by: dependabot[bot] <support@github.com> * Refactored Mixpanel tasks __init__.py * Small refactor of import statement * CubeJS Query Task added along with tests * Added outline * Added change file * Fixed typo in outline * Moved var init after docstring * Added extra cubejs * Run black * Added PyJWT to test requirements * Fixedo typo in test-requirements file * Remove upper bound version for PyJWT * Run Black * Update src/prefect/tasks/mixpanel/__init__.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey 
<terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Update src/prefect/tasks/mixpanel/mixpanel_tasks.py Co-authored-by: Terrence Dorsey <terrend@mishu.com> * Applied suggestions from reviewers * Fixed failing test * Run Black * Fixed flake8 warnings * Run Black * Removed Mixpanel stuff * Removed mixpanel entry from outline to fix docs tests * Severale adjustments based on reviewers' suggestions * Test refactoring following suggestions * Run Black Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Alessandro Lollo <alessandro.lollo@cloudacademy.com> Co-authored-by: Terrence Dorsey <terrend@mishu.com> Co-authored-by: Michael Adkins <madkinszane@gmail.com>
Summary
This PR adds the ability to run queries on CubeJS.
Importance
Users can now query CubeJS using a native Prefect task 😊
This PR:
- `changes/` directory (if appropriate)
- `docs/outline.toml` for API reference docs (if appropriate)