Skip to content

Commit

Permalink
Merge branch 'master' into liren/remove-json-connector-definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed Feb 15, 2022
2 parents dfe25ba + 358aa9e commit 34477fa
Show file tree
Hide file tree
Showing 303 changed files with 1,918 additions and 2,024 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.35.28-alpha
current_version = 0.35.29-alpha
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


### SHARED ###
VERSION=0.35.28-alpha
VERSION=0.35.29-alpha

# When using the airbyte-db via default docker image
CONFIG_ROOT=/data
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/sonar-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ jobs:
echo "::set-output name=changed-modules::{ \"include\": $CHANGES }"
run-ci-tests:
if: github.event.pull_request.draft == false
# Do only run if the PR is not a draft and the changed modules matrix contains at least one entry
if: github.event.pull_request.draft == false && fromJson(needs.detect-changes.outputs.changed-modules).include[0] != null
needs: detect-changes
name: Tests for ${{ matrix.module }}
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions airbyte-bootloader/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ ENV APPLICATION airbyte-bootloader

WORKDIR /app

ADD bin/${APPLICATION}-0.35.28-alpha.tar /app
ADD bin/${APPLICATION}-0.35.29-alpha.tar /app

ENTRYPOINT ["/bin/bash", "-c", "${APPLICATION}-0.35.28-alpha/bin/${APPLICATION}"]
ENTRYPOINT ["/bin/bash", "-c", "${APPLICATION}-0.35.29-alpha/bin/${APPLICATION}"]
2 changes: 1 addition & 1 deletion airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
application {
applicationName = "airbyte-bootloader"
mainClass = 'io.airbyte.bootloader.BootloaderApp'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

// Publish this so Airbyte Cloud can consume and extend the classes within this jar.
Expand Down
73 changes: 57 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def configured_for_incremental(configured_stream: ConfiguredAirbyteStream):
return configured_stream.sync_mode and configured_stream.sync_mode == SyncMode.incremental


def get_stream_level_metadata(metadatas: List[Dict[str, any]]) -> Optional[Dict[str, any]]:
def get_stream_level_metadata(metadatas: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
for metadata in metadatas:
if not is_field_metadata(metadata) and "metadata" in metadata:
return metadata.get("metadata")
Expand All @@ -67,7 +67,7 @@ class SyncModeInfo:
default_cursor_field: Optional[List[str]] = None


def set_sync_modes_from_metadata(airbyte_stream: AirbyteStream, metadatas: List[Dict[str, any]]):
def set_sync_modes_from_metadata(airbyte_stream: AirbyteStream, metadatas: List[Dict[str, Any]]):
stream_metadata = get_stream_level_metadata(metadatas)
if stream_metadata:
# A stream is incremental if it declares replication keys or if forced-replication-method is set to incremental
Expand Down Expand Up @@ -102,11 +102,14 @@ class SingerHelper:
def _transform_types(stream_properties: DefaultDict):
for field_name in stream_properties:
field_object = stream_properties[field_name]
field_object["type"] = SingerHelper._parse_type(field_object["type"])
# according to issue CDK: typing errors #9500, mypy raises error on this line
# '"Type[SingerHelper]" has no attribute "_parse_type"', it's need to fix
# ignored for now
field_object["type"] = SingerHelper._parse_type(field_object["type"]) # type: ignore

@staticmethod
def singer_catalog_to_airbyte_catalog(
singer_catalog: Dict[str, any], sync_mode_overrides: Dict[str, SyncModeInfo], primary_key_overrides: Dict[str, List[str]]
singer_catalog: Dict[str, Any], sync_mode_overrides: Dict[str, SyncModeInfo], primary_key_overrides: Dict[str, List[str]]
) -> AirbyteCatalog:
"""
:param singer_catalog:
Expand All @@ -116,7 +119,11 @@ def singer_catalog_to_airbyte_catalog(
:return: Airbyte Catalog
"""
airbyte_streams = []
for stream in singer_catalog.get("streams"):
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Item "None" of "Optional[Any]" has no attribute "__iter__" (not iterable)'
# It occurs because default value isn't set, and it's None
# It's needed to set default value, ignored for now
for stream in singer_catalog.get("streams"): # type: ignore
name = stream.get("stream")
schema = stream.get("schema")
airbyte_stream = AirbyteStream(name=name, json_schema=schema)
Expand Down Expand Up @@ -154,9 +161,17 @@ def get_catalogs(
singer_catalog = SingerHelper._read_singer_catalog(logger, shell_command)
streams = singer_catalog.get("streams", [])
if streams and excluded_streams:
singer_catalog["streams"] = [stream for stream in streams if stream["stream"] not in excluded_streams]

airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog, sync_mode_overrides, primary_key_overrides)
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Unsupported target for indexed assignment ("Mapping[str, Any]")'
# _read_singer_catalog returns Mapping, to fix this error it should be changed to MutableMapping
# ignored for now
singer_catalog["streams"] = [stream for stream in streams if stream["stream"] not in excluded_streams] # type: ignore

# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Argument 1 to "singer_catalog_to_airbyte_catalog" of "SingerHelper" has incompatible type "Mapping[str, Any]"; expected "Dict[str, Any]"'
# singer_catalog is Mapping, because _read_singer_catalog returns Mapping, but singer_catalog_to_airbyte_catalog expects Dict
# it's needed to check and fix, ignored for now
airbyte_catalog = SingerHelper.singer_catalog_to_airbyte_catalog(singer_catalog, sync_mode_overrides, primary_key_overrides) # type: ignore
return Catalogs(singer_catalog=singer_catalog, airbyte_catalog=airbyte_catalog)

@staticmethod
Expand All @@ -177,14 +192,22 @@ def read(logger, shell_command, is_message=(lambda x: True)) -> Iterator[Airbyte
@staticmethod
def _read_lines(process: subprocess.Popen) -> Iterator[Tuple[str, TextIOWrapper]]:
sel = selectors.DefaultSelector()
sel.register(process.stdout, selectors.EVENT_READ)
sel.register(process.stderr, selectors.EVENT_READ)
# according to issue CDK: typing errors #9500, mypy raises error on this two lines
# 'Argument 1 to "register" of "DefaultSelector" has incompatible type "Optional[IO[Any]]"; expected "Union[int, HasFileno]"'
# 'Argument 1 to "register" of "DefaultSelector" has incompatible type "Optional[IO[Any]]"; expected "Union[int, HasFileno]"'
# It's need to check, ignored for now
sel.register(process.stdout, selectors.EVENT_READ) # type: ignore
sel.register(process.stderr, selectors.EVENT_READ) # type: ignore
eof = False
while not eof:
selects_list = sel.select()
empty_line_counter = 0
for key, _ in selects_list:
line = key.fileobj.readline()
# according to issue CDK: typing errors #9500, mypy raises two errors on these lines
# 'Item "int" of "Union[int, HasFileno]" has no attribute "readline"'
# 'Item "HasFileno" of "Union[int, HasFileno]" has no attribute "readline"'
# It's need to check, ignored for now
line = key.fileobj.readline() # type: ignore
if not line:
empty_line_counter += 1
if empty_line_counter >= len(selects_list):
Expand All @@ -193,12 +216,21 @@ def _read_lines(process: subprocess.Popen) -> Iterator[Tuple[str, TextIOWrapper]
try:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
raise Exception(f"Underlying command {process.args} is hanging")
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'On Python 3 '{}'.format(b'abc') produces "b'abc'", not 'abc'; use '{!r}'.format(b'abc') if this is desired behavior'
# It's need to fix, ignored for now
raise Exception(f"Underlying command {process.args} is hanging") # type: ignore

if process.returncode != 0:
raise Exception(f"Underlying command {process.args} failed with exit code {process.returncode}")
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'On Python 3 '{}'.format(b'abc') produces "b'abc'", not 'abc'; use '{!r}'.format(b'abc') if this is desired behavior'
# It's need to fix, ignored for now
raise Exception(f"Underlying command {process.args} failed with exit code {process.returncode}") # type: ignore
else:
yield line, key.fileobj
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Incompatible types in "yield" (actual type "Tuple[Any, Union[int, HasFileno]]", expected type "Tuple[str, TextIOWrapper]")'
# It's need to fix, ignored for now
yield line, key.fileobj # type: ignore

@staticmethod
def _airbyte_message_from_json(transformed_json: Mapping[str, Any]) -> Optional[AirbyteMessage]:
Expand All @@ -210,7 +242,12 @@ def _airbyte_message_from_json(transformed_json: Mapping[str, Any]) -> Optional[
else:
# todo: check that messages match the discovered schema
stream_name = transformed_json["stream"]
out_record = AirbyteRecordMessage(
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Incompatible types in assignment (expression has type "AirbyteRecordMessage", variable has type "AirbyteStateMessage")'
# type of out_record is first initialized as AirbyteStateMessage on the line 240
# however AirbyteRecordMessage is assigned on the line below, it causes error
# ignored
out_record = AirbyteRecordMessage( # type: ignore
stream=stream_name,
data=transformed_json["record"],
emitted_at=int(datetime.now().timestamp()) * 1000,
Expand All @@ -227,7 +264,11 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog: ConfiguredAirby
configured_stream.stream.name: configured_stream for configured_stream in masked_airbyte_catalog.streams
}

for singer_stream in discovered_singer_catalog.get("streams"):
# according to issue CDK: typing errors #9500, mypy raises error on this line
# '"object" has no attribute "get"'
# discovered_singer_catalog type is set to object on the line 259, need to check
# ignored for now
for singer_stream in discovered_singer_catalog.get("streams"): # type: ignore
stream_name = singer_stream.get("stream")
if stream_name in stream_name_to_configured_stream:
new_metadatas = []
Expand Down
53 changes: 39 additions & 14 deletions airbyte-cdk/python/airbyte_cdk/sources/singer/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
#


import json
import os
from dataclasses import dataclass
from typing import Dict, Iterable, List, Type
from typing import Any, Dict, Iterable, List, Mapping, Type

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status
Expand All @@ -18,14 +17,17 @@

@dataclass
class ConfigContainer:
config: json
config: Mapping[str, Any]
config_path: str


class SingerSource(Source):

# can be overridden to change an input config
def configure(self, raw_config: json, temp_dir: str) -> json:
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "ConfigContainer" of "configure" incompatible with return type "Mapping[str, Any]" in supertype "Connector"'
# need to fix, ignored for now
def configure(self, raw_config: Mapping[str, Any], temp_dir: str) -> ConfigContainer: # type: ignore
"""
Persist raw_config in temporary directory to run the Source job
This can be overridden if extra temporary files need to be persisted in the temp dir
Expand All @@ -36,27 +38,33 @@ def configure(self, raw_config: json, temp_dir: str) -> json:
return ConfigContainer(config, config_path)

# Can be overridden to change an input config
def transform_config(self, config: json) -> json:
def transform_config(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Singer source may need to adapt the Config object for the singer tap specifics
"""
return config

# Overriding to change an input catalog as path instead
def read_catalog(self, catalog_path: str) -> str:
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "str" of "read_catalog" incompatible with return type "ConfiguredAirbyteCatalog" in supertype "Source"'
# need to fix, ignored for now
def read_catalog(self, catalog_path: str) -> str: # type: ignore
"""
Since singer source don't need actual catalog object, we override this to return path only
"""
return catalog_path

# Overriding to change an input state as path instead
def read_state(self, state_path: str) -> str:
# according to issue CDK: typing errors #9500, mypy raises error on this line
# 'Return type "str" of "read_state" incompatible with return type "Dict[str, Any]" in supertype "Source"'
# need to fix, ignored for now
def read_state(self, state_path: str) -> str: # type: ignore
"""
Since singer source don't need actual state object, we override this to return path only
"""
return state_path

def check_config(self, logger: AirbyteLogger, config_path: str, config: json) -> AirbyteConnectionStatus:
def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Some Singer source may perform check using config_path or config to
tests if the input configuration can be used to successfully connect to the integration
Expand Down Expand Up @@ -85,13 +93,20 @@ def _discover_internal(self, logger: AirbyteLogger, config_path: str) -> Catalog
)
return catalogs

def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus:
# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "check" is incompatible with supertype "Connector"; supertype defines the argument type as "Mapping[str, Any]"'
# need to fix, ignored for now
def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus: # type: ignore
"""
Tests if the input configuration can be used to successfully connect to the integration
"""
return self.check_config(logger, config_container.config_path, config_container.config)

def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog:
# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# need to fix, ignored for now
def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog: # type: ignore
"""
Implements the parent class discover method.
"""
Expand All @@ -100,7 +115,13 @@ def discover(self, logger: AirbyteLogger, config_container) -> AirbyteCatalog:
else:
return self._discover_internal(logger, config_container).airbyte_catalog

def read(
# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"'
# 'Argument 3 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "ConfiguredAirbyteCatalog"'
# 'Argument 4 of "read" is incompatible with supertype "Source"; supertype defines the argument type as "Optional[MutableMapping[str, Any]]"'
# need to fix, ignored for now
def read( # type: ignore
self, logger: AirbyteLogger, config_container: ConfigContainer, catalog_path: str, state_path: str = None
) -> Iterable[AirbyteMessage]:
"""
Expand Down Expand Up @@ -149,7 +170,7 @@ def get_excluded_streams(self) -> List[str]:
class BaseSingerSource(SingerSource):
force_full_refresh = False

def check_config(self, logger: AirbyteLogger, config_path: str, config: json) -> AirbyteConnectionStatus:
def check_config(self, logger: AirbyteLogger, config_path: str, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
self.try_connect(logger, config)
except self.api_error as err:
Expand All @@ -169,13 +190,17 @@ def read_cmd(self, logger: AirbyteLogger, config_path: str, catalog_path: str, s

return f"{self.tap_cmd} {cmd}"

def discover(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteCatalog:
# according to issue CDK: typing errors #9500, mypy raises errors on this line
# 'Argument 1 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Logger"'
# 'Argument 2 of "discover" is incompatible with supertype "Source"; supertype defines the argument type as "Mapping[str, Any]"'
# need to fix, ignored for now
def discover(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteCatalog: # type: ignore
catalog = super().discover(logger, config_container)
if self.force_full_refresh:
return CatalogHelper.coerce_catalog_as_full_refresh(catalog)
return catalog

def try_connect(self, logger: AirbyteLogger, config: json):
def try_connect(self, logger: AirbyteLogger, config: Mapping[str, Any]):
"""Test provided credentials, raises self.api_error if something goes wrong"""
raise NotImplementedError

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ def traverse_schema(schema: Any, path: List[str]):
traverse_schema(i, path)
else:
if path[-1] == "airbyte_secret" and schema is True:
path = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]])
pathes.add(path)
path_str = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]])
pathes.add(path_str)

traverse_schema(schema, [])
return pathes
Expand Down

0 comments on commit 34477fa

Please sign in to comment.