In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,udf
from pyspark.sql.functions import monotonically_increasing_id,max
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import explode,lower,monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.window import Window

import pseudopeople as pseudo



In [40]:
print("Loading Spark")
# Build the session for the "Record Linkage" app with off-heap memory switched
# on and sized at 10g. getOrCreate() hands back an already-running session
# when there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("Record Linkage")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)


Loading Spark


In [41]:
print("Generating Pseudopeople Data")
# Nothing is generated in this cell: two existing tab-separated extracts are
# read from disk, with column names taken from each file's header row. No
# schema or inferSchema option is passed to the reader.
src_1 = '/home/nachiket/RLA_CL_EXTRACT/data/pse/pse_sample4.1.1'
src_2 = '/home/nachiket/RLA_CL_EXTRACT/data/pse/pse_sample4.1.2'
df_1, df_2 = (
    spark.read.csv(path, sep='\t', header=True) for path in (src_1, src_2)
)

Generating Pseudopeople Data


In [None]:
# Sorting and combining the dataset
#
# DataFrame.sort() does not sort in place: it returns a new DataFrame. Calling
# it on df_1 / df_2 and discarding the result leaves both inputs unsorted, so
# the sort is applied to the combined frame and its result is kept.
#
# Notes:
#   * union() matches columns by position, so both inputs must share the same
#     column order.
#   * The sort keys are compared as whatever type the columns were loaded
#     with; if "age" is a string column it orders lexicographically
#     ("10" < "4") -- TODO confirm whether a numeric cast is wanted.
sort_columns = ["last_name", "first_name", "middle_initial", "age", "street_name"]

df_sorted = df_1.union(df_2).sort(*sort_columns)

df_sorted.count()


151841

In [54]:

# Deduplicated dataset generation
#
# Rows are treated as duplicates when they agree on every column except the
# first one. The first column is skipped by position, not by name
# (presumably it is the record identifier -- verify against the input header).
# Which of the matching rows survives is not specified by Spark.
df_deduplicated = df_sorted.dropDuplicates(subset=df_sorted.columns[1:])

df_deduplicated.count()

#df_2_new.sort(col("simulant_id")).show(100)



                                                                                

93433

In [81]:

# Generate K-mers for the deduplicated dataset on the last_name column.
# K = 3
k = 3


def generate_k_mer(str_d, size=None):
    """Split a string into its overlapping substrings of a fixed length.

    Args:
        str_d: The text to split. ``None`` is accepted because that is how
            Spark passes a null cell to a Python UDF.
        size: Length of each substring. When omitted, the module-level ``k``
            is used, so existing one-argument calls behave as before.

    Returns:
        list[str]: Every window of ``size`` consecutive characters, left to
        right. A string no longer than ``size`` (including the empty string)
        is returned whole as a single-element list. ``None`` yields an empty
        list.
    """
    # A null value has no k-mers; without this guard len(None) raises
    # TypeError and fails the whole Spark job.
    if str_d is None:
        return []

    if size is None:
        size = k

    if len(str_d) <= size:
        return [str_d]

    # There are len - size + 1 windows of width `size` in the string.
    return [str_d[start:start + size] for start in range(len(str_d) - size + 1)]


# Expose the k-mer generator to Spark as a UDF returning array<string>.
# The function is passed directly; wrapping it in a lambda adds nothing.
kmer_udf = udf(generate_k_mer, ArrayType(StringType()))

df_2_new = df_deduplicated.withColumn("kmers", kmer_udf(col("last_name")))

# Add a 1-based "id" column by numbering rows in monotonically_increasing_id()
# order. Two things to be aware of:
#   * The window has no partitionBy, so Spark moves every row to a single
#     partition to number them (the WindowExec warning in the output).
#   * monotonically_increasing_id() is non-deterministic (it depends on
#     partitioning), and DataFrames are lazy, so each action would otherwise
#     re-run the window and could number the rows differently. cache() lets
#     later actions reuse the ids computed the first time. Caching is
#     best-effort (evicted data is recomputed); write the frame to storage if
#     the ids must be stable across sessions.
df_with_kmers = df_2_new.withColumn(
    "id",
    F.row_number().over(Window.orderBy(monotonically_increasing_id())),
).cache()

df_with_kmers.show(10)



25/03/18 21:04:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:04:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:04:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:04:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:04:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:04:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+----------+--------------+---------+---+-------------+-------------+-------------------+-----------+------------+-----+-------+----------------------------+------+--------------+--------------------+---+
| simulant_id|first_name|middle_initial|last_name|age|date_of_birth|street_number|        street_name|unit_number|        city|state|zipcode|relation_to_reference_person|   sex|race_ethnicity|               kmers| id|
+------------+----------+--------------+---------+---+-------------+-------------+-------------------+-----------+------------+-----+-------+----------------------------+------+--------------+--------------------+---+
|  6760_48339|         A|             M|  Lindsey|  4|   12/31/2015|          604|           c.r. 655|        nan|     warwick|   RI|  02914|              Other relative|  Male|           nan|[Lin, ind, nds, d...|  1|
|  7016_43372|         A|             M| Jauregui| 17|   01/11/2003|         1509|          timber dr|        nan|    westerly| 

                                                                                

In [83]:
## Blocking Code



## 1. Add index to the dataframe
## 2. Determine the datatype in the value of the MapType declared below
## 3. Go through all the k-mers and do blocking




def do_blocking(df, kmer_col="kmers", id_col="id"):
    """Group record ids into blocks keyed by lower-cased k-mer.

    Args:
        df: DataFrame holding an array-of-strings k-mer column and a
            record-id column.
        kmer_col: Name of the k-mer array column (default ``"kmers"``).
        id_col: Name of the record-id column (default ``"id"``).

    Returns:
        DataFrame with one row per distinct lower-cased k-mer and two columns:
        ``kmer`` and ``list_indices``, the ascending, duplicate-free array of
        ids of the records whose k-mer array contains that k-mer.
    """
    # One (kmer, index) row per array element. explode() has to stay a
    # top-level select expression; Spark rejects generators nested inside
    # other functions, so lower-casing is a separate step.
    exploded_df = df.select(
        explode(col(kmer_col)).alias("kmer"),
        col(id_col).alias("index"),
    )

    # Lower-case so that k-mers differing only by case share a block.
    exploded_df = exploded_df.withColumn("kmer", lower(col("kmer")))

    # collect_set rather than collect_list: a record can produce the same
    # k-mer more than once (e.g. "banana" -> "ana" twice, or "Ana"/"ana" after
    # lower-casing) and must appear in a block only once. sort_array makes
    # the element order deterministic.
    blocked_df = exploded_df.groupBy("kmer").agg(
        F.sort_array(F.collect_set("index")).alias("list_indices")
    )
    return blocked_df


# Build the k-mer blocks for the id-annotated frame and preview five of them.
blocked_df = do_blocking(df=df_with_kmers)
blocked_df.show(n=5)




25/03/18 21:05:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:05:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:05:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:05:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:05:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/18 21:05:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 237

+----+--------------------+
|kmer|        list_indices|
+----+--------------------+
|  ab|[1566, 5910, 9978...|
|  ac|      [49122, 73474]|
|  af|              [6272]|
|  ag|[2250, 2619, 7413...|
|  al|[1394, 5879, 8036...|
+----+--------------------+
only showing top 5 rows



                                                                                

In [None]:
# NOT IMPLEMENTED YET
def all_pairs(df):
    """Work-in-progress stub: builds a cross join of ``df`` with itself.

    ``list_indices`` is renamed to ``id1`` on the left copy and ``id2`` on the
    right copy before the two are cross-joined. Every other column of ``df``
    is left unrenamed and therefore appears twice in the joined frame.

    The joined frame is currently discarded: the function has no ``return``
    statement and yields ``None``.
    """
    left_side = df.withColumnRenamed("list_indices", "id1")
    right_side = df.withColumnRenamed("list_indices", "id2")
    # TODO: not implemented yet -- the cross join is built but never used.
    df_cross = left_side.crossJoin(right_side)
    

