In [None]:
%autoreload 2

In [None]:
import logging

logging.basicConfig()

# Logger names to switch to DEBUG while investigating a run. These presumably
# correspond to loggers created by the sparklink modules imported below, e.g.
# ("blocking", "sql", "gammas", "expectation_step", "maximisation_step").
# Leave empty to keep the default verbosity.
DEBUG_LOGGERS = ()

for _logger_name in DEBUG_LOGGERS:
    # Logger.setLevel returns None, so its result is deliberately not bound to a name.
    logging.getLogger(_logger_name).setLevel("DEBUG")

In [None]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType
import pyspark.sql.functions as f

# WARNING:
# These config options are appropriate only if you're running Spark locally!!!
# NOTE: core settings such as spark.driver.memory only take effect when the JVM
# is launched, i.e. when no SparkContext exists yet in this kernel. Restart the
# kernel to change them.
conf = (
    SparkConf()
    .set("spark.driver.memory", "8g")
    # A small shuffle partition count suits a single local machine.
    .set("spark.sql.shuffle.partitions", "8")
)

# Build (or reuse) a single session from `conf` and take the SparkContext from
# it, instead of calling SparkContext.getOrCreate() a second time.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [None]:
import sys

# Make the local sparklink modules importable. The path is relative to the
# kernel's working directory, so this assumes the notebook is started from the
# directory that contains sparklink/ (NOTE(review): confirm).
# The guard keeps re-runs of this cell from appending duplicate sys.path entries.
if "sparklink" not in sys.path:
    sys.path.append("sparklink")

# Star imports are kept because later cells use names such as block_using_rules,
# add_gammas, gammas_case_statement_3_levels, Params and iterate without saying
# which module each comes from.
# TODO: switch to explicit imports once each name's source module is confirmed.
from sql import *
from blocking import *
from gammas import *
from params import *
from expectation_step import *
from maximisation_step import *
from iterate import *


In [None]:
# Load the sample records and drop exact duplicate rows in a single expression.
df = spark.read.parquet("data/fake_100.parquet").dropDuplicates()
df.show()

In [None]:
# Each rule is a SQL condition requiring exact agreement on two columns between
# the left (l.) and right (r.) record of a candidate pair.
# NOTE(review): the (first_name, surname) pair is not among the rules — confirm
# that the omission is intentional.
blocking_rules = [
    f"l.{first} = r.{first}  and  l.{second} = r.{second}"
    for first, second in (
        ("first_name", "dob"),
        ("first_name", "city"),
        ("first_name", "email"),
        ("surname", "dob"),
        ("surname", "city"),
        ("surname", "email"),
        ("dob", "city"),
        ("dob", "email"),
        ("city", "email"),
    )
]

In [None]:
# Build candidate record pairs from df using the blocking rules defined above.
df_comparison = block_using_rules(
    df,
    blocking_rules,
    spark=spark,
)

In [None]:

from gammas import *

# Per-column comparison settings. "levels" is presumably the number of distinct
# gamma (agreement) values for the column. Only first_name supplies an explicit
# case expression; the other columns give just a level count.
# NOTE(review): the second argument (0) to gammas_case_statement_3_levels is not
# explained here — presumably the gamma index; confirm.
gamma_settings = {
    "first_name": {
        "levels": 3,
        "case_expression": gammas_case_statement_3_levels("first_name", 0),
    },
    "surname": {"levels": 3},
    "dob": {"levels": 2},
    "city": {"levels": 2},
    "email": {"levels": 2},
}

# For quicker experiments, a reduced configuration with only first_name (3 levels,
# same case expression) and surname (2 levels) can be used instead.

# Presumably include_orig_cols=False omits the original record columns from the
# output — confirm against add_gammas.
df_gammas = add_gammas(
    df_comparison,
    gamma_settings,
    spark,
    include_orig_cols=False,
)

In [None]:
# Initialise the model parameters from the gamma settings, then run the
# iterative estimation over the comparison vectors (presumably
# expectation-maximisation, given the expectation_step / maximisation_step modules).
# NOTE(review): iterate's return value is discarded and the next cell reads
# params.iteration_history_df(), so iterate appears to update `params` in place — confirm.
params = Params(gamma_settings)
iterate(df_gammas, spark, params)

In [None]:
# Parameter history across iterations; consumed by the chart in the next cell.
# NOTE(review): despite the "_df" suffix, the result is passed to
# alt.Data(values=...), which takes inline records — confirm the return type.
data = params.iteration_history_df()

In [None]:
import altair as alt
from altair import datum

# Stacked bars: for each iteration, the probability assigned to each gamma value
# (colour), with one row of panels per compared column.
chart_data = alt.Data(values=data)
chart = alt.Chart(chart_data).mark_bar().encode(
    x='iteration:O',
    # Bar heights are summed probabilities; the gamma value is carried by colour.
    y=alt.Y('sum(probability):Q', axis=alt.Axis(title='Probability')),
    color=alt.Color('value:N', title='𝛾 value'),
    row=alt.Row('column:N', sort=alt.SortField("gamma"))
).resolve_scale(
    y='independent'
).properties(height=100)

# Split into match / non-match panels. Assumes the conventional encoding where
# match == 1 marks the match distribution and match == 0 the non-match one.
# NOTE(review): confirm against Params.iteration_history_df.
c0 = chart.transform_filter(
    datum.match == 1
).properties(title="Match")

c1 = chart.transform_filter(
    datum.match == 0
).properties(title="Non match")

facetted_chart = c0 | c1

fc = facetted_chart.configure_title(
    anchor='middle'
).properties(
    title='Probability distribution of comparison vector values by iteration number'
)
fc

In [None]:
# Params' own chart of the probability distributions (presumably the final
# estimates), shown for comparison with the hand-built chart above.
params.probability_distribution_chart()

In [None]:
# Preview the first records instead of dumping the full iteration history.
# Slicing works whether `data` is a list of records or a pandas DataFrame.
data[:10]