In [39]:

import networkx as nx
import sparker
from pyspark import Row
from pyspark.sql import SparkSession, DataFrame

# Obtain the notebook's Spark session through the builder's getOrCreate().
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()


In [40]:

# Columns of the customer accounts file that are handed to load_profiles as
# `ignored_columns` (names are copied verbatim from the dataset).
ignored_customer_columns = [
    "lastmodifiedid",
    "cleanstatus",
    "engagement_level",
    "industry",
    "payment_recieved",
    "ownerid",
    "site",
    "total_products_used",
]

# Read the customer accounts and wrap them as sparker profiles for source 1.
customers = spark.read.json("../datasets/clean/customers/accounts.jsonl")
customer_profiles = sparker.DataFrameWrapper.load_profiles(
    customers,
    real_id_field="id",
    source_id=1,
    ignored_columns=ignored_customer_columns,
)

# Highest profile id assigned to a customer; the interactions cell starts its
# ids at separator_id + 1, and the list form is what the later sparker calls take.
separator_id = customer_profiles.map(lambda p: p.profile_id).max()
separator_ids = [separator_id]

# Show one sample profile, then the separator list as the cell's output.
print(customer_profiles.first())
separator_ids


{'profile_id': 0, 'attributes': [{'key': 'company_name', 'value': 'Meezzy'}, {'key': 'createddate', 'value': '01-14-2018'}, {'key': 'deleteddate', 'value': ''}, {'key': 'isdeleted', 'value': 'FALSE'}], 'original_id': '0959a851-bd0d-4fb6-8a89-052ac4f28279', 'source_id': 1}


[99]

In [41]:
# Raw call records as a DataFrame. Kept under its own name so that
# `interactions` only ever refers to the sparker profiles built from it.
interactions_raw = spark.read.json("../datasets/clean/interactions/calls.jsonl")

# Wrap the calls as sparker profiles for source 2, numbering them right after
# the last customer profile id.
interactions = sparker.DataFrameWrapper.load_profiles(
    interactions_raw,
    start_id_from=separator_id + 1,
    real_id_field="id",
    source_id=2,
)

# Max profile id across the interaction profiles (displayed as the cell output).
max_profile_id = interactions.map(lambda profile: profile.profile_id).max()
max_profile_id


1477

In [42]:
# Combine both sources into one profile collection and persist it as JSON,
# replacing any previous "profiles" output.
profiles = customer_profiles.union(interactions)
profiles_df = profiles.toDF()
profiles_df.write.json("profiles", mode="overwrite")


In [43]:
# Options for the attribute clustering step.
# `keys_to_exclude` is not passed; an earlier experiment used
# ["contact_first_name", "photourl", "contact_last_name", "account_executive",
#  "onboarding_complete"] for it.
clustering_options = {
    "num_hashes": 128,
    "target_threshold": 0.5,
    "compute_entropy": True,
}

clusters = sparker.AttributeClustering.cluster_similar_attributes(
    profiles, **clustering_options
)
clusters


[{'cluster_id': 0, 'keys': ['2_is_public', '1_isdeleted'], 'entropy': 0.8800807124389624},
 {'cluster_id': 1, 'keys': ['2_updated_at', '1_deleteddate'], 'entropy': 3.688587246318698},
 {'cluster_id': 2, 'keys': ['2_created_at', '1_createddate'], 'entropy': 4.656398012907379},
 {'cluster_id': 3, 'keys': ['2_content', '1_company_name', '2_priority', '2_type', '2_status', '2_customer_id'], 'entropy': 14.381599915540582}]

In [44]:
# Build the blocks from the profiles, guided by the attribute clusters.
blocks = sparker.Blocking.create_blocks_clusters(profiles, clusters, separator_ids)

block_count = blocks.count()
print("Number of blocks", block_count)

COUNT 1478
Number of blocks 45


In [45]:
# Block purging, called with 1.005 as its second argument.
blocks_purged = sparker.BlockPurging.block_purging(blocks, 1.005)
sc = spark.sparkContext

# Block filtering, called with 0.8 as its second argument; the call yields
# three values that are unpacked below.
filtering_result = sparker.BlockFiltering.block_filtering_quick(
    blocks_purged, 0.8, separator_ids
)
profile_blocks, profile_blocks_filtered, blocks_after_filtering = filtering_result

# block_id -> profiles for the filtered blocks, collected and broadcast.
block_profile_pairs = blocks_after_filtering.map(
    lambda blk: (blk.block_id, blk.profiles)
)
block_index_map = block_profile_pairs.collectAsMap()
block_index = sc.broadcast(block_index_map)

# block_id -> entropy, taken from the original (unpurged) blocks.
block_entropy_pairs = blocks.map(lambda blk: (blk.block_id, blk.entropy))
block_entropies = sc.broadcast(block_entropy_pairs.collectAsMap())

# profile_id -> number of blocks the profile appears in after filtering.
# This is only needed for certain weight measures.
profile_block_counts = profile_blocks_filtered.map(
    lambda entry: (entry.profile_id, len(entry.blocks))
)
profile_blocks_size_index = sc.broadcast(profile_block_counts.collectAsMap())



In [46]:
# Run WNP with chi-square weights, using the block entropies.
results = sparker.WNP.wnp(
    profile_blocks_filtered,
    block_index,
    max_profile_id,
    separator_ids,
    weight_type=sparker.WeightTypes.CHI_SQUARE,
    profile_blocks_size_index=profile_blocks_size_index,
    use_entropy=True,
    blocks_entropies=block_entropies,
    chi2divider=2.0,
)

# Each result record carries an edge count at index 0 and a match count at
# index 1; sum both over all records.
num_edges = results.map(lambda x: x[0]).sum()
num_matches = results.map(lambda x: x[1]).sum()

# Precision is undefined when no edge was retained; report NaN instead of
# raising ZeroDivisionError.
# NOTE(review): no ground truth is passed to wnp above, so num_matches
# presumably stays 0 — confirm before reading anything into this number.
precision = num_matches / num_edges if num_edges else float("nan")
print("Precision", precision)
print("Number of comparisons", num_edges)

Precision 0.0
Number of comparisons 2963


In [48]:
# Upper bound on how many edges are pulled to the driver for the GraphML file.
MAX_GRAPH_EDGES = 20000

# Flatten the edge lists stored at index 2 of every WNP result record.
edges = results.flatMap(lambda x: x[2])

# Only a bounded sample is brought to the driver; make truncation visible
# because the JSON export below always contains every edge.
graph_edges = edges.take(MAX_GRAPH_EDGES)
if len(graph_edges) == MAX_GRAPH_EDGES:
    print("Warning: edge.graphml is capped at", MAX_GRAPH_EDGES, "edges")

# Build an undirected weighted graph; .item() turns the weight into a plain
# Python number (presumably from a NumPy scalar — confirm against sparker).
plot = nx.Graph()
plot.add_weighted_edges_from(
    (row[0], row[1], row[2].item()) for row in graph_edges
)

nx.write_graphml(plot, "edge.graphml")

# Full edge list as a DataFrame.
# NOTE(review): dst is read from x[0] and src from x[1] — confirm that this
# orientation is intended.
edges_df: DataFrame = edges.map(
    lambda x: Row(dst=x[0], src=x[1], weight=x[2].item())).toDF().cache()

edges_df.write.json("edges", mode="overwrite")
