# Install Java and Spark (pre-built for Hadoop 3)

In [None]:
# install java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark (change the version number if needed)
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
# unzip the spark file to the current folder
!tar xf spark-3.3.2-bin-hadoop3.tgz

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/114 kB 12%] [Waiting                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/114 kB 12%] [2 InRel0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/114 kB 12%] [Connect                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.38)] [1 InRelease 34.4 kB/114 k                                                                               Hit:4 http://archive.ubuntu.com/ubuntu focal InRelease
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Get:6 http://archiv

In [None]:
# Register the Java and Spark install locations as environment variables
# so that later Spark start-up code can find them.
import os
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.2-bin-hadoop3",
})


In [None]:
!pip install findspark
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Creating a SparkSession in Python

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) a session that runs Spark locally inside this runtime.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Introduction to Spark")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Import necessary libraries
from pyspark.sql.functions import col, column, expr
from pyspark.sql import functions as f

# Answer the questions

0- Load the data files

In [None]:
!git clone https://github.com/20127304-AQ/Spark_exercises.git

Cloning into 'Spark_exercises'...
remote: Enumerating objects: 10, done.[K
remote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (8/8), done.[K
remote: Total 10 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (10/10), 815.55 KiB | 2.15 MiB/s, done.


In [None]:
# Read the foodmart CSV; the first line supplies the column names.
# No schema inference is requested, so every column is read as a string.
df_foodmart = spark.read.option("header", True).csv("Spark_exercises/Data/foodmart.csv")

In [None]:
# Preview the first 10 rows, with long cell values truncated.
df_foodmart.show(n=10, truncate=True)

+------------+---------+-------+--------------+------+---------+----+-------+-------+------------+-----------------+------+------+-----+---------+---------------+-----+--------+------+-------------+------------------+-----------+-------+-----------+--------------+--------+----------+-----------+-----------+----+------+-----------+----------+----+-----------------+---------------+------------+-------------+----------+--------------+-----------------+---+---------+----------+--------------+--------+---------+---------+---+-----+-----+----------+----+----+---------+-------+------------+----+-------+-----------+--------+------------+-----------+-----+-------------+----------------+-----+----------------+-------+---------+------------+-------------+-------------+---------+--------+----+--------+------+------------+-------+---------+------+------------+----+----+----------+------+-------+----------------+-----+----------+----+--------------+-----+------------+----+---------+-------+----+----

In [None]:
# 1. Convert the given dataset to the following format. Note that in each list of items, consecutive items are separated by a single comma.
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, LongType

# Item names are the CSV column headers, in file order.
columns = df_foodmart.columns

def mapping(x):
  """Return the list of items in one row.

  An item is included when its column holds the string '1'; the CSV is read
  without schema inference, so the values are compared as strings.

  Args:
    x: a Row of df_foodmart, indexable by column name.

  Returns:
    A list of column names, in column order, whose value equals '1'.
  """
  # `item_name` avoids shadowing `col` imported from pyspark.sql.functions.
  return [item_name for item_name in columns if x[item_name] == '1']

# Explicit schema: nothing has to be inferred from the data, so rows with an
# empty item list cannot confuse type inference.
basket_schema = StructType([
    StructField('id', LongType()),
    StructField('item', ArrayType(StringType())),
])

# Number the rows inside Spark rather than collecting every row to the driver
# and re-parallelising it. zipWithIndex is 0-based and follows row order, so
# adding 1 yields the same 1-based ids as before.
rdd = (
    df_foodmart.rdd
    .map(mapping)
    .zipWithIndex()
    .map(lambda pair: (pair[1] + 1, pair[0]))
)
ret = spark.createDataFrame(rdd, basket_schema)
ret.show()

+---+--------------------+
| id|                item|
+---+--------------------+
|  1|[Acetominifen, Ch...|
|  2|[Acetominifen, Ch...|
|  3|[Coffee, Deli Sal...|
|  4|[Eggs, Gum, Milk,...|
|  5|[Cheese, Dried Fr...|
|  6|           [Shampoo]|
|  7|[Milk, Paper Wipe...|
|  8|[Donuts, Dried Fr...|
|  9|[Cooking Oil, Ham...|
| 10|[Cheese, Cooking ...|
| 11|      [Nasal Sprays]|
| 12|[Auto Magazines, ...|
| 13|[Donuts, Dried Fr...|
| 14|[Cheese, Lightbul...|
| 15|[Cooking Oil, Egg...|
| 16|[Flavored Drinks,...|
| 17|              [Tuna]|
| 18|[Coffee, Hamburge...|
| 19|[Ibuprofen, Peanu...|
| 20|[Chips, Juice, Li...|
+---+--------------------+
only showing top 20 rows



In [None]:
# 2. Mine the set of frequent patterns and the set of association rules from the above dataset (in new format) with min support of 0.1 and min confidence of 0.9.
from pyspark.ml.fpm import FPGrowth

# Configure the estimator first, then fit it on the id/item DataFrame.
fp_estimator = FPGrowth(itemsCol='item', minSupport=0.1, minConfidence=0.9)
fpg = fp_estimator.fit(ret)

# Frequent itemsets and association rules exposed by the fitted model.
patterns, rules = fpg.freqItemsets, fpg.associationRules
patterns.show()
rules.show()

+-------------+----+
|        items|freq|
+-------------+----+
|[Dried Fruit]| 256|
|       [Soup]| 280|
|    [Cookies]| 238|
|     [Cheese]| 285|
+-------------+----+

+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+

