Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add cubejs tasks #5280

Merged
merged 75 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
379270d
Merge pull request #1 from PrefectHQ/master
AlessandroLollo Mar 27, 2021
a621fa7
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Oct 25, 2021
f56917a
Update click requirement from <8.0,>=7.0 to >=7.0,<9.0
dependabot[bot] Oct 25, 2021
768c500
Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911
dependabot[bot] Oct 25, 2021
8ec54c9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Nov 18, 2021
4e8a10c
Merge pull request #3 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo Nov 18, 2021
a7980c2
Merge pull request #2 from AlessandroLollo/dependabot/pip/click-gte-7…
AlessandroLollo Nov 18, 2021
17b2856
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 19, 2021
d8a27b9
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 21, 2021
f82cd1a
Added tests for Mixpanel tasks
alollo-ca Dec 22, 2021
4fea3aa
Added handling of Mixpanel error response
alollo-ca Dec 23, 2021
05c3d82
Small improvements
alollo-ca Dec 24, 2021
8aec26d
Added change file and updated outline
alollo-ca Dec 24, 2021
490a6d3
Fixed typo in change file
alollo-ca Dec 24, 2021
0232294
Fixed quotes in outline.toml
alollo-ca Dec 24, 2021
03a2790
Added group_events option
alollo-ca Dec 25, 2021
2764172
Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931
dependabot[bot] Dec 25, 2021
3b5b85f
Merge branch 'PrefectHQ:master' into master
AlessandroLollo Dec 25, 2021
6e3b9e1
Merge pull request #5 from AlessandroLollo/dependabot/pip/mypy-gte-0.…
AlessandroLollo Dec 25, 2021
9c573be
Refactored Mixpanel tasks __init__.py
alollo-ca Dec 25, 2021
d6c7fb0
Small refactor of import statement
alollo-ca Dec 25, 2021
d887f19
CubeJS Query Task added along with tests
alollo-ca Dec 27, 2021
2382d4c
Added outline
alollo-ca Dec 27, 2021
974f92a
Added change file
alollo-ca Dec 27, 2021
bef5d58
Fixed typo in outline
alollo-ca Dec 27, 2021
4cd192b
Moved var init after docstring
alollo-ca Dec 27, 2021
955bc26
Added extra cubejs
alollo-ca Dec 27, 2021
ec71072
Run black
alollo-ca Dec 27, 2021
081579e
Added PyJWT to test requirements
alollo-ca Dec 27, 2021
962cc0b
Fixedo typo in test-requirements file
alollo-ca Dec 27, 2021
e7a8df6
Remove upper bound version for PyJWT
alollo-ca Dec 27, 2021
eb942f1
Run Black
alollo-ca Dec 27, 2021
02c226c
Update src/prefect/tasks/mixpanel/__init__.py
AlessandroLollo Dec 27, 2021
2592fd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
cdbcc24
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
4d1ac7b
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
cc174ac
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
f058cff
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
de8cafe
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
8f7cdd6
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
147d73c
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aca74e5
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aa507e2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
1b188eb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
98d15fb
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
b4a38f2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
33d1e79
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aaa6f43
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
a4ac00d
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
14e94ab
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
fb0fb66
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
aa560a2
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
611a8aa
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
0cd78c4
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
10c9c07
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
2542610
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
ffcd771
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
0a1f088
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
bd2dba3
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
8da7d50
Update src/prefect/tasks/mixpanel/mixpanel_tasks.py
AlessandroLollo Dec 27, 2021
59bfa96
Merge branch 'PrefectHQ:master' into feature/add-mixpanel-tasks
AlessandroLollo Dec 27, 2021
8209521
Merge pull request #6 from AlessandroLollo/feature/add-mixpanel-tasks
AlessandroLollo Dec 27, 2021
27b03e4
Merge branch 'master' into feature/add-cubejs-tasks
alollo-ca Dec 29, 2021
f5179ff
Applied suggestions from reviewers
alollo-ca Dec 29, 2021
5cb6318
Fixed failing test
alollo-ca Dec 29, 2021
d8f613d
Run Black
alollo-ca Dec 29, 2021
cf0b263
Fixed flake8 warnings
alollo-ca Dec 29, 2021
da59d3d
Run Black
alollo-ca Dec 29, 2021
72b8a5b
Removed Mixpanel stuff
alollo-ca Dec 29, 2021
56c4ad1
Removed mixpanel entry from outline to fix docs tests
alollo-ca Dec 29, 2021
100fa87
Severale adjustments based on reviewers' suggestions
alollo-ca Dec 29, 2021
335a407
Test refactoring following suggestions
alollo-ca Jan 3, 2022
92187f8
Run Black
alollo-ca Jan 3, 2022
f55aa84
Merge branch 'master' into feature/add-cubejs-tasks
zanieb Jan 10, 2022
944045e
Merge branch 'master' into feature/add-cubejs-tasks
zanieb Jan 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions changes/pr5280.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
feature:
- "Added Cube.js Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)"

contributor:
- "[Alessandro Lollo](https://github.com/AlessandroLollo)"
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
black
graphviz >= 0.8
jinja2 >= 2.0, < 4.0
mypy >= 0.600, < 0.813
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're pinning below this version because we don't have separate typeshed installations. Is there a reason you bumped this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why I did this.
Probably not intentional.
Anyway, I should have reverted back to mypy >= 0.600, 0.813

mypy >= 0.600, < 0.931
Pygments >= 2.2, < 3.0
5 changes: 5 additions & 0 deletions docs/outline.toml
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,8 @@ title = "Airbyte Tasks"
module = "prefect.tasks.airbyte"
classes = ["AirbyteConnectionTask"]
verified_task = true

[pages.tasks.cubejs]
title = "Cube.js Tasks"
module = "prefect.tasks.cubejs"
classes = ["CubeJSQueryTask"]
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def run(self):
"sodasql": ["soda-sql >= 2.0.0b25"],
"sodaspark": ["soda-spark >= 0.2.1"],
"sendgrid": ["sendgrid >= 6.7.0"],
"cubejs": ["PyJWT >= 2.3.0, < 3.0"],
}


Expand Down
10 changes: 10 additions & 0 deletions src/prefect/tasks/cubejs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
This is a collection of tasks to interact with a Cube.js or Cube Cloud environment.
"""

try:
from prefect.tasks.cubejs.cubejs_tasks import CubeJSQueryTask
except ImportError as err:
raise ImportError(
'prefect.tasks.cubejs` requires Prefect to be installed with the "cubejs" extra.'
) from err
203 changes: 203 additions & 0 deletions src/prefect/tasks/cubejs/cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import os
from requests import Session
from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs
from prefect.engine.signals import FAIL
import json
import jwt
import time
from typing import Dict, List, Union


class CubeJSQueryTask(Task):

"""
This task calls Cueb.js load API and returns the result
as a JSON object.
More info about Cube.js load API at
https://cube.dev/docs/rest-api#api-reference-v-1-load.

Args:
- subdomain (str, optional): The subdomain to use to get the data.
If provided, `subdomain` takes precedence over `url`.
This is likely to be useful to Cube Cloud users.
- url (str, optional): The URL to use to get the data.
This is likely to be useful to users of self-hosted Cube.js.
- api_secret (str, optional): The API secret used to generate an
API token for authentication.
If provided, it takes precedence over `api_secret_env_var`.
- api_secret_env_var (str, optional): The name of the env var that contains
the API secret to use to generate an API token for authentication.
Defaults to `CUBEJS_API_SECRET`.
- query (dict, list, optional): `dict` or `list` representing
valid Cube.js queries.
If you pass multiple queries, then be aware of Cube.js Data Blending.
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load
and at https://cube.dev/docs/schema/advanced/data-blending.
Query format can be found at: https://cube.dev/docs/query-format.
- security_context (str, dict, optional): The security context to use
during authentication.
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at: https://cube.dev/docs/security/context.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
- **kwargs (optional): Additional keyword arguments to pass to the
standard Task initalization.

"""

__CUBEJS_CLOUD_BASE_URL = "https://{subdomain}.cubecloud.dev"

def __init__(
self,
subdomain: str = None,
url: str = None,
api_secret: str = None,
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
max_wait_time: int = None,
**kwargs,
):
self.subdomain = subdomain
self.url = url
self.api_secret = api_secret
self.api_secret_env_var = api_secret_env_var
self.query = query
self.security_context = security_context
self.max_wait_time = max_wait_time
super().__init__(**kwargs)

@defaults_from_attrs(
"subdomain",
"url",
"api_secret",
"api_secret_env_var",
"query",
"security_context",
"max_wait_time",
)
def run(
self,
subdomain: str = None,
url: str = None,
api_secret: str = None,
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
max_wait_time: int = None,
):
"""
Task run method to perform a query using Cube.js load API.

Args:
- subdomain (str, optional): The subdomain to use to get the data.
If provided, `subdomain` takes precedence over `url`.
This is likely to be useful to Cube Cloud users.
- url (str, optional): The URL to use to get the data.
This is likely to be useful to users of self-hosted Cube.js.
- api_secret (str, optional): The API secret used to generate an
API token for authentication.
If provided, it takes precedence over `api_secret_env_var`.
- api_secret_env_var (str, optional): The name of the env var that contains
the API secret to use to generate an API token for authentication.
Defaults to `CUBEJS_API_SECRET`.
- query (dict, list, optional): `dict` or `list` representing
valid Cube.js queries.
If you pass multiple queries, then be aware of Cube.js Data Blending.
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load
and at https://cube.dev/docs/schema/advanced/data-blending.
Query format can be found at: https://cube.dev/docs/query-format.
- security_context (str, dict, optional): The security context to use
during authentication.
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at https://cube.dev/docs/security/context.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.

Raises:
- ValueError if both `subdomain` and `url` are missing.
- ValueError if `api_token` is missing and `api_token_env_var` cannot be found.
- ValueError if `query` is missing.
- `prefect.engine.signals.FAIL` if the Cube.js load API fails.
- `prefect.engine.signals.FAIL` if the Cube.js load API takes more than
`max_wait_time` seconds to respond.

Returns:
- The Cube.js JSON response.

"""

if not subdomain and not url:
raise ValueError("Missing both `subdomain` and `url`.")

if not api_secret and api_secret_env_var not in os.environ:
raise ValueError("Missing `api_secret` and `api_secret_env_var` not found.")

if not query:
raise ValueError("Missing `query`.")

cube_base_url = self.__CUBEJS_CLOUD_BASE_URL
if subdomain:
cube_base_url = f"{cube_base_url.format(subdomain=subdomain)}/cubejs-api"
else:
cube_base_url = url
query_api_url = f"{cube_base_url}/v1/load"

self.logger.debug(f"Query URL: {query_api_url}")

secret = api_secret if api_secret else os.environ[api_secret_env_var]

if security_context:

extended_context = security_context
if "exp" not in security_context and "expiresIn" not in security_context:
extended_context["epxiresIn"] = "7d"
api_token = jwt.encode(
payload=extended_context, key=secret, algorithm="HS256"
)

self.logger.debug("JWT token generated with security context.")

else:
api_token = jwt.encode(payload={}, key=secret)

session = Session()
session.headers = {
"Content-type": "application/json",
"Authorization": api_token,
}

params = {"query": json.dumps(query)}

wait_time_between_api_calls = 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this configurable via a task parameter and default it to 10?

elapsed_wait_time = 0
while not max_wait_time or elapsed_wait_time <= max_wait_time:

with session.get(url=query_api_url, params=params) as response:
self.logger.debug(f"URL is: {response.url}")

if response.status_code == 200:
data = response.json()

if "error" in data.keys() and "Continue wait" in data["error"]:
msg = (
"Cube.js load API still running."
"Waiting {wait_time_between_api_calls} seconds before retrying"
)
self.logger.warning(msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.logger.warning(msg)
self.logger.info(msg)

time.sleep(wait_time_between_api_calls)
elapsed_wait_time += wait_time_between_api_calls
continue

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could add a parameter to tune the number of subsequent retries and raise in case of counted failures reach the ceil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a max_wait_time parameter, similarly to what has been implemented in DbtCloudRunJob task.


else:
return data

else:
raise FAIL(
message=f"Cube.js load API failed!. Error is: {response.reason}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
message=f"Cube.js load API failed!. Error is: {response.reason}"
message=f"Cube.js load API failed! Error is: {response.reason}"

)

raise FAIL(message="Cube.js load API took too long to provide a response.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise FAIL(message="Cube.js load API took too long to provide a response.")
raise FAIL(message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response.")

3 changes: 2 additions & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ testfixtures >= 6.10.3
pytest-env >= 0.6.0
pytest-xdist >= 2.0
flaky >= 3.0
responses >= 0.14.0
responses >= 0.14.0
PyJWT >= 2.3.0
Empty file added tests/tasks/cubejs/__init__.py
Empty file.
135 changes: 135 additions & 0 deletions tests/tasks/cubejs/test_cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import logging
import pytest
from prefect.engine.signals import FAIL
from prefect.tasks.cubejs import CubeJSQueryTask

import responses


class TestCubeJSQueryTask:
def test_construction_no_values(self):
cubejs_task = CubeJSQueryTask()

assert cubejs_task.subdomain is None
assert cubejs_task.url is None
assert cubejs_task.api_secret is None
assert cubejs_task.api_secret_env_var == "CUBEJS_API_SECRET"
assert cubejs_task.query is None
assert cubejs_task.security_context is None

def test_construction_with_values(self):
cubejs_task = CubeJSQueryTask(
subdomain="foo",
url="http://bar",
api_secret="secret",
api_secret_env_var="secret_env_var",
query="query",
security_context={"foo": "bar"},
)

assert cubejs_task.subdomain == "foo"
assert cubejs_task.url == "http://bar"
assert cubejs_task.api_secret == "secret"
assert cubejs_task.api_secret_env_var == "secret_env_var"
assert cubejs_task.query == "query"
assert cubejs_task.security_context == {"foo": "bar"}

def test_run_with_no_values_raises(self):
cubejs_task = CubeJSQueryTask()
with pytest.raises(ValueError) as exc:
cubejs_task.run()

assert "Missing both `subdomain` and `url`." in str(exc)

def test_run_without_api_secret_api_secret_env_var(self):
cubejs_task = CubeJSQueryTask()
with pytest.raises(ValueError) as exc:
cubejs_task.run(subdomain="foo")

assert "Missing `api_secret` and `api_secret_env_var` not found." in str(exc)

def test_run_without_query_raises(self):
cubejs_task = CubeJSQueryTask()
with pytest.raises(ValueError) as exc:
cubejs_task.run(subdomain="foo", api_secret="bar")

assert "Missing `query`." in str(exc)

@responses.activate
def test_run_with_failing_api_raises(self):
cubejs_task = CubeJSQueryTask()
responses.add(
responses.GET, "https://test.cubecloud.dev/cubejs-api/v1/load", status=123
)

with pytest.raises(FAIL) as exc:
cubejs_task.run(subdomain="test", api_secret="foo", query="query")

assert "Cube.js load API failed!." in str(exc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert "Cube.js load API failed!." in str(exc)
assert "Cube.js load API failed!" in str(exc)


@responses.activate
def test_run_with_continue_waiting(self, caplog):
caplog.set_level(logging.DEBUG)
cubejs_task = CubeJSQueryTask()

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"error": "Continue wait"},
)

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"data": "result"},
)

data = cubejs_task.run(subdomain="test", api_secret="foo", query="query")

assert "Cube.js load API still running." in caplog.text
assert isinstance(data, dict)

@responses.activate
def test_run_with_security_context(self, caplog):
caplog.set_level(logging.DEBUG)
cubejs_task = CubeJSQueryTask()

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"data": "result"},
)

cubejs_task.run(
subdomain="test",
api_secret="foo",
query="query",
security_context={"foo": "bar"},
)

assert "JWT token generated with security context." in caplog.text
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this test I would recommend inspecting the JWT that was included in the request made by the task so that inspecting the logs isn't necessary. You should be able to access the request headers via responses.calls[0].request.headers


@responses.activate
def test_run_with_max_wait_time_raises(self):
cubejs_task = CubeJSQueryTask()

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"error": "Continue wait"},
)

with pytest.raises(FAIL) as exc:
cubejs_task.run(
subdomain="test",
api_secret="foo",
query="query",
security_context={"foo": "bar"},
max_wait_time=15,
)

assert "Cube.js load API took too long to provide a response." in str(exc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a configurable time between calls would be useful for this test since it could spend less time waiting.