From d4b6ebcabf83b526c8af61ca1a73c52ba7803efc Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Mon, 19 Oct 2020 14:13:36 -0700 Subject: [PATCH] Add AB Protocol PG source & various python fixes (#610) --- .../config/AirbyteProtocolConverters.java | 14 +- .../base-python/airbyte_protocol/__init__.py | 2 + .../airbyte_protocol/entrypoint.py | 15 +- .../bin/generate-protocol-files.sh | 2 +- .../base-singer/base_singer/singer_helpers.py | 12 +- .../integrations/base/TestDestination.java | 6 +- .../source_exchangeratesapi_singer/source.py | 13 +- .../singer/postgres_abprotocol/.dockerignore | 1 + .../singer/postgres_abprotocol/.gitignore | 1 + .../singer/postgres_abprotocol/Dockerfile | 20 ++ .../singer/postgres_abprotocol/build.gradle | 21 ++ .../singer/postgres_abprotocol/main_dev.py | 8 + .../postgres_singer_source/__init__.py | 1 + .../postgres_singer_source/source.py | 37 +++ .../postgres_singer_source/spec.json | 34 +++ .../postgres_abprotocol/requirements.txt | 3 + .../singer/postgres_abprotocol/setup.py | 20 ++ .../sources/SingerPostgresSourceTest.java | 252 ++++++++++++++++++ .../resources/expected_messages.txt | 5 + .../resources/expected_utf8_messages.txt | 2 + .../test-integration/resources/init_ascii.sql | 33 +++ .../test-integration/resources/init_utf8.sql | 22 ++ .../test-integration/resources/schema.json | 19 ++ .../resources/selected_catalog.json | 18 ++ .../src/test-integration/resources/spec.json | 37 +++ .../source/source_stripe_singer/source.py | 17 +- .../sources/SingerStripeSourceTest.java | 9 +- .../template/singer-source/README.md | 4 +- .../workers/DefaultCheckConnectionWorker.java | 9 +- .../singer/SingerCatalogConverters.java | 31 +-- 30 files changed, 610 insertions(+), 58 deletions(-) create mode 100644 airbyte-integrations/singer/postgres_abprotocol/.dockerignore create mode 100644 airbyte-integrations/singer/postgres_abprotocol/.gitignore create mode 100644 airbyte-integrations/singer/postgres_abprotocol/Dockerfile create mode 100644 airbyte-integrations/singer/postgres_abprotocol/build.gradle create mode 100644 airbyte-integrations/singer/postgres_abprotocol/main_dev.py create mode 100644 airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/__init__.py create mode 100644 airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/source.py create mode 100644 airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/spec.json create mode 100644 airbyte-integrations/singer/postgres_abprotocol/requirements.txt create mode 100644 airbyte-integrations/singer/postgres_abprotocol/setup.py create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SingerPostgresSourceTest.java create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_messages.txt create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_utf8_messages.txt create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_ascii.sql create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_utf8.sql create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/schema.json create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/selected_catalog.json create mode 100644 airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/spec.json diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java index 9bc5cc058f1db..06ea7c8ff7551 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java @@ -87,6 +87,7 @@ public static Schema toSchema(AirbyteCatalog catalog) { } // todo (cgardens) - add more robust handling for jsonschema types. + /** * JsonSchema tends to have 2 types for fields one of which is null. The null is pretty irrelevant, * so look at types and find the first non-null one and use that. @@ -96,11 +97,11 @@ public static Schema toSchema(AirbyteCatalog catalog) { */ private static DataType jsonSchemaTypesToDataType(JsonNode node) { if (node.isTextual()) { - return DataType.valueOf(node.asText().toUpperCase()); + return DataType.valueOf(convertToNumberIfInteger(node.asText().toUpperCase())); } else if (node.isArray()) { return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false) .filter(typeString -> !typeString.asText().toUpperCase().equals("NULL")) - .map(typeString -> DataType.valueOf(typeString.asText().toUpperCase())) + .map(typeString -> DataType.valueOf(convertToNumberIfInteger(typeString.asText().toUpperCase()))) .findFirst() // todo (cgardens) - or throw? .orElse(DataType.STRING); @@ -109,4 +110,13 @@ private static DataType jsonSchemaTypesToDataType(JsonNode node) { } } + // TODO HACK: convert Integer to Number until we have a more solid typing system + private static String convertToNumberIfInteger(String type) { + if (type.toUpperCase().equals("INTEGER")) { + return "NUMBER"; + } else { + return type; + } + } + } diff --git a/airbyte-integrations/base-python/airbyte_protocol/__init__.py b/airbyte-integrations/base-python/airbyte_protocol/__init__.py index 7acdead595533..59d7c1a11209d 100644 --- a/airbyte-integrations/base-python/airbyte_protocol/__init__.py +++ b/airbyte-integrations/base-python/airbyte_protocol/__init__.py @@ -2,11 +2,13 @@ from .logger import AirbyteLogger from .models import AirbyteCatalog from .models import AirbyteConnectionStatus +from .models import Status from .models import AirbyteLogMessage from .models import AirbyteMessage from .models import AirbyteRecordMessage from .models import AirbyteStateMessage from .models import AirbyteStream +from .models import Type from .models import ConnectorSpecification # Must be the last one because the way we load the connector module creates a circular diff --git a/airbyte-integrations/base-python/airbyte_protocol/entrypoint.py b/airbyte-integrations/base-python/airbyte_protocol/entrypoint.py index 1d02a4b053926..99f37a011f053 100644 --- a/airbyte-integrations/base-python/airbyte_protocol/entrypoint.py +++ b/airbyte-integrations/base-python/airbyte_protocol/entrypoint.py @@ -3,10 +3,11 @@ import os.path import sys import tempfile +import json from .integration import ConfigContainer, Source from .logger import AirbyteLogger -from airbyte_protocol import AirbyteMessage, AirbyteConnectionStatus +from airbyte_protocol import AirbyteMessage, AirbyteConnectionStatus, Type, Status impl_module = os.environ.get('AIRBYTE_IMPL_MODULE', Source.__module__) impl_class = os.environ.get('AIRBYTE_IMPL_PATH', Source.__name__) @@ -85,19 +86,17 @@ def start(self, args): if cmd == "check": check_result = self.source.check(logger, config_container) - if check_result.successful: + if check_result.status == Status.SUCCEEDED: logger.info("Check succeeded") - status = AirbyteConnectionStatus(status='SUCCEEDED') else: logger.error("Check failed") - status = AirbyteConnectionStatus(status='FAILED') - message = AirbyteMessage(type='CONNECTION_STATUS', connectionStatus=status) - print(message.json(exclude_unset=True)) + + output_message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result).json(exclude_unset=True) + print(output_message) sys.exit(0) elif cmd == "discover": catalog = self.source.discover(logger, config_container) - message = AirbyteMessage(type='CATALOG', catalog=catalog) - print(message.json(exclude_unset=True)) + print(AirbyteMessage(type=Type.CATALOG, catalog=catalog).json(exclude_unset=True)) sys.exit(0) elif cmd == "read": generator = self.source.read(logger, config_container, parsed_args.catalog, parsed_args.state) diff --git a/airbyte-integrations/base-python/bin/generate-protocol-files.sh b/airbyte-integrations/base-python/bin/generate-protocol-files.sh index 72c822e992ab5..eefbb6e85398e 100755 --- a/airbyte-integrations/base-python/bin/generate-protocol-files.sh +++ b/airbyte-integrations/base-python/bin/generate-protocol-files.sh @@ -19,7 +19,7 @@ function main() { --input "/airbyte/$YAML_DIR/$filename_wo_ext.yaml" \ --output "/airbyte/$OUTPUT_DIR/$filename_wo_ext.py" \ --disable-timestamp -done + done } main "$@" diff --git a/airbyte-integrations/base-singer/base_singer/singer_helpers.py b/airbyte-integrations/base-singer/base_singer/singer_helpers.py index bf00cc91c92ff..68f35a21fadbc 100644 --- a/airbyte-integrations/base-singer/base_singer/singer_helpers.py +++ b/airbyte-integrations/base-singer/base_singer/singer_helpers.py @@ -8,7 +8,7 @@ from airbyte_protocol import AirbyteStream from dataclasses import dataclass from datetime import datetime -from typing import Generator +from typing import Generator, List, DefaultDict import json @@ -34,6 +34,12 @@ class Catalogs: class SingerHelper: + @staticmethod + def _transform_types(stream_properties: DefaultDict): + for field_name in stream_properties: + field_object = stream_properties[field_name] + field_object['type'] = SingerHelper._parse_type(field_object['type']) + @staticmethod def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs: completed_process = subprocess.run(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -72,7 +78,7 @@ def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x if out_json is not None and is_message(out_json): transformed_json = transform(out_json) if transformed_json is not None: - if transformed_json.get('type') == "SCHEMA": + if transformed_json.get('type') == "SCHEMA" or transformed_json.get('type') == "ACTIVATE_VERSION": pass elif transformed_json.get('type') == "STATE": out_record = AirbyteStateMessage(data=transformed_json["value"]) @@ -104,6 +110,7 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing for singer_stream in discovered_singer_catalog.get("streams"): if singer_stream.get("stream") in stream_to_airbyte_schema: new_metadatas = [] + if singer_stream.get("metadata"): metadatas = singer_stream.get("metadata") for metadata in metadatas: @@ -111,6 +118,7 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing new_metadata["metadata"]["selected"] = True if not is_field_metadata(new_metadata): new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE" + new_metadata["metadata"]["replication-method"] = "FULL_TABLE" new_metadatas += [new_metadata] singer_stream["metadata"] = new_metadatas diff --git a/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java b/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java index 2f97be662f2af..bd3263220e81c 100644 --- a/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java +++ b/airbyte-integrations/integration-test-lib/src/main/java/io/airbyte/integrations/base/TestDestination.java @@ -122,9 +122,11 @@ public abstract class TestDestination { @BeforeEach void setUpInternal() throws Exception { - final Path workspaceRoot = Files.createTempDirectory("test"); + Path testDir = Path.of("/tmp/airbyte_tests/"); + Files.createDirectories(testDir); + final Path workspaceRoot = Files.createTempDirectory(testDir, "test"); jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")); - localRoot = Files.createTempDirectory("output"); + localRoot = Files.createTempDirectory(testDir, "output"); testEnv = new TestDestinationEnv(localRoot); setup(testEnv); diff --git a/airbyte-integrations/singer/exchangeratesapi/source/source_exchangeratesapi_singer/source.py b/airbyte-integrations/singer/exchangeratesapi/source/source_exchangeratesapi_singer/source.py index 333e22a81333f..e762aea40d600 100644 --- a/airbyte-integrations/singer/exchangeratesapi/source/source_exchangeratesapi_singer/source.py +++ b/airbyte-integrations/singer/exchangeratesapi/source/source_exchangeratesapi_singer/source.py @@ -1,6 +1,6 @@ import urllib.request -from airbyte_protocol import AirbyteCheckResponse +from airbyte_protocol import AirbyteConnectionStatus, Status from base_singer import SingerSource @@ -8,10 +8,13 @@ class SourceExchangeRatesApiSinger(SingerSource): def __init__(self): pass - def check(self, logger, config_path) -> AirbyteCheckResponse: - code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode() - logger.info(f"Ping response code: {code}") - return AirbyteCheckResponse(code == 200, {}) + def check(self, logger, config_path) -> AirbyteConnectionStatus: + try: + code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode() + logger.info(f"Ping response code: {code}") + return AirbyteConnectionStatus(status=Status.SUCCEEDED if (code==200) else Status.FAILED) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"{str(e)}") def discover_cmd(self, logger, config_path) -> str: return "tap-exchangeratesapi | grep '\"type\": \"SCHEMA\"' | head -1 | jq -c '{\"streams\":[{\"stream\": .stream, \"schema\": .schema}]}'" diff --git a/airbyte-integrations/singer/postgres_abprotocol/.dockerignore b/airbyte-integrations/singer/postgres_abprotocol/.dockerignore new file mode 100644 index 0000000000000..378eac25d3117 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/.dockerignore @@ -0,0 +1 @@ +build diff --git a/airbyte-integrations/singer/postgres_abprotocol/.gitignore b/airbyte-integrations/singer/postgres_abprotocol/.gitignore new file mode 100644 index 0000000000000..db2fc0de62d01 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/.gitignore @@ -0,0 +1 @@ +secrets diff --git a/airbyte-integrations/singer/postgres_abprotocol/Dockerfile b/airbyte-integrations/singer/postgres_abprotocol/Dockerfile new file mode 100644 index 0000000000000..44178344832d0 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/Dockerfile @@ -0,0 +1,20 @@ +FROM airbyte/integration-base-singer:dev + +RUN apt-get update && apt-get install -y \ + jq libpq-dev gcc\ + && rm -rf /var/lib/apt/lists/* + +ENV CODE_PATH="postgres_singer_source" +ENV AIRBYTE_IMPL_MODULE="postgres_singer_source" +ENV AIRBYTE_IMPL_PATH="PostgresSingerSource" + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/postgres-singer-source-abprotocol + +WORKDIR /airbyte/integration_code + +COPY $CODE_PATH ./$CODE_PATH +COPY setup.py ./ +RUN pip install . + +WORKDIR /airbyte diff --git a/airbyte-integrations/singer/postgres_abprotocol/build.gradle b/airbyte-integrations/singer/postgres_abprotocol/build.gradle new file mode 100644 index 0000000000000..49ec5fe226344 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'java' +} + +apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle') +apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle') +dependencies { + integrationTestImplementation 'org.apache.commons:commons-dbcp2:2.7.0' + integrationTestImplementation 'com.fasterxml.jackson.core:jackson-databind' + integrationTestImplementation 'org.apache.commons:commons-text:1.9' + integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2" + integrationTestImplementation "org.postgresql:postgresql:42.2.16" + + integrationTestImplementation project(':airbyte-config:models') + integrationTestImplementation project(':airbyte-workers') + integrationTestImplementation project(':airbyte-protocol:models') + integrationTestImplementation project(':airbyte-test-utils') +} + +integrationTest.dependsOn(buildImage) +buildImage.dependsOn ":airbyte-integrations:base-singer:buildImage" diff --git a/airbyte-integrations/singer/postgres_abprotocol/main_dev.py b/airbyte-integrations/singer/postgres_abprotocol/main_dev.py new file mode 100644 index 0000000000000..476e0ac92c5c0 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/main_dev.py @@ -0,0 +1,8 @@ +import sys +from airbyte_protocol.entrypoint import launch + +from postgres_singer_source import PostgresSingerSource + +if __name__ == "__main__": + source = PostgresSingerSource() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/__init__.py b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/__init__.py new file mode 100644 index 0000000000000..3caf4212a67ab --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/__init__.py @@ -0,0 +1 @@ +from .source import * diff --git a/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/source.py b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/source.py new file mode 100644 index 0000000000000..bda0258608fc4 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/source.py @@ -0,0 +1,37 @@ +import urllib.request +import psycopg2 + +from typing import Generator + +from airbyte_protocol import AirbyteCatalog +from airbyte_protocol import AirbyteConnectionStatus +from airbyte_protocol import Status +from airbyte_protocol import AirbyteMessage +from airbyte_protocol import Source +from airbyte_protocol import ConfigContainer +from base_singer import SingerHelper, SingerSource + + +TAP_CMD = "PGCLIENTENCODING=UTF8 tap-postgres" +class PostgresSingerSource(SingerSource): + def __init__(self): + pass + + def check(self, logger, config_container: ConfigContainer) -> AirbyteConnectionStatus: + config = config_container.rendered_config + try: + params="dbname='{dbname}' user='{user}' host='{host}' password='{password}' port='{port}'".format(**config) + psycopg2.connect(params) + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + logger.error(f"Exception while connecting to postgres database: {e}") + return AirbyteConnectionStatus(status=Status.FAILED, message=str(e)) + + def discover_cmd(self, logger, config_path) -> AirbyteCatalog: + return f"{TAP_CMD} --config {config_path} --discover" + + def read_cmd(self, logger, config_path, catalog_path, state_path=None) -> Generator[AirbyteMessage, None, None]: + catalog_option = f"--properties {catalog_path}" + config_option = f"--config {config_path}" + state_option = f"--state {state_path}" if state_path else "" + return f"{TAP_CMD} {catalog_option} {config_option} {state_option}" diff --git a/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/spec.json b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/spec.json new file mode 100644 index 0000000000000..da6a6835294e4 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/postgres_singer_source/spec.json @@ -0,0 +1,34 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgres Source Spec", + "type": "object", + "required": ["host", "port", "user", "dbname"], + "additionalProperties": false, + "properties": { + "host": { + "description": "Hostname of the database.", + "type": "string" + }, + "port": { + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536 + }, + "user": { + "description": "Username to use to access the database.", + "type": "string" + }, + "password": { + "description": "Password associated with the username.", + "type": "string" + }, + "dbname": { + "description": "Name of the database.", + "type": "string" + } + } + } +} diff --git a/airbyte-integrations/singer/postgres_abprotocol/requirements.txt b/airbyte-integrations/singer/postgres_abprotocol/requirements.txt new file mode 100644 index 0000000000000..d1efae608b88f --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/requirements.txt @@ -0,0 +1,3 @@ +-e ../../base-python +-e ../../singer/base-singer +-e . diff --git a/airbyte-integrations/singer/postgres_abprotocol/setup.py b/airbyte-integrations/singer/postgres_abprotocol/setup.py new file mode 100644 index 0000000000000..e764cddc5ba51 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/setup.py @@ -0,0 +1,20 @@ +from setuptools import setup, find_packages + +setup( + name='postgres-singer-source', + description='Postgres Singer source', + author='Airbyte', + author_email='contact@airbyte.io', + + packages=find_packages(), + package_data={ + '': ['*.json'] + }, + + install_requires=[ + 'psycopg2==2.7.4', + 'tap-postgres==0.1.0', + 'base_singer', + 'airbyte_protocol' + ] +) diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SingerPostgresSourceTest.java b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SingerPostgresSourceTest.java new file mode 100644 index 0000000000000..709a35bae8a01 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SingerPostgresSourceTest.java @@ -0,0 +1,252 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.Schema; +import io.airbyte.config.SourceConnectionImplementation; +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.config.StandardDiscoverCatalogOutput; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardTapConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.test.utils.PostgreSQLContainerHelper; +import io.airbyte.workers.DefaultCheckConnectionWorker; +import io.airbyte.workers.DefaultDiscoverCatalogWorker; +import io.airbyte.workers.JobStatus; +import io.airbyte.workers.OutputAndStatus; +import io.airbyte.workers.WorkerException; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.DockerProcessBuilderFactory; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource; +import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +@SuppressWarnings("rawtypes") +public class SingerPostgresSourceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SingerPostgresSourceTest.class); + private static final String IMAGE_NAME = "airbyte/postgres-singer-source-abprotocol:dev"; + private static final Path TESTS_PATH = Path.of("/tmp/airbyte_integration_tests"); + + private PostgreSQLContainer psqlDb; + private Path jobRoot; + private IntegrationLauncher integrationLauncher; + + @BeforeEach + public void init() throws IOException { + psqlDb = new PostgreSQLContainer(); + psqlDb.start(); + + PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("init_ascii.sql"), psqlDb); + Files.createDirectories(TESTS_PATH); + Path workspaceRoot = Files.createTempDirectory(TESTS_PATH, "airbyte-integration"); + jobRoot = workspaceRoot.resolve("job"); + Files.createDirectories(jobRoot); + + integrationLauncher = new AirbyteIntegrationLauncher( + IMAGE_NAME, + new DockerProcessBuilderFactory(workspaceRoot, workspaceRoot.toString(), "", "host")); + } + + @AfterEach + public void cleanUp() { + psqlDb.stop(); + } + + @Test + public void testFullRefreshStatelessRead() throws Exception { + + Schema schema = Jsons.deserialize(MoreResources.readResource("schema.json"), Schema.class); + + // select all streams and all fields + schema.getStreams().forEach(s -> s.setSelected(true)); + schema.getStreams().forEach(t -> t.getFields().forEach(c -> c.setSelected(true))); + + StandardSync syncConfig = new StandardSync().withSyncMode(StandardSync.SyncMode.FULL_REFRESH).withSchema(schema); + SourceConnectionImplementation sourceImpl = + new SourceConnectionImplementation().withConfiguration(Jsons.jsonNode(getDbConfig(psqlDb))); + + StandardTapConfig tapConfig = new StandardTapConfig() + .withStandardSync(syncConfig) + .withSourceConnectionImplementation(sourceImpl); + + DefaultAirbyteSource source = new DefaultAirbyteSource(integrationLauncher); + source.start(tapConfig, jobRoot); + + List actualMessages = Lists.newArrayList(); + while (!source.isFinished()) { + Optional maybeMessage = source.attemptRead(); + if (maybeMessage.isPresent() && maybeMessage.get().getType() == AirbyteMessage.Type.RECORD) { + actualMessages.add(maybeMessage.get().getRecord()); + } + } + + String lineSeparatedMessages = MoreResources.readResource("expected_messages.txt"); + List expectedMessages = deserializeLineSeparatedJsons(lineSeparatedMessages, AirbyteRecordMessage.class); + + assertMessagesEquivalent(expectedMessages, actualMessages); + } + + private void assertMessagesEquivalent(List expectedMessages, List actualMessages) { + assertEquals(expectedMessages.size(), actualMessages.size()); + + for (int i = 0; i < expectedMessages.size(); i++) { + AirbyteRecordMessage expected = expectedMessages.get(i); + AirbyteRecordMessage actual = actualMessages.get(i); + assertEquals(expected.getStream(), actual.getStream()); + assertEquals(expected.getData(), actual.getData()); + } + } + + @Test + public void testCanReadUtf8() throws IOException, InterruptedException, WorkerException { + // force the db server to start with sql_ascii encoding to verify the tap can read UTF8 even when + // default settings are in another encoding + PostgreSQLContainer db = (PostgreSQLContainer) new PostgreSQLContainer().withCommand("postgres -c client_encoding=sql_ascii"); + db.start(); + PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("init_utf8.sql"), db); + + String configFileName = "config.json"; + String catalogFileName = "selected_catalog.json"; + writeFileToJobRoot(catalogFileName, MoreResources.readResource(catalogFileName)); + writeFileToJobRoot(configFileName, Jsons.serialize(getDbConfig(db))); + + Process tapProcess = integrationLauncher.read(jobRoot, configFileName, catalogFileName).start(); + tapProcess.waitFor(); + + DefaultAirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory(); + List actualMessages = streamFactory.create(IOs.newBufferedReader(tapProcess.getInputStream())) + .filter(message -> message.getType() == AirbyteMessage.Type.RECORD) + .map(AirbyteMessage::getRecord) + .collect(Collectors.toList()); + + String lineSeparatedMessages = MoreResources.readResource("expected_utf8_messages.txt"); + List expectedMessages = deserializeLineSeparatedJsons(lineSeparatedMessages, AirbyteRecordMessage.class); + + assertMessagesEquivalent(expectedMessages, actualMessages); + + if (tapProcess.exitValue() != 0) { + fail("Docker container exited with non-zero exit code: " + tapProcess.exitValue()); + } + } + + private List deserializeLineSeparatedJsons(String lineSeparatedMessages, Class clazz) { + return new BufferedReader(new StringReader(lineSeparatedMessages)) + .lines() + .map(l -> Jsons.deserialize(l, clazz)) + .collect(Collectors.toList()); + } + + @Test + public void testGetSpec() throws WorkerException, IOException, InterruptedException { + Process process = integrationLauncher.spec(jobRoot).start(); + process.waitFor(); + InputStream expectedSpecInputStream = Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream("spec.json")); + JsonNode expectedSpec = Jsons.deserialize(new String(expectedSpecInputStream.readAllBytes())); + JsonNode actualSpec = Jsons.deserialize(new String(process.getInputStream().readAllBytes())); + assertEquals(expectedSpec, actualSpec); + } + + @Test + public void testDiscover() throws IOException { + StandardDiscoverCatalogInput inputConfig = new StandardDiscoverCatalogInput().withConnectionConfiguration(Jsons.jsonNode(getDbConfig(psqlDb))); + OutputAndStatus run = new DefaultDiscoverCatalogWorker(integrationLauncher).run(inputConfig, jobRoot); + + Schema expected = Jsons.deserialize(MoreResources.readResource("schema.json"), Schema.class); + assertEquals(JobStatus.SUCCEEDED, run.getStatus()); + assertTrue(run.getOutput().isPresent()); + assertEquals(expected, run.getOutput().get().getSchema()); + } + + @Test + public void testSuccessfulConnectionCheck() { + StandardCheckConnectionInput inputConfig = new StandardCheckConnectionInput().withConnectionConfiguration(Jsons.jsonNode(getDbConfig(psqlDb))); + OutputAndStatus run = + new DefaultCheckConnectionWorker(integrationLauncher).run(inputConfig, jobRoot); + + assertEquals(JobStatus.SUCCEEDED, run.getStatus()); + assertTrue(run.getOutput().isPresent()); + assertEquals(StandardCheckConnectionOutput.Status.SUCCEEDED, run.getOutput().get().getStatus()); + } + + @Test + public void testInvalidCredsFailedConnectionCheck() { + Map dbConfig = getDbConfig(psqlDb); + dbConfig.put("password", "notarealpassword"); + StandardCheckConnectionInput inputConfig = new StandardCheckConnectionInput().withConnectionConfiguration(Jsons.jsonNode(dbConfig)); + OutputAndStatus run = + new DefaultCheckConnectionWorker(integrationLauncher).run(inputConfig, jobRoot); + + assertEquals(JobStatus.SUCCEEDED, run.getStatus()); + assertTrue(run.getOutput().isPresent()); + assertEquals(StandardCheckConnectionOutput.Status.FAILED, run.getOutput().get().getStatus()); + } + + private Map getDbConfig(PostgreSQLContainer containerDb) { + Map confMap = new HashMap<>(); + confMap.put("dbname", containerDb.getDatabaseName()); + confMap.put("user", containerDb.getUsername()); + confMap.put("password", containerDb.getPassword()); + confMap.put("port", containerDb.getFirstMappedPort()); + confMap.put("host", containerDb.getHost()); + return confMap; + } + + private void writeFileToJobRoot(String fileName, String contents) throws IOException { + Files.writeString(jobRoot.resolve(fileName), contents); + } + +} diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_messages.txt b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_messages.txt new file mode 100644 index 0000000000000..e71e6e6786b9e --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_messages.txt @@ -0,0 +1,5 @@ +{"stream":"id_and_name", "data":{"id":1, "name":"sherif"}, "emitted_at":1} +{"stream":"id_and_name", "data":{"id":2, "name":"charles"}, "emitted_at":1} +{"stream":"id_and_name", "data":{"id":3, "name":"jared"}, "emitted_at":1} +{"stream":"id_and_name", "data":{"id":4, "name":"michel"}, "emitted_at":1} +{"stream":"id_and_name", "data":{"id":5, "name":"john"}, "emitted_at":1} diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_utf8_messages.txt b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_utf8_messages.txt new file mode 100644 index 0000000000000..bd84165965e1c --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/expected_utf8_messages.txt @@ -0,0 +1,2 @@ +{"stream": "id_and_name", "data": {"id": 1, "name": "\u2013 someutfstring"}, "emitted_at":1} +{"stream": "id_and_name", "data": {"id": 2, "name": "\u2215"}, "emitted_at":1} diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_ascii.sql b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_ascii.sql new file mode 100644 index 0000000000000..7510f67569101 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_ascii.sql @@ -0,0 +1,33 @@ +CREATE + TABLE + id_and_name( + id INTEGER, + name VARCHAR(200) + ); + +INSERT + INTO + id_and_name( + id, + name + ) + VALUES( + 1, + 'sherif' + ), + ( + 2, + 'charles' + ), + ( + 3, + 'jared' + ), + ( + 4, + 'michel' + ), + ( + 5, + 'john' + ); diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_utf8.sql b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_utf8.sql new file mode 100644 index 0000000000000..0d24a4ac42a5b --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/init_utf8.sql @@ -0,0 +1,22 @@ +CREATE + TABLE + id_and_name( + id INTEGER, + name VARCHAR(200) + ); + +-- Add UTF characters to make sure the tap can read UTF + INSERT + INTO + id_and_name( + id, + name + ) + VALUES( + 1, + E'\u2013 someutfstring' + ), + ( + 2, + E'\u2215' + ); diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/schema.json b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/schema.json new file mode 100644 index 0000000000000..bed06a9b70aa9 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/schema.json @@ -0,0 +1,19 @@ +{ + "streams": [ + { + "name": "id_and_name", + "fields": [ + { + "name": "id", + "dataType": "number", + "selected": true + }, + { + "name": "name", + "dataType": "string", + "selected": true + } + ] + } + ] +} diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/selected_catalog.json b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/selected_catalog.json new file mode 100644 index 0000000000000..8915aa3c1cbdf --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/selected_catalog.json @@ -0,0 +1,18 @@ +{ + "streams": [ + { + "name": "id_and_name", + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "number" + }, + "name": { + "type": "string" + } + } + } + } + ] +} diff --git a/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/spec.json b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/spec.json new file mode 100644 index 0000000000000..9caf2f5d5c535 --- /dev/null +++ b/airbyte-integrations/singer/postgres_abprotocol/src/test-integration/resources/spec.json @@ -0,0 +1,37 @@ +{ + "type": "SPEC", + "spec": { + "documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgres Source Spec", + "type": "object", + "required": ["host", "port", "user", "dbname"], + "additionalProperties": false, + "properties": { + "host": { + "description": "Hostname of the database.", + "type": "string" + }, + "port": { + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536 + }, + "user": { + "description": "Username to use to access the database.", + "type": "string" + }, + "password": { + "description": "Password associated with the username.", + "type": "string" + }, + "dbname": { + "description": "Name of the database.", + "type": "string" + } + } + } + } +} diff --git a/airbyte-integrations/singer/stripe_abprotocol/source/source_stripe_singer/source.py b/airbyte-integrations/singer/stripe_abprotocol/source/source_stripe_singer/source.py index 3121b4efed100..1545022186878 100644 --- a/airbyte-integrations/singer/stripe_abprotocol/source/source_stripe_singer/source.py +++ b/airbyte-integrations/singer/stripe_abprotocol/source/source_stripe_singer/source.py @@ -1,5 +1,5 @@ import requests -from airbyte_protocol import AirbyteCheckResponse +from airbyte_protocol import AirbyteConnectionStatus, Status from base_singer import SingerSource @@ -7,11 +7,16 @@ class SourceStripeSinger(SingerSource): def __init__(self): pass - def check(self, logger, config_container) -> AirbyteCheckResponse: - json_config = config_container.rendered_config - r = requests.get('https://api.stripe.com/v1/customers', auth=(json_config['client_secret'], '')) - - return AirbyteCheckResponse(r.status_code == 200, {}) + def check(self, logger, config_container) -> AirbyteConnectionStatus: + try: + json_config = config_container.rendered_config + r = requests.get('https://api.stripe.com/v1/customers', auth=(json_config['client_secret'], '')) + if r.status_code == 200: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + else: + return AirbyteConnectionStatus(status=Status.FAILED, message=r.text) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"{str(e)}") def discover_cmd(self, logger, config_path) -> str: return f"tap-stripe --config {config_path} --discover" diff --git a/airbyte-integrations/singer/stripe_abprotocol/source/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java b/airbyte-integrations/singer/stripe_abprotocol/source/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java index 74f28db1b9c93..bcc33858d9731 100644 --- a/airbyte-integrations/singer/stripe_abprotocol/source/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java +++ b/airbyte-integrations/singer/stripe_abprotocol/source/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java @@ -170,9 +170,10 @@ public void testInvalidCredentialsCheck() throws IOException, InterruptedExcepti new BufferedReader(new InputStreamReader(process.getInputStream())).lines().filter(s -> s.contains("CONNECTION_STATUS")).findFirst(); assertTrue(statusMessageString.isPresent()); - assertEquals( - new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(new AirbyteConnectionStatus().withStatus(Status.FAILED)), - Jsons.deserialize(statusMessageString.get(), AirbyteMessage.class)); + AirbyteMessage response = Jsons.deserialize(statusMessageString.get(), AirbyteMessage.class); + assertEquals(Type.CONNECTION_STATUS, response.getType()); + assertEquals(Status.FAILED, response.getConnectionStatus().getStatus()); + assertTrue(response.getConnectionStatus().getMessage().length() > 0); } @Test @@ -209,7 +210,7 @@ public void testSync() throws IOException, InterruptedException, WorkerException MoreResources.readResource("sync_output_subset.txt").lines() .map(Jsons::deserialize) .map(SingerStripeSourceTest::normalize) - .forEach(record -> assertTrue(actualSyncOutput.contains(record))); + .forEach(record -> assertTrue(actualSyncOutput.contains(record), "Actual output: " + actualSyncOutput)); } private static JsonNode normalize(JsonNode node) { diff --git a/airbyte-integrations/template/singer-source/README.md b/airbyte-integrations/template/singer-source/README.md index 7880a961759b9..74cf96731ef9e 100644 --- a/airbyte-integrations/template/singer-source/README.md +++ b/airbyte-integrations/template/singer-source/README.md @@ -35,8 +35,8 @@ Prepare development environment: ``` cd airbyte-integrations/template/singer-source -# create & activate virtualenv -virtualenv build/venv +# create & activate virtualenv using python 3.7 +python3.7 -m venv build/venv source build/venv/bin/activate # install necessary dependencies diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java index 34bba60170690..720f3988b84e1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java @@ -87,19 +87,16 @@ public OutputAndStatus run(StandardCheckConnectio WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES); int exitCode = process.exitValue(); - if (exitCode == 0) { - if (status.isEmpty()) { - LOGGER.error("integration failed to output a connection status struct."); - return new OutputAndStatus<>(JobStatus.FAILED); - } + if (status.isPresent() && exitCode == 0) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() .withStatus(Enums.convertTo(status.get().getStatus(), Status.class)) .withMessage(status.get().getMessage()); + LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); + LOGGER.debug("Check connection job received output: {}", output); return new OutputAndStatus<>(SUCCEEDED, output); } else { - LOGGER.error("Check connection job subprocess finished with exit code {}", exitCode); return new OutputAndStatus<>(FAILED); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/singer/SingerCatalogConverters.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/singer/SingerCatalogConverters.java index 4d90247968d08..d0a3e5c09803c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/singer/SingerCatalogConverters.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/singer/SingerCatalogConverters.java @@ -255,26 +255,17 @@ private static DataType singerTypesToDataType(List singerTypes) { */ @VisibleForTesting static DataType singerTypeToDataType(SingerType singerType) { - switch (singerType) { - case STRING: - return DataType.STRING; - case INTEGER: - return DataType.NUMBER; - case NUMBER: - return DataType.NUMBER; - case NULL: - // noinspection DuplicateBranchesInSwitch - return DataType.STRING; // todo (cgardens) - hackasaurus rex - case BOOLEAN: - return DataType.BOOLEAN; - case OBJECT: - return DataType.OBJECT; - case ARRAY: - return DataType.ARRAY; - default: - throw new RuntimeException( - String.format("could not map SingerType: %s to DataType", singerType)); - } + return switch (singerType) { + case STRING -> DataType.STRING; + case NULL -> DataType.STRING; + case NUMBER -> DataType.NUMBER; + case INTEGER -> DataType.NUMBER; + case BOOLEAN -> DataType.BOOLEAN; + case OBJECT -> DataType.OBJECT; + case ARRAY -> DataType.ARRAY; + default -> throw new RuntimeException( + String.format("could not map SingerType: %s to DataType", singerType)); + }; } }