From 443781eaa9e7770e8cad85ab68a7e29e563e58ab Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Mon, 17 Jul 2023 15:48:44 +0300 Subject: [PATCH] do not use stream state for generating request params! --- .../acceptance-test-config.yml | 3 +- .../integration_tests/abnormal_state.json | 2 +- .../source_mixpanel/streams/base.py | 24 ++++----- .../source_mixpanel/streams/export.py | 6 +-- .../source_mixpanel/testing.py | 1 + .../test_property_transformation.py | 3 +- .../unit_tests/test_streams.py | 27 +++++----- .../source-mixpanel/unit_tests/unit_test.py | 49 ++++++++++++------- 8 files changed, 63 insertions(+), 52 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml index d8d87b79562c3..d47549eeec90a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml @@ -60,5 +60,6 @@ acceptance_tests: export: ["time"] funnels: ["41833532", "date"] revenue: ["date"] - cohort_members": ["last_seen"] + engage: ["last_seen"] + cohort_members: ["last_seen"] timeout_seconds: 9000 diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json index 0d4d97a4e1aeb..89a95990ac336 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json @@ -2,7 +2,7 @@ { "type": "STREAM", "stream": { - "stream_state": { "36152117": { "date": "2030-01-01" } }, + "stream_state": { "41833532": { "date": "2030-01-01" } }, "stream_descriptor": { "name": "funnels" } } }, diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index d6bf2327d0d5d..f70dd05c4301d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -153,25 +153,25 @@ def stream_slices( date_slices: list = [] # use the latest date between self.start_date and stream_state - start_date = self.start_date + start_date = pendulum.datetime(self.start_date.year, self.start_date.month, self.start_date.day, tz=self.project_timezone) + # end_date cannot be later than today + end_date = pendulum.datetime(self.end_date.year, self.end_date.month, self.end_date.day, tz=self.project_timezone) + end_date = min(end_date, pendulum.today(tz=self.project_timezone)) if stream_state and self.cursor_field and self.cursor_field in stream_state: # Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD') # It also means that sync returns duplicated entries for the date from the state (date range is inclusive) - stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date() - start_date = max(start_date, stream_state_date) + stream_state_date = pendulum.parse(stream_state[self.cursor_field]) + start_date = max(start_date, stream_state_date).in_tz(self.project_timezone) # move start_date back days to sync data since that time as well start_date = start_date - timedelta(days=self.attribution_window) - # end_date cannot be later than today - end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date()) - while start_date <= end_date: current_end_date = start_date + timedelta(days=self.date_window_size - 1) # -1 is needed because dates are inclusive date_slices.append( { - "start_date": str(start_date), - "end_date": str(min(current_end_date, end_date)), + "start_date": start_date, + "end_date": min(current_end_date, end_date), } ) # add 1 additional day because date range is inclusive @@ -185,8 +185,8 @@ def request_params( params = super().request_params(stream_state, stream_slice, next_page_token) return { **params, - "from_date": stream_slice["start_date"], - "to_date": stream_slice["end_date"], + "from_date": str(stream_slice["start_date"].date()), + "to_date": str(stream_slice["end_date"].date()), } @@ -196,6 +196,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late if updated_state: state_value = current_stream_state.get(self.cursor_field) if state_value: - updated_state = max(updated_state, state_value) - current_stream_state[self.cursor_field] = updated_state + updated_state = max(pendulum.parse(updated_state), pendulum.parse(state_value)) + current_stream_state[self.cursor_field] = str(updated_state.in_tz(self.project_timezone)) return current_stream_state diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index a3d6c9bae204e..16e52ad353a8b 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -187,9 +187,9 @@ def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: mapping = super().request_params(stream_state, stream_slice, next_page_token) - if stream_state and "date" in stream_state: - timestamp = int(pendulum.parse(stream_state["date"]).timestamp()) - mapping["where"] = f'properties["$time"]>=datetime({timestamp})' + after_timestamp = int((stream_slice["start_date"]).timestamp()) + before_timestamp = int((stream_slice["end_date"]).timestamp()) + mapping["where"] = f'properties["$time"]>=datetime({after_timestamp}) and properties["$time"]=datetime({timestamp})' + # stream state should be ignored in request params to not trigger race conditions! + # stream state only affects stream slices. + start_timestamp = int(pendulum.parse("2017-01-25T00:00:00Z").timestamp()) + end_timestamp = int(pendulum.parse("2023-02-25T00:00:00Z").timestamp()) + assert request_params.get("where") == f'properties["$time"]>=datetime({start_timestamp}) and properties["$time"]