-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move postgres source to AB Protocol & various python fixes #610
Changes from all commits
928af6c
577f16b
c8b8cb8
a511db7
37e9ced
3b6a23a
7651298
af37eac
f451dba
0f6f5f5
a7182ae
2d3926c
316b06a
e044856
18e48bc
c1b8c3c
a85d44a
41dc190
91771e0
e25c378
844902b
22fb146
0ca290b
4126ed8
525f4e4
9f2858d
77e8617
cd357cf
c390e88
feebd91
c406f21
3cfa4c7
313cadb
e1f8f2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ | |
from airbyte_protocol import AirbyteStream | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from typing import Generator | ||
from typing import Generator, List, DefaultDict | ||
|
||
import json | ||
|
||
|
@@ -34,6 +34,12 @@ class Catalogs: | |
|
||
|
||
class SingerHelper: | ||
@staticmethod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. having multiple types is valid jsonschema. i don't think we want to normalize it out in the integration if we are saying that you define your stream's schema using jsonschema. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh. saw your comment here: #609 (review). maybe we're on the same page now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, we are. I was confused. JSONSchema it is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which case please merge your PR 😆 |
||
def _transform_types(stream_properties: DefaultDict): | ||
for field_name in stream_properties: | ||
field_object = stream_properties[field_name] | ||
field_object['type'] = SingerHelper._parse_type(field_object['type']) | ||
|
||
@staticmethod | ||
def get_catalogs(logger, shell_command, singer_transform=(lambda catalog: catalog), airbyte_transform=(lambda catalog: catalog)) -> Catalogs: | ||
completed_process = subprocess.run(shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, | ||
|
@@ -72,7 +78,7 @@ def read(logger, shell_command, is_message=(lambda x: True), transform=(lambda x | |
if out_json is not None and is_message(out_json): | ||
transformed_json = transform(out_json) | ||
if transformed_json is not None: | ||
if transformed_json.get('type') == "SCHEMA": | ||
if transformed_json.get('type') == "SCHEMA" or transformed_json.get('type') == "ACTIVATE_VERSION": | ||
pass | ||
elif transformed_json.get('type') == "STATE": | ||
out_record = AirbyteStateMessage(data=transformed_json["value"]) | ||
|
@@ -104,13 +110,15 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog, discovered_sing | |
for singer_stream in discovered_singer_catalog.get("streams"): | ||
if singer_stream.get("stream") in stream_to_airbyte_schema: | ||
new_metadatas = [] | ||
|
||
if singer_stream.get("metadata"): | ||
metadatas = singer_stream.get("metadata") | ||
for metadata in metadatas: | ||
new_metadata = metadata | ||
new_metadata["metadata"]["selected"] = True | ||
if not is_field_metadata(new_metadata): | ||
new_metadata["metadata"]["forced-replication-method"] = "FULL_TABLE" | ||
new_metadata["metadata"]["replication-method"] = "FULL_TABLE" | ||
new_metadatas += [new_metadata] | ||
singer_stream["metadata"] = new_metadatas | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
build |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
secrets |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
FROM airbyte/integration-base-singer:dev | ||
|
||
RUN apt-get update && apt-get install -y \ | ||
jq libpq-dev gcc\ | ||
&& rm -rf /var/lib/apt/lists/* | ||
|
||
ENV CODE_PATH="postgres_singer_source" | ||
ENV AIRBYTE_IMPL_MODULE="postgres_singer_source" | ||
ENV AIRBYTE_IMPL_PATH="PostgresSingerSource" | ||
|
||
LABEL io.airbyte.version=0.1.0 | ||
LABEL io.airbyte.name=airbyte/postgres-singer-source-abprotocol | ||
|
||
WORKDIR /airbyte/integration_code | ||
|
||
COPY $CODE_PATH ./$CODE_PATH | ||
COPY setup.py ./ | ||
RUN pip install . | ||
|
||
WORKDIR /airbyte |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
plugins { | ||
id 'java' | ||
} | ||
|
||
apply from: rootProject.file('tools/gradle/commons/integrations/image.gradle') | ||
apply from: rootProject.file('tools/gradle/commons/integrations/integration-test.gradle') | ||
dependencies { | ||
integrationTestImplementation 'org.apache.commons:commons-dbcp2:2.7.0' | ||
integrationTestImplementation 'com.fasterxml.jackson.core:jackson-databind' | ||
integrationTestImplementation 'org.apache.commons:commons-text:1.9' | ||
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2" | ||
integrationTestImplementation "org.postgresql:postgresql:42.2.16" | ||
|
||
integrationTestImplementation project(':airbyte-config:models') | ||
integrationTestImplementation project(':airbyte-workers') | ||
integrationTestImplementation project(':airbyte-protocol:models') | ||
integrationTestImplementation project(':airbyte-test-utils') | ||
} | ||
|
||
integrationTest.dependsOn(buildImage) | ||
buildImage.dependsOn ":airbyte-integrations:base-singer:buildImage" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
import sys | ||
from airbyte_protocol.entrypoint import launch | ||
|
||
from postgres_singer_source import PostgresSingerSource | ||
|
||
if __name__ == "__main__": | ||
source = PostgresSingerSource() | ||
launch(source, sys.argv[1:]) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .source import * |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import urllib.request | ||
import psycopg2 | ||
|
||
from typing import Generator | ||
|
||
from airbyte_protocol import AirbyteCatalog | ||
from airbyte_protocol import AirbyteConnectionStatus | ||
from airbyte_protocol import Status | ||
from airbyte_protocol import AirbyteMessage | ||
from airbyte_protocol import Source | ||
from airbyte_protocol import ConfigContainer | ||
from base_singer import SingerHelper, SingerSource | ||
|
||
|
||
TAP_CMD = "PGCLIENTENCODING=UTF8 tap-postgres" | ||
class PostgresSingerSource(SingerSource): | ||
def __init__(self): | ||
pass | ||
|
||
def check(self, logger, config_container: ConfigContainer) -> AirbyteConnectionStatus: | ||
config = config_container.rendered_config | ||
try: | ||
params="dbname='{dbname}' user='{user}' host='{host}' password='{password}' port='{port}'".format(**config) | ||
psycopg2.connect(params) | ||
return AirbyteConnectionStatus(status=Status.SUCCEEDED) | ||
except Exception as e: | ||
logger.error(f"Exception while connecting to postgres database: {e}") | ||
return AirbyteConnectionStatus(status=Status.FAILED, message=str(e)) | ||
|
||
def discover_cmd(self, logger, config_path) -> AirbyteCatalog: | ||
return f"{TAP_CMD} --config {config_path} --discover" | ||
|
||
def read_cmd(self, logger, config_path, catalog_path, state_path=None) -> Generator[AirbyteMessage, None, None]: | ||
catalog_option = f"--properties {catalog_path}" | ||
config_option = f"--config {config_path}" | ||
state_option = f"--state {state_path}" if state_path else "" | ||
return f"{TAP_CMD} {catalog_option} {config_option} {state_option}" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{ | ||
"documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", | ||
"connectionSpecification": { | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"title": "Postgres Source Spec", | ||
"type": "object", | ||
"required": ["host", "port", "user", "dbname"], | ||
"additionalProperties": false, | ||
"properties": { | ||
"host": { | ||
"description": "Hostname of the database.", | ||
"type": "string" | ||
}, | ||
"port": { | ||
"description": "Port of the database.", | ||
"type": "integer", | ||
"minimum": 0, | ||
"maximum": 65536 | ||
}, | ||
"user": { | ||
"description": "Username to use to access the database.", | ||
"type": "string" | ||
}, | ||
"password": { | ||
"description": "Password associated with the username.", | ||
"type": "string" | ||
}, | ||
"dbname": { | ||
"description": "Name of the database.", | ||
"type": "string" | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
-e ../../base-python | ||
-e ../../singer/base-singer | ||
-e . |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from setuptools import setup, find_packages | ||
|
||
setup( | ||
name='postgres-singer-source', | ||
description='Postgres Singer source', | ||
author='Airbyte', | ||
author_email='contact@airbyte.io', | ||
|
||
packages=find_packages(), | ||
package_data={ | ||
'': ['*.json'] | ||
}, | ||
|
||
install_requires=[ | ||
'psycopg2==2.7.4', | ||
'tap-postgres==0.1.0', | ||
'base_singer', | ||
'airbyte_protocol' | ||
] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sherifnada If i understand right, this change is not compatible with what is currently in the
template/python-source
for the ´ check' function anymore, right?AirbyteCheckResponse
has to be changed toAirbyteConnectionStatus
in that file too?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ChristopheDuong Correct - good catch