In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
raw = spark.read.csv("Online_Retail.csv", inferSchema = True, header = True, sep=";")
raw.createOrReplaceTempView("raw_data")

# get all item code and name
itemCode = spark.sql("SELECT DISTINCT StockCode, Description, UnitPrice from raw_data")
itemCode = itemCode.orderBy('StockCode', ascending=False)

#preprocess the data
df = raw.groupBy("InvoiceNo").agg(F.collect_set("StockCode").alias("Items"))
df.show(2)

+---------+--------------------+
|InvoiceNo|               Items|
+---------+--------------------+
|   536596|[22900, 22114, 84...|
|   536938|[22112, 21931, 84...|
+---------+--------------------+
only showing top 2 rows



In [3]:
#do the fpgrowth algorithm
from pyspark.ml.fpm import FPGrowth

#create the model
fpGrowth = FPGrowth(itemsCol="Items", minSupport=0.01, minConfidence=0.6)

#fit the model with data
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
predictionModel1 = model.transform(df)
print("END OF MODEL 1")

+----------------+----+
|           items|freq|
+----------------+----+
|         [22633]| 487|
|         [23236]| 344|
|        [85123A]|2246|
|         [22423]|2172|
| [22423, 85123A]| 355|
|         [22667]| 486|
|         [22579]| 343|
|  [22579, 22578]| 282|
|        [85099B]|2135|
| [85099B, 22423]| 288|
|[85099B, 85123A]| 404|
|         [22620]| 486|
|        [84536A]| 342|
|         [71053]| 342|
|         [47566]|1706|
| [47566, 85099B]| 332|
|  [47566, 22423]| 398|
| [47566, 85123A]| 391|
|         [85150]| 483|
|         [20725]|1608|
+----------------+----+
only showing top 20 rows

+--------------------+----------+------------------+------------------+
|          antecedent|consequent|        confidence|              lift|
+--------------------+----------+------------------+------------------+
|      [20726, 22382]|   [20725]|0.6356107660455487|10.237760472997332|
|             [22699]|   [22697]|               0.7|  17.1523178807947|
|      [20723, 22355]|   [20724]|0.803

In [4]:
fpGrowth = FPGrowth(itemsCol="Items", minSupport=0.05, minConfidence=0.6)
#fit the model with data
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
predictionModel2 = model.transform(df)
print("END OF MODEL 2")

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
+--------+----+

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

END OF MODEL 2


In [5]:
fpGrowth = FPGrowth(itemsCol="Items", minSupport=0.03, minConfidence=0.6)
#fit the model with data
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
predictionModel3 = model.transform(df)
print("END OF MODEL 3")

+---------------+----+
|          items|freq|
+---------------+----+
|       [85123A]|2246|
|        [22423]|2172|
|       [85099B]|2135|
|        [47566]|1706|
|        [20725]|1608|
|        [84879]|1468|
|        [22720]|1462|
|        [22197]|1442|
|        [21212]|1334|
|        [22383]|1306|
|        [20727]|1295|
|        [22457]|1266|
|         [POST]|1254|
|        [23203]|1249|
|        [22386]|1231|
|[22386, 85099B]| 833|
|        [22960]|1220|
|        [22469]|1214|
|        [21931]|1201|
|        [22411]|1187|
+---------------+----+
only showing top 20 rows

+----------+----------+------------------+----------------+
|antecedent|consequent|        confidence|            lift|
+----------+----------+------------------+----------------+
|   [22699]|   [22697]|               0.7|17.1523178807947|
|   [22386]|  [85099B]|0.6766856214459789|8.20897311262335|
|   [22697]|   [22699]|0.7417218543046358|17.1523178807947|
+----------+----------+------------------+----------------+

E

In [6]:
print("TRANSFORMED DATAFRAME")
print("MODEL 1")
predictionModel1.show()
print("\nMODEL 2")
predictionModel2.show()
print("\nMODEL 3")
predictionModel3.show()

TRANSFORMED DATAFRAME
MODEL 1
+---------+--------------------+--------------------+
|InvoiceNo|               Items|          prediction|
+---------+--------------------+--------------------+
|   536596|[22900, 22114, 84...|                  []|
|   536938|[22112, 21931, 84...|[85099B, 22355, 2...|
|   537252|             [22197]|                  []|
|   537691|[22505, 46000R, 2...|                  []|
|   538041|             [22145]|                  []|
|   538184|[22561, 22147, 21...|                  []|
|   538517|[22749, 21212, 22...|                  []|
|   538879|[21212, 22759, 22...|                  []|
|   539275|[22083, 22150, 22...|                  []|
|   539630|[22111, 22971, 22...|                  []|
|   540499|[22697, 22796, 21...|      [22698, 20724]|
|   540540|[22111, 22834, 22...|                  []|
|   540976|[22413, 21212, 22...|[22355, 22356, 20...|
|  C540850|             [21231]|             [21232]|
|   541432|[22113, 22457, 21...|                  []

In [None]:
fpGrowth = FPGrowth(itemsCol="Items", minSupport=0.001, minConfidence=0.6)
#fit the model with data
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
model.transform(df)
