Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Use composite api in discover #45

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 148 additions & 135 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,151 +140,164 @@ def do_discover(sf: Salesforce, streams: list[str]):
sf_custom_setting_objects = []
object_to_tag_references = {}

# For each SF Object describe it, loop its fields and build a schema
entries = []
sobject_batches = []
batch = []
for sobject_name in objects_to_discover:

# Skip blacklisted SF objects depending on the api_type in use
# ChangeEvent objects are not queryable via Bulk or REST (undocumented)
if sobject_name in sf.get_blacklisted_objects() \
or sobject_name.endswith("ChangeEvent"):
continue
batch.append(sobject_name)
if len(batch) == 25:
sobject_batches.append(batch)
batch = []
if len(batch) > 0:
sobject_batches.append(batch)

sobject_description = sf.describe(sobject_name)

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"]
[0]] = sobject_name

fields = sobject_description['fields']
replication_key = get_replication_key(sobject_name, fields)

unsupported_fields = set()
properties = {}
mdata = metadata.new()

found_id_field = False

# Loop over the object's fields
for f in fields:
field_name = f['name']
field_type = f['type']

if field_name == "Id":
found_id_field = True

property_schema, mdata = create_property_schema(
f, mdata)

# Compound Address fields cannot be queried by the Bulk API
if f['type'] == "address" and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE:
unsupported_fields.add(
(field_name, 'cannot query compound address fields with bulk API'))

# we haven't been able to observe any records with a json field, so we
# are marking it as unavailable until we have an example to work with
if f['type'] == "json":
unsupported_fields.add(
(field_name, 'do not currently support json fields - please contact support'))

# Blacklisted fields are dependent on the api_type being used
field_pair = (sobject_name, field_name)
if field_pair in sf.get_blacklisted_fields():
unsupported_fields.add(
(field_name, sf.get_blacklisted_fields()[field_pair]))

inclusion = metadata.get(
mdata, ('properties', field_name), 'inclusion')

if sf.select_fields_by_default and inclusion != 'unsupported':
# For each SF Object describe it, loop its fields and build a schema
entries = []
for batch in sobject_batches:
sobject_descriptions = sf.describe(batch)

for subrequest_result in sobject_descriptions:
sobject_description = subrequest_result["result"]
sobject_name = sobject_description["name"]

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"]
[0]] = sobject_name

fields = sobject_description['fields']
replication_key = get_replication_key(sobject_name, fields)

unsupported_fields = set()
properties = {}
mdata = metadata.new()

found_id_field = False

# Loop over the object's fields
for f in fields:
field_name = f['name']
field_type = f['type']

if field_name == "Id":
found_id_field = True

property_schema, mdata = create_property_schema(
f, mdata)

# Compound Address fields cannot be queried by the Bulk API
if f['type'] == "address" and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE:
unsupported_fields.add(
(field_name, 'cannot query compound address fields with bulk API'))

# we haven't been able to observe any records with a json field, so we
# are marking it as unavailable until we have an example to work with
if f['type'] == "json":
unsupported_fields.add(
(field_name, 'do not currently support json fields - please contact support'))

# Blacklisted fields are dependent on the api_type being used
field_pair = (sobject_name, field_name)
if field_pair in sf.get_blacklisted_fields():
unsupported_fields.add(
(field_name, sf.get_blacklisted_fields()[field_pair]))

inclusion = metadata.get(
mdata, ('properties', field_name), 'inclusion')

if sf.select_fields_by_default and inclusion != 'unsupported':
mdata = metadata.write(
mdata, ('properties', field_name), 'selected-by-default', True)

properties[field_name] = property_schema

if replication_key:
mdata = metadata.write(
mdata, ('properties', field_name), 'selected-by-default', True)

properties[field_name] = property_schema

if replication_key:
mdata = metadata.write(
mdata, ('properties', replication_key), 'inclusion', 'automatic')

# There are cases where compound fields are referenced by the associated
# subfields but are not actually present in the field list
field_name_set = {f['name'] for f in fields}
filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set]
missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set]

if missing_unsupported_field_names:
LOGGER.info("Ignoring the following unsupported fields for object %s as they are missing from the field list: %s",
sobject_name,
', '.join(sorted(missing_unsupported_field_names)))

if filtered_unsupported_fields:
LOGGER.info("Not syncing the following unsupported fields for object %s: %s",
sobject_name,
', '.join(sorted([k for k, _ in filtered_unsupported_fields])))

# Salesforce Objects are skipped when they do not have an Id field
if not found_id_field:
LOGGER.info(
"Skipping Salesforce Object %s, as it has no Id field",
sobject_name)
continue
mdata, ('properties', replication_key), 'inclusion', 'automatic')

# There are cases where compound fields are referenced by the associated
# subfields but are not actually present in the field list
field_name_set = {f['name'] for f in fields}
filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set]
missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set]

if missing_unsupported_field_names:
LOGGER.info("Ignoring the following unsupported fields for object %s as they are missing from the field list: %s",
sobject_name,
', '.join(sorted(missing_unsupported_field_names)))

if filtered_unsupported_fields:
LOGGER.info("Not syncing the following unsupported fields for object %s: %s",
sobject_name,
', '.join(sorted([k for k, _ in filtered_unsupported_fields])))

# Salesforce Objects are skipped when they do not have an Id field
if not found_id_field:
LOGGER.info(
"Skipping Salesforce Object %s, as it has no Id field",
sobject_name)
continue

# Any property added to unsupported_fields has metadata generated and
# removed
for prop, description in filtered_unsupported_fields:
if metadata.get(mdata, ('properties', prop),
'selected-by-default'):
metadata.delete(
mdata, ('properties', prop), 'selected-by-default')

# Any property added to unsupported_fields has metadata generated and
# removed
for prop, description in filtered_unsupported_fields:
if metadata.get(mdata, ('properties', prop),
'selected-by-default'):
metadata.delete(
mdata, ('properties', prop), 'selected-by-default')

mdata = metadata.write(
mdata, ('properties', prop), 'unsupported-description', description)
mdata = metadata.write(
mdata, ('properties', prop), 'inclusion', 'unsupported')

if replication_key:
mdata = metadata.write(
mdata, (), 'valid-replication-keys', [replication_key])
mdata = metadata.write(
mdata, (), 'replication-key', replication_key
)
mdata = metadata.write(
mdata, (), 'replication-method', "INCREMENTAL"
)
else:
mdata = metadata.write(
mdata,
(),
'forced-replication-method',
{
'replication-method': 'FULL_TABLE',
'reason': 'No replication keys found from the Salesforce API'})

mdata = metadata.write(mdata, (), 'table-key-properties', key_properties)

schema = {
'type': 'object',
'additionalProperties': False,
'properties': properties
}

entry = {
'stream': sobject_name,
'tap_stream_id': sobject_name,
'schema': schema,
'metadata': metadata.to_list(mdata)
}

entries.append(entry)
mdata = metadata.write(
mdata, ('properties', prop), 'unsupported-description', description)
mdata = metadata.write(
mdata, ('properties', prop), 'inclusion', 'unsupported')

if replication_key:
mdata = metadata.write(
mdata, (), 'valid-replication-keys', [replication_key])
mdata = metadata.write(
mdata, (), 'replication-key', replication_key
)
mdata = metadata.write(
mdata, (), 'replication-method', "INCREMENTAL"
)
else:
mdata = metadata.write(
mdata,
(),
'forced-replication-method',
{
'replication-method': 'FULL_TABLE',
'reason': 'No replication keys found from the Salesforce API'})

mdata = metadata.write(mdata, (), 'table-key-properties', key_properties)

schema = {
'type': 'object',
'additionalProperties': False,
'properties': properties
}

entry = {
'stream': sobject_name,
'tap_stream_id': sobject_name,
'schema': schema,
'metadata': metadata.to_list(mdata)
}

entries.append(entry)

# For each custom setting field, remove its associated tag from entries
# See Blacklisting.md for more information
Expand Down
25 changes: 23 additions & 2 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import time
import json
import backoff
import requests
from requests.exceptions import RequestException
def describe(self, sobject=None):
    """Describe all objects, a single object, or a batch of objects.

    Args:
        sobject: Selects which describe call is made.
            * ``None`` - GET the ``sobjects`` listing.
            * ``list`` of sObject names (at most 25) - POST one
              ``composite/batch`` request holding a describe sub-request
              for every name.
            * anything else - treated as a single sObject name and
              described with GET ``sobjects/<name>/describe``.

    Returns:
        For a list input, the ``results`` entry of the decoded batch
        response (an empty list when the input list is empty). Otherwise
        the decoded JSON body of the response.

    Raises:
        TapSalesforceException: if more than 25 names are passed in one list.
    """
    instance_url = self.auth.instance_url
    # Work on a copy: the batch branch adds a Content-Type header, and that
    # must not leak into whatever dict ``auth.rest_headers`` hands back.
    # NOTE(review): assumes rest_headers is a plain mapping - confirm.
    headers = dict(self.auth.rest_headers)
    method = "GET"
    body = None

    if sobject is None:
        endpoint = "sobjects"
        endpoint_tag = "sobjects"
    elif isinstance(sobject, list):
        if len(sobject) > 25:
            raise TapSalesforceException(f"Composite describe limited to 25 sObjects per batch. Given list of {len(sobject)}.")
        if not sobject:
            # Nothing to describe; skip the HTTP round trip entirely.
            return []
        endpoint = "composite/batch"
        endpoint_tag = sobject
        method = "POST"
        headers['Content-Type'] = 'application/json'
        # Sub-request URLs are built without the instance host, so they are
        # relative paths inside the batch payload.
        batch_requests = [
            {
                "method": "GET",
                "url": self.data_url.format("", f"sobjects/{name}/describe"),
            }
            for name in sobject
        ]
        body = json.dumps({"batchRequests": batch_requests})
    else:
        endpoint = f"sobjects/{sobject}/describe"
        endpoint_tag = sobject

    url = self.data_url.format(instance_url, endpoint)

    # Exactly one request is issued, using the method/body chosen above.
    with metrics.http_request_timer("describe") as timer:
        timer.tags['endpoint'] = endpoint_tag
        resp = self._make_request(method, url, headers=headers, body=body)

    payload = resp.json()
    if isinstance(sobject, list):
        # NOTE(review): per-sub-request failures are not inspected here;
        # callers index each entry's "result" directly - confirm that is safe.
        return payload['results']
    return payload

# pylint: disable=no-self-use
def _get_selected_properties(self, catalog_entry):
Expand Down