In [30]:
import pickle
from pymongo import MongoClient
import multiprocessing
from multiprocessing import Pool
from dataclasses import dataclass
from tqdm import tqdm
import uuid
import pandas as pd
import os

In [31]:
ALL_FILES_PATH = "D:\\all_files"
if(not os.path.exists(ALL_FILES_PATH)):
    raise Exception("ALL_FILES_PATH does not exist")
NUM_PROCESSES = multiprocessing.cpu_count()

In [3]:
with open('JPairs_n10_90_ALL.pickle', 'rb') as handle:
    jPairs = pickle.load(handle)
print(len(jPairs))

3964795


In [4]:
client = MongoClient('mongodb://admin:V6jjpnR8Ee8NuxdgYsrqsfWBRaQ4yWFQKbc8teM4eYXUYJJTRZ@direct.montreal-1.mew.im:27777'
                     '/?authSource=admin&readPreference=primary&directConnection=true&ssl=false')
repoNames = ['Singapore', 'Canada', 'UK', 'US']
mongoCollections = []

mongoCollections.append(client['datagovsg'])
mongoCollections.append(client['opencanada'])
mongoCollections.append(client['datagovuk'])
mongoCollections.append(client['datagov'])

tableStats = []
for collect in mongoCollections:
    tableStats.append(collect['inferredstats'])
tableDocs = []
for portal in tableStats:
    for table_doc in portal.find({}, {"_id": 0, "schema": 0}):
        tableDocs.append(table_doc)
client.close()

In [5]:
allUUIDs = set()
for j in jPairs:
    allUUIDs.add(j['leftTableID'])
    allUUIDs.add(j['rightTableID'])

In [6]:
tableDocs = [x for x in tableDocs if x['uuid'] in allUUIDs]

In [26]:
def readCSV(table_doc):
    csvID = table_doc['uuid']
    encoding = table_doc['encoding']
    header = table_doc['header']
    delimiter = table_doc['delimiter'] if 'delimiter' in table_doc else ','
    file_path = os.path.join(ALL_FILES_PATH, csvID + '.csv')
    print(file_path)
    if header == 0:
        df = pd.read_csv(file_path, low_memory=False, encoding=encoding, delimiter=delimiter)
    else:
        df = pd.read_csv(file_path, skiprows=header, low_memory=False, encoding=encoding, delimiter=delimiter)
        df.drop(df.filter(regex="Unnamed"),axis=1, inplace=True)
        df.dropna(how='all', inplace= True)
    df.columns = list(range(0, df.shape[1]))
    return {"uuid": csvID, "df": df}

In [1]:
tables_map = {}
with Pool(processes=NUM_PROCESSES) as pool:
    for data in tqdm(pool.imap_unordered(readCSV, tableDocs), total=len(tableDocs)):
        tables_map[data['uuid']] = data['df']

NameError: name 'Pool' is not defined

In [28]:
readCSV(tableDocs[0])

D:\all_files\205f7050-39fd-4af3-899b-ac2dd323e437.csv


{'uuid': '205f7050-39fd-4af3-899b-ac2dd323e437',
 'df':        0    1    2
 0   1990  4.5  4.0
 1   1991  3.9  3.5
 2   1992  3.7  3.3
 3   1993  3.6  3.1
 4   1994  3.5  3.1
 5   1995  3.5  3.0
 6   1996  3.2  2.8
 7   1997  3.2  2.7
 8   1998  2.3  2.1
 9   1999  2.6  2.2
 10  2000  3.2  2.5
 11  2001  2.5  2.2
 12  2002  2.4  2.1
 13  2003  2.3  2.0
 14  2004  2.8  2.2
 15  2005  2.7  2.1
 16  2006  2.8  2.0
 17  2007  2.9  2.0
 18  2008  2.8  2.0
 19  2009  2.2  1.8
 20  2010  2.8  2.0
 21  2011  2.7  2.0
 22  2012  2.8  2.1
 23  2013  2.7  2.0
 24  2014  2.6  2.0
 25  2015  2.4  1.9
 26  2016  2.2  1.8
 27  2017  2.1  1.8
 28  2018  2.3  1.8
 29  2019  2.2  1.8
 30  2020  1.6  1.5}

In [None]:
def compare_columns(pair):
    import duckdb
    duckdb.query("PRAGMA threads=2;")
    view_name = "view_" + str(uuid.uuid1().hex)
    portal = pair['portal']
    
    #pair = j['pair']
    try:
        if portal == 'SG':
            table_1 = readCSVFromZipSG(pair['leftTableDoc'])['df']
            table_1.columns = list(range(0, table_1.shape[1]))
            table_2 = readCSVFromZipSG(pair['rightTableDoc'])['df']
            table_2.columns = list(range(0, table_2.shape[1]))
        elif portal == 'CA':
            table_1 = readCSVFromZipCA(pair['leftTableDoc'])['df']
            table_1.columns = list(range(0, table_1.shape[1]))
            table_2 = readCSVFromZipCA(pair['rightTableDoc'])['df']
            table_2.columns = list(range(0, table_2.shape[1]))
        elif portal == 'UK':
            table_1 = readCSVFromZipUK(pair['leftTableDoc'])['df']
            table_1.columns = list(range(0, table_1.shape[1]))
            table_2 = readCSVFromZipUK(pair['rightTableDoc'])['df']
            table_2.columns = list(range(0, table_2.shape[1]))
        elif portal == 'US':
            table_1 = readCSVFromZipUS(pair['leftTableDoc'])['df']
            table_1.columns = list(range(0, table_1.shape[1]))
            table_2 = readCSVFromZipUS(pair['rightTableDoc'])['df']
            table_2.columns = list(range(0, table_2.shape[1]))
            
        query_string = 'CREATE VIEW %s AS\
            (\
                WITH T1 AS (SELECT "%d" AS col_1, COUNT(*) AS count_1 FROM table_1 GROUP BY "%d"),\
                    T2 AS (SELECT "%d" AS col_2, COUNT(*) AS count_2 FROM table_2 GROUP BY "%d")\
                    SELECT *, (count_1 * count_2) AS total_count FROM T1 INNER JOIN  T2 ON T1.col_1 = T2.col_2\
            )' % (view_name, pair['leftColumnIndex'], pair['leftColumnIndex'], pair['rightColumnIndex'], pair['rightColumnIndex'])

        duckdb.query(query_string)
        joined_count_result = duckdb.query('SELECT SUM(total_count) AS aggregated FROM %s' % view_name).to_df()
        joined_count = int(joined_count_result['aggregated'][0])

        cardinality = "one:one"
        is_many_many_result = duckdb.query(
            'SELECT * FROM %s WHERE count_1 > 1 AND count_2 > 1 LIMIT 1' % view_name).to_df()
        if len(is_many_many_result) > 0:
            cardinality = "many:many"
        else:
            is_many_one_result = duckdb.query(
                'SELECT * FROM %s WHERE (count_1 > 1 AND count_2 = 1) OR (count_1 = 1 AND count_2 > 1) LIMIT 1' % view_name).to_df()
            if len(is_many_one_result) > 0:
                cardinality = "many:one"
        duckdb.query('DROP VIEW IF EXISTS %s' % view_name)
        return {
            'pair': pair,
            'score' : pair['score'],
            'cardinality': cardinality,
            'joined_count': joined_count,
            'expRatio' : joined_count / max(table_1.shape[0], table_2.shape[0])
        }
    except Exception as e:
        print(e)
        return {
            'pair': pair
        }

In [None]:
error_pairs = []
results = []

for pair in tqdm(jPairs[0:100], total=100):

    #print (joinablePair) 
    data = compare_columns(pair)
    if 'cardinality' in data:
        results.append(data)
    else:
        error_pairs.append(data['pair'])
        #break
print(len(results))    

In [6]:
from decompositionJoin import *
if __name__ == '__main__':
    error_pairs = []
    results = []
    NUM_PROCESSES = 4 # Chang do not forget to change here
    print('Processing pairs has started.')
    with Pool(processes=NUM_PROCESSES) as pool:
        for data in tqdm(pool.imap_unordered(compare_columns, jPairs[0:100]), total=1000):
            if 'cardinality' in data:
                results.append(data)
            else:
                error_pairs.append(data['pair'])

    print(f'Processing {len(jPairs)} pairs is done.')
print(len(results))

Processing pairs has started.


 10%|███████▉                                                                       | 100/1000 [00:04<00:41, 21.86it/s]

Processing 3964795 pairs is done.
100



