Skip to content

Commit

Permalink
Source Facebook Marketing: update cursor_field for Ads Insights Strea…
Browse files Browse the repository at this point in the history
…ms (#4437)

* Big Fix: updated_time is an attribute of ad in FB ad insights call

Co-authored-by: harshithmullapudi <harshithmullapudi@gmail.com>
Co-authored-by: ykurochkin <y.kurochkin@zazmic.com>
  • Loading branch information
3 people committed Jul 1, 2021
1 parent 097c954 commit 83ac92b
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.12",
"dockerImageTag": "0.2.13",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing",
"icon": "facebook.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.12
dockerImageTag: 0.2.13
documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.12
LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tests:
validate_output_from_all_streams: yes
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
# unfortunately there is a strange transient error with creatives stream:
# API returns different thumbnail_url from time to time
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
{
"campaigns": {
"updated_time": "2021-06-21T13:34:26Z",
"updated_time": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ad_creatives": {
"updated_time": "2021-06-21T13:34:26Z",
"updated_time": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ad_sets": {
"updated_time": "2021-06-21T13:34:26Z",
"updated_time": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads": {
"updated_time": "2021-06-21T13:34:26Z",
"updated_time": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_age_and_gender": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_country": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_dma": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_platfrom_and_device": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_region": {
"updated_time": "2021-06-21T13:34:26Z",
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"default_cursor_field": ["date_start"],
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
},
Expand All @@ -81,12 +81,12 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"default_cursor_field": ["date_start"],
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"streams": [
{
"stream": {
"name": "campaigns",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ad_sets",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ads",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ad_creatives",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"airbyte-cdk~=0.1",
"cached_property~=1.5",
"facebook_business~=11.0",
"pendulum",
"pendulum>=2,<3",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def _read_records(self, params: Mapping[str, Any]):
class AdsInsights(FBMarketingIncrementalStream):
"""doc: https://developers.facebook.com/docs/marketing-api/insights"""

cursor_field = "date_start"
primary_key = None

ALL_ACTION_ATTRIBUTION_WINDOWS = [
Expand Down Expand Up @@ -297,14 +298,9 @@ def read_records(
"""Waits for current job to finish (slice) and yield its result"""
result = self.wait_for_job(stream_slice["job"])
# because we query `lookback_window` days before actual cursor we might get records older then cursor
stream_state = stream_state or {}
state_value = stream_state.get(self.cursor_field)
min_cursor = self._start_date if not state_value else pendulum.parse(state_value)

for obj in result.get_result():
record = obj.export_all_data()
if pendulum.parse(record[self.cursor_field]) >= min_cursor:
yield record
yield obj.export_all_data()

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down
5 changes: 5 additions & 0 deletions docs/project-overview/changelog/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Note: Airbyte is not built on top of Singer, but is compatible with Singer's pro

Check out our [connector roadmap](https://github.com/airbytehq/airbyte/projects/3) to see what we're currently working on.

## 7/01/2021

Bugfixes:
* **Facebook Marketing** source: Using cursor field as `date_start` for Ads Insights and taking all records.

## 6/24/2021

1 new source:
Expand Down

0 comments on commit 83ac92b

Please sign in to comment.