In [1]:
import findspark
findspark.init("/usr/local/spark")
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

In [2]:
df2 = spark.read.csv("OnlineRetail.csv", header=True, inferSchema=True)

In [3]:
df2.createOrReplaceTempView("retail")


In [4]:
df2.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [5]:
result=spark.sql("SELECT * FROM retail")

In [6]:
result.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [7]:
from pyspark.sql import functions as F

df=df2.groupBy("InvoiceNo").agg(F.collect_set(F.col("StockCode")).alias("stockCode"))
df.show()

+---------+--------------------+
|InvoiceNo|           stockCode|
+---------+--------------------+
|   536596|[22900, 22114, 84...|
|   536938|[22112, 21931, 84...|
|   537252|             [22197]|
|   537691|[22505, 46000R, 2...|
|   538041|             [22145]|
|   538184|[22561, 22147, 21...|
|   538517|[22749, 21212, 22...|
|   538879|[21212, 22759, 22...|
|   539275|[22083, 22150, 22...|
|   539630|[22111, 22971, 22...|
|   540499|[22697, 22796, 21...|
|   540540|[22111, 22834, 22...|
|   540976|[22413, 21212, 22...|
|   541432|[22113, 22457, 21...|
|   541518|[21212, 22432, 22...|
|   541783|[22561, 22697, 22...|
|   542026|[22398, 22194, 22...|
|   542375|[22629, 21731, 22...|
|  C540850|             [21231]|
|   543641|[22645, 75131, 22...|
+---------+--------------------+
only showing top 20 rows



In [8]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- stockCode: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [11]:
from pyspark.ml.fpm import FPGrowth

# unique = df.map(lambda x: list(set(x))).cache()
# FPGrowth.train(unique, minSupport=0.2, numPartitions=10)

fpGrowth = FPGrowth(itemsCol="stockCode", minSupport=0.05, minConfidence=0.06)
model = fpGrowth.fit(df)
model.freqItemsets.show()

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
+--------+----+

