Skip to content

Commit

Permalink
Add AB Protocol PG source & various python fixes (#610)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada committed Oct 19, 2020
1 parent 238867d commit d4b6ebc
Show file tree
Hide file tree
Showing 30 changed files with 610 additions and 58 deletions.
Expand Up @@ -87,6 +87,7 @@ public static Schema toSchema(AirbyteCatalog catalog) {
}

// todo (cgardens) - add more robust handling for jsonschema types.

/**
* JsonSchema tends to have 2 types for fields one of which is null. The null is pretty irrelevant,
* so look at types and find the first non-null one and use that.
Expand All @@ -96,11 +97,11 @@ public static Schema toSchema(AirbyteCatalog catalog) {
*/
private static DataType jsonSchemaTypesToDataType(JsonNode node) {
if (node.isTextual()) {
return DataType.valueOf(node.asText().toUpperCase());
return DataType.valueOf(convertToNumberIfInteger(node.asText().toUpperCase()));
} else if (node.isArray()) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false)
.filter(typeString -> !typeString.asText().toUpperCase().equals("NULL"))
.map(typeString -> DataType.valueOf(typeString.asText().toUpperCase()))
.map(typeString -> DataType.valueOf(convertToNumberIfInteger(typeString.asText().toUpperCase())))
.findFirst()
// todo (cgardens) - or throw?
.orElse(DataType.STRING);
Expand All @@ -109,4 +110,13 @@ private static DataType jsonSchemaTypesToDataType(JsonNode node) {
}
}

// TODO HACK: convert Integer to Number until we have a more solid typing system
/**
 * Maps the type name "INTEGER" (compared case-insensitively) to "NUMBER" and returns every other
 * type name unchanged.
 *
 * @param type type name to inspect; must not be null
 * @return "NUMBER" when {@code type} spells "integer" in any letter case, otherwise {@code type}
 */
private static String convertToNumberIfInteger(String type) {
  // equalsIgnoreCase does not consult the JVM default locale, unlike toUpperCase(), which turns
  // "integer" into "İNTEGER" under a Turkish locale and would make the comparison miss.
  return type.equalsIgnoreCase("INTEGER") ? "NUMBER" : type;
}

}
2 changes: 2 additions & 0 deletions airbyte-integrations/base-python/airbyte_protocol/__init__.py
Expand Up @@ -2,11 +2,13 @@
from .logger import AirbyteLogger
from .models import AirbyteCatalog
from .models import AirbyteConnectionStatus
from .models import Status
from .models import AirbyteLogMessage
from .models import AirbyteMessage
from .models import AirbyteRecordMessage
from .models import AirbyteStateMessage
from .models import AirbyteStream
from .models import Type
from .models import ConnectorSpecification

# Must be the last one because the way we load the connector module creates a circular
Expand Down
15 changes: 7 additions & 8 deletions airbyte-integrations/base-python/airbyte_protocol/entrypoint.py
Expand Up @@ -3,10 +3,11 @@
import os.path
import sys
import tempfile
import json

from .integration import ConfigContainer, Source
from .logger import AirbyteLogger
from airbyte_protocol import AirbyteMessage, AirbyteConnectionStatus
from airbyte_protocol import AirbyteMessage, AirbyteConnectionStatus, Type, Status

impl_module = os.environ.get('AIRBYTE_IMPL_MODULE', Source.__module__)
impl_class = os.environ.get('AIRBYTE_IMPL_PATH', Source.__name__)
Expand Down Expand Up @@ -85,19 +86,17 @@ def start(self, args):
if cmd == "check":
check_result = self.source.check(logger, config_container)

if check_result.successful:
if check_result.status == Status.SUCCEEDED:
logger.info("Check succeeded")
status = AirbyteConnectionStatus(status='SUCCEEDED')
else:
logger.error("Check failed")
status = AirbyteConnectionStatus(status='FAILED')
message = AirbyteMessage(type='CONNECTION_STATUS', connectionStatus=status)
print(message.json(exclude_unset=True))

output_message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result).json(exclude_unset=True)
print(output_message)
sys.exit(0)
elif cmd == "discover":
catalog = self.source.discover(logger, config_container)
message = AirbyteMessage(type='CATALOG', catalog=catalog)
print(message.json(exclude_unset=True))
print(AirbyteMessage(type=Type.CATALOG, catalog=catalog).json(exclude_unset=True))
sys.exit(0)
elif cmd == "read":
generator = self.source.read(logger, config_container, parsed_args.catalog, parsed_args.state)
Expand Down
Expand Up @@ -19,7 +19,7 @@ function main() {
--input "/airbyte/$YAML_DIR/$filename_wo_ext.yaml" \
--output "/airbyte/$OUTPUT_DIR/$filename_wo_ext.py" \
--disable-timestamp
done
done
}

main "$@"
12 changes: 10 additions & 2 deletions airbyte-integrations/base-singer/base_singer/singer_helpers.py
Expand Up @@ -8,7 +8,7 @@
from airbyte_protocol import AirbyteStream
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
from typing import Generator, List, DefaultDict

import json

Expand All @@ -34,6 +34,12 @@ class Catalogs:


class SingerHelper:
@staticmethod
def _transform_types(stream_properties: DefaultDict):
    """Replace each property's 'type' entry, in place, with its parsed form.

    Every value of ``stream_properties`` is expected to be a mapping with a
    'type' key; that entry is overwritten with the result of
    ``SingerHelper._parse_type``. Nothing is returned.
    """
    for field_object in stream_properties.values():
        field_object['type'] = SingerHelper._parse_type(field_object['type'])

@staticmethod
def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs:
completed_process = subprocess.run(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
Expand Down Expand Up @@ -72,7 +78,7 @@ def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x
if out_json is not None and is_message(out_json):
transformed_json = transform(out_json)
if transformed_json is not None:
if transformed_json.get('type') == "SCHEMA":
if transformed_json.get('type') == "SCHEMA" or transformed_json.get('type') == "ACTIVATE_VERSION":
pass
elif transformed_json.get('type') == "STATE":
out_record = AirbyteStateMessage(data=transformed_json["value"])
Expand Down Expand Up @@ -104,13 +110,15 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing
for singer_stream in discovered_singer_catalog.get("streams"):
if singer_stream.get("stream") in stream_to_airbyte_schema:
new_metadatas = []

if singer_stream.get("metadata"):
metadatas = singer_stream.get("metadata")
for metadata in metadatas:
new_metadata = metadata
new_metadata["metadata"]["selected"] = True
if not is_field_metadata(new_metadata):
new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE"
new_metadata["metadata"]["replication-method"] = "FULL_TABLE"
new_metadatas += [new_metadata]
singer_stream["metadata"] = new_metadatas

Expand Down
Expand Up @@ -122,9 +122,11 @@ public abstract class TestDestination {

@BeforeEach
void setUpInternal() throws Exception {
final Path workspaceRoot = Files.createTempDirectory("test");
Path testDir = Path.of("/tmp/airbyte_tests/");
Files.createDirectories(testDir);
final Path workspaceRoot = Files.createTempDirectory(testDir, "test");
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"));
localRoot = Files.createTempDirectory("output");
localRoot = Files.createTempDirectory(testDir, "output");
testEnv = new TestDestinationEnv(localRoot);

setup(testEnv);
Expand Down
@@ -1,17 +1,20 @@
import urllib.request

from airbyte_protocol import AirbyteCheckResponse
from airbyte_protocol import AirbyteConnectionStatus, Status
from base_singer import SingerSource


class SourceExchangeRatesApiSinger(SingerSource):
def __init__(self):
pass

def check(self, logger, config_path) -> AirbyteCheckResponse:
code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode()
logger.info(f"Ping response code: {code}")
return AirbyteCheckResponse(code == 200, {})
def check(self, logger, config_path) -> AirbyteConnectionStatus:
try:
code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode()
logger.info(f"Ping response code: {code}")
return AirbyteConnectionStatus(status=Status.SUCCEEDED if (code==200) else Status.FAILED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"{str(e)}")

def discover_cmd(self, logger, config_path) -> str:
return "tap-exchangeratesapi | grep '\"type\": \"SCHEMA\"' | head -1 | jq -c '{\"streams\":[{\"stream\": .stream, \"schema\": .schema}]}'"
Expand Down
@@ -0,0 +1 @@
build
1 change: 1 addition & 0 deletions airbyte-integrations/singer/postgres_abprotocol/.gitignore
@@ -0,0 +1 @@
secrets
20 changes: 20 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/Dockerfile
@@ -0,0 +1,20 @@
# Image for the Postgres Singer source, built on top of the shared Singer base image.
FROM airbyte/integration-base-singer:dev

# Install system packages, then drop the apt package lists to keep the layer small.
# NOTE(review): libpq-dev and gcc are presumably needed to build psycopg2 from source — confirm.
RUN apt-get update && apt-get install -y \
jq libpq-dev gcc\
&& rm -rf /var/lib/apt/lists/*

# CODE_PATH is the package directory copied into the image below.
# NOTE(review): AIRBYTE_IMPL_MODULE / AIRBYTE_IMPL_PATH look like the module and class the
# airbyte_protocol entrypoint loads as the connector implementation — confirm against entrypoint.py.
ENV CODE_PATH="postgres_singer_source"
ENV AIRBYTE_IMPL_MODULE="postgres_singer_source"
ENV AIRBYTE_IMPL_PATH="PostgresSingerSource"

# Image metadata labels.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/postgres-singer-source-abprotocol

WORKDIR /airbyte/integration_code

# Copy the connector package and its setup.py, then install it with pip.
COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .

# Leave the working directory at /airbyte for the running container.
WORKDIR /airbyte
21 changes: 21 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/build.gradle
@@ -0,0 +1,21 @@
// Gradle build for the Postgres Singer source integration.
plugins {
id 'java'
}

// Pull in the shared integration build scripts from the root project.
apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle')
dependencies {
// Third-party libraries available to the integration tests only.
integrationTestImplementation 'org.apache.commons:commons-dbcp2:2.7.0'
integrationTestImplementation 'com.fasterxml.jackson.core:jackson-databind'
integrationTestImplementation 'org.apache.commons:commons-text:1.9'
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
integrationTestImplementation "org.postgresql:postgresql:42.2.16"

// Sibling Airbyte projects available to the integration tests only.
integrationTestImplementation project(':airbyte-config:models')
integrationTestImplementation project(':airbyte-workers')
integrationTestImplementation project(':airbyte-protocol:models')
integrationTestImplementation project(':airbyte-test-utils')
}

// Integration tests need this image built first, and building this image needs the
// base-singer image built first.
integrationTest.dependsOn(buildImage)
buildImage.dependsOn ":airbyte-integrations:base-singer:buildImage"
8 changes: 8 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/main_dev.py
@@ -0,0 +1,8 @@
import sys
from airbyte_protocol.entrypoint import launch

from postgres_singer_source import PostgresSingerSource

# Development entry point: hand a fresh PostgresSingerSource and the command-line
# arguments (without the script name) to the airbyte_protocol launcher.
if __name__ == "__main__":
    launch(PostgresSingerSource(), sys.argv[1:])
@@ -0,0 +1 @@
from .source import *
@@ -0,0 +1,37 @@
import urllib.request
import psycopg2

from typing import Generator

from airbyte_protocol import AirbyteCatalog
from airbyte_protocol import AirbyteConnectionStatus
from airbyte_protocol import Status
from airbyte_protocol import AirbyteMessage
from airbyte_protocol import Source
from airbyte_protocol import ConfigContainer
from base_singer import SingerHelper, SingerSource


TAP_CMD = "PGCLIENTENCODING=UTF8 tap-postgres"


class PostgresSingerSource(SingerSource):
    """Singer-based Postgres source that builds ``tap-postgres`` shell commands."""

    def __init__(self):
        pass

    def check(self, logger, config_container: ConfigContainer) -> AirbyteConnectionStatus:
        """Try to open a Postgres connection with the rendered config.

        Returns a SUCCEEDED status when the connection can be opened, and a
        FAILED status carrying the error text otherwise. Any exception is
        logged and reported through the status rather than raised.
        """
        config = config_container.rendered_config
        try:
            # Pass the parameters as keyword arguments so psycopg2 escapes them;
            # a hand-built DSN string breaks on values containing quotes or
            # backslashes. The password is looked up with .get() so that a
            # config without one does not fail with a KeyError.
            connection = psycopg2.connect(
                dbname=config["dbname"],
                user=config["user"],
                host=config["host"],
                port=config["port"],
                password=config.get("password"),
            )
            # The connection is only a probe; close it so it is not leaked.
            connection.close()
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            logger.error(f"Exception while connecting to postgres database: {e}")
            return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

    def discover_cmd(self, logger, config_path) -> str:
        """Return the shell command that runs the tap in discovery mode."""
        return f"{TAP_CMD} --config {config_path} --discover"

    def read_cmd(self, logger, config_path, catalog_path, state_path=None) -> str:
        """Return the shell command that runs the tap to read data.

        The ``--state`` option is included only when ``state_path`` is truthy.
        """
        catalog_option = f"--properties {catalog_path}"
        config_option = f"--config {config_path}"
        state_option = f"--state {state_path}" if state_path else ""
        return f"{TAP_CMD} {catalog_option} {config_option} {state_option}"
@@ -0,0 +1,34 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgres Source Spec",
"type": "object",
"required": ["host", "port", "user", "dbname"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Hostname of the database.",
"type": "string"
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65535
},
"user": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string"
},
"dbname": {
"description": "Name of the database.",
"type": "string"
}
}
}
}
@@ -0,0 +1,3 @@
-e ../../base-python
-e ../../singer/base-singer
-e .
20 changes: 20 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/setup.py
@@ -0,0 +1,20 @@
from setuptools import setup, find_packages

# Non-Python files shipped with every package: any bundled *.json file.
_PACKAGE_DATA = {'': ['*.json']}

# Runtime dependencies of the connector.
_INSTALL_REQUIRES = [
    'psycopg2==2.7.4',
    'tap-postgres==0.1.0',
    'base_singer',
    'airbyte_protocol',
]

# Package definition for the Postgres Singer source.
setup(
    name='postgres-singer-source',
    description='Postgres Singer source',
    author='Airbyte',
    author_email='contact@airbyte.io',
    packages=find_packages(),
    package_data=_PACKAGE_DATA,
    install_requires=_INSTALL_REQUIRES,
)

0 comments on commit d4b6ebc

Please sign in to comment.