In [46]:
from IPython.display import display
from collections import Counter
import mysql.connector
from mysql.connector import Error
import csv
import re
from neo4j import GraphDatabase
import pandas as pd
import numpy as np
from pandas.errors import ParserError

import math
import warnings
warnings.filterwarnings('ignore') # setting ignore as a parameter


In [47]:
import os

# Connection settings. Secrets are read from the environment so they are not saved
# inside the notebook; the placeholder strings remain as fallbacks.
host = 'localhost'
mysql_schema = 'sakila'
mysql_user = 'root'
mysql_password = os.environ.get('MYSQL_PASSWORD', '<put password here>')
neo_uri = f'bolt://{host}:7687'
neo_user = 'neo4j'
neo_pass = os.environ.get('NEO4J_PASSWORD', '<put password here>')

In [48]:
class Neo4jDB:
    """Minimal wrapper around a neo4j driver that opens one short-lived session per query.

    Errors while creating the driver are printed, not raised; the driver then stays
    unset and query() fails its assertion. Instances can be used as context managers
    so the driver is closed when the with-block exits.
    """

    def __init__(self, uri, user, pwd=''):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
            print("Driver has been initialized!")
        except Exception as e:
            print("Failed to create the driver:", e)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never swallow exceptions raised inside the with-block

    def close(self):
        """Close the underlying driver, if one was created."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run `query` and return every result record as a list.

        Args:
            query: Cypher text.
            parameters: dict of query parameters, or None.
            db: database name; None opens a session on the driver's default database.

        Returns:
            List of records, or None when opening the session or running the query
            failed (the error is printed, not raised).
        """
        assert self.__driver is not None, "Driver not initialized!"
        session_args = {} if db is None else {'database': db}
        try:
            # The session context manager closes the session on success and on error.
            with self.__driver.session(**session_args) as session:
                return list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
            return None
    
# Shared Neo4j connection; the helpers in later cells call neoConn.query(...).
neoConn = Neo4jDB(neo_uri, neo_user, pwd=neo_pass)

Driver has been initialized!


In [49]:
# Open the MySQL connection used by the schema-reading helpers (stays None on failure).
connection = None
try:
    connection = mysql.connector.connect(host=host, database=mysql_schema, user=mysql_user, password=mysql_password)

    if connection.is_connected():
        db_Info = connection.get_server_info()
        print("Connected to MySQL Server version ", db_Info)
        # Probe which database is selected; close the cursor even if the probe fails.
        cursor = connection.cursor()
        try:
            cursor.execute("select database();")
            record = cursor.fetchone()
        finally:
            cursor.close()
        print("You're connected to database: ", record)

except Error as e:
    print("Error while connecting to MySQL", e)

Connected to MySQL Server version  8.0.29
You're connected to database:  ('sakila',)


In [50]:
def _clear_all(commit):
    if commit:
        query='''MATCH (n)
        DETACH DELETE n
        RETURN count(*) AS total'''
        return neoConn.query(query)

In [51]:
def get_dict(tables, keys):
    """Group keys by table name.

    keys[i] belongs to tables[i]; the result maps each table (in first-seen order) to
    its keys converted with str() and joined with ','.
    """
    grouped = {}
    for pos, name in enumerate(tables):
        key_text = str(keys[pos])
        existing = grouped.get(name)
        grouped[name] = existing + ',' + key_text if existing else key_text
    return grouped

In [52]:
def populate_obj(name, df, excludes):
    """Build a Cypher node pattern such as `(actor:Actor{actor_id: row.actor_id,...})`.

    Every column of `df` not listed in `excludes` becomes a property read from the
    UNWIND variable `row`. The variable is `name` lower-cased; the label is
    `name.capitalize()`.
    """
    head = '(' + name.lower() + ':' + name.capitalize() + '{'
    props = ','.join(col + ': row.' + col for col in df.columns if col not in excludes)
    return head + props + '})'

In [53]:
def _read_csv(table, s, e):
    """Load db.localhost/<mysql_schema>/<table>.csv into a DataFrame.

    `s` is the field separator and `e` the text encoding; both are passed straight
    to pd.read_csv.
    """
    csv_path = f'db.localhost/{mysql_schema}/{table}.csv'
    return pd.read_csv(csv_path, sep=s, encoding=e)

In [54]:
def _get_ref_node(table, ref_table):
    #print("table:{}, ref_table:{}".format(table, ref_table))
    rtn = ref_table.lower()
    if table and ref_table:
        if table.strip() == ref_table.strip():
            return "{}_".format(ref_table.lower())
            
    return rtn

In [55]:
def generate_single_graph(m, df_logs, commit):
    """Create one node per CSV row for a table that references no other table.

    Args:
        m: table metadata dict from get_metadata (only 'table' is read here).
        df_logs: progress log DataFrame, updated via _add_row.
        commit: False prints the metadata and the generated Cypher; True runs the
            Cypher on neoConn.

    Returns:
        The updated df_logs.
    """
    # (The former unused `st = time.time()` relied on `time` being imported in a later cell.)
    table = m.get('table')
    print("\n\n===> Processing records for Table '{}'".format(table))
    if not commit:
        print(m)

    records = _read_csv(table, ';', 'utf-8')

    query = 'UNWIND $rows AS row \nCREATE ' + populate_obj(table.lower(), records, []) + '\nRETURN count(*) as total'

    df_logs = _add_row(df_logs, table, ref_tables='', no_records=len(records), no_rels=0, no_nodes=1)

    if not commit:
        print(query)
    else:
        neoConn.query(query, parameters={'rows': records.to_dict('records')})

    return df_logs

In [56]:
def generate_joined_graph(m, s, df_logs, commit):
    """Create nodes for a table with foreign keys, plus relationships to the referenced tables.

    The generated Cypher UNWINDs the CSV rows and CREATEs one `(<table>:<Table>)` node
    per row, with every column including the ids. Then, for each referenced table, it
    OPTIONAL MATCHes the target node on the referenced key(s) and MERGEs a
    `<TABLE>_<REF_TABLE>` relationship to it.

    Args:
        m: metadata dict from get_metadata. 'table', 'for_keys', 'ref_tables' and
            'ref_keys' are read; the last three are comma-separated, and index i of
            each describes the same foreign key.
        s: comma-separated referenced tables to skip because they close a reference
            cycle (generate_edges adds those edges later). None or '' skips nothing.
        df_logs: progress log DataFrame, updated via _add_row.
        commit: False prints the metadata and the Cypher; True runs the Cypher on neoConn.

    Returns:
        The updated df_logs.
    """
    table = m.get('table')
    print("\n\n===> Processing records for Table '{}'".format(table))
    if not commit:
        print(m)

    records = _read_csv(table, ';', 'utf-8')

    for_keys = m.get('for_keys').split(',')
    ref_tables = m.get('ref_tables').split(',')
    ref_keys = m.get('ref_keys').split(',')

    # Keep each (referenced key, local FK column) pair together, grouped per referenced
    # table in first-seen order. Slicing for_keys by running offsets paired the wrong
    # columns whenever one table was referenced by non-adjacent foreign keys.
    grouped = {}
    for ref_table, ref_key, fk_col in zip(ref_tables, ref_keys, for_keys):
        grouped.setdefault(ref_table, []).append((ref_key, fk_col))

    # Compare whole table names: a substring test would skip 'film' when s == 'film_text'.
    skip = {name.strip() for name in s.split(',')} if s else set()

    node = table.lower()
    query = 'UNWIND $rows AS row\nCREATE ' + populate_obj(node, records, []) + '\n'

    no_edges = 0
    for ref_table, pairs in grouped.items():
        if ref_table in skip:
            print("********** Table '{}' is a cyclic referenced by Table '{}'..skipped it *********".format(ref_table, table))
            continue

        print("Processing reference Table '{}' ...".format(ref_table))
        ref_ids = [ref_key for ref_key, _ in pairs]
        fk = [fk_col for _, fk_col in pairs]
        query += '\nWITH distinct row, ' + node
        no_edges += 1

        # A key equal to the previous referenced key cannot go into the same map pattern
        # (the map would repeat that key), so its UNWIND is deferred and it gets its
        # own OPTIONAL MATCH / MERGE below.
        prev_ref_id = None
        uwind = {}
        for i in range(len(fk)):
            unwind = '\nUNWIND row.' + fk[i] + ' AS _' + fk[i]
            if prev_ref_id == ref_ids[i]:
                uwind[i] = unwind
            else:
                query += unwind
            prev_ref_id = ref_ids[i]

        ref_node = _get_ref_node(table, ref_table)
        match_head = '\nOPTIONAL MATCH (' + ref_node + ':' + ref_table.capitalize() + ' {'
        merge_tail = '})\nMERGE (' + node + ')-[:' + table.upper() + '_' + ref_table.upper() + ']->(' + ref_node + ')'
        if uwind:
            for i in range(len(fk)):
                if i > 0:
                    # NOTE(review): uwind.get(i) is None (-> TypeError) unless key i repeats
                    # key i-1; only groups whose referenced keys are all equal work here.
                    query += '\nWITH distinct row, ' + node
                    query += uwind.get(i)
                query += match_head + ref_ids[i] + ': _' + fk[i] + merge_tail
        else:
            query += match_head + ','.join(ref_ids[i] + ': _' + fk[i] for i in range(len(fk))) + merge_tail

    query += '\nRETURN count(*) AS total'

    df_logs = _add_row(df_logs, table, ref_tables=ref_tables, no_records=len(records), no_rels=len(ref_tables), no_nodes=1, no_edges=no_edges)

    if not commit:
        print(query)
    else:
        neoConn.query(query, parameters={'rows': records.to_dict('records')})

    return df_logs

In [57]:
def generate_edges(m, r, df_logs, commit):
    """Create the relationships that generate_joined_graph skipped because of a reference cycle.

    Existing `(<table>:<Table>)` nodes are MATCHed on their primary key(s), taken from
    the CSV rows. For each referenced table named in `r`, the target node is OPTIONAL
    MATCHed on the referenced key(s) and a `<TABLE>_<REF_TABLE>` relationship is MERGEd.

    Args:
        m: metadata dict from get_metadata. 'table', 'pri_key', 'for_keys', 'ref_tables'
            and 'ref_keys' are read (comma-separated).
        r: comma-separated referenced tables to connect; every other reference is skipped.
        df_logs: progress log DataFrame, updated via _add_row.
        commit: False prints the inputs and the Cypher; True runs the Cypher on neoConn.

    Returns:
        The updated df_logs.
    """
    table = m.get('table')
    print("\n\n===> Creating additional Edges between Table '{}' and Table(s) '{}' ".format(table, r))
    if not commit:
        print(m)
        print(r)

    records = _read_csv(table, ';', 'utf-8')

    pri_key = m.get('pri_key').split(',')
    for_keys = m.get('for_keys').split(',')
    ref_tables = m.get('ref_tables').split(',')
    ref_keys = m.get('ref_keys').split(',')

    # Keep each (referenced key, local FK column) pair together, grouped per referenced
    # table in first-seen order (offset slicing mis-paired non-adjacent foreign keys).
    grouped = {}
    for ref_table, ref_key, fk_col in zip(ref_tables, ref_keys, for_keys):
        grouped.setdefault(ref_table, []).append((ref_key, fk_col))

    # Compare whole table names: a substring test would accept 'film' when r == 'film_text'.
    wanted = {name.strip() for name in r.split(',')} if r else set()

    node = table.lower()
    query = 'UNWIND $rows AS row'
    for prim in pri_key:
        query += '\nUNWIND row.' + prim + ' AS _' + prim
    query += '\nMATCH (' + node + ':' + table.capitalize() + ' {'
    query += ','.join(prim + ': _' + prim for prim in pri_key)
    query += '})'

    no_edges = 0
    for ref_table, pairs in grouped.items():
        if ref_table not in wanted:
            print('********** Skipping table {} **********'.format(ref_table))
            continue

        print("Creating an Edge from Table '{}' --> Table '{}' ...".format(table, ref_table))
        ref_ids = [ref_key for ref_key, _ in pairs]
        fk = [fk_col for _, fk_col in pairs]
        query += '\nWITH distinct row, ' + node
        no_edges += 1

        # A key equal to the previous referenced key cannot share one map pattern, so
        # its UNWIND is deferred and it gets its own OPTIONAL MATCH / MERGE below.
        prev_ref_id = None
        uwind = {}
        for i in range(len(fk)):
            unwind = '\nUNWIND row.' + fk[i] + ' AS _' + fk[i]
            if prev_ref_id == ref_ids[i]:
                uwind[i] = unwind
            else:
                query += unwind
            prev_ref_id = ref_ids[i]

        ref_node = _get_ref_node(table, ref_table)
        match_head = '\nOPTIONAL MATCH (' + ref_node + ':' + ref_table.capitalize() + ' {'
        merge_tail = '})\nMERGE (' + node + ')-[:' + table.upper() + '_' + ref_table.upper() + ']->(' + ref_node + ')'
        if uwind:
            for i in range(len(fk)):
                if i > 0:
                    # NOTE(review): uwind.get(i) is None (-> TypeError) unless key i repeats
                    # key i-1; only groups whose referenced keys are all equal work here.
                    query += '\nWITH distinct row, ' + node
                    query += uwind.get(i)
                query += match_head + ref_ids[i] + ': _' + fk[i] + merge_tail
        else:
            query += match_head + ','.join(ref_ids[i] + ': _' + fk[i] for i in range(len(fk))) + merge_tail

    query += '\nRETURN count(*) AS total'

    df_logs = _add_row(df_logs, table, ref_tables=ref_tables, no_records=len(records), no_rels=len(ref_tables), no_edges=no_edges)

    if not commit:
        print(query)
    else:
        neoConn.query(query, parameters={'rows': records.to_dict('records')})

    return df_logs

In [58]:
def get_ddl(c1, table):
    """Return the CREATE statement MySQL reports for `table`.

    Args:
        c1: cursor whose rows come back as dicts (created with dictionary=True).
        table: table name; it is backtick-quoted, with embedded backticks doubled, so
            reserved words and unusual characters do not break the statement.

    Returns:
        The second column of the SHOW CREATE TABLE row (the CREATE statement text).
    """
    quoted = '`' + str(table).replace('`', '``') + '`'
    c1.execute("SHOW CREATE TABLE %s;" % quoted)
    row = c1.fetchone()
    return list(row.values())[1]

In [76]:
def get_metadata(c1, table):
    """Parse the SHOW CREATE TABLE output of `table` into key/reference metadata.

    Returns a dict of comma-joined strings:
        table      - table name taken from the DDL; falls back to the `table` argument
                     when the DDL has no CREATE TABLE header (e.g. a view)
        pri_key    - primary-key column(s), '' when the DDL has no PRIMARY KEY
        for_keys   - local column of each FOREIGN KEY, in DDL order
        ref_tables - referenced table of each FOREIGN KEY, same order
        ref_keys   - referenced column of each FOREIGN KEY, same order
    Position i of for_keys/ref_tables/ref_keys describes the same constraint, assuming
    each constraint sits on one DDL line (as MySQL prints it).

    NOTE(review): composite foreign keys (several columns in one constraint) are not
    split by these regexes; the captured text keeps the inner backticks.
    """
    ddl = get_ddl(c1, table)

    def find_all(pattern):
        # Raw strings: '\s' and '\(' in plain literals are invalid escape sequences
        # (DeprecationWarning; SyntaxWarning from Python 3.12).
        return re.compile(pattern, re.IGNORECASE).findall(ddl)

    names = find_all(r'CREATE TABLE.*?`(.*).*?`\s\(')
    table = names[0] if names else table

    primary = find_all(r'PRIMARY KEY\s\S(.*)(?=\))')
    pri_key = primary[0].replace('`', '') if primary else ''

    for_keys = ",".join(find_all(r'FOREIGN KEY.*?\(`(.*).*?`\)\sREFERENCES'))
    ref_tables = ",".join(find_all(r'REFERENCES.*?`(.*).*?`\s'))
    ref_keys = ",".join(find_all(r'REFERENCES.*?\(`(.*).*?`\)'))

    print(f'table:[{table}],pri_key:[{pri_key}],for_keys:[{for_keys}],ref_tables:[{ref_tables}],ref_keys:[{ref_keys}]')

    return {'table': table, 'pri_key': pri_key, 'for_keys': for_keys, 'ref_tables': ref_tables, 'ref_keys': ref_keys}

In [60]:
def _compute_out(data):
    return len(data.split(','))        

def _compute_in(key, unsorted_data):
    counter = 0
    for k, v in unsorted_data.items():
        if k != key and key in v:
            dependents = v.split(',')
            for d in dependents:
                if d.strip() == key:
                    counter += 1
    return counter
    
def _score_data(unsorted_data):
    scores = {}
    for key, data in unsorted_data.items():
        out_elms = 0
        in_elms = 0
        if data.strip() == '':
            scores[key] = 0
        else:            
            #compute: score = no.outgoung / no.incoming
            out_elms = _compute_out(data)
            in_elms = _compute_in(key, unsorted_data)
            scores[key] = np.inf if in_elms == 0 else (out_elms/in_elms)
        #print ("Key [{}], Out[{}], In[{}], Score[{}]".format(key, out_elms, in_elms, scores.get(key)))
    
    scores = dict(sorted(scores.items(), key=lambda item: item[1]))
    #print(scores)
    return scores
    
def _remove_best(unsorted_data, key, sorted_data, relations):
    for k, v in unsorted_data.items():
        if key in v:
            deps = v.split(',')
            dependents = deps.copy()
            for d in deps:
                if d.strip() == key:
                    dependents.remove(d)

                    #add the relation between this entry's key and the 'key', if record exists then append to it
                    rels = relations.get(k)
                    if rels:
                        relations[k] = rels + ',' + key
                    else:
                        relations[k] = key
            
            #dependents = list(filter(lambda d: d.strip() != key, dependents))
            unsorted_data[k] = ','.join(dependents)
            
    unsorted_data.pop(key, None)
    
def _remove_cyclic(unsorted_data, key, sorted_data, relations, cyclics):    
    #1) add the 'key' as one of the cyclic dependencies
    cyclics[key] = unsorted_data.get(key)
    #2) remove from those depend on the 'key', record their relations to it, and finally remove the 'key'
    _remove_best(unsorted_data, key, sorted_data, relations)
    #3) finally, add the 'key' as one of the sorted keys
    sorted_data.append(key)
    
def _sort(unsorted_data, sorted_data, relations, cyclics):
    scores = _score_data(unsorted_data)
        
    best_key = list(scores.keys())[0]
    best_val = list(scores.values())[0]
    if best_val == 0:
        #remove best[0] from unsorted_data
        _remove_best(unsorted_data, best_key, sorted_data, relations)
        sorted_data.append(best_key)
    else:
        print('CYCLIC detected!')
        _remove_cyclic(unsorted_data, best_key, sorted_data, relations, cyclics)
    
    return unsorted_data

In [68]:
def _add_row(df_logs, table, **kwargs):
    idx = df_logs.index[df_logs['table']==table]
    if len(idx):
        idx = idx[0]
        for key, value in kwargs.items():
            if isinstance(value, list):   
                value = ','.join(value)
            if key not in (['no_records', 'ref_tables']):
                if isinstance(value, int):
                    df_logs.at[idx, key] = value
                else:
                    df_logs.at[idx, key] = df_logs.iloc[idx][key] + ',' + value
            else:
                df_logs.at[idx, key] = value
    else:
        df_logs = df_logs.append({'table':table}, ignore_index=True)
    
    return df_logs

In [62]:
def _populate_graph(df_logs, metas, sorted_data, relations, cyclics, commit):
    for data in sorted_data:
        meta = metas[data]
        table = meta['table']
        df_logs = _add_row(df_logs, table)
        rels = relations.get(table)
        skip = cyclics.get(table)
        if rels:
            #joined
            df_logs = generate_joined_graph(meta, skip, df_logs, commit)
        else:
            #single
            df_logs = generate_single_graph(meta, df_logs, commit)
    
    #create relation for cyclic references
    for table, refs in cyclics.items():
        meta = metas[table]
        df_logs = generate_edges(meta, refs, df_logs, commit)
        
    return df_logs

In [63]:
def _print(tables, metas, sorted_data, relations, cyclics):
    
    df_print1 = pd.DataFrame(columns=['table', 'ref_tables'])
    sep = '---------------'
    for table in tables:
        m = metas[table]
        ref_tables = m.get('ref_tables')
        rels = ref_tables.split(',')
        if ref_tables == '':
            rels = None            
        df_print1 = df_print1.append({'table':table, 'ref_tables':rels}, ignore_index=True)    
    df_print1 = df_print1.append({'table':sep, 'ref_tables':sep}, ignore_index=True)
    
    df_print2 = pd.DataFrame(columns=['table', 'ref_tables'])
    
    for data in sorted_data:
        meta = metas[data]
        table = meta['table']
        rs = relations.get(table)
        rels = None
        if rs:
            rels = rs.split(',')
        cs = cyclics.get(table)
        skip = None
        if cs:
            skip = cs.split(',')
        if skip and rels:
            rels = [i for i in rels if i not in skip]
        df_print2 = df_print2.append({'table':table, 'ref_tables':rels}, ignore_index=True)
    df_print2 = df_print2.append({'table':sep, 'ref_tables':sep}, ignore_index=True)
        
    df_print3 = pd.DataFrame(columns=['table', 'ref_tables'])
    for table, refs in cyclics.items():
        df_print3 = df_print3.append({'table':table, 'ref_tables':refs}, ignore_index=True)
    df_print3 = df_print3.append({'table':sep, 'ref_tables':sep}, ignore_index=True)
    
    df_print = df_print1.append(df_print2).append(df_print3)
    display(df_print)

In [64]:
def _generate_graph(df_logs, commit):
    """Read the MySQL schema, order its tables, print the plan and populate Neo4j.

    Tables are taken one at a time by _sort. A table with no outstanding references is
    preferred. When none is left, a table is taken anyway and recorded in `cyclics`, so
    its deferred edges are created at the end by _populate_graph.

    Args:
        df_logs: progress log DataFrame passed through to the graph builders.
        commit: False for a dry run (print Cypher), True to write to Neo4j.

    Returns:
        The updated df_logs.
    """
    # Base tables only: SHOW CREATE TABLE on a view yields a CREATE VIEW statement,
    # which get_metadata's CREATE TABLE regexes cannot parse.
    schema_sql = '`' + mysql_schema.replace('`', '``') + '`'
    q1 = "SHOW FULL TABLES FROM " + schema_sql + " WHERE Table_type = 'BASE TABLE'"
    c1 = connection.cursor(dictionary=True, buffered=True)

    tables = []
    metas = {}
    unsorted_data = {}
    try:
        c1.execute(q1)
        for entry in c1.fetchall():
            # First column is `Tables_in_<schema>`; the second is Table_type.
            table = next(iter(entry.values()))
            tables.append(table)
            meta = get_metadata(c1, table)
            metas[table] = meta
            unsorted_data[meta.get('table')] = meta.get('ref_tables')
    finally:
        c1.close()

    sorted_data = []
    relations = {}
    cyclics = {}
    half_sort = dict(sorted(unsorted_data.items(), key=lambda item: item[1]))
    # Each _sort call removes one table; the loop condition also copes with an empty schema.
    while half_sort:
        half_sort = _sort(half_sort, sorted_data, relations, cyclics)

    _print(tables, metas, sorted_data, relations, cyclics)

    return _populate_graph(df_logs, metas, sorted_data, relations, cyclics, commit)

In [74]:
def cek_graph(df_logs):
    """Overwrite logged node/edge counts with counts read back from Neo4j, then add a total row.

    For each logged table, no_nodes becomes the number of `<Table>` nodes. no_edges
    becomes the summed count of `<TABLE>_<REF>` relationships over its distinct
    referenced tables. A failed query (None result) leaves the logged value untouched.
    A 'total' row sums the numeric columns, with 'table' and 'ref_tables' left empty.
    Mutates df_logs in place and returns None.
    """
    for idx, table in df_logs['table'].items():
        node_rows = neoConn.query(f'MATCH (n:{table.capitalize()}) RETURN count(n) as total')
        if node_rows:
            df_logs.at[idx, 'no_nodes'] = node_rows[0][0]

        rt = df_logs.at[idx, 'ref_tables']
        if isinstance(rt, str) and len(rt) > 0:
            total = 0
            for ref_table in dict.fromkeys(rt.split(',')):  # de-duplicate, keep order
                rel_type = f'{table}_{ref_table}'.upper()
                edge_rows = neoConn.query(f'MATCH p=()-[r:{rel_type}]->() RETURN count(p) as total')
                if edge_rows:
                    total += edge_rows[0][0]
            df_logs.at[idx, 'no_edges'] = total

    # Sum only the numeric columns instead of string-concatenating the text ones.
    df_logs.loc['total'] = df_logs.drop(columns=['table', 'ref_tables']).sum()
    df_logs.loc['total', ['table', 'ref_tables']] = ''

In [77]:
import time

# False = dry run (print the generated Cypher); True = actually write to Neo4j.
commit = False

# Phase 1: empty the graph (_clear_all does nothing unless commit is True).
start_time = time.time()
_clear_all(commit)
elapsed_time = time.time() - start_time
print('Execution time:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

# Phase 2: build the graph from the MySQL schema, then re-count nodes/edges in Neo4j.
start_time = time.time()
_sorted = []
df_logs = _generate_graph(
    pd.DataFrame(columns=['table', 'ref_tables', 'no_records', 'no_rels', 'no_nodes', 'no_edges']),
    commit,
)
cek_graph(df_logs)
elapsed_time = time.time() - start_time
print('Execution time:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
df_logs

Execution time: 00:00:00
table:[actor],pri_key:[actor_id],for_keys:[],ref_tables:[],ref_keys:[]
table:[address],pri_key:[address_id],for_keys:[city_id],ref_tables:[city],ref_keys:[city_id]
table:[category],pri_key:[category_id],for_keys:[],ref_tables:[],ref_keys:[]
table:[city],pri_key:[city_id],for_keys:[country_id],ref_tables:[country],ref_keys:[country_id]
table:[country],pri_key:[country_id],for_keys:[],ref_tables:[],ref_keys:[]
table:[customer],pri_key:[customer_id],for_keys:[address_id,store_id],ref_tables:[address,store],ref_keys:[address_id,store_id]
table:[film],pri_key:[film_id],for_keys:[language_id,original_language_id],ref_tables:[language,language],ref_keys:[language_id,language_id]
table:[film_actor],pri_key:[actor_id,film_id],for_keys:[actor_id,film_id],ref_tables:[actor,film],ref_keys:[actor_id,film_id]
table:[film_category],pri_key:[film_id,category_id],for_keys:[category_id,film_id],ref_tables:[category,film],ref_keys:[category_id,film_id]
table:[film_text],pri_key:[

Unnamed: 0,table,ref_tables
0,actor,
1,address,[city]
2,category,
3,city,[country]
4,country,
5,customer,"[address, store]"
6,film,"[language, language]"
7,film_actor,"[actor, film]"
8,film_category,"[category, film]"
9,film_text,




===> Processing records for Table 'actor'
{'table': 'actor', 'pri_key': 'actor_id', 'for_keys': '', 'ref_tables': '', 'ref_keys': ''}
UNWIND $rows AS row 
CREATE (actor:Actor{actor_id: row.actor_id,first_name: row.first_name,last_name: row.last_name,last_update: row.last_update})
RETURN count(*) as total


===> Processing records for Table 'category'
{'table': 'category', 'pri_key': 'category_id', 'for_keys': '', 'ref_tables': '', 'ref_keys': ''}
UNWIND $rows AS row 
CREATE (category:Category{category_id: row.category_id,name: row.name,last_update: row.last_update})
RETURN count(*) as total


===> Processing records for Table 'country'
{'table': 'country', 'pri_key': 'country_id', 'for_keys': '', 'ref_tables': '', 'ref_keys': ''}
UNWIND $rows AS row 
CREATE (country:Country{country_id: row.country_id,country: row.country,last_update: row.last_update})
RETURN count(*) as total


===> Processing records for Table 'film_text'
{'table': 'film_text', 'pri_key': 'film_id', 'for_keys': '', 



===> Processing records for Table 'rental'
{'table': 'rental', 'pri_key': 'rental_id', 'for_keys': 'customer_id,inventory_id,staff_id', 'ref_tables': 'customer,inventory,staff', 'ref_keys': 'customer_id,inventory_id,staff_id'}
Processing reference Table 'customer' ...
Processing reference Table 'inventory' ...
Processing reference Table 'staff' ...
UNWIND $rows AS row
CREATE (rental:Rental{rental_id: row.rental_id,rental_date: row.rental_date,inventory_id: row.inventory_id,customer_id: row.customer_id,return_date: row.return_date,staff_id: row.staff_id,last_update: row.last_update})

WITH distinct row, rental
UNWIND row.customer_id AS _customer_id
OPTIONAL MATCH (customer:Customer {customer_id: _customer_id})
MERGE (rental)-[:RENTAL_CUSTOMER]->(customer)
WITH distinct row, rental
UNWIND row.inventory_id AS _inventory_id
OPTIONAL MATCH (inventory:Inventory {inventory_id: _inventory_id})
MERGE (rental)-[:RENTAL_INVENTORY]->(inventory)
WITH distinct row, rental
UNWIND row.staff_id AS _s

Unnamed: 0,table,ref_tables,no_records,no_rels,no_nodes,no_edges
0,actor,,200,0,200,
1,category,,16,0,16,
2,country,,109,0,109,
3,film_text,,1000,0,1000,
4,language,,6,0,6,
5,city,country,600,1,600,600.0
6,address,city,603,1,603,603.0
7,film,"language,language",1000,2,1000,1000.0
8,film_actor,"actor,film",5462,2,5462,10924.0
9,film_category,"category,film",1000,2,1000,2000.0
