Skip to content

Commit

Permalink
Fixes after testing.
Browse files Browse the repository at this point in the history
Signed-off-by: Revital Sur <eres@il.ibm.com>
  • Loading branch information
revit13 committed Feb 5, 2023
1 parent ce92168 commit 9351b09
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 44 deletions.
42 changes: 1 addition & 41 deletions abm/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,53 +64,13 @@ def __init__(self, config, logger, workdir, asset_name=""):
def __del__(self):
self.conf_file.close()

'''
Extract only the relevant data in "RECORD" lines returned by an Airbyte read operation.
For instance, if a line is:
{"type":"RECORD","record":{"stream":"users","data":{"id":1,"col1":"record1"},"emitted_at":"1644416403239","namespace":"public"}}
extract:
{"id":1,"col1":"record1"}
'''
def extract_data(self, line_dict):
return json.dumps(line_dict['record']['data']).encode('utf-8')

'''
Filter out all irrelevant lines, such as log lines.
Relevant lines are JSON-formatted, and have a 'type' field which is
either 'CATALOG' or 'RECORD'
'''
def filter_reply(self, lines, batch_size=100):
count = 0
for line in lines:
if count == 0:
ret = []
try:
line_dict = json.loads(line)
if 'type' in line_dict:
if line_dict['type'] == 'LOG':
continue
if line_dict['type'] == 'CATALOG':
ret.append(line)
elif line_dict['type'] == 'RECORD':
ret.append(self.extract_data(line_dict))
count = count + 1
if count == batch_size:
count = 0
yield ret
finally:
continue
if count == 0:
yield []
else:
yield ret

'''
Run a docker container from the connector image.
Mount the workdir on /local. Remove the container after done.
'''
def run_container(self, command):
volumes=[self.workdir + ':' + MOUNTDIR]
return super().run_container(command, self.connector, volumes, remove=True, detach=True, stream=True)
return super().run_container(command, self.connector, volumes=volumes, remove=True, detach=False, stream=True)

def open_socket_to_container(self, command):
volumes=[self.workdir + ':' + MOUNTDIR]
Expand Down
46 changes: 44 additions & 2 deletions abm/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#
import docker
import time
import json

CTRLD = '\x04'.encode()

Expand Down Expand Up @@ -32,16 +33,57 @@ def __init__(self, logger, workdir):
def name_in_container(self, path, mountdir):
return path.replace(self.workdir, mountdir, 1)

'''
Extract only the relevant data in "RECORD" lines returned by an Airbyte read operation.
For instance, if a line is:
{"type":"RECORD","record":{"stream":"users","data":{"id":1,"col1":"record1"},"emitted_at":"1644416403239","namespace":"public"}}
extract:
{"id":1,"col1":"record1"}
'''
def extract_data(self, line_dict):
return json.dumps(line_dict['record']['data']).encode('utf-8')

'''
Filter out all irrelevant lines, such as log lines.
Relevant lines are JSON-formatted, and have a 'type' field which is
either 'CATALOG' or 'RECORD'
'''
def filter_reply(self, lines, batch_size=100):
count = 0
for line in lines:
if count == 0:
ret = []
try:
line_dict = json.loads(line)
if 'type' in line_dict:
if line_dict['type'] == 'LOG':
continue
if line_dict['type'] == 'CATALOG':
ret.append(line)
elif line_dict['type'] == 'RECORD':
ret.append(self.extract_data(line_dict))
count = count + 1
if count == batch_size:
count = 0
yield ret
finally:
continue
if count == 0:
yield []
else:
yield ret

'''
Run a docker container from the connector image.
Mount the workdir on /local. Remove the container after done.
'''
def run_container(self, command, image, volumes, environment=None, remove=True, detach=True, stream=True, init=False):
def run_container(self, command, image, volumes, environment=None, remove=True, detach=False, stream=True, init=False):
self.logger.debug("running command: " + command)
try:
_ = self.client.containers.run(image, volumes=volumes, network_mode='host',
reply = self.client.containers.run(image, volumes=volumes, network_mode='host',
environment=environment,
command=command, init=init, stream=stream, remove=remove, detach=detach)
return self.filter_reply(reply)
except docker.errors.DockerException as e:
self.logger.error('Running of docker container failed',
extra={'error': str(e)})
Expand Down
2 changes: 1 addition & 1 deletion abm/normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, config, logger, workdir, asset_name=""):
def run_container(self, command):
volumes=[self.workdir + ':' + MOUNTDIR]
environment=["DEPLOYMENT_MODE=OSS", "AIRBYTE_ROLE=", "WORKER_ENVIRONMENT=DOCKER", "AIRBYTE_VERSION=" + self.airbyte_version]
super().run_container(command, self.normalization_image, volumes, environment, remove=True, detach=True, stream=True, init=True)
super().run_container(command, self.normalization_image, volumes, environment, remove=True, stream=True, init=True)

'''
Creates a normalization command
Expand Down

0 comments on commit 9351b09

Please sign in to comment.