Skip to content

Commit

Permalink
#20703 Source Salesforce: fix properties chunk length count
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Feb 13, 2023
1 parent f613f35 commit 82966a2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.version=2.0.2
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import os
import time
import urllib.parse
from abc import ABC
from contextlib import closing
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
Expand Down Expand Up @@ -38,7 +39,6 @@ class SalesforceStream(HttpStream, ABC):
page_size = 2000
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
encoding = DEFAULT_ENCODING
MAX_PROPERTIES_LENGTH = Salesforce.REQUEST_SIZE_LIMITS - 2000

def __init__(
self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs
Expand All @@ -50,6 +50,10 @@ def __init__(
self.schema: Mapping[str, Any] = schema # type: ignore[assignment]
self.sobject_options = sobject_options

@property
def max_properties_length(self) -> int:
return Salesforce.REQUEST_SIZE_LIMITS - len(self.url_base) - 2000

@property
def name(self) -> str:
return self.stream_name
Expand All @@ -69,8 +73,8 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
@property
def too_many_properties(self):
selected_properties = self.get_json_schema().get("properties", {})
properties_length = len(",".join(p for p in selected_properties))
return properties_length > self.MAX_PROPERTIES_LENGTH
properties_length = len(urllib.parse.quote(",".join(p for p in selected_properties)))
return properties_length > self.max_properties_length

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield from response.json()["records"]
Expand Down Expand Up @@ -135,8 +139,8 @@ def chunk_properties(self) -> Iterable[Mapping[str, Any]]:
summary_length = 0
local_properties = {}
for property_name, value in selected_properties.items():
current_property_length = len(property_name) + 1 # properties are split with commas
if current_property_length + summary_length >= self.MAX_PROPERTIES_LENGTH:
current_property_length = len(urllib.parse.quote(f"{property_name},"))
if current_property_length + summary_length >= self.max_properties_length:
yield local_properties
local_properties = {}
summary_length = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import HTTPError
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import (
CSV_FIELD_SIZE_LIMIT,
Expand Down Expand Up @@ -679,3 +680,5 @@ def test_too_many_properties(stream_config, stream_api_v2_pk_too_many_properties
{"Id": 3, "propertyA": "A", "propertyB": "B"},
{"Id": 4, "propertyA": "A", "propertyB": "B"}
]
for call in requests_mock.request_history:
assert len(call.url) < Salesforce.REQUEST_SIZE_LIMITS
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from source_salesforce.api import Salesforce
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import SalesforceStream


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -109,7 +108,7 @@ def stream_api_pk(stream_config):
@pytest.fixture(scope="module")
def stream_api_v2_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"PropertyName{str(i)}", "type": "string"} for i in range(SalesforceStream.MAX_PROPERTIES_LENGTH)]
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([{"name": "BillingAddress", "type": "address"}])
return _stream_api(stream_config, describe_response_data=describe_response_data)
Expand All @@ -118,7 +117,7 @@ def stream_api_v2_too_many_properties(stream_config):
@pytest.fixture(scope="module")
def stream_api_v2_pk_too_many_properties(stream_config):
describe_response_data = {
"fields": [{"name": f"PropertyName{str(i)}", "type": "string"} for i in range(SalesforceStream.MAX_PROPERTIES_LENGTH)]
"fields": [{"name": f"Property{str(i)}", "type": "string"} for i in range(Salesforce.REQUEST_SIZE_LIMITS)]
}
describe_response_data["fields"].extend([
{"name": "BillingAddress", "type": "address"}, {"name": "Id", "type": "string"}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
| 2.0.2 | 2023-02-13 | [00000](https://github.com/airbytehq/airbyte/pull/00000) | Count the URL length based on encoded params |
| 2.0.1 | 2023-02-08 | [22597](https://github.com/airbytehq/airbyte/pull/22597) | Make multiple requests if a REST stream has too many properties |
| 2.0.0 | 2023-02-02 | [22322](https://github.com/airbytehq/airbyte/pull/22322) | Remove `ActivityMetricRollup` stream |
| 1.0.30 | 2023-01-27 | [22016](https://github.com/airbytehq/airbyte/pull/22016) | Set `AvailabilityStrategy` for streams explicitly to `None` |
Expand Down

0 comments on commit 82966a2

Please sign in to comment.