
Source Facebook marketing: fix not syncing any data issue (#13749)
* #253 oncall source fb marketing: fix not syncing any data issue

* #253 oncall: update changelog

* Update docs/integrations/sources/facebook-marketing.md

* auto-bump connector version

Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people committed Jun 14, 2022
1 parent 22b727c commit 81572a9
Showing 8 changed files with 82 additions and 225 deletions.
Connector definitions seed (YAML) — version bump:

```diff
@@ -240,7 +240,7 @@
 - name: Facebook Marketing
   sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
   dockerRepository: airbyte/source-facebook-marketing
-  dockerImageTag: 0.2.51
+  dockerImageTag: 0.2.52
   documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
   icon: facebook.svg
   sourceType: api
```
Connector specs seed (YAML) — matching version bump:

```diff
@@ -1842,7 +1842,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-facebook-marketing:0.2.51"
+- dockerImage: "airbyte/source-facebook-marketing:0.2.52"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
     changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
```
Connector Dockerfile:

```diff
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
 
-LABEL io.airbyte.version=0.2.51
+LABEL io.airbyte.version=0.2.52
 LABEL io.airbyte.name=airbyte/source-facebook-marketing
```
`source_facebook_marketing/streams/base_insight_streams.py`, in `read_records`:

```diff
@@ -111,20 +111,9 @@ def read_records(
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
         """Waits for current job to finish (slice) and yield its result"""
-
-        today = pendulum.today(tz="UTC").date()
-        date_start = stream_state and stream_state.get("date_start")
-        if date_start:
-            date_start = pendulum.parse(date_start).date()
-
         job = stream_slice["insight_job"]
         for obj in job.get_result():
-            record = obj.export_all_data()
-            if date_start:
-                updated_time = pendulum.parse(record["updated_time"]).date()
-                if updated_time <= date_start or updated_time >= today:
-                    continue
-            yield record
+            yield obj.export_all_data()
 
         self._completed_slices.add(job.interval.start)
         if job.interval.start == self._next_cursor_value:
```
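This hunk is the heart of the fix. The deleted block filtered records client-side, skipping any record whose `updated_time` was on or before the saved `date_start` cursor, or on or after today. Because an insight's `updated_time` routinely lands in exactly that range once a slice has been synced, whole slices could come back empty and the connector appeared to sync no data. A minimal sketch, with hypothetical record values, of how the old predicate empties a slice:

```python
# Hypothetical records illustrating the removed filter: every updated_time
# is <= the saved cursor or >= today, so nothing survives the predicate.
import pendulum

date_start = pendulum.parse("2022-06-01").date()  # saved cursor from state
today = pendulum.parse("2022-06-14").date()

records = [
    {"date_start": "2022-05-31", "updated_time": "2022-06-01"},  # == cursor
    {"date_start": "2022-06-13", "updated_time": "2022-06-14"},  # == today
]

kept = []
for record in records:
    updated_time = pendulum.parse(record["updated_time"]).date()
    if updated_time <= date_start or updated_time >= today:
        continue  # fires for both records above
    kept.append(record)

print(kept)  # [] -- the slice yields no records at all
```

With the filter gone, slice selection is handled entirely by `_generate_async_jobs`, and every record a job returns is yielded.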
Same file, `_date_intervals`:

```diff
@@ -172,11 +161,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
 
     def _date_intervals(self) -> Iterator[pendulum.Date]:
         """Get date period to sync"""
-        yesterday = pendulum.yesterday(tz="UTC").date()
-        end_date = min(self._end_date, yesterday)
-        if end_date < self._next_cursor_value:
+        if self._end_date < self._next_cursor_value:
             return
-        date_range = end_date - self._next_cursor_value
+        date_range = self._end_date - self._next_cursor_value
         yield from date_range.range("days", self.time_increment)
 
     def _advance_cursor(self):
```
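`_date_intervals` no longer clamps the window to yesterday (UTC): the configured `self._end_date` is honoured as-is, so a cursor already sitting at yesterday still produces an interval. A standalone sketch of the fixed generator (names simplified from the stream's attributes), assuming pendulum 2.x, where subtracting two dates yields a `Period` whose `range()` includes both endpoints:

```python
# Sketch of the fixed interval generation: no clamp to "yesterday",
# the end date itself is included in the generated day starts.
from typing import Iterator

import pendulum


def date_intervals(next_cursor_value: pendulum.Date,
                   end_date: pendulum.Date,
                   time_increment: int = 1) -> Iterator[pendulum.Date]:
    if end_date < next_cursor_value:
        return
    date_range = end_date - next_cursor_value
    yield from date_range.range("days", time_increment)


starts = list(date_intervals(pendulum.date(2022, 6, 10), pendulum.date(2022, 6, 14)))
print(len(starts))  # 5 -- both endpoints are included
```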
Same file, `_generate_async_jobs`:

```diff
@@ -195,14 +182,9 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
         :return:
         """
 
-        today = pendulum.today(tz="UTC").date()
-        refresh_date = today - self.insights_lookback_period
-
         for ts_start in self._date_intervals():
             if ts_start in self._completed_slices:
-                if ts_start < refresh_date:
-                    continue
-                self._completed_slices.remove(ts_start)
+                continue
             ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
             interval = pendulum.Period(ts_start, ts_end)
             yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params)
```
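The lookback re-queue is gone as well: previously, a completed slice newer than `today - insights_lookback_period` was evicted from `_completed_slices` and fetched again, which interacted badly with the (now removed) `updated_time` filter. After the fix, completed slices are skipped unconditionally. A toy illustration, with hypothetical dates, of the new behaviour:

```python
# Toy example: completed slices are skipped outright; only days that
# were never synced get a new async job queued.
import pendulum

completed_slices = {pendulum.date(2022, 6, 11)}
intervals = [pendulum.date(2022, 6, d) for d in (10, 11, 12)]

to_queue = [ts for ts in intervals if ts not in completed_slices]
print(to_queue)  # June 10 and June 12; June 11 is never re-fetched
```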
Same file, `_get_start_date`:

```diff
@@ -242,7 +224,7 @@ def _get_start_date(self) -> pendulum.Date:
         :return: the first date to sync
         """
-        today = pendulum.today(tz="UTC").date()
+        today = pendulum.today().date()
         oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
         refresh_date = today - self.insights_lookback_period
         if self._cursor_value:
```
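The only change in `_get_start_date` is dropping `tz="UTC"`, so "today" now comes from the local clock; around midnight the two calls can disagree by a day, which shifts the retention and lookback windows derived from it. A quick way to see the difference:

```python
# pendulum.today() is local-time "today"; pendulum.today(tz="UTC") pins UTC.
# Depending on host timezone and wall-clock time, these can differ by one day.
import pendulum

print(pendulum.today().date())           # local date
print(pendulum.today(tz="UTC").date())   # UTC date
```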
Test fixtures — the now-unused `set_today` fixture is removed:

```diff
@@ -54,15 +54,3 @@ def api_fixture(some_config, requests_mock, fb_account_response):
     requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
     requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
     return api
-
-
-@fixture
-def set_today(mocker, monkeypatch):
-    def inner(date: str):
-        today = pendulum.parse(date)
-        yesterday = today - pendulum.duration(days=1)
-        monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today))
-        monkeypatch.setattr(pendulum, "yesterday", mocker.MagicMock(return_value=yesterday))
-        return yesterday, today
-
-    return inner
```
`helpers.py` (test helpers) — the fakes and reader functions used only by the deleted tests are removed:

```diff
@@ -2,53 +2,4 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
-
-
-from typing import Any, MutableMapping
-
-from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.streams import Stream
-
-
-def read_full_refresh(stream_instance: Stream):
-    records = []
-    slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
-    for slice in slices:
-        records.extend(list(stream_instance.read_records(stream_slice=slice, sync_mode=SyncMode.full_refresh)))
-    return records
-
-
-def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
-    records = []
-    stream_instance.state = stream_state
-    slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
-    for slice in slices:
-        records.extend(list(stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)))
-    stream_state.clear()
-    stream_state.update(stream_instance.state)
-    return records
-
-
-class FakeInsightAsyncJobManager:
-    def __init__(self, jobs, **kwargs):
-        self.jobs = jobs
-
-    def completed_jobs(self):
-        yield from self.jobs
-
-
-class FakeInsightAsyncJob:
-    updated_insights = {}
-
-    @classmethod
-    def update_insight(cls, date_start, updated_time):
-        cls.updated_insights[date_start] = updated_time
-
-    def __init__(self, interval, **kwargs):
-        self.interval = interval
-
-    def get_result(self):
-        return [self]
-
-    def export_all_data(self):
-        date_start = str(self.interval.start)
-        return {"date_start": date_start, "updated_time": self.updated_insights.get(date_start, date_start)}
```
Unit tests for the insight streams — imports for the removed helpers are dropped:

```diff
@@ -6,9 +6,7 @@
 
 import pendulum
 import pytest
-import source_facebook_marketing.streams.base_insight_streams
 from airbyte_cdk.models import SyncMode
-from helpers import FakeInsightAsyncJob, FakeInsightAsyncJobManager, read_full_refresh, read_incremental
 from pendulum import duration
 from source_facebook_marketing.streams import AdsInsights
 from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
```
Same test file, job-count assertion in the no-state variant:

```diff
@@ -182,7 +180,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
         async_manager_mock.assert_called_once()
         args, kwargs = async_manager_mock.call_args
         generated_jobs = list(kwargs["jobs"])
-        assert len(generated_jobs) == (end_date - start_date).days
+        assert len(generated_jobs) == (end_date - start_date).days + 1
         assert generated_jobs[0].interval.start == start_date.date()
         assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
```

And the matching assertion in the with-state variant:

```diff
@@ -219,7 +217,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
         async_manager_mock.assert_called_once()
         args, kwargs = async_manager_mock.call_args
         generated_jobs = list(kwargs["jobs"])
-        assert len(generated_jobs) == (end_date - start_date).days
+        assert len(generated_jobs) == (end_date - start_date).days + 1
         assert generated_jobs[0].interval.start == start_date.date()
         assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
```
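Both assertions gain a `+ 1` because the interval generator now runs through the end date inclusively (the clamp to yesterday is gone), so a span of N days between the endpoints produces N + 1 daily jobs:

```python
# Inclusive day count: four days of difference between the endpoints
# means five daily insight jobs, matching the updated assertions.
import pendulum

start_date = pendulum.datetime(2022, 6, 10)
end_date = pendulum.datetime(2022, 6, 14)
print((end_date - start_date).days + 1)  # 5
```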

Finally, the two tests that exercised the removed lookback behaviour are deleted:

```diff
@@ -292,72 +290,3 @@ def test_fields_custom(self, api):
         )
 
         assert stream.fields == ["account_id", "account_currency"]
-
-    def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
-        start_date = pendulum.parse("2020-03-01")
-        end_date = pendulum.parse("2020-05-01")
-        set_today("2020-04-01")
-
-        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
-        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
-
-        state = {
-            AdsInsights.cursor_field: "2020-03-19",
-            "slices": [
-                "2020-03-21",
-                "2020-03-22",
-                "2020-03-23",
-            ],
-            "time_increment": 1,
-        }
-
-        stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=10)
-        stream.state = state
-        assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}
-
-        slices = stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)
-        slices = [x["insight_job"].interval.start for x in slices]
-
-        assert pendulum.parse("2020-03-21").date() not in slices
-        assert pendulum.parse("2020-03-22").date() in slices
-        assert pendulum.parse("2020-03-23").date() in slices
-        assert stream._completed_slices == {pendulum.Date(2020, 3, 21)}
-
-    def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
-        start_date = pendulum.parse("2020-03-01")
-        end_date = pendulum.parse("2020-05-01")
-        yesterday, _ = set_today("2020-04-01")
-
-        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
-        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
-
-        stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=20)
-
-        records = read_full_refresh(stream)
-        assert len(records) == (yesterday - start_date).days + 1
-        assert records[0]["date_start"] == str(start_date.date())
-        assert records[-1]["date_start"] == str(yesterday.date())
-
-        state = {AdsInsights.cursor_field: "2020-03-20", "time_increment": 1}
-        records = read_incremental(stream, state)
-        assert len(records) == (yesterday - pendulum.parse("2020-03-20")).days
-        assert records[0]["date_start"] == "2020-03-21"
-        assert records[-1]["date_start"] == str(yesterday.date())
-        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
-
-        yesterday, _ = set_today("2020-04-02")
-        records = read_incremental(stream, state)
-        assert records == [{"date_start": str(yesterday.date()), "updated_time": str(yesterday.date())}]
-        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
-
-        yesterday, _ = set_today("2020-04-03")
-        FakeInsightAsyncJob.update_insight("2020-03-26", "2020-04-01")
-        FakeInsightAsyncJob.update_insight("2020-03-27", "2020-04-02")
-        FakeInsightAsyncJob.update_insight("2020-03-28", "2020-04-03")
-
-        records = read_incremental(stream, state)
-        assert records == [
-            {"date_start": "2020-03-27", "updated_time": "2020-04-02"},
-            {"date_start": "2020-04-02", "updated_time": "2020-04-02"},
-        ]
-        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
```
Not shown here: the eighth changed file, the changelog update in docs/integrations/sources/facebook-marketing.md mentioned in the commit message, which did not load in this view.
