Skip to content

Commit

Permalink
[dagster-airbyte] terminate airbyte sync w python process (#7687)
Browse files: browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 2, 2022
1 parent c6ff832 commit faad5ea
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ def make_request(

raise Failure("Exceeded max number of retries.")

def cancel_job(self, job_id: int) -> None:
    """Request cancellation of an Airbyte job.

    Sends ``{"id": job_id}`` to the ``/jobs/cancel`` endpoint through
    ``self.make_request``. The response is discarded.

    Args:
        job_id: Identifier of the job to cancel. Annotated as ``int`` to
            agree with ``get_job_status``, which addresses the same jobs.
    """
    self.make_request(endpoint="/jobs/cancel", data={"id": job_id})

def get_job_status(self, job_id: int) -> dict:
    """Fetch the current details of an Airbyte job.

    Sends ``{"id": job_id}`` to the ``/jobs/get`` endpoint and passes the
    response through ``check.is_dict`` before returning it.
    """
    payload = {"id": job_id}
    response = self.make_request(endpoint="/jobs/get", data=payload)
    return check.is_dict(response)

def start_sync(self, connection_id: str) -> dict:
    """Trigger a sync for the given Airbyte connection.

    Sends ``{"connectionId": connection_id}`` to the ``/connections/sync``
    endpoint and passes the response through ``check.not_none`` before
    returning it.
    """
    payload = {"connectionId": connection_id}
    response = self.make_request(endpoint="/connections/sync", data=payload)
    return check.not_none(response)

def get_job_status(self, job_id: int) -> dict:
    """Fetch the current details of an Airbyte job.

    Sends ``{"id": job_id}`` to the ``/jobs/get`` endpoint and passes the
    response through ``check.not_none`` before returning it.
    """
    # NOTE(review): an earlier definition of get_job_status in this view uses
    # check.is_dict instead; this one looks like the pre-change side of the
    # diff -- confirm which version is current.
    payload = {"id": job_id}
    response = self.make_request(endpoint="/jobs/get", data=payload)
    return check.not_none(response)

def get_connection_details(self, connection_id: str) -> dict:
return check.not_none(
self.make_request(endpoint="/connections/get", data={"connectionId": connection_id})
Expand Down Expand Up @@ -132,42 +135,47 @@ def sync_and_poll(
logged_attempts = 0
logged_lines = 0

while True:
if poll_timeout and start + poll_timeout < time.monotonic():
raise Failure(
f"Timeout: Airbyte job {job_id} is not ready after the timeout {poll_timeout} seconds"
)
time.sleep(poll_interval)
job_details = self.get_job_status(job_id)
cur_attempt = len(job_details.get("attempts", []))
# spit out the available Airbyte log info
if cur_attempt:
log_lines = (
job_details["attempts"][logged_attempts].get("logs", {}).get("logLines", [])
)

for line in log_lines[logged_lines:]:
sys.stdout.write(line + "\n")
sys.stdout.flush()
logged_lines = len(log_lines)

# if there's a next attempt, this one will have no more log messages
if logged_attempts < cur_attempt - 1:
logged_lines = 0
logged_attempts += 1

state = job_details.get("job", {}).get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
continue
elif state == AirbyteState.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
try:
while True:
if poll_timeout and start + poll_timeout < time.monotonic():
raise Failure(
f"Timeout: Airbyte job {job_id} is not ready after the timeout {poll_timeout} seconds"
)
time.sleep(poll_interval)
job_details = self.get_job_status(job_id)
cur_attempt = len(job_details.get("attempts", []))
# spit out the available Airbyte log info
if cur_attempt:
log_lines = (
job_details["attempts"][logged_attempts].get("logs", {}).get("logLines", [])
)

for line in log_lines[logged_lines:]:
sys.stdout.write(line + "\n")
sys.stdout.flush()
logged_lines = len(log_lines)

# if there's a next attempt, this one will have no more log messages
if logged_attempts < cur_attempt - 1:
logged_lines = 0
logged_attempts += 1

state = job_details.get("job", {}).get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
continue
elif state == AirbyteState.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# make sure that the Airbyte job does not outlive the python process
# cancelling a successfully completed job has no effect
self.cancel_job(job_id)

return AirbyteOutput(job_details=job_details, connection_details=connection_details)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_assets(schema_prefix):
json=get_sample_job_json(schema_prefix=schema_prefix),
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

ab_job = build_assets_job(
"ab_job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def airbyte_sync_job():
rsps.add(rsps.POST, f"{ab_url}/connections/sync", json={"job": {"id": 1}})
rsps.add(rsps.POST, f"{ab_url}/jobs/get", json={"job": {"id": 1, "status": "running"}})
rsps.add(rsps.POST, f"{ab_url}/jobs/get", json={"job": {"id": 1, "status": "succeeded"}})
rsps.add(rsps.POST, f"{ab_url}/jobs/cancel", status=204)

result = airbyte_sync_job.execute_in_process()
assert result.output_for_node("airbyte_sync_op") == AirbyteOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def test_sync_and_poll(state):
json={"job": {"id": 1, "status": state}},
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

if state == AirbyteState.ERROR:
with pytest.raises(Failure, match="Job failed"):
Expand Down Expand Up @@ -220,6 +221,7 @@ def _get_attempt(ls):
},
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)
ab_resource.sync_and_poll("some_connection", 0, None)
captured = capsys.readouterr()
assert captured.out == "\n".join(["log1a", "log1b", "log1c", "log2a", "log2b"]) + "\n"
Expand Down Expand Up @@ -254,6 +256,7 @@ def test_assets():
json=get_sample_job_json(),
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

airbyte_output = ab_resource.sync_and_poll("some_connection", 0, None)

Expand Down Expand Up @@ -304,6 +307,7 @@ def test_sync_and_poll_timeout():
json={"job": {"id": 1, "status": "running"}},
status=200,
)
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)
poll_wait_second = 2
timeout = 1
with pytest.raises(Failure, match="Timeout: Airbyte job"):
Expand Down

0 comments on commit faad5ea

Please sign in to comment.