Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Facebook Marketing: Convert values' types according to schema types #4978

Merged
merged 9 commits into from Sep 14, 2021
Expand Up @@ -8,18 +8,19 @@ tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "exception"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# FB serializes numeric fields as strings
validate_schema: no
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021
# because all running campaigns should be finished by that time.
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
@@ -1,42 +1,42 @@
{
"campaigns": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ad_creatives": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ad_sets": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads": {
"updated_time": "2021-07-25T13:34:26Z",
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_age_and_gender": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_country": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_dma": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_platfrom_and_device": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_region": {
"date_start": "2021-07-25T13:34:26Z",
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
}
}
Expand Up @@ -2,5 +2,5 @@
"start_date": "2021-04-01T00:00:00Z",
"account_id": "account",
"access_token": "wrong_token",
"include_deleted": "true"
"include_deleted": true
}
Expand Up @@ -48,20 +48,10 @@
"format": "date-time"
},
"daily_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"budget_remaining": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"effective_status": {
"type": ["null", "string"]
Expand All @@ -78,12 +68,7 @@
"format": "date-time"
},
"lifetime_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"targeting": { "$ref": "targeting.json" },
"bid_info": {
Expand Down
Expand Up @@ -148,18 +148,18 @@
"type": ["null", "number"]
},
"created_time": {
"format": "date-time",
"format": "date",
"type": ["null", "string"]
},
"ctr": {
"type": ["null", "number"]
},
"date_start": {
"format": "date-time",
"format": "date",
"type": ["null", "string"]
},
"date_stop": {
"format": "date-time",
"format": "date",
"type": ["null", "string"]
},
"engagement_rate_ranking": {
Expand Down Expand Up @@ -280,7 +280,7 @@
"$ref": "ads_action_stats.json"
},
"updated_time": {
"format": "date-time",
"format": "date",
"type": ["null", "string"]
},
"video_15_sec_watched_actions": {
Expand Down
Expand Up @@ -27,7 +27,7 @@
from abc import ABC
from collections import deque
from datetime import datetime
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union

import backoff
import pendulum
Expand Down Expand Up @@ -107,7 +107,57 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
yield self._extend_record(record, fields=self.fields)
yield self.transform(self._extend_record(record, fields=self.fields))

def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Use this method to remove update fields types in record according to schema.
"""
schema = self.get_json_schema()
self.convert_to_schema_types(record, schema["properties"])
return record

def get_python_type(self, _types: Union[list, str]) -> tuple:
types_mapping = {
"string": str,
"number": float,
"integer": int,
"null": None,
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
"object": dict,
"array": list,
"boolean": bool,
}

if isinstance(_types, list):
return tuple([types_mapping[t] for t in _types if t != "null"])

return (types_mapping[_types],)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return the 0th element in this method instead of from the calling code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_python_type() function converts types from schema to python types. This means that we must return values only from types_mapping dictionary.


def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
"""
Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value
and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string.
This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all
types from schema.
"""
for key, value in record.items():
if key not in schema:
continue

if isinstance(value, dict):
self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {}))
elif isinstance(value, list) and "items" in schema[key]:
for record_list_item in value:
if list in self.get_python_type(schema[key]["items"]["type"]):
# TODO Currently we don't have support for list of lists.
pass
elif dict in self.get_python_type(schema[key]["items"]["type"]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the expression schema[key][items][type] is repeated 3 times, might be good to refactor into a variable for readability e.g: list_items_type = schema[key][items][type]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, we can't do that. schema[key]["items"]["type"] is being used specifically in if statements in which we are sure key items is presented. If I will move schema[key]["items"]["type"] in one place above all if statements then we will keep on getting errors like this one.

self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"])
elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])):
record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item)

if not isinstance(value, self.get_python_type(schema[key]["type"])):
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
record[key] = self.get_python_type(schema[key]["type"])[0](value)

def _read_records(self, params: Mapping[str, Any]) -> Iterable:
"""Wrapper around query to backoff errors.
Expand Down Expand Up @@ -295,7 +345,7 @@ class AdsInsights(FBMarketingIncrementalStream):
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
MAX_ASYNC_JOBS = 3
MAX_ASYNC_JOBS = 10
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)

action_breakdowns = ALL_ACTION_BREAKDOWNS
Expand All @@ -322,7 +372,7 @@ def read_records(
# because we query `lookback_window` days before actual cursor we might get records older then cursor

for obj in result.get_result():
yield obj.export_all_data()
yield self.transform(obj.export_all_data())

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down Expand Up @@ -353,7 +403,7 @@ def wait_for_job(self, job) -> AdReportRun:
job = job.api_get()
job_progress_pct = job["async_percent_completion"]
job_id = job["report_run_id"]
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
runtime = pendulum.now() - start_time

if job["async_status"] == "Job Completed":
Expand Down
13 changes: 7 additions & 6 deletions docs/integrations/sources/facebook-marketing.md
Expand Up @@ -56,7 +56,7 @@ See Facebook's [documentation on rate limiting](https://developers.facebook.com/

### Requirements

* A Facebook Ad Account ID
* A Facebook Ad Account ID
* A Facebook App which has the Marketing API enabled
* A Facebook Marketing API Access Token
* Request a rate limit increase from Facebook
Expand Down Expand Up @@ -101,15 +101,16 @@ With the Ad Account ID and API access token, you should be ready to start pullin

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management|
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:<br>- Improve error handling.<br>- Improve async job performance (insights).<br>- Add new configuration parameter `insights_days_per_job`.<br>- Rename stream `adsets` to `ad_sets`.<br>- Refactor schema logic for insights, allowing to configure any possible insight stream.|
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0|
| 0.2.15 | 2021-09-08 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:<br>- Improve error handling.<br>- Improve async job performance (insights).<br>- Add new configuration parameter `insights_days_per_job`.<br>- Rename stream `adsets` to `ad_sets`.<br>- Refactor schema logic for insights, allowing to configure any possible insight stream. |
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0 |
| 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code|
| 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code |
| 0.2.7 | 2021-06-03 | [3646](https://github.com/airbytehq/airbyte/pull/3646) | Add missing fields to AdInsights streams |
| 0.2.6 | 2021-05-25 | [3525](https://github.com/airbytehq/airbyte/pull/3525) | Fix handling call rate limit |
| 0.2.5 | 2021-05-20 | [3396](https://github.com/airbytehq/airbyte/pull/3396) | Allow configuring insights lookback window |
| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync|
| 0.2.4 | 2021-05-13 | [3395](https://github.com/airbytehq/airbyte/pull/3395) | Fix an issue that caused losing Insights data from the past 28 days while incremental sync |
| 0.2.3 | 2021-04-28 | [3116](https://github.com/airbytehq/airbyte/pull/3116) | Wait longer (5 min) for async jobs to start |
| 0.2.2 | 2021-04-03 | [2726](https://github.com/airbytehq/airbyte/pull/2726) | Fix base connector versioning |
| 0.2.1 | 2021-03-12 | [2391](https://github.com/airbytehq/airbyte/pull/2391) | Support FB Marketing API v10 |
Expand Down