diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 8110893..7bce5ec 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -140,8 +140,8 @@ def do_discover(sf: Salesforce, streams: list[str]): sf_custom_setting_objects = [] object_to_tag_references = {} - # For each SF Object describe it, loop its fields and build a schema - entries = [] + sobject_batches = [] + batch = [] for sobject_name in objects_to_discover: # Skip blacklisted SF objects depending on the api_type in use @@ -149,142 +149,155 @@ def do_discover(sf: Salesforce, streams: list[str]): if sobject_name in sf.get_blacklisted_objects() \ or sobject_name.endswith("ChangeEvent"): continue + batch.append(sobject_name) + if len(batch) == 25: + sobject_batches.append(batch) + batch = [] + if len(batch) > 0: + sobject_batches.append(batch) - sobject_description = sf.describe(sobject_name) - - # Cache customSetting and Tag objects to check for blacklisting after - # all objects have been described - if sobject_description.get("customSetting"): - sf_custom_setting_objects.append(sobject_name) - elif sobject_name.endswith("__Tag"): - relationship_field = next( - (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), - None) - if relationship_field: - # Map {"Object":"Object__Tag"} - object_to_tag_references[relationship_field["referenceTo"] - [0]] = sobject_name - - fields = sobject_description['fields'] - replication_key = get_replication_key(sobject_name, fields) - - unsupported_fields = set() - properties = {} - mdata = metadata.new() - - found_id_field = False - - # Loop over the object's fields - for f in fields: - field_name = f['name'] - field_type = f['type'] - - if field_name == "Id": - found_id_field = True - - property_schema, mdata = create_property_schema( - f, mdata) - - # Compound Address fields cannot be queried by the Bulk API - if f['type'] in ("address", "location") and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE: - unsupported_fields.add( - (field_name, 'cannot query compound address fields with bulk API')) - - # we haven't been able to observe any records with a json field, so we - # are marking it as unavailable until we have an example to work with - if f['type'] == "json": - unsupported_fields.add( - (field_name, 'do not currently support json fields - please contact support')) - - # Blacklisted fields are dependent on the api_type being used - field_pair = (sobject_name, field_name) - if field_pair in sf.get_blacklisted_fields(): - unsupported_fields.add( - (field_name, sf.get_blacklisted_fields()[field_pair])) - - inclusion = metadata.get( - mdata, ('properties', field_name), 'inclusion') - - if sf.select_fields_by_default and inclusion != 'unsupported': + # For each SF Object describe it, loop its fields and build a schema + entries = [] + for batch in sobject_batches: + sobject_descriptions = sf.describe(batch) + + for subrequest_result in sobject_descriptions: + sobject_description = subrequest_result["result"] + sobject_name = sobject_description["name"] + + # Cache customSetting and Tag objects to check for blacklisting after + # all objects have been described + if sobject_description.get("customSetting"): + sf_custom_setting_objects.append(sobject_name) + elif sobject_name.endswith("__Tag"): + relationship_field = next( + (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), + None) + if relationship_field: + # Map {"Object":"Object__Tag"} + object_to_tag_references[relationship_field["referenceTo"] + [0]] = sobject_name + + fields = sobject_description['fields'] + replication_key = get_replication_key(sobject_name, fields) + + unsupported_fields = set() + properties = {} + mdata = metadata.new() + + found_id_field = False + + # Loop over the object's fields + for f in fields: + field_name = f['name'] + field_type = f['type'] + + if field_name == "Id": + found_id_field = True + + property_schema, mdata = create_property_schema( + f, mdata) + + # Compound Address fields cannot be queried by the Bulk API + if f['type'] in ("address", "location") and sf.api_type == tap_salesforce.salesforce.BULK_API_TYPE: + unsupported_fields.add( + (field_name, 'cannot query compound address fields with bulk API')) + + # we haven't been able to observe any records with a json field, so we + # are marking it as unavailable until we have an example to work with + if f['type'] == "json": + unsupported_fields.add( + (field_name, 'do not currently support json fields - please contact support')) + + # Blacklisted fields are dependent on the api_type being used + field_pair = (sobject_name, field_name) + if field_pair in sf.get_blacklisted_fields(): + unsupported_fields.add( + (field_name, sf.get_blacklisted_fields()[field_pair])) + + inclusion = metadata.get( + mdata, ('properties', field_name), 'inclusion') + + if sf.select_fields_by_default and inclusion != 'unsupported': + mdata = metadata.write( + mdata, ('properties', field_name), 'selected-by-default', True) + + properties[field_name] = property_schema + + if replication_key: mdata = metadata.write( - mdata, ('properties', field_name), 'selected-by-default', True) - - properties[field_name] = property_schema - - if replication_key: - mdata = metadata.write( - mdata, ('properties', replication_key), 'inclusion', 'automatic') - - # There are cases where compound fields are referenced by the associated - # subfields but are not actually present in the field list - field_name_set = {f['name'] for f in fields} - filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set] - missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set] - - if missing_unsupported_field_names: - LOGGER.info("Ignoring the following unsupported fields for object %s as they are missing from the field list: %s", - sobject_name, - ', '.join(sorted(missing_unsupported_field_names))) - - if filtered_unsupported_fields: - LOGGER.info("Not syncing the following unsupported fields for object %s: %s", - sobject_name, - ', '.join(sorted([k for k, _ in filtered_unsupported_fields]))) - - # Salesforce Objects are skipped when they do not have an Id field - if not found_id_field: - LOGGER.info( - "Skipping Salesforce Object %s, as it has no Id field", - sobject_name) - continue + mdata, ('properties', replication_key), 'inclusion', 'automatic') + + # There are cases where compound fields are referenced by the associated + # subfields but are not actually present in the field list + field_name_set = {f['name'] for f in fields} + filtered_unsupported_fields = [f for f in unsupported_fields if f[0] in field_name_set] + missing_unsupported_field_names = [f[0] for f in unsupported_fields if f[0] not in field_name_set] + + if missing_unsupported_field_names: + LOGGER.info("Ignoring the following unsupported fields for object %s as they are missing from the field list: %s", + sobject_name, + ', '.join(sorted(missing_unsupported_field_names))) + + if filtered_unsupported_fields: + LOGGER.info("Not syncing the following unsupported fields for object %s: %s", + sobject_name, + ', '.join(sorted([k for k, _ in filtered_unsupported_fields]))) + + # Salesforce Objects are skipped when they do not have an Id field + if not found_id_field: + LOGGER.info( + "Skipping Salesforce Object %s, as it has no Id field", + sobject_name) + continue + + # Any property added to unsupported_fields has metadata generated and + # removed + for prop, description in filtered_unsupported_fields: + if metadata.get(mdata, ('properties', prop), + 'selected-by-default'): + metadata.delete( + mdata, ('properties', prop), 'selected-by-default') - # Any property added to unsupported_fields has metadata generated and - # removed - for prop, description in filtered_unsupported_fields: - if metadata.get(mdata, ('properties', prop), - 'selected-by-default'): - metadata.delete( - mdata, ('properties', prop), 'selected-by-default') - - mdata = metadata.write( - mdata, ('properties', prop), 'unsupported-description', description) - mdata = metadata.write( - mdata, ('properties', prop), 'inclusion', 'unsupported') - - if replication_key: - mdata = metadata.write( - mdata, (), 'valid-replication-keys', [replication_key]) - mdata = metadata.write( - mdata, (), 'replication-key', replication_key - ) - mdata = metadata.write( - mdata, (), 'replication-method', "INCREMENTAL" - ) - else: - mdata = metadata.write( - mdata, - (), - 'forced-replication-method', - { - 'replication-method': 'FULL_TABLE', - 'reason': 'No replication keys found from the Salesforce API'}) - - mdata = metadata.write(mdata, (), 'table-key-properties', key_properties) - - schema = { - 'type': 'object', - 'additionalProperties': False, - 'properties': properties - } - - entry = { - 'stream': sobject_name, - 'tap_stream_id': sobject_name, - 'schema': schema, - 'metadata': metadata.to_list(mdata) - } - - entries.append(entry) + mdata = metadata.write( + mdata, ('properties', prop), 'unsupported-description', description) + mdata = metadata.write( + mdata, ('properties', prop), 'inclusion', 'unsupported') + + if replication_key: + mdata = metadata.write( + mdata, (), 'valid-replication-keys', [replication_key]) + mdata = metadata.write( + mdata, (), 'replication-key', replication_key + ) + mdata = metadata.write( + mdata, (), 'replication-method', "INCREMENTAL" + ) + else: + mdata = metadata.write( + mdata, + (), + 'forced-replication-method', + { + 'replication-method': 'FULL_TABLE', + 'reason': 'No replication keys found from the Salesforce API'}) + + mdata = metadata.write(mdata, (), 'table-key-properties', key_properties) + + schema = { + 'type': 'object', + 'additionalProperties': False, + 'properties': properties + } + + entry = { + 'stream': sobject_name, + 'tap_stream_id': sobject_name, + 'schema': schema, + 'metadata': metadata.to_list(mdata) + } + + entries.append(entry) # For each custom setting field, remove its associated tag from entries # See Blacklisting.md for more information diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 36b4a0b..13fc87d 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -1,5 +1,6 @@ import re import time +import json import backoff import requests from datetime import timedelta @@ -325,10 +326,27 @@ def describe(self, sobject=None): """Describes all objects or a specific object""" headers = self.auth.rest_headers instance_url = self.auth.instance_url + body = None + method = "GET" if sobject is None: endpoint = "sobjects" endpoint_tag = "sobjects" url = self.data_url.format(instance_url, endpoint) + elif isinstance(sobject, list): + if len(sobject) > 25: + raise TapSalesforceException(f"Composite describe limited to 25 sObjects per batch. Given list of {len(sobject)}.") + endpoint = "composite/batch" + endpoint_tag = sobject + url = self.data_url.format(instance_url, endpoint) + method = "POST" + headers['Content-Type'] = 'application/json' + composite_subrequests = [] + for obj in sobject: + sub_endpoint = "sobjects/{}/describe".format(obj) + sub_url = self.data_url.format("", sub_endpoint) + subrequest = {"method": "GET", "url": sub_url} + composite_subrequests.append(subrequest) + body = json.dumps({"batchRequests": composite_subrequests}) else: endpoint = "sobjects/{}/describe".format(sobject) endpoint_tag = sobject @@ -336,9 +354,12 @@ def describe(self, sobject=None): with metrics.http_request_timer("describe") as timer: timer.tags['endpoint'] = endpoint_tag - resp = self._make_request('GET', url, headers=headers) + resp = self._make_request(method, url, headers=headers, body=body) - return resp.json() + if isinstance(sobject, list): + return resp.json()['results'] + else: + return resp.json() # pylint: disable=no-self-use def _get_selected_properties(self, catalog_entry):