# Schema matching con i tool di Valentine

In [1]:
import warnings
warnings.filterwarnings('ignore')
import os
import pandas as pd
from valentine import valentine_match
from valentine.algorithms import Coma, Cupid, DistributionBased, JaccardLevenMatcher, SimilarityFlooding
import random
import seaborn as sns
import matplotlib.colors
import matplotlib.pyplot as plt
import numpy as np
import shutil
import json
import re

In [3]:
# Input locations.
datasets_base_path = "..\\..\\Dataset\\ClusterParsed\\"
info_path = ".\\DatasetSchemaMatch\\"
filename_synonym = "column_sinonimi.txt"

# Output locations; each of them is wiped and recreated below.
dictionary_path = ".\\dictionary\\"
plot_path = ".\\plot\\"
csv_columns_path = ".\\columns\\"

for output_dir in (dictionary_path, plot_path, csv_columns_path):
    # Remove whatever a previous run left behind, then start from an empty directory.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.mkdir(output_dir)

In [4]:
# inverted_index = {
#     cluster_name: {
#         column_name: [group_name, ...]   # one entry per file whose header has that column
#     }
# }
inverted_index = {}
for cluster_folder_name in os.listdir(datasets_base_path):
    cluster_path = os.path.join(datasets_base_path, cluster_folder_name)
    columns_to_groups = {}
    inverted_index[cluster_folder_name] = columns_to_groups
    for filename in os.listdir(cluster_path):
        # The group name is the file name without its extension.
        group_name = os.path.splitext(filename)[0]
        df = pd.read_csv(os.path.join(cluster_path, filename))
        for column in df.columns:
            columns_to_groups.setdefault(column, []).append(group_name)

In [5]:
# cluster_info = {
#     cluster_name: {
#         files: {
#             group_name: path_file_csv
#         }
#         synonym: {}
#         index: {}
#     }
# }
cluster_info = {}

# Register every dataset file of every cluster under its extension-less name.
for cluster_folder_name in os.listdir(datasets_base_path):
    cluster_path = os.path.join(datasets_base_path, cluster_folder_name)
    cluster_info[cluster_folder_name] = {
        "files": {
            os.path.splitext(filename)[0]: os.path.join(cluster_path, filename)
            for filename in os.listdir(cluster_path)
        }
    }

# Attach the synonym table and the column index to each cluster listed in info_path.
for cluster_folder_name in os.listdir(info_path):
    cluster_path = os.path.join(info_path, cluster_folder_name)
    synonym_path = os.path.join(cluster_path, filename_synonym)

    with open(synonym_path) as f:
        data = f.read()
    # Wrap every run of word characters in double quotes so the text can be
    # parsed by json.loads (the file is presumably JSON-like with unquoted
    # tokens -- confirm against a sample). Raw strings keep "\w" and "\g<1>"
    # from being read as invalid string escape sequences.
    tmp = re.sub(r'(\w+)', r'"\g<1>"', data)
    js_synonym = json.loads(tmp)

    cluster_info[cluster_folder_name].update({
        "synonym": js_synonym,
        "index": inverted_index[cluster_folder_name]
    })

In [6]:
# For every cluster that has more than one dataset, write one CSV per synonym
# group (named after the group's main token). Each CSV column holds the data of
# one synonym token taken from one dataset and is labelled "<group>-<token>".
for cluster in cluster_info.keys():
    if len(cluster_info[cluster]["files"]) > 1:

        cluster_path = csv_columns_path + cluster + "\\"
        os.mkdir(cluster_path)

        synonym = cluster_info[cluster]["synonym"]
        index = cluster_info[cluster]["index"]
        files = cluster_info[cluster]["files"]

        # Parse each dataset at most once per cluster: the same file is usually
        # referenced by many tokens, and re-reading it every time is wasted work.
        loaded = {}

        for main_token, tokens in synonym.items():

            data_column_cluster = {}

            for token in tokens:
                # index[token] lists the datasets whose header contains this token.
                for filename in index[token]:
                    if filename not in loaded:
                        loaded[filename] = pd.read_csv(files[filename])
                    data_column_cluster[filename + "-" + token] = loaded[filename][token]

            df = pd.DataFrame(data_column_cluster)
            df.to_csv(cluster_path + main_token + ".csv", index=False)


### Estrazione score match fra coppie di colonne per ogni cluster

In [7]:
def schema_matching(df1, df2, matcher, df1_name, df2_name):
    """Run ``valentine_match`` on two tables and return the scores in two layouts.

    Returns a 2-tuple:
      * a dict ``{(table1, column1, table2, column2): score}``;
      * a DataFrame with the columns Matcher, Table1, Column1, Table2, Column2
        and Score, where Matcher is the class name of ``matcher``.
    """
    matches = valentine_match(df1, df2, matcher, df1_name, df2_name)
    matcher_name = matcher.__class__.__name__

    scores = {}
    rows = []
    for key in matches:
        # Each key is a pair of (table, column) pairs; flatten it to a 4-tuple.
        flat_key = (key[0][0], key[0][1], key[1][0], key[1][1])
        score = matches[key]
        scores[flat_key] = score
        rows.append([matcher_name, *flat_key, score])

    frame = pd.DataFrame(
        rows,
        columns=["Matcher", "Table1", "Column1", "Table2", "Column2", "Score"],
    )
    return (scores, frame)

In [8]:
def transform_dictionary(d):
    """Wrap every non-list value of ``d`` in a one-element list, in place.

    Example: ``{a: 1, b: [1, 1], c: 2}`` becomes ``{a: [1], b: [1, 1], c: [2]}``.

    Values that are already lists (including instances of list subclasses) are
    left untouched. Returns the same dictionary object that was passed in.
    """
    for key, value in d.items():
        # Only values are replaced, never keys, so the dictionary keeps its
        # size and it is safe to assign while iterating.
        if not isinstance(value, list):
            d[key] = [value]
    return d

In [9]:
def merge_dictionary(dict_1, dict_2):
    """Merge ``dict_2`` into ``dict_1``, concatenating the values of shared keys.

    Example: d1 = {a: 1, b: [1, 1], c: 3, d: 1} and d2 = {a: 2, b: [2, 1], c: [2]}
    give {a: [1, 2], b: [1, 1, 2, 1], c: [3, 2], d: [1]}.

    Both arguments are first normalised in place by ``transform_dictionary``
    (scalar values become one-element lists). ``dict_1`` is then updated in
    place and returned.
    """
    dict_1 = transform_dictionary(dict_1)
    dict_2 = transform_dictionary(dict_2)
    for key, values in dict_2.items():
        if key in dict_1:
            dict_1[key].extend(values)
        else:
            # Store a copy: sharing the list object would let later merges
            # into dict_1 silently grow dict_2's value as well.
            dict_1[key] = list(values)
    return dict_1

In [10]:
# cluster_datasets = {
#     cluster_name: [path of every file found in the cluster's folder]
# }
cluster_datasets = {}
for cluster_folder_name in os.listdir(csv_columns_path):
    cluster_path = os.path.join(csv_columns_path, cluster_folder_name)
    cluster_datasets[cluster_folder_name] = [
        os.path.join(cluster_path, filename)
        for filename in os.listdir(cluster_path)
    ]

In [None]:
# !! RUN = about 40 minutes
# cluster_res = {
#     cluster_name: {
#         (table1, column1, table2, column2): [score, ...]
#     }
# }
cluster_res = {}
my_matcher_list = [JaccardLevenMatcher()] # DistributionBased() JaccardLevenMatcher()
for cluster, paths in cluster_datasets.items():

    res = {}
    for path in paths:

        # The same file is loaded twice and matched against itself,
        # once as "table1" and once as "table2".
        df1, df2 = pd.read_csv(path), pd.read_csv(path)

        for matcher in my_matcher_list:
            matcher_name = matcher.__class__.__name__
            print("Matching cluster" + "(" + cluster + ")" + "(" + matcher_name + "): " + path)
            # Only the dictionary half of the result is needed here.
            res_tmp = schema_matching(df1, df2, matcher, "table1", "table2")[0]
            # Accumulate the scores of every file/matcher under the same column pair.
            res = merge_dictionary(res, res_tmp)

    cluster_res[cluster] = res

Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\city.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\company.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\country.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\datejoined.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\founded.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\industry.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\investors.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\name.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\selectinvestors.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\stage.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbinsights\totalraised.csv
Matching cluster(cbinsights)(JaccardLevenMatcher): .\columns\cbins

In [11]:
# mean_cluster_res =
# {
#     cluster1: {
#         (table1, column1, table2, column2): meanScore
#     }
# }

# threshold ([0,1]): only pairs whose mean score is strictly above it are kept
threshold = 0.0
mean_cluster_res = {}
for cluster_name, scores_by_pair in cluster_res.items():
    averaged = {
        pair: sum(scores) / len(scores)
        for pair, scores in scores_by_pair.items()
    }
    mean_cluster_res[cluster_name] = {
        pair: mean for pair, mean in averaged.items() if mean > threshold
    }

### Costruzione matrice di correlazione fra coppie di colonne per ogni cluster

In [12]:
# inverted_index_cluster =
# {
#     cluster1: {
#         mainToken1: [(token1, score1), (token1, score2), (token2, score1), ...]
#     }
# }

# each score is a mean score taken from mean_cluster_res

inverted_index_cluster = {}
for cluster, dict_of_score in mean_cluster_res.items():
    inverted_index = {}
    for match_tuple, score in dict_of_score.items():
        # Positions 1 and 3 of the key hold the two column names.
        first_column, second_column = match_tuple[1], match_tuple[3]
        # Record the match in both directions.
        inverted_index.setdefault(first_column, []).append((second_column, score))
        inverted_index.setdefault(second_column, []).append((first_column, score))

    inverted_index_cluster[cluster] = inverted_index

In [13]:
# mean_inverted_index_cluster =
# {
#     cluster1: {
#         mainToken1: [(token1, meanScore), (token2, meanScore), ...]
#     }
# }

mean_inverted_index_cluster = {}
for cluster, inverted_index in inverted_index_cluster.items():
    mean_inverted_index = {}
    for token, index in inverted_index.items():
        # Group the scores by matched token in a single pass. A dict keeps
        # insertion order, so the resulting list is in first-occurrence order
        # instead of depending on set iteration order.
        scores_by_token = {}
        for other_token, score in index:
            scores_by_token.setdefault(other_token, []).append(score)
        mean_inverted_index[token] = [
            (other_token, sum(scores) / len(scores))
            for other_token, scores in scores_by_token.items()
        ]
    mean_inverted_index_cluster[cluster] = mean_inverted_index

In [14]:
def make_dictionary(inverted_index):
    """Return the keys of ``inverted_index`` as a new set."""
    vocabulary = set()
    vocabulary.update(inverted_index.keys())
    return vocabulary

In [15]:
def get_value_from_list_of_tuples(l, t):
    """Look up ``t`` in a list of tuples keyed by their first item.

    Returns the second item of the first tuple whose first item equals ``t``,
    or 0 when no tuple matches.
    """
    return next((pair[1] for pair in l if pair[0] == t), 0)

In [16]:
# correlation_matrix_cluster =
# {
#     cluster1: {
#         mainToken1: {
#             token1: score, token2: score, ...
#         }
#     }
# }
correlation_matrix_cluster = {}
for cluster, inverted_index in mean_inverted_index_cluster.items():
    # Every token of the cluster becomes both a row and a column.
    dictionary = make_dictionary(inverted_index)
    correlation_matrix = {}
    for token, values in inverted_index.items():
        # get_value_from_list_of_tuples yields 0 for a term that is not
        # listed for this token, so unmatched cells are filled with 0.
        correlation_matrix[token] = {
            term: get_value_from_list_of_tuples(values, term)
            for term in dictionary
        }
    correlation_matrix_cluster[cluster] = correlation_matrix

In [17]:
# save correlation_matrix_cluster: one JSON document per cluster, in "<cluster>.txt"
for cluster, correlation_matrix in correlation_matrix_cluster.items():
    json_obj = json.dumps(correlation_matrix)
    # The context manager closes the file even if the write fails.
    with open(dictionary_path+cluster+".txt","w") as f:
        f.write(json_obj)

NameError: name 'dictionary_path' is not defined

In [None]:
# read correlation_matrix_cluster: every file in dictionary_path is parsed as
# JSON and stored under its extension-less file name
correlation_matrix_cluster = {}
for filename in os.listdir(dictionary_path):
    file_path = os.path.join(dictionary_path, filename)
    cluster = os.path.splitext(filename)[0]
    # The context manager closes the file even if parsing fails.
    with open(file_path) as file:
        correlation_matrix_cluster[cluster] = json.load(file)

In [None]:
def plot_correlation(df, base_path=".\\", title="Matrice di correlazione"):
    """Draw ``df`` as a seaborn heatmap on a 20x20 figure and save it to disk.

    The figure is titled ``title``. The output path is ``base_path``, a
    backslash, then ``title`` lower-cased with spaces replaced by underscores;
    no file extension is appended here.
    """
    file_name = base_path + "\\" + title.lower().replace(" ", "_")

    figure, axes = plt.subplots(figsize=(20, 20))
    axes.set_title(title)
    sns.heatmap(
        df,
        ax=axes,
        fmt=".0f",
        linewidths=2,
        cmap="Purples",
        square=True,
        cbar_kws={"shrink": .5},
    )

    figure.savefig(file_name, bbox_inches='tight', transparent=True)

In [None]:
# Render and save one heatmap per cluster.
for cluster, correlation_matrix in correlation_matrix_cluster.items():
    plot_correlation(
        pd.DataFrame(correlation_matrix),
        base_path=plot_path,
        title="Matrice di correlazione-" + cluster,
    )