Skip to content

Commit

Permalink
šŸ› Source Facebook Marketing: Convert values' types according to schemā€¦
Browse files Browse the repository at this point in the history
…a types (#4978)

* Convert values' types according to schema types

* Put streams back to `configured_catalog.json`

Put back `ads_insights` and `ads_insights_age_and_gender` streams.

* Pickup changes from #5946

* Implement change request + fix previous PR

* Update schema

* Remove items_type from convert_to_schema_types()

* Bump connectors version
  • Loading branch information
Zirochkaa committed Sep 14, 2021
1 parent 7eb5577 commit 6d39afc
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 38 deletions.
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.16",
"dockerImageTag": "0.2.17",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
Expand Up @@ -137,7 +137,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.16
dockerImageTag: 0.2.17
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 010eb12f-837b-4685-892d-0a39f76a98f5
Expand Down
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Expand Up @@ -8,18 +8,19 @@ tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "exception"
status: "failed"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# FB serializes numeric fields as strings
validate_schema: no
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021
# because all running campaigns should be finished by that time.
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
Expand Up @@ -42,9 +42,5 @@
"ads_insights_action_types": {
"date_start": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"ads_insights_action_types": {
"date_start": "2021-07-25T13:34:26Z",
"include_deleted": true
}
}
Expand Up @@ -2,5 +2,5 @@
"start_date": "2021-04-01T00:00:00Z",
"account_id": "account",
"access_token": "wrong_token",
"include_deleted": "true"
"include_deleted": true
}
Expand Up @@ -48,20 +48,10 @@
"format": "date-time"
},
"daily_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"budget_remaining": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"effective_status": {
"type": ["null", "string"]
Expand All @@ -78,12 +68,7 @@
"format": "date-time"
},
"lifetime_budget": {
"type": ["null", "number"],
"maximum": 100000000000000000000000000000000,
"minimum": -100000000000000000000000000000000,
"multipleOf": 0.000001,
"exclusiveMaximum": true,
"exclusiveMinimum": true
"type": ["null", "number"]
},
"targeting": { "$ref": "targeting.json" },
"bid_info": {
Expand Down
Expand Up @@ -21,7 +21,7 @@
"$ref": "targeting.json#/definitions/id_name_pairs"
},
"home_type": {
"$ref$": "targeting.json#/definitions/id_name_pairs"
"$ref": "targeting.json#/definitions/id_name_pairs"
},
"friends_of_connections": {
"$ref": "targeting.json#/definitions/id_name_pairs"
Expand Down
Expand Up @@ -27,7 +27,7 @@
from abc import ABC
from collections import deque
from datetime import datetime
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union

import backoff
import pendulum
Expand All @@ -46,7 +46,7 @@
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)


def remove_params_from_url(url: str, params: [str]) -> str:
def remove_params_from_url(url: str, params: List[str]) -> str:
"""
Parses a URL and removes the query parameters specified in params
:param url: URL
Expand Down Expand Up @@ -110,7 +110,63 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
yield self._extend_record(record, fields=self.fields)
yield self.transform(self._extend_record(record, fields=self.fields))

def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Coerce the field values of `record` to the types declared in this stream's JSON schema.

    The record is updated in place by `convert_to_schema_types()` and the very same object is returned,
    so the call can be chained directly inside a `yield`.

    :param record: a single record produced by the stream
    :return: the same `record` object after its values were converted
    """
    properties = self.get_json_schema()["properties"]
    self.convert_to_schema_types(record, properties)
    return record

def get_python_type(self, _types: Union[list, str]) -> tuple:
    """Map JSON-schema type name(s) to the matching Python types.

    The `"null"` marker has no Python counterpart and is dropped, so the result may be empty. Examples:
    - `["string", "null"]` will be converted to `(str,)`
    - `["array", "string", "null"]` will be converted to `(list, str)`
    - `"boolean"` will be converted to `(bool,)`
    - `"null"` will be converted to `()`

    :param _types: a single schema type name or a list of type names
    :return: tuple of Python types, in the order the names were given
    :raises KeyError: if a name is not one of the known schema types
    """
    types_mapping = {
        "string": str,
        "number": float,
        "integer": int,
        "object": dict,
        "array": list,
        "boolean": bool,
    }

    # Treat a bare string as a one-element list so that "null" is handled
    # identically in both forms instead of raising KeyError for the string form.
    names = [_types] if isinstance(_types, str) else _types
    return tuple(types_mapping[name] for name in names if name != "null")

def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
    """
    Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value
    and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string.
    This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all
    types from schema.

    The record (and any nested dicts / lists inside it) is modified in place. Keys that are absent from the
    schema and `None` values are left untouched. Conversion is done by calling the first Python type that
    matches the schema (e.g. `float("1.5")`), so a value that cannot be parsed raises the constructor's own
    `ValueError` / `TypeError`. NOTE: this also means `bool("false")` evaluates to `True`.

    :param record: mapping of field name to value, updated in place
    :param schema: the `properties` mapping of a JSON schema describing `record`
    """
    if not schema:
        return

    for key, value in record.items():
        # A null value is valid for nullable fields and cannot be fed to int()/float().
        if value is None or key not in schema:
            continue

        field_schema = schema[key]

        if isinstance(value, dict):
            self.convert_to_schema_types(record=value, schema=field_schema.get("properties", {}))
        elif isinstance(value, list) and "items" in field_schema:
            items_schema = field_schema["items"]
            if "type" not in items_schema:
                continue
            # Resolve the item types once instead of on every element.
            item_types = self.get_python_type(items_schema["type"])

            if list in item_types:
                # TODO Currently we don't have support for list of lists.
                continue

            if dict in item_types:
                item_properties = items_schema.get("properties", {})
                for item in value:
                    if isinstance(item, dict):
                        self.convert_to_schema_types(record=item, schema=item_properties)
                continue

            if not item_types:
                # Only "null" is declared: there is nothing to convert to.
                continue

            for index, item in enumerate(value):
                if item is not None and not isinstance(item, item_types):
                    # Replace the element itself; assigning to record[key] would
                    # overwrite the whole list with a single scalar.
                    value[index] = item_types[0](item)
        elif "type" in field_schema:
            field_types = self.get_python_type(field_schema["type"])
            if field_types and not isinstance(value, field_types):
                record[key] = field_types[0](value)

def _read_records(self, params: Mapping[str, Any]) -> Iterable:
"""Wrapper around query to backoff errors.
Expand Down Expand Up @@ -298,7 +354,7 @@ class AdsInsights(FBMarketingIncrementalStream):
MAX_WAIT_TO_START = pendulum.duration(minutes=5)
MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
MAX_ASYNC_JOBS = 3
MAX_ASYNC_JOBS = 10
INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)

action_breakdowns = ALL_ACTION_BREAKDOWNS
Expand All @@ -325,7 +381,7 @@ def read_records(
# because we query `lookback_window` days before actual cursor we might get records older than cursor

for obj in result.get_result():
yield obj.export_all_data()
yield self.transform(obj.export_all_data())

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down Expand Up @@ -356,7 +412,7 @@ def wait_for_job(self, job) -> AdReportRun:
job = job.api_get()
job_progress_pct = job["async_percent_completion"]
job_id = job["report_run_id"]
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete")
self.logger.info(f"ReportRunId {job_id} is {job_progress_pct}% complete ({job['async_status']})")
runtime = pendulum.now() - start_time

if job["async_status"] == "Job Completed":
Expand Down
Expand Up @@ -21,6 +21,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from source_facebook_marketing.streams import remove_params_from_url


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Expand Up @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
| 0.2.16 | 2021-09-14 | [6060](https://github.com/airbytehq/airbyte/pull/6060) | Fix schema for `ads_insights` stream |
| 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
Expand Down

0 comments on commit 6d39afc

Please sign in to comment.