Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Salesforce: fix properties chunk length count #22896

Expand Up @@ -1570,6 +1570,9 @@
icon: salesforce.svg
sourceType: api
releaseStage: generally_available
allowedHosts:
hosts:
- "*.salesforce.com"
- name: SAP Fieldglass
sourceDefinitionId: ec5f3102-fb31-4916-99ae-864faf8e7e25
dockerRepository: airbyte/source-sap-fieldglass
Expand Down
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.version=2.0.2
LABEL io.airbyte.name=airbyte/source-salesforce
Expand Up @@ -7,6 +7,7 @@
import math
import os
import time
import urllib.parse
from abc import ABC
from contextlib import closing
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
Expand Down Expand Up @@ -38,7 +39,6 @@ class SalesforceStream(HttpStream, ABC):
page_size = 2000
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
encoding = DEFAULT_ENCODING
MAX_PROPERTIES_LENGTH = Salesforce.REQUEST_SIZE_LIMITS - 2000

def __init__(
self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs
Expand All @@ -50,6 +50,10 @@ def __init__(
self.schema: Mapping[str, Any] = schema # type: ignore[assignment]
self.sobject_options = sobject_options

@property
def max_properties_length(self) -> int:
return Salesforce.REQUEST_SIZE_LIMITS - len(self.url_base) - 2000

@property
def name(self) -> str:
return self.stream_name
Expand All @@ -69,8 +73,8 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
@property
def too_many_properties(self):
selected_properties = self.get_json_schema().get("properties", {})
properties_length = len(",".join(p for p in selected_properties))
return properties_length > self.MAX_PROPERTIES_LENGTH
properties_length = len(urllib.parse.quote(",".join(p for p in selected_properties)))
return properties_length > self.max_properties_length

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield from response.json()["records"]
Expand Down Expand Up @@ -135,8 +139,8 @@ def chunk_properties(self) -> Iterable[Mapping[str, Any]]:
summary_length = 0
local_properties = {}
for property_name, value in selected_properties.items():
current_property_length = len(property_name) + 1 # properties are split with commas
if current_property_length + summary_length >= self.MAX_PROPERTIES_LENGTH:
current_property_length = len(urllib.parse.quote(f"{property_name},"))
if current_property_length + summary_length >= self.max_properties_length:
yield local_properties
local_properties = {}
summary_length = 0
Expand Down
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import HTTPError
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import (
CSV_FIELD_SIZE_LIMIT,
Expand Down Expand Up @@ -679,3 +680,5 @@ def test_too_many_properties(stream_config, stream_api_v2_pk_too_many_properties
{"Id": 3, "propertyA": "A", "propertyB": "B"},
{"Id": 4, "propertyA": "A", "propertyB": "B"}
]
for call in requests_mock.request_history:
assert len(call.url) < Salesforce.REQUEST_SIZE_LIMITS
Expand Up @@ -9,7 +9,6 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import SalesforceStream


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -109,7 +108,7 @@ def stream_api_pk(stream_config):
@pytest.fixture(scope="module")
def stream_api_v2_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"PropertyName{str(i)}", "type": "string"} for i in range(SalesforceStream.MAX_PROPERTIES_LENGTH)]
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([{"name": "BillingAddress", "type": "address"}])
return _stream_api(stream_config, describe_response_data=describe_response_data)
Expand All @@ -118,7 +117,7 @@ def stream_api_v2_too_many_properties(stream_config):
@pytest.fixture(scope="module")
def stream_api_v2_pk_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"PropertyName{str(i)}", "type": "string"} for i in range(SalesforceStream.MAX_PROPERTIES_LENGTH)]
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([
{"name": "BillingAddress", "type": "address"}, {"name": "Id", "type": "string"}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Expand Up @@ -129,6 +129,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 2.0.2 | 2023-02-13 | [22896](https://github.com/airbytehq/airbyte/pull/22896) | Count the URL length based on encoded params |
| 2.0.1 | 2023-02-08 | [22597](https://github.com/airbytehq/airbyte/pull/22597) | Make multiple requests if a REST stream has too many properties |
| 2.0.0 | 2023-02-02 | [22322](https://github.com/airbytehq/airbyte/pull/22322) | Remove `ActivityMetricRollup` stream |
| 1.0.30 | 2023-01-27 | [22016](https://github.com/airbytehq/airbyte/pull/22016) | Set `AvailabilityStrategy` for streams explicitly to `None` |
Expand Down