-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add normalization #76
Conversation
dab7198
to
9351b09
Compare
Signed-off-by: Revital Sur <eres@il.ibm.com> Co-authored-by: Doron Chen <cdoron@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
c7085bf
to
e3286c1
Compare
Signed-off-by: Revital Sur <eres@il.ibm.com>
e3286c1
to
a7d8e64
Compare
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
abm/server.py
Outdated
if write_mode: | ||
if write_mode == "overwrite": | ||
mode = DestinationSyncMode.overwrite | ||
if write_mode != "append": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be elif
# if the port field is a string, cast it to integer | ||
if 'port' in self.config and type(self.config['port']) == str: | ||
self.config['port'] = int(self.config['port']) | ||
|
||
self.catalog_dict = None | ||
self.json_schema = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comment explaining what is kept in self.json_schema
abm/connector.py
Outdated
Translate the name of the temporary file in the host to the name of the same file | ||
in the container. | ||
For instance, it the path is '/tmp/tmp12345', return '/local/tmp12345'. | ||
Remove metadata columns, if such exists, from "CATALOG" lines returned by an Airbyte read operation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In future versions, we should consider keeping the metadata columns and returning them to the user.
abm/connector.py
Outdated
for stream in catalog_streams: | ||
# remove metadata columns for a specific stream (table) if such | ||
# is provided | ||
if stream_name != "" and stream['name'] != stream_name: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider replacing this line with:
if stream_name and stream['name'] != stream_name:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe there is no need to keep entries for all other streams (other than the stream you are really interested in)
abm/connector.py
Outdated
properties = json_schema['properties'] | ||
for key in list(properties.keys()): | ||
if key.startswith('_airbyte_'): | ||
del properties[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add break
(?)
abm/connector.py
Outdated
|
||
# Given configuration, obtain the Airbyte Catalog, which includes list of datasets | ||
def get_catalog(self): | ||
ret = [] | ||
for lines in self.run_container('discover --config ' + self.name_in_container(self.conf_file.name)): | ||
for lines in self.run_container('discover --config ' + self.name_in_container(self.conf_file.name, MOUNTDIR)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check whether there is a way to remove the MOUNTDIR
parameter
abm/connector.py
Outdated
''' | ||
def name_in_container(self, path): | ||
return path.replace(self.workdir, MOUNTDIR, 1) | ||
def remove_metadata_columns(self, line_dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider setting the json_schema
field in this method
abm/connector.py
Outdated
Given a catalog return the json schema of a specific stream (table) if the stream | ||
is provided. Otherwise return the json schema of the first stream. | ||
''' | ||
def get_stream_schema(self, catalog): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this method would no longer be needed once self.json_schema
contains the schema for a single stream
abm/connector.py
Outdated
@@ -260,6 +255,8 @@ def get_catalog_dict(self): | |||
|
|||
try: | |||
self.catalog_dict = json.loads(airbyte_catalog[0]) | |||
# save the json_schema part in the catalog for later use | |||
self.json_schema = self.get_stream_schema(self.catalog_dict['catalog']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe no longer needed
helm/abm/values.yaml
Outdated
@@ -97,3 +97,12 @@ connectors: | |||
storage: HTTPS | |||
# dataset_name is the Name of the final table to replicate this file into. | |||
dataset_name: userdata | |||
|
|||
# Atttributes related to the normzliation process. If they are provided then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
sample/write_mysql.yaml
Outdated
host: host.docker.internal | ||
port: 3306 | ||
database: test | ||
database: test | ||
table: table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please change table name
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
Signed-off-by: Revital Sur <eres@il.ibm.com>
@cdoron Thanks for the review. I fixed the code according to the the comments. Thanks |
Signed-off-by: Revital Sur <eres@il.ibm.com>
if stream['name'] == stream_name: | ||
the_stream = stream | ||
break | ||
if the_stream == None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a log message as well
1. To verify that the Airbyte module writes the dataset, run: | ||
```bash | ||
kubectl exec -it mysql-client --namespace fybrik-airbyte-sample -- bash | ||
mysql -h mysql.fybrik-airbyte-sample.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the kubectl exec
line be in the same "bash" segment as the mysql -h
line?
I think that if someone copies and pastes this entire segment it would not work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works ok - after the msql command we enter a mysql shell prompt...
Signed-off-by: Revital Sur <eres@il.ibm.com>
/Closes #75
This PR adds the code for doing normalization of the database tables which transforms the data from Airbyte format to expected database format.
more on normalization:
https://docs.airbyte.com/understanding-airbyte/basic-normalization/
Signed-off-by: Revital Sur eres@il.ibm.com
Co-authored-by: Doron Chen cdoron@il.ibm.com