In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.sql.types import ArrayType, IntegerType

In [2]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Keyword clustering")
    .getOrCreate()
)

In [3]:
# Record the Spark version this notebook was executed against.
spark.version

'3.1.1'

In [4]:
# Handle to the underlying SparkContext; no other cell visible here refers to it.
sc = spark.sparkContext

In [5]:
# Load the raw search results; header=True takes the column names from the file's first line.
df = spark.read.csv('./search_results.csv', header=True)

In [7]:
# Discard the Title and _id columns, leaving Keyword and Url.
df = df.drop('Title', '_id')

In [8]:
# Preview the first three (Keyword, Url) rows.
df.show(3)

+--------------------+--------------------+
|             Keyword|                 Url|
+--------------------+--------------------+
|sump pump next to...|https://www.mlive...|
|sump pump next to...|https://www.famil...|
|sump pump next to...|https://diy.stack...|
+--------------------+--------------------+
only showing top 3 rows



In [9]:
# Collapse to one row per keyword, gathering every URL returned for that
# keyword into the list column `urlIndex`. The source column is referenced by
# its exact name, `Url`, so the lookup does not depend on case-insensitive
# column resolution.
# NOTE: `df` is rebound to the grouped frame, which no longer has a Url
# column, so re-running this cell on its own fails; re-run from the load cell.
df = df.groupBy('Keyword') \
  .agg(F.collect_list('Url').alias('urlIndex'))

In [11]:
# Vectoriser that reads each keyword's URL list (urlIndex) and writes a count vector (urlVec).
cv = CountVectorizer(inputCol='urlIndex', outputCol='urlVec')

In [12]:
# Fit the URL vocabulary on the grouped frame, then add the urlVec column to that same frame.
transformed_df = cv.fit(df).transform(df)

In [13]:
# Number of rows after grouping, i.e. one per distinct keyword.
transformed_df.count()

27665

In [14]:
# Keep only Keyword and urlVec by dropping the raw URL lists.
df2 = transformed_df.drop('urlIndex')

In [15]:
# Preview the sparse URL vector of each keyword.
df2.show()

+--------------------+--------------------+
|             Keyword|              urlVec|
+--------------------+--------------------+
|1/4 vs 1/3 sump pump|(34430,[7,17,42,6...|
|2 stage dry sump ...|(34430,[1913,2573...|
|3 phase sump pump...|(34430,[2079,3983...|
|6-cia-ml 1/3 hp s...|(34430,[1405,1678...|
|add battery to ex...|(34430,[16,74,111...|
|add battery to su...|(34430,[16,25,41,...|
|adjusting zoeller...|(34430,[151,159,4...|
|average sump pump...|(34430,[7,62,63,6...|
|battery backup su...|(34430,[16,25,41,...|
|best sump pump fo...|(34430,[7,17,120,...|
|best way to drain...|(34430,[2,14,23,5...|
|big john automati...|(34430,[2836,3618...|
|black friday sump...|(34430,[36,120,15...|
|bur cam 1/3 hp su...|(34430,[4407,7850...|
|campbell hausfeld...|(34430,[694,1343,...|
|can you connect s...|(34430,[88,105,17...|
|changing out sump...|(34430,[4,20,21,2...|
|clogged sump pump...|(34430,[0,119,344...|
|coleman effluent ...|(34430,[300,928,1...|
|consumer report b...|(34430,[18

In [17]:
# UDF returning the active (non-zero) positions of a sparse vector as a list of ints.
get_indices = udf(
    lambda vec: vec.indices.tolist(),
    ArrayType(IntegerType()),
)

In [18]:
# Add urlIndices: the vocabulary indices stored in each keyword's urlVec.
df3 = df2.withColumn("urlIndices", get_indices('urlVec'))

In [19]:
# Drop the vector column; df3 now holds Keyword and urlIndices only.
df3 = df3.drop('urlVec')

In [20]:
# Preview the URL index list of each keyword.
df3.show()

+--------------------+--------------------+
|             Keyword|          urlIndices|
+--------------------+--------------------+
|1/4 vs 1/3 sump pump|[7, 17, 42, 64, 8...|
|2 stage dry sump ...|[1913, 2573, 3316...|
|3 phase sump pump...|[2079, 3983, 8216...|
|6-cia-ml 1/3 hp s...|[1405, 1678, 5892...|
|add battery to ex...|[16, 74, 111, 117...|
|add battery to su...|[16, 25, 41, 74, ...|
|adjusting zoeller...|[151, 159, 436, 5...|
|average sump pump...|[7, 62, 63, 64, 8...|
|battery backup su...|[16, 25, 41, 74, ...|
|best sump pump fo...|[7, 17, 120, 246,...|
|best way to drain...|[2, 14, 23, 50, 1...|
|big john automati...|[2836, 3618, 4872...|
|black friday sump...|[36, 120, 1566, 6...|
|bur cam 1/3 hp su...|[4407, 7850, 8039...|
|campbell hausfeld...|[694, 1343, 2428,...|
|can you connect s...|[88, 105, 172, 18...|
|changing out sump...|[4, 20, 21, 29, 5...|
|clogged sump pump...|[0, 119, 344, 575...|
|coleman effluent ...|[300, 928, 1632, ...|
|consumer report b...|[18, 201, 

In [81]:
def find_url_intersections(df1, df2, key_col="Keyword", index_col="urlIndices"):
    """Pair up keys from two frames whose index arrays share at least one value.

    Every row of ``df1`` is compared with every row of ``df2``. A pair is kept
    when the left key sorts strictly before the right key (so joining a frame
    with itself yields each unordered pair once and never pairs a key with
    itself) and the two index arrays have a non-empty intersection.

    Args:
        df1: Left DataFrame; must contain ``key_col`` and ``index_col``.
        df2: Right DataFrame; must contain ``key_col`` and ``index_col``.
        key_col: Name of the key column in both frames. Defaults to
            ``"Keyword"``.
        index_col: Name of the array column to intersect in both frames.
            Defaults to ``"urlIndices"``.

    Returns:
        DataFrame with exactly three columns: ``Keyword1`` (key from ``df1``),
        ``Keyword2`` (key from ``df2``) and ``urlIntersect`` (the values
        common to both index arrays).
    """
    left = df1.alias("Key1")
    right = df2.alias("Key2")

    # Backticks keep column names that contain dots or spaces resolvable.
    left_key = F.col(f"Key1.`{key_col}`")
    right_key = F.col(f"Key2.`{key_col}`")
    left_indices = F.col(f"Key1.`{index_col}`")
    right_indices = F.col(f"Key2.`{index_col}`")

    # The only join predicate is an inequality, so this remains an all-pairs
    # comparison of the two inputs.
    candidate_pairs = left.join(right, on=left_key < right_key, how="inner")

    # Select the three output columns explicitly instead of dropping by an
    # ambiguous name and renaming by position; any extra input columns are
    # simply left out of the result.
    return (
        candidate_pairs
        .select(
            left_key.alias("Keyword1"),
            right_key.alias("Keyword2"),
            F.array_intersect(left_indices, right_indices).alias("urlIntersect"),
        )
        .where(F.size(F.col("urlIntersect")) > 0)
    )

In [88]:
# Compare every keyword with every other keyword and keep the pairs that share at least one URL.
self_join = find_url_intersections(df3, df3)

In [89]:
# Partition count of the pairwise result before it is repartitioned.
self_join.rdd.getNumPartitions()

40000

In [90]:
# Bring the pairwise result down to 8*3 = 24 partitions.
self_join = self_join.repartition(8*3)
# Alternative kept for reference: coalesce to the same partition count.
#self_join = self_join.coalesce(8*3)

In [91]:
%%time
# Materialise the pairwise join and count the keyword pairs that share a URL.
# The recorded run took roughly 26 minutes of wall time.
self_join.count()

CPU times: user 2.63 s, sys: 1.29 s, total: 3.92 s
Wall time: 26min 14s


15139127

In [77]:
# Preview keyword pairs together with the URL indices they share.
self_join.show()

+--------------------+--------------------+--------------------+
|            Keyword1|            Keyword2|        urlIntersect|
+--------------------+--------------------+--------------------+
|1/4 vs 1/3 sump pump|average sump pump...|         [7, 64, 84]|
|1/4 vs 1/3 sump pump|best sump pump fo...|             [7, 17]|
|1/4 vs 1/3 sump pump|how big should a ...|                [84]|
|1/4 vs 1/3 sump pump|how do i know whi...|             [7, 42]|
|1/4 vs 1/3 sump pump|how to determine ...|         [7, 64, 84]|
|1/4 vs 1/3 sump pump|  low watt sump pump|             [7, 17]|
|1/4 vs 1/3 sump pump| pipe size sump pump|                [84]|
|1/4 vs 1/3 sump pump|zoeller vs hydrom...|            [17, 42]|
|add battery to ex...|add battery to su...|[16, 74, 111, 117...|
|add battery to ex...|battery backup su...|            [16, 74]|
|add battery to ex...|how to hook up a ...| [16, 111, 117, 403]|
|add battery to ex...|sump pump battery...|            [16, 74]|
|add battery to ex...|sum

In [36]:
# Take 8 rows of the pairwise result as a small sample to experiment on.
tmp = self_join.limit(8)

In [37]:
# Display the 8-row sample.
tmp.show()

+--------------------+--------------------+------------+
|            Keyword1|            Keyword2|urlIntersect|
+--------------------+--------------------+------------+
|1/4 vs 1/3 sump pump|average sump pump...| [7, 64, 84]|
|1/4 vs 1/3 sump pump|best sump pump fo...|     [7, 17]|
|1/4 vs 1/3 sump pump|how big should a ...|        [84]|
|1/4 vs 1/3 sump pump|how do i know whi...|     [7, 42]|
|1/4 vs 1/3 sump pump|how to determine ...| [7, 64, 84]|
|1/4 vs 1/3 sump pump|  low watt sump pump|     [7, 17]|
|1/4 vs 1/3 sump pump| pipe size sump pump|        [84]|
|1/4 vs 1/3 sump pump|zoeller vs hydrom...|    [17, 42]|
+--------------------+--------------------+------------+



In [78]:
# Experiment on the 8-row sample: pair each row with every row whose Keyword2
# sorts later, and intersect the two rows' shared-URL lists.
# The join does not require both rows to have the same Keyword1; in this
# sample every row happens to have the same Keyword1.
# NOTE: the result carries two columns that are both named Keyword2, and this
# rebinds self_join, replacing the full pairwise result.
self_join = tmp.alias('Key1').join(tmp.drop('Keyword1').alias('Key2')).where('Key1.Keyword2 < Key2.Keyword2')\
    .withColumn("urlIntersect3", F.array_intersect('Key1.urlIntersect','Key2.urlIntersect'))\
    .where(F.size(F.col('urlIntersect3'))>0)\
    .drop('urlIntersect')

In [80]:
# Preview the three-keyword overlaps found in the sample.
self_join.show()

+--------------------+--------------------+--------------------+-------------+
|            Keyword1|            Keyword2|            Keyword2|urlIntersect3|
+--------------------+--------------------+--------------------+-------------+
|1/4 vs 1/3 sump pump|average sump pump...|best sump pump fo...|          [7]|
|1/4 vs 1/3 sump pump|average sump pump...|how big should a ...|         [84]|
|1/4 vs 1/3 sump pump|average sump pump...|how do i know whi...|          [7]|
|1/4 vs 1/3 sump pump|average sump pump...|how to determine ...|  [7, 64, 84]|
|1/4 vs 1/3 sump pump|average sump pump...|  low watt sump pump|          [7]|
|1/4 vs 1/3 sump pump|average sump pump...| pipe size sump pump|         [84]|
|1/4 vs 1/3 sump pump|best sump pump fo...|how do i know whi...|          [7]|
|1/4 vs 1/3 sump pump|best sump pump fo...|how to determine ...|          [7]|
|1/4 vs 1/3 sump pump|best sump pump fo...|  low watt sump pump|      [7, 17]|
|1/4 vs 1/3 sump pump|best sump pump fo...|zoeller v

In [79]:
# Number of three-keyword overlaps found in the sample.
self_join.count()

18

In [None]:
# Give the three-way overlap frame unique column names, renaming by position.
# A bare `return` is a SyntaxError outside a function, so the expression is
# evaluated as the cell's result instead. Four names are supplied because the
# frame built by the three-way join cell above has four columns
# (Keyword1, Keyword2, Keyword2, urlIntersect3).
# NOTE(review): assumes self_join is still that four-column frame — confirm.
self_join.toDF("Keyword1", "Keyword2", "Keyword3", "urlIntersect")

In [38]:
# Attempt to reuse the pairwise helper on the sample. This raises
# AnalysisException: the helper looks up a `Keyword` column, whereas tmp's
# columns are Keyword1, Keyword2 and urlIntersect.
find_url_intersections(tmp,tmp)

AnalysisException: cannot resolve '`Key1.Keyword`' given input columns: [Key1.Keyword1, Key2.Keyword1, Key1.Keyword2, Key2.Keyword2, Key1.urlIntersect, Key2.urlIntersect]; line 1 pos 0;
'Filter NOT ('Key1.Keyword = 'Key2.Keyword)
+- Join Inner
   :- SubqueryAlias Key1
   :  +- GlobalLimit 8
   :     +- LocalLimit 8
   :        +- Repartition 24, false
   :           +- Project [Keyword#16 AS Keyword1#1111, Keyword#1091 AS Keyword2#1112, urlIntersect#1102 AS urlIntersect#1113]
   :              +- Project [Keyword#16, Keyword#1091, urlIntersect#1102]
   :                 +- Filter (size(urlIntersect#1102, true) > 0)
   :                    +- Project [Keyword#16, urlIndices#958, Keyword#1091, urlIndices#1090, array_intersect(urlIndices#958, urlIndices#1090) AS urlIntersect#1102]
   :                       +- Filter NOT (Keyword#16 = Keyword#1091)
   :                          +- Join Inner
   :                             :- SubqueryAlias Key1
   :                             :  +- Project [Keyword#16, urlIndices#958]
   :                             :     +- Project [Keyword#16, urlVec#890, <lambda>(urlVec#890) AS urlIndices#958]
   :                             :        +- Project [Keyword#16, urlVec#890]
   :                             :           +- Project [Keyword#16, urlIndex#44, UDF(urlIndex#44) AS urlVec#890]
   :                             :              +- Aggregate [Keyword#16], [Keyword#16, collect_list(url#18, 0, 0) AS urlIndex#44]
   :                             :                 +- Project [Keyword#16, Url#18]
   :                             :                    +- Relation[Keyword#16,Title#17,Url#18,_id#19] csv
   :                             +- SubqueryAlias Key2
   :                                +- Project [Keyword#1091, urlIndices#1090]
   :                                   +- Project [Keyword#1091, urlVec#890, <lambda>(urlVec#890) AS urlIndices#1090]
   :                                      +- Project [Keyword#1091, urlVec#890]
   :                                         +- Project [Keyword#1091, urlIndex#44, UDF(urlIndex#44) AS urlVec#890]
   :                                            +- Aggregate [Keyword#1091], [Keyword#1091, collect_list(url#1093, 0, 0) AS urlIndex#44]
   :                                               +- Project [Keyword#1091, Url#1093]
   :                                                  +- Relation[Keyword#1091,Title#1092,Url#1093,_id#1094] csv
   +- SubqueryAlias Key2
      +- GlobalLimit 8
         +- LocalLimit 8
            +- Repartition 24, false
               +- Project [Keyword#16 AS Keyword1#1492, Keyword#1091 AS Keyword2#1493, urlIntersect#1102 AS urlIntersect#1494]
                  +- Project [Keyword#16, Keyword#1091, urlIntersect#1102]
                     +- Filter (size(urlIntersect#1102, true) > 0)
                        +- Project [Keyword#16, urlIndices#958, Keyword#1091, urlIndices#1090, array_intersect(urlIndices#958, urlIndices#1090) AS urlIntersect#1102]
                           +- Filter NOT (Keyword#16 = Keyword#1091)
                              +- Join Inner
                                 :- SubqueryAlias Key1
                                 :  +- Project [Keyword#16, urlIndices#958]
                                 :     +- Project [Keyword#16, urlVec#890, <lambda>(urlVec#890) AS urlIndices#958]
                                 :        +- Project [Keyword#16, urlVec#890]
                                 :           +- Project [Keyword#16, urlIndex#44, UDF(urlIndex#44) AS urlVec#890]
                                 :              +- Aggregate [Keyword#16], [Keyword#16, collect_list(url#18, 0, 0) AS urlIndex#44]
                                 :                 +- Project [Keyword#16, Url#18]
                                 :                    +- Relation[Keyword#16,Title#17,Url#18,_id#19] csv
                                 +- SubqueryAlias Key2
                                    +- Project [Keyword#1091, urlIndices#1090]
                                       +- Project [Keyword#1091, urlVec#890, <lambda>(urlVec#890) AS urlIndices#1090]
                                          +- Project [Keyword#1091, urlVec#890]
                                             +- Project [Keyword#1091, urlIndex#44, UDF(urlIndex#44) AS urlVec#890]
                                                +- Aggregate [Keyword#1091], [Keyword#1091, collect_list(url#1093, 0, 0) AS urlIndex#44]
                                                   +- Project [Keyword#1091, Url#1093]
                                                      +- Relation[Keyword#1091,Title#1092,Url#1093,_id#1094] csv


In [39]:
%%time
# Collect the distinct Keyword1 values to the driver as a flat Python list.
# The recorded run aborted with SparkOutOfMemoryError before returning.
two_keyword_clusters = self_join.select("Keyword1").distinct().rdd.flatMap(lambda x: x).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 23.0 failed 1 times, most recent failure: Lost task 4.0 in stage 23.0 (TID 900) (192.168.0.142 executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:164)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:131)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:91)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationMap.dumpToExternalSorter(ObjectAggregationMap.scala:77)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:177)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:77)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:107)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:47)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:164)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:131)
	at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:91)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationMap.dumpToExternalSorter(ObjectAggregationMap.scala:77)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:177)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:77)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:107)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:47)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
# Collect the distinct Keyword1 values to the driver as a flat Python list
# (same statement as the earlier %%time cell, without the timing).
two_keyword_clusters = self_join.select("Keyword1").distinct().rdd.flatMap(lambda x: x).collect()

# TODO: find a way to group by Keyword1 and apply find_url_intersections to each group

In [None]:
# Extend each keyword pair with a third keyword taken from df3, then narrow
# the pair's shared URLs to those the third keyword also has.
# NOTE(review): assumes self_join is the pairwise frame with columns
# Keyword1, Keyword2 and urlIntersect — confirm before running.
# SQL has no chained comparison: `a != b != c` compares the result of
# `a != b` with `c`; it does not mean "all three differ". Each pairwise
# inequality is therefore spelled out.
self_join = self_join.alias('Key1').join(df3.alias('Key2')).where(
    'Key1.Keyword1 != Key1.Keyword2'
    ' AND Key1.Keyword1 != Key2.Keyword'
    ' AND Key1.Keyword2 != Key2.Keyword')
self_join = self_join.withColumn("urlIntersect", F.array_intersect('Key1.urlIntersect','Key2.urlIndices'))\
.where(F.size(F.col('urlIntersect'))>0)\
.drop('urlIndices')
#self_join = self_join.toDF("Keyword1", "Keyword2", "urlIntersect")

In [None]:
# List the column names of the joined frame.
self_join.columns

In [None]:
# Join the current result with df3 once more, keeping rows whose keyword differs from Key3's.
# NOTE(review): this needs self_join to expose exactly one column named Keyword — confirm it does at this point.
self_join = self_join.alias('IntersectKey').join(df3.alias('Key3')).where('IntersectKey.Keyword != Key3.Keyword')

In [None]:
# Variant of the previous attempt that addresses the keyword through a nested alias path.
# NOTE(review): `Key1Key2.Key1.Keyword` presumes the earlier Key1 alias is still reachable beneath Key1Key2 — unverified; this cell has no recorded execution.
self_join = self_join.alias('Key1Key2').join(df3.alias('Key3')).where('Key1Key2.Key1.Keyword != Key3.Keyword')

In [92]:
# Flatten to one row per (Keyword, URL index); the exploded column receives the default name `col`.
df4 = df3.select(df3.Keyword, F.explode(df3.urlIndices))

In [93]:
# Preview the flattened (Keyword, URL index) rows.
df4.show()

+--------------------+-----+
|             Keyword|  col|
+--------------------+-----+
|1/4 vs 1/3 sump pump|    7|
|1/4 vs 1/3 sump pump|   17|
|1/4 vs 1/3 sump pump|   42|
|1/4 vs 1/3 sump pump|   64|
|1/4 vs 1/3 sump pump|   84|
|1/4 vs 1/3 sump pump|  339|
|1/4 vs 1/3 sump pump|  704|
|1/4 vs 1/3 sump pump| 1578|
|1/4 vs 1/3 sump pump| 2130|
|1/4 vs 1/3 sump pump| 2143|
|2 stage dry sump ...| 1913|
|2 stage dry sump ...| 2573|
|2 stage dry sump ...| 3316|
|2 stage dry sump ...| 5447|
|2 stage dry sump ...|10975|
|2 stage dry sump ...|13771|
|2 stage dry sump ...|16509|
|2 stage dry sump ...|28180|
|2 stage dry sump ...|28958|
|3 phase sump pump...| 2079|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Invert the mapping: for each URL index, gather the keywords whose results contain it.
# No alias is given, so the list column keeps its auto-generated name.
df5 = df4.groupBy("col").agg(F.collect_list("Keyword"))

In [None]:
# Show five rows, truncating cell text at 100 characters.
df5.show(5, truncate=100)

In [None]:
# Join df5 with itself twice with no join condition (every combination of three rows) and show 100 rows.
df5.join(df5).join(df5).show(100)

In [None]:
# Build a self-join of df5 one level per iteration (currently a single level),
# keeping only row pairs that refer to different URL indices. The aliases are
# printed so the generated names can be checked; alias1 and alias2 stay bound
# after the loop.
recursive_self_join = df5
for i in range(1):
    alias1 = f"{i}_1"
    alias2 = f"{i}_2"
    print(alias1, alias2, sep="\n")
    recursive_self_join = (
        recursive_self_join.alias(alias1)
        .join(df5.alias(alias2))
        .where(f"{alias1}.col != {alias2}.col")
    )

In [None]:
# Preview ten rows of the first-level self-join.
recursive_self_join.show(10)

In [None]:
# Manual second level of the self-join: re-alias the accumulated frame as 1_1 and join df5 as 1_2.
# NOTE(review): the filter still names the first-level aliases (alias1 / alias2, i.e. 0_1 / 0_2)
# rather than new_alias1 / new_alias2 — confirm this is intended.
alias1 = '0_1'
alias2 = '0_2'
new_alias1 = '1_1'
new_alias2 = '1_2'
recursive_self_join = recursive_self_join.alias(new_alias1).join(df5.alias(new_alias2)).where(alias1+'.col != '+alias2+'.col')

In [None]:
# Rebuild the first-level pairing of df5 with itself, using alias1 / alias2 set by an earlier cell.
# NOTE: this rebinds tmp, which earlier held the 8-row sample of keyword pairs.
tmp = df5.alias(alias1).join(df5.alias(alias2)).where(alias1+'.col != '+alias2+'.col')

In [None]:
# All ordered pairs of distinct keywords (both (a, b) and (b, a)), each side carrying its sparse URL vector.
pairs = df2.alias("Keyword1").join(df2.alias("Keyword2")).where('Keyword1.Keyword != Keyword2.Keyword')

In [None]:
# Attach to every (Keyword, URL index) row the full keyword list gathered for that URL index.
df6 = df4.join(df5, on='col')

In [None]:
# Show 200 rows, truncating cell text at 100 characters.
df6.show(200, truncate = 100)

In [None]:
# number of urls two keywords have to share to be grouped together
# Clustering thresholds; no cell visible in this notebook reads them yet.
# X: number of URLs two keywords have to share to be grouped together
X = 3
# Y: minimum number of keywords per group
Y = 4

In [None]:
@udf(VectorUDT())
def intersect(v1, v2):
    """Return a sparse indicator vector of the indices active in both inputs.

    Args:
        v1: SparseVector.
        v2: SparseVector; expected to have the same dimension as ``v1``.

    Returns:
        SparseVector of the same size as ``v1`` holding 1.0 at every index
        that is stored in both ``v1`` and ``v2`` (empty if they share none).

    Raises:
        AssertionError: if either argument is not a SparseVector.
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    # Indices present in both vectors (an intersection, not a union).
    shared = set(v1.indices).intersection(v2.indices)
    # Take the dimension from the input instead of hard-coding the vocabulary
    # size, so the UDF keeps working when the vocabulary changes.
    return Vectors.sparse(v1.size, {int(i): 1.0 for i in shared})

In [None]:
# All ordered pairs of distinct keywords, each side carrying its sparse URL vector
# (same statement as the earlier `pairs` cell).
pairs = df2.alias("Keyword1").join(df2.alias("Keyword2")).where('Keyword1.Keyword != Keyword2.Keyword')

In [None]:
# Preview three keyword pairs.
pairs.show(3)

In [None]:
# Add the sparse intersection of the two URL vectors to every keyword pair.
# NOTE: this rebinds df3, which earlier cells use for the (Keyword, urlIndices) frame.
df3 = pairs.withColumn('urlIntersect', intersect('Keyword1.urlVec', 'Keyword2.urlVec'))

In [None]:
# Drop by the name urlVec; both sides of the pair carry a column with that name.
df3 = df3.drop('urlVec')

In [None]:
# Display up to 300 keyword pairs with their URL intersections.
df3.show(300)