Skip to content

Commit

Permalink
馃帀 Source Salesforce: Add the ability to filter streams (#8871)
Browse files Browse the repository at this point in the history
* Source Salesforce: Add the ability to filter streams (version 0.1.11)
* Source Salesforce: fix examples for new field in specification (version 0.1.12)
  • Loading branch information
yevhenii-ldv committed Dec 23, 2021
1 parent 4501ad0 commit ea578e8
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.10
dockerImageTag: 0.1.12
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
29 changes: 28 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6124,7 +6124,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.10"
- dockerImage: "airbyte/source-salesforce:0.1.12"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
Expand Down Expand Up @@ -6185,6 +6185,33 @@
- "BULK"
- "REST"
default: "BULK"
streams_criteria:
type: "array"
items:
type: "object"
required:
- "criteria"
- "value"
properties:
criteria:
type: "string"
title: "Search criteria"
enum:
- "starts with"
- "ends with"
- "contains"
- "exacts"
- "starts not with"
- "ends not with"
- "not contains"
- "not exacts"
default: "contains"
value:
type: "string"
title: "Search value"
title: "Streams filter criteria"
description: "Add selection criteria for streams to get only streams that\
\ are relevant to you"
wait_timeout:
title: "Response Waiting Time"
description: "Maximum wait time of Safesforce responses in minutes. This\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
# Discover test is disabled for this connector, because each time it starts, about 700 requests are consumed, and we have a Salesforce limit of 15,000 requests per day.
# discovery:
# - config_path: "secrets/config.json"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
timeout_seconds: 600
- config_path: "secrets/config_bulk.json"
configured_catalog_path: "integration_tests/configured_catalog_bulk.json"
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_rest.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,6 @@
#
# MIT License
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


from .source import SourceSalesforce

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from .exceptions import TypeSalesforceException
from .rate_limiting import default_backoff_handler
from .utils import filter_streams

STRING_TYPES = [
"byte",
Expand Down Expand Up @@ -211,19 +212,21 @@ def filter_streams(self, stream_name: str) -> bool:
return False
return True

def get_validated_streams(self, catalog: ConfiguredAirbyteCatalog = None):
def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None):
salesforce_objects = self.describe()["sobjects"]
validated_streams = []
stream_names = [stream_object["name"] for stream_object in salesforce_objects]
if catalog:
streams_for_read = [configured_stream.stream.name for configured_stream in catalog.streams]
return [configured_stream.stream.name for configured_stream in catalog.streams]

for stream_object in salesforce_objects:
stream_name = stream_object["name"]
if catalog and stream_name not in streams_for_read:
continue
if self.filter_streams(stream_name):
validated_streams.append(stream_name)
if config.get("streams_criteria"):
filtered_stream_list = []
for stream_criteria in config["streams_criteria"]:
filtered_stream_list += filter_streams(
streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"]
)
stream_names = list(set(filtered_stream_list))

validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
return validated_streams

@default_backoff_handler(max_tries=5, factor=15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def generate_streams(cls, config: Mapping[str, Any], stream_names: List[str], sf

def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> List[Stream]:
sf = self._get_sf_object(config)
stream_names = sf.get_validated_streams(catalog=catalog)
stream_names = sf.get_validated_streams(config=config, catalog=catalog)
return self.generate_streams(config, stream_names, sf)

def read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,36 @@
"enum": ["BULK", "REST"],
"default": "BULK"
},
"streams_criteria": {
"type": "array",
"items": {
"type": "object",
"required": ["criteria", "value"],
"properties": {
"criteria": {
"type": "string",
"title": "Search criteria",
"enum": [
"starts with",
"ends with",
"contains",
"exacts",
"starts not with",
"ends not with",
"not contains",
"not exacts"
],
"default": "contains"
},
"value": {
"type": "string",
"title": "Search value"
}
}
},
"title": "Streams filter criteria",
"description": "Add selection criteria for streams to get only streams that are relevant to you"
},
"wait_timeout": {
"title": "Response Waiting Time",
"description": "Maximum wait time of Safesforce responses in minutes. This option is used for the BULK mode only",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


def filter_streams(streams_list: list, search_word: str, search_criteria: str):
search_word = search_word.lower()
criteria_mapping = {
"starts with": lambda stream_name: stream_name.startswith(search_word),
"starts not with": lambda stream_name: not stream_name.startswith(search_word),
"ends with": lambda stream_name: stream_name.endswith(search_word),
"ends not with": lambda stream_name: not stream_name.endswith(search_word),
"contains": lambda stream_name: search_word in stream_name,
"not contains": lambda stream_name: search_word not in stream_name,
"exacts": lambda stream_name: search_word == stream_name,
"not exacts": lambda stream_name: search_word != stream_name,
}
new_streams_list = []
for stream in streams_list:
if criteria_mapping[search_criteria](stream.lower()):
new_streams_list.append(stream)
return new_streams_list
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,54 @@ def test_download_data_filter_null_bytes(stream_bulk_config, stream_bulk_api):
m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
res = list(stream.download_data(url=job_full_url))
assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": "false"})]


@pytest.mark.parametrize(
"streams_criteria,predicted_filtered_streams",
[
([{"criteria": "exacts", "value": "Account"}], ["Account"]),
(
[{"criteria": "not exacts", "value": "CustomStreamHistory"}],
["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream"],
),
([{"criteria": "starts with", "value": "lead"}], ["Leads", "LeadHistory"]),
(
[{"criteria": "starts not with", "value": "custom"}],
["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"],
),
([{"criteria": "ends with", "value": "story"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]),
([{"criteria": "ends not with", "value": "s"}], ["Account", "LeadHistory", "OrderHistory", "CustomStream", "CustomStreamHistory"]),
([{"criteria": "contains", "value": "applicat"}], ["AIApplications"]),
([{"criteria": "contains", "value": "hist"}], ["LeadHistory", "OrderHistory", "CustomStreamHistory"]),
(
[{"criteria": "not contains", "value": "stream"}],
["Account", "AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory"],
),
(
[{"criteria": "not contains", "value": "Account"}],
["AIApplications", "Leads", "LeadHistory", "Orders", "OrderHistory", "CustomStream", "CustomStreamHistory"],
),
],
)
def test_discover_with_streams_criteria_param(streams_criteria, predicted_filtered_streams, stream_rest_config):
updated_config = {**stream_rest_config, **{"streams_criteria": streams_criteria}}
sf_object = Salesforce(**stream_rest_config)
sf_object.login = Mock()
sf_object.access_token = Mock()
sf_object.instance_url = "https://fase-account.salesforce.com"
sf_object.describe = Mock(
return_value={
"sobjects": [
{"name": "Account"},
{"name": "AIApplications"},
{"name": "Leads"},
{"name": "LeadHistory"},
{"name": "Orders"},
{"name": "OrderHistory"},
{"name": "CustomStream"},
{"name": "CustomStreamHistory"},
]
}
)
filtered_streams = sf_object.get_validated_streams(config=updated_config)
assert sorted(filtered_streams) == sorted(predicted_filtered_streams)
2 changes: 2 additions & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,8 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.12 | 2021-12-23 | [8871](https://github.com/airbytehq/airbyte/pull/8871) | Fix `examples` for new field in specification |
| 0.1.11 | 2021-12-23 | [8871](https://github.com/airbytehq/airbyte/pull/8871) | Add the ability to filter streams by user |
| 0.1.10 | 2021-12-23 | [9005](https://github.com/airbytehq/airbyte/pull/9005) | Handling 400 error when a stream is not queryable |
| 0.1.9 | 2021-12-07 | [8405](https://github.com/airbytehq/airbyte/pull/8405) | Filter 'null' byte(s) in HTTP responses |
| 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` |
Expand Down

0 comments on commit ea578e8

Please sign in to comment.