In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [4]:
from pyspark.sql.functions import collect_list, explode, array_sort

In [5]:
spark = SparkSession.builder.appName("AmazonFoodPairs").getOrCreate()

In [6]:
df = spark.read.option("header", True).option("sep", ",").csv("Reviews.csv")

In [15]:
df.show()

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

In [7]:
user_products = df.groupBy("UserId").agg(collect_list("ProductId").alias("products"))

In [17]:
user_products.show()

+------------------+--------------------+
|            UserId|            products|
+------------------+--------------------+
|#oc-R12KPBODL2B5ZD|[B007OSBEV0, B007...|
|#oc-R13NNUL4EKL4FL|[B005HG9ESG, B005...|
|#oc-R149FDXLRARCWJ|        [B008I1XPKA]|
|#oc-R152UR09M996EM|        [B006Q820X0]|
|#oc-R162D7S0A880MV|[B005HG9ESG, B005...|
|#oc-R1HFLTAYDCLBBP|[B005HG9ESG, B005...|
|#oc-R1I7C4F4WUF2PH|        [B006Q820X0]|
|#oc-R1KU5HKP3S3TXV|        [B006Q820X0]|
|#oc-R1PG9RG4BE0TUO|        [B006Q820X0]|
|#oc-R1S6OJV4N0J07C|        [B006Q820X0]|
|#oc-R1SUH4ULVHVEZS|        [B006Q820X0]|
|#oc-R1VHP4GD3MVIGG|        [B006Q820X0]|
|#oc-R1VRD09DW4H2HI|[B005HG9ESG, B005...|
|#oc-R22FWY5KIS6MKC|        [B008I1XPKA]|
|#oc-R2BZW5PLDUZHM1|        [B006Q820X0]|
| #oc-R2EGPCEJ9FAS3|        [B006Q820X0]|
|#oc-R2GDRKIV15IZHW|        [B006Q820X0]|
|#oc-R2GESI3K4U8TGQ|        [B008I1XPKA]|
|#oc-R2GTLUGFBPA5AP|        [B008I1XPKA]|
|#oc-R2H7Z7E2A1H0F3|        [B005ZBZLPI]|
+------------------+--------------

In [8]:
from pyspark.sql.functions import udf
from itertools import combinations
from pyspark.sql.types import ArrayType, StringType

In [9]:
def product_pairs(products):
    return [list(sorted(p)) for p in combinations(products, 2)]

In [10]:
pair_udf = udf(product_pairs, ArrayType(ArrayType(StringType())))
pairs_df = user_products.select(explode(pair_udf("products")).alias("pair"))

In [11]:
result = pairs_df.groupBy("pair").count().filter("count > 1").orderBy("count", ascending=False)

In [12]:
result.show()

+--------------------+-----+
|                pair|count|
+--------------------+-----+
|[B002QWP89S, B002...|  682|
|[B0026RQTGE, B002...|  682|
|[B002QWHJOU, B002...|  682|
|[B002QWHJOU, B002...|  682|
|[B0026RQTGE, B002...|  682|
|[B0026RQTGE, B002...|  682|
|[B000UBD88A, B001...|  608|
|[B001RVFEP2, B001...|  604|
|[B000VK8AVK, B007...|  604|
|[B0026KPDG8, B007...|  604|
|[B000VK8AVK, B001...|  604|
|[B0026KPDG8, B006...|  604|
|[B0013NUGDE, B002...|  604|
|[B000VK8AVK, B006...|  604|
|[B0026KNQSA, B002...|  604|
|[B0013NUGDE, B007...|  604|
|[B0013NUGDE, B006...|  604|
|[B0013NUGDE, B001...|  604|
|[B0026KNQSA, B006...|  604|
|[B0026KPDG8, B007...|  604|
+--------------------+-----+
only showing top 20 rows



In [14]:
from pyspark.sql.functions import concat_ws

In [18]:
result_fixed = result.withColumn("pair_string", concat_ws(",", result["pair"]))

In [19]:
result_fixed.select("pair_string", "count") \
             .write.option("header", True) \
             .csv("output_frequent_pairs_df")

In [20]:
result.write.parquet("output_frequent_pairs_df_parquet")