
Fix/key properties canonicalized #95

Merged
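
In short: when a stream declares a `key_properties` field whose name is not a canonical Postgres identifier, the target canonicalizes the column name on write, but `write_batch` and `persist_csv_rows` were still looking the key up under its raw streamed name. This PR resolves key properties through the table's `mappings` via a new `fetch_column_from_path` helper. A minimal sketch of the mismatch, with lower-casing standing in for the target's `canonicalize_identifier` (which handles more than case):

    declared_key_properties = ['ID']   # as declared in the stream's key_properties
    remote_column = 'ID'.lower()       # 'id': the column name actually created

    # Pre-fix, the raw name 'ID' was used both to look up the remote column's
    # schema and to build the merge SQL; neither can match the 'id' column.
    assert 'ID' != remote_column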
24 changes: 15 additions & 9 deletions target_postgres/postgres.py
@@ -138,18 +138,21 @@ def write_batch(self, stream_buffer):
         stream_buffer.key_properties
     ))

-    for key in stream_buffer.key_properties:
-        if self.json_schema_to_sql_type(current_table_schema['schema']['properties'][key]) \
-                != self.json_schema_to_sql_type(stream_buffer.schema['properties'][key]):
+    for key_property in stream_buffer.key_properties:
+        canonicalized_key, remote_column_schema = self.fetch_column_from_path((key_property,),
+                                                                              current_table_schema)
+        if self.json_schema_to_sql_type(remote_column_schema) \
+                != self.json_schema_to_sql_type(stream_buffer.schema['properties'][key_property]):
             raise PostgresError(
                 ('`key_properties` type change detected for "{}". ' +
                  'Existing values are: {}. ' +
                  'Streamed values are: {}, {}, {}').format(
-                    key,
-                    json_schema.get_type(current_table_schema['schema']['properties'][key]),
-                    json_schema.get_type(stream_buffer.schema['properties'][key]),
-                    self.json_schema_to_sql_type(current_table_schema['schema']['properties'][key]),
-                    self.json_schema_to_sql_type(stream_buffer.schema['properties'][key])
+                    key_property,
+                    json_schema.get_type(current_table_schema['schema']['properties'][key_property]),
+                    json_schema.get_type(stream_buffer.schema['properties'][key_property]),
+                    self.json_schema_to_sql_type(
+                        current_table_schema['schema']['properties'][key_property]),
+                    self.json_schema_to_sql_type(stream_buffer.schema['properties'][key_property])
                 ))

     root_table_name = stream_buffer.stream
@@ -457,9 +460,12 @@ def persist_csv_rows(self,
     pattern = re.compile(SINGER_LEVEL.format('[0-9]+'))
     subkeys = list(filter(lambda header: re.match(pattern, header) is not None, columns))

+    canonicalized_key_properties = [self.fetch_column_from_path((key_property,), remote_schema)[0]
+                                    for key_property in remote_schema['key_properties']]
+
     update_sql = self._get_update_sql(remote_schema['name'],
                                       temp_table_name,
-                                      remote_schema['key_properties'],
+                                      canonicalized_key_properties,
                                       columns,
                                       subkeys)
     cur.execute(update_sql)
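The guard in the `write_batch` hunk still compares SQL types, but now against the canonicalized remote column's schema rather than a raw-name lookup. A minimal sketch of that comparison, with a toy stand-in for `json_schema_to_sql_type` (the real mapping is richer; the guard only cares that the two sides disagree):

    def json_schema_to_sql_type(schema):
        # Toy stand-in for PostgresTarget.json_schema_to_sql_type.
        return {'integer': 'bigint', 'string': 'text'}[schema['type'][0]]

    remote_column_schema = {'type': ['integer']}   # resolved via fetch_column_from_path
    streamed_schema = {'type': ['string']}         # from the incoming stream's schema

    if json_schema_to_sql_type(remote_column_schema) != json_schema_to_sql_type(streamed_schema):
        raise Exception('`key_properties` type change detected')  # PostgresError in the diff

In `persist_csv_rows`, the same helper supplies the canonicalized names that `_get_update_sql` interpolates into the merge SQL, so the update targets columns that actually exist.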
15 changes: 15 additions & 0 deletions target_postgres/sql_base.py
@@ -111,6 +111,21 @@ def canonicalize_identifier(self, name):
"""
raise NotImplementedError('`canonicalize_identifier` not implemented.')

def fetch_column_from_path(self, path, table_schema):
"""
Should only be used for paths which have been established, ie, the schema will
not be changing etc.
:param path:
:param table_schema:
:return:
"""

for to, m in table_schema.get('mappings', {}).items():
if tuple(m['from']) == path:
return to, json_schema.simple_type(m)

raise Exception('Could not fetch column for path {} in table mappings'.format(path))

def _canonicalize_column_identifier(self, path, schema, mappings):
""""""

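For context, a self-contained sketch of the lookup `fetch_column_from_path` performs; the `mappings` layout is inferred from the `tuple(m['from']) == path` comparison above, and `simple_type` here is a stand-in for `target_postgres.json_schema.simple_type`:

    def simple_type(column_schema):
        # Stand-in: reduce a column mapping to its simple JSON schema.
        return {'type': column_schema['type']}

    def fetch_column_from_path(path, table_schema):
        for to, m in table_schema.get('mappings', {}).items():
            if tuple(m['from']) == path:
                return to, simple_type(m)
        raise Exception('Could not fetch column for path {}'.format(path))

    table_schema = {'mappings': {'id': {'from': ['ID'], 'type': ['integer']}}}
    assert fetch_column_from_path(('ID',), table_schema) == ('id', {'type': ['integer']})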
43 changes: 43 additions & 0 deletions tests/test_postgres.py
@@ -975,6 +975,49 @@ def test_loading__invalid_column_name(db_cleanup):
})


def test_loading__invalid_column_name__pk(db_cleanup):
def setup(count):
class Stream(CatStream):
def generate_record(self):
record = CatStream.generate_record(self)
record['ID'] = record['id']
record.pop('id')
return record

stream = Stream(count)
stream.schema = deepcopy(stream.schema)
stream.schema['schema']['properties']['ID'] = \
stream.schema['schema']['properties']['id']

stream.schema['key_properties'] = ['ID']
stream.schema['schema']['properties'].pop('id')

return stream

main(CONFIG, input_stream=setup(100))
main(CONFIG, input_stream=setup(200))

with psycopg2.connect(**TEST_DB) as conn:
with conn.cursor() as cur:
assert_columns_equal(cur,
'cats',
{
('_sdc_batched_at', 'timestamp with time zone', 'YES'),
('_sdc_received_at', 'timestamp with time zone', 'YES'),
('_sdc_sequence', 'bigint', 'YES'),
('_sdc_table_version', 'bigint', 'YES'),
('adoption__adopted_on', 'timestamp with time zone', 'YES'),
('adoption__was_foster', 'boolean', 'YES'),
('age', 'bigint', 'YES'),
('id', 'bigint', 'NO'),
('name', 'text', 'NO'),
('paw_size', 'bigint', 'NO'),
('paw_colour', 'text', 'NO'),
('flea_check_complete', 'boolean', 'NO'),
('pattern', 'text', 'YES')
})


def test_loading__invalid_column_name__duplicate_name_handling(db_cleanup):
for i in range(101):
name_too_long_stream = CatStream(100)
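For reference, `assert_columns_equal` is defined elsewhere in tests/test_postgres.py; a hedged sketch of the shape of its check, assuming it reads `information_schema.columns` (the (name, data_type, is_nullable) tuples above match that catalog's columns):

    def assert_columns_equal(cur, table_name, expected_columns):
        # cur is a psycopg2 cursor; expected_columns is a set of
        # (column_name, data_type, is_nullable) tuples.
        cur.execute(
            "SELECT column_name, data_type, is_nullable "
            "FROM information_schema.columns "
            "WHERE table_name = %s",
            (table_name,))
        assert set(cur.fetchall()) == expected_columns

The assertion that matters for this PR is the ('id', 'bigint', 'NO') row: the stream's `ID` key property lands as the canonicalized, non-nullable `id` column across both runs.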