-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/add cubejs tasks #5280
Merged
zanieb
merged 75 commits into
PrefectHQ:master
from
AlessandroLollo:feature/add-cubejs-tasks
Jan 10, 2022
Merged
Feature/add cubejs tasks #5280
Changes from 72 commits
Commits
Show all changes
75 commits
Select commit
Hold shift + click to select a range
379270d
Merge pull request #1 from PrefectHQ/master
AlessandroLollo a621fa7
Merge branch 'PrefectHQ:master' into master
AlessandroLollo f56917a
Update click requirement from <8.0,>=7.0 to >=7.0,<9.0
dependabot[bot] 768c500
Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911
dependabot[bot] 8ec54c9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo 4e8a10c
Merge pull request #3 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo a7980c2
Merge pull request #2 from AlessandroLollo/dependabot/pip/click-gte-7…
AlessandroLollo 17b2856
Merge branch 'PrefectHQ:master' into master
AlessandroLollo d8a27b9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo f82cd1a
Added tests for Mixpanel tasks
alollo-ca 4fea3aa
Added handling of Mixpanel error response
alollo-ca 05c3d82
Small improvements
alollo-ca 8aec26d
Added change file and updated outline
alollo-ca 490a6d3
Fixed typo in change file
alollo-ca 0232294
Fixed quotes in outline.toml
alollo-ca 03a2790
Added group_events option
alollo-ca 2764172
Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931
dependabot[bot] 3b5b85f
Merge branch 'PrefectHQ:master' into master
AlessandroLollo 6e3b9e1
Merge pull request #5 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo 9c573be
Refactored Mixpanel tasks __init__.py
alollo-ca d6c7fb0
Small refactor of import statement
alollo-ca d887f19
CubeJS Query Task added along with tests
alollo-ca 2382d4c
Added outline
alollo-ca 974f92a
Added change file
alollo-ca bef5d58
Fixed typo in outline
alollo-ca 4cd192b
Moved var init after docstring
alollo-ca 955bc26
Added extra cubejs
alollo-ca ec71072
Run black
alollo-ca 081579e
Added PyJWT to test requirements
alollo-ca 962cc0b
Fixedo typo in test-requirements file
alollo-ca e7a8df6
Remove upper bound version for PyJWT
alollo-ca eb942f1
Run Black
alollo-ca 02c226c
Update src/prefect/tasks/mixpanel/__init__.py
AlessandroLollo 2592fd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo cdbcc24
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 4d1ac7b
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo cc174ac
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo f058cff
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo de8cafe
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 8f7cdd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 147d73c
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo aca74e5
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo aa507e2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 1b188eb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 98d15fb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo b4a38f2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 33d1e79
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo aaa6f43
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo a4ac00d
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 14e94ab
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo fb0fb66
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo aa560a2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 611a8aa
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 0cd78c4
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 10c9c07
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 2542610
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo ffcd771
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 0a1f088
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo bd2dba3
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 8da7d50
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo 59bfa96
Merge branch 'PrefectHQ:master' into feature/add-mixpanel-tasks
AlessandroLollo 8209521
Merge pull request #6 from AlessandroLollo/feature/add-mixpanel-tasks
AlessandroLollo 27b03e4
Merge branch 'master' into feature/add-cubejs-tasks
alollo-ca f5179ff
Applied suggestions from reviewers
alollo-ca 5cb6318
Fixed failing test
alollo-ca d8f613d
Run Black
alollo-ca cf0b263
Fixed flake8 warnings
alollo-ca da59d3d
Run Black
alollo-ca 72b8a5b
Removed Mixpanel stuff
alollo-ca 56c4ad1
Removed mixpanel entry from outline to fix docs tests
alollo-ca 100fa87
Severale adjustments based on reviewers' suggestions
alollo-ca 335a407
Test refactoring following suggestions
alollo-ca 92187f8
Run Black
alollo-ca f55aa84
Merge branch 'master' into feature/add-cubejs-tasks
zanieb 944045e
Merge branch 'master' into feature/add-cubejs-tasks
zanieb File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
feature: | ||
- "Added Cube.js Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)" | ||
|
||
contributor: | ||
- "[Alessandro Lollo](https://github.com/AlessandroLollo)" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
""" | ||
This is a collection of tasks to interact with a Cube.js or Cube Cloud environment. | ||
""" | ||
|
||
try: | ||
from prefect.tasks.cubejs.cubejs_tasks import CubeJSQueryTask | ||
except ImportError as err: | ||
raise ImportError( | ||
'prefect.tasks.cubejs` requires Prefect to be installed with the "cubejs" extra.' | ||
) from err |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
import os | ||
from requests import Session | ||
from prefect import Task | ||
from prefect.utilities.tasks import defaults_from_attrs | ||
from prefect.engine.signals import FAIL | ||
import json | ||
import jwt | ||
import time | ||
from typing import Dict, List, Union | ||
|
||
|
||
class CubeJSQueryTask(Task): | ||
|
||
""" | ||
This task calls Cueb.js load API and returns the result | ||
as a JSON object. | ||
More info about Cube.js load API at | ||
https://cube.dev/docs/rest-api#api-reference-v-1-load. | ||
|
||
Args: | ||
- subdomain (str, optional): The subdomain to use to get the data. | ||
If provided, `subdomain` takes precedence over `url`. | ||
This is likely to be useful to Cube Cloud users. | ||
- url (str, optional): The URL to use to get the data. | ||
This is likely to be useful to users of self-hosted Cube.js. | ||
- api_secret (str, optional): The API secret used to generate an | ||
API token for authentication. | ||
If provided, it takes precedence over `api_secret_env_var`. | ||
- api_secret_env_var (str, optional): The name of the env var that contains | ||
the API secret to use to generate an API token for authentication. | ||
Defaults to `CUBEJS_API_SECRET`. | ||
- query (dict, list, optional): `dict` or `list` representing | ||
valid Cube.js queries. | ||
If you pass multiple queries, then be aware of Cube.js Data Blending. | ||
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load | ||
and at https://cube.dev/docs/schema/advanced/data-blending. | ||
Query format can be found at: https://cube.dev/docs/query-format. | ||
- security_context (str, dict, optional): The security context to use | ||
during authentication. | ||
If the security context does not contain an expiration period, | ||
then a 7-day expiration period is added automatically. | ||
More info at: https://cube.dev/docs/security/context. | ||
- wait_time_between_api_calls (int, optional): The number of seconds to | ||
wait between API calls. | ||
Default to 10. | ||
- max_wait_time (int, optional): The number of seconds to wait for the | ||
Cube.js load API to return a response. | ||
- **kwargs (optional): Additional keyword arguments to pass to the | ||
standard Task initalization. | ||
|
||
""" | ||
|
||
__CUBEJS_CLOUD_BASE_URL = "https://{subdomain}.cubecloud.dev" | ||
|
||
def __init__( | ||
self, | ||
subdomain: str = None, | ||
url: str = None, | ||
api_secret: str = None, | ||
api_secret_env_var: str = "CUBEJS_API_SECRET", | ||
query: Union[Dict, List[Dict]] = None, | ||
security_context: Union[str, Dict] = None, | ||
wait_time_between_api_calls: int = 10, | ||
max_wait_time: int = None, | ||
**kwargs, | ||
): | ||
self.subdomain = subdomain | ||
self.url = url | ||
self.api_secret = api_secret | ||
self.api_secret_env_var = api_secret_env_var | ||
self.query = query | ||
self.security_context = security_context | ||
self.wait_time_between_api_calls = wait_time_between_api_calls | ||
self.max_wait_time = max_wait_time | ||
super().__init__(**kwargs) | ||
|
||
@defaults_from_attrs( | ||
"subdomain", | ||
"url", | ||
"api_secret", | ||
"api_secret_env_var", | ||
"query", | ||
"security_context", | ||
"wait_time_between_api_calls", | ||
"max_wait_time", | ||
) | ||
def run( | ||
self, | ||
subdomain: str = None, | ||
url: str = None, | ||
api_secret: str = None, | ||
api_secret_env_var: str = "CUBEJS_API_SECRET", | ||
query: Union[Dict, List[Dict]] = None, | ||
security_context: Union[str, Dict] = None, | ||
wait_time_between_api_calls: int = 10, | ||
max_wait_time: int = None, | ||
): | ||
""" | ||
Task run method to perform a query using Cube.js load API. | ||
|
||
Args: | ||
- subdomain (str, optional): The subdomain to use to get the data. | ||
If provided, `subdomain` takes precedence over `url`. | ||
This is likely to be useful to Cube Cloud users. | ||
- url (str, optional): The URL to use to get the data. | ||
This is likely to be useful to users of self-hosted Cube.js. | ||
- api_secret (str, optional): The API secret used to generate an | ||
API token for authentication. | ||
If provided, it takes precedence over `api_secret_env_var`. | ||
- api_secret_env_var (str, optional): The name of the env var that contains | ||
the API secret to use to generate an API token for authentication. | ||
Defaults to `CUBEJS_API_SECRET`. | ||
- query (dict, list, optional): `dict` or `list` representing | ||
valid Cube.js queries. | ||
If you pass multiple queries, then be aware of Cube.js Data Blending. | ||
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load | ||
and at https://cube.dev/docs/schema/advanced/data-blending. | ||
Query format can be found at: https://cube.dev/docs/query-format. | ||
- security_context (str, dict, optional): The security context to use | ||
during authentication. | ||
If the security context does not contain an expiration period, | ||
then a 7-day expiration period is added automatically. | ||
More info at https://cube.dev/docs/security/context. | ||
- wait_time_between_api_calls (int, optional): The number of seconds to | ||
wait between API calls. | ||
Default to 10. | ||
- max_wait_time (int, optional): The number of seconds to wait for the | ||
Cube.js load API to return a response. | ||
|
||
Raises: | ||
- ValueError if both `subdomain` and `url` are missing. | ||
- ValueError if `api_token` is missing and `api_token_env_var` cannot be found. | ||
- ValueError if `query` is missing. | ||
- `prefect.engine.signals.FAIL` if the Cube.js load API fails. | ||
- `prefect.engine.signals.FAIL` if the Cube.js load API takes more than | ||
`max_wait_time` seconds to respond. | ||
|
||
Returns: | ||
- The Cube.js JSON response. | ||
|
||
""" | ||
|
||
if not subdomain and not url: | ||
raise ValueError("Missing both `subdomain` and `url`.") | ||
|
||
if not api_secret and api_secret_env_var not in os.environ: | ||
raise ValueError("Missing `api_secret` and `api_secret_env_var` not found.") | ||
|
||
if not query: | ||
raise ValueError("Missing `query`.") | ||
|
||
cube_base_url = self.__CUBEJS_CLOUD_BASE_URL | ||
if subdomain: | ||
cube_base_url = f"{cube_base_url.format(subdomain=subdomain)}/cubejs-api" | ||
else: | ||
cube_base_url = url | ||
query_api_url = f"{cube_base_url}/v1/load" | ||
|
||
self.logger.debug(f"Query URL: {query_api_url}") | ||
|
||
secret = api_secret if api_secret else os.environ[api_secret_env_var] | ||
|
||
if security_context: | ||
|
||
extended_context = security_context | ||
if "exp" not in security_context and "expiresIn" not in security_context: | ||
extended_context["expiresIn"] = "7d" | ||
api_token = jwt.encode( | ||
payload=extended_context, key=secret, algorithm="HS256" | ||
) | ||
|
||
self.logger.debug("JWT token generated with security context.") | ||
|
||
else: | ||
api_token = jwt.encode(payload={}, key=secret) | ||
|
||
session = Session() | ||
session.headers = { | ||
"Content-type": "application/json", | ||
"Authorization": api_token, | ||
} | ||
|
||
params = {"query": json.dumps(query)} | ||
|
||
wait_api_call_secs = ( | ||
wait_time_between_api_calls if wait_time_between_api_calls > 0 else 10 | ||
) | ||
elapsed_wait_time = 0 | ||
while not max_wait_time or elapsed_wait_time <= max_wait_time: | ||
|
||
with session.get(url=query_api_url, params=params) as response: | ||
self.logger.debug(f"URL is: {response.url}") | ||
|
||
if response.status_code == 200: | ||
data = response.json() | ||
|
||
if "error" in data.keys() and "Continue wait" in data["error"]: | ||
msg = ( | ||
"Cube.js load API still running." | ||
"Waiting {wait_api_call_secs} seconds before retrying" | ||
) | ||
self.logger.info(msg) | ||
time.sleep(wait_api_call_secs) | ||
elapsed_wait_time += wait_api_call_secs | ||
continue | ||
|
||
else: | ||
return data | ||
|
||
else: | ||
raise FAIL( | ||
message=f"Cube.js load API failed! Error is: {response.reason}" | ||
) | ||
|
||
raise FAIL( | ||
message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response." | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import logging | ||
import jwt | ||
import pytest | ||
from prefect.engine.signals import FAIL | ||
from prefect.tasks.cubejs import CubeJSQueryTask | ||
|
||
import responses | ||
|
||
|
||
class TestCubeJSQueryTask: | ||
def test_construction_no_values(self): | ||
cubejs_task = CubeJSQueryTask() | ||
|
||
assert cubejs_task.subdomain is None | ||
assert cubejs_task.url is None | ||
assert cubejs_task.api_secret is None | ||
assert cubejs_task.api_secret_env_var == "CUBEJS_API_SECRET" | ||
assert cubejs_task.query is None | ||
assert cubejs_task.security_context is None | ||
|
||
def test_construction_with_values(self): | ||
cubejs_task = CubeJSQueryTask( | ||
subdomain="foo", | ||
url="http://bar", | ||
api_secret="secret", | ||
api_secret_env_var="secret_env_var", | ||
query="query", | ||
security_context={"foo": "bar"}, | ||
) | ||
|
||
assert cubejs_task.subdomain == "foo" | ||
assert cubejs_task.url == "http://bar" | ||
assert cubejs_task.api_secret == "secret" | ||
assert cubejs_task.api_secret_env_var == "secret_env_var" | ||
assert cubejs_task.query == "query" | ||
assert cubejs_task.security_context == {"foo": "bar"} | ||
|
||
def test_run_with_no_values_raises(self): | ||
cubejs_task = CubeJSQueryTask() | ||
msg_match = "Missing both `subdomain` and `url`." | ||
with pytest.raises(ValueError, match=msg_match): | ||
cubejs_task.run() | ||
|
||
def test_run_without_api_secret_api_secret_env_var(self): | ||
cubejs_task = CubeJSQueryTask() | ||
msg_match = "Missing `api_secret` and `api_secret_env_var` not found." | ||
with pytest.raises(ValueError, match=msg_match): | ||
cubejs_task.run(subdomain="foo") | ||
|
||
def test_run_without_query_raises(self): | ||
cubejs_task = CubeJSQueryTask() | ||
msg_match = "Missing `query`." | ||
with pytest.raises(ValueError, match=msg_match): | ||
cubejs_task.run(subdomain="foo", api_secret="bar") | ||
|
||
@responses.activate | ||
def test_run_with_failing_api_raises(self): | ||
cubejs_task = CubeJSQueryTask() | ||
msg_match = "Cube.js load API failed!" | ||
responses.add( | ||
responses.GET, "https://test.cubecloud.dev/cubejs-api/v1/load", status=123 | ||
) | ||
|
||
with pytest.raises(FAIL, match=msg_match): | ||
cubejs_task.run(subdomain="test", api_secret="foo", query="query") | ||
|
||
@responses.activate | ||
def test_run_with_continue_waiting(self, caplog): | ||
caplog.set_level(logging.DEBUG) | ||
cubejs_task = CubeJSQueryTask() | ||
|
||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"error": "Continue wait"}, | ||
) | ||
|
||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"data": "result"}, | ||
) | ||
|
||
data = cubejs_task.run(subdomain="test", api_secret="foo", query="query") | ||
|
||
assert "Cube.js load API still running." in caplog.text | ||
assert isinstance(data, dict) | ||
|
||
@responses.activate | ||
def test_run_with_security_context(self, caplog): | ||
caplog.set_level(logging.DEBUG) | ||
cubejs_task = CubeJSQueryTask() | ||
|
||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"data": "result"}, | ||
) | ||
|
||
cubejs_task.run( | ||
subdomain="test", | ||
api_secret="foo", | ||
query="query", | ||
security_context={"foo": "bar"}, | ||
) | ||
|
||
expected_jwt = jwt.encode( | ||
payload={"foo": "bar", "expiresIn": "7d"}, key="foo", algorithm="HS256" | ||
) | ||
|
||
# assert "JWT token generated with security context." in caplog.text | ||
assert responses.calls[0].request.headers["Authorization"] == expected_jwt | ||
|
||
@responses.activate | ||
def test_run_with_max_wait_time_raises(self): | ||
cubejs_task = CubeJSQueryTask() | ||
msg_match = "Cube.js load API took longer than 3 seconds to provide a response." | ||
responses.add( | ||
responses.GET, | ||
"https://test.cubecloud.dev/cubejs-api/v1/load", | ||
status=200, | ||
json={"error": "Continue wait"}, | ||
) | ||
|
||
with pytest.raises(FAIL, match=msg_match): | ||
cubejs_task.run( | ||
subdomain="test", | ||
api_secret="foo", | ||
query="query", | ||
security_context={"foo": "bar"}, | ||
wait_time_between_api_calls=1, | ||
max_wait_time=3 | ||
) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could add a parameter to tune the number of subsequent retries and raise in case of counted failures reach the ceil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a
max_wait_time
parameter, similarly to what has been implemented inDbtCloudRunJob
task.