### Setup

In [None]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.ml.fpm import FPGrowth

In [None]:
# Show the installed PySpark version (the notebook displays the cell's last expression).
pyspark.__version__

In [None]:
# Create (or reuse) a local Spark session that uses all local cores.
# NOTE: getOrCreate() returns an already-running session if this kernel has one,
# in which case the memory settings below are NOT re-applied; restart the
# kernel to change them.
# NOTE(review): with master 'local[*]' tasks run inside the driver JVM, so
# spark.driver.memory is what sizes the heap and spark.executor.memory is
# likely without effect here — confirm.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Spark-apriori')
    .config('spark.executor.memory', '4g')
    .config('spark.driver.memory', '10g')
    # Hadoop properties must carry the `spark.hadoop.` prefix to be copied into
    # the Hadoop Configuration; the bare `mapreduce.*` key is not a Spark
    # property and spark-submit reports it as ignored. Committer algorithm v2
    # moves task output to its final location at task commit.
    .config('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2')
    # NOTE(review): deprecated in Spark 3.x in favour of
    # 'spark.sql.execution.arrow.pyspark.enabled'; kept for older PySpark.
    # It only affects pandas <-> Spark conversions, none of which appear in the
    # cells shown here.
    .config('spark.sql.execution.arrow.enabled', 'true')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession (last expression of the cell is rendered).
spark

### Read data

In [None]:
# Load the raw order records from GCS; Spark infers the schema from the JSON.
orders_path = 'gs://[...]//[...]/orders/'
orders = spark.read.format('json').load(orders_path)

In [None]:
# Print the schema Spark inferred for the raw orders data.
orders.printSchema()

In [None]:
# Keep only rows whose category, order and customer ids are all present and
# non-zero, and drop category 200.
# NOTE(review): 0 looks like a placeholder id and 200 a category deliberately
# excluded from mining — confirm the meaning of both with the data owner.
# The filter runs BEFORE the projection so every column it references still
# exists; filtering after select() on the dropped `customers_id` column relied
# on Spark's analyzer implicitly resolving the missing reference.
valid_rows = (
    F.col('category_id').isNotNull()
    & (F.col('category_id') != 0)
    & (F.col('category_id') != 200)
    & F.col('order_id').isNotNull()
    & (F.col('order_id') != 0)
    & F.col('customers_id').isNotNull()
    & (F.col('customers_id') != 0)
)

# Column that identifies one transaction (basket). For customer-level
# association rules set this to 'customers_id', and group by that same column
# when the transactions are built.
transaction_key = 'order_id'

orders_clean = orders.filter(valid_rows).select(F.col(transaction_key), F.col('category_id'))
orders_clean.show(5)

In [None]:
# Remove repeated (key, category) rows, keep the result in memory because it is
# grouped into transactions next, and call count() to materialize the cache.
orders_nodups = orders_clean.dropDuplicates().cache()
orders_nodups.count()

### Create transactions data

In [None]:
# Build one basket per order: the categories of that order gathered into the
# `categories_agg` list column that FPGrowth reads as its items. The rows were
# de-duplicated beforehand, so each list holds distinct categories (FPGrowth
# rejects baskets with repeated items).
transactions = (
    orders_nodups
    .groupBy('order_id')
    .agg(F.collect_list('category_id').alias('categories_agg'))
)

### for customer-level association rules, replace the assignment above with: ###
# transactions = (
#     orders_nodups
#     .groupBy('customers_id')
#     .agg(F.collect_list('category_id').alias('categories_agg'))
# )

# The baskets are the input to FPGrowth; cache them and count() to materialize.
transactions.cache()
transactions.count()

### Association rules

In [None]:
### frequent itemsets
# Mine frequent category sets with FP-Growth. An itemset is kept when it occurs
# in at least 0.0002% of transactions; rules derived from the model need a
# confidence of at least 0.001%.
categoriesMinSupport = 0.000002
categoriesMinConfidence = 0.00001

fpGrowth = (
    FPGrowth()
    .setItemsCol("categories_agg")
    .setMinSupport(categoriesMinSupport)
    .setMinConfidence(categoriesMinConfidence)
)
model = fpGrowth.fit(transactions)

freqItemsets = model.freqItemsets
freqItemsets.show(5)

In [None]:
### generated association rules
# Association rules derived from the fitted model's frequent itemsets,
# limited by the model's minimum confidence.
rules = model.associationRules
rules.show(n=5)

In [None]:
# Keep the rules in memory for the write below; count() forces computation.
rules = rules.cache()
rules.count()

### Write results to GCS

In [None]:
### write association rules
# Save the rules as gzip-compressed JSON, replacing any output of a previous run.
outputFolder = "gs://[...]//[...]/"
filename = outputFolder + "associationRules_orders"

# Upper bound on the number of output files; coalesce() merges the cached
# partitions without a shuffle.
num_output_files = 18

# No `header` option: that is a CSV-only setting which the JSON writer ignores.
(
    rules
    .coalesce(num_output_files)
    .write
    .mode("overwrite")
    .option("compression", "gzip")
    .json(filename)
)