In [1]:
%%configure
{"driverMemory":"8g","executorMemory": "8072M", "executorCores": 6, "numExecutors":12}

In [2]:
## helper_functions
"""Helper functions for the jupyter notebooks"""

# IP address of the remote host; presumably the machine these notebooks run on
# when deployed remotely (compared against the local host's IP) — TODO confirm.
REMOTE_HOST="192.168.11.3"

from os import listdir
from os.path import abspath, isfile, join
from socket import gethostname, gethostbyname
from typing import List
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import DecimalType, DoubleType, IntegerType, StringType
from pyspark.sql.functions import col, isnan, when, count, lit

from itertools import combinations



def print_df_to_html(sparkDF: DataFrame):
    """Render a Spark DataFrame as an IPython HTML table.

    The DataFrame is converted with ``toPandas()`` (i.e. fully collected), so
    only call this on small results. Returns an ``IPython.display.HTML`` object.
    """
    from IPython.display import HTML
    return HTML(sparkDF.toPandas().to_html())

def translate_to_local_file_path(filename,directory=''):
    """Builds a ``file://`` URI for ``filename`` located one level above the CWD.

    The relative path is ``../<directory>/<filename>`` when ``directory`` is
    given and ``../<filename>`` otherwise. The same layout applies on the
    ``REMOTE_HOST`` machine and elsewhere, so no host check is needed.
    The resolved absolute path is printed and returned as a URI such as
    ``file:///home/user/data/x.csv``.

    Args:
        filename: Name of the file.
        directory: Optional directory (relative to the parent of the CWD).

    Returns:
        The ``file://`` URI of the absolute path.
    """
    if directory:
        filepath = "../{directory}/{filename}".format(directory=directory, filename=filename)
    else:
        filepath = "../{filename}".format(filename=filename)
    absolute_path = abspath(filepath)
    print(absolute_path)
    # abspath() starts with "/", so "file://" + path gives "file:///...".
    return "file://{path}".format(path=absolute_path)

def translate_to_file_string(filepath):
    """Returns ``filepath`` as an absolute ``file://`` URI (e.g. ``file:///tmp/a.csv``)."""
    # abspath() starts with "/", so the result has the canonical three slashes.
    return "file://{path}".format(path=abspath(filepath))


def translate_header_file_to_list(filepath):
    """Return the column names found on the first line of ``filepath``.

    Trailing whitespace (including the newline) is removed and the line is
    split on ``|``.
    """
    with open(filepath, "r") as header_handle:
        first_line = header_handle.readline()
    return first_line.rstrip().split("|")

def translate_datatype_file_to_list(filepath) -> list:
    """Reads the '|'-separated data types from ``filepath`` and maps them to Spark types.

    Only the first row of the file is used; it is read through the global
    ``spark`` session. Each type string is stripped of surrounding whitespace
    and mapped by prefix:

    * ``smallint...`` -> ``IntegerType()``
    * ``decimal(p,s)`` / ``decimal(p)`` -> ``DecimalType(p, s)`` (scale 0 when
      omitted); ``decimal`` without parameters -> ``DecimalType()``
    * anything else (``varchar``, other numeric or date types, empty cells)
      -> ``StringType()``

    NOTE(review): ``double``/``int``/``date`` types therefore become strings;
    the earlier docstring claimed a double/date consolidation — TODO confirm
    which behavior is intended.

    Returns a list of consolidated Spark data types, one per column (empty if
    the file has no rows).
    """
    import re

    first_row = spark.read.option("header", "false").option("inferSchema", "true") \
        .option("delimiter", "|").csv(filepath).first()
    result = []
    for raw_type in (first_row if first_row is not None else []):
        # Empty cells arrive as None; treat them like any unknown type.
        type_name = "" if raw_type is None else str(raw_type).strip()
        if type_name.startswith("smallint"):
            result.append(IntegerType())
        elif type_name.startswith("decimal"):
            params = re.match(r"decimal\s*\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\)", type_name)
            if params:
                precision = int(params.group(1))
                scale = int(params.group(2)) if params.group(2) else 0
                result.append(DecimalType(precision, scale))
            else:
                result.append(DecimalType())
        else:
            result.append(StringType())
    return result

def concat_files(output_file,result_dir):
    """Concatenates the regular files in ``result_dir`` into ``output_file``.

    Names starting with ``.`` or ``_`` (hidden files and marker files such as
    ``_SUCCESS``) are skipped, as are subdirectories. Files are appended in
    sorted name order so numbered part files keep a deterministic order
    (``listdir`` order is arbitrary). Contents are copied byte-for-byte in
    chunks, so large files are never held fully in memory. The list of input
    paths is printed before copying.
    """
    import shutil

    file_list = [join(result_dir, name) for name in sorted(listdir(result_dir))
                 if not name.startswith((".", "_")) and isfile(join(result_dir, name))]
    print(file_list)
    with open(output_file, "wb") as outfile:
        for fname in file_list:
            with open(fname, "rb") as infile:
                shutil.copyfileobj(infile, outfile)


def variations(items :list, k :int) -> list:
    """Returns all distinct ``k``-element subsets of ``items`` as a list of sets.

    Despite the name, order inside a subset does not matter (these are
    combinations). Duplicate items are collapsed first. Subsets appear in
    ``itertools.combinations`` order over the de-duplicated items, e.g.
    ``variations(["a", "b", "c"], 2) == [{"a", "b"}, {"a", "c"}, {"b", "c"}]``.

    ``k == 0`` yields ``[set()]`` (the single empty subset); ``k`` larger than
    the number of distinct items yields ``[]``.

    Raises:
        ValueError: if ``k`` is negative.
    """
    if k < 0:
        raise ValueError("k must be non-negative, got {k}".format(k=k))
    # dict.fromkeys removes duplicates while keeping first-occurrence order.
    distinct_items = list(dict.fromkeys(items))
    return [set(subset) for subset in combinations(distinct_items, k)]

def pair_permutations_ordered(items: list) -> list:
    """Returns every ordered pair ``[x, y]`` drawn from ``items``, including ``[x, x]``.

    For each unordered pair of distinct items both orientations are emitted
    (``[a, b]`` then ``[b, a]``), in ``itertools.combinations`` order over the
    de-duplicated items. The self-pairs ``[x, x]`` follow, one per entry of
    ``items``. With fewer than two distinct items only self-pairs are returned
    (an empty input gives ``[]``).
    """
    pair_permutations = []
    # Unpack the combination tuples directly: popping from a set made the
    # orientation depend on the per-process string hash order.
    for first_attr, second_attr in combinations(dict.fromkeys(items), 2):
        pair_permutations.append([first_attr, second_attr])
        pair_permutations.append([second_attr, first_attr])
    pair_permutations.extend([curr_attr, curr_attr] for curr_attr in items)
    return pair_permutations

def cast_datatypes(datatype_file, input_df:DataFrame) -> DataFrame:
    """Casts the columns of ``input_df`` to the types listed in ``datatype_file``.

    Target types come from ``translate_datatype_file_to_list`` and are matched
    to the columns by position. Only columns whose current type differs from
    the target are cast. If the number of listed types differs from the number
    of columns, a warning is printed and the DataFrame is returned uncast.

    Returns:
        A new DataFrame (aliased ``tmp_df``) with the casts applied.
    """
    datatype_list = translate_datatype_file_to_list(datatype_file)
    df = input_df.alias('tmp_df')
    if len(df.columns) != len(datatype_list):
        print("Type count mismatch for {file}: {n_types} types, {n_cols} columns; columns left uncast".format(
            file=datatype_file, n_types=len(datatype_list), n_cols=len(df.columns)))
        return df
    for field, target_type in zip(df.schema.fields, datatype_list):
        # Compare against the field's DataType instance, not its class: an
        # instance never equals a class, which would force a cast every time.
        if field.dataType != target_type:
            df = df.withColumn(field.name, col("`{column}`".format(column=field.name)).cast(target_type))
    return df

def check_attribute_completeness(all_columns:list, string_attributes:list, numeric_attributes:list):
    """Print a notice for each column that is in neither attribute list.

    Columns are reported in ``all_columns`` order; nothing is returned.
    """
    unclassified = (column for column in all_columns
                    if column not in string_attributes and column not in numeric_attributes)
    for column in unclassified:
        print("{curr_col} is not numeric or string".format(curr_col=column))

def compare_schemas(df1:DataFrame, df2:DataFrame):
    """Prints the differences between the schemas of two DataFrames.

    If the column counts differ, only that fact is reported. Otherwise every
    position whose ``(name, type)`` pair differs is printed with both pairs.
    Nothing is returned.
    """
    dtypes1, dtypes2 = df1.dtypes, df2.dtypes
    if len(dtypes1) != len(dtypes2):
        print("Schemas have different sizes!!!")
        return
    for dtype1, dtype2 in zip(dtypes1, dtypes2):
        # Each entry is a (name, type) tuple, so one comparison covers both.
        if dtype1 != dtype2:
            print("Columns differ {dtype1} {dtype2}".format(dtype1=dtype1, dtype2=dtype2))


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1617221016544_0017,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [3]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType
from pyspark.sql.functions import udf, col, pandas_udf, PandasUDFType, collect_list, count
from scipy.stats import wasserstein_distance
from numpy import asarray
import numpy as np

# Create UDF for calculating EMD

In [4]:
@udf(returnType=FloatType())
def emd_UDF(col1, col2):
    """Earth Mover's Distance (1-D Wasserstein distance) between two value lists.

    Applied below to the results of ``collect_list``, so ``col1`` and ``col2``
    are lists of numbers. Returns the distance as a Python float, or ``None``
    (NULL) when either list is missing or empty, because
    ``scipy.stats.wasserstein_distance`` rejects empty distributions.
    """
    if not col1 or not col2:
        return None
    return float(wasserstein_distance(col1, col2))


# Also make the UDF callable from Spark SQL under the name "emd_UDF".
spark.udf.register("emd_UDF", emd_UDF)

<function emd_UDF at 0x7f5642e3f048>

# Setup MLB Tables

In [5]:
BENCHMARK_REL_PATH = "/DataLake/public_bi_benchmark/benchmark/"

# All 68 MLB tables (MLB_1 ... MLB_68), in lexicographic string order
# (MLB_1, MLB_10, ..., MLB_19, MLB_2, ...). Tables are loaded and paired in
# this order. For a quick run, slice it, e.g. list_of_all_MLB_tables[:2].
list_of_all_MLB_tables = sorted("MLB_{}".format(number) for number in range(1, 69))

In [6]:
file_path = BENCHMARK_REL_PATH+"MLB/"
sample = False
azure_env = True

if not azure_env:
    # Outside the Azure cluster (whose session is set up by %%configure),
    # build a local Spark session with comparable memory settings.
    conf = SparkConf()
    conf.set("spark.executor.memory", "8g")
    conf.set("spark.driver.memory", "8g")
    conf.set("spark.memory.offHeap.enabled","true" )
    conf.set("spark.memory.offHeap.size","8g")
    conf.setMaster("local[2]")
    conf.setAppName("MLB-similarity-calc")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Per table: names of its string and numeric columns ("Calculation*" excluded).
string_attributes = {}
numeric_attributes = {}
for table_name in list_of_all_MLB_tables:
    if sample:
        data_file = file_path+"samples/"+table_name+".sample"+".csv"
    else:
        data_file = file_path+table_name+".csv"
    header_file = file_path+"samples/"+table_name+".header.csv"
    datatype_file = file_path+"samples/"+table_name+".datatypes.csv"
    print(header_file)
    print(data_file)
    # The header file holds one '|'-separated line of column names. Read it as
    # raw text rather than CSV so a ',' inside a column name cannot split it.
    column_names = spark.read.text(header_file).first()[0].split("|")
    # Load the data with an inferred schema, then cast to the declared types.
    orig_df = spark.read.option("header", "false").option("inferSchema", "true") \
        .option("delimiter", "|").csv(data_file).toDF(*column_names)
    df = cast_datatypes(datatype_file, orig_df)
    df.createOrReplaceTempView(table_name)
    string_attributes[table_name] = [
        name for name, dtype in df.dtypes
        if dtype == 'string' and not name.startswith("Calculation")]
    numeric_attributes[table_name] = [
        name for name, dtype in df.dtypes
        if (dtype in ('double', 'int') or dtype.startswith('decimal'))
        and not name.startswith("Calculation")]
    check_attribute_completeness(df.columns, string_attributes[table_name], numeric_attributes[table_name])
    


/DataLake/public_bi_benchmark/benchmark/MLB/samples/MLB_1.header.csv
/DataLake/public_bi_benchmark/benchmark/MLB/MLB_1.csv
Calculation_40532458112880653 is not numeric or string
Calculation_40532458117070874 is not numeric or string
/DataLake/public_bi_benchmark/benchmark/MLB/samples/MLB_10.header.csv
/DataLake/public_bi_benchmark/benchmark/MLB/MLB_10.csv
Calculation_40532458117263387 is not numeric or string
Calculation_40532458113208334 is not numeric or string
/DataLake/public_bi_benchmark/benchmark/MLB/samples/MLB_11.header.csv
/DataLake/public_bi_benchmark/benchmark/MLB/MLB_11.csv
Calculation_40532458117263387 is not numeric or string
Calculation_40532458113208334 is not numeric or string
/DataLake/public_bi_benchmark/benchmark/MLB/samples/MLB_12.header.csv
/DataLake/public_bi_benchmark/benchmark/MLB/MLB_12.csv
Calculation_40532458112880653 is not numeric or string
Calculation_40532458117070874 is not numeric or string
/DataLake/public_bi_benchmark/benchmark/MLB/samples/MLB_13.hea

In [9]:
## Calc EMD for selected attribute pairs of every pair of MLB tables
from functools import reduce

result_dist_calc = []

# Numeric attributes of the outer table whose pairings are evaluated.
sel_attr = ['H','BB','X1B','X2B']

# combinations() yields each unordered table pair once with a fixed
# orientation (list order), so outer/inner no longer depend on set.pop()
# and the per-process string hash seed.
for outer, inner in combinations(list_of_all_MLB_tables, 2):
    print(outer, inner)
    # Shared string columns form the equi-join key; shared numeric columns are
    # the candidates for the distance. Sorting keeps SQL text reproducible.
    join_attributes = sorted(set(string_attributes[inner]) & set(string_attributes[outer]))
    intersecting_attr = sorted(set(numeric_attributes[inner]) & set(numeric_attributes[outer]))
    if not join_attributes or not intersecting_attr:
        # An empty key would yield "ON ()" and an empty numeric list a dangling
        # ","; both are SQL syntax errors.
        print("skip {outer} {inner}: no common string or numeric attributes".format(
            outer=outer, inner=inner))
        continue

    # Ordered attribute pairs whose outer attribute is one of sel_attr.
    sel_attr_variations = [pair for pair in pair_permutations_ordered(intersecting_attr)
                           if pair[0] in sel_attr]
    if not sel_attr_variations:
        # Nothing to compute; skipping also prevents re-writing a result
        # DataFrame left over from a previous table pair.
        print("skip {outer} {inner}: none of {sel_attr} is shared".format(
            outer=outer, inner=inner, sel_attr=sel_attr))
        continue

    join_condition = " AND ".join(
        "o.`{join_att}` = i.`{join_att}`".format(join_att=join_att) for join_att in join_attributes)
    projection_list = " , ".join(
        ["o.`{attr}` as `{attr}`".format(attr=attr) for attr in join_attributes]
        + ["o.`{attr}` as `o.{attr}` , i.`{attr}` as `i.{attr}`".format(attr=attr)
           for attr in intersecting_attr])
    sqlDF = spark.sql("SELECT " + projection_list + " FROM " + outer + " o JOIN " + inner
                      + " i ON (" + join_condition + ")")
    # Drop joined rows with a NULL in any column; names are back-quoted
    # because the aliases contain dots.
    sqlDF = sqlDF.dropna(subset=["`{cur_col}`".format(cur_col=cur_col) for cur_col in sqlDF.columns])

    per_attr_frames = []
    for index_attr, (first_attr, second_attr) in enumerate(sel_attr_variations):
        print(str(index_attr) + "/" + str(len(sel_attr_variations)))
        emd_df = sqlDF.groupby(join_attributes).agg(
            emd_UDF(collect_list("`o.{first_attr}`".format(first_attr=first_attr)),
                    collect_list("`i.{second_attr}`".format(second_attr=second_attr))).alias("EMD"),
            # Rows per join group; count(lit(1)) does not require any specific
            # attribute (such as H) to be shared by both tables.
            count(lit(1)).alias("count")).select(col("EMD"), col("count"))
        per_attr_frames.append(
            emd_df.withColumn("OUTER", lit(outer)).withColumn("OUTER_ATTR", lit(first_attr))
                  .withColumn("INNER", lit(inner)).withColumn("INNER_ATTR", lit(second_attr)))
    # One CSV result set per table pair containing all attribute pairings.
    curDF = reduce(DataFrame.union, per_attr_frames)
    curDF.write.format("csv").mode("overwrite").option("header", "true").save(
        "/HdiNotebooks/semantic_data_lake/results/emd_results_dist_sep_inst_{outer}_{inner}".format(
            outer=outer, inner=inner))

An error was encountered:
Invalid status code '400' from http://headnodehost:8998/sessions/10/statements/7 with error payload: "requirement failed: Session isn't active."


In [7]:
## Calc EMD for selected attribute pairs of hand-picked MLB table pairs

result_dist_calc = []

# (outer, inner) table pairs to evaluate.
selected_table_pairs = [["MLB_1", "MLB_65"],["MLB_1", "MLB_7"],["MLB_1", "MLB_8"],["MLB_1", "MLB_9"]]
# Numeric attributes of the outer table whose pairings are evaluated.
sel_attr = ['H','BB','X1B','X2B']

for outer, inner in selected_table_pairs:
    # Shared string columns form the equi-join key; shared numeric columns are
    # the candidates for the distance. Sorting keeps SQL text reproducible.
    join_attributes = sorted(set(string_attributes[inner]) & set(string_attributes[outer]))
    intersecting_attr = sorted(set(numeric_attributes[inner]) & set(numeric_attributes[outer]))
    if not join_attributes or not intersecting_attr:
        # An empty key would yield "ON ()" and an empty numeric list a dangling
        # ","; both are SQL syntax errors.
        print("skip {outer} {inner}: no common string or numeric attributes".format(
            outer=outer, inner=inner))
        continue

    join_condition = " AND ".join(
        "o.`{join_att}` = i.`{join_att}`".format(join_att=join_att) for join_att in join_attributes)
    projection_list = " , ".join(
        ["o.`{attr}` as `{attr}`".format(attr=attr) for attr in join_attributes]
        + ["o.`{attr}` as `o.{attr}` , i.`{attr}` as `i.{attr}`".format(attr=attr)
           for attr in intersecting_attr])
    sqlDF = spark.sql("SELECT " + projection_list + " FROM " + outer + " o JOIN " + inner
                      + " i ON (" + join_condition + ")")
    # Drop joined rows with a NULL in any column; names are back-quoted
    # because the aliases contain dots.
    sqlDF = sqlDF.dropna(subset=["`{cur_col}`".format(cur_col=cur_col) for cur_col in sqlDF.columns])

    # Ordered attribute pairs whose outer attribute is one of sel_attr.
    sel_attr_variations = [pair for pair in pair_permutations_ordered(intersecting_attr)
                           if pair[0] in sel_attr]

    # One CSV result set per (table pair, attribute pair).
    for first_attr, second_attr in sel_attr_variations:
        curDF = sqlDF.groupby(join_attributes).agg(
            emd_UDF(collect_list("`o.{first_attr}`".format(first_attr=first_attr)),
                    collect_list("`i.{second_attr}`".format(second_attr=second_attr))).alias("EMD"),
            # Rows per join group; count(lit(1)) avoids parsing a join column
            # name and works for any number of join attributes.
            count(lit(1)).alias("count")).select(col("EMD"), col("count"))
        curDF = curDF.withColumn("OUTER", lit(outer)).withColumn("OUTER_ATTR", lit(first_attr)) \
            .withColumn("INNER", lit(inner)).withColumn("INNER_ATTR", lit(second_attr))
        curDF.write.format("csv").mode("overwrite").option("header", "true").save(
            "/HdiNotebooks/semantic_data_lake/results/{outer}_{inner}/emd_results_dist_sep_inst_{outer}_{first_attr}_{inner}_{second_attr}_".format(
                outer=outer, first_attr=first_attr, inner=inner, second_attr=second_attr))