Skip to content

Commit

Permalink
Severale adjustments based on reviewers' suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
alollo-ca committed Dec 29, 2021
1 parent 56c4ad1 commit 100fa87
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
black
graphviz >= 0.8
jinja2 >= 2.0, < 4.0
mypy >= 0.600, < 0.931
mypy >= 0.600, < 0.813
Pygments >= 2.2, < 3.0
30 changes: 22 additions & 8 deletions src/prefect/tasks/cubejs/cubejs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class CubeJSQueryTask(Task):
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at: https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
- **kwargs (optional): Additional keyword arguments to pass to the
Expand All @@ -57,6 +60,7 @@ def __init__(
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
**kwargs,
):
Expand All @@ -66,6 +70,7 @@ def __init__(
self.api_secret_env_var = api_secret_env_var
self.query = query
self.security_context = security_context
self.wait_time_between_api_calls = wait_time_between_api_calls
self.max_wait_time = max_wait_time
super().__init__(**kwargs)

Expand All @@ -76,6 +81,7 @@ def __init__(
"api_secret_env_var",
"query",
"security_context",
"wait_time_between_api_calls",
"max_wait_time",
)
def run(
Expand All @@ -86,6 +92,7 @@ def run(
api_secret_env_var: str = "CUBEJS_API_SECRET",
query: Union[Dict, List[Dict]] = None,
security_context: Union[str, Dict] = None,
wait_time_between_api_calls: int = 10,
max_wait_time: int = None,
):
"""
Expand Down Expand Up @@ -114,6 +121,9 @@ def run(
If the security context does not contain an expiration period,
then a 7-day expiration period is added automatically.
More info at https://cube.dev/docs/security/context.
- wait_time_between_api_calls (int, optional): The number of seconds to
wait between API calls.
Default to 10.
- max_wait_time (int, optional): The number of seconds to wait for the
Cube.js load API to return a response.
Expand Down Expand Up @@ -154,7 +164,7 @@ def run(

extended_context = security_context
if "exp" not in security_context and "expiresIn" not in security_context:
extended_context["epxiresIn"] = "7d"
extended_context["expiresIn"] = "7d"
api_token = jwt.encode(
payload=extended_context, key=secret, algorithm="HS256"
)
Expand All @@ -172,7 +182,9 @@ def run(

params = {"query": json.dumps(query)}

wait_time_between_api_calls = 10
wait_api_call_secs = (
wait_time_between_api_calls if wait_time_between_api_calls > 0 else 10
)
elapsed_wait_time = 0
while not max_wait_time or elapsed_wait_time <= max_wait_time:

Expand All @@ -185,19 +197,21 @@ def run(
if "error" in data.keys() and "Continue wait" in data["error"]:
msg = (
"Cube.js load API still running."
"Waiting {wait_time_between_api_calls} seconds before retrying"
"Waiting {wait_api_call_secs} seconds before retrying"
)
self.logger.warning(msg)
time.sleep(wait_time_between_api_calls)
elapsed_wait_time += wait_time_between_api_calls
self.logger.info(msg)
time.sleep(wait_api_call_secs)
elapsed_wait_time += wait_api_call_secs
continue

else:
return data

else:
raise FAIL(
message=f"Cube.js load API failed!. Error is: {response.reason}"
message=f"Cube.js load API failed! Error is: {response.reason}"
)

raise FAIL(message="Cube.js load API took too long to provide a response.")
raise FAIL(
message=f"Cube.js load API took longer than {max_wait_time} seconds to provide a response."
)
18 changes: 14 additions & 4 deletions tests/tasks/cubejs/test_cubejs_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import jwt
import pytest
from prefect.engine.signals import FAIL
from prefect.tasks.cubejs import CubeJSQueryTask
Expand Down Expand Up @@ -65,7 +66,7 @@ def test_run_with_failing_api_raises(self):
with pytest.raises(FAIL) as exc:
cubejs_task.run(subdomain="test", api_secret="foo", query="query")

assert "Cube.js load API failed!." in str(exc)
assert "Cube.js load API failed!" in str(exc)

@responses.activate
def test_run_with_continue_waiting(self, caplog):
Expand Down Expand Up @@ -110,7 +111,12 @@ def test_run_with_security_context(self, caplog):
security_context={"foo": "bar"},
)

assert "JWT token generated with security context." in caplog.text
expected_jwt = jwt.encode(
payload={"foo": "bar", "expiresIn": "7d"}, key="foo", algorithm="HS256"
)

# assert "JWT token generated with security context." in caplog.text
assert responses.calls[0].request.headers["Authorization"] == expected_jwt

@responses.activate
def test_run_with_max_wait_time_raises(self):
Expand All @@ -129,7 +135,11 @@ def test_run_with_max_wait_time_raises(self):
api_secret="foo",
query="query",
security_context={"foo": "bar"},
max_wait_time=15,
wait_time_between_api_calls=1,
max_wait_time=3,
)

assert "Cube.js load API took too long to provide a response." in str(exc)
assert (
"Cube.js load API took longer than 3 seconds to provide a response."
in str(exc)
)

0 comments on commit 100fa87

Please sign in to comment.