Skip to content

Commit

Permalink
[dagster-airbyte] don't cancel completed syncs (#7888)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 18, 2022
1 parent d6b4eaf commit 0424134
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def sync_and_poll(
start = time.monotonic()
logged_attempts = 0
logged_lines = 0
state = None

try:
while True:
Expand Down Expand Up @@ -177,9 +178,10 @@ def sync_and_poll(
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# make sure that the Airbyte job does not outlive the python process
# cancelling a successfully completed job has no effect
self.cancel_job(job_id)
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED):
self.cancel_job(job_id)

return AirbyteOutput(job_details=job_details, connection_details=connection_details)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def test_assets(schema_prefix):
json=get_sample_job_json(schema_prefix=schema_prefix),
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

ab_job = build_assets_job(
"ab_job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def airbyte_sync_job():
rsps.add(rsps.POST, f"{ab_url}/connections/sync", json={"job": {"id": 1}})
rsps.add(rsps.POST, f"{ab_url}/jobs/get", json={"job": {"id": 1, "status": "running"}})
rsps.add(rsps.POST, f"{ab_url}/jobs/get", json={"job": {"id": 1, "status": "succeeded"}})
rsps.add(rsps.POST, f"{ab_url}/jobs/cancel", status=204)

result = airbyte_sync_job.execute_in_process()
assert result.output_for_node("airbyte_sync_op") == AirbyteOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def test_sync_and_poll(state):
json={"job": {"id": 1, "status": state}},
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)
if state == "unrecognized":
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

if state == AirbyteState.ERROR:
with pytest.raises(Failure, match="Job failed"):
Expand Down

0 comments on commit 0424134

Please sign in to comment.