In [None]:
# Introduction
This notebook demonstrates how data from a relational database can be inserted into the OpenSLEX metamodel. First, all functions needed for the insertion are defined. Then these functions are invoked to insert the data. Finally, test queries are executed against both the metamodel and the source database, and their results are compared to verify the insertion.

In [None]:
# Imports

In [1]:
import os
import time
import sqlite3
import getpass

# SQLAlchemy imports
from sqlalchemy import create_engine
from sqlalchemy.schema import MetaData
from sqlalchemy.sql import *
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import types

# Define parameters

In [2]:
# Notebook configuration
# --- source database connection settings (the password is asked for separately) ---
project_name = 'mimic_demo'
dialect = 'postgresql'
username = 'postgres'
host = 'localhost'
port = '5432'
database = 'mimic_demo'
schema = 'mimiciii'
# name under which the data model is registered in the metamodel: "<database>.<schema>"
dm_name = '.'.join((database, schema))

# --- OpenSLEX settings ---
# SQL script that creates the metamodel tables
openslex_script_path = '../OpenSLEX/sql/metamodel.sql'
# SQLite file that will hold the populated metamodel
openslex_file_path = '../OpenSLEX/data/' + project_name + '.slexmm'



In [3]:
# Ask for the source database password interactively so that it is never stored in the notebook.
password = getpass.getpass()

# Connect to the metamodel

In [4]:
# create a SQLite database file for the OpenSLEX mm and run the script to create all tables
def create_mm(mm_file_path, mm_script_path):
    """Create a fresh OpenSLEX metamodel (SQLite) file by running an SQL script.

    If ``mm_file_path`` already exists, the user is asked interactively whether
    it may be replaced. Answering ``n`` leaves the existing file untouched and
    returns without creating anything.

    Parameters
    ----------
    mm_file_path : str
        Path of the SQLite file to create. Missing parent directories are created.
    mm_script_path : str
        Path of the SQL script that creates the metamodel tables.

    Returns
    -------
    None

    Raises
    ------
    OSError
        If the existing file cannot be removed, the directory cannot be
        created, or the script cannot be read.
    sqlite3.Error
        If the database cannot be opened or the script fails to execute.
    """
    # check if file already exists
    if os.path.exists(mm_file_path):
        cont = input('OpenSLEX MM already exists. Do you want to remove the existing MM and create a new? [y/n]')
        while cont != 'y':
            if cont == 'n':
                print('OpenSLEX MM not created')
                return
            cont = input('Input invalid. \nMM already exists. Do you want to remove the existing MM and create a new? [y/n]')
        print("Removing OpenSLEX MM")
        os.remove(mm_file_path)

    # if directory doesn't exist, create directory
    # (dirname is '' for a bare file name; os.makedirs('') would raise)
    mm_dir = os.path.dirname(mm_file_path)
    if mm_dir and not os.path.exists(mm_dir):
        os.makedirs(mm_dir)

    print("Opening OpenSLEX MM")
    conn = sqlite3.connect(mm_file_path)
    try:
        print("Reading script")
        with open(mm_script_path) as script_file:
            script = script_file.read()

        print("Running script")
        conn.cursor().executescript(script)
        conn.commit()
    except Exception:
        print("Closing DB")
        raise
    finally:
        # the connection is released on success and on failure alike
        conn.close()

    print("OpenSLEX MM succesfully created")


In [5]:
# build a SQLAlchemy engine bound to the OpenSLEX mm SQLite file
def create_mm_engine(openslex_file_path):
    """Return a SQLAlchemy engine for the SQLite file at ``openslex_file_path``.

    The engine URL is ``sqlite:///`` followed by the given path. Progress
    messages are printed before and after the engine is created.
    """
    print("Creating OpenSLEX MM engine")
    sqlite_engine = create_engine('sqlite:///{0}'.format(openslex_file_path))
    print("OpenSLEX MM engine created")
    return sqlite_engine

# Connect to the source database

In [6]:
# create engine for the source database using SQLAlchemy
def create_db_engine(dialect, username, password, host, port, database):
    """Return a SQLAlchemy engine for the source database.

    The connection URL has the form
    ``dialect://username:password@host:port/database``. The user name and
    password are URL-encoded first, so that characters with a special meaning
    inside a URL (``@``, ``/``, ``:`` ...) cannot corrupt the URL.

    Parameters
    ----------
    dialect : str
        SQLAlchemy dialect name, e.g. ``'postgresql'``.
    username, password : str
        Credentials; passed in raw (unescaped) form.
    host, port, database : str
        Location and name of the database.

    Returns
    -------
    The engine returned by ``create_engine``.
    """
    from urllib.parse import quote_plus

    print("Creating DB engine")
    db_url = '{dialect}://{username}:{password}@{host}:{port}/{database}'.format(
        dialect = dialect,
        username = quote_plus(str(username)),
        password = quote_plus(str(password)),
        host = host,
        port = port,
        database = database
    )
    engine = create_engine(db_url)
    print("DB engine created")
    return engine

# From database to metamodel

## Load the data model into the metamodel

In [7]:
# reflect the source database schema into an automapped SQLAlchemy Base
def automap_db(db_engine, schema):
    """Automap the tables of ``schema`` reachable through ``db_engine``.

    Returns a tuple ``(Base, metadata)``: the automap base created by
    ``automap_base()`` and its ``metadata`` attribute, whose ``schema`` has
    been set to ``schema`` before reflection.
    """
    print("Automapping DB")
    automapped = automap_base()
    # the schema must be set before prepare() so that reflection targets it
    automapped.metadata.schema = schema
    automapped.prepare(db_engine, reflect=True)
    print("Automap finished")
    return automapped, automapped.metadata

In [8]:
# load the table definitions of the OpenSLEX mm into a SQLAlchemy MetaData object
def get_mm_meta(mm_engine):
    """Return a new ``MetaData`` object reflected from ``mm_engine``."""
    print("Obtaining MM metadata")
    reflected_meta = MetaData()
    reflected_meta.reflect(bind=mm_engine)
    print("MM metadata obtained")
    return reflected_meta

In [9]:
# run a single INSERT of `values` into table `t` inside its own transaction
def insert_values(conn, t, values):
    """Insert ``values`` into table ``t`` using ``conn`` and return the result.

    The insert is wrapped in a transaction obtained from ``conn.begin()``: it
    is committed on success and rolled back (and the error re-raised) on any
    failure.
    """
    transaction = conn.begin()
    try:
        result = conn.execute(t.insert().values(values))
        transaction.commit()
    except:
        # roll back on every kind of interruption, then let the caller see it
        transaction.rollback()
        raise
    else:
        return result


In [10]:
# map a column's SQLAlchemy type to an OpenSLEX type name
# (integer, string, boolean, numeric, timestamp)
def get_data_type(col):
    """Return the OpenSLEX type name for ``col``, or ``None`` if unsupported.

    ``col.type`` is tested against the SQLAlchemy generic types in a fixed
    order; the first match wins. Date, DateTime and Time all map to
    ``'timestamp'``.
    """
    column_type = col.type
    # order matters: the first matching entry decides the result
    type_names = (
        (types.Integer, 'integer'),
        (types.String, 'string'),
        (types.Boolean, 'boolean'),
        (types.Numeric, 'numeric'),
        ((types.Date, types.DateTime, types.Time), 'timestamp'),
    )
    for sa_type, type_name in type_names:
        if isinstance(column_type, sa_type):
            return type_name
    return None

In [11]:
# insert the data model (classes, attributes, relationships) of the source db into the OpenSLEX mm
def insert_metadata(mm_conn, mm_meta, Base, db_meta, dm_name):
    '''
    Insert the metadata of the source database (classes, attributes and relationships) into the OpenSLEX mm.

    One row is written to 'datamodel', one row to 'class' per automapped class, one row to
    'attribute_name' per column whose type get_data_type() recognises, and one row to
    'relationship' per foreign key constraint. Everything runs inside one transaction on
    mm_conn, which is rolled back if any insert fails.

    parameters:
    mm_conn: open connection to the OpenSLEX mm
    mm_meta: reflected metadata of the OpenSLEX mm
    Base: automap base of the source database (its classes define what is inserted)
    db_meta: metadata of the source database; tables are looked up as '<db_meta.schema>.<class_name>'
    dm_name: name stored for the data model

    returns:
    class_map: mapping class_name --> class_id in the OpenSLEX mm
    attr_map: mapping (class_name, attribute_name) --> attribute_id in the OpenSLEX mm
    rel_map: mapping (class_name, relationship_name) --> relationship_id in the OpenSLEX mm
    '''

    class_map = dict()
    attr_map = dict()
    rel_map = dict()

    # the target tables do not change while looping, so look them up once
    dm_table = mm_meta.tables.get('datamodel')
    class_table = mm_meta.tables.get('class')
    attr_table = mm_meta.tables.get('attribute_name')
    rel_table = mm_meta.tables.get('relationship')

    def source_table_of(c):
        # source tables are keyed by their schema-qualified name
        return db_meta.tables.get('{schema}.{c}'.format(schema=db_meta.schema, c=c))

    trans = mm_conn.begin()
    try:
        res_ins_dm = insert_values(mm_conn, dm_table, {'name': dm_name})
        dm_id = res_ins_dm.inserted_primary_key[0]

        db_classes = Base.classes.keys()

        # first pass: classes and their attributes
        for c in db_classes:
            class_values = {'datamodel_id': dm_id, 'name': c}
            res_ins_class = insert_values(mm_conn, class_table, class_values)
            class_id = res_ins_class.inserted_primary_key[0]
            class_map[c] = class_id

            for attr in source_table_of(c).c:
                data_type = get_data_type(attr)
                # columns of an unsupported type are not registered as attributes
                if data_type:
                    attr_values = {'class_id': class_id, 'name': attr.name, 'type': data_type}
                    res_ins_col = insert_values(mm_conn, attr_table, attr_values)
                    attr_map[(c, attr.name)] = res_ins_col.inserted_primary_key[0]

        # second pass: relationships; needs class_map to be complete
        # NOTE(review): a foreign key that refers to a table outside Base.classes raises KeyError here
        for c in db_classes:
            for fkc in source_table_of(c).foreign_key_constraints:
                rel_values = {'source': class_map[c],
                              'target': class_map[fkc.referred_table.name],
                              'name': fkc.name}
                res_ins_rel = insert_values(mm_conn, rel_table, rel_values)
                rel_map[(c, fkc.name)] = res_ins_rel.inserted_primary_key[0]

        trans.commit()
        print('transaction committed')
    except:
        trans.rollback()
        print('transaction rolled back')
        raise

    return class_map, attr_map, rel_map

In [12]:
# insert object, object version, object attribute values into the OpenSLEX mm for one object in the source db
def insert_object(mm_conn, obj, source_table, class_name, class_map, attr_map, rel_map, obj_v_map):
    """Insert one source row as object, object version and attribute values.

    Parameters
    ----------
    mm_conn : connection to the OpenSLEX mm
    obj : source row; read by column name and through ``obj.items()``
    source_table : source table the row comes from (primary key and unique
        constraints are read from it)
    class_name : name of the class the row belongs to
    class_map : mapping class_name --> class_id
    attr_map : mapping (class_name, attribute_name) --> attribute_id
    rel_map : not used by this function
    obj_v_map : updated in place with
        ``(class_name, key_column_names, key_values) --> object_version_id``
        for the primary key and for every unique constraint of the table

    Notes
    -----
    * ``mm_meta`` is not a parameter; it is read from the notebook's global
      namespace and must be defined before this function is called.
    * Only columns registered in ``attr_map`` are stored. ``None`` values are
      skipped; other falsy values such as ``0``, ``False`` or ``''`` are real
      data and are stored.
    * All inserts run in one transaction, rolled back on any failure.
    """

    trans = mm_conn.begin()
    try:
        # insert into object table
        obj_table = mm_meta.tables.get('object')
        obj_values = {'class_id': class_map[class_name]}
        res_ins_obj = insert_values(mm_conn, obj_table, obj_values)
        obj_id = res_ins_obj.inserted_primary_key[0]

        # insert into object_version table
        obj_v_table = mm_meta.tables.get('object_version')
        obj_v_values = {'object_id': obj_id, 'start_timestamp': -2, 'end_timestamp': -1}
        res_ins_obj_v = insert_values(mm_conn, obj_v_table, obj_v_values)
        obj_v_id = res_ins_obj_v.inserted_primary_key[0]

        # remember the object version under its primary key ...
        pk_tuple = tuple(col.name for col in source_table.primary_key.columns)
        pk_values_tuple = tuple(obj[col] for col in pk_tuple)
        obj_v_map[(class_name, pk_tuple, pk_values_tuple)] = obj_v_id

        # ... and under every unique constraint, since foreign keys may refer to either
        unique_constraints = [uc for uc in source_table.constraints if isinstance(uc, UniqueConstraint)]
        for uc in unique_constraints:
            unique_tuple = tuple(col.name for col in uc)
            unique_values_tuple = tuple(obj[col] for col in unique_tuple)
            obj_v_map[(class_name, unique_tuple, unique_values_tuple)] = obj_v_id

        # insert into attribute_value table
        attr_v_table = mm_meta.tables.get('attribute_value')
        attr_v_values = [
            {'object_version_id': obj_v_id,
             'attribute_name_id': attr_map[(class_name, attr_name)],
             'value': str(attr_value)}
            for attr_name, attr_value in obj.items()
            if (class_name, attr_name) in attr_map and attr_value is not None
        ]
        # nothing to insert when every registered attribute is NULL
        if attr_v_values:
            insert_values(mm_conn, attr_v_table, attr_v_values)

        trans.commit()
    except:
        trans.rollback()
        raise
        

In [13]:
# copy every row of one source class into the OpenSLEX mm
def insert_class_objects(mm_conn, mm_meta, db_conn, db_meta, class_name, class_map, attr_map, rel_map, obj_v_map):
    """Insert all rows of the source table ``class_name`` via ``insert_object``.

    The source table is looked up in ``db_meta`` as
    ``'<db_meta.schema>.<class_name>'`` and read completely through
    ``db_conn``. All inserts share one transaction on ``mm_conn``, which is
    rolled back if any row fails. Progress and elapsed time are printed.
    """
    print("inserting objects for class '{c}'".format(c=class_name))
    started = time.time()
    outer_trans = mm_conn.begin()
    try:
        table_key = '{s}.{c}'.format(s=db_meta.schema,c=class_name)
        source_table = db_meta.tables.get(table_key)
        for row in db_conn.execute(source_table.select()):
            insert_object(mm_conn, row, source_table, class_name, class_map, attr_map, rel_map, obj_v_map)
        outer_trans.commit()
    except:
        outer_trans.rollback()
        raise
    print("objects for class '{c}' inserted".format(c=class_name))
    elapsed = time.time() - started
    print('time elapsed: {time_diff} seconds'.format(time_diff=elapsed))


In [14]:
# link one source row to the rows it references through foreign keys
def insert_object_relations(mm_conn, mm_meta, obj, source_table, class_name, rel_map, obj_v_map):
    """Insert a 'relation' row for each resolvable foreign key of one source row.

    For every foreign key constraint of ``source_table`` the referenced object
    version is looked up in ``obj_v_map`` by
    ``(referred table name, referred column names, foreign key values)``.
    Constraints whose target is not in ``obj_v_map`` are skipped. The row's
    own object version is looked up by its primary key. All inserts run in
    one transaction on ``mm_conn``, rolled back on any failure.
    """

    trans = mm_conn.begin()
    try:
        relation_table = mm_meta.tables.get('relation')
        for constraint in source_table.foreign_key_constraints:
            target_key = (
                constraint.referred_table.name,
                tuple(element.column.name for element in constraint.elements),
                tuple(obj[fk_col] for fk_col in constraint.columns),
            )
            # the referenced object was not inserted (or the key is NULL): nothing to link
            if target_key not in obj_v_map:
                continue

            target_id = obj_v_map[target_key]
            source_key = (
                source_table.name,
                tuple(pk_col.name for pk_col in source_table.primary_key.columns),
                tuple(obj[pk_col] for pk_col in source_table.primary_key.columns),
            )
            source_id = obj_v_map[source_key]
            relation_row = {
                'source_object_version_id': source_id,
                'target_object_version_id': target_id,
                'relationship_id': rel_map[(class_name, constraint.name)],
                'start_timestamp': -2,
                'end_timestamp': -1
            }
            insert_values(mm_conn, relation_table, [relation_row])

        trans.commit()
    except:
        trans.rollback()
        raise

In [15]:
# link every row of one source class to the rows it references
def insert_class_relations(mm_conn, mm_meta, db_conn, db_meta, class_name, rel_map, obj_v_map):
    """Insert the relations of all rows of the source table ``class_name``.

    The source table is looked up in ``db_meta`` as
    ``'<db_meta.schema>.<class_name>'``, read completely through ``db_conn``
    and each row is handed to ``insert_object_relations``. All inserts share
    one transaction on ``mm_conn``, which is rolled back if any row fails.
    Progress and elapsed time are printed.
    """
    print("inserting relations for class '{c}'".format(c=class_name))
    started = time.time()
    outer_trans = mm_conn.begin()
    try:
        table_key = '{s}.{c}'.format(s=db_meta.schema,c=class_name)
        source_table = db_meta.tables.get(table_key)
        for row in db_conn.execute(source_table.select()):
            insert_object_relations(mm_conn, mm_meta, row, source_table, class_name, rel_map, obj_v_map)
        outer_trans.commit()
    except:
        outer_trans.rollback()
        raise
    print("relations for class '{c}' inserted".format(c=class_name))
    elapsed = time.time() - started
    print('time elapsed: {time_diff} seconds'.format(time_diff=elapsed))

In [16]:
# copy the rows of the given source classes, then their relations, into the OpenSLEX mm
def insert_objects(mm_conn, mm_meta, db_conn, db_meta, classes, class_map, attr_map, rel_map):
    """Insert objects and relations for every class in ``classes``.

    Runs in two passes: first all objects of all classes are inserted, then
    all relations, so that relation targets already exist when they are
    looked up. Returns the mapping filled during the first pass
    (key --> object_version_id, as built by ``insert_class_objects``).
    """
    obj_v_map = {}

    # pass 1: objects
    for name in classes:
        insert_class_objects(mm_conn, mm_meta, db_conn, db_meta, name,
                             class_map, attr_map, rel_map, obj_v_map)

    # pass 2: relations between the objects inserted above
    for name in classes:
        insert_class_relations(mm_conn, mm_meta, db_conn, db_meta, name,
                               rel_map, obj_v_map)

    return obj_v_map


# Execute it all

In [17]:
# create the OpenSLEX mm, create engines for the mm and the source db, and load the metadata of both
try:
    create_mm(openslex_file_path, openslex_script_path)
    mm_engine = create_mm_engine(openslex_file_path)
    db_engine = create_db_engine(dialect, username, password, host, port, database)
    Base, db_meta = automap_db(db_engine, schema)
    mm_meta = get_mm_meta(mm_engine)
except Exception as e:
    print('Something went wrong: {e}'.format(e=e))
    # every later cell needs the names bound above; stop here instead of
    # failing further down with an unrelated NameError
    raise


Removing OpenSLEX MM
Opening OpenSLEX MM
Reading script
Running script
OpenSLEX MM succesfully created
Creating OpenSLEX MM engine
OpenSLEX MM engine created
Creating DB engine
DB engine created
Automapping DB
Automap finished
Obtaining MM metadata
MM metadata obtained


In [18]:
# insert the source's datamodel into the OpenSLEX mm
t1 = time.time()
mm_conn = mm_engine.connect()
print('connection opened')
try:
    class_map, attr_map, rel_map = insert_metadata(mm_conn, mm_meta, Base, db_meta, dm_name)
except Exception as e:
    print('Exception: {e}'.format(e=e))
    # the maps are needed by the following cells; do not continue without them
    raise
finally:
    # close the connection and report the time whether or not the insert succeeded
    mm_conn.close()
    print('connection closed')
    t2 = time.time()
    time_diff = t2-t1
    print('total time elapsed: {time_diff} seconds'.format(time_diff=time_diff))

connection opened
transaction committed
connection closed
total time elapsed: 0.2579987049102783 seconds


In [19]:
# insert objects into the OpenSLEX mm
t1 = time.time()
mm_conn = mm_engine.connect()
db_conn = db_engine.connect()
print('connections opened')
try:
#     classes = Base.classes.keys() # use this if you want to insert objects of all classes
    classes = ['patients', 'admissions', 'microbiologyevents', 'd_items'] # use this to specify a subset of the classes
    obj_v_map = insert_objects(mm_conn, mm_meta, db_conn, db_meta, classes, class_map, attr_map, rel_map)
except Exception as e:
    print('Exception: {e}'.format(e=e))
    # the test section below is meaningless if the insert failed; do not continue
    raise
finally:
    # close both connections and report the time whether or not the insert succeeded
    mm_conn.close()
    db_conn.close()
    print('connections closed')
    t2 = time.time()
    time_diff = t2-t1
    print('total time elapsed: {time_diff} seconds'.format(time_diff=time_diff))

connections opened
inserting objects for class 'patients'
objects for class 'patients' inserted
time elapsed: 0.2395002841949463 seconds
inserting objects for class 'admissions'
objects for class 'admissions' inserted
time elapsed: 0.4485034942626953 seconds
inserting objects for class 'microbiologyevents'
objects for class 'microbiologyevents' inserted
time elapsed: 3.2560007572174072 seconds
inserting objects for class 'd_items'
objects for class 'd_items' inserted
time elapsed: 14.64276671409607 seconds
inserting relations for class 'patients'
relations for class 'patients' inserted
time elapsed: 0.0029997825622558594 seconds
inserting relations for class 'admissions'
relations for class 'admissions' inserted
time elapsed: 0.12800192832946777 seconds
inserting relations for class 'microbiologyevents'
relations for class 'microbiologyevents' inserted
time elapsed: 3.1095240116119385 seconds
inserting relations for class 'd_items'
relations for class 'd_items' inserted
time elapsed: 0

# Test
For all microbiology events, the patient IDs, hospital admission IDs, microbiology event IDs, item IDs and item labels are selected from the OpenSLEX metamodel and from the source database. To do this, four tables are joined: 'patients', 'admissions', 'microbiologyevents' and 'd_items'. The results are compared to test if they are equal.

In [20]:
# define mm classes
# Handles to the reflected OpenSLEX tables used by the test query below.
# 'relationship' holds the foreign keys of the data model (one row per constraint),
# 'relation' holds the links between individual object versions.
class_table = mm_meta.tables.get('class')
attr_table = mm_meta.tables.get('attribute_name')
rels_table = mm_meta.tables.get('relationship').alias()
obj_table = mm_meta.tables.get('object')
obj_v_table = mm_meta.tables.get('object_version')
attr_v_table = mm_meta.tables.get('attribute_value')
rel_table = mm_meta.tables.get('relation')

In [21]:
# define mm class aliases
# The same metamodel tables are joined several times in the test query, so every
# role gets its own alias. Naming: <entity>_<attribute>_val / _name are the
# attribute_value / attribute_name rows of one attribute; <a>_<b>_rel / _rels are
# the relation / relationship rows linking entity a to entity b.

# microbiology event: object version, object, class and the attributes row_id and spec_itemid
micro_obj_v = obj_v_table.alias()
micro_obj = obj_table.alias()
micro_class = class_table.alias()
micro_row_id_val = attr_v_table.alias()
micro_row_id_name = attr_table.alias()
micro_itemid_val = attr_v_table.alias()
micro_itemid_name = attr_table.alias()
# microbiology event --> admission, and the admission's id attribute
micro_adm_rel = rel_table.alias()
micro_adm_rels = rels_table.alias()
adm_obj_v = obj_v_table.alias()
adm_id_val = attr_v_table.alias()
adm_id_name = attr_table.alias()
# microbiology event --> patient, and the patient's id attribute
micro_pat_rel = rel_table.alias()
micro_pat_rels = rels_table.alias()
pat_obj_v = obj_v_table.alias()
pat_id_val = attr_v_table.alias()
pat_id_name = attr_table.alias()
# microbiology event --> item, and the item's id and label attributes
micro_item_rel = rel_table.alias()
micro_item_rels = rels_table.alias()
item_obj_v = obj_v_table.alias()
item_id_val = attr_v_table.alias()
item_id_name = attr_table.alias()
item_label_val = attr_v_table.alias()
item_label_name = attr_table.alias()

In [22]:
# query for selecting data from mm
# Selects, for every microbiology event stored in the metamodel, the patient id,
# admission id, event row id, item id and item label.
# Structure of the join:
#   * start at the object versions whose class is 'microbiologyevents';
#   * attach the event's own attribute values (row_id, spec_itemid);
#   * follow the three relations (to admission, patient and item), each identified
#     by the name of its relationship, to the target object version;
#   * attach the attribute values of each target (hadm_id, subject_id, itemid, label).
# The WHERE clause pins every attribute_name / relationship alias to the name it
# stands for; the result is ordered by patient, admission and event row id.
# All selected values are strings, because attribute values are stored via str().
mm_q = select([
            pat_id_val.c.value, 
            adm_id_val.c.value, 
            micro_row_id_val.c.value,
            item_id_val.c.value,
            item_label_val.c.value,
           ])\
    .select_from(micro_obj_v.join(micro_obj, micro_obj_v.c.object_id==micro_obj.c.id)\
                 .join(micro_class, micro_obj.c.class_id==micro_class.c.id)\
                 .join(micro_row_id_val, micro_obj_v.c.id==micro_row_id_val.c.object_version_id)\
                 .join(micro_row_id_name, micro_row_id_val.c.attribute_name_id==micro_row_id_name.c.id)\
                 .join(micro_itemid_val, micro_obj_v.c.id==micro_itemid_val.c.object_version_id)\
                 .join(micro_itemid_name, micro_itemid_val.c.attribute_name_id==micro_itemid_name.c.id)\
                 .join(micro_adm_rel, micro_obj_v.c.id==micro_adm_rel.c.source_object_version_id)\
                 .join(micro_adm_rels, micro_adm_rel.c.relationship_id==micro_adm_rels.c.id)\
                 .join(adm_obj_v, micro_adm_rel.c.target_object_version_id==adm_obj_v.c.id)\
                 .join(adm_id_val, adm_obj_v.c.id==adm_id_val.c.object_version_id)\
                 .join(adm_id_name, adm_id_val.c.attribute_name_id==adm_id_name.c.id)\
                 .join(micro_pat_rel, micro_obj_v.c.id==micro_pat_rel.c.source_object_version_id)\
                 .join(micro_pat_rels, micro_pat_rel.c.relationship_id==micro_pat_rels.c.id)\
                 .join(pat_obj_v, micro_pat_rel.c.target_object_version_id==pat_obj_v.c.id)\
                 .join(pat_id_val, pat_obj_v.c.id==pat_id_val.c.object_version_id)\
                 .join(pat_id_name, pat_id_val.c.attribute_name_id==pat_id_name.c.id)\
                 .join(micro_item_rel, micro_obj_v.c.id==micro_item_rel.c.source_object_version_id)\
                 .join(micro_item_rels, micro_item_rel.c.relationship_id==micro_item_rels.c.id)\
                 .join(item_obj_v, micro_item_rel.c.target_object_version_id==item_obj_v.c.id)\
                 .join(item_id_val, item_obj_v.c.id==item_id_val.c.object_version_id)\
                 .join(item_id_name, item_id_val.c.attribute_name_id==item_id_name.c.id)\
                 .join(item_label_val, item_obj_v.c.id==item_label_val.c.object_version_id)\
                 .join(item_label_name, item_label_val.c.attribute_name_id==item_label_name.c.id)\
                )\
    .where(and_(micro_class.c.name=='microbiologyevents',
                micro_row_id_name.c.name=='row_id',
                micro_itemid_name.c.name=='spec_itemid',
                micro_adm_rels.c.name=='microbiologyevents_fk_hadm_id',
                adm_id_name.c.name=='hadm_id',
                micro_pat_rels.c.name=='microbiologyevents_fk_subject_id',
                pat_id_name.c.name=='subject_id',
                micro_item_rels.c.name=='microbiologyevents_fk_spec_itemid',
                item_id_name.c.name=='itemid',
                item_label_name.c.name=='label',
               ))\
    .order_by(
             pat_id_val.c.value, 
             adm_id_val.c.value, 
             micro_row_id_val.c.value,
            )
    

In [23]:
# execute query against the OpenSLEX mm and fetch all rows
mm_conn = mm_engine.connect()
try:
    mm_res = mm_conn.execute(mm_q).fetchall()
finally:
    # release the connection even if the query fails
    mm_conn.close()

In [24]:
# number of rows returned by the metamodel query
len(mm_res)

2003

In [25]:
# first ten rows of the metamodel result (all values are strings)
mm_res[0:10]

[('10006', '142345', '134694', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134695', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134696', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134697', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134698', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134699', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134700', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134701', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134702', '70012', 'BLOOD CULTURE'),
 ('10006', '142345', '134703', '70079', 'URINE')]

In [26]:
# define db tables
# Tables are keyed by their schema-qualified name; use the schema the metadata
# was reflected with instead of hard-coding it.
pat_table = db_meta.tables.get('{schema}.patients'.format(schema=db_meta.schema))
adm_table = db_meta.tables.get('{schema}.admissions'.format(schema=db_meta.schema))
micro_table = db_meta.tables.get('{schema}.microbiologyevents'.format(schema=db_meta.schema))
item_table = db_meta.tables.get('{schema}.d_items'.format(schema=db_meta.schema))

In [27]:
# the same selection, expressed directly against the source database:
# microbiology events joined to their admission, patient and specimen item,
# ordered by patient, admission and event row id
db_q = (
    select([
        pat_table.c.subject_id,
        adm_table.c.hadm_id,
        micro_table.c.row_id,
        item_table.c.itemid,
        item_table.c.label,
    ])
    .select_from(
        micro_table
        .join(adm_table, micro_table.c.hadm_id == adm_table.c.hadm_id)
        .join(pat_table, micro_table.c.subject_id == pat_table.c.subject_id)
        .join(item_table, micro_table.c.spec_itemid == item_table.c.itemid)
    )
    .order_by(
        pat_table.c.subject_id,
        adm_table.c.hadm_id,
        micro_table.c.row_id,
    )
)

In [28]:
# execute query against the source database and fetch all rows
db_conn = db_engine.connect()
try:
    db_res = db_conn.execute(db_q).fetchall()
finally:
    # release the connection even if the query fails
    db_conn.close()

In [29]:
# number of rows returned by the source database query
len(db_res)

2003

In [30]:
# first ten rows of the source database result (ids keep their native integer type)
db_res[0:10]

[(10006, 142345, 134694, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134695, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134696, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134697, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134698, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134699, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134700, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134701, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134702, 70012, 'BLOOD CULTURE'),
 (10006, 142345, 134703, 70079, 'URINE')]

In [31]:
# Compare results
# Direct comparison of the two result lists; the metamodel stores every attribute
# value as a string, so differing value types make this comparison fail.
mm_res == db_res

False

In [32]:
# The results are not equal because the data types differ: integers in the source
# database are stored as strings in the mm.
# Convert every value of the source result to a string before comparing.
db_res_str = [tuple(str(value) for value in row) for row in db_res]

In [33]:
# compare the string-converted source result with the metamodel result
db_res_str == mm_res

True