Skip to content

Commit

Permalink
4827 WIP !!!
Browse files Browse the repository at this point in the history
  • Loading branch information
Zirochkaa committed Aug 20, 2021
1 parent d1c3a7b commit 1cb1618
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 78 deletions.
Expand Up @@ -155,7 +155,7 @@ def _validate_schema(records, configured_catalog):
for stream_name, errors in streams_errors.items():
errors = map(str, errors.values())
str_errors = f"\n{bar}\n".join(errors)
logging.error(f"The {stream_name} stream has the following schema errors:\n{str_errors}")
logging.error(f"\n3The {stream_name} stream has the following schema errors:\n{str_errors}")

if streams_errors:
pytest.fail(f"Please check your json_schema in selected streams {tuple(streams_errors.keys())}.")
Expand Down
Expand Up @@ -15,11 +15,12 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# FB serializes numeric fields as strings
validate_schema: no
# validate_schema: no
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
@@ -1,20 +1,5 @@
{
"streams": [
{
"stream": {
"name": "campaigns",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ad_sets",
Expand All @@ -30,36 +15,6 @@
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ads",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ad_creatives",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": null,
"default_cursor_field": null,
"source_defined_primary_key": [["id"]],
"namespace": null
},
"sync_mode": "full_refresh",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ads_insights",
Expand All @@ -74,21 +29,6 @@
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "ads_insights_age_and_gender",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["date_start"],
"source_defined_primary_key": null,
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
}
]
}
@@ -1,42 +1,42 @@
{
"campaigns": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ad_creatives": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ad_sets": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_age_and_gender": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_country": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_dma": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_platfrom_and_device": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_region": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
}
}
Expand Up @@ -148,7 +148,7 @@
"action.type": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
"type": ["null", "array"]
}
},
"post.wall": {
Expand Down
Expand Up @@ -107,7 +107,78 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
yield self._extend_record(record, fields=self.fields)
yield self.transform(self._extend_record(record, fields=self.fields))

# for i in range(3):
# yield self.transform(
# {
# 'tracking_specs': [
# {'action.type': ['offsite_conversion'], 'fb_pixel': ['2667253716886462']},
# {'action.type': ['attention_event'], 'creative': ['23846815595220398']},
# {'action.type': ['post_engagement'], 'page': ['112704783733939'], 'post': ['244953057175777']},
# {'action.type': ['link_click'], 'post': ['244953057175777'], 'post.wall': ['112704783733939']},
# {'action.type': ['dwell'], 'creative': ['23846815595220398']}
# ],
# 'updated_time': '2021-02-15T08:49:56-0800'
# }
# )

def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Use this method to remove update fields types in record according to schema.
"""
schema = self.get_json_schema()
self.logger.error(f"12{str(record)}")
self.lol_dict(record, schema["properties"])

return record

def get_python_type(self, _types):
types_mapping = {
"string": str,
"number": float,
"integer": int,
"null": None,
"object": dict,
"array": list,
"boolean": bool,
}

if isinstance(_types, list):
return tuple([types_mapping[t] for t in _types if t != "null"])

return tuple(types_mapping[_types])

def lol_dict(self, record, schema):
for key, value in record.items():
if key not in schema:
continue

if isinstance(value, dict):
self.lol_dict(record=value, schema=schema[key].get("properties", {}))
elif isinstance(value, list) and "items" in schema[key]:
for record_list_item in value:
if list in self.get_python_type(schema[key]["items"]["type"]):
# TODO If you have list of lists then add `if` below
pass
elif dict in self.get_python_type(schema[key]["items"]["type"]):
self.lol_dict(record=record_list_item, schema=schema[key]["items"]["properties"])
elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)

if not isinstance(value, self.get_python_type(schema[key]["type"])):
record[key] = self.get_python_type(schema[key]["type"])[0](value)


# def lol_list(self, record, list_records, schema):
# for list_item in list_records:
# if list in self.get_python_type(schema[key]["items"]["type"]):
# # TODO If you have list of lists then add `if` below
# pass
# elif dict in self.get_python_type(schema[key]["items"]["type"]):
# self.lol_dict(record=record_list_item, schema=schema[key]["items"]["properties"])
# elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
# record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)

def _read_records(self, params: Mapping[str, Any]) -> Iterable:
"""Wrapper around query to backoff errors.
Expand Down Expand Up @@ -295,7 +366,7 @@ class AdsInsights(FBMarketingIncrementalStream):
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
MAX_ASYNC_JOBS = 3
MAX_ASYNC_JOBS = 10
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)

action_breakdowns = ALL_ACTION_BREAKDOWNS
Expand All @@ -305,6 +376,23 @@ class AdsInsights(FBMarketingIncrementalStream):

breakdowns = []

fields_to_transform = (
(int, ("clicks", "impressions", "reach", "unique_clicks", )),
(float, ("frequency", "social_spend", "spend", "wish_bid", )),


(list, (
("actions", (
(int, ("1d_click", "7d_click", "28d_click", )),
(float, ("value", ))
)),
("unique_actions", (
(int, ("1d_click", "7d_click", "28d_click",)),
(float, ("value",))
)),
)),
)

def __init__(self, buffer_days, days_per_job, **kwargs):
super().__init__(**kwargs)
self.lookback_window = pendulum.duration(days=buffer_days)
Expand All @@ -322,7 +410,7 @@ def read_records(
# because we query `lookback_window` days before actual cursor we might get records older then cursor

for obj in result.get_result():
yield obj.export_all_data()
yield self.transform(obj.export_all_data())

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down Expand Up @@ -353,7 +441,7 @@ def wait_for_job(self, job) -> AdReportRun:
job = job.api_get()
job_progress_pct = job["async_percent_completion"]
job_id = job["report_run_id"]
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
runtime = pendulum.now() - start_time

if job["async_status"] == "Job Completed":
Expand Down

0 comments on commit 1cb1618

Please sign in to comment.