In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Row

import operator
import numpy as np

In [2]:
sqlContext.sql("set spark.sql.shuffle.partitions=1000")

DataFrame[key: string, value: string]

In [29]:
savm_hashed = sqlContext.sql("select * from ignite.savm_hashed").select(['party_id', 'vector'])
savm_parsed = sqlContext.sql('select * from ignite.savm_parsed').cache()
cr_hashed = sqlContext.sql('select * from ignite.cr_hashed').select(['party_id', 'vector'])
cr_parsed = sqlContext.sql('select * from ignite.cr_parsed').cache()
savm_hashed = savm_hashed.unionAll(cr_hashed).repartition(1000, 'party_id').drop_duplicates().cache()

In [6]:
num_buckets = 100
num_and_functions = 0
num_or_functions = 7

def apply_function(df, fields, function):
    column_objects = []
    for column in df.columns:
        if column in fields:
            column_objects.append(function(column))
        else:
            column_objects.append(column)
    return df.select(column_objects)

class MinHash:
    def __init__(self, a, b, c):
        self.a = a
        self.b = b
        self.c = c
    def get(self, x):
        return (self.a * x + self.b) % self.c
    
min_hash_functions = [MinHash(np.random.randint(100000), np.random.randint(100000), 10009) for i in range(num_and_functions)] + \
                    [MinHash(np.random.randint(100000), np.random.randint(100000), 1000003) for i in range(num_or_functions)]

def run_minhash(which_hash):
    def minhash(values):
        try:
            return min([min_hash_functions[which_hash].get(i) for i in values])
        except:
            return 0
    return lambda column : F.udf(minhash, IntegerType())(F.col(column)).alias('min_hash'+str(which_hash))

In [7]:
index_vectorized = savm_hashed.map(lambda x : (x.party_id, list([int(elem) for elem in x.vector.indices]))).toDF(['party_id', 'vector']).cache()

In [8]:
index_vectorized.count()
#match 0 and 987842481885

28702413

In [9]:
builder = index_vectorized

for i in range(len(min_hash_functions)):
    builder = builder.withColumn('min_hash'+str(i), run_minhash(i)('vector'))
    
builder

DataFrame[party_id: double, vector: array<bigint>, min_hash0: int, min_hash1: int, min_hash2: int, min_hash3: int, min_hash4: int, min_hash5: int, min_hash6: int]

In [10]:
min_hashed = builder

In [11]:

def bucketize(x):
    or_hashes = [x['min_hash'+str(i)] for i in range(num_or_functions)]
    #and_key = reduce(operator.add, [str(x['min_hash'+str(i)]) for i in range(num_or_functions, num_and_functions + num_or_functions)])
    data = []
    for i in range(len(or_hashes)):
        for j in range(i):
            for k in range(j):
                hashes = sorted([or_hashes[i], or_hashes[j], or_hashes[k]])
                data.append((int(str(hashes[0]) + str(hashes[1]) + str(hashes[2])), Row(party_id = x.party_id, vector = x.vector)))
    return data

bucketized = min_hashed.flatMap(lambda x : bucketize(x)).cache()

In [12]:
bucketized.count()

1004584455

In [13]:
bad_keys = bucketized.mapValues(lambda x : 1).reduceByKey(lambda x, y : x + y).filter(lambda x : x[1] > 1000).filter(lambda x : x[1] > 0).map(lambda x : x[0]).collect()
#bucketized_grouped = bucketized.join(good_keys).groupByKey().cache()

In [14]:
len(bad_keys)

13419

In [23]:
161716776486251 in bad_keys

False

In [15]:
broadcast_bad_keys = sc.broadcast(set(bad_keys))

In [16]:
good_keys = bucketized.filter(lambda x : x[0] not in broadcast_bad_keys.value).repartition(3000).cache()

In [17]:
good_keys.take(1)

[(1751963047138946,
  Row(party_id=173174908.0, vector=[52, 119, 3105, 3125, 3684, 9228, 15960, 26440, 48380, 48753, 53851, 79215, 96976, 113138, 190217, 209544, 233463, 234234]))]

In [9]:
#good_keys.saveAsPickleFile('good_keys.pkl')
good_keys = sc.pickleFile('good_keys.pkl').cache()

In [28]:
good_keys.take(500)

[(2885696623161706,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (534428856161706,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (53442885696623,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (1811128856161706,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (181112885696623,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (53441811128856,
  Row(party_id=275071387.0, vector=[60301, 98080, 107484, 126358, 158354, 176190, 179562, 192585, 197030, 199492, 212093, 249099, 259415])),
 (687028856161706,
  Row(party_id=275071

In [15]:
def jaccard(set1, set2):
    union_length = len(set1.union(set2))
    if union_length == 0:
        return 0
    return float(len(set1.intersection(set2))) / union_length

def jaccard_samples(result_iter):
    rows = []
    row_indices = []
    pairs = []
    for i, row in enumerate(result_iter):
        rows.append(row)
        row_indices.append(set(row.vector))
        for j in range(i):
            if jaccard(row_indices[i], row_indices[j]) > 0.7:
                pairs.append((rows[i].party_id, rows[j].party_id))
        if len(pairs) > 10000:
            break
                
    return pairs

In [20]:
one_key.groupByKey().flatMapValues(jaccard_samples).map(lambda x : x[1]).toDF(['party_id_1', 'party_id_2']).drop_duplicates().collect()

ValueError: RDD is empty

In [20]:
good_keys.groupByKey(3000).flatMapValues(jaccard_samples).map(lambda x : x[1]).toDF(['party_id_1', 'party_id_2']).drop_duplicates().repartition(1000).write.saveAsTable('ignite.lsh_pairs_2', mode = 'overwrite')

In [41]:
lsh_pairs = sqlContext.sql("select * from ignite.lsh_pairs_2")

In [43]:
lsh_pairs.take(50)

[Row(party_id_1=251529632.0, party_id_2=257095977.0),
 Row(party_id_1=218146106.0, party_id_2=229387233.0),
 Row(party_id_1=278486596.0, party_id_2=233067967.0),
 Row(party_id_1=11615441.0, party_id_2=34924777.0),
 Row(party_id_1=273873718.0, party_id_2=232335639.0),
 Row(party_id_1=230318305.0, party_id_2=123107586.0),
 Row(party_id_1=3418195.0, party_id_2=3107225.0),
 Row(party_id_1=195820414.0, party_id_2=116867805.0),
 Row(party_id_1=259897668.0, party_id_2=263253968.0),
 Row(party_id_1=281051095.0, party_id_2=280422045.0),
 Row(party_id_1=258309565.0, party_id_2=258314582.0),
 Row(party_id_1=278935104.0, party_id_2=92004584.0),
 Row(party_id_1=225094102.0, party_id_2=226696465.0),
 Row(party_id_1=166579.0, party_id_2=157113477.0),
 Row(party_id_1=4459790.0, party_id_2=44744261.0),
 Row(party_id_1=195216697.0, party_id_2=178484146.0),
 Row(party_id_1=249037869.0, party_id_2=2637506.0),
 Row(party_id_1=184294679.0, party_id_2=179495926.0),
 Row(party_id_1=212664579.0, party_id_2=272

In [61]:
lsh_pairs.select(['party_id_1']).distinct().unionAll(lsh_pairs.select(['party_id_2'])).count()

14705445

In [17]:
lsh_pairs.where(F.col('party_id_2') == 3040651).count()

0

In [18]:
lsh_pairs.where(F.col('party_id_1') == 3040651).count()

0

In [100]:
all_hashed.where(F.col('row_id') == 4148938451154).take(1)

[Row(party_id=84810425.0, vector=SparseVector(262144, {73617: 1.0, 122434: 2.0, 158230: 1.0, 168428: 2.0, 232192: 1.0}), source=u'cr', row_id=4148938451154)]

In [101]:
cr.where(F.col('party_id') == 300611511.0).take(1)

[Row(party_id=300611511.0, parent_party_id=2120611.0, party_name=u'SWORDS LABORATORIES', node_type=u'BR', address1=u'WATERY LANE', address2=None, address3=None, address4=None, city=u'DUBLIN', county=None, state=u'Dublin', province=None, postal_code=u'Code', postal_code_extn=None, country_code=u'IE', street_name=u'WATERY LANE', street_number=None, street_direction=None, street_type=None, geo_valid_status=u'GEO_UNCERTIFIED_COUNTRY', completenes_status=u'COMPLETE', cleansed_status=u'CLEANSED', start_date=u'2016-11-16 03:01:50.0', end_date=None, program_id=300.0, request_id=1231748346439.0, created_by=1130.0, last_updated_by=1130.0, creation_date=u'2016-11-16 03:01:50.0', last_update_date=u'2016-11-16 03:01:50.0', certified_date=u'2016-11-16 03:01:50.0')]

In [102]:
cr.where(F.col('party_id') == 84810425.0).take(1)

[Row(party_id=84810425.0, parent_party_id=2120611.0, party_name=u'SWORDS LABORATORIES', node_type=u'BR', address1=u'WATERY LANE', address2=None, address3=None, address4=None, city=u'DUBLIN', county=None, state=None, province=None, postal_code=None, postal_code_extn=None, country_code=u'IE', street_name=u'WATERY LANE', street_number=None, street_direction=None, street_type=None, geo_valid_status=u'GEO_UNCERTIFIED_COUNTRY', completenes_status=u'COMPLETE', cleansed_status=u'CLEANSED', start_date=u'2007-08-08 06:04:06.0', end_date=u'4712-12-31 00:00:01.0', program_id=None, request_id=None, created_by=305151.0, last_updated_by=332355.0, creation_date=u'2007-08-08 06:04:06.0', last_update_date=u'2015-05-17 16:38:56.0', certified_date=u'2015-05-17 16:38:56.0')]

In [97]:


row_id_pairs = bucketized_grouped.mapValues(jaccard_samples).toDF(['row_id_1', 'row_id_2']).drop_duplicates().cache()

In [55]:
row_id_pairs.count()

Py4JJavaError: An error occurred while calling o932.count.
: org.apache.spark.SparkException: Job 39 cancelled because Stage 135 was cancelled
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1370)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1358)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1357)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1357)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156)
	at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1357)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
	at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
	at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1514)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
	at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1514)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
row_id_pairs.take(5)

In [None]:
row_id_pairs.write.saveAsTable('ignite.row_id_pairs')
all_hashed.write.saveAsTable('ignite.all_hashed')

In [55]:
inspect_one = bucketized.filter(lambda x : x[0] == '162227').map(lambda x : (x[1], )).toDF(['row_id']).cache()

In [56]:
inspect_table = inspect_one.join(all_hashed, 'row_id').cache()

In [60]:
cr_sample = inspect_table.where(F.col('source') == 'cr').collect()

In [61]:
savm_sample = inspect_table.where(F.col('source') == 'savm').collect()

In [70]:


for cr_row in cr_sample:
    cr_indices = set(cr_row.vector.indices)
    for savm_row in savm_sample:
        savm_indices = set(savm_row.vector.indices)
        if jaccard(cr_indices, savm_indices) > 0.5:
            print(cr_row.row_id, savm_row.row_id, jaccard(cr_indices, savm_indices))

(489626307421, 489626337250, 1.0)
(2250562898248, 2250562902428, 1.0)
(523986040083, 523986085990, 1.0)
(3745211502283, 352187354434, 0.7)
(3745211502283, 3745211550220, 1.0)
(2070174262252, 2070174252645, 1.0)
(3951369939181, 3659312207187, 0.5454545454545454)
(3659312190131, 3659312207187, 1.0)
(790273993217, 3616362548569, 0.7142857142857143)
(790273993217, 790274006947, 1.0)
(2989297244582, 2989297290590, 1.0)
(3616362549200, 3616362548569, 1.0)
(3616362549200, 790274006947, 0.7142857142857143)
(352187369566, 352187354434, 1.0)
(352187369566, 3745211550220, 0.7)
(2370821999739, 2370821966397, 1.0)
(4105988799223, 4105988783190, 1.0)


In [67]:
all_hashed.where(F.col('row_id') == 489626307421).take(1)

[Row(party_id=4009137.0, vector=SparseVector(262144, {805: 1.0, 3583: 1.0, 3774: 1.0, 45463: 1.0, 69463: 1.0, 74281: 1.0, 84523: 1.0, 97739: 1.0, 138504: 1.0, 199456: 1.0, 237576: 1.0}), source=u'cr', row_id=489626307421)]

In [68]:
all_hashed.where(F.col('row_id') == 489626337250).take(1)

[Row(party_id=4009137.0, vector=SparseVector(262144, {805: 1.0, 3583: 1.0, 3774: 1.0, 45463: 1.0, 69463: 1.0, 74281: 1.0, 84523: 1.0, 97739: 1.0, 138504: 1.0, 199456: 1.0, 237576: 1.0}), source=u'savm', row_id=489626337250)]

In [188]:
min_hashed.take(5)

[Row(party_id=8921578.0, vector=[0, 63103, 71153, 91135, 104414, 114801], min_hash0=4, min_hash1=0, min_hash2=1, min_hash3=1, min_hash4=0, min_hash5=1, min_hash6=1, min_hash7=0, min_hash8=3, min_hash9=2),
 Row(party_id=165367714.0, vector=[3166, 3681, 8823, 19360, 50674, 53430, 104414, 113632, 114212, 180436, 236794], min_hash0=1, min_hash1=2, min_hash2=0, min_hash3=3, min_hash4=0, min_hash5=0, min_hash6=0, min_hash7=1, min_hash8=0, min_hash9=3),
 Row(party_id=33470380.0, vector=[115, 3363, 71091, 96727, 96976, 175944, 176060, 198762, 232293, 235489, 249386], min_hash0=1, min_hash1=0, min_hash2=0, min_hash3=1, min_hash4=1, min_hash5=1, min_hash6=1, min_hash7=0, min_hash8=0, min_hash9=3),
 Row(party_id=126391606.0, vector=[3329, 3634, 3742, 40244, 56568, 109638, 110052, 134618, 142925, 147271, 170347, 226495, 228278, 243044], min_hash0=2, min_hash1=0, min_hash2=0, min_hash3=1, min_hash4=4, min_hash5=0, min_hash6=0, min_hash7=0, min_hash8=0, min_hash9=0),
 Row(party_id=213206967.0, vecto