Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move postgres source to AB Protocol & various python fixes #610

Merged
merged 34 commits into from Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
928af6c
template
sherifnada Oct 16, 2020
577f16b
save
sherifnada Oct 16, 2020
c8b8cb8
save
sherifnada Oct 17, 2020
a511db7
Merge branch 'master' of github.com:airbytehq/airbyte into sherif/mov…
sherifnada Oct 17, 2020
37e9ced
save
sherifnada Oct 18, 2020
3b6a23a
save
sherifnada Oct 18, 2020
7651298
use correct param name
sherifnada Oct 18, 2020
af37eac
save 'progress'
sherifnada Oct 18, 2020
f451dba
discovery passing
sherifnada Oct 19, 2020
0f6f5f5
all tests passing
sherifnada Oct 19, 2020
a7182ae
spotless
sherifnada Oct 19, 2020
2d3926c
save
sherifnada Oct 19, 2020
316b06a
fix eraio
sherifnada Oct 19, 2020
e044856
remove unnecessary test
sherifnada Oct 19, 2020
18e48bc
save
sherifnada Oct 19, 2020
c1b8c3c
try fixing test
sherifnada Oct 19, 2020
a85d44a
try more days
sherifnada Oct 19, 2020
41dc190
remove eraio step
sherifnada Oct 19, 2020
91771e0
extend singer source
sherifnada Oct 19, 2020
e25c378
cleaner if-else in stripe
sherifnada Oct 19, 2020
844902b
actually extend singer source
sherifnada Oct 19, 2020
22fb146
for real though
sherifnada Oct 19, 2020
0ca290b
fix the other eraio 🙃
sherifnada Oct 19, 2020
4126ed8
add newline
sherifnada Oct 19, 2020
525f4e4
fix stripe
sherifnada Oct 19, 2020
9f2858d
Merge branch 'master' of github.com:airbytehq/airbyte into sherif/mov…
sherifnada Oct 19, 2020
77e8617
place test mount roots in /tmp
sherifnada Oct 19, 2020
cd357cf
Merge branch 'sherif/test-roots' into sherif/move-postgres-to-abprotocol
sherifnada Oct 19, 2020
c390e88
config -> secrets
sherifnada Oct 19, 2020
feebd91
fix conflict
sherifnada Oct 19, 2020
c406f21
confs
sherifnada Oct 19, 2020
3cfa4c7
fix python indentation issue
sherifnada Oct 19, 2020
313cadb
coerce ints into numbers'
sherifnada Oct 19, 2020
e1f8f2c
my code is now spotfree
sherifnada Oct 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/gradle.yml
Expand Up @@ -41,6 +41,9 @@ jobs:
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }}

- name: eraio tests
run: ./gradlew :airbyte-integrations:singer:exchangeratesapi:source:integrationTest --no-daemon -g ${{ env.GRADLE_PATH }}

- name: Build
run: ./gradlew build --no-daemon -g ${{ env.GRADLE_PATH }}

Expand Down
Expand Up @@ -77,7 +77,7 @@ public static Schema toSchema(AirbyteCatalog catalog) {
.withName(airbyteStream.getName())
.withFields(list.stream().map(item -> new Field()
.withName(item.getKey())
.withDataType(DataType.valueOf(item.getValue().get("type").asText().toUpperCase()))
.withDataType(DataType.fromValue(item.getValue().get("type").asText()))
.withSelected(true)).collect(Collectors.toList()));
}).collect(Collectors.toList()));
}
Expand Down
2 changes: 2 additions & 0 deletions airbyte-integrations/base-python/airbyte_protocol/__init__.py
Expand Up @@ -2,11 +2,13 @@
from .logger import AirbyteLogger
from .models import AirbyteCatalog
from .models import AirbyteConnectionStatus
from .models import Status
from .models import AirbyteLogMessage
from .models import AirbyteMessage
from .models import AirbyteRecordMessage
from .models import AirbyteStateMessage
from .models import AirbyteStream
from .models import Type

# Must be the last one because the way we load the connector module creates a circular
# dependency and models might not have been loaded yet
Expand Down
14 changes: 9 additions & 5 deletions airbyte-integrations/base-python/airbyte_protocol/entrypoint.py
Expand Up @@ -3,9 +3,11 @@
import os.path
import sys
import tempfile
import json

from .integration import ConfigContainer, Source
from .logger import AirbyteLogger
from airbyte_protocol import AirbyteMessage, Type, Status

impl_module = os.environ.get('AIRBYTE_IMPL_MODULE', Source.__module__)
impl_class = os.environ.get('AIRBYTE_IMPL_PATH', Source.__name__)
Expand All @@ -14,7 +16,6 @@

logger = AirbyteLogger()


sherifnada marked this conversation as resolved.
Show resolved Hide resolved
class AirbyteEntrypoint(object):
def __init__(self, source):
self.source = source
Expand Down Expand Up @@ -83,15 +84,18 @@ def start(self, args):

if cmd == "check":
check_result = self.source.check(logger, config_container)
if check_result.successful:
output_message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result).json(exclude_unset=True)
print(output_message)

if check_result.status == Status.SUCCEEDED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sherifnada If i understand right, this change is not compatible with what is currently in the template/python-source for the ´ check' function anymore, right?

AirbyteCheckResponse has to be changed to AirbyteConnectionStatus in that file too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChristopheDuong Correct - good catch

logger.info("Check succeeded")
sys.exit(0)
else:
logger.error("Check failed")
sys.exit(1)

sys.exit(0)
elif cmd == "discover":
catalog = self.source.discover(logger, config_container)
print(catalog.json(exclude_unset=True))
print(AirbyteMessage(type=Type.CATALOG, catalog=catalog).json(exclude_unset=True))
sys.exit(0)
elif cmd == "read":
generator = self.source.read(logger, config_container, parsed_args.catalog, parsed_args.state)
Expand Down
Expand Up @@ -64,8 +64,8 @@ class AirbyteConnectionStatus(BaseModel):

class AirbyteStream(BaseModel):
name: str = Field(..., description="Stream's name.")
json_schema: Optional[Dict[str, Any]] = Field(
None, description='Stream schema using Json Schema specs.'
json_schema: Dict[str, Any] = Field(
..., description='Stream schema using Json Schema specs.'
)


Expand Down
Expand Up @@ -19,7 +19,7 @@ function main() {
--input "/airbyte/$YAML_DIR/$filename_wo_ext.yaml" \
--output "/airbyte/$OUTPUT_DIR/$filename_wo_ext.py" \
--disable-timestamp
done
done
}

main "$@"
39 changes: 35 additions & 4 deletions airbyte-integrations/base-singer/base_singer/singer_helpers.py
Expand Up @@ -8,7 +8,7 @@
from airbyte_protocol import AirbyteStream
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
from typing import Generator, List, DefaultDict

import json

Expand All @@ -34,6 +34,36 @@ class Catalogs:


class SingerHelper:
@staticmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having multiple types is valid jsonschema. i don't think we want to normalize it out in the integration if we are saying that you define your stream's schema using jsonschema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. saw your comment here: #609 (review). maybe we're on the same page now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, we are. I was confused. JSONSchema it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case please merge your PR 😆

def _normalize_type(type_field):
if isinstance(type_field, list):
non_null_fields = [type_string.lower() for type_string in type_field if type_string.lower() != 'null']
if len(non_null_fields) != 1:
raise Exception(f"Unexpected type in singer catalog: {type_field} ")
return non_null_fields[0]
else:
return type_field

@staticmethod
def _parse_type(type_field):
normalized_type = SingerHelper._normalize_type(type_field)
switcher = {
"string": "string",
"integer": "number",
"number": "number",
"null": "string",
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
"boolean": "boolean",
"object": "object",
"array": "array"
}
return switcher[normalized_type]

@staticmethod
def _transform_types(stream_properties: DefaultDict):
for field_name in stream_properties:
field_object = stream_properties[field_name]
field_object['type'] = SingerHelper._parse_type(field_object['type'])

@staticmethod
def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs:
completed_process = subprocess.run(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
Expand All @@ -47,7 +77,8 @@ def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalo

for stream in singer_catalog.get("streams"):
name = stream.get("stream")
schema = stream.get("schema").get("properties")
SingerHelper._transform_types(stream.get("schema").get("properties"))
schema = {"type": "object", "properties": stream.get("schema").get("properties")}
airbyte_streams += [AirbyteStream(name=name, json_schema=schema)]

airbyte_catalog = airbyte_transform(AirbyteCatalog(streams=airbyte_streams))
Expand All @@ -72,7 +103,7 @@ def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x
if out_json is not None and is_message(out_json):
transformed_json = transform(out_json)
if transformed_json is not None:
if transformed_json.get('type') == "SCHEMA":
if transformed_json.get('type') == "SCHEMA" or transformed_json.get('type') == "ACTIVATE_VERSION":
pass
elif transformed_json.get('type') == "STATE":
out_record = AirbyteStateMessage(data=transformed_json["value"])
Expand Down Expand Up @@ -109,7 +140,7 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing
new_metadata = metadata
new_metadata["metadata"]["selected"] = True
if not is_field_metadata(new_metadata):
new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE"
new_metadata["metadata"]["replication-method"] = "FULL_TABLE"
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
new_metadatas += [new_metadata]
singer_stream["metadata"] = new_metadatas

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/base-singer/requirements.txt
@@ -1,2 +1,2 @@
-e ../../base-python
-e ../base-python
-e .
@@ -1,17 +1,20 @@
import urllib.request

from airbyte_protocol import AirbyteCheckResponse
from airbyte_protocol import AirbyteConnectionStatus, Status
from base_singer import SingerSource


class SourceExchangeRatesApiSinger(SingerSource):
def __init__(self):
pass

def check(self, logger, config_path) -> AirbyteCheckResponse:
code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode()
logger.info(f"Ping response code: {code}")
return AirbyteCheckResponse(code == 200, {})
def check(self, logger, config_path) -> AirbyteConnectionStatus:
try:
code = urllib.request.urlopen("https://api.exchangeratesapi.io/").getcode()
logger.info(f"Ping response code: {code}")
return AirbyteConnectionStatus(status=Status.SUCCEEDED if (code==200) else Status.FAILED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"{str(e)}")

def discover_cmd(self, logger, config_path) -> str:
return "tap-exchangeratesapi | grep '\"type\": \"SCHEMA\"' | head -1 | jq -c '{\"streams\":[{\"stream\": .stream, \"schema\": .schema}]}'"
Expand Down
Expand Up @@ -114,7 +114,7 @@ public void testSuccessfulDiscover() throws IOException, InterruptedException, W

@Test
public void testSync() throws IOException, InterruptedException, WorkerException {
final Date date = Date.from(Instant.now().minus(2, ChronoUnit.DAYS));
final Date date = Date.from(Instant.now().minus(3, ChronoUnit.DAYS));
final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");

IOs.writeFile(jobRoot, CONFIG, String.format("{\"start_date\":\"%s\"}", fmt.format(date)));
Expand All @@ -127,7 +127,7 @@ public void testSync() throws IOException, InterruptedException, WorkerException
assertEquals(0, process.exitValue());

final Optional<String> record = IOs.readFile(syncOutputPath).lines().filter(s -> s.contains("RECORD")).findFirst();
assertTrue(record.isPresent());
assertTrue(record.isPresent(), "Date: " + date + "tap output: " + IOs.readFile(syncOutputPath));

assertTrue(Jsons.deserialize(record.get())
.get("record")
Expand Down
@@ -0,0 +1 @@
build
1 change: 1 addition & 0 deletions airbyte-integrations/singer/postgres_abprotocol/.gitignore
@@ -0,0 +1 @@
config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my PR this weekend: #606

20 changes: 20 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/Dockerfile
@@ -0,0 +1,20 @@
FROM airbyte/integration-base-singer:dev

RUN apt-get update && apt-get install -y \
jq libpq-dev gcc\
&& rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="postgres_singer_source"
ENV AIRBYTE_IMPL_MODULE="postgres_singer_source"
ENV AIRBYTE_IMPL_PATH="PostgresSingerSource"

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/postgres-singer-source-abprotocol

WORKDIR /airbyte/integration_code

COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .

WORKDIR /airbyte
21 changes: 21 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/build.gradle
@@ -0,0 +1,21 @@
plugins {
id 'java'
}

apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle')
apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle')
dependencies {
integrationTestImplementation 'org.apache.commons:commons-dbcp2:2.7.0'
integrationTestImplementation 'com.fasterxml.jackson.core:jackson-databind'
integrationTestImplementation 'org.apache.commons:commons-text:1.9'
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"
integrationTestImplementation "org.postgresql:postgresql:42.2.16"

integrationTestImplementation project(':airbyte-config:models')
integrationTestImplementation project(':airbyte-workers')
integrationTestImplementation project(':airbyte-protocol:models')
integrationTestImplementation project(':airbyte-test-utils')
}

integrationTest.dependsOn(buildImage)
buildImage.dependsOn ":airbyte-integrations:base-singer:buildImage"
8 changes: 8 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/main_dev.py
@@ -0,0 +1,8 @@
import sys
from airbyte_protocol.entrypoint import launch

from postgres_singer_source import PostgresSingerSource

if __name__ == "__main__":
source = PostgresSingerSource()
launch(source, sys.argv[1:])
@@ -0,0 +1 @@
from .source import *
@@ -0,0 +1,44 @@
import urllib.request
import psycopg2

from typing import Generator

from airbyte_protocol import AirbyteCatalog
from airbyte_protocol import AirbyteConnectionStatus
from airbyte_protocol import Status
from airbyte_protocol import AirbyteMessage
from airbyte_protocol import Source
from airbyte_protocol import ConfigContainer
from base_singer import SingerHelper

TAP_CMD = "PGCLIENTENCODING=UTF8 tap-postgres"
class PostgresSingerSource(Source):
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self):
pass

def check(self, logger, config_container: ConfigContainer) -> AirbyteConnectionStatus:
config = config_container.rendered_config
print(config)
try:
params="dbname='{dbname}' user='{user}' host='{host}' password='{password}' port='{port}'".format(**config)
psycopg2.connect(params)
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
logger.error(f"Exception while connecting to postgres database: {e}")
return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

def discover(self, logger, config_container) -> AirbyteCatalog:
catalogs = SingerHelper.get_catalogs(logger, f"{TAP_CMD} --config {config_container.rendered_config_path} --discover")
return catalogs.airbyte_catalog

def read(self, logger, config_container, catalog_path, state=None) -> Generator[AirbyteMessage, None, None]:
discover_cmd = f"{TAP_CMD} --config {config_container.rendered_config_path} --discover"
discovered_singer_catalog = SingerHelper.get_catalogs(logger, discover_cmd).singer_catalog

masked_airbyte_catalog = self.read_config(catalog_path)
selected_singer_catalog = SingerHelper.create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_singer_catalog)

catalog_path = f"--properties {selected_singer_catalog}"
config_option = f"--config {config_container.rendered_config_path}"
state_option = f"--state {state}" if state else ""
return SingerHelper.read(logger, f"{TAP_CMD} {catalog_path} {config_option} {state_option}")
@@ -0,0 +1,34 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgres Source Spec",
"type": "object",
"required": ["host", "port", "user", "dbname"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Hostname of the database.",
"type": "string"
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536
},
"user": {
"description": "Username to use to access the database.",
"type": "string"
},
"password": {
"description": "Password associated with the username.",
"type": "string"
},
"dbname": {
"description": "Name of the database.",
"type": "string"
}
}
}
}
@@ -0,0 +1,3 @@
-e ../../base-python
-e ../../singer/base-singer
-e .
20 changes: 20 additions & 0 deletions airbyte-integrations/singer/postgres_abprotocol/setup.py
@@ -0,0 +1,20 @@
from setuptools import setup, find_packages

setup(
name='postgres-singer-source',
description='Postgres Singer source',
author='Airbyte',
author_email='contact@airbyte.io',

packages=find_packages(),
package_data={
'': ['*.json']
},

install_requires=[
'psycopg2==2.7.4',
'tap-postgres==0.1.0',
'base_singer',
'airbyte_protocol'
]
)