Add normalization #76
Merged · 17 commits · Feb 9, 2023
108 changes: 55 additions & 53 deletions abm/connector.py
@@ -2,18 +2,17 @@
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import json
import tempfile
import pyarrow as pa
from pyarrow import json as pa_json
from .vault import get_secrets_from_vault
from .container import Container

MOUNTDIR = '/local'
CHUNKSIZE = 1024
CTRLD = '\x04'.encode()

class GenericConnector:
class GenericConnector(Container):
def __init__(self, config, logger, workdir, asset_name=""):
if 'connection' not in config:
raise ValueError("'connection' field missing from configuration")
@@ -41,19 +40,7 @@ def __init__(self, config, logger, workdir, asset_name=""):
else:
logger.info("no secrets returned by vault")

self.workdir = workdir
# Potentially the fybrik-blueprint pod for the airbyte module can start before the docker daemon pod, causing
# docker.from_env() to fail
retryLoop = 0
while retryLoop < 10:
try:
self.client = docker.from_env()
except Exception as e:
print('error on docker.from_env() ' + str(e) + ' sleep and retry. Retry count = ' + str(retryLoop))
time.sleep(1)
retryLoop += 1
else:
retryLoop = 10
super().__init__(logger, workdir, MOUNTDIR)

self.connector = self.config['connector']

@@ -62,13 +49,14 @@ def __init__(self, config, logger, workdir, asset_name=""):
# since the Airbyte connectors do not recognize this field
del self.config['connector']

self.logger = logger

# if the port field is a string, cast it to integer
if 'port' in self.config and type(self.config['port']) == str:
self.config['port'] = int(self.config['port'])

self.catalog_dict = None
# json_schema holds the JSON schema of the stream (table) to read, if one is provided;
# otherwise it holds the JSON schema of the first stream in the catalog.
self.json_schema = None
Collaborator: add comment explaining what is kept in self.json_schema


# create the temporary json file for configuration
self.conf_file = tempfile.NamedTemporaryFile(dir=self.workdir)
@@ -89,12 +77,48 @@ def __del__(self):
self.conf_file.close()

'''
Translate the name of the temporary file in the host to the name of the same file
in the container.
For instance, it the path is '/tmp/tmp12345', return '/local/tmp12345'.
This function does the following:
- Prune the catalog streams down to a single stream: if a stream (table) is provided, keep it;
otherwise keep the first stream.
- Remove metadata columns, if any exist, from "CATALOG" lines returned by an Airbyte read operation.
For instance, if a line is:
{'name': 'stream_name', 'json_schema': {'type': 'object', 'properties': {'_airbyte_stream_name_hashid': {'type': 'string'}, '_airbyte_ab_id': {'type': 'string'},
'dob': {'type': 'string'}, '_airbyte_normalized_at': {'type': 'string', 'format': 'date-time', 'airbyte_type': 'timestamp_without_timezone'},
'name': {'type': 'string'}, '_airbyte_emitted_at': {'type': 'string', 'format': 'date-time', 'airbyte_type': 'timestamp_with_timezone'}}}}
extract:
{'name': 'stream_name', 'json_schema': {'type': 'object', 'properties': {'dob': {'type': 'string'},'name': {'type': 'string'}}}}

These metadata columns are added in the normalization process.
ref: https://docs.airbyte.com/understanding-airbyte/basic-normalization
'''
def name_in_container(self, path):
return path.replace(self.workdir, MOUNTDIR, 1)
def prune_streams_and_remove_metadata_columns(self, line_dict):
catalog_streams = line_dict['catalog']['streams']
stream_name = self.get_stream_name()
the_stream = None
# get the stream to keep: if a stream (table) is provided,
# find it; otherwise use the first stream in the
# streams list.
if stream_name == "":
# no specific stream was provided, so take the first item
# in the list
the_stream = catalog_streams[0]
else:
for stream in catalog_streams:
if stream['name'] == stream_name:
the_stream = stream
break
if the_stream is None:
Collaborator: please add a log message as well

self.logger.error('Error finding stream in catalog streams')
raise ValueError("error finding stream in catalog streams")
# remove metadata columns
properties = the_stream['json_schema']['properties']
for key in list(properties.keys()):
if key.startswith('_airbyte_'):
del properties[key]
# set the json_schema for later use
self.json_schema = the_stream['json_schema']
line_dict['catalog']['streams'] = [the_stream]
return json.dumps(line_dict).encode()

'''
Extract only the relevant data in "RECORD" lines returned by an Airbyte read operation.
@@ -122,7 +146,7 @@ def filter_reply(self, lines, batch_size=100):
if line_dict['type'] == 'LOG':
continue
if line_dict['type'] == 'CATALOG':
ret.append(line)
ret.append(self.prune_streams_and_remove_metadata_columns(line_dict))
elif line_dict['type'] == 'RECORD':
ret.append(self.extract_data(line_dict))
count = count + 1
@@ -141,36 +165,12 @@ def filter_reply(self, lines, batch_size=100):
Mount the workdir on /local. Remove the container after done.
'''
def run_container(self, command):
self.logger.debug("running command: " + command)
try:
reply = self.client.containers.run(self.connector, command,
volumes=[self.workdir + ':' + MOUNTDIR], network_mode='host',
remove=True, stream=True)
return self.filter_reply(reply)
except docker.errors.DockerException as e:
self.logger.error('Running of docker container failed',
extra={'error': str(e)})
return None
volumes=[self.workdir + ':' + MOUNTDIR]
return super().run_container(command, self.connector, volumes=volumes, remove=True, detach=False, stream=True)

def open_socket_to_container(self, command):
container = self.client.containers.run(self.connector, detach=True,
tty=True, stdin_open=True,
volumes=[self.workdir + ':' + MOUNTDIR], network_mode='host',
command=command, remove=True)
# attach to the container stdin socket
s = container.attach_socket(params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(True)
return s, container

def close_socket_to_container(self, s, container):
s._sock.sendall(CTRLD) # ctrl d to finish things up
s._sock.close()
container.stop()
self.client.close()

def write_to_socket_to_container(self, s, binary_textline):
s._sock.sendall(binary_textline)
s.flush()
volumes=[self.workdir + ':' + MOUNTDIR]
return super().open_socket_to_container(command, self.connector, volumes)

# Given configuration, obtain the Airbyte Catalog, which includes list of datasets
def get_catalog(self):
@@ -186,6 +186,7 @@ def get_catalog(self):

'''
Return the schema of the first dataset in the catalog.
If a stream (table) is provided then its schema is returned.
Used by arrow-flight server for both the get_flight_info() and do_get().
Not needed for the Airbyte http server.
'''
@@ -195,7 +196,7 @@ def get_schema(self):
return None

schema = pa.schema({})
properties = self.catalog_dict['catalog']['streams'][0]['json_schema']['properties']
properties = self.json_schema['properties']
for field in properties:
type_field = properties[field]['type']
if type(type_field) is list:
@@ -215,6 +216,7 @@ def read_stream(self, catalog_file):
stream_name = self.get_stream_name()
for stream in self.catalog_dict['catalog']['streams']:
if 'name' in stream:
# read only the relevant stream if one is provided
if stream_name != "" and stream['name'] != stream_name:
continue
stream_dict = {}
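The following is an illustrative, self-contained sketch (not part of the PR) of the metadata-column pruning that prune_streams_and_remove_metadata_columns applies to a CATALOG line; the sample stream is adapted from the docstring above, and the helper name is hypothetical.

```python
import json

def prune_airbyte_metadata(stream: dict) -> dict:
    """Drop normalization metadata columns (keys starting with '_airbyte_')."""
    properties = stream['json_schema']['properties']
    for key in list(properties.keys()):
        if key.startswith('_airbyte_'):
            del properties[key]
    return stream

# Sample stream adapted from the docstring above
sample = {
    'name': 'stream_name',
    'json_schema': {
        'type': 'object',
        'properties': {
            '_airbyte_ab_id': {'type': 'string'},
            'dob': {'type': 'string'},
            '_airbyte_emitted_at': {'type': 'string', 'format': 'date-time'},
            'name': {'type': 'string'},
        },
    },
}

print(json.dumps(prune_airbyte_metadata(sample)))
# Only 'dob' and 'name' remain under 'properties'; the '_airbyte_*' keys are removed.
```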
74 changes: 74 additions & 0 deletions abm/container.py
@@ -0,0 +1,74 @@
#
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import time

CTRLD = '\x04'.encode()

class Container:
def __init__(self, logger, workdir, mountdir):
self.logger = logger
self.workdir = workdir
self.mountdir = mountdir
# Potentially the fybrik-blueprint pod for the airbyte module can start before the docker daemon pod, causing
# docker.from_env() to fail
retryLoop = 0
while retryLoop < 10:
try:
self.client = docker.from_env()
except Exception as e:
print('error on docker.from_env() ' + str(e) + ' sleep and retry. Retry count = ' + str(retryLoop))
time.sleep(1)
retryLoop += 1
else:
retryLoop = 10

'''
Translate the name of the temporary file in the host to the name of the same file
in the container.
For instance, if the path is '/tmp/tmp12345', return '/local/tmp12345'.
'''
def name_in_container(self, path):
return path.replace(self.workdir, self.mountdir, 1)

def filter_reply(self, reply):
return reply

'''
Run a docker container from the connector image.
Mount the workdir on /local. Remove the container after done.
'''
def run_container(self, command, image, volumes, environment=None, remove=True, detach=False, stream=True, init=False):
self.logger.debug("running command: " + command)

try:
reply = self.client.containers.run(image, volumes=volumes, network_mode='host',
environment=environment,
command=command, init=init, stream=stream, remove=remove, detach=detach)
return self.filter_reply(reply)
except docker.errors.DockerException as e:
self.logger.error('Running of docker container failed',
extra={'error': str(e)})
return None

def open_socket_to_container(self, command, image, volumes, detach=True, tty=True, stdin_open=True, remove=True):
container = self.client.containers.run(image, detach=detach,
tty=tty, stdin_open=stdin_open,
volumes=volumes, network_mode='host',
command=command, remove=remove)
# attach to the container stdin socket
s = container.attach_socket(params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(True)
return s, container

def close_socket_to_container(self, s, container):
s._sock.sendall(CTRLD) # ctrl d to finish things up
s._sock.close()
container.stop()
self.client.close()

def write_to_socket_to_container(self, s, binary_textline):
s._sock.sendall(binary_textline)
s.flush()
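To show how the socket helpers in Container are meant to be used together, here is a hedged usage sketch: it assumes an existing GenericConnector-like instance (connector) and a write command (e.g. one returned by create_write_command); the records list is hypothetical. The real driver for this flow is do_put in abm/server.py.

```python
# Illustrative only: stream newline-delimited JSON record lines into a
# destination connector through its stdin socket.
records = [b'{"type": "RECORD", "record": {"stream": "users", "data": {"name": "alice"}}}\n']  # hypothetical

sock, container = connector.open_socket_to_container(command)    # attach to the container stdin
try:
    for line in records:
        connector.write_to_socket_to_container(sock, line)        # send one serialized record
finally:
    # sends Ctrl-D, closes the socket, stops the container and the docker client
    connector.close_socket_to_container(sock, container)
```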
43 changes: 43 additions & 0 deletions abm/normalization.py
@@ -0,0 +1,43 @@
#
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import tempfile
from .container import Container

MOUNTDIR = '/local'

class NormalizationContainer(Container):
def __init__(self, config, logger, workdir, asset_name=""):
if 'image' not in config['normalization']:
raise ValueError("'image' field missing from normalization section in configuration")
self.normalization_image = config['normalization']['image']
if 'integrationType' not in config['normalization']:
raise ValueError("'integrationType' field missing from normalization section in configuration")
self.integration_type = config['normalization']['integrationType']
if 'airbyteVersion' not in config['normalization']:
raise ValueError("'airbyteVersion' field missing from normalization section in configuration")
self.airbyte_version = config['normalization']['airbyteVersion']
super().__init__(logger, workdir, MOUNTDIR)

'''
Run a docker container from the connector image.
Mount the workdir on /local. Remove the container after done.
'''
def run_container(self, command):
volumes=[self.workdir + ':' + MOUNTDIR]
# The normalization image is tied to and released along with a specific Airbyte version.
# ref: https://github.com/airbytehq/airbyte/blob/master/docs/understanding-airbyte/basic-normalization.md#airbyte-integrationbasesbase-normalization
environment=["WORKER_ENVIRONMENT=DOCKER", "AIRBYTE_VERSION=" + self.airbyte_version]
return super().run_container(command, self.normalization_image, volumes, environment, remove=True, stream=True, init=True)

'''
Creates a normalization command
'''
def create_normalization_command(self, catalog, config):
command = 'run --config ' + self.name_in_container(config.name) + \
' --catalog ' + self.name_in_container(catalog.name) + ' --integration-type ' + \
self.integration_type

return command
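For reference, a minimal sketch of the command string that create_normalization_command assembles; the file paths and integration type below are hypothetical (the real paths come from tempfile.NamedTemporaryFile and are translated by name_in_container).

```python
# Hypothetical values, for illustration only
config_path = '/local/tmpconfig123'       # connector configuration, as seen inside the container
catalog_path = '/local/tmpcatalog456'     # configured Airbyte catalog, as seen inside the container
integration_type = 'postgres'             # from config['normalization']['integrationType']

command = ('run --config ' + config_path +
           ' --catalog ' + catalog_path +
           ' --integration-type ' + integration_type)
# -> "run --config /local/tmpconfig123 --catalog /local/tmpcatalog456 --integration-type postgres"
```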
22 changes: 20 additions & 2 deletions abm/server.py
@@ -4,6 +4,7 @@
#

from fybrik_python_logging import init_logger, logger, DataSetID, ForUser
from .normalization import NormalizationContainer
from .config import Config
from .connector import GenericConnector
from .ticket import ABMTicket
@@ -163,6 +164,9 @@ def do_get(self, context, ticket: fl.Ticket):
def do_put(self, context, descriptor, reader, writer):
try:
command = json.loads(descriptor.command)
write_mode = None
if 'write_mode' in command:
write_mode = command['write_mode']
asset_name = command['asset']
json_schema_str = command['json_schema']
json_schema=json.loads(json_schema_str)
@@ -175,6 +179,12 @@
DataSetID: asset_name,
ForUser: True})
with Config(self.config_path) as config:
mode = DestinationSyncMode.append
if write_mode:
if write_mode == "overwrite":
mode = DestinationSyncMode.overwrite
elif write_mode != "append":
logger.debug("unknown write mode. expect overwrite or append. using default append mode")
asset_conf = config.for_asset(asset_name)
connector = GenericConnector(asset_conf, logger, self.workdir, asset_name)
stream_name = connector.get_stream_name()
@@ -184,8 +194,8 @@
stream_name = ''.join(random.choices(string.ascii_lowercase, k=stream_name_len))
stream = AirbyteStream(name=stream_name,supported_sync_modes=[SyncMode.full_refresh],json_schema=json_schema)
# TODO: Params to the Airbyte objects, such as destination_sync_mode, can be configurable using the arrow-flight request
streams = [ConfiguredAirbyteStream(destination_sync_mode=DestinationSyncMode.append, sync_mode=SyncMode.full_refresh, stream=stream)]
command, catalog = connector.create_write_command(ConfiguredAirbyteCatalog(streams=streams).json())
streams = [ConfiguredAirbyteStream(destination_sync_mode=mode, sync_mode=SyncMode.full_refresh, stream=stream)]
command, catalog = connector.create_write_command(ConfiguredAirbyteCatalog(streams=streams).json(exclude_unset=True, exclude_none=True))
socket, container = connector.open_socket_to_container(command)
idx = 0
record_reader = reader.to_reader()
@@ -206,6 +216,14 @@
logger.error(f"Unexpected {err=}, {type(err)=}")
raise
connector.close_socket_to_container(socket, container)
if 'normalization' in asset_conf:
logger.info('starting normalization')
normalization_container = NormalizationContainer(asset_conf, logger, self.workdir, asset_name)
command = normalization_container.create_normalization_command(catalog=catalog,config=connector.conf_file)
reply = normalization_container.run_container(command)
if reply:
logger.debug(reply)

catalog.close()
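For context, a client invoking do_put selects the new write mode through the flight descriptor command. The sketch below is illustrative only (the asset name and schema are hypothetical); it mirrors the fields read at the top of do_put: asset, json_schema, and the optional write_mode ("append" by default, or "overwrite").

```python
import json
import pyarrow.flight as fl

# Hypothetical descriptor for an Arrow Flight do_put request
command = {
    "asset": "my-datastore/my-asset",       # hypothetical asset name
    "json_schema": json.dumps({
        "type": "object",
        "properties": {"name": {"type": "string"}, "dob": {"type": "string"}},
    }),
    "write_mode": "overwrite",              # omit to keep the default append mode
}
descriptor = fl.FlightDescriptor.for_command(json.dumps(command).encode('utf-8'))
```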

