diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7d3b9b6efe0e5..62272da05e586 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -504,7 +504,7 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.18 + dockerImageTag: 0.1.19 documentationUrl: https://docs.airbyte.io/integrations/sources/iterable icon: iterable.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4b1445049ffa9..50857ec7d4b26 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5002,7 +5002,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-iterable:0.1.18" +- dockerImage: "airbyte/source-iterable:0.1.19" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/iterable" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index 3a593b4f29409..5f9707ca08158 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.18 +LABEL io.airbyte.version=0.1.19 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/README.md b/airbyte-integrations/connectors/source-iterable/README.md index 8ca068e582ea0..409386e0566ee 100644 --- a/airbyte-integrations/connectors/source-iterable/README.md +++ b/airbyte-integrations/connectors/source-iterable/README.md @@ -64,7 +64,7 @@ python -m pytest unit_tests #### Build First, make sure you build the latest Docker image: ``` -docker build . -t airbyte/iterable:dev +docker build . -t airbyte/source-iterable:dev ``` You can also build the connector image via Gradle: diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index 81ca9a88309e9..cdb2d8a9521ae 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1", + "airbyte-cdk", "pendulum~=2.1.2", "requests~=2.25", ] diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py index c3613d272d838..c24744e117763 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py @@ -16,6 +16,7 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from pendulum.datetime import DateTime +from requests import codes from requests.exceptions import ChunkedEncodingError from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice @@ -24,6 +25,7 @@ class IterableStream(HttpStream, ABC): + raise_on_http_errors = True # Hardcode the value because it is not returned from the API BACKOFF_TIME_CONSTANT = 10.0 @@ -43,6 +45,13 @@ def data_field(self) -> str: :return: Default field name to get data from response """ + def check_unauthorized_key(self, response: requests.Response) -> bool: + if response.status_code == codes.UNAUTHORIZED: + self.logger.warn(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}") + setattr(self, "raise_on_http_errors", False) + return False + return True + def backoff_time(self, response: requests.Response) -> Optional[float]: return self.BACKOFF_TIME_CONSTANT @@ -53,12 +62,20 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return None def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if not self.check_unauthorized_key(response): + yield from [] response_json = response.json() records = response_json.get(self.data_field, []) for record in records: yield record + def should_retry(self, response: requests.Response) -> bool: + if not self.check_unauthorized_key(response): + return False + else: + return super().should_retry(response) + class IterableExportStream(IterableStream, ABC): """ @@ -151,6 +168,8 @@ def request_params( return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if not self.check_unauthorized_key(response): + return None for obj in response.iter_lines(): record = json.loads(obj) record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) @@ -301,6 +320,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: yield {"list_id": list_record["id"]} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if not self.check_unauthorized_key(response): + yield from [] list_id = self._get_list_id(response.url) for user in response.iter_lines(): yield {"email": user.decode(), "listId": list_id} @@ -359,6 +380,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: yield {"campaign_ids": campaign_ids} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if not self.check_unauthorized_key(response): + yield from [] content = response.content.decode() records = self._parse_csv_string_to_dict(content) @@ -456,7 +479,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp Put common event fields at the top level. Put the rest of the fields in the `data` subobject. """ - + if not self.check_unauthorized_key(response): + yield from [] jsonl_records = StringIO(response.text) for record in jsonl_records: record_dict = json.loads(record) @@ -618,6 +642,8 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg yield from super().read_records(stream_slice=stream_slice, **kwargs) def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if not self.check_unauthorized_key(response): + yield from [] response_json = response.json() records = response_json.get(self.data_field, []) diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index c278e448018d4..fa2d65c1c99be 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------| +| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions | | 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs | | 0.1.17 | 2022-09-02 | [16067](https://github.com/airbytehq/airbyte/pull/16067) | added new events streams | | 0.1.16 | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header |