From 1cb161801ba1384fe1076c18badbba73ef481fe5 Mon Sep 17 00:00:00 2001 From: oleh Date: Fri, 20 Aug 2021 10:27:20 +0300 Subject: [PATCH] 4827 WIP !!! --- .../source_acceptance_test/tests/test_core.py | 2 +- .../acceptance-test-config.yml | 5 +- .../integration_tests/configured_catalog.json | 60 ------------ ...{abnormal_state.json => future_state.json} | 20 ++-- .../schemas/ads.json | 2 +- .../source_facebook_marketing/streams.py | 96 ++++++++++++++++++- 6 files changed, 107 insertions(+), 78 deletions(-) rename airbyte-integrations/connectors/source-facebook-marketing/integration_tests/{abnormal_state.json => future_state.json} (57%) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 146ac6823d8f2..1a7f7d558ae7d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -155,7 +155,7 @@ def _validate_schema(records, configured_catalog): for stream_name, errors in streams_errors.items(): errors = map(str, errors.values()) str_errors = f"\n{bar}\n".join(errors) - logging.error(f"The {stream_name} stream has the following schema errors:\n{str_errors}") + logging.error(f"\nThe {stream_name} stream has the following schema errors:\n{str_errors}") if streams_errors: pytest.fail(f"Please check your json_schema in selected streams {tuple(streams_errors.keys())}.") diff --git a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml index 70eed799e56d3..75e63f3a34305 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml @@ -15,11
+15,12 @@ tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" # FB serializes numeric fields as strings - validate_schema: no +# validate_schema: no + timeout_seconds: 600 incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_without_insights.json" - future_state_path: "integration_tests/abnormal_state.json" + future_state_path: "integration_tests/future_state.json" full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json index 67dc36d5d8071..c2675b3e81095 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json @@ -1,20 +1,5 @@ { "streams": [ - { - "stream": { - "name": "campaigns", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null - }, { "stream": { "name": "ad_sets", @@ -30,36 +15,6 @@ "destination_sync_mode": "append", "primary_key": null }, - { - "stream": { - "name": "ads", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null - }, - { - "stream": { - 
"name": "ad_creatives", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": null, - "default_cursor_field": null, - "source_defined_primary_key": [["id"]], - "namespace": null - }, - "sync_mode": "full_refresh", - "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null - }, { "stream": { "name": "ads_insights", @@ -74,21 +29,6 @@ "cursor_field": ["date_start"], "destination_sync_mode": "append", "primary_key": null - }, - { - "stream": { - "name": "ads_insights_age_and_gender", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["date_start"], - "source_defined_primary_key": null, - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": ["date_start"], - "destination_sync_mode": "append", - "primary_key": null } ] } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json similarity index 57% rename from airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json rename to airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json index 437161ace3dce..f516de3682e07 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/future_state.json @@ -1,42 +1,42 @@ { "campaigns": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ad_creatives": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ad_sets": { - "updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads": { - 
"updated_time": "2021-07-25T13:34:26Z", + "updated_time": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_age_and_gender": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_country": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_dma": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_platfrom_and_device": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true }, "ads_insights_region": { - "date_start": "2021-07-25T13:34:26Z", + "date_start": "2121-07-25T13:34:26Z", "include_deleted": true } } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads.json index ce9dcb853fc8c..ea717654e1789 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads.json @@ -148,7 +148,7 @@ "action.type": { "type": ["null", "array"], "items": { - "type": ["null", "string"] + "type": ["null", "array"] } }, "post.wall": { diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index 1c375bb4f7bb5..d127c7c2da995 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -107,7 
+107,78 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: """Main read method used by CDK""" for record in self._read_records(params=self.request_params(stream_state=stream_state)): - yield self._extend_record(record, fields=self.fields) + yield self.transform(self._extend_record(record, fields=self.fields)) + + # for i in range(3): + # yield self.transform( + # { + # 'tracking_specs': [ + # {'action.type': ['offsite_conversion'], 'fb_pixel': ['2667253716886462']}, + # {'action.type': ['attention_event'], 'creative': ['23846815595220398']}, + # {'action.type': ['post_engagement'], 'page': ['112704783733939'], 'post': ['244953057175777']}, + # {'action.type': ['link_click'], 'post': ['244953057175777'], 'post.wall': ['112704783733939']}, + # {'action.type': ['dwell'], 'creative': ['23846815595220398']} + # ], + # 'updated_time': '2021-02-15T08:49:56-0800' + # } + # ) + + def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + """ + Use this method to remove update fields types in record according to schema. 
+ """ + schema = self.get_json_schema() + self.logger.debug(f"{record}") + self.lol_dict(record, schema["properties"]) + + return record + + def get_python_type(self, _types): + types_mapping = { + "string": str, + "number": float, + "integer": int, + "null": None, + "object": dict, + "array": list, + "boolean": bool, + } + + if isinstance(_types, list): + return tuple([types_mapping[t] for t in _types if t != "null"]) + + return (types_mapping[_types],) + + def lol_dict(self, record, schema): + for key, value in record.items(): + if key not in schema: + continue + + if isinstance(value, dict): + self.lol_dict(record=value, schema=schema[key].get("properties", {})) + elif isinstance(value, list) and "items" in schema[key]: + for record_list_item in value: + if list in self.get_python_type(schema[key]["items"]["type"]): + # TODO If you have list of lists then add `if` below + pass + elif dict in self.get_python_type(schema[key]["items"]["type"]): + self.lol_dict(record=record_list_item, schema=schema[key]["items"]["properties"]) + elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): + record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) + + if not isinstance(value, self.get_python_type(schema[key]["type"])): + record[key] = self.get_python_type(schema[key]["type"])[0](value) + + + # def lol_list(self, record, list_records, schema): + # for list_item in list_records: + # if list in self.get_python_type(schema[key]["items"]["type"]): + # # TODO If you have list of lists then add `if` below + # pass + # elif dict in self.get_python_type(schema[key]["items"]["type"]): + # self.lol_dict(record=record_list_item, schema=schema[key]["items"]["properties"]) + # elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): + # record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) def _read_records(self, params: Mapping[str, Any]) ->
Iterable: """Wrapper around query to backoff errors. @@ -295,7 +366,7 @@ class AdsInsights(FBMarketingIncrementalStream): MAX_WAIT_TO_START = pendulum.duration(minutes=5) MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30) MAX_ASYNC_SLEEP = pendulum.duration(minutes=5) - MAX_ASYNC_JOBS = 3 + MAX_ASYNC_JOBS = 10 INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30) action_breakdowns = ALL_ACTION_BREAKDOWNS @@ -305,6 +376,23 @@ class AdsInsights(FBMarketingIncrementalStream): breakdowns = [] + fields_to_transform = ( + (int, ("clicks", "impressions", "reach", "unique_clicks", )), + (float, ("frequency", "social_spend", "spend", "wish_bid", )), + + + (list, ( + ("actions", ( + (int, ("1d_click", "7d_click", "28d_click", )), + (float, ("value", )) + )), + ("unique_actions", ( + (int, ("1d_click", "7d_click", "28d_click",)), + (float, ("value",)) + )), + )), + ) + def __init__(self, buffer_days, days_per_job, **kwargs): super().__init__(**kwargs) self.lookback_window = pendulum.duration(days=buffer_days) @@ -322,7 +410,7 @@ def read_records( # because we query `lookback_window` days before actual cursor we might get records older then cursor for obj in result.get_result(): - yield obj.export_all_data() + yield self.transform(obj.export_all_data()) def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. @@ -353,7 +441,7 @@ def wait_for_job(self, job) -> AdReportRun: job = job.api_get() job_progress_pct = job["async_percent_completion"] job_id = job["report_run_id"] - self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete") + self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})") runtime = pendulum.now() - start_time if job["async_status"] == "Job Completed":