Skip to content

Commit

Permalink
[dagster-airbyte] Handle 204 reponse in Airbyte Library (#7209)
Browse files Browse the repository at this point in the history
  • Loading branch information
HAMZA310 committed Apr 12, 2022
1 parent 7acd90e commit 96a6762
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dagster_airbyte.types import AirbyteOutput
from requests.exceptions import RequestException

from dagster import Failure, Field, StringSource, __version__, get_dagster_logger, resource
from dagster import Failure, Field, StringSource, __version__, check, get_dagster_logger, resource

DEFAULT_POLL_INTERVAL_SECONDS = 10

Expand Down Expand Up @@ -52,7 +52,9 @@ def api_base_url(self) -> str:
+ "/api/v1"
)

def make_request(self, endpoint: str, data: Optional[Dict[str, Any]]):
def make_request(
self, endpoint: str, data: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""
Creates and sends a request to the desired Airbyte REST API endpoint.
Expand All @@ -77,6 +79,8 @@ def make_request(self, endpoint: str, data: Optional[Dict[str, Any]]):
timeout=15,
)
response.raise_for_status()
if response.status_code == 204:
return None
return response.json()
except RequestException as e:
self._log.error("Request to Airbyte API failed: %s", e)
Expand All @@ -88,13 +92,17 @@ def make_request(self, endpoint: str, data: Optional[Dict[str, Any]]):
raise Failure("Exceeded max number of retries.")

def start_sync(self, connection_id: str) -> dict:
return self.make_request(endpoint="/connections/sync", data={"connectionId": connection_id})
return check.is_dict(
self.make_request(endpoint="/connections/sync", data={"connectionId": connection_id})
)

def get_job_status(self, job_id: int) -> dict:
return self.make_request(endpoint="/jobs/get", data={"id": job_id})
return check.is_dict(self.make_request(endpoint="/jobs/get", data={"id": job_id}))

def get_connection_details(self, connection_id: str) -> dict:
return self.make_request(endpoint="/connections/get", data={"connectionId": connection_id})
return check.is_dict(
self.make_request(endpoint="/connections/get", data={"connectionId": connection_id})
)

def sync_and_poll(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dagster_airbyte import AirbyteOutput, AirbyteState, airbyte_resource
from dagster_airbyte.utils import generate_materializations

from dagster import Failure, MetadataEntry, build_init_resource_context
from dagster import Failure, MetadataEntry, build_init_resource_context, check

from .utils import get_sample_connection_json, get_sample_job_json

Expand Down Expand Up @@ -71,24 +71,84 @@ def test_sync_and_poll(state):

if state == AirbyteState.ERROR:
with pytest.raises(Failure, match="Job failed"):
r = ab_resource.sync_and_poll("some_connection", 0)
ab_resource.sync_and_poll("some_connection", 0)

elif state == AirbyteState.CANCELLED:
with pytest.raises(Failure, match="Job was cancelled"):
r = ab_resource.sync_and_poll("some_connection", 0)
ab_resource.sync_and_poll("some_connection", 0)

elif state == "unrecognized":
with pytest.raises(Failure, match="unexpected state"):
r = ab_resource.sync_and_poll("some_connection", 0)
ab_resource.sync_and_poll("some_connection", 0)

else:
r = ab_resource.sync_and_poll("some_connection", 0)
assert r == AirbyteOutput(
result = ab_resource.sync_and_poll("some_connection", 0)
assert result == AirbyteOutput(
job_details={"job": {"id": 1, "status": state}},
connection_details=get_sample_connection_json(),
)


@responses.activate
def test_start_sync_bad_out_fail():
ab_resource = airbyte_resource(
build_init_resource_context(
config={
"host": "some_host",
"port": "8000",
}
)
)
responses.add(
method=responses.POST,
url=ab_resource.api_base_url + "/connections/sync",
json=None,
status=204,
)
with pytest.raises(check.CheckError):
ab_resource.start_sync("some_connection")


@responses.activate
def test_get_connection_details_bad_out_fail():
ab_resource = airbyte_resource(
build_init_resource_context(
config={
"host": "some_host",
"port": "8000",
}
)
)
responses.add(
method=responses.POST,
url=ab_resource.api_base_url + "/connections/get",
json=None,
status=204,
)
with pytest.raises(check.CheckError):
ab_resource.get_connection_details("some_connection")


@responses.activate
def test_get_job_status_bad_out_fail():
ab_resource = airbyte_resource(
build_init_resource_context(
config={
"host": "some_host",
"port": "8000",
}
)
)
responses.add(
method=responses.POST,
url=ab_resource.api_base_url + "/jobs/get",
json=None,
status=204,
)
with pytest.raises(check.CheckError):
ab_resource.get_job_status("some_connection")


@responses.activate
def test_logging_multi_attempts(capsys):
def _get_attempt(ls):
Expand Down

0 comments on commit 96a6762

Please sign in to comment.