diff --git a/airbyte-config/init/bin/main/icons/rss.svg b/airbyte-config/init/bin/main/icons/rss.svg new file mode 100644 index 0000000000000..554d682248507 --- /dev/null +++ b/airbyte-config/init/bin/main/icons/rss.svg @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5f9522b86cc14..0a1cc5e29a9e7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1267,6 +1267,14 @@ documentationUrl: https://docs.airbyte.com/integrations/sources/rki-covid sourceType: api releaseStage: alpha +- name: RSS + sourceDefinitionId: 0efee448-6948-49e2-b786-17db50647908 + dockerRepository: airbyte/source-rss + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.com/integrations/sources/rss + icon: rss.svg + sourceType: api + releaseStage: alpha - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 46c54ee18cde0..432e1d570f2b9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11745,6 +11745,22 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-rss:0.1.0" + spec: + documentationUrl: "https://docs.airbyte.com/integrations/sources/rss" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "RSS Spec" + type: "object" + required: + - "url" + properties: + url: + type: "string" + description: "RSS Feed URL" + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-s3:0.1.26" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/s3" diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 228782028fa6a..6f77fee24a0c9 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -109,6 +109,7 @@ | Recurly | [![source-recurly](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-recurly%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-recurly) | | Redshift | [![source-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-redshift%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-redshift) | | Reply.io | [![source-reply-io](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-reply-io%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-reply-io) | +| RSS | [![source-rss](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-rss%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-rss) | | S3 | [![source-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-s3) | | Salesforce | [![source-salesforce](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-salesforce%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-salesforce) | | Salesloft | [![source-salesloft](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-salesloft%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-salesloft) | diff --git a/airbyte-integrations/connectors/source-rss/.dockerignore b/airbyte-integrations/connectors/source-rss/.dockerignore new file mode 100644 index 0000000000000..0e472e73c29b1 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_rss +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-rss/Dockerfile b/airbyte-integrations/connectors/source-rss/Dockerfile new file mode 100644 index 0000000000000..afeb0625ec217 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.9.13-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_rss ./source_rss + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-rss diff --git a/airbyte-integrations/connectors/source-rss/README.md b/airbyte-integrations/connectors/source-rss/README.md new file mode 100644 index 0000000000000..ecbe1a253a623 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/README.md @@ -0,0 +1,127 @@ +# RSS Source + +This is the repository for the RSS source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/rss). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-rss:build +``` + +#### Credentials + +Since this doesn't require auth, the config is just in `integration_tests/sample_config.json` instead of `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config integration_tests/sample_config.json +python main.py discover --config integration_tests/sample_config.json +python main.py read --config integration_tests/sample_config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-rss:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-rss:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-rss:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-rss:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-rss:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-rss:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-rss:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-rss:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-rss/acceptance-test-config.yml b/airbyte-integrations/connectors/source-rss/acceptance-test-config.yml new file mode 100644 index 0000000000000..92605a43cbe5c --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/acceptance-test-config.yml @@ -0,0 +1,31 @@ +# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-rss:dev +acceptance_tests: + spec: + tests: + - config_path: "integration_tests/sample_config.json" + spec_path: "source_rss/spec.yaml" + connection: + tests: + - config_path: "integration_tests/sample_config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + tests: + - config_path: "integration_tests/sample_config.json" + basic_read: + tests: + - config_path: "integration_tests/sample_config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + incremental: + tests: + - config_path: "integration_tests/sample_config.json" + configured_catalog_path: "integration_tests/incremental_configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + tests: + - config_path: "integration_tests/sample_config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-rss/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-rss/acceptance-test-docker.sh new file mode 100644 index 0000000000000..c51577d10690c --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-rss/build.gradle b/airbyte-integrations/connectors/source-rss/build.gradle new file mode 100644 index 0000000000000..42a7ced4cdbc0 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_rss' +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/__init__.py b/airbyte-integrations/connectors/source-rss/integration_tests/__init__.py new file mode 100644 index 0000000000000..1100c1c58cf51 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-rss/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..68ac9906773a4 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "items": { + "published": "3333-10-24T16:16:00+00:00" + } +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-rss/integration_tests/acceptance.py new file mode 100644 index 0000000000000..950b53b59d416 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/acceptance.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/catalog.json b/airbyte-integrations/connectors/source-rss/integration_tests/catalog.json new file mode 100644 index 0000000000000..c71d575501ea5 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/catalog.json @@ -0,0 +1,78 @@ +{ + "streams": [ + { + "name": "items", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [ + "published" + ], + "properties": { + "title": { + "type": [ + "null", + "string" + ] + }, + "link": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "author": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "comments": { + "type": [ + "null", + "string" + ] + }, + "enclosure": { + "type": [ + "null", + "string" + ] + }, + "guid": { + "type": [ + "null", + "string" + ] + }, + "published": { + "type": [ + "string" + ], + "format": "date-time" + } + } + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "source_defined_cursor": true, + "default_cursor_field": [ + "published" + ] + } + ] +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-rss/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..2166c503a704d --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/configured_catalog.json @@ -0,0 +1,47 @@ +{ + "streams": [ + { + "stream": { + "name": "items", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required" : ["published"], + "properties": { + "title": { + "type": ["null", "string"] + }, + "link": { + "type": ["null", "string"] + }, + "description": { + "type": ["null", "string"] + }, + "author": { + "type": ["null", "string"] + }, + "category": { + "type": ["null", "string"] + }, + "comments": { + "type": ["null", "string"] + }, + "enclosure": { + "type": ["null", "string"] + }, + "guid": { + "type": ["null", "string"] + }, + "published": { + "type": ["string"], + "format": "date-time" + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/incremental_configured_catalog.json b/airbyte-integrations/connectors/source-rss/integration_tests/incremental_configured_catalog.json new file mode 100644 index 0000000000000..6a17e966d695f --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/incremental_configured_catalog.json @@ -0,0 +1,47 @@ +{ + "streams": [ + { + "stream": { + "name": "items", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required" : ["published"], + "properties": { + "title": { + "type": ["null", "string"] + }, + "link": { + "type": ["null", "string"] + }, + "description": { + "type": ["null", "string"] + }, + "author": { + "type": ["null", "string"] + }, + "category": { + "type": ["null", "string"] + }, + "comments": { + "type": ["null", "string"] + }, + "enclosure": { + "type": ["null", "string"] + }, + "guid": { + "type": ["null", "string"] + }, + "published": { + "type": ["string"], + "format": "date-time" + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-rss/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..0029691134c3e --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/invalid_config.json @@ -0,0 +1,3 @@ +{ + "url": "http://somewebsitethatdoesnotexistatall.com/something.rss" +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-rss/integration_tests/sample_config.json new file mode 100644 index 0000000000000..457574262fc32 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/sample_config.json @@ -0,0 +1,3 @@ +{ + "url": "https://www.nasa.gov/rss/dyn/breaking_news.rss" +} diff --git a/airbyte-integrations/connectors/source-rss/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-rss/integration_tests/sample_state.json new file mode 100644 index 0000000000000..e9493cafc5575 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "items": { + "published": "2022-10-24T16:16:00+00:00" + } +} diff --git a/airbyte-integrations/connectors/source-rss/main.py b/airbyte-integrations/connectors/source-rss/main.py new file mode 100644 index 0000000000000..b1519bf535fe7 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_rss import SourceRss + +if __name__ == "__main__": + source = SourceRss() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-rss/requirements.txt b/airbyte-integrations/connectors/source-rss/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-rss/setup.py b/airbyte-integrations/connectors/source-rss/setup.py new file mode 100644 index 0000000000000..eee9fb4f9ae74 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/setup.py @@ -0,0 +1,27 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "feedparser~=6.0.10", "pytz~=2022.6"] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", +] + +setup( + name="source_rss", + description="Source implementation for Rss.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-rss/source_rss/__init__.py b/airbyte-integrations/connectors/source-rss/source_rss/__init__.py new file mode 100644 index 0000000000000..aff462d761a3f --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/source_rss/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceRss + +__all__ = ["SourceRss"] diff --git a/airbyte-integrations/connectors/source-rss/source_rss/schemas/items.json b/airbyte-integrations/connectors/source-rss/source_rss/schemas/items.json new file mode 100644 index 0000000000000..5f44e4324a794 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/source_rss/schemas/items.json @@ -0,0 +1,35 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required" : ["published"], + "properties": { + "title": { + "type": ["null", "string"] + }, + "link": { + "type": ["null", "string"] + }, + "description": { + "type": ["null", "string"] + }, + "author": { + "type": ["null", "string"] + }, + "category": { + "type": ["null", "string"] + }, + "comments": { + "type": ["null", "string"] + }, + "enclosure": { + "type": ["null", "string"] + }, + "guid": { + "type": ["null", "string"] + }, + "published": { + "type": ["string"], + "format": "date-time" + } + } +} diff --git a/airbyte-integrations/connectors/source-rss/source_rss/source.py b/airbyte-integrations/connectors/source-rss/source_rss/source.py new file mode 100644 index 0000000000000..c9548d064acfc --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/source_rss/source.py @@ -0,0 +1,155 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC +from calendar import timegm +from datetime import datetime +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple + +import feedparser +import pytz +import requests +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from dateutil.parser import parse + +item_keys = [ + "title", + "link", + "description", + "author", + "category", + "comments", + "enclosure", + "guid", +] + + +def convert_item_to_mapping(item) -> Mapping: + mapping = {} + + for item_key in item_keys: + try: + mapping[item_key] = item[item_key] + except (AttributeError, KeyError): + pass + + try: + # get datetime in UTC + dt = datetime.utcfromtimestamp(timegm(item.published_parsed)) + # make sure that the output string is labeled as UTC + dt_tz = dt.replace(tzinfo=pytz.UTC) + mapping["published"] = dt_tz.isoformat() + except (AttributeError, KeyError): + pass + + return mapping + + +def is_newer(item, initial_state_date) -> bool: + try: + current_record_date = parse(item["published"]) + except Exception: + current_record_date = None + + if initial_state_date is None: + # if we don't have initial state they are all new + return True + elif current_record_date is None: + # if we can't parse the item timestamp, we should return it + return True + else: + return current_record_date > initial_state_date + + +# Basic stream +class RssStream(HttpStream, ABC): + # empty URL base since the stream can have its own full URL + url_base = "" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + # no pagination enabled + return None + + # since we only have one response for the stream, we should only return records newer than the initial state object if incremental + def parse_response(self, response: requests.Response, stream_state: MutableMapping[str, Any], **kwargs) -> Iterable[Mapping]: + feed = feedparser.parse(response.text) + + try: + initial_state_date = parse(stream_state["published"]) + except Exception: + initial_state_date = None + + # go through in reverse order which helps the state comparisons + all_item_mappings = [convert_item_to_mapping(item) for item in feed.entries[::-1]] + + # will only filter if we have a state object, so it's incremental + yield from [item for item in all_item_mappings if is_newer(item, initial_state_date)] + + +# Basic incremental stream +class IncrementalRssStream(RssStream, ABC): + # no reason to checkpoint if it's reading individual files without pagination + state_checkpoint_interval = None + + @property + def cursor_field(self) -> str: + return "published" + + # this will fail if the dates aren't parseable, but that means incremental isn't possible anyway for that feed + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + try: + latest_record_date = parse(latest_record["published"]) + latest_record_state = {"published": latest_record["published"]} + except Exception: + latest_record_date = None + + try: + current_record_date = parse(current_stream_state["published"]) + except Exception: + current_record_date = None + + if latest_record_date and current_record_date: + if latest_record_date > current_record_date: + return latest_record_state + else: + return current_stream_state + if latest_record_date: + return latest_record_state + if current_record_date: + return current_stream_state + else: + return {} + + +class Items(IncrementalRssStream): + def __init__(self, url: str): + super().__init__() + self.url = url + + primary_key = None + + def path( + self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> str: + return self.url + + +# Source +class SourceRss(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + try: + resp = requests.get(config.get("url")) + status = resp.status_code + if status == 200: + return True, None + else: + return False, f"Unable to connect to RSS Feed (received status code: {status})" + except Exception as e: + return False, e + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + return [Items(config.get("url"))] diff --git a/airbyte-integrations/connectors/source-rss/source_rss/spec.yaml b/airbyte-integrations/connectors/source-rss/source_rss/spec.yaml new file mode 100644 index 0000000000000..c1128cf4c0d01 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/source_rss/spec.yaml @@ -0,0 +1,11 @@ +documentationUrl: https://docs.airbyte.com/integrations/sources/rss +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: RSS Spec + type: object + required: + - url + properties: + url: + type: string + description: RSS Feed URL diff --git a/airbyte-integrations/connectors/source-rss/unit_tests/__init__.py b/airbyte-integrations/connectors/source-rss/unit_tests/__init__.py new file mode 100644 index 0000000000000..1100c1c58cf51 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-rss/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-rss/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000000000..c331b00a29faa --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/unit_tests/test_incremental_streams.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_rss.source import IncrementalRssStream + + +@fixture +def patch_incremental_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(IncrementalRssStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalRssStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalRssStream, "__abstractmethods__", set()) + + +def test_cursor_field(patch_incremental_base_class): + stream = IncrementalRssStream() + expected_cursor_field = "published" + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(patch_incremental_base_class): + stream = IncrementalRssStream() + + inputs = { + "current_stream_state": {"published": "2022-10-24T16:16:00+00:00"}, + "latest_record": {"published": "2022-10-30T16:16:00+00:00"}, + } + + expected_state = {"published": "2022-10-30T16:16:00+00:00"} + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(patch_incremental_base_class): + stream = IncrementalRssStream() + # TODO: replace this with your input parameters + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": ["published"], "stream_state": {}} + # TODO: replace this with your expected stream slices list + expected_stream_slice = [None] + assert stream.stream_slices(**inputs) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker): + mocker.patch.object(IncrementalRssStream, "cursor_field", "dummy_field") + stream = IncrementalRssStream() + assert stream.supports_incremental + + +def test_source_defined_cursor(patch_incremental_base_class): + stream = IncrementalRssStream() + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(patch_incremental_base_class): + stream = IncrementalRssStream() + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval diff --git a/airbyte-integrations/connectors/source-rss/unit_tests/test_source.py b/airbyte-integrations/connectors/source-rss/unit_tests/test_source.py new file mode 100644 index 0000000000000..b3934613c96b6 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/unit_tests/test_source.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from source_rss.source import SourceRss + + +def test_streams(mocker): + source = SourceRss() + config_mock = MagicMock() + streams = source.streams(config_mock) + expected_streams_number = 1 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-rss/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-rss/unit_tests/test_streams.py new file mode 100644 index 0000000000000..be77e6c34fc89 --- /dev/null +++ b/airbyte-integrations/connectors/source-rss/unit_tests/test_streams.py @@ -0,0 +1,107 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import os +import time +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_rss.source import RssStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(RssStream, "path", "v0/example_endpoint") + mocker.patch.object(RssStream, "primary_key", "test_primary_key") + mocker.patch.object(RssStream, "__abstractmethods__", set()) + + +def test_request_params(patch_base_class): + stream = RssStream() + # TODO: replace this with your input parameters + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + # TODO: replace this with your expected request parameters + expected_params = {} + assert stream.request_params(**inputs) == expected_params + + +def test_next_page_token(patch_base_class): + stream = RssStream() + inputs = {"response": MagicMock()} + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class): + stream = RssStream() + + class SampleResponse: + text = """"" + + + + + Test Title + http://testlink + Test Description + Fri, 28 Oct 2022 11:16 EDT + + + + """ + + expected_parsed_object = { + "title": "Test Title", + "link": "http://testlink", + "description": "Test Description", + "published": "2022-10-28T15:16:00+00:00", + } + + assert next(stream.parse_response(response=SampleResponse(), stream_state={})) == expected_parsed_object + + # test that the local timezone doesn't impact how this is computed + os.environ['TZ'] = 'Africa/Accra' + time.tzset() + assert next(stream.parse_response(response=SampleResponse(), stream_state={})) == expected_parsed_object + os.environ['TZ'] = 'Asia/Tokyo' + time.tzset() + assert next(stream.parse_response(response=SampleResponse(), stream_state={})) == expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = RssStream() + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = RssStream() + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = RssStream() + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = RssStream() + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time diff --git a/docs/integrations/sources/rss.md b/docs/integrations/sources/rss.md new file mode 100644 index 0000000000000..320768aea7725 --- /dev/null +++ b/docs/integrations/sources/rss.md @@ -0,0 +1,37 @@ +# RSS + +## Overview + +The RSS source allows you to read data from any individual RSS feed. + +#### Output schema + +This source is capable of syncing the following streams: +* `items` + * Provides stats about specific RSS items. + * Most fields are simply kept from RSS items as strings if present (`title`, `link`, `description`, `author`, `category`, `comments`, `enclosure`, `guid`). + * The date field is handled differently. It's transformed into a UTC datetime in a `published` field for easier use in data warehouses and other destinations. + * The RSS feed you're subscribing to must have a valid `pubDate` field for each item for incremental syncs to work properly. + * Since `guid` is not a required field, there is no primary key for the feed, only a cursor on the published date. + +#### Features + +| Feature | Supported? | +| :--- | :--- | +| Full Refresh Sync | Yes | +| Incremental - Append Sync | Yes | +| Namespaces | No | + +### Requirements / Setup Guide + +Only the `url` of an RSS feed is required. + +## Performance considerations + +None + +## Changelog + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2022-10-12 | [18838](https://github.com/airbytehq/airbyte/pull/18838) | Initial release supporting RSS |