Skip to content

Commit

Permalink
skip reading today data
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr committed May 23, 2022
1 parent 58be0f6 commit 23f38b1
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Waits for current job to finish (slice) and yield its result"""

today = pendulum.today(tz="UTC").date()
date_start = stream_state and stream_state.get("date_start")
if date_start:
date_start = pendulum.parse(date_start).date()

job = stream_slice["insight_job"]
for obj in job.get_result():
record = obj.export_all_data()
if date_start and pendulum.parse(record["updated_time"]).date() <= date_start:
continue
if date_start:
updated_time = pendulum.parse(record["updated_time"]).date()
if updated_time <= date_start or updated_time >= today:
continue
yield record

self._completed_slices.add(job.interval.start)
Expand Down Expand Up @@ -160,8 +163,8 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late

def _date_intervals(self) -> Iterator[pendulum.Date]:
"""Get date period to sync"""
today = pendulum.today(tz="UTC").date()
end_date = min(self._end_date, today)
yesterday = pendulum.yesterday(tz="UTC").date()
end_date = min(self._end_date, yesterday)
if end_date < self._next_cursor_value:
return
date_range = end_date - self._next_cursor_value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ def api_fixture(some_config, requests_mock, fb_account_response):
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
return api


@fixture
def set_today(mocker, monkeypatch):
    """Provide a helper that pins ``pendulum.today`` and ``pendulum.yesterday``.

    The fixture yields a callable taking a date string. Calling it parses the
    string with ``pendulum.parse``, replaces ``pendulum.today`` and
    ``pendulum.yesterday`` with mocks returning that value and the value one
    day earlier, and returns the pair ``(yesterday, today)``.
    """

    def inner(date: str):
        frozen_today = pendulum.parse(date)
        frozen_yesterday = frozen_today - pendulum.duration(days=1)
        # Patch both entry points so they stay consistent with each other.
        replacements = (("today", frozen_today), ("yesterday", frozen_yesterday))
        for attr_name, frozen_value in replacements:
            monkeypatch.setattr(pendulum, attr_name, mocker.MagicMock(return_value=frozen_value))
        return frozen_yesterday, frozen_today

    return inner
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
async_manager_mock.assert_called_once()
args, kwargs = async_manager_mock.call_args
generated_jobs = list(kwargs["jobs"])
assert len(generated_jobs) == (end_date - start_date).days + 1
assert len(generated_jobs) == (end_date - start_date).days
assert generated_jobs[0].interval.start == start_date.date()
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)

Expand Down Expand Up @@ -225,7 +225,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
async_manager_mock.assert_called_once()
args, kwargs = async_manager_mock.call_args
generated_jobs = list(kwargs["jobs"])
assert len(generated_jobs) == (end_date - start_date).days + 1
assert len(generated_jobs) == (end_date - start_date).days
assert generated_jobs[0].interval.start == start_date.date()
assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)

Expand Down Expand Up @@ -293,10 +293,10 @@ def test_fields_custom(self, api):

assert stream.fields == ["account_id", "account_currency"]

def test_completed_slices_in_lookback_period(self, api, mocker, monkeypatch):
def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=pendulum.parse("2020-04-01")))
set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
Expand All @@ -323,43 +323,41 @@ def test_completed_slices_in_lookback_period(self, api, mocker, monkeypatch):
assert pendulum.parse("2020-03-23").date() in slices
assert stream._completed_slices == {pendulum.Date(2020, 3, 21)}

def test_incremental_lookback_period_updated(self, api, mocker, monkeypatch):
def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
today = pendulum.parse("2020-04-01")
monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today))
yesterday, _ = set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)

records = read_full_refresh(stream)
assert len(records) == (today - start_date).days + 1
assert len(records) == (yesterday - start_date).days + 1
assert records[0]["date_start"] == str(start_date.date())
assert records[-1]["date_start"] == str(today.date())
assert records[-1]["date_start"] == str(yesterday.date())

state = {AdsInsights.cursor_field: "2020-03-20", "time_increment": 1}
records = read_incremental(stream, state)
assert len(records) == (today - pendulum.parse("2020-03-20")).days
assert len(records) == (yesterday - pendulum.parse("2020-03-20")).days
assert records[0]["date_start"] == "2020-03-21"
assert records[-1]["date_start"] == str(today.date())
assert state == {"date_start": str(today.date()), "slices": [], "time_increment": 1}
assert records[-1]["date_start"] == str(yesterday.date())
assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

today = pendulum.parse("2020-04-02")
monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today))
yesterday, _ = set_today("2020-04-02")
records = read_incremental(stream, state)
assert records == [{"date_start": str(today.date()), "updated_time": str(today.date())}]
assert state == {"date_start": str(today.date()), "slices": [], "time_increment": 1}
assert records == [{"date_start": str(yesterday.date()), "updated_time": str(yesterday.date())}]
assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

today = pendulum.parse("2020-04-03")
monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today))
yesterday, _ = set_today("2020-04-03")
FakeInsightAsyncJob.update_insight("2020-03-26", "2020-04-01")
FakeInsightAsyncJob.update_insight("2020-03-27", "2020-04-02")
FakeInsightAsyncJob.update_insight("2020-03-28", "2020-04-03")

records = read_incremental(stream, state)
assert records == [
{"date_start": "2020-03-28", "updated_time": "2020-04-03"},
{"date_start": "2020-04-03", "updated_time": "2020-04-03"},
{"date_start": "2020-03-27", "updated_time": "2020-04-02"},
{"date_start": "2020-04-02", "updated_time": "2020-04-02"},
]
assert state == {"date_start": str(today.date()), "slices": [], "time_increment": 1}
assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}

0 comments on commit 23f38b1

Please sign in to comment.