Merge ada4d74 into ef07673
linkdd committed May 3, 2017
2 parents ef07673 + ada4d74 commit aad770a
Showing 7 changed files with 216 additions and 18 deletions.
13 changes: 13 additions & 0 deletions README.rst
@@ -87,6 +87,11 @@ This file should be named mappings.json. Here is a sample:
"myobject.subproperty":{
"dest":"subproperty",
"type":"TEXT"
},
"transformed_field": {
"dest": "field",
"type": "BOOLEAN",
"transform": "package.module.transform_str_to_bool"
}
}
}
@@ -99,6 +104,14 @@ Please notice the following:
- If the original document in MongoDB has an embedded document, everything is flattened before being inserted into PostgreSQL (see the example after this list)
- One can define indices in two different ways: using the ``indices`` array with a SQL definition, or autogenerating an index
  by setting the ``index`` field to true
- The ``transform`` key (if set) points to a function used to transform the field value from the Mongo document
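
For instance, given the mapping sample above, a hypothetical document such as:

.. code-block:: json

    {"myobject": {"subproperty": "some value"}}

is flattened to the key ``myobject.subproperty``, which the mapping stores in the ``subproperty`` column.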

Example of transform function:

.. code-block:: python

    def transform_str_to_bool(mongo_val):
        return mongo_val in ['true', 'True', 'yes', 'Yes', '1']
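
With the sample mapping above, a Mongo value such as ``"yes"`` in ``transformed_field`` would then be stored as ``TRUE`` in the BOOLEAN ``field`` column.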

The connector also supports arrays of documents. Let's say your Mongo database stores the following documents:

71 changes: 70 additions & 1 deletion mongo_connector/doc_managers/mappings.py
@@ -2,7 +2,17 @@
from future.utils import iteritems
from mongo_connector.doc_managers.formatters import DocumentFlattener

from mongo_connector.doc_managers.utils import db_and_collection, ARRAY_OF_SCALARS_TYPE
from mongo_connector.doc_managers.utils import (
    db_and_collection,
    ARRAY_OF_SCALARS_TYPE,
    ARRAY_TYPE
)

from importlib import import_module
import logging


LOG = logging.getLogger(__name__)

_formatter = DocumentFlattener()

@@ -74,6 +84,65 @@ def get_primary_key(mappings, namespace):
    return mappings[db][collection]['pk']


def get_transformed_value(mapped_field, mapped_document, key):
    val = mapped_document[key]

    if 'transform' in mapped_field:
        transform = mapped_field['transform']
        transform_path = transform.rsplit('.', 1)
        module_path = 'mongo_connector.doc_managers.transforms'

        if len(transform_path) == 2:
            module_path, transform_path = transform_path

        else:
            transform_path = transform_path[0]

        try:
            module = import_module(module_path)
            transform = getattr(module, transform_path)

        except (ImportError, ValueError) as err:
            LOG.error('Impossible to use transform function: {0}'.format(err))

        else:
            try:
                new_val = transform(val)

            except Exception as err:
                LOG.error(
                    'An error occurred during field transformation: {0}'.format(
                        err
                    )
                )

            else:
                val = new_val

    return val


def get_transformed_document(mappings, db, collection, mapped_document):
    mapped_fields = {
        mapping['dest']: mapping
        for _, mapping in iteritems(mappings[db][collection])
        if 'dest' in mapping and mapping['type'] not in (
            ARRAY_TYPE,
            ARRAY_OF_SCALARS_TYPE
        )
    }
    keys = list(mapped_fields.keys())
    keys.sort()

    return {
        key: get_transformed_value(
            mapped_fields[key],
            mapped_document, key
        ) if key in mapped_fields else mapped_document[key]
        for key in mapped_document
    }


def is_mapped(mappings, namespace, field_name=None):
    db, collection = db_and_collection(namespace)
    return db in mappings and collection in mappings[db] and \
8 changes: 7 additions & 1 deletion mongo_connector/doc_managers/postgresql_manager.py
@@ -119,7 +119,13 @@ def _upsert(self, namespace, document, cursor, timestamp):
        mapped_document = get_mapped_document(self.mappings, document, namespace)

        if mapped_document:
            sql_insert(cursor, collection, mapped_document, self.mappings[db][collection]['pk'])
            sql_insert(
                cursor,
                collection,
                mapped_document,
                self.mappings,
                db, collection
            )

            self._upsert_array_fields(collection, cursor, db, document, mapped_document, namespace, timestamp)
            self.upsert_scalar_array_fields(collection, cursor, db, document, mapped_document, namespace, timestamp)
61 changes: 48 additions & 13 deletions mongo_connector/doc_managers/sql.py
@@ -9,12 +9,26 @@
from past.builtins import long, basestring
from psycopg2._psycopg import AsIs

from mongo_connector.doc_managers.mappings import get_mapped_document
from mongo_connector.doc_managers.utils import extract_creation_date, get_array_fields, db_and_collection, \
    get_array_of_scalar_fields, ARRAY_OF_SCALARS_TYPE, ARRAY_TYPE, get_nested_field_from_document
from mongo_connector.doc_managers.mappings import (
    get_mapped_document,
    get_transformed_value,
    get_transformed_document
)

from mongo_connector.doc_managers.utils import (
    extract_creation_date,
    get_array_fields,
    db_and_collection,
    get_array_of_scalar_fields,
    ARRAY_OF_SCALARS_TYPE,
    ARRAY_TYPE,
    get_nested_field_from_document
)


LOG = logging.getLogger(__name__)


all_chars = (chr(i) for i in range(0x10000))
control_chars = ''.join(c for c in all_chars if unicodedata.category(c) == 'Cc')
control_char_re = re.compile('[%s]' % re.escape(control_chars))
@@ -60,25 +74,41 @@ def sql_bulk_insert(cursor, mappings, namespace, documents):
    db, collection = db_and_collection(namespace)

    primary_key = mappings[db][collection]['pk']
    keys = [
        v['dest'] for k, v in iteritems(mappings[db][collection])
        if 'dest' in v and v['type'] != ARRAY_TYPE
        and v['type'] != ARRAY_OF_SCALARS_TYPE
    ]
    keys.sort()

    mapped_fields = {
        mapping['dest']: mapping
        for _, mapping in iteritems(mappings[db][collection])
        if 'dest' in mapping and mapping['type'] not in (
            ARRAY_TYPE,
            ARRAY_OF_SCALARS_TYPE
        )
    }
    keys = list(mapped_fields.keys())
    keys.sort()
    values = []

    for document in documents:
        mapped_document = get_mapped_document(mappings, document, namespace)
        document_values = [to_sql_value(extract_creation_date(mapped_document, mappings[db][collection]['pk']))]
        document_values = [
            to_sql_value(
                extract_creation_date(
                    mapped_document,
                    mappings[db][collection]['pk']
                )
            )
        ]

        if not mapped_document:
            break

        for key in keys:
            if key in mapped_document:
                document_values.append(to_sql_value(mapped_document[key]))
                val = get_transformed_value(
                    mapped_fields[key],
                    mapped_document, key
                )
                document_values.append(to_sql_value(val))

            else:
                document_values.append(to_sql_value(None))
        values.append(u"({0})".format(u','.join(document_values)))
@@ -128,7 +158,9 @@ def get_document_keys(document):
    return keys


def sql_insert(cursor, tableName, document, primary_key):
def sql_insert(cursor, tableName, document, mappings, db, collection):
    primary_key = mappings[db][collection]['pk']

    creationDate = extract_creation_date(document, primary_key)
    if creationDate is not None:
        document['_creationDate'] = creationDate
@@ -152,7 +184,10 @@ def sql_insert(cursor, tableName, document, primary_key):
    )

    try:
        cursor.execute(sql, document)
        cursor.execute(
            sql,
            get_transformed_document(mappings, db, collection, document)
        )
    except Exception as e:
        LOG.error(u"Impossible to upsert the following document %s : %s", document, e)

60 changes: 60 additions & 0 deletions tests/test_mappings.py
@@ -5,6 +5,10 @@
from mongo_connector.doc_managers import mappings


def parseInt(val):
    return int(val)


class TestPostgreSQLMappings(TestCase):
    def test_clean_and_flatten_doc(self):
        mapping = {
@@ -163,6 +167,62 @@ def test_scalar_array_fields(self):
        got = mappings.get_scalar_array_fields(mapping, 'missing_db', 'col')
        self.assertEqual(got, [])

    def test_get_transform_value(self):
        mapped_field = {
            'type': 'INT',
            'dest': 'str_to_int',
            'transform': 'tests.test_mappings.parseInt'
        }
        doc = {
            'str_to_int': '42'
        }

        got = mappings.get_transformed_value(mapped_field, doc, 'str_to_int')
        self.assertEqual(got, 42)

        mapped_field['transform'] = 'missing.no'
        got = mappings.get_transformed_value(mapped_field, doc, 'str_to_int')
        self.assertEqual(got, '42')

        mapped_field['transform'] = 'tests.test_mappings.parseInt'
        doc = {
            'str_to_int': '42a'
        }
        got = mappings.get_transformed_value(mapped_field, doc, 'str_to_int')
        self.assertEqual(got, '42a')

    def test_get_transform_document(self):
        mapping = {
            'db': {
                'col': {
                    'str_to_int': {
                        'type': 'INT',
                        'dest': 'str_to_int',
                        'transform': 'tests.test_mappings.parseInt'
                    }
                }
            }
        }

        doc = {
            'str_to_int': '42'
        }
        got = mappings.get_transformed_document(mapping, 'db', 'col', doc)
        self.assertEqual(got, {'str_to_int': 42})

        doc = {
            'str_to_int': '42a'
        }
        got = mappings.get_transformed_document(mapping, 'db', 'col', doc)
        self.assertEqual(got, {'str_to_int': '42a'})

        mapping['db']['col']['str_to_int']['transform'] = 'missing.no'
        doc = {
            'str_to_int': '42'
        }
        got = mappings.get_transformed_document(mapping, 'db', 'col', doc)
        self.assertEqual(got, {'str_to_int': '42'})


if __name__ == '__main__':
    main()
1 change: 0 additions & 1 deletion tests/test_postgresql_manager.py
@@ -288,7 +288,6 @@ def test_bulk_upsert(self):

        self.docmgr.bulk_upsert([doc1, doc2, doc3], 'db.col', now)

        print(self.cursor.execute.mock_calls)
        self.cursor.execute.assert_has_calls([
            call(
                "INSERT INTO col_field2 (_creationDate,_id,id_col,subfield1) VALUES (NULL,NULL,1,'subval1')"
20 changes: 18 additions & 2 deletions tests/test_sql.py
@@ -80,6 +80,7 @@ def test_sql_bulk_insert(self):
        }

        sql.sql_bulk_insert(cursor, mapping, 'db.col', [doc])

        cursor.execute.assert_called_with(
            "INSERT INTO col (_creationDate,field1,field2_subfield) VALUES (NULL,'val',NULL)"
        )
@@ -161,6 +162,21 @@ def test_sql_bulk_insert_array(self):
        ])

    def test_sql_insert(self):
        mapping = {
            'db': {
                'col': {
                    'pk': '_id',
                    '_id': {
                        'type': 'INT',
                        'dest': '_id'
                    },
                    'field': {
                        'type': 'TEXT',
                        'dest': 'field'
                    }
                }
            }
        }
        cursor = MagicMock()
        now = datetime.now()

@@ -169,7 +185,7 @@ def test_sql_insert(self):
        doc['_id'] = ObjectId.from_datetime(now)
        doc['field'] = 'val'

        sql.sql_insert(cursor, 'table', doc, '_id')
        sql.sql_insert(cursor, 'table', doc, mapping, 'db', 'col')

        doc['_creationDate'] = utils.extract_creation_date(doc, '_id')

@@ -182,7 +198,7 @@
            'field': 'val'
        }

        sql.sql_insert(cursor, 'table', doc, '_id')
        sql.sql_insert(cursor, 'table', doc, mapping, 'db', 'col')

        cursor.execute.assert_called_with(
            'INSERT INTO table (field) VALUES (%(field)s) ',
