##### Loading packages

In [1]:
import multiprocessing
import os
from time import time, ctime

import jellyfish
import numpy as np
import pandas as pd
import sqlalchemy
from joblib import Parallel, delayed
from py2neo import Graph, Node, Relationship
from pymongo import MongoClient
from scipy import stats
#from sklearn.metrics import jaccard_similarity_score
from sqlalchemy import MetaData


In [2]:
# Wall-clock start of the notebook run (seconds since the epoch).
start = time()

In [3]:
print(f"STARTED : {ctime(start)}")

STARTED : Tue Feb  9 10:53:47 2021


##### Connection to SQLite

In [4]:
# SQLite database that the SQL tables are read from.
db_uri = 'sqlite:////home/ubuntu/d3l/aura_pmi.db'
engine = sqlalchemy.create_engine(db_uri, echo=False)
# NOTE(review): `connection` is not used by the visible cells and stays open
# for the lifetime of the kernel.
connection = engine.connect()

##### Connection to Neo4j

In [5]:
# Neo4j metadata graph holding the Table/Column nodes and the computed links.
# Connection settings are read from the environment so that credentials are
# never stored in the notebook; the defaults are the values previously used here.
graph = Graph(
    os.environ.get("NEO4J_URI", "bolt://:7687"),
    auth=(os.environ.get("NEO4J_USER", "neo4j"), os.environ.get("NEO4J_PASSWORD", "")),
)

In [6]:
# Remove links left by previous runs so the detection below starts clean.
# A DELETE that matches nothing is a no-op, so there is no need to first pull
# every relationship into Python just to check whether any exist.
# The directed pattern binds each relationship once (an undirected one binds it twice).
graph.run('MATCH ()-[t:COL_SIMILARITY]->() DELETE t')
graph.run('MATCH ()-[t:PK_FK_LINK]->() DELETE t')

In [7]:
def load_table_string_columns(table_id, neo_conn):
    """Fetch the STRING-typed columns of one table from the Neo4j metadata graph.

    Parameters
    ----------
    table_id : str
        Value of the ``identifier`` property of the ``(:Object:Table)`` node.
    neo_conn : py2neo.Graph
        Open Neo4j connection (anything exposing ``run(cypher, parameters)``).

    Returns
    -------
    list of dict
        One record per column, with keys ``id`` (internal Neo4j node id),
        ``name`` and ``uniqueness``.
    """
    # The identifier is sent as a query parameter instead of being spliced into
    # the Cypher text, so quotes or other special characters cannot break the query.
    res = neo_conn.run(
        'MATCH (t:Object:Table {identifier: $table_id})<-[c:CONTAINS]-(col:Column) '
        'WHERE col.type = "STRING" '
        'RETURN ID(col) AS id, col.name AS name, col.uniqueness AS uniqueness',
        {"table_id": table_id},
    )
    return res.data()

In [8]:
def compute_jaccard(vals1, vals2):
    """Jaccard similarity of the distinct values of two array-likes.

    Computes ``|A ∩ B| / |A ∪ B|`` with numpy's set routines, which first reduce
    each input to its unique values.

    Parameters
    ----------
    vals1, vals2 : array-like
        Values to compare (e.g. pandas Series with NaNs already dropped).

    Returns
    -------
    float
        Similarity in [0, 1]; 0.0 when both inputs are empty (instead of
        raising ZeroDivisionError).
    """
    union = len(np.union1d(vals1, vals2))
    if union == 0:
        return 0.0
    return len(np.intersect1d(vals1, vals2)) / float(union)

##### Preprocessing

##### --> Load the list of all tables

In [9]:
# Every table node: its title (used as the SQL table name) and its Neo4j identifier,
# kept in two lists aligned by position.
res = graph.run('MATCH (t:Object:Table) RETURN t.title AS title, t.identifier AS identifier')
table_rows = res.data()
all_tables = [row['title'] for row in table_rows]
all_table_ids = [row['identifier'] for row in table_rows]
print("all_table_ids ", len(all_table_ids))

all_table_ids  250


In [10]:
# STRING columns of every table, aligned by position with all_table_ids.
all_table_cols = [load_table_string_columns(table_id, graph) for table_id in all_table_ids]
print("all_table_cols ", len(all_table_cols))

all_table_cols  250


# STRING columns with uniqueness < 1, together with the title and identifier of their table.
res = graph.run('''MATCH (t:Object:Table)<-[co:CONTAINS]-(c:Column) 
        WHERE c.type="STRING" and  c.uniqueness<1 
        RETURN ID(c) AS c_id, c.name AS c_name ,
         t.title AS t_title, t.identifier AS t_identifier''')
all_columns = res.data()
column_count = len(all_columns)
print("all_columns ", column_count)

##### --> Load the list of tables that contain a probable primary key

In [11]:
# Tables owning at least one fully unique STRING column (probable primary key),
# as two position-aligned lists of titles and identifiers.
res = graph.run('MATCH (t:Object:Table)<-[co:CONTAINS]-(c:Column) \
                WHERE c.type="STRING" AND c.uniqueness=1 \
                RETURN DISTINCT t.title AS title, t.identifier AS identifier')
prim_rows = res.data()
prim_tables = [row['title'] for row in prim_rows]
prim_table_ids = [row['identifier'] for row in prim_rows]
print("prim_table_ids ", len(prim_table_ids))

prim_table_ids  50


In [12]:
# STRING columns of each primary-key candidate table, aligned by position with prim_table_ids.
prim_table_cols = [load_table_string_columns(table_id, graph) for table_id in prim_table_ids]
print("prim_table_cols ", len(prim_table_cols))

prim_table_cols  50


# STRING columns with uniqueness = 1, together with the title and identifier of their table.
res = graph.run('''MATCH (t:Object:Table)<-[co:CONTAINS]-(c:Column) 
        WHERE c.type="STRING" and  c.uniqueness=1 
        RETURN ID(c) AS c_id, c.name AS c_name ,
         t.title AS t_title, t.identifier AS t_identifier''')
prim_columns = res.data()
column_count = len(prim_columns)
print("prim_columns ", column_count)

##### --> Detect PK/FK candidate column pairs between a primary-key table and every other table

In [13]:

def compute_table_cols_similarities(table_l, id_l, rank, time_t, all_tables, all_table_ids, seuil=0.5, tolerance=0.05,
                                    db_uri='sqlite:////home/ubuntu/d3l/aura_pmi.db'):
    """Detect PK/FK links from one left table to every other table and write them to Neo4j.

    A pair (left column ``col1``, right column ``col2``) of STRING columns is kept when:

    * ``col1`` is (almost) unique: ``uniqueness >= 1 - tolerance`` (PK candidate);
    * ``col2`` is not unique: ``uniqueness < 1`` (FK candidate);
    * the Jaccard similarity of their non-null values is ``>= seuil``;
    * at least ``1 - tolerance`` of ``col2``'s non-null values occur in ``col1``.

    Each kept pair gets a ``(col1)<-[:PK_FK_LINK {value}]-(col2)`` relationship whose
    value is the mean of: Jaro-Winkler(col1 name, col2 name), Jaro-Winkler(col2 name,
    left table name) and the Jaccard similarity.

    Parameters
    ----------
    table_l : str
        SQL name of the left (primary-key candidate) table.
    id_l : str
        Neo4j identifier of the left table.
    rank : int
        Label printed in the progress messages.
    time_t : float
        Reference timestamp (``time()``) for the elapsed-minutes message.
    all_tables, all_table_ids : list of str
        Position-aligned SQL names and Neo4j identifiers of the right tables.
    seuil : float
        Minimum Jaccard similarity.
    tolerance : float
        Slack applied to the uniqueness and containment thresholds.
    db_uri : str
        SQLAlchemy URI of the database holding the tables.

    Returns
    -------
    int
        Number of links created, or 0 when the left table cannot be loaded.
    """
    count = 0
    # Credentials come from the environment instead of being stored in the notebook;
    # the password must be supplied through NEO4J_PASSWORD.
    neo_conn = Graph(
        os.environ.get("NEO4J_URI", "bolt://159.84.108.111:7687"),
        auth=(os.environ.get("NEO4J_USER", "neo4j"), os.environ.get("NEO4J_PASSWORD", "")),
    )
    # Each (possibly parallel) call opens its own engine and releases it on exit.
    engine = sqlalchemy.create_engine(db_uri, echo=False)
    try:
        print("START RANK : ", rank, " TIME ", ctime(time()))
        # Load the left table (containing the primary key candidates) and its columns.
        try:
            df_l = pd.read_sql_table(table_l, con=engine)
            cols1 = load_table_string_columns(id_l, neo_conn)
        except Exception as e:
            print("ERROR WITH LEFT TABLE ", table_l, " : UNABLE TO LOAD LEFT COLUMNS")
            print(e)
            return 0

        # Second pruning condition: a primary key must be (almost) unique.
        pk_cols = [col for col in cols1 if col['uniqueness'] >= 1 - tolerance]

        for table_r, id_r in zip(all_tables, all_table_ids):
            if id_r == id_l or not pk_cols:
                continue
            # The right table does not depend on the left column: load it once per table.
            try:
                cols2 = load_table_string_columns(id_r, neo_conn)
                df_r = pd.read_sql_table(table_r, con=engine)
            except Exception as e:
                print("ERROR WITH RIGHT TABLE ", table_r, " : UNABLE TO LOAD RIGHT COLUMNS")
                print(e)
                continue
            # First pruning condition: a foreign key column is not unique.
            fk_cols = [col for col in cols2 if col['uniqueness'] < 1]
            # Third pruning condition (type compatibility) always holds: all columns are STRING.

            for col1 in pk_cols:
                for col2 in fk_cols:
                    try:
                        pk_values = df_l[col1['name']].dropna()
                        fk_values = df_r[col2['name']].dropna()
                        sim_val = compute_jaccard(pk_values, fk_values)
                        if sim_val < seuil:
                            continue
                        # Fourth pruning condition: containment of the FK values in the PK column.
                        # The mean is NaN for an empty column, which fails the comparison.
                        containment = fk_values.isin(pk_values).mean()
                        if not containment >= 1 - tolerance:
                            continue
                        score = np.mean([
                            jellyfish.jaro_winkler_similarity(col1['name'], col2['name']),  # column names
                            jellyfish.jaro_winkler_similarity(col2['name'], table_l),       # FK column vs PK table
                            sim_val,                                                        # content
                        ])
                        neo_conn.run(
                            'MATCH (c1:Column), (c2:Column) WHERE ID(c1) = $id1 AND ID(c2) = $id2 '
                            'CREATE (c1)<-[l:PK_FK_LINK {value: $score}]-(c2)',
                            {"id1": col1['id'], "id2": col2['id'], "score": float(score)},
                        )
                        count += 1
                    except Exception as e:
                        # A failing pair must not abort the remaining comparisons.
                        print("ERROR COMPARING ", table_l, ".", col1['name'], " WITH ", table_r, ".", col2['name'])
                        print(e)

        print("FINISH RANK : ", rank, " TIME ", ctime(time()), " ELAPSED SINCE START : ", str((time() - time_t) / 60), " MINUTES ")
        return count
    finally:
        engine.dispose()
    

In [14]:
#compute_table_cols_similarities(tables[100],tables[0],table_ids[100], table_ids[0], neo_conn=graph, sqlite_conn=engine, seuil=0.5, tolerance = 0.05)

In [15]:

def compute_table_cols_similarities2(table_l, id_l, cols1, rank, time_t, all_tables, all_table_ids, liste_cols2,
                                     seuil=0.5, tolerance=0.05, db_uri='sqlite:////home/ubuntu/d3l/aura_pmi.db'):
    """Find PK/FK column pairs from one left table to every other table.

    This function does not connect to Neo4j. The column metadata is passed in, and the
    Cypher statements that would create the links are returned, so the caller can run
    them from a single process.

    A pair (left column ``col1``, right column ``col2``) is kept when:

    * ``col1`` is (almost) unique: ``uniqueness >= 1 - tolerance`` (PK candidate);
    * ``col2`` is not unique: ``uniqueness < 1`` (FK candidate);
    * the Jaccard similarity of their non-null values is ``>= seuil``;
    * at least ``1 - tolerance`` of ``col2``'s non-null values occur in ``col1``.

    The link value is the mean of: Jaro-Winkler(col1 name, col2 name), Jaro-Winkler(col2
    name, left table name) and the Jaccard similarity.

    Parameters
    ----------
    table_l : str
        SQL name of the left (primary-key candidate) table.
    id_l : str
        Neo4j identifier of the left table.
    cols1 : list of dict
        Left table's STRING columns (records with ``id``, ``name``, ``uniqueness``).
    rank : int
        Label printed in the progress messages.
    time_t : float
        Reference timestamp (``time()``) for the elapsed-minutes message.
    all_tables, all_table_ids, liste_cols2 : list
        Position-aligned SQL names, Neo4j identifiers and column records of the right tables.
    seuil : float
        Minimum Jaccard similarity.
    tolerance : float
        Slack applied to the uniqueness and containment thresholds.
    db_uri : str
        SQLAlchemy URI of the database holding the tables.

    Returns
    -------
    list of str
        One ``MATCH ... CREATE (c1)<-[l:PK_FK_LINK {value:...}]-(c2)`` statement per kept
        pair. The list is empty (never ``0``) when the left table cannot be loaded, so callers
        can always flatten the results.
    """
    queries = []
    # Each (possibly parallel) call opens its own engine and releases it on exit.
    engine = sqlalchemy.create_engine(db_uri, echo=False)
    try:
        print("START RANK : ", rank, " TIME ", ctime(time()))
        try:
            df_l = pd.read_sql_table(table_l, con=engine)
        except Exception as e:
            print("ERROR WITH LEFT TABLE ", table_l, " : UNABLE TO LOAD LEFT COLUMNS")
            print(e)
            return []

        # Second pruning condition: a primary key must be (almost) unique.
        pk_cols = [col for col in cols1 if col['uniqueness'] >= 1 - tolerance]

        for table_r, id_r, cols2 in zip(all_tables, all_table_ids, liste_cols2):
            if id_r == id_l or not pk_cols:
                continue
            # The right table does not depend on the left column: read it once per table.
            try:
                df_r = pd.read_sql_table(table_r, con=engine)
            except Exception as e:
                print("ERROR WITH RIGHT TABLE ", table_r, " : UNABLE TO LOAD RIGHT COLUMNS")
                print(e)
                continue
            # First pruning condition: a foreign key column is not unique.
            fk_cols = [col for col in cols2 if col['uniqueness'] < 1]
            # Third pruning condition (type compatibility) always holds: all columns are STRING.

            for col1 in pk_cols:
                for col2 in fk_cols:
                    try:
                        pk_values = df_l[col1['name']].dropna()
                        fk_values = df_r[col2['name']].dropna()
                        sim_val = compute_jaccard(pk_values, fk_values)
                        if sim_val < seuil:
                            continue
                        # Fourth pruning condition: containment of the FK values in the PK column.
                        # The mean is NaN for an empty column, which fails the comparison.
                        containment = fk_values.isin(pk_values).mean()
                        if not containment >= 1 - tolerance:
                            continue
                        score = np.mean([
                            jellyfish.jaro_winkler_similarity(col1['name'], col2['name']),  # column names
                            jellyfish.jaro_winkler_similarity(col2['name'], table_l),       # FK column vs PK table
                            sim_val,                                                        # content
                        ])
                        # Node ids come from ID() in the metadata graph and the score is a float,
                        # so interpolating them into the statement text is safe.
                        queries.append(
                            'MATCH (c1:Column), (c2:Column) WHERE ID(c1) = ' + str(col1['id'])
                            + ' AND ID(c2) = ' + str(col2['id'])
                            + ' CREATE (c1)<-[l:PK_FK_LINK {value:' + str(float(score)) + '}]-(c2)'
                        )
                    except Exception as e:
                        # A failing pair must not abort the remaining comparisons.
                        print("ERROR COMPARING ", table_l, ".", col1['name'], " WITH ", table_r, ".", col2['name'])
                        print(e)

        print("FINISH RANK : ", rank, " TIME ", ctime(time()), " ELAPSED SINCE START : ", str((time() - time_t) / 60), " MINUTES ")
        return queries
    finally:
        engine.dispose()
    

##### Identification of PK/FK

In [16]:
print(f"Identification of PK/FK STARTING... {ctime(time())}")

Identification of PK/FK STARTING... Tue Feb  9 10:53:53 2021


# Parallel PK/FK detection, one job per primary-key candidate table.
# compute_table_cols_similarities2 also needs the left table's columns and the
# per-table column lists. The previous call passed the six-argument list of
# compute_table_cols_similarities and failed with a TypeError.
# NOTE(review): the cells below repeat this run with n_jobs=-2; one of the two can be dropped.
time_t = time()
results = Parallel(n_jobs=5)(
    delayed(compute_table_cols_similarities2)(
        table_l, id_l, prim_table_col, rank, time_t,
        all_tables, all_table_ids, all_table_cols,
    )
    for rank, (table_l, id_l, prim_table_col) in enumerate(zip(prim_tables, prim_table_ids, prim_table_cols))
)

In [17]:
# Reference time for the per-job "ELAPSED SINCE START" messages, and one rank per primary-key table.
time_t = time()
ranks = range(0, len(prim_tables))


In [18]:
# One job per primary-key table, on all CPUs but one (n_jobs=-2);
# each job's return value is collected, in order, into `results`.
results = Parallel(n_jobs=-2)(
    delayed(compute_table_cols_similarities2)(
        table_l, id_l, prim_table_col, rank, time_t,
        all_tables, all_table_ids, all_table_cols,
    )
    for table_l, id_l, prim_table_col, rank
    in zip(prim_tables, prim_table_ids, prim_table_cols, ranks)
)

In [19]:
# Flatten the per-table statement lists and run them one by one against Neo4j.
# Entries that are not non-empty lists (e.g. a 0 returned when a left table
# failed to load) are skipped instead of crashing the flattening.
queries = [query for table_queries in results if table_queries for query in table_queries]
ok = 0
for query in queries:
    try:
        graph.run(query)
        ok += 1
    except Exception as e:
        # Report the failing statement's error and keep going with the others.
        print(e)
print(ok, " OF ", len(queries), " CONNECTIONS CREATED")

1  OF  1  CONNECTIONS CREATED


In [20]:
# Wall-clock summary of the whole notebook run.
done = time()
elapsed = done - start
print("*" * 20)
print(f"STARTED : {ctime(start)}")
print(f"END : {ctime(done)}")
print(f"TIME ELAPSED:{elapsed / 60} MINUTES")

********************
STARTED : Tue Feb  9 10:53:47 2021
END : Tue Feb  9 11:02:59 2021
TIME ELAPSED:9.208821845054626 MINUTES


def test(n, rank):
    res= 1
    temp = n
    while temp > 1:
        res = res
        temp = temp-1
    print("RANK : ", rank, " TIME: ", ctime(time()))
    return res

# 100 dummy jobs of 500000 iterations each on 5 workers.
Parallel(n_jobs=5)(
    delayed(test)(n_iter, rank)
    for rank, n_iter in zip(range(100), np.repeat(500000, 100))
)