Skip to content

Commit

Permalink
Add _PRESENCE type to store whether a field is valued
Browse files Browse the repository at this point in the history
  • Loading branch information
ndemengel committed Jul 20, 2018
1 parent 421a8ab commit 1074729
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 22 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ This file should be named mappings.json. Here is a sample :
"myobject.subproperty":{
"dest":"subproperty",
"type":"TEXT"
},
"propertyWhichMayBeNull": {
"dest":"property_has_been_provided",
"type":"_PRESENCE"
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions mongo_connector/doc_managers/mapping_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"oneOf": [
{"$ref": "#/definitions/basic-field"},
{"$ref": "#/definitions/array-field"},
{"$ref": "#/definitions/presence-field"},
{"$ref": "#/definitions/scalar-array-field"}
]
}
Expand Down Expand Up @@ -141,6 +142,16 @@
},
"required": ["type", "dest", "fk"]
},
"presence-field": {
"properties": {
"type": {"enum": ["_PRESENCE"]},
"dest": {"type": "string"},
"nullable": {
"type": "boolean"
}
},
"required": ["type", "dest"]
},
"scalar-array-field": {
"properties": {
"type": {"enum": ["_ARRAY_OF_SCALARS"]},
Expand Down
8 changes: 6 additions & 2 deletions mongo_connector/doc_managers/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from future.utils import iteritems
from mongo_connector.doc_managers.formatters import DocumentFlattener

from mongo_connector.doc_managers.utils import db_and_collection, ARRAY_OF_SCALARS_TYPE
from mongo_connector.doc_managers.utils import db_and_collection, ARRAY_OF_SCALARS_TYPE, FIELD_PRESENCE_TYPE

_formatter = DocumentFlattener()

Expand Down Expand Up @@ -58,8 +58,12 @@ def get_mapped_document(mappings, document, namespace):
field_mapping = mappings[db][collection][key]

if 'dest' in field_mapping:
value = cleaned_and_flatten_document.pop(key)
if field_mapping['type'] == FIELD_PRESENCE_TYPE:
value = value is not None

mappedKey = field_mapping['dest']
cleaned_and_flatten_document[mappedKey] = cleaned_and_flatten_document.pop(key)
cleaned_and_flatten_document[mappedKey] = value

return cleaned_and_flatten_document

Expand Down
17 changes: 9 additions & 8 deletions mongo_connector/doc_managers/postgresql_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
from mongo_connector.doc_managers.mappings import is_mapped, get_mapped_document, get_primary_key, \
get_scalar_array_fields
from mongo_connector.doc_managers.sql import sql_table_exists, sql_create_table, sql_insert, sql_delete_rows, \
sql_bulk_insert, object_id_adapter, sql_delete_rows_where, to_sql_value, sql_drop_table, insert_document_arrays, \
insert_scalar_arrays
sql_bulk_insert, object_id_adapter, sql_delete_rows_where, to_sql_value, sql_drop_table
from mongo_connector.doc_managers.utils import get_array_fields, db_and_collection, get_any_array_fields, \
ARRAY_OF_SCALARS_TYPE, ARRAY_TYPE, get_nested_field_from_document
ARRAY_OF_SCALARS_TYPE, ARRAY_TYPE, get_nested_field_from_document, pg_type_for_mapping_type

MAPPINGS_JSON_FILE_NAME = 'mappings.json'

Expand Down Expand Up @@ -60,17 +59,19 @@ def _init_schema(self):

for database in self.mappings:
for collection in self.mappings[database]:
collection_mapping = self.mappings[database][collection]

self.insert_accumulator[collection] = 0

with self.pgsql.cursor() as cursor:
pk_found = False
pk_name = self.mappings[database][collection]['pk']
pk_name = collection_mapping['pk']
columns = ['_creationdate TIMESTAMP']
indices = [u"INDEX idx_{0}__creation_date ON {0} (_creationdate DESC)".format(collection)] + \
self.mappings[database][collection].get('indices', [])
collection_mapping.get('indices', [])

for column in self.mappings[database][collection]:
column_mapping = self.mappings[database][collection][column]
for column in collection_mapping:
column_mapping = collection_mapping[column]

if 'dest' in column_mapping:
name = column_mapping['dest']
Expand All @@ -82,7 +83,7 @@ def _init_schema(self):
pk_found = True

if column_type != ARRAY_TYPE and column_type != ARRAY_OF_SCALARS_TYPE:
columns.append(name + ' ' + column_type + ' ' + constraints)
columns.append(name + ' ' + pg_type_for_mapping_type(column_type) + ' ' + constraints)

if 'index' in column_mapping:
indices.append(u"INDEX idx_{2}_{0} ON {1} ({0})".format(name, collection, collection.replace('.', '_')))
Expand Down
5 changes: 5 additions & 0 deletions mongo_connector/doc_managers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# Pseudo-types used in mappings.json "type" fields. The leading underscore
# marks them as mapping directives rather than real PostgreSQL column types.
ARRAY_TYPE = u'_ARRAY'  # field is an array of sub-documents (mapped to a separate table)
ARRAY_OF_SCALARS_TYPE = u'_ARRAY_OF_SCALARS'  # field is an array of scalar values
FIELD_PRESENCE_TYPE = u'_PRESENCE'  # map only whether the field holds a non-null value (boolean column)


def extract_creation_date(document, primary_key):
Expand Down Expand Up @@ -87,3 +88,7 @@ def get_nested_field_from_document(document, dot_notation_key):
return None

return get_nested_field_from_document(document[partial_key], '.'.join(dot_notation_key.split('.')[1:]))


def pg_type_for_mapping_type(mapping_type):
    """Return the PostgreSQL column definition for a mapping type.

    For the special ``_PRESENCE`` pseudo-type the returned string includes the
    NOT NULL / DEFAULT constraints, so an absent source field yields FALSE when
    the column is omitted from an INSERT. Any other mapping type is assumed to
    already be a valid PostgreSQL type and is returned unchanged.
    """
    if mapping_type == FIELD_PRESENCE_TYPE:
        return 'BOOLEAN NOT NULL DEFAULT FALSE'
    return mapping_type
50 changes: 38 additions & 12 deletions tests/test_postgresql_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
"type": "_ARRAY",
"dest": "col_field2",
"fk": "id_col"
},
"field3": {
"dest": "field3_is_present",
"type": "_PRESENCE"
}
},
"col_field2": {
Expand Down Expand Up @@ -73,6 +77,10 @@
'dest': 'col_field2',
'type': '_ARRAY',
'fk': 'id_col'
},
'field3': {
'dest': 'field3_is_present',
'type': '_PRESENCE'
}
},
'col_field2': {
Expand Down Expand Up @@ -185,7 +193,7 @@ def test_valid_configuration(self):
cursor.execute.assert_has_calls([
call('DROP TABLE col'),
call(
'CREATE TABLE col (_creationdate TIMESTAMP,_id INT CONSTRAINT COL_PK PRIMARY KEY,field1 TEXT ) '
'CREATE TABLE col (_creationdate TIMESTAMP,_id INT CONSTRAINT COL_PK PRIMARY KEY,field1 TEXT ,field3_is_present BOOLEAN NOT NULL DEFAULT FALSE ) '
),
call(
'CREATE TABLE col_field2 (_creationdate TIMESTAMP,_id INT CONSTRAINT COL_FIELD2_PK PRIMARY KEY,id_col INT ,subfield1 TEXT ) '
Expand Down Expand Up @@ -244,20 +252,22 @@ def test_upsert(self):
{
'subfield1': 'subval1'
}
]
],
'field3': 'some value'
}
now = time()

self.docmgr.upsert(doc, 'db.col', now)

# print(self.cursor.execute.mock_calls)
self.cursor.execute.assert_has_calls([
call(
'INSERT INTO col_field2 (id_col,subfield1) VALUES (%(id_col)s,%(subfield1)s) ',
{'id_col': 1, 'subfield1': 'subval1'}
),
call(
'INSERT INTO col (_id,field1) VALUES (%(_id)s,%(field1)s) ON CONFLICT (_id) DO UPDATE SET (_id,field1) = (%(_id)s,%(field1)s) ',
{'_id': 1, 'field1': 'val1'}
'INSERT INTO col (_id,field1,field3_is_present) VALUES (%(_id)s,%(field1)s,%(field3_is_present)s) ON CONFLICT (_id) DO UPDATE SET (_id,field1,field3_is_present) = (%(_id)s,%(field1)s,%(field3_is_present)s) ',
{'_id': 1, 'field1': 'val1', 'field3_is_present': True}
)
], any_order=True)
self.pconn.commit.assert_called()
Expand All @@ -268,25 +278,36 @@ def test_bulk_upsert(self):
'field1': 'val1',
'field2': [
{'subfield1': 'subval1'}
]
],
'field3': 'some value'
}
doc2 = {
'_id': 2,
'field1': 'val2',
'field2': [
{'subfield1': 'subval2'}
]
],
'field3': None # -> FALSE
}
doc3 = {
'_id': 3,
'field1': 'val3',
'field2': [
{'subfield1': 'subval3'}
],
'field3': False # False is a value as well, and should be considered present
}
doc4 = {
'_id': 4,
'field1': 'val4',
'field2': [
{'subfield1': 'subval4'}
]
# no field3: considered absent. NOTE(review): the expected SQL below inserts an
# explicit NULL for this column; PostgreSQL's DEFAULT FALSE only applies when the
# column is omitted from the INSERT, so an explicit NULL would violate NOT NULL —
# confirm the bulk-insert path omits the column (or coerces to FALSE) instead.
}
now = time()

self.docmgr.bulk_upsert([doc1, doc2, doc3], 'db.col', now)
self.docmgr.bulk_upsert([doc1, doc2, doc3, doc4], 'db.col', now)

print(self.cursor.execute.mock_calls)
self.cursor.execute.assert_has_calls([
Expand All @@ -297,13 +318,16 @@ def test_bulk_upsert(self):
"INSERT INTO col_field2 (_creationDate,_id,id_col,subfield1) VALUES (NULL,NULL,2,'subval2')"
),
call(
"INSERT INTO col (_creationDate,_id,field1) VALUES (NULL,1,'val1'),(NULL,2,'val2')"
"INSERT INTO col (_creationDate,_id,field1,field3_is_present) VALUES (NULL,1,'val1',True),(NULL,2,'val2',False)"
),
call(
"INSERT INTO col_field2 (_creationDate,_id,id_col,subfield1) VALUES (NULL,NULL,3,'subval3')"
),
call(
"INSERT INTO col (_creationDate,_id,field1) VALUES (NULL,3,'val3')"
"INSERT INTO col_field2 (_creationDate,_id,id_col,subfield1) VALUES (NULL,NULL,4,'subval4')"
),
call(
"INSERT INTO col (_creationDate,_id,field1,field3_is_present) VALUES (NULL,3,'val3',True),(NULL,4,'val4',NULL)"
)
], any_order=True)
self.pconn.commit.assert_called()
Expand All @@ -315,7 +339,8 @@ def test_update(self):
'field1': 'val1',
'field2': [
{'subfield1': 'subval1'}
]
],
'field3': 'some value'
}
now = time()

Expand All @@ -327,6 +352,7 @@ def test_update(self):
self.mdb.__getitem__.assert_called_with('col')
self.mcol.find_one.assert_called_with({'_id': 1})

# print(self.cursor.execute.mock_calls)
self.cursor.execute.assert_has_calls([
call(
'DELETE FROM col_field2 WHERE id_col = 1'
Expand All @@ -336,8 +362,8 @@ def test_update(self):
{'id_col': 1, 'subfield1': 'subval1'}
),
call(
'INSERT INTO col (_id,field1) VALUES (%(_id)s,%(field1)s) ON CONFLICT (_id) DO UPDATE SET (_id,field1) = (%(_id)s,%(field1)s) ',
{'_id': 1, 'field1': 'val1'}
'INSERT INTO col (_id,field1,field3_is_present) VALUES (%(_id)s,%(field1)s,%(field3_is_present)s) ON CONFLICT (_id) DO UPDATE SET (_id,field1,field3_is_present) = (%(_id)s,%(field1)s,%(field3_is_present)s) ',
{'_id': 1, 'field1': 'val1', 'field3_is_present': True}
)
], any_order=True)
self.pconn.commit.assert_called()
Expand Down

0 comments on commit 1074729

Please sign in to comment.