Add normalization (#76)

* Add normalization (#75)

Signed-off-by: Revital Sur <eres@il.ibm.com>
Co-authored-by: Doron Chen <cdoron@il.ibm.com>

* Change the run command of the normalization image.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Add container.py for common code to handle container functionality.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Apply normalization only on write operation.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Add a table to the asset.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Fixes after testing.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Add overwrite mode for write operation.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Minor fix.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Address review comments.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Remove the usage of transformer.transform in extract_data.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Address Doron's comments.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Minor changes.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Add comments.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Rename function name in abm/connector.py

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Minor fix.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Minor change.

Signed-off-by: Revital Sur <eres@il.ibm.com>

* Address review comments.

Signed-off-by: Revital Sur <eres@il.ibm.com>

---------

Signed-off-by: Revital Sur <eres@il.ibm.com>
Co-authored-by: Doron Chen <cdoron@il.ibm.com>
revit13 and cdoron committed Feb 9, 2023
1 parent 8873b91 commit 36f0155
Showing 14 changed files with 239 additions and 68 deletions.
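For orientation, the new code expects each asset configuration to carry an optional normalization section alongside the existing connection settings; when it is present, do_put in abm/server.py runs a normalization container after the write completes. A minimal sketch of such a configuration follows; every value is an illustrative assumption, and the three normalization keys are the ones validated by abm/normalization.py below.

# Illustrative sketch only: an asset configuration dict with the optional
# 'normalization' section consumed by the new NormalizationContainer.
# All values here are assumptions, not taken from the repository.
asset_conf = {
    'connection': {'name': 'postgres'},                  # existing connection block (details elided)
    'connector': 'airbyte/destination-postgres:latest',  # assumed destination connector image
    'normalization': {                                   # new, optional section
        'image': 'airbyte/normalization:0.2.25',         # assumed normalization image tag
        'integrationType': 'postgres',                   # assumed integration type
        'airbyteVersion': '0.40.0',                      # assumed Airbyte version
    },
}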
108 changes: 55 additions & 53 deletions abm/connector.py
@@ -2,18 +2,17 @@
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import json
import tempfile
import pyarrow as pa
from pyarrow import json as pa_json
from .vault import get_secrets_from_vault
from .container import Container

MOUNTDIR = '/local'
CHUNKSIZE = 1024
CTRLD = '\x04'.encode()

class GenericConnector:
class GenericConnector(Container):
def __init__(self, config, logger, workdir, asset_name=""):
if 'connection' not in config:
raise ValueError("'connection' field missing from configuration")
@@ -41,19 +40,7 @@ def __init__(self, config, logger, workdir, asset_name="")
else:
logger.info("no secrets returned by vault")

self.workdir = workdir
# Potentially the fybrik-blueprint pod for the airbyte module can start before the docker daemon pod, causing
# docker.from_env() to fail
retryLoop = 0
while retryLoop < 10:
try:
self.client = docker.from_env()
except Exception as e:
print('error on docker.from_env() ' + str(e) + ' sleep and retry. Retry count = ' + str(retryLoop))
time.sleep(1)
retryLoop += 1
else:
retryLoop = 10
super().__init__(logger, workdir, MOUNTDIR)

self.connector = self.config['connector']

@@ -62,13 +49,14 @@ def __init__(self, config, logger, workdir, asset_name="")
# since the Airbyte connectors do not recognize this field
del self.config['connector']

self.logger = logger

# if the port field is a string, cast it to integer
if 'port' in self.config and type(self.config['port']) == str:
self.config['port'] = int(self.config['port'])

self.catalog_dict = None
# json_schema holds the json schema of the stream (table) to read, if such a stream is provided;
# otherwise it holds the json schema of the first stream in the catalog.
self.json_schema = None

# create the temporary json file for configuration
self.conf_file = tempfile.NamedTemporaryFile(dir=self.workdir)
@@ -89,12 +77,48 @@ def __del__(self):
self.conf_file.close()

'''
Translate the name of the temporary file in the host to the name of the same file
in the container.
For instance, if the path is '/tmp/tmp12345', return '/local/tmp12345'.
This function does the following:
- Prune the catalog streams down to a single stream: if a stream (table) is provided, keep it;
otherwise keep the first stream.
- Remove metadata columns, if any exist, from "CATALOG" lines returned by an Airbyte read operation.
For instance, if a line is:
{'name': 'stream_name', 'json_schema': {'type': 'object', 'properties': {'_airbyte_stream_name_hashid': {'type': 'string'}, '_airbyte_ab_id': {'type': 'string'},
'dob': {'type': 'string'}, '_airbyte_normalized_at': {'type': 'string', 'format': 'date-time', 'airbyte_type': 'timestamp_without_timezone'},
'name': {'type': 'string'}, '_airbyte_emitted_at': {'type': 'string', 'format': 'date-time', 'airbyte_type': 'timestamp_with_timezone'}}}}
extract:
{'name': 'stream_name', 'json_schema': {'type': 'object', 'properties': {'dob': {'type': 'string'},'name': {'type': 'string'}}}}
These metadata columns are added in the normalization process.
ref: https://docs.airbyte.com/understanding-airbyte/basic-normalization
'''
def name_in_container(self, path):
return path.replace(self.workdir, MOUNTDIR, 1)
def prune_streams_and_remove_metadata_columns(self, line_dict):
catalog_streams = line_dict['catalog']['streams']
stream_name = self.get_stream_name()
the_stream = None
# get the stream to keep: if a stream (table) is provided,
# find it; otherwise use the first stream in the
# streams list.
if stream_name == "":
# no specific stream was provided, so take the first item
# in the list
the_stream = catalog_streams[0]
else:
for stream in catalog_streams:
if stream['name'] == stream_name:
the_stream = stream
break
if the_stream is None:
self.logger.error('Error finding stream in catalog streams')
raise ValueError("error finding stream in catalog streams")
# remove metadata columns
properties = the_stream['json_schema']['properties']
for key in list(properties.keys()):
if key.startswith('_airbyte_'):
del properties[key]
# set the json_schema for later use
self.json_schema = the_stream['json_schema']
line_dict['catalog']['streams'] = [the_stream]
return json.dumps(line_dict).encode()

'''
Extract only the relevant data in "RECORD" lines returned by an Airbyte read operation.
@@ -122,7 +146,7 @@ def filter_reply(self, lines, batch_size=100):
if line_dict['type'] == 'LOG':
continue
if line_dict['type'] == 'CATALOG':
ret.append(line)
ret.append(self.prune_streams_and_remove_metadata_columns(line_dict))
elif line_dict['type'] == 'RECORD':
ret.append(self.extract_data(line_dict))
count = count + 1
@@ -141,36 +165,12 @@ def filter_reply(self, lines, batch_size=100):
Mount the workdir on /local. Remove the container when done.
'''
def run_container(self, command):
self.logger.debug("running command: " + command)
try:
reply = self.client.containers.run(self.connector, command,
volumes=[self.workdir + ':' + MOUNTDIR], network_mode='host',
remove=True, stream=True)
return self.filter_reply(reply)
except docker.errors.DockerException as e:
self.logger.error('Running of docker container failed',
extra={'error': str(e)})
return None
volumes=[self.workdir + ':' + MOUNTDIR]
return super().run_container(command, self.connector, volumes=volumes, remove=True, detach=False, stream=True)

def open_socket_to_container(self, command):
container = self.client.containers.run(self.connector, detach=True,
tty=True, stdin_open=True,
volumes=[self.workdir + ':' + MOUNTDIR], network_mode='host',
command=command, remove=True)
# attach to the container stdin socket
s = container.attach_socket(params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(True)
return s, container

def close_socket_to_container(self, s, container):
s._sock.sendall(CTRLD) # ctrl d to finish things up
s._sock.close()
container.stop()
self.client.close()

def write_to_socket_to_container(self, s, binary_textline):
s._sock.sendall(binary_textline)
s.flush()
volumes=[self.workdir + ':' + MOUNTDIR]
return super().open_socket_to_container(command, self.connector, volumes)

# Given configuration, obtain the Airbyte Catalog, which includes list of datasets
def get_catalog(self):
@@ -186,6 +186,7 @@ def get_catalog(self):

'''
Return the schema of the first dataset in the catalog.
If a stream (table) is provided, its schema is returned instead.
Used by arrow-flight server for both the get_flight_info() and do_get().
Not needed for the Airbyte http server.
'''
@@ -195,7 +196,7 @@ def get_schema(self):
return None

schema = pa.schema({})
properties = self.catalog_dict['catalog']['streams'][0]['json_schema']['properties']
properties = self.json_schema['properties']
for field in properties:
type_field = properties[field]['type']
if type(type_field) is list:
@@ -215,6 +216,7 @@ def read_stream(self, catalog_file):
stream_name = self.get_stream_name()
for stream in self.catalog_dict['catalog']['streams']:
if 'name' in stream:
# read only the relevant stream if one is provided
if stream_name != "" and stream['name'] != stream_name:
continue
stream_dict = {}
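To make the schema path above concrete, here is a minimal, self-contained sketch of how a pruned json_schema (with the _airbyte_ metadata columns already removed) can be mapped to a pyarrow schema, in the spirit of GenericConnector.get_schema(); the JSON-to-Arrow type table is an assumption, not the module's exact mapping.

# Hedged sketch: building a pyarrow schema from a pruned json_schema.
# TYPE_MAP is an assumed mapping, not copied from the repository.
import pyarrow as pa

json_schema = {
    'type': 'object',
    'properties': {'dob': {'type': 'string'}, 'name': {'type': 'string'}},
}

TYPE_MAP = {'string': pa.string(), 'number': pa.float64(),
            'integer': pa.int64(), 'boolean': pa.bool_()}

schema = pa.schema({})
for field, spec in json_schema['properties'].items():
    type_field = spec['type']
    # Airbyte schemas may use a list such as ['null', 'string']; keep the non-null entry.
    if isinstance(type_field, list):
        type_field = next(t for t in type_field if t != 'null')
    schema = schema.append(pa.field(field, TYPE_MAP.get(type_field, pa.string())))

print(schema)  # dob: string, name: string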
74 changes: 74 additions & 0 deletions abm/container.py
@@ -0,0 +1,74 @@
#
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import time

CTRLD = '\x04'.encode()

class Container:
def __init__(self, logger, workdir, mountdir):
self.logger = logger
self.workdir = workdir
self.mountdir = mountdir
# Potentially the fybrik-blueprint pod for the airbyte module can start before the docker daemon pod, causing
# docker.from_env() to fail
retryLoop = 0
while retryLoop < 10:
try:
self.client = docker.from_env()
except Exception as e:
print('error on docker.from_env() ' + str(e) + ' sleep and retry. Retry count = ' + str(retryLoop))
time.sleep(1)
retryLoop += 1
else:
retryLoop = 10

'''
Translate the name of the temporary file in the host to the name of the same file
in the container.
For instance, if the path is '/tmp/tmp12345', return '/local/tmp12345'.
'''
def name_in_container(self, path):
return path.replace(self.workdir, self.mountdir, 1)

def filter_reply(self, reply):
return reply

'''
Run a docker container from the given image.
Mount the workdir on the mount directory. Remove the container when done.
'''
def run_container(self, command, image, volumes, environment=None, remove=True, detach=False, stream=True, init=False):
self.logger.debug("running command: " + command)

try:
reply = self.client.containers.run(image, volumes=volumes, network_mode='host',
environment=environment,
command=command, init=init, stream=stream, remove=remove, detach=detach)
return self.filter_reply(reply)
except docker.errors.DockerException as e:
self.logger.error('Running of docker container failed',
extra={'error': str(e)})
return None

def open_socket_to_container(self, command, image, volumes, detach=True, tty=True, stdin_open=True, remove=True):
container = self.client.containers.run(image, detach=detach,
tty=tty, stdin_open=stdin_open,
volumes=volumes, network_mode='host',
command=command, remove=remove)
# attach to the container stdin socket
s = container.attach_socket(params={'stdin': 1, 'stream': 1, 'stdout': 1, 'stderr': 1})
s._sock.setblocking(True)
return s, container

def close_socket_to_container(self, s, container):
s._sock.sendall(CTRLD) # ctrl d to finish things up
s._sock.close()
container.stop()
self.client.close()

def write_to_socket_to_container(self, s, binary_textline):
s._sock.sendall(binary_textline)
s.flush()
43 changes: 43 additions & 0 deletions abm/normalization.py
@@ -0,0 +1,43 @@
#
# Copyright 2022 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
import docker
import tempfile
from .container import Container

MOUNTDIR = '/local'

class NormalizationContainer(Container):
def __init__(self, config, logger, workdir, asset_name=""):
if 'image' not in config['normalization']:
raise ValueError("'image' field missing from normalization section in configuration")
self.normalization_image = config['normalization']['image']
if 'integrationType' not in config['normalization']:
raise ValueError("'integrationType' field missing from normalization section in configuration")
self.integration_type = config['normalization']['integrationType']
if 'airbyteVersion' not in config['normalization']:
raise ValueError("'airbyteVersion' field missing from normalization section in configuration")
self.airbyte_version = config['normalization']['airbyteVersion']
super().__init__(logger, workdir, MOUNTDIR)

'''
Run a docker container from the normalization image.
Mount the workdir on /local. Remove the container when done.
'''
def run_container(self, command):
volumes=[self.workdir + ':' + MOUNTDIR]
# The normalization image is tied to and released along with a specific Airbyte version.
# ref: https://github.com/airbytehq/airbyte/blob/master/docs/understanding-airbyte/basic-normalization.md#airbyte-integrationbasesbase-normalization
environment=["WORKER_ENVIRONMENT=DOCKER", "AIRBYTE_VERSION=" + self.airbyte_version]
return super().run_container(command, self.normalization_image, volumes, environment, remove=True, stream=True, init=True)

'''
Creates a normalization command
'''
def create_normalization_command(self, catalog, config):
command = 'run --config ' + self.name_in_container(config.name) + \
' --catalog ' + self.name_in_container(catalog.name) + ' --integration-type ' + \
self.integration_type

return command
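A minimal usage sketch of the class above; the import path, logger, workdir, temp files, and all configuration values are assumptions, and do_put in abm/server.py below drives the class the same way.

# Illustrative only: driving NormalizationContainer directly.
import logging
import tempfile

from abm.normalization import NormalizationContainer  # assumed import path

asset_conf = {
    'normalization': {
        'image': 'airbyte/normalization:0.2.25',   # assumed image tag
        'integrationType': 'postgres',             # assumed integration type
        'airbyteVersion': '0.40.0',                # assumed Airbyte version
    }
}
logger = logging.getLogger('abm')
workdir = tempfile.mkdtemp()

normalization = NormalizationContainer(asset_conf, logger, workdir)
config_file = tempfile.NamedTemporaryFile(dir=workdir)    # destination config, written elsewhere
catalog_file = tempfile.NamedTemporaryFile(dir=workdir)   # configured catalog, written elsewhere
command = normalization.create_normalization_command(catalog=catalog_file, config=config_file)
# command resembles: 'run --config /local/<config> --catalog /local/<catalog> --integration-type postgres'
reply = normalization.run_container(command)
if reply:
    logger.debug(reply)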
22 changes: 20 additions & 2 deletions abm/server.py
@@ -4,6 +4,7 @@
#

from fybrik_python_logging import init_logger, logger, DataSetID, ForUser
from .normalization import NormalizationContainer
from .config import Config
from .connector import GenericConnector
from .ticket import ABMTicket
@@ -163,6 +164,9 @@ def do_get(self, context, ticket: fl.Ticket):
def do_put(self, context, descriptor, reader, writer):
try:
command = json.loads(descriptor.command)
write_mode = None
if 'write_mode' in command:
write_mode = command['write_mode']
asset_name = command['asset']
json_schema_str = command['json_schema']
json_schema=json.loads(json_schema_str)
@@ -175,6 +179,12 @@ def do_put(self, context, descriptor, reader, writer):
DataSetID: asset_name,
ForUser: True})
with Config(self.config_path) as config:
mode = DestinationSyncMode.append
if write_mode:
if write_mode == "overwrite":
mode = DestinationSyncMode.overwrite
elif write_mode != "append":
logger.debug("unknown write mode. expect overwrite or append. using default append mode")
asset_conf = config.for_asset(asset_name)
connector = GenericConnector(asset_conf, logger, self.workdir, asset_name)
stream_name = connector.get_stream_name()
@@ -184,8 +194,8 @@ def do_put(self, context, descriptor, reader, writer):
stream_name = ''.join(random.choices(string.ascii_lowercase, k=stream_name_len))
stream = AirbyteStream(name=stream_name,supported_sync_modes=[SyncMode.full_refresh],json_schema=json_schema)
# TODO: Params to the Airbyte objects, such as destination_sync_mode, can be configurable using the arrow-flight request
streams = [ConfiguredAirbyteStream(destination_sync_mode=DestinationSyncMode.append, sync_mode=SyncMode.full_refresh, stream=stream)]
command, catalog = connector.create_write_command(ConfiguredAirbyteCatalog(streams=streams).json())
streams = [ConfiguredAirbyteStream(destination_sync_mode=mode, sync_mode=SyncMode.full_refresh, stream=stream)]
command, catalog = connector.create_write_command(ConfiguredAirbyteCatalog(streams=streams).json(exclude_unset=True, exclude_none=True))
socket, container = connector.open_socket_to_container(command)
idx = 0
record_reader = reader.to_reader()
@@ -206,6 +216,14 @@ def do_put(self, context, descriptor, reader, writer):
logger.error(f"Unexpected {err=}, {type(err)=}")
raise
connector.close_socket_to_container(socket, container)
if 'normalization' in asset_conf:
logger.info('starting normalization')
normalization_container = NormalizationContainer(asset_conf, logger, self.workdir, asset_name)
command = normalization_container.create_normalization_command(catalog=catalog,config=connector.conf_file)
reply = normalization_container.run_container(command)
if reply:
logger.debug(reply)

catalog.close()


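For context on the new write_mode field, a hedged client-side sketch of a do_put request; the Flight location, asset name, and table contents are assumptions, while the command keys ('asset', 'json_schema', 'write_mode') mirror what do_put reads above.

# Illustrative client-side sketch; server address, asset name, and data are assumptions.
import json

import pyarrow as pa
import pyarrow.flight as fl

table = pa.table({'name': ['alice'], 'dob': ['2000-01-01']})
command = {
    'asset': 'demo-asset',                                   # assumed asset name
    'json_schema': json.dumps({
        'type': 'object',
        'properties': {'name': {'type': 'string'}, 'dob': {'type': 'string'}},
    }),
    'write_mode': 'overwrite',                               # or 'append' (the default)
}
client = fl.connect('grpc://localhost:8080')                 # assumed server address
descriptor = fl.FlightDescriptor.for_command(json.dumps(command).encode())
writer, _ = client.do_put(descriptor, table.schema)
writer.write_table(table)
writer.close()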