### Q1
Setup

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=796301fbb1f45ef0153d435a32197fcb020380e6dd6e39acf1a179621bacc85c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indi

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np

In [3]:
# initiate SparkContext
conf = SparkConf().setAppName("ChihaoShen").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
spark

In [4]:
!pwd

/content


In [5]:
# read files
stores = spark.read.option("header", True).csv("/content/drive/MyDrive/CSC4008/ass/data-Q1/stores-data-set.csv")
sales = spark.read.option("header", True).csv("/content/drive/MyDrive/CSC4008/ass/data-Q1/sales-data-set.csv")
stores.na.drop()
sales.na.drop()

DataFrame[Store: string, Dept: string, Date: string, Weekly_Sales: string, IsHoliday: string]

In [6]:
stores.show(5)
sales.show(5)
print("stores schema: ")
stores.printSchema()
print("sales schema: ")
sales.printSchema()

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows

+-----+----+----------+------------+---------+
|Store|Dept|      Date|Weekly_Sales|IsHoliday|
+-----+----+----------+------------+---------+
|    1|   1|05/02/2010|     24924.5|    FALSE|
|    1|   1|12/02/2010|    46039.49|     TRUE|
|    1|   1|19/02/2010|    41595.55|    FALSE|
|    1|   1|26/02/2010|    19403.54|    FALSE|
|    1|   1|05/03/2010|     21827.9|    FALSE|
+-----+----+----------+------------+---------+
only showing top 5 rows

stores schema: 
root
 |-- Store: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: string (nullable = true)

sales schema: 
root
 |-- Store: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: string (nullable = true)
 |-- IsHoliday: string (nullable = true)



In [7]:
# change data types
stores = stores.selectExpr(["cast(Store as int) Store", "Type", "cast(Size as int) Size"])
sales = sales.selectExpr(["cast(Store as int) Store", "cast(Dept as int) Dept", "Date", "cast(Weekly_Sales as double) Weekly_Sales", "IsHoliday"])
stores.show(5)
sales.show(5)
print("stores schema: ")
stores.printSchema()
print("sales schema: ")
sales.printSchema()

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows

+-----+----+----------+------------+---------+
|Store|Dept|      Date|Weekly_Sales|IsHoliday|
+-----+----+----------+------------+---------+
|    1|   1|05/02/2010|     24924.5|    FALSE|
|    1|   1|12/02/2010|    46039.49|     TRUE|
|    1|   1|19/02/2010|    41595.55|    FALSE|
|    1|   1|26/02/2010|    19403.54|    FALSE|
|    1|   1|05/03/2010|     21827.9|    FALSE|
+-----+----+----------+------------+---------+
only showing top 5 rows

stores schema: 
root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

sales schema: 
root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- IsHoliday: string (nullable = true

### Q1(a)

In [8]:
sales_joined = sales.join(stores, ["Store"]).sort("Store")
store_type_sales = sales_joined.groupBy("Type").agg(sum("Weekly_Sales").alias("Total_Sales")).sort("Type")
store_type_sales.show()

+----+--------------------+
|Type|         Total_Sales|
+----+--------------------+
|   A|4.3310147227500725E9|
|   B|2.0007007368200114E9|
|   C| 4.055035275399986E8|
+----+--------------------+



### Q1(b)

In [9]:
average_sales = sales.groupBy("Date", "IsHoliday").agg((sum("Weekly_Sales")).alias("Sales")).groupBy("IsHoliday").agg((sum("Sales")/count("*")).alias("Average_Sales"))
average_sales.na.drop().show()

+---------+-------------------+
|IsHoliday|      Average_Sales|
+---------+-------------------+
|    FALSE|4.685653710939851E7|
|     TRUE|5.052995515600001E7|
+---------+-------------------+

