Skip to content

Commit

Permalink
[dagster-airbyte] fix airbyte materializations without streamStats (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Apr 5, 2022
1 parent 052bf22 commit e965190
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 21 deletions.
32 changes: 18 additions & 14 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,28 @@ def _materialization_for_stream(

def generate_materializations(output: AirbyteOutput, asset_key_prefix: List[str]):
prefix = output.connection_details.get("prefix") or ""
stream_info = {
prefix + stream["stream"]["name"]: stream
# all the streams that are set to be sync'd by this connection
all_stream_props = {
prefix
+ stream["stream"]["name"]: stream.get("stream", {})
.get("jsonSchema", {})
.get("properties", {})
for stream in output.connection_details.get("syncCatalog", {}).get("streams", [])
if stream.get("config", {}).get("selected")
}

stream_stats = (
output.job_details.get("attempts", [{}])[-1].get("attempt", {}).get("streamStats", [])
)
for stats in stream_stats:
name = stats["streamName"]

stream_schema_props = (
stream_info.get(name, {}).get("stream", {}).get("jsonSchema", {}).get("properties", {})
)
# stats for each stream that had data sync'd
all_stream_stats = {
s["streamName"]: s.get("stats", {})
for s in output.job_details.get("attempts", [{}])[-1]
.get("attempt", {})
.get("streamStats", [])
}
for stream_name, stream_props in all_stream_props.items():
yield _materialization_for_stream(
name,
stream_schema_props,
stats.get("stats", {}),
stream_name,
stream_props,
# if no records are sync'd, no stats will be available for this stream
all_stream_stats.get(stream_name, {}),
asset_key_prefix=asset_key_prefix,
)
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ def get_sample_job_json(schema_prefix=""):
"recordsCommitted": 4321,
},
},
{
"streamName": schema_prefix + "bar",
"stats": {
"bytesEmitted": 1234,
"recordsCommitted": 4321,
},
},
{
"streamName": schema_prefix + "baz",
"stats": {
Expand Down

0 comments on commit e965190

Please sign in to comment.