# BeRTo Spark version

In [1]:
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
# Star import kept after the explicit names and before `import time`, preserving the
# original shadowing order (presumably it provides fuzzyNameMatching — confirm).
from pysparkBeRTo import *
import time

### Create SparkSession

In [4]:
# Spark settings for the precision-recall experiment, applied in this order.
cluster_settings = [
    ("spark.executor.cores", "4"),
    ("spark.dynamicAllocation.maxExecutors", "20"),
    ("spark.executor.memory", "20g"),
    ("spark.driver.memory", "16g"),
    ("spark.driver.maxResultSize", "8g"),
    ("spark.sql.shuffle.partitions", "200"),
    ("spark.kryoserializer.buffer.max", "1g"),
    ("spark.dynamicAllocation.enabled", "true"),
    # NOTE(review): Spark parses a unit-less spark.network.timeout as seconds, so
    # "180000" is roughly 50 hours; if 180 s was intended, use "180s".
    ("spark.network.timeout", "180000"),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ("parquet.enable.dictionary", "false"),
]
configuration_cluster = SparkConf().setAll(cluster_settings)

# Create (or reuse, if one is already active) the SparkSession with those settings.
spark = SparkSession.builder.appName("matching").config(conf=configuration_cluster).getOrCreate()

### Loading the datasets

In [5]:
    # MFI List
# Columns of the MFI list used for matching.
mfi_columns = ["code", "name", "country", "address", "post", "city"]
dataset1 = spark.read.parquet("Data/mfi/parquet/").select(*mfi_columns).cache()
# count() is an action: it materialises the cache and displays the row count.
dataset1.count()

                                                                                

105139

In [None]:
# Display dataset1's repr as a quick check of its selected columns.
dataset1

In [6]:
    # GLEIF
# GLEIF records, projected onto short column aliases. Backticks are required
# because the source column names contain dots.
dataset2 = (
    spark.read.parquet("Data/gleif/parquet")
    .select(
        "LEI",
        col("`Entity.LegalName`").alias("NAME"),
        col("`Entity.LegalAddress.Country`").alias("CNTY"),
        col("`Entity.LegalAddress.FirstAddressLine`").alias("ADDR"),
        col("`Entity.LegalAddress.PostalCode`").alias("PC"),
        col("`Entity.LegalAddress.City`").alias("CITY"),
    )
    .cache()
)
# Materialise the cache and display the row count.
dataset2.count()

                                                                                

2548020

In [None]:
# Display dataset2's repr to confirm the aliased columns (LEI, NAME, CNTY, ADDR, PC, CITY).
dataset2

In [11]:
    # National Registries
# National registries, loaded with all columns; cache() returns the same DataFrame.
dataset3 = spark.read.parquet("Data/national_business/parquet").cache()
# Materialise the cache and display the row count.
dataset3.count()

                                                                                

9489636

In [None]:
# Display dataset3's repr as a quick check of its columns.
dataset3

## Precision-Recall Experiment: Matching the MFI List with GLEIF

### Configuration 1: Focus on Recall

In [7]:
# Configuration 1 (recall-oriented): names and countries only; address fields disabled.
config1 = dict(
    # Attributes of the first data source
    identifier_1="code",        # mandatory
    name_1="name",              # mandatory
    country_1="country",        # mandatory
    type_country="isocode2",    # mandatory
    street_1="NA",              # set "NA" to disable
    city_1="NA",                # set "NA" to disable
    post_1="NA",                # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",         # mandatory
    name_2="NAME",              # mandatory
    country_2="CNTY",           # mandatory
    type_country2="isocode2",   # mandatory
    street_2="NA",              # set "NA" to disable
    city_2="NA",                # set "NA" to disable
    post_2="NA",                # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=True,
    fuzzy_level=3,              # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=False,
    similarity_level=1,         # 1, 2 or 3: the higher, the more weight on recall
    # Add a matching-score column to the output
    add_column_score=True,
)

In [8]:
# Run BeRTo with the recall-oriented configuration and report wall-clock seconds.
start = time.time()
matches_recall = fuzzyNameMatching(spark, dataset1, dataset2, config1)
print(time.time() - start)
# Export the matches. The "Analyse Results" section reads this file back, so with
# the export disabled a Restart & Run All of the notebook fails there.
N = matches_recall.toPandas()
N.to_excel('Data/results/MFI_GLEIF_RECALL.xlsx')

24/02/07 11:55:03 WARN CacheManager: Asked to cache already cached data.
24/02/07 11:55:09 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.InternalCompilerException: Compiling "GeneratedClass" in File 'generated.java', Line 1, Column 1: File 'generated.java', Line 191, Column 14: Compiling "hashAgg_doAggregateWithKeys_0()"
org.codehaus.commons.compiler.InternalCompilerException: Compiling "GeneratedClass" in File 'generated.java', Line 1, Column 1: File 'generated.java', Line 191, Column 14: Compiling "hashAgg_doAggregateWithKeys_0()"
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:402)
	at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:236)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:363)
	at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:361)
	at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.ja

1144.5479431152344


                                                                                

Execution time (seconds):
- 126
- 146

### Configuration 2: Balanced-Recall

In [None]:
# Configuration 2 (balanced-recall): names, countries and street/city/postcode.
config2 = dict(
    # Attributes of the first data source
    identifier_1="code",        # mandatory
    name_1="name",              # mandatory
    country_1="country",        # mandatory
    type_country="isocode2",    # mandatory
    street_1="address",         # set "NA" to disable
    city_1="city",              # set "NA" to disable
    post_1="post",              # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",         # mandatory
    name_2="NAME",              # mandatory
    country_2="CNTY",           # mandatory
    type_country2="isocode2",   # mandatory
    street_2="ADDR",            # set "NA" to disable
    city_2="CITY",              # set "NA" to disable
    post_2="PC",                # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=True,
    fuzzy_level=3,              # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=True,
    similarity_level=3,         # 1, 2 or 3: the higher, the more weight on recall
    # Add a matching-score column to the output
    add_column_score=True,
)

In [None]:
# Run BeRTo with the balanced-recall configuration, print the elapsed wall-clock
# seconds, then export the matches for the "Analyse Results" section.
start = time.time()
matches_recall_bal = fuzzyNameMatching(spark, dataset1, dataset2, config2)
elapsed = time.time() - start
print(elapsed)
N = matches_recall_bal.toPandas()
N.to_excel('Data/results/MFI_GLEIF_BALANCED-RECALL.xlsx')

Execution time (seconds):
- 118.54
- 204.6

### Configuration 3: Balanced-Precision

In [None]:
# Configuration 3 (balanced-precision): full address fields, medium fuzzy level,
# lowest address-similarity level.
config3 = dict(
    # Attributes of the first data source
    identifier_1="code",        # mandatory
    name_1="name",              # mandatory
    country_1="country",        # mandatory
    type_country="isocode2",    # mandatory
    street_1="address",         # set "NA" to disable
    city_1="city",              # set "NA" to disable
    post_1="post",              # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",         # mandatory
    name_2="NAME",              # mandatory
    country_2="CNTY",           # mandatory
    type_country2="isocode2",   # mandatory
    street_2="ADDR",            # set "NA" to disable
    city_2="CITY",              # set "NA" to disable
    post_2="PC",                # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=True,
    fuzzy_level=2,              # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=True,
    similarity_level=1,         # 1, 2 or 3: the higher, the more weight on recall
    # Add a matching-score column to the output
    add_column_score=True,
)

In [None]:
# Run BeRTo with the balanced-precision configuration and report wall-clock seconds.
start = time.time()
matches_balanced = fuzzyNameMatching(spark, dataset1, dataset2, config3)
print(time.time() - start)
# Export the matches. The "Analyse Results" section reads this file back, so with
# the export disabled a Restart & Run All of the notebook fails there.
N = matches_balanced.toPandas()
N.to_excel('Data/results/MFI_GLEIF_BALANCED.xlsx')

Execution time (seconds):
- 108.7
- 188

### Configuration 4: Total Precision

In [None]:
# Configuration 4 (total precision): lowest fuzzy level, address similarity off.
config4 = dict(
    # Attributes of the first data source
    identifier_1="code",        # mandatory
    name_1="name",              # mandatory
    country_1="country",        # mandatory
    type_country="isocode2",    # mandatory
    street_1="address",         # set "NA" to disable
    city_1="city",              # set "NA" to disable
    post_1="post",              # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",         # mandatory
    name_2="NAME",              # mandatory
    country_2="CNTY",           # mandatory
    type_country2="isocode2",   # mandatory
    street_2="ADDR",            # set "NA" to disable
    city_2="CITY",              # set "NA" to disable
    post_2="PC",                # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=True,
    fuzzy_level=1,              # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=False,
    similarity_level=2,         # 1, 2 or 3: the higher, the more weight on recall
    # Add a matching-score column to the output
    add_column_score=True,
)

In [None]:
# Run BeRTo with the precision-oriented configuration, print the elapsed wall-clock
# seconds, then export the matches for the "Analyse Results" section.
start = time.time()
matches_precision = fuzzyNameMatching(spark, dataset1, dataset2, config4)
elapsed = time.time() - start
print(elapsed)
N = matches_precision.toPandas()
N.to_excel('Data/results/MFI_GLEIF_PRECISION.xlsx')

Execution time (seconds):
- 143.1
- 153.5

### Analyse Results

In [None]:
# Raw CSV exports used to build the ground truth (low_memory=False makes pandas
# infer each GLEIF column's dtype from the whole file rather than per chunk).
BR_MFI, BR_GLEIF = (
    pd.read_csv("Data/mfi/mfi.csv"),
    pd.read_csv("Data/gleif/gleif.csv", low_memory=False),
)

In [None]:
# Ground-truth inputs: MFI rows that carry an LEI, and ACTIVE GLEIF records.
# Only the join keys must be present. Dropping rows with a missing address or name
# would remove genuine (code, LEI) links from the ground truth, and the evaluation
# would then count those codes as "new links discovered".
data1 = (
    BR_MFI[['code', 'lei', 'address', 'name']]
    .drop_duplicates()
    .dropna(subset=['code', 'lei'])
)
data2 = (
    BR_GLEIF.loc[
        BR_GLEIF['Entity.EntityStatus'] == 'ACTIVE',
        ['LEI', 'Entity.LegalAddress.FirstAddressLine', 'Entity.LegalName'],
    ]
    .drop_duplicates()
    .dropna(subset=['LEI'])
)

In [None]:
# True Matches
# Ground-truth links: MFI codes whose recorded `lei` is an active GLEIF LEI.
matTrue = data1.merge(data2, how='inner', left_on='lei', right_on='LEI')

In [None]:
# Number of distinct MFI codes with a ground-truth LEI; used as the recall denominator.
n = matTrue['code'].nunique(dropna=False)
n

In [None]:
# Mapping tables exported by the four BeRTo runs, in configuration order.
# index_col=0 restores the pandas index that to_excel wrote as the first column.
df1, df2, df3, df4 = (
    pd.read_excel(path, index_col=0)
    for path in (
        "Data/results/MFI_GLEIF_RECALL.xlsx",
        "Data/results/MFI_GLEIF_BALANCED-RECALL.xlsx",
        "Data/results/MFI_GLEIF_BALANCED.xlsx",
        "Data/results/MFI_GLEIF_PRECISION.xlsx",
    )
)

In [None]:
print('Recall Configuration Results')
print('Size of mapping table: ' + str(len(df1)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = df1[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

In [None]:
print('Balanced-Recall Configuration Results')
print('Size of mapping table: ' + str(len(df2)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = df2[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

In [None]:
# Average matching score of the balanced-recall mapping table.
df2['score'].agg('mean')

In [None]:
print('Balanced-Precision Configuration Results')
print('Size of mapping table: ' + str(len(df3)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = df3[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

In [None]:
# Average matching score of the balanced-precision mapping table.
df3['score'].agg('mean')

In [None]:
print('Precision Configuration Results')
print('Size of mapping table: ' + str(len(df4)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = df4[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

In [None]:
# Average matching score of the precision mapping table.
df4['score'].agg('mean')

In [None]:
# Mapping table produced by the Dedupe baseline (first column is the saved index).
dedupe_results = pd.read_excel(
    "Data/results/MFI_GLEIF_DEDUPE.xlsx",
    index_col=0,
)

In [None]:
print('Dedupe Results')
print('Size of mapping table: ' + str(len(dedupe_results)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = dedupe_results[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

In [None]:
# Pairwise predictions from the Splink baseline (blocked on name).
splink_name_block = pd.read_csv(
    "Data/results/splink_block_name.csv",
)

In [None]:
# Keep only the linked id pair and rename it to the (code, LEI) schema used by the
# evaluation. The guard makes the cell idempotent: on a re-run the source columns
# are already renamed, so it is a no-op instead of raising KeyError.
id_columns = {'unique_id_l': 'code', 'unique_id_r': 'LEI'}
if set(id_columns) <= set(splink_name_block.columns):
    splink_name_block = splink_name_block[list(id_columns)].rename(columns=id_columns)

In [None]:
print('Splink Results')
print('Size of mapping table: ' + str(len(splink_name_block)))
# Score on unique (code, LEI) pairs on both sides. If either table holds several rows
# for one code (e.g. several address spellings in matTrue), a row-level merge inflates
# the precision denominator and the incorrect count, but not the true positives.
pred_pairs = splink_name_block[['code', 'LEI']].drop_duplicates()
true_pairs = matTrue[['code', 'LEI']].drop_duplicates()
# Predictions whose code has a ground-truth LEI: the only ones that can be judged.
judged = pd.merge(pred_pairs, true_pairs, how='inner', on='code', suffixes=('_x', '_y'))
truepositive = judged[judged['LEI_x'] == judged['LEI_y']]
incorrect = judged[judged['LEI_x'] != judged['LEI_y']]
# Codes linked by the tool that have no ground-truth entry at all.
new_links = int((~pred_pairs['code'].drop_duplicates().isin(true_pairs['code'])).sum())
# Guard each ratio so an empty result prints 0 instead of raising ZeroDivisionError.
recall = len(truepositive) / n if n else 0.0
precision = len(truepositive) / len(judged) if len(judged) else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
print('Size of inner matches: ' + str(len(truepositive)) + ' -> ' + str(round(recall * 100, 1)) + '%')
print('Additional matches (new links discovered): ' + str(new_links))
print('Incorrect matches: ' + str(len(incorrect)) + ' -> Precision = ' + str(round(precision * 100, 1)) + '%')
print('F1-Score: ' + str(f1))

## Scalability Experiment

#### Spark Session

In [2]:
# Spark session for the scalability experiment.
conf = SparkConf()
# NOTE(review): SparkContext.getOrCreate returns an already-running context unchanged
# and ignores `conf`, so these settings only apply if no context exists yet (e.g. not
# after the precision-recall session above). Driver memory in particular is fixed
# once the driver JVM has started — confirm these values actually took effect.
conf.set("spark.driver.memory", "100g")
conf.set("spark.default.parallelism", "40")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
# Directory for checkpoint() calls; presumably used inside pysparkBeRTo — confirm.
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

24/02/07 08:55:41 WARN Utils: Your hostname, genex resolves to a loopback address: 127.0.1.1; using 192.168.4.13 instead (on interface eth2)
24/02/07 08:55:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/07 08:55:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### BeRTo configurations

In [9]:
# Scalability configurations. These reuse (and overwrite) the names config1/config2
# from the precision-recall section. The first source is a national registry, with
# countries given as names rather than ISO-2 codes.

# Config 1: names and countries only, fuzzy dictionary on.
# NOTE(review): address_similarity is True although no address columns are set —
# confirm this is intended.
config1 = dict(
    # Attributes of the first data source
    identifier_1="id",              # mandatory
    name_1="name",                  # mandatory
    country_1="country",            # mandatory
    type_country="country_name",    # mandatory
    street_1="NA",                  # set "NA" to disable
    city_1="NA",                    # set "NA" to disable
    post_1="NA",                    # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",             # mandatory
    name_2="NAME",                  # mandatory
    country_2="CNTY",               # mandatory
    type_country2="isocode2",       # mandatory
    street_2="NA",                  # set "NA" to disable
    city_2="NA",                    # set "NA" to disable
    post_2="NA",                    # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=True,
    fuzzy_level=1,                  # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=True,
    similarity_level=2,             # 1, 2 or 3: the higher, the more weight on recall
    # No matching-score column in the output
    add_column_score=False,
)

# Config 2: adds the street column, fuzzy dictionary and address similarity off.
config2 = dict(
    # Attributes of the first data source
    identifier_1="id",              # mandatory
    name_1="name",                  # mandatory
    country_1="country",            # mandatory
    type_country="country_name",    # mandatory
    street_1="address",             # set "NA" to disable
    city_1="NA",                    # set "NA" to disable
    post_1="NA",                    # set "NA" to disable
    # Attributes of the second data source
    identifier_2="LEI",             # mandatory
    name_2="NAME",                  # mandatory
    country_2="CNTY",               # mandatory
    type_country2="isocode2",       # mandatory
    street_2="ADDR",                # set "NA" to disable
    city_2="NA",                    # set "NA" to disable
    post_2="NA",                    # set "NA" to disable
    # Fuzzy name settings
    use_fuzzy_dictionary=False,
    fuzzy_level=2,                  # 1, 2 or 3: the higher, the more weight on recall
    # Address processing settings
    address_similarity=False,
    similarity_level=2,             # 1, 2 or 3: the higher, the more weight on recall
    # No matching-score column in the output
    add_column_score=False,
)

#### Run the scalability experiment

In [None]:
# Time BeRTo on national registries (dataset3) vs GLEIF (dataset2) with config2.
start = time.time()
matches = fuzzyNameMatching(spark, dataset3, dataset2, config2)
elapsed = time.time() - start
print(elapsed)

Execution time in seconds (1 run):
- config 1: 900.66
- config 2: 1371 - 1031.42


In [13]:
# Replace dataset2/dataset3 with the larger GLEIF and national-registry extracts.
# Unlike the earlier loads, these DataFrames are not cached here.
dataset2 = (
    spark.read.parquet("Data/gleif_big/parquet")
    .select(
        "LEI",
        col("`Entity.LegalName`").alias("NAME"),
        col("`Entity.LegalAddress.Country`").alias("CNTY"),
        col("`Entity.LegalAddress.FirstAddressLine`").alias("ADDR"),
        col("`Entity.LegalAddress.PostalCode`").alias("PC"),
        col("`Entity.LegalAddress.City`").alias("CITY"),
    )
)
dataset3 = spark.read.parquet(
    "Data/national_business_big/parquet"
)

In [None]:
# Row caps for the scalability sample; change these to vary the input size.
NATIONAL_ROW_LIMIT = 30_000_000
GLEIF_ROW_LIMIT = 2_500_000

# Distinct names so this cell no longer overwrites the pandas `data2` built for the
# precision-recall ground truth.
national_sample = dataset3.limit(NATIONAL_ROW_LIMIT).cache()
national_sample.count()  # action: populate the cache before the timer starts
gleif_sample = dataset2.limit(GLEIF_ROW_LIMIT).cache()
gleif_sample.count()  # action: populate the cache before the timer starts

start = time.time()
matches = fuzzyNameMatching(spark, national_sample, gleif_sample, config1)
print(time.time() - start)

## Close Spark

In [3]:
# Stop the SparkSession and its underlying SparkContext, releasing executors and cached data.
spark.stop()