In [1]:
# Environment setup: point findspark at the local Spark install, then import
# the analysis (numpy/pandas/plotting/mlxtend) and PySpark names used below.
import findspark
# NOTE(review): absolute, machine-specific install path — adjust (or make
# configurable) before running this notebook on another machine.
findspark.init("C:/spark/spark-3.3.2-bin-hadoop2")
import pyspark
import warnings
# Suppresses all warnings for the rest of the session.
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
color = sns.color_palette()
# NOTE(review): second `%matplotlib inline` in this cell; redundant.
%matplotlib inline
pd.options.mode.chained_assignment = None  # default='warn'
from mlxtend.frequent_patterns import apriori
# NOTE(review): the name `association_rules` is rebound to a Spark DataFrame
# in the cell that saves the FPGrowth output, hiding this function from then on.
from mlxtend.frequent_patterns import association_rules

from pyspark.ml.feature import StringIndexer
from pyspark.sql import SQLContext, SparkSession
# NOTE(review): wildcard import — brings every public pyspark.sql.functions
# name into the notebook namespace (this is where `collect_list` comes from);
# names such as `sum`/`min`/`max` presumably shadow the Python built-ins.
from pyspark.sql.functions import *
# NOTE(review): `numpy` and `SparkSession` are already imported above.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

In [2]:
# Start (or reuse) the Spark session for this notebook, named 'Project'.
# `SparkSession.builder` is the documented entry point for building a session;
# the nested `Builder` class is not meant to be instantiated directly.
spark = SparkSession.builder.appName('Project').getOrCreate()
# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of using the
# SparkSession itself; the `sqlCtx` name is kept so existing references to it
# keep working.
sqlCtx = SQLContext(spark)

In [3]:
# Load the three input CSV files; the first row is the header and column
# types are inferred from the data.
orders = spark.read.csv(
    "Input/orders.csv", header='true', inferSchema='true'
)
products = spark.read.csv(
    "Input/products.csv", header='true', inferSchema='true'
)
order_products_prior = spark.read.csv(
    "Input/order_products__prior.csv", header='true', inferSchema='true'
)

# One row per ordered product, enriched with the order columns (left join on
# order_id) and then the product columns (left join on product_id).
df_merged = (
    order_products_prior
    .join(orders, on="order_id", how="left")
    .join(products, on="product_id", how="left")
)


In [4]:
# Number of order lines per product name (column `Frq_sold`), with the most
# frequently sold products first.
product_counts = (
    df_merged
    .groupBy("product_name")
    .agg(count("product_id").alias("Frq_sold"))
    .orderBy("Frq_sold", ascending=False)
)

In [5]:
# Keep only the products that were sold more than 100000 times.
filtered_products = product_counts.where(col("Frq_sold") > 100_000)

In [6]:
# Restrict the merged data to the frequently sold products: the inner join on
# product_name drops every other row and attaches the product's Frq_sold count.
# Because this cell rebinds `df_merged` to a join of itself, running it twice
# used to add a second Frq_sold column. Dropping any Frq_sold left over from a
# previous run first makes the cell safe to re-run; drop() is a no-op when the
# column is absent, so the first execution behaves exactly as before.
df_merged = df_merged.drop("Frq_sold").join(filtered_products, on="product_name")

In [7]:
# Only the order id and the product name are needed to build the baskets.
df = df_merged.select("order_id", "product_name")

In [8]:
# One basket per order: the array of product names bought in that order.
# collect_set (not collect_list) is used because Spark's FPGrowth requires the
# items inside a transaction to be unique and fails on an array that contains
# the same product name twice; item order within a basket is irrelevant to it.
basket = (
    df.groupBy("order_id")
    .agg(F.collect_set(F.col("product_name")).alias("list_of_values"))
)

In [9]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import col

# The transactions are the per-order baskets built above: one row per
# order_id, with the purchased product names in the `list_of_values` array.
df_transactions = basket

# FPGrowth is trained on the item-array column alone.
df_items = df_transactions.select("list_of_values")

# Configure the estimator (support and confidence thresholds both 0.01) and
# fit it on the baskets.
fp_growth = FPGrowth(
    itemsCol="list_of_values",
    minSupport=0.01,
    minConfidence=0.01,
    predictionCol='prediction',
)
model = fp_growth.fit(df_items)


In [13]:
# Pull the frequent itemsets and the association rules out of the fitted
# model, each collapsed to a single partition so that it is written as one
# parquet part file.
# NOTE(review): `association_rules` rebinds the name imported from mlxtend in
# the first cell; after this cell it refers to a Spark DataFrame.
frequent_itemsets = model.freqItemsets.coalesce(1)
association_rules = model.associationRules.coalesce(1)

# Persist both results, replacing any output left by a previous run.
output_path = f"Output/FPGrowth/frequent_itemsets"
frequent_itemsets.write.mode("overwrite").parquet(output_path)

output_path = f"Output/FPGrowth/association_rules"
association_rules.write.mode("overwrite").parquet(output_path)

In [14]:
# rules_pandas = association_rules.toPandas()
# rules_pandas.sort_values("support")[::-1]

In [15]:
# Printing the association rules (currently disabled).
# The loop below is wrapped in a triple-quoted string, so it is never executed;
# being the last expression of the cell, the string itself is what the cell
# displays as its output.
# It iterates over `rules_pandas`, which is only created by the commented-out
# `association_rules.toPandas()` line in the previous cell, and prints the
# antecedents, consequents, support, confidence and lift of each rule.
'''for index, rule in rules_pandas.iterrows():
    antecedents = ', '.join(rule['antecedent'])
    consequents = ', '.join(rule['consequent'])
    support = rule['support']
    confidence = rule['confidence']
    lift = rule['lift']
    
    print(f"Rule #{index+1}:")
    print(f"Antecedents: {antecedents}")
    print(f"Consequents: {consequents}")
    print(f"Support: {support}")
    print(f"Confidence: {confidence}")
    print(f"Lift: {lift}")
    print("-----------------------------")'''

'for index, rule in rules_pandas.iterrows():\n    antecedents = \', \'.join(rule[\'antecedent\'])\n    consequents = \', \'.join(rule[\'consequent\'])\n    support = rule[\'support\']\n    confidence = rule[\'confidence\']\n    lift = rule[\'lift\']\n    \n    print(f"Rule #{index+1}:")\n    print(f"Antecedents: {antecedents}")\n    print(f"Consequents: {consequents}")\n    print(f"Support: {support}")\n    print(f"Confidence: {confidence}")\n    print(f"Lift: {lift}")\n    print("-----------------------------")'