***Remember to upload the input file `baskets01.csv` before running; Task 1 reads it and writes its output to `baskets-df.csv`.***



# Installing PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!cp drive/MyDrive/MMDS-data/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

--2023-03-08 08:52:30--  http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228721937 (218M) [application/x-gzip]
Saving to: ‘spark-3.1.1-bin-hadoop3.2.tgz’


2023-03-08 08:54:13 (2.12 MB/s) - ‘spark-3.1.1-bin-hadoop3.2.tgz’ saved [228721937/228721937]

cp: missing destination file operand after 'drive/MyDrive/MMDS-data/spark-3.1.1-bin-hadoop3.2.tgz'
Try 'cp --help' for more information.


In [2]:
import os

# Tell the JVM launcher and Spark tooling where Java and the unpacked Spark
# distribution live; both are read from the process environment.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [3]:
# Sanity check: confirm that a shell started from the notebook sees the SPARK_HOME value
# assigned through os.environ in the previous cell.
! echo $SPARK_HOME

/content/spark-3.1.1-bin-hadoop3.2


In [4]:
# findspark.init() makes the unpacked Spark distribution importable as `pyspark`.
# NOTE(review): with no argument it presumably locates Spark through the SPARK_HOME
# environment variable set earlier — confirm against the findspark documentation.
import findspark
findspark.init()

## Init

In [5]:
import csv
from pyspark import SparkConf, SparkContext

# Build the configuration first, then obtain the SparkContext (an already running
# context is reused instead of starting a second one).
# NOTE(review): the application name looks carried over from another notebook;
# it only labels the job, so it is left unchanged here.
conf = SparkConf()
conf.setAppName("CovidDataProcessing")
sc = SparkContext.getOrCreate(conf=conf)

# Task 1

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# The DataFrame API needs a SparkSession. The notebook only created a SparkContext,
# so `spark` was undefined on a fresh kernel; getOrCreate() reuses that running context.
spark = SparkSession.builder.getOrCreate()

# Explicit schema for the raw purchases file: one row per (bill, item) line.
schema = StructType([
    StructField("BillNo", StringType(), True),
    StructField("Itemname", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Price", DoubleType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True)
])

# Read the CSV with the schema above instead of inferring types.
# NOTE(review): header=False treats the first line as data — confirm the file has no header row.
df = spark.read.csv("baskets01.csv", header=False, schema=schema)

# One row per bill: gather the bill's item names, then flatten the array into a single
# comma-separated string because the CSV writer cannot store array columns.
# NOTE(review): item names that themselves contain commas become ambiguous after the join.
baskets_df = (
    df.groupBy("BillNo")
      .agg(collect_list("Itemname").alias("basket"))
      .withColumn("basket", concat_ws(",", "basket"))
)

# Spark writes "baskets-df.csv" as a directory of part files; any earlier output is replaced.
baskets_df.write.csv("baskets-df.csv", header=True, mode="overwrite")


# Task 2

In [65]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import array_distinct, collect_list

# Build one transaction per bill from the raw purchase rows in `df`.
# The result gets its own name instead of overwriting `df`, so re-running this cell
# starts from the same input every time.
transactions_df = (
    df.dropDuplicates()                              # drop exact duplicate purchase rows
      .groupBy("BillNo")
      # The raw frame has an "Itemname" column; it has no "basket" column to collect.
      .agg(collect_list("Itemname").alias("basket"))
      # FPGrowth requires the items inside a single transaction to be unique.
      .withColumn("basket", array_distinct("basket"))
)

# Configure FP-Growth: itemsets must appear in at least 1% of transactions and
# rules must reach a confidence of at least 0.3.
fp_growth = FPGrowth(itemsCol="basket", minSupport=0.01, minConfidence=0.3)

# Fit the model to the per-bill transactions.
model = fp_growth.fit(transactions_df)

# Frequent itemsets found by the model.
freq_itemsets = model.freqItemsets

# Association rules derived from those itemsets.
assoc_rules = model.associationRules

In [66]:
# Convert the array columns of freq_itemsets and assoc_rules to string columns
# so that the results can be written out as CSV.
from pyspark.sql.functions import to_json, regexp_replace

# Raw string: "\[" in a normal string literal is an invalid escape sequence that
# Python warns about; the regex itself (match "[" or "]") is unchanged.
BRACKETS_PATTERN = r"\[|\]"


def _array_as_text(column_name):
    """Return a column expression rendering the array column `column_name` as text.

    The array is serialised with to_json and the enclosing square brackets are
    stripped; the quoting that to_json puts around each element is kept.
    """
    return regexp_replace(to_json(column_name), BRACKETS_PATTERN, "")


freq_itemsets_str = freq_itemsets.withColumn("items", _array_as_text("items"))
assoc_rules_str = (
    assoc_rules
    .withColumn("antecedent", _array_as_text("antecedent"))
    .withColumn("consequent", _array_as_text("consequent"))
)


In [67]:
# Persist both result tables as CSV with a header row, replacing any earlier output.
# Spark creates each target as a directory of part files rather than a single file.
freq_itemsets_str.write.mode("overwrite").option("header", True).csv("freq_itemsets_str.csv")
assoc_rules_str.write.mode("overwrite").option("header", True).csv("assoc_rules_str.csv")