Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Orb: Fix bug to enrich multiple events with the same event_id #17761

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@
- name: Orb
sourceDefinitionId: 7f0455fb-4518-4ec0-b7a3-d808bf8081cc
dockerRepository: airbyte/source-orb
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/orb
icon: orb.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7994,7 +7994,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-orb:0.1.3"
- dockerImage: "airbyte/source-orb:0.1.4"
spec:
documentationUrl: "https://docs.withorb.com/"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-orb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_orb ./source_orb
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-orb
23 changes: 13 additions & 10 deletions airbyte-integrations/connectors/source-orb/source_orb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,15 @@ def enrich_ledger_entries_with_event_data(self, ledger_entries):
"""
# Build up a list of the subset of ledger entries we are expected
# to enrich with event metadata.
event_id_to_ledger_entry = {}
event_id_to_ledger_entries = {}
for entry in ledger_entries:
maybe_event_id: Optional[str] = entry.get("event_id")
if maybe_event_id:
event_id_to_ledger_entry[maybe_event_id] = entry
# There can be multiple entries with the same event ID
event_id_to_ledger_entries[maybe_event_id] = event_id_to_ledger_entries.get(maybe_event_id, []) + [entry]

# Nothing to enrich; short-circuit
if len(event_id_to_ledger_entry) == 0:
if len(event_id_to_ledger_entries) == 0:
return ledger_entries

def modify_ledger_entry_schema(ledger_entry):
Expand All @@ -321,8 +322,9 @@ def modify_ledger_entry_schema(ledger_entry):
ledger_entry["event"] = {}
ledger_entry["event"]["id"] = event_id

for ledger_entry in event_id_to_ledger_entry.values():
modify_ledger_entry_schema(ledger_entry=ledger_entry)
for ledger_entries_in_map in event_id_to_ledger_entries.values():
for ledger_entry in ledger_entries_in_map:
modify_ledger_entry_schema(ledger_entry=ledger_entry)

# Nothing to extract for each ledger entry
merged_properties_keys = (self.string_event_properties_keys or []) + (self.numeric_event_properties_keys or [])
Expand All @@ -331,7 +333,7 @@ def modify_ledger_entry_schema(ledger_entry):

# The events endpoint is a `POST` endpoint which expects a list of
# event_ids to filter on
request_filter_json = {"event_ids": list(event_id_to_ledger_entry)}
request_filter_json = {"event_ids": list(event_id_to_ledger_entries)}

# Prepare request with self._session, which should
# automatically deal with the authentication header.
Expand All @@ -354,16 +356,17 @@ def modify_ledger_entry_schema(ledger_entry):

# This would imply that the endpoint returned an event that wasn't part of the filter
# parameters, so log an error but ignore it.
if event_id not in event_id_to_ledger_entry:
if event_id not in event_id_to_ledger_entries:
self.logger.error(f"Unrecognized event received with ID {event_id} when trying to enrich ledger entries")
continue

# Replace ledger_entry.event_id with ledger_entry.event
event_id_to_ledger_entry[event_id]["event"]["properties"] = desired_properties_subset
num_events_enriched += 1
for ledger_entry in event_id_to_ledger_entries[event_id]:
ledger_entry["event"]["properties"] = desired_properties_subset
num_events_enriched += 1

# Log an error if we did not enrich all the entries we asked for.
if num_events_enriched != len(event_id_to_ledger_entry):
if num_events_enriched != sum(len(le) for le in event_id_to_ledger_entries.values()):
self.logger.error("Unable to enrich all eligible credit ledger entries with event metadata.")

# Mutating entries within `event_id_to_ledger_entries` should have modified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,32 @@ def test_credits_ledger_entries_enriches_selected_property_keys(
# Does not enrich, but still passes back, irrelevant (for enrichment purposes) ledger entry
assert enriched_entries[1] == original_entry_1

@responses.activate
def test_credits_ledger_entries_enriches_with_multiple_entries_per_event(mocker):
    """Check that every ledger entry sharing an event ID gets enriched.

    Two ledger entries reference the same ``event_id`` while the mocked
    events endpoint returns that event only once; both entries are expected
    to come back carrying the event's ``ping`` property.
    """
    ledger_stream = CreditsLedgerEntries(string_event_properties_keys=["ping"])

    # Two distinct dicts that point at the same event.
    entries_sharing_event = [{"event_id": "foo-event-id", "entry_type": "decrement"} for _ in range(2)]

    # The mocked events endpoint answers with a single event for that ID.
    event_payload = {
        "customer_id": "foo-customer-id",
        "event_name": "foo-name",
        "id": "foo-event-id",
        "properties": {"ping": "pong"},
        "timestamp": "2022-02-21T07:00:00+00:00",
    }
    events_page = {
        "data": [event_payload],
        "pagination_metadata": {"has_more": False, "next_cursor": None},
    }
    responses.add(responses.POST, f"{ledger_stream.url_base}events", json=events_page, status=200)

    result = ledger_stream.enrich_ledger_entries_with_event_data(entries_sharing_event)

    # Each entry should have its event_id replaced by an "event" object
    # holding the id and the requested property.
    expected_entry = {"event": {"id": "foo-event-id", "properties": {"ping": "pong"}}, "entry_type": "decrement"}
    assert result == [expected_entry, expected_entry]



def test_supports_incremental(patch_incremental_base_class, mocker):
mocker.patch.object(IncrementalOrbStream, "cursor_field", "dummy_field")
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/orb.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ an Orb Account and API Key.

| Version | Date | Pull Request | Subject |
| --- | --- | --- | --- |
| 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries
| 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries
| 0.1.2 | 2022-04-20 | [11528](https://github.com/airbytehq/airbyte/pull/11528) | Add cost basis to ledger entries, update expiration date, sync only committed entries
| 0.1.1 | 2022-03-03 | [10839](https://github.com/airbytehq/airbyte/pull/10839) | Support ledger entries with numeric properties + schema fixes
Expand Down