# **Uncovering Sales Insights using Frequent Itemset Mining and Association Rules**

Initially, our project focused on Real-Time Demand Forecasting in the e-commerce sector.
However, the dataset was small: it contained relatively few sales records, and conventional models such as Random Forest could not cope with the very large number of distinct items.
Consequently, we reframed our objective as "Uncovering Sales Insights using Frequent Itemset Mining and Association Rules in PySpark".

The limited number of bills still hinders accurate predictions and comprehensive insights.
Enlarging the dataset with more bills remains a critical objective for improvement.


Another dataset with more bills later became available, but it came with a new challenge.
It contained far more distinct items, roughly ten times as many as before.
Because the number of bills and the number of distinct items grew in roughly the same proportion, the ratio between them stayed the same.
As a result, the data remained sparse: most items appear in only a few bills, which makes it difficult to uncover meaningful patterns and associations between items.
Rebalancing the dataset to overcome this problem is therefore a priority for deriving valuable insights.
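To make the sparsity argument concrete, here is a small sketch (toy data, not the project dataset) that measures the density of the bills-by-items incidence matrix, i.e. the fraction of (bill, item) cells that are filled. It assumes each bill keeps roughly the same number of distinct items as the data grows; under that assumption, growing bills and items by the same factor leaves the ratio unchanged but does nothing to make the matrix denser.

```python
def basket_density(baskets):
    """Fraction of filled cells in the (bills x distinct items) incidence matrix."""
    items = set().union(*baskets)
    filled = sum(len(basket) for basket in baskets)
    return filled / (len(baskets) * len(items))

# Hypothetical toy data: 4 bills over 3 distinct items.
small = [{"a", "b"}, {"a", "c"}, {"b", "c"}, {"a"}]

# Ten times more bills AND ten times more distinct items, same basket sizes:
# copy g relabels every item, so the bills/items ratio stays the same.
large = [{f"{item}{g}" for item in basket} for g in range(10) for basket in small]

print(basket_density(small))  # 7 / 12, about 0.583
print(basket_density(large))  # 70 / 1200, about 0.058
```

Density is roughly the average basket size divided by the number of distinct items, so with basket sizes fixed, more distinct items means a sparser matrix regardless of how many bills are added.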



## Articles we used

#### 1. "Educational Data Mining with Python and Apache Spark: A Hands-on Tutorial" - From Afeka Model

![image.png](attachment:image.png)

We followed the methodology shown above to solve our problem, from the data-mining stage through to building the model.




#### 2. "An optimized FP-growth algorithm for discovery of association rules" - <https://link.springer.com/article/10.1007/s11227-021-04066-y>

The article describes the FP-growth algorithm.
Quoting the article: "Association rule mining (ARM) is a data mining technique to discover interesting associations between datasets."

We chose this algorithm to answer our research question.


#### 3. "Big data: Some statistical issues" - <https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5992743/>
An article on working with imperfect data, such as null or incorrect values in cells.


# Imports

In [None]:
# Install Python libraries (confluent-kafka is needed for the topic admin client, openpyxl for reading .xlsx files)
!pip install pyspark kafka-python confluent-kafka pandas openpyxl

import json
import sys

import pandas as pd
from kafka import KafkaConsumer, KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_set, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
from pyspark.ml.fpm import FPGrowth



# Kafka Producer

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic

# Create an admin client
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Define a new topic
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in ['Market_Basket']]

# Create new topic
fs = admin_client.create_topics(new_topics)

# Wait for each operation to finish
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Topic Market_Basket1 created


In [None]:
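# Convert a pandas Timestamp to a string so it can be serialized to JSON
def timestamp_to_string(obj):
    if isinstance(obj, pd.Timestamp):
        return obj.strftime('%Y-%m-%d %H:%M:%S')
    return obj

# Create a producer that serializes each message value as JSON
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Read the Excel file (requires openpyxl)
df = pd.read_excel('Assignment-1_Data.xlsx')

# Convert Timestamps to strings (applymap is named DataFrame.map from pandas 2.1 on)
df = df.applymap(timestamp_to_string)

# Replace NaN with None so empty cells are sent as JSON null instead of the invalid token NaN
df = df.astype(object).where(df.notna(), None)

# Send each row to Kafka, then block until all buffered messages are delivered
for record in df.to_dict(orient='records'):
    producer.send('Market_Basket', value=record)
producer.flush()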
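Empty cells in the Excel file arrive in pandas as `NaN`, and Python's `json` module writes `NaN` as a bare token, which is not valid JSON and is not the same as a null for downstream parsers. The sketch below uses only the standard library; `to_json_safe` and the example row are hypothetical and only illustrate the idea of mapping `NaN` to `None` (JSON `null`) and timestamps to strings before sending.

```python
import json
import math
from datetime import datetime

def to_json_safe(record):
    """Hypothetical helper: make one row dict safe for strict JSON.

    NaN (pandas' marker for an empty cell) becomes None, i.e. JSON null,
    and datetime values (pd.Timestamp is a datetime subclass) become strings.
    """
    clean = {}
    for key, value in record.items():
        if isinstance(value, float) and math.isnan(value):
            clean[key] = None
        elif isinstance(value, datetime):
            clean[key] = value.strftime("%Y-%m-%d %H:%M:%S")
        else:
            clean[key] = value
    return clean

# Hypothetical example row with two empty cells.
row = {"BillNo": "B1", "Itemname": float("nan"),
       "Date": datetime(2010, 12, 1, 8, 26), "CustomerID": float("nan")}

print(json.dumps(row, default=str))                    # contains the bare token NaN
print(json.dumps(to_json_safe(row), allow_nan=False))  # strict JSON with null
```

Passing `allow_nan=False` to `json.dumps` is a cheap way to make any remaining `NaN` fail loudly with a `ValueError` instead of silently producing invalid JSON.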

                                                                                

## Kafka Consumer
This consumer was only used to test the producer, so its code is commented out.

In [None]:
# # Create a consumer that starts from the earliest available message
# consumer = KafkaConsumer('Market_Basket', bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda v: json.loads(v.decode('utf-8')))

# # Print each message
# for msg in consumer:
#     print(msg.value)

# Kafka Integration with PySpark for Structured Data Processing

In [None]:
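# The spark-sql-kafka package version must match the installed Spark and Scala versions (here Spark 3.1.2, Scala 2.12)
spark = SparkSession.builder \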
    .appName("Customer Purchasing Patterns") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

# Define the schema of the incoming data
schema = StructType([
    StructField("BillNo", StringType()),
    StructField("Itemname", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("Date", TimestampType()),
    StructField("Price", FloatType()),
    StructField("CustomerID", FloatType()),
    StructField("Country", StringType())
])

# Read all messages currently in the topic as a batch DataFrame and parse the JSON values.
# timestampFormat matches the 'YYYY-MM-DD HH:MM:SS' strings written by the producer.
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "Market_Basket") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema, {"timestampFormat": "yyyy-MM-dd HH:mm:ss"}).alias("data")) \
    .select("data.*")

## Data Processing

The preprocessing steps are:

- Remove rows with a null `Itemname`.
- Remove rows with a non-positive `Quantity`. Some rows have negative quantities and no product name; these rows are meaningless for our analysis.
- Fill a missing `CustomerID` from the other rows of the same `BillNo`, and generate a unique `CustomerID` for each bill that has none.
- Convert `Itemname` to numeric indices using `StringIndexer`.
- Drop the columns `Itemname` and `Country`, which are not needed for the analysis.
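The cleaning rules can be sketched in plain Python on a handful of hypothetical rows (this is an illustration of the rules, not the Spark code the notebook runs). One detail worth noting: the `CustomerID` of a bill is taken from any non-null row of that bill, so a null in the first row is filled too, and bills with no ID at all get exactly one synthetic ID per bill rather than one per row. The `cust_<BillNo>` naming is an assumption chosen for the sketch.

```python
def clean_rows(rows):
    """Plain-Python sketch of the cleaning rules applied to a list of row dicts."""
    # 1. Drop rows without an item name; 2. drop non-positive quantities.
    kept = [r for r in rows if r["Itemname"] is not None and r["Quantity"] > 0]

    # 3. Remember the first known CustomerID of each bill.
    known = {}
    for r in kept:
        if r["CustomerID"] is not None:
            known.setdefault(r["BillNo"], r["CustomerID"])

    # 4. Every row of a bill gets the bill's ID; bills without one get a
    #    single synthetic ID derived from BillNo (hypothetical naming scheme).
    cleaned = []
    for r in kept:
        cid = known.get(r["BillNo"], f"cust_{r['BillNo']}")
        cleaned.append({**r, "CustomerID": str(cid)})
    return cleaned

# Hypothetical rows: bill 1 has a leading null ID, bill 2 has no item name,
# bill 3 has a negative quantity, bill 4 has no CustomerID at all.
rows = [
    {"BillNo": "1", "Itemname": "MUG", "Quantity": 2, "CustomerID": None},
    {"BillNo": "1", "Itemname": "TEA", "Quantity": 1, "CustomerID": 17850.0},
    {"BillNo": "2", "Itemname": None, "Quantity": 5, "CustomerID": None},
    {"BillNo": "3", "Itemname": "MUG", "Quantity": -1, "CustomerID": 13047.0},
    {"BillNo": "4", "Itemname": "TEA", "Quantity": 3, "CustomerID": None},
    {"BillNo": "4", "Itemname": "JAM", "Quantity": 1, "CustomerID": None},
]
for r in clean_rows(rows):
    print(r["BillNo"], r["Itemname"], r["CustomerID"])
```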



In [None]:
# Drop all rows that have a null Itemname
df = df.na.drop(subset=['Itemname'])

# Remove rows where 'Quantity' is less than or equal to 0
df = df.filter(df['Quantity'] > 0)

# Fill the CustomerID from any non-null value within the same BillNo.
# The frame spans the whole bill, so leading nulls are filled too (a forward fill would miss them).
bill_window = Window.partitionBy('BillNo').orderBy('Date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn('CustomerID', F.last('CustomerID', ignorenulls=True).over(bill_window))

# For bills without any CustomerID, generate one ID per bill (not per row) derived from BillNo
df = df.withColumn('CustomerID', F.when(F.col('CustomerID').isNull(), F.concat(F.lit('cust_'), F.col('BillNo'))).otherwise(F.col('CustomerID').cast('string')))

# StringIndexer assigns one numeric index to each distinct string in Itemname (by default the most frequent item gets 0.0)
indexer_item = StringIndexer(inputCol="Itemname", outputCol="ItemnameIndex").setHandleInvalid("keep")

df = indexer_item.fit(df).transform(df)

df1 = df.drop("Itemname", "Country")  # Drop Country because most of the values are the UK

                                                                                

**Group the items of each bill into one basket per "BillNo"**

In [None]:
# Preprocess data into transactions, where each transaction is a set of items

baskets = df.groupBy("BillNo").agg(collect_set('ItemnameIndex').alias('Items'))
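The indexing and grouping steps can be mimicked in plain Python to show what the `Items` column contains. The sketch assumes `StringIndexer`'s default `frequencyDesc` ordering (the most frequent label gets index 0.0, ties broken alphabetically) and treats `collect_set` as a per-bill set, which drops an item that appears twice on the same bill. That deduplication matters because Spark's `FPGrowth` requires the items within each transaction to be unique. The bill IDs and item names are hypothetical.

```python
from collections import Counter, defaultdict

def frequency_index(values):
    """Mimic StringIndexer's default 'frequencyDesc' order:
    most frequent label -> 0.0, ties broken alphabetically (assumption)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def build_baskets(rows, index):
    """Plain-Python analogue of groupBy('BillNo').agg(collect_set('ItemnameIndex'))."""
    baskets = defaultdict(set)
    for bill, item in rows:
        baskets[bill].add(index[item])  # a set keeps each item once per bill
    return dict(baskets)

# Hypothetical (BillNo, Itemname) rows; MUG appears twice on bill B1.
rows = [("B1", "MUG"), ("B1", "TEA"), ("B1", "MUG"),
        ("B2", "TEA"), ("B3", "JAM"), ("B3", "TEA")]

index = frequency_index([item for _, item in rows])
print(index)                        # {'TEA': 0.0, 'MUG': 1.0, 'JAM': 2.0}
print(build_baskets(rows, index))
```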

In [None]:
baskets.show()


+------+--------------------+
|BillNo|               Items|
+------+--------------------+
|536366|     [184.0, 2900.0]|
|536367|[251.0, 3436.0, 2...|
|536371|              [13.0]|
|536374|            [1023.0]|
|536375|[0.0, 515.0, 37.0...|
|536377|     [184.0, 2900.0]|
|536384|[42.0, 9.0, 501.0...|
|536385|[23.0, 235.0, 353...|
|536386| [41.0, 1829.0, 1.0]|
|536387|[1620.0, 1263.0, ...|
|536389|[2379.0, 330.0, 2...|
|536395|[238.0, 35.0, 161...|
|536396|[0.0, 515.0, 37.0...|
|536398|[68.0, 185.0, 363...|
|536399|     [184.0, 2900.0]|
|536403|       [25.0, 149.0]|
|536404|[799.0, 857.0, 12...|
|536407|     [184.0, 2900.0]|
|536412|[799.0, 38.0, 469...|
|536414|             [132.0]|
+------+--------------------+
only showing top 20 rows



                                                                                

In [None]:
# Get the number of rows
num_rows = baskets.count()

# Get the number of columns
num_columns = len(baskets.columns)

# Calculate the size in terms of rows and columns
size_in_rows_and_columns = (num_rows, num_columns)
print("Size of DataFrame (rows, columns):", size_in_rows_and_columns)


Size of DataFrame (rows, columns): (20327, 2)


                                                                                

# Train The Model



The FPGrowth algorithm is a frequent itemset mining algorithm used for discovering interesting associations between items in a transactional dataset. It works by identifying sets of items that frequently occur together in transactions. We use FPGrowth because it efficiently handles large datasets and can uncover meaningful patterns without the need for candidate itemset generation, making it faster than traditional Apriori-based approaches.

Frequent itemset mining is valuable in various domains, including E-commerce. It helps businesses understand customer buying behavior, uncover item correlations, and make data-driven decisions. By using the FPGrowth algorithm, we can efficiently extract association rules that reveal co-occurring item patterns in the dataset, providing valuable insights for improving recommendation systems, marketing strategies, and overall business operations.
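For intuition about what FPGrowth outputs, the sketch below counts itemset support by brute force on hypothetical baskets. This is deliberately *not* FP-growth: it enumerates every item combination in every basket (capped at `max_len` items), which is exactly the candidate explosion that FP-growth avoids by compressing the transactions into an FP-tree. The threshold rule, an itemset is frequent if it occurs in at least `ceil(min_support * number_of_baskets)` baskets, is the same.

```python
import math
from collections import Counter
from itertools import combinations

def frequent_itemsets(baskets, min_support, max_len=3):
    """Brute-force support counting (NOT FP-growth), for illustration only."""
    min_count = math.ceil(min_support * len(baskets))
    counts = Counter()
    for basket in baskets:
        for size in range(1, max_len + 1):
            # sorted() gives each itemset one canonical tuple form
            for itemset in combinations(sorted(basket), size):
                counts[itemset] += 1
    return {s: c for s, c in counts.items() if c >= min_count}

# Hypothetical baskets.
baskets = [{"bread", "milk"}, {"bread", "butter"}, {"bread", "milk", "butter"}, {"milk"}]
print(frequent_itemsets(baskets, min_support=0.5))
```

With four baskets and `min_support=0.5`, an itemset must appear in at least two baskets, so `("butter", "milk")` (one basket) is dropped while `("bread", "milk")` and `("bread", "butter")` are kept.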

Parameters: `minSupport=0.01`, `minConfidence=0.5`


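We chose a low `minSupport` value (0.01) because the dataset is small and sparse: with thousands of distinct items, each item appears in only a small fraction of the baskets, so a higher threshold would leave very few frequent itemsets. This setting captures itemsets that occur reasonably often and helps uncover potentially valuable insights and associations. `minConfidence=0.5` keeps only rules whose consequent appears in at least half of the baskets that contain the antecedent.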
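A short worked example of what the two thresholds mean for this run. It assumes the basket count of 20,327 printed by `baskets.count()` earlier in the notebook, and uses two counts from the `freqItemsets` output (`[144.0, 85.0]` with 230 baskets, `[144.0]` with 554). Spark's FPGrowth converts the relative support into an absolute count by rounding up, which is consistent with the smallest `freq` visible in the output (204).

```python
import math

# Assumed inputs: basket count from baskets.count() and the notebook's thresholds.
n_baskets = 20327
min_support = 0.01
min_confidence = 0.5

# Relative support -> minimum number of baskets (rounded up).
min_count = math.ceil(min_support * n_baskets)
print(min_count)  # 204

def rule_is_kept(count_antecedent_and_consequent, count_antecedent):
    """A rule X -> Y survives when count(X and Y) / count(X) >= min_confidence."""
    return count_antecedent_and_consequent / count_antecedent >= min_confidence

# Counts from the freqItemsets output: [144.0, 85.0] -> 230, [144.0] -> 554.
print(rule_is_kept(230, 554))  # False, since 230 / 554 is about 0.415
```

So the pair `[144.0, 85.0]` is frequent, yet the rule `[144.0] -> [85.0]` falls below the confidence threshold and would not appear among the association rules.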

In [None]:
# train FPGrowth model
fpGrowth = FPGrowth(itemsCol="Items", minSupport=0.01, minConfidence=0.5)
model = fpGrowth.fit(baskets)

# Display frequent itemsets
model.freqItemsets.show()

# Display generated association rules
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(baskets).show()

                                                                                

+--------------+----+
|         items|freq|
+--------------+----+
|       [144.0]| 554|
| [144.0, 85.0]| 230|
| [144.0, 99.0]| 210|
|       [559.0]| 264|
|       [224.0]| 451|
|       [567.0]| 256|
|        [95.0]| 681|
|       [771.0]| 204|
|       [414.0]| 311|
|[414.0, 201.0]| 215|
|       [194.0]| 471|
|       [525.0]| 277|
|       [186.0]| 502|
| [186.0, 88.0]| 206|
|       [260.0]| 421|
|       [717.0]| 216|
|       [745.0]| 211|
|       [431.0]| 320|
|       [510.0]| 286|
|       [170.0]| 523|
+--------------+----+
only showing top 20 rows



                                                                                

+----------------+----------+------------------+------------------+--------------------+
|      antecedent|consequent|        confidence|              lift|             support|
+----------------+----------+------------------+------------------+--------------------+
|   [132.0, 50.0]|    [36.0]|0.7684887459807074|16.204430227748794|0.011757760613961726|
|    [52.0, 12.0]|    [14.0]|0.6493150684931507|11.319577527667473|0.011659369311752842|
|    [52.0, 12.0]|    [10.0]|0.6082191780821918|10.209142223680193|0.010921434545186206|
|    [52.0, 12.0]|     [1.0]|0.7342465753424657| 7.231119252415843|0.013184434495990554|
|     [18.0, 1.0]|    [88.0]|0.6721311475409836| 19.29718903399092|0.010085108476410686|
|[19.0, 8.0, 4.0]|    [16.0]|0.6386292834890965|11.467683255726913|0.010085108476410686|
|     [33.0, 2.0]|    [71.0]|0.6694386694386695|18.488695426195427|0.015840999655630444|
|     [33.0, 2.0]|    [30.0]|0.7900207900207901|15.852667915846594|  0.0186943474196881|
|     [58.0, 4.0]|   


+------+--------------------+--------------------+
|BillNo|               Items|          prediction|
+------+--------------------+--------------------+
|536366|     [177.0, 2760.0]|                  []|
|536367|[251.0, 5.0, 269....|      [430.0, 278.0]|
|536371|              [13.0]|                  []|
|536374|            [1016.0]|                  []|
|536375|[0.0, 37.0, 20.0,...|                  []|
|536377|     [177.0, 2760.0]|                  []|
|536384|[42.0, 9.0, 104.0...|                  []|
|536385|[234.0, 21.0, 93....|[1.0, 12.0, 15.0,...|
|536386| [41.0, 1821.0, 1.0]|  [14.0, 10.0, 12.0]|
|536387|[1612.0, 1092.0, ...|                  []|
|536389|[35.0, 388.0, 234...|              [46.0]|
|536395|[238.0, 138.0, 35...|       [103.0, 53.0]|
|536396|[0.0, 37.0, 20.0,...|                  []|
|536398|[138.0, 68.0, 104...|             [149.0]|
|536399|     [177.0, 2760.0]|                  []|
|536403|       [25.0, 149.0]|      [138.0, 103.0]|
|536404|[580.0, 318.0, 22...|[1

                                                                                

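As we can see, the model returns the frequent itemsets together with their number of occurrences (`freq`).

Each association rule lists an antecedent, a consequent, and three metrics. `confidence` is the fraction of baskets containing *all* antecedent items that also contain the consequent; `lift` divides that confidence by the consequent's overall frequency, so values above 1 mean the items co-occur more often than expected by chance; `support` is the fraction of all baskets that contain the antecedent and the consequent together. For example, the rule `[132.0, 50.0] → [36.0]` has a confidence of about 0.77: roughly 77% of baskets containing items 132 and 50 also contain item 36. Finally, `transform` adds a `prediction` column holding the consequents of every rule whose antecedent is fully contained in the basket.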
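The rule metrics and the `prediction` column can be reproduced on hypothetical toy baskets with a few lines of plain Python. `rule_metrics` computes support, confidence, and lift from raw counts; `predict` mimics the documented behaviour of `FPGrowthModel.transform`: a rule applies only when the basket contains its whole antecedent, and consequents already in the basket are not predicted again. This is a sketch for intuition, not Spark's implementation.

```python
def rule_metrics(baskets, antecedent, consequent):
    """Support, confidence and lift of the rule antecedent -> consequent."""
    n = len(baskets)
    both = sum(1 for b in baskets if antecedent <= b and consequent <= b)
    ante = sum(1 for b in baskets if antecedent <= b)
    cons = sum(1 for b in baskets if consequent <= b)
    confidence = both / ante
    return {"support": both / n, "confidence": confidence, "lift": confidence / (cons / n)}

def predict(basket, rules):
    """Consequents of every rule whose whole antecedent is in the basket,
    excluding items the basket already contains (no duplicates)."""
    out = []
    for antecedent, consequent in rules:
        if antecedent <= basket:
            for item in consequent:
                if item not in basket and item not in out:
                    out.append(item)
    return out

# Hypothetical baskets and rules.
baskets = [{"A", "B", "C"}, {"A", "B", "C"}, {"A", "B"}, {"C"}, {"D"}]
rules = [({"A", "B"}, {"C"}), ({"A"}, {"B"})]

print(rule_metrics(baskets, {"A", "B"}, {"C"}))  # support 0.4, confidence 2/3, lift 10/9
print(predict({"A", "B"}, rules))  # ['C']  (B is already in the basket)
print(predict({"A"}, rules))       # ['B']  (the first rule needs both A and B)
```

A lift of 10/9 means baskets with A and B contain C only slightly more often than baskets in general; the rules in the notebook's output, with lifts above 7, indicate much stronger associations.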

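# Old question and model

The commented-out cells below belong to our original demand-forecasting approach and are kept for reference only.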

In [None]:
# from pyspark.sql import Window
# from pyspark.sql.functions import lag, avg, dayofmonth, month, dayofweek

# # Define a window specification
# windowSpec = Window.partitionBy('ItemnameIndex').orderBy('Day', 'Month')

# # Past sales
# df3 = df2.withColumn('PrevDaySales', lag(df2['Quantity']).over(windowSpec))

# # Lag features (sales 7 days ago)
# df4 = df3.withColumn('Sales7DaysAgo', lag(df3['Quantity'], 7).over(windowSpec))

# # Rolling window statistics (mean sales over the past week)
# df5 = df4.withColumn('RollingWeekSalesMean', avg(df4['Quantity']).over(windowSpec.rowsBetween(-6, 0)))

# # Time indicators
# df6 = df5.withColumn('DayOfWeek', dayofweek(df5['Date']))

# # Note: Make sure to handle missing values that may have been introduced by creating these features
# df6 = df6.na.fill(0)

In [None]:
# from pyspark.sql.functions import month, dayofmonth

# df2 = df1.withColumn('Day', dayofmonth(df1['Date']))
# df2 = df2.withColumn('Month', month(df2['Date']))

In [None]:
#df3 = df2.drop("Date")
#df3.show()

In [None]:
# from pyspark.ml.recommendation import ALS
# from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.sql.functions import lit

# # Convert 'Day' and 'Month' into a single 'Date' column
# # You will need to adjust this based on how your 'Day' and 'Month' columns are represented
# df = df.withColumn('Date', (df['Month'] - 1) * 30 + df['Day'])

# # Split the data into training and test sets
# train_data, test_data = df.randomSplit([0.8, 0.2])

# # Define the ALS model
# als = ALS(
#     userCol='Date',
#     itemCol='ItemNameIndex',
#     ratingCol='Quantity',
#     coldStartStrategy='drop'
# )

# # Train the model with the training data
# model = als.fit(train_data)

# # Make predictions on the test data
# predictions = model.transform(test_data)

# # Evaluate the model
# evaluator = RegressionEvaluator(metricName="rmse", labelCol="Quantity", predictionCol="prediction")
# rmse = evaluator.evaluate(predictions)
# print(f"Root-mean-square error = {rmse}")

# # Make predictions for the next day
# next_day = train_data.agg({"Date": "max"}).collect()[0][0] + 1
# next_day_df = df.select('ItemNameIndex').distinct().withColumn('Date', lit(next_day))
# next_day_predictions = model.transform(next_day_df)

# # Select the top 10 items with the highest predicted quantities
# top10_items = next_day_predictions.sort('prediction', ascending=False).limit(10).select('ItemNameIndex').collect()

# # 'top10_items' is a list of Row objects, so let's extract the item indices
# top10_item_indices = [row['ItemNameIndex'] for row in top10_items]

# print(f"Top 10 recommended items for day {next_day}: {top10_item_indices}")