Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Facebook Marketing: Convert values' types according to schema types #4978

Merged
merged 9 commits into from Sep 14, 2021
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.16",
"dockerImageTag": "0.2.17",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
Expand Up @@ -137,7 +137,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.16
dockerImageTag: 0.2.17
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 010eb12f-837b-4685-892d-0a39f76a98f5
Expand Down
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Expand Up @@ -8,18 +8,19 @@ tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "exception"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# FB serializes numeric fields as strings
validate_schema: no
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021
# because all running campaigns should be finished by that time.
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
Expand Up @@ -42,9 +42,5 @@
"ads_insights_action_types": {
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_action_types": {
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
}
}
Expand Up @@ -2,5 +2,5 @@
"start_date": "2021-04-01T00:00:00Z",
"account_id": "account",
"access_token": "wrong_token",
"include_deleted": "true"
"include_deleted": true
}
Expand Up @@ -48,20 +48,10 @@
"format": "date-time"
},
"daily_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"budget_remaining": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"effective_status": {
"type": ["null", "string"]
Expand All @@ -78,12 +68,7 @@
"format": "date-time"
},
"lifetime_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"targeting": { "$ref": "targeting.json" },
"bid_info": {
Expand Down
Expand Up @@ -21,7 +21,7 @@
"$ref": "targeting.json#/definitions/id_name_pairs"
},
"home_type": {
"$ref$": "targeting.json#/definitions/id_name_pairs"
"$ref": "targeting.json#/definitions/id_name_pairs"
},
"friends_of_connections": {
"$ref": "targeting.json#/definitions/id_name_pairs"
Expand Down
Expand Up @@ -27,7 +27,7 @@
from abc import ABC
from collections import deque
from datetime import datetime
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union

import backoff
import pendulum
Expand All @@ -46,7 +46,7 @@
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)


def remove_params_from_url(url: str, params: [str]) -> str:
def remove_params_from_url(url: str, params: List[str]) -> str:
"""
Parses a URL and removes the query parameters specified in params
:param url: URL
Expand Down Expand Up @@ -110,7 +110,63 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
yield self._extend_record(record, fields=self.fields)
yield self.transform(self._extend_record(record, fields=self.fields))

def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Coerce the field values of `record` to the types declared in this stream's JSON schema.

    The top-level `properties` of the schema returned by `get_json_schema()` are handed to
    `convert_to_schema_types`, which updates `record` in place.

    :param record: record to fix up
    :return: the very same `record` object, after conversion
    """
    properties = self.get_json_schema()["properties"]
    self.convert_to_schema_types(record, properties)
    return record

def get_python_type(self, _types: Union[list, str]) -> tuple:
    """Translate JSON-schema type name(s) into a tuple of the matching Python classes.

    `"null"` entries of a list are dropped; a single type name yields a one-element tuple.
    A type name that is not one of the six names mapped below raises `KeyError`. Examples:
    - `["string", "null"]` -> `(str,)`
    - `["array", "string", "null"]` -> `(list, str,)`
    - `"boolean"` -> `(bool,)`
    """
    python_type_by_name = dict(
        string=str,
        number=float,
        integer=int,
        object=dict,
        array=list,
        boolean=bool,
    )

    # A bare type name: wrap the single class in a tuple so callers always get a tuple.
    if not isinstance(_types, list):
        return (python_type_by_name[_types],)

    return tuple(python_type_by_name[name] for name in _types if name != "null")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return the 0th element in this method instead of from the calling code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_python_type() function converts types from schema to python types. This means that we must return values only from types_mapping dictionary.


def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
    """
    Convert the values of `record`, in place, to the types declared in `schema`.

    For example, let's say we have a `reach` value that has the `number` type in the schema, but the API
    returns it as a string. This method converts `reach` from `str` to `float`. The same is done for every
    field present in both `record` and `schema`, including nested objects and lists.

    Values that are left untouched:
    - keys of `record` that are not described by `schema`;
    - `None` values (nullable fields);
    - values that already are an instance of one of the declared types;
    - fields whose schema carries no `type`, and lists of lists (not supported yet).

    :param record: record (or nested object of a record) to fix up; it is mutated in place
    :param schema: the `properties` mapping of the JSON schema that describes `record`
    """
    if not schema:
        return

    def cast(value: Any, declared_types: Union[list, str]) -> Any:
        """Return `value` converted to the first non-null declared type, or unchanged when nothing is to be done."""
        python_types = self.get_python_type(declared_types)
        # `None` is a legal value of a nullable field, and a "null"-only declaration gives an empty tuple,
        # i.e. there is no type to convert to.
        if value is None or not python_types or isinstance(value, python_types):
            return value
        # NOTE: conversion is a plain constructor call, so e.g. bool("false") evaluates to True.
        return python_types[0](value)

    for key, value in record.items():
        if key not in schema:
            continue
        field_schema = schema[key]

        if isinstance(value, dict):
            self.convert_to_schema_types(record=value, schema=field_schema.get("properties", {}))
        elif isinstance(value, list) and "items" in field_schema:
            # "items" is only read inside this branch, where the key is known to be present.
            items_schema = field_schema["items"]
            items_type = items_schema.get("type")
            if items_type is None:
                continue
            python_types = self.get_python_type(items_type)
            if list in python_types:
                # TODO Currently we don't have support for list of lists.
                continue

            converted = []
            for item in value:
                if isinstance(item, dict) and dict in python_types:
                    self.convert_to_schema_types(record=item, schema=items_schema.get("properties", {}))
                    converted.append(item)
                else:
                    converted.append(cast(item, items_type))
            # Update the list itself: every item is converted and the field stays a list.
            value[:] = converted
        elif "type" in field_schema:
            # Re-assigning an existing key does not resize the dict, so it is safe while iterating.
            record[key] = cast(value, field_schema["type"])

def _read_records(self, params: Mapping[str, Any]) -> Iterable:
"""Wrapper around query to backoff errors.
Expand Down Expand Up @@ -298,7 +354,7 @@ class AdsInsights(FBMarketingIncrementalStream):
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
MAX_ASYNC_JOBS = 3
MAX_ASYNC_JOBS = 10
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)

action_breakdowns = ALL_ACTION_BREAKDOWNS
Expand All @@ -325,7 +381,7 @@ def read_records(
# because we query `lookback_window` days before actual cursor we might get records older than cursor

for obj in result.get_result():
yield obj.export_all_data()
yield self.transform(obj.export_all_data())

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down Expand Up @@ -356,7 +412,7 @@ def wait_for_job(self, job) -> AdReportRun:
job = job.api_get()
job_progress_pct = job["async_percent_completion"]
job_id = job["report_run_id"]
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
runtime = pendulum.now() - start_time

if job["async_status"] == "Job Completed":
Expand Down
Expand Up @@ -21,6 +21,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from source_facebook_marketing.streams import remove_params_from_url


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Expand Up @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
| 0.2.16 | 2021-09-14 | [6060](https://github.com/airbytehq/airbyte/pull/6060) | Fix schema for `ads_insights` stream |
| 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
Expand Down