Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dclid implementation to the CM connector and fixed a bug in the customVariables column #115

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -184,7 +184,15 @@ def _get_bq_rows(self, rows, now):

def _get_table_columns(self, client, table_name):
table = client.get_table(table_name)
return [schema.name for schema in table.schema]
columns = []
for schema in table.schema:
if len(schema.fields) == 0:
# Add only if it is not a nested column
columns.append(schema.name)
for field in schema.fields:
# Nested columns will be added here
columns.append(f'{schema.name}.{field.name}')
return columns

def _get_bq_client(self):
return bigquery.Client(location=self._bq_location)
Expand Down
11 changes: 7 additions & 4 deletions megalista_dataflow/data_sources/data_schemas.py
Expand Up @@ -30,6 +30,7 @@
{'name': 'mobileDeviceId', 'required': False, 'data_type': 'string'},
{'name': 'encryptedUserId', 'required': False, 'data_type': 'string'},
{'name': 'matchId', 'required': False, 'data_type': 'string'},
{'name': 'dclid', 'required': False, 'data_type': 'string'},
{'name': 'value', 'required': False, 'data_type': 'int'},
{'name': 'quantity', 'required': False, 'data_type': 'int'},
{'name': 'timestamp', 'required': False, 'data_type': 'string'},
Expand All @@ -39,7 +40,7 @@
'required': False, 'data_type': 'string'}
],
'groups': [
['gclid', 'mobileDeviceId', 'encryptedUserId', 'matchId']
['gclid', 'mobileDeviceId', 'encryptedUserId', 'matchId', 'dclid']
]
},
'ADS_OFFLINE_CONVERSION': {
Expand Down Expand Up @@ -300,15 +301,17 @@ def get_error_message(data_cols: List[str], destination_type: DestinationType) -
def get_cols_names(data_cols: list, destination_type: DestinationType) -> list:
data_type = _dtypes[destination_type.name]
data_type_cols = [col['name'] for col in data_type['columns']]

filtered_cols = []
for col in data_cols:
found = False
for data_type_col in data_type_cols:
if re.match(f'^{data_type_col}$', col) is not None:
nested = col.split('.')
# if nested, only add the parent column name
if len(nested) > 1:
col = nested[0]
if col not in filtered_cols:
filtered_cols.append(col)

break
return filtered_cols

# Parse columns that aren't string
Expand Down
10 changes: 8 additions & 2 deletions megalista_dataflow/data_sources/data_schemas_test.py
Expand Up @@ -88,7 +88,13 @@ def test_get_cols_names(mocker):
cols_1_filtered = ['uuid', 'gclid']
cols_2 = ['uuid', 'gclid', 'aaa']
cols_2_filtered = ['uuid', 'gclid']

cols_3 = ['uuid', 'dclid']
cols_3_filtered = ['uuid', 'dclid']
cols_4 = ['uuid', 'matchId', 'quantity', 'value', 'customVariables.type', 'customVariables.value']
# For columns with arrays and nested fields, return the parent column only
cols_4_filtered = ['uuid', 'matchId', 'quantity', 'value', 'customVariables']

assert DataSchemas.get_cols_names(cols_1, destination_type) == cols_1_filtered
assert DataSchemas.get_cols_names(cols_2, destination_type) == cols_2_filtered

assert DataSchemas.get_cols_names(cols_3, destination_type) == cols_3_filtered
assert DataSchemas.get_cols_names(cols_4, destination_type) == cols_4_filtered
2 changes: 1 addition & 1 deletion megalista_dataflow/requirements.txt
@@ -1,6 +1,6 @@
apache-beam[gcp]==2.38.0
google-ads==15.1.1
google-api-python-client==2.45.0
google-api-python-client==2.65.0
google-cloud-bigquery==2.34.3
google-cloud-firestore==2.4.0
google-cloud-storage==2.2.1
Expand Down
2 changes: 1 addition & 1 deletion megalista_dataflow/setup.py
Expand Up @@ -20,7 +20,7 @@
author='Google',
author_email='megalista-admin@google.com',
url='https://github.com/google/megalista/',
install_requires=['google-ads==15.1.1', 'google-api-python-client==2.45.0',
install_requires=['google-ads==15.1.1', 'google-api-python-client==2.65.0',
'google-cloud-bigquery==2.34.3','aiohttp==3.6.2',
'google-cloud-storage==2.2.1', 'google-cloud-firestore==2.4.0',
'pyparsing==2.4.7', 'proto-plus==1.19.6', 'protobuf==3.20.0',
Expand Down
Expand Up @@ -45,7 +45,7 @@ def _get_dcm_service(self):
'https://www.googleapis.com/auth/dfatrafficking',
'https://www.googleapis.com/auth/ddmconversions'])

return build('dfareporting', 'v3.5', credentials=credentials)
return build('dfareporting', 'v4', credentials=credentials)

def start_bundle(self):
pass
Expand Down Expand Up @@ -107,6 +107,8 @@ def _do_upload_data(
to_upload['mobileDeviceId'] = conversion['mobileDeviceId']
elif 'matchId' in conversion and conversion['matchId']:
to_upload['matchId'] = conversion['matchId']
elif 'dclid' in conversion and conversion['dclid']:
to_upload['dclid'] = conversion['dclid']

if 'value' in conversion:
to_upload['value'] = float(conversion['value'])
Expand Down