Skip to content

Commit

Permalink
Feature/add cubejs tasks (PrefectHQ#5280)
Browse files Browse the repository at this point in the history
* Update click requirement from <8.0,>=7.0 to >=7.0,<9.0

Updates the requirements on [click](https://github.com/pallets/click) to permit the latest version.
- [Release notes](https://github.com/pallets/click/releases)
- [Changelog](https://github.com/pallets/click/blob/main/CHANGES.rst)
- [Commits](pallets/click@7.0...8.0.3)

---
updated-dependencies:
- dependency-name: click
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update mypy requirement from <0.813,>=0.600 to >=0.600,<0.911

Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version.
- [Release notes](https://github.com/python/mypy/releases)
- [Commits](python/mypy@v0.600...v0.910)

---
updated-dependencies:
- dependency-name: mypy
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>

* Added tests for Mixpanel tasks

* Added handling of Mixpanel error response

* Small improvements

* Added change file and updated outline

* Fixed typo in change file

* Fixed quotes in outline.toml

* Added group_events option

* Update mypy requirement from <0.911,>=0.600 to >=0.600,<0.931

Updates the requirements on [mypy](https://github.com/python/mypy) to permit the latest version.
- [Release notes](https://github.com/python/mypy/releases)
- [Commits](python/mypy@v0.600...v0.930)

---
updated-dependencies:
- dependency-name: mypy
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>

* Refactored Mixpanel tasks __init__.py

* Small refactor of import statement

* CubeJS Query Task added along with tests

* Added outline

* Added change file

* Fixed typo in outline

* Moved var init after docstring

* Added extra cubejs

* Run black

* Added PyJWT to test requirements

* Fixedo typo in test-requirements file

* Remove upper bound version for PyJWT

* Run Black

* Update src/prefect/tasks/mixpanel/__init__.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Update src/prefect/tasks/mixpanel/mixpanel_tasks.py

Co-authored-by: Terrence Dorsey <terrend@mishu.com>

* Applied suggestions from reviewers

* Fixed failing test

* Run Black

* Fixed flake8 warnings

* Run Black

* Removed Mixpanel stuff

* Removed mixpanel entry from outline to fix docs tests

* Severale adjustments based on reviewers' suggestions

* Test refactoring following suggestions

* Run Black

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Alessandro Lollo <alessandro.lollo@cloudacademy.com>
Co-authored-by: Terrence Dorsey <terrend@mishu.com>
Co-authored-by: Michael Adkins <madkinszane@gmail.com>
  • Loading branch information
5 people authored and lance0805 committed Aug 2, 2022
1 parent 0f5550a commit a065c53
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 0 deletions.
5 changes: 5 additions & 0 deletions changes/pr5280.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
feature:
- "Added Cube.js Query Task - [#5280](https://github.com/PrefectHQ/prefect/pull/5280)"

contributor:
- "[Alessandro Lollo](https://github.com/AlessandroLollo)"
10 changes: 10 additions & 0 deletions src/prefect/tasks/cubejs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
This is a collection of tasks to interact with a Cube.js or Cube Cloud environment.
"""

try:
from prefect.tasks.cubejs.cubejs_tasks import CubeJSQueryTask
except ImportError as err:
raise ImportError(
'prefect.tasks.cubejs` requires Prefect to be installed with the "cubejs" extra.'
) from err
217 changes: 217 additions & 0 deletions src/prefect/tasks/cubejs/cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import os
from requests import Session
from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs
from prefect.engine.signals import FAIL
import json
import jwt
import time
from typing import Dict, List, Union


class CubeJSQueryTask(Task):

"""
This task calls Cueb.js load API and returns the result
as a JSON object.
More info about Cube.js load API at
https://cube.dev/docs/rest-api#api-reference-v-1-load.
Args:
- subdomain (str, optional): The subdomain to use to get the data.
If provided, `subdomain` takes precedence over `url`.
This is likely to be useful to Cube Cloud users.
- url (str, optional): The URL to use to get the data.
This is likely to be useful to users of self-hosted Cube.js.
- api_secret (str, optional): The API secret used to generate an
API token for authentication.
If provided, it takes precedence over `api_secret_env_var`.
- api_secret_env_var (str, optional): The name of the env var that contains
the API secret to use to generate an API token for authentication.
Defaults to `CUBEJS_API_SECRET`.
- query (dict, list, optional): `dict` or `list` representing
valid Cube.js queries.
If you pass multiple queries, then be aware of Cube.js Data Blending.
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load
and at https://cube.dev/docs/schema/advanced/data-blending.
Query format can be found at: https://cube.dev/docs/query-format.
- security_context (str, dict, optional): The security context to use
during authentication.
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at: https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
- **kwargs (optional): Additional keyword arguments to pass to the
standard Task initalization.
"""

__CUBEJS_CLOUD_BASE_URL = "https://{subdomain}.cubecloud.dev"

def __init__(
self,
subdomain: str = None,
url: str = None,
api_secret: str = None,
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
**kwargs,
):
self.subdomain = subdomain
self.url = url
self.api_secret = api_secret
self.api_secret_env_var = api_secret_env_var
self.query = query
self.security_context = security_context
self.wait_time_between_api_calls = wait_time_between_api_calls
self.max_wait_time = max_wait_time
super().__init__(**kwargs)

@defaults_from_attrs(
"subdomain",
"url",
"api_secret",
"api_secret_env_var",
"query",
"security_context",
"wait_time_between_api_calls",
"max_wait_time",
)
def run(
self,
subdomain: str = None,
url: str = None,
api_secret: str = None,
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
):
"""
Task run method to perform a query using Cube.js load API.
Args:
- subdomain (str, optional): The subdomain to use to get the data.
If provided, `subdomain` takes precedence over `url`.
This is likely to be useful to Cube Cloud users.
- url (str, optional): The URL to use to get the data.
This is likely to be useful to users of self-hosted Cube.js.
- api_secret (str, optional): The API secret used to generate an
API token for authentication.
If provided, it takes precedence over `api_secret_env_var`.
- api_secret_env_var (str, optional): The name of the env var that contains
the API secret to use to generate an API token for authentication.
Defaults to `CUBEJS_API_SECRET`.
- query (dict, list, optional): `dict` or `list` representing
valid Cube.js queries.
If you pass multiple queries, then be aware of Cube.js Data Blending.
More info at https://cube.dev/docs/rest-api#api-reference-v-1-load
and at https://cube.dev/docs/schema/advanced/data-blending.
Query format can be found at: https://cube.dev/docs/query-format.
- security_context (str, dict, optional): The security context to use
during authentication.
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
Raises:
- ValueError if both `subdomain` and `url` are missing.
- ValueError if `api_token` is missing and `api_token_env_var` cannot be found.
- ValueError if `query` is missing.
- `prefect.engine.signals.FAIL` if the Cube.js load API fails.
- `prefect.engine.signals.FAIL` if the Cube.js load API takes more than
`max_wait_time` seconds to respond.
Returns:
- The Cube.js JSON response.
"""

if not subdomain and not url:
raise ValueError("Missing both `subdomain` and `url`.")

if not api_secret and api_secret_env_var not in os.environ:
raise ValueError("Missing `api_secret` and `api_secret_env_var` not found.")

if not query:
raise ValueError("Missing `query`.")

cube_base_url = self.__CUBEJS_CLOUD_BASE_URL
if subdomain:
cube_base_url = f"{cube_base_url.format(subdomain=subdomain)}/cubejs-api"
else:
cube_base_url = url
query_api_url = f"{cube_base_url}/v1/load"

self.logger.debug(f"Query URL: {query_api_url}")

secret = api_secret if api_secret else os.environ[api_secret_env_var]

if security_context:

extended_context = security_context
if "exp" not in security_context and "expiresIn" not in security_context:
extended_context["expiresIn"] = "7d"
api_token = jwt.encode(
payload=extended_context, key=secret, algorithm="HS256"
)

self.logger.debug("JWT token generated with security context.")

else:
api_token = jwt.encode(payload={}, key=secret)

session = Session()
session.headers = {
"Content-type": "application/json",
"Authorization": api_token,
}

params = {"query": json.dumps(query)}

wait_api_call_secs = (
wait_time_between_api_calls if wait_time_between_api_calls > 0 else 10
)
elapsed_wait_time = 0
while not max_wait_time or elapsed_wait_time <= max_wait_time:

with session.get(url=query_api_url, params=params) as response:
self.logger.debug(f"URL is: {response.url}")

if response.status_code == 200:
data = response.json()

if "error" in data.keys() and "Continue wait" in data["error"]:
msg = (
"Cube.js load API still running."
"Waiting {wait_api_call_secs} seconds before retrying"
)
self.logger.info(msg)
time.sleep(wait_api_call_secs)
elapsed_wait_time += wait_api_call_secs
continue

else:
return data

else:
raise FAIL(
message=f"Cube.js load API failed! Error is: {response.reason}"
)

raise FAIL(
message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response."
)
Empty file added tests/tasks/cubejs/__init__.py
Empty file.
136 changes: 136 additions & 0 deletions tests/tasks/cubejs/test_cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import logging
import jwt
import pytest
from prefect.engine.signals import FAIL
from prefect.tasks.cubejs import CubeJSQueryTask

import responses


class TestCubeJSQueryTask:
def test_construction_no_values(self):
cubejs_task = CubeJSQueryTask()

assert cubejs_task.subdomain is None
assert cubejs_task.url is None
assert cubejs_task.api_secret is None
assert cubejs_task.api_secret_env_var == "CUBEJS_API_SECRET"
assert cubejs_task.query is None
assert cubejs_task.security_context is None

def test_construction_with_values(self):
cubejs_task = CubeJSQueryTask(
subdomain="foo",
url="http://bar",
api_secret="secret",
api_secret_env_var="secret_env_var",
query="query",
security_context={"foo": "bar"},
)

assert cubejs_task.subdomain == "foo"
assert cubejs_task.url == "http://bar"
assert cubejs_task.api_secret == "secret"
assert cubejs_task.api_secret_env_var == "secret_env_var"
assert cubejs_task.query == "query"
assert cubejs_task.security_context == {"foo": "bar"}

def test_run_with_no_values_raises(self):
cubejs_task = CubeJSQueryTask()
msg_match = "Missing both `subdomain` and `url`."
with pytest.raises(ValueError, match=msg_match):
cubejs_task.run()

def test_run_without_api_secret_api_secret_env_var(self):
cubejs_task = CubeJSQueryTask()
msg_match = "Missing `api_secret` and `api_secret_env_var` not found."
with pytest.raises(ValueError, match=msg_match):
cubejs_task.run(subdomain="foo")

def test_run_without_query_raises(self):
cubejs_task = CubeJSQueryTask()
msg_match = "Missing `query`."
with pytest.raises(ValueError, match=msg_match):
cubejs_task.run(subdomain="foo", api_secret="bar")

@responses.activate
def test_run_with_failing_api_raises(self):
cubejs_task = CubeJSQueryTask()
msg_match = "Cube.js load API failed!"
responses.add(
responses.GET, "https://test.cubecloud.dev/cubejs-api/v1/load", status=123
)

with pytest.raises(FAIL, match=msg_match):
cubejs_task.run(subdomain="test", api_secret="foo", query="query")

@responses.activate
def test_run_with_continue_waiting(self, caplog):
caplog.set_level(logging.DEBUG)
cubejs_task = CubeJSQueryTask()

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"error": "Continue wait"},
)

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"data": "result"},
)

data = cubejs_task.run(subdomain="test", api_secret="foo", query="query")

assert "Cube.js load API still running." in caplog.text
assert isinstance(data, dict)

@responses.activate
def test_run_with_security_context(self, caplog):
caplog.set_level(logging.DEBUG)
cubejs_task = CubeJSQueryTask()

responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"data": "result"},
)

cubejs_task.run(
subdomain="test",
api_secret="foo",
query="query",
security_context={"foo": "bar"},
)

expected_jwt = jwt.encode(
payload={"foo": "bar", "expiresIn": "7d"}, key="foo", algorithm="HS256"
)

# assert "JWT token generated with security context." in caplog.text
assert responses.calls[0].request.headers["Authorization"] == expected_jwt

@responses.activate
def test_run_with_max_wait_time_raises(self):
cubejs_task = CubeJSQueryTask()
msg_match = "Cube.js load API took longer than 3 seconds to provide a response."
responses.add(
responses.GET,
"https://test.cubecloud.dev/cubejs-api/v1/load",
status=200,
json={"error": "Continue wait"},
)

with pytest.raises(FAIL, match=msg_match):
cubejs_task.run(
subdomain="test",
api_secret="foo",
query="query",
security_context={"foo": "bar"},
wait_time_between_api_calls=1,
max_wait_time=3,
)

0 comments on commit a065c53

Please sign in to comment.