# Spark and Spark SQL

## George Puthean

In [2]:
# install Java Virtual Machine (JVM) from OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
# download and decompress Apache Spark with Hadoop from https://spark.apache.org/downloads.html
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [4]:
# set environment path
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.2.1-bin-hadoop3.2'

In [5]:
# install and import findspark to locate Spark on the system
!pip install -q findspark
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

In [6]:
# Spark resilient distributed dataset (RDD)
import pyspark
sc = pyspark.SparkContext(appName='hw4')
data = list(range(7)) #store the data
rdd = sc.parallelize(data)
rdd.getNumPartitions(), rdd.collect()

(2, [0, 1, 2, 3, 4, 5, 6])

In [7]:
# import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('Colab').config('spark.ui.port', '4050').getOrCreate()
spark

In [8]:
# authorize Colab to access Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
# load data into PySpark
df_product = spark.read.json('/content/drive/My Drive/hw4_product.json')
df_order = spark.read.json('/content/drive/My Drive/hw4_order.json')
df_orderline = spark.read.json('/content/drive/My Drive/hw4_orderline.json')
df_customer = spark.read.json('/content/drive/My Drive/hw4_customer.json')

In [10]:
# show column details
df_customer.printSchema()

root
 |-- customerAddress: string (nullable = true)
 |-- customerCity: string (nullable = true)
 |-- customerID: long (nullable = true)
 |-- customerName: string (nullable = true)
 |-- customerPostalCode: string (nullable = true)
 |-- customerState: string (nullable = true)



In [11]:
# display rows from top
df_product.show(7)

+--------------------+-------------+---------+-------------+--------------------+
|  productDescription|productFinish|productID|productLineID|productStandardPrice|
+--------------------+-------------+---------+-------------+--------------------+
|    Cherry End Table|       Cherry|        1|            1|               175.0|
| Birch Coffee Tables|        Birch|        2|            1|               200.0|
|   Oak Computer Desk|          Oak|        3|            1|               750.0|
|Entertainment Center|       Cherry|        4|            1|              1650.0|
|       Writer's Desk|          Oak|        5|            2|               325.0|
|    8-Drawer Dresser|        Birch|        6|            1|               750.0|
|         48 Bookcase|       Walnut|        7|            3|               150.0|
+--------------------+-------------+---------+-------------+--------------------+
only showing top 7 rows



## Q1

What are the description, finish and standard price of product, which standard price is less than $275 in the ascending order?


In [12]:
# filter rows then select columns
df_product.filter('productStandardPrice < 275').select('productDescription', 'productFinish', 'productStandardPrice').sort(df_product.productStandardPrice).show()

+-------------------+-------------+--------------------+
| productDescription|productFinish|productStandardPrice|
+-------------------+-------------+--------------------+
|        48 Bookcase|       Walnut|               150.0|
|         Nightstand|       Cherry|               150.0|
|   Cherry End Table|       Cherry|               175.0|
|        48 Bookcase|          Oak|               175.0|
|Birch Coffee Tables|        Birch|               200.0|
|        96 Bookcase|          Oak|               200.0|
|        96 Bookcase|       Walnut|               225.0|
|     Pine End Table|         Pine|               256.0|
+-------------------+-------------+--------------------+



## Q2

What are the description, finish and standard price of all desks and all tables that cost more than $300 in the descending order?


In [13]:
# select columns then filter rows
df_product.select('productDescription', 'productFinish', 'productStandardPrice').filter( 'productDescription like "%Desk%" AND productStandardPrice > 300').sort(df_product.productStandardPrice.desc()).show()

+------------------+-------------+--------------------+
|productDescription|productFinish|productStandardPrice|
+------------------+-------------+--------------------+
| Oak Computer Desk|          Oak|               750.0|
|     Writer's Desk|          Oak|               325.0|
+------------------+-------------+--------------------+



## **Q3**
What are the description and finish of product that has been ordered in the ascending order of finish then description?

In [14]:
df_product.select('productDescription', 'productFinish').orderBy(df_product.productFinish,df_product.productDescription).show()

+--------------------+-------------+
|  productDescription|productFinish|
+--------------------+-------------+
|    8-Drawer Dresser|        Birch|
| Birch Coffee Tables|        Birch|
|       Writer's Desk|        Birch|
|    Cherry End Table|       Cherry|
|Entertainment Center|       Cherry|
|          Nightstand|       Cherry|
|High Back Leather...|      Leather|
|    4-Drawer Dresser|          Oak|
|         48 Bookcase|          Oak|
|6' Grandfather Clock|          Oak|
|7' Grandfather Clock|          Oak|
|    8-Drawer Dresser|          Oak|
|         96 Bookcase|          Oak|
|   Oak Computer Desk|          Oak|
|       Writer's Desk|          Oak|
|      Pine End Table|         Pine|
|         48 Bookcase|       Walnut|
|         96 Bookcase|       Walnut|
|              Amoire|       Walnut|
+--------------------+-------------+



### Q4
What are the name (no truncation), city and state of customer in Florida, Texas, California or Hawaii in the ascending order of name?


In [120]:
df_customer.select('customerName', 'customerCity', 'customerState').filter('customerState in ("FL","TX","CA","HI")').sort(df_customer.customerName).show(truncate=0)

+------------------------+------------+-------------+
|customerName            |customerCity|customerState|
+------------------------+------------+-------------+
|California Classics     |Santa Clara |CA           |
|Contemporary Casuals    |Gainesville |FL           |
|Impressions             |Sacramento  |CA           |
|Kaneohe Homes           |Kaneohe     |HI           |
|M and H Casual Furniture|Clearwater  |FL           |
|Seminole Interiors      |Seminole    |FL           |
|Value Furniture         |Plano       |TX           |
+------------------------+------------+-------------+



### Q5

How many customers in each of the state Florida, Texas, California or Hawaii?

In [16]:
df_customer.filter('customerState in ("FL","TX","CA","HI")').groupby('customerState').count().show()

+-------------+-----+
|customerState|count|
+-------------+-----+
|           CA|    2|
|           TX|    1|
|           FL|    3|
|           HI|    1|
+-------------+-----+



### Q6
What is the average standard price for all products in inventory?

In [17]:
from pyspark.sql.functions import mean
df_product.agg(mean('productStandardPrice')).show()

+-------------------------+
|avg(productStandardPrice)|
+-------------------------+
|        534.6315789473684|
+-------------------------+



### Q7

What are the product description, product finish, and the price higher than the average standard price for all products in inventory, in the descending order of price difference?


In [18]:
# find the most expensive product
from pyspark.sql.functions import mean
df_slt = df_product.select('productDescription', 'productFinish', 'productStandardPrice')
meanValue = df_slt.agg(mean('productStandardPrice')).collect()[0][0]
print(f'average price = {meanValue}')
df_slt.filter(df_product.productStandardPrice > meanValue).show()

average price = 534.6315789473684
+--------------------+-------------+--------------------+
|  productDescription|productFinish|productStandardPrice|
+--------------------+-------------+--------------------+
|   Oak Computer Desk|          Oak|               750.0|
|Entertainment Center|       Cherry|              1650.0|
|    8-Drawer Dresser|        Birch|               750.0|
|    8-Drawer Dresser|          Oak|               800.0|
|6' Grandfather Clock|          Oak|               890.0|
|7' Grandfather Clock|          Oak|              1100.0|
|              Amoire|       Walnut|              1200.0|
+--------------------+-------------+--------------------+



### Q8
What are the order id, order date, the customer name (no truncation), and the overall total price for each order, in the ascending order of order id?

In [136]:

df_order_customer=df_order.join(df_customer, df_order.customerID == df_customer.customerID, 'left').select('orderID','orderDate','customerName')
df_or_cust_line=df_order_customer.join(df_orderline,df_order_customer.orderID == df_orderline.orderID, 'left').select(df_order_customer.orderID,'orderDate','customerName','orderedQuantity','productID')
df_set_table=df_or_cust_line.join(df_product,df_or_cust_line.productID == df_product.productID,'left').select('orderID','orderDate','customerName',(df_or_cust_line.orderedQuantity*df_product.productStandardPrice).alias('totalAmount'))
from pyspark.sql.functions import sum
df_total_set=df_set_table.groupBy('orderID').agg(sum('totalAmount').alias('overalltotal'))
df_total_set=df_set_table.groupBy('orderID','orderDate','customerName').agg(sum('totalAmount').alias('overalltotal'))
df_total_set.sort('orderID').show(truncate=0)

+-------+----------+------------------------+------------+
|orderID|orderDate |customerName            |overalltotal|
+-------+----------+------------------------+------------+
|1001   |2010-10-21|Contemporary Casuals    |2400.0      |
|1002   |2010-10-21|California Classics     |3750.0      |
|1003   |2010-10-22|Mountain Scenes         |2250.0      |
|1004   |2010-10-22|Impressions             |1850.0      |
|1005   |2010-10-24|Home Furnishings        |4950.0      |
|1006   |2010-10-24|Value Furniture         |2600.0      |
|1007   |2010-10-27|American Euro Lifestyles|925.0       |
|1008   |2010-10-30|Battle Creek Furniture  |2775.0      |
|1009   |2010-11-05|Eastern Furniture       |3750.0      |
|1010   |2010-11-05|Contemporary Casuals    |1750.0      |
+-------+----------+------------------------+------------+



### Q9
What are the id, name (no truncation), full address, and number of orders (0 if no order) for all customers, in the ascending order of customer id?



In [149]:

df_cust_order=df_customer.join(df_order,df_customer.customerID==df_order.customerID,'left').select(df_customer.customerID,'customerName','customerAddress','orderID')

from pyspark.sql.functions import countDistinct
df_cust_order.groupBy('customerID','customerName','customerAddress').agg(countDistinct('orderID')).sort('customerID').show(truncate=0)

+----------+------------------------+--------------------+--------------+
|customerID|customerName            |customerAddress     |count(orderID)|
+----------+------------------------+--------------------+--------------+
|1         |Contemporary Casuals    |1355 S Hines Blvd   |2             |
|2         |Value Furniture         |15145 S.W. 17th St. |1             |
|3         |Home Furnishings        |1900 Allard Ave.    |1             |
|4         |Eastern Furniture       |1925 Beltline Rd.   |1             |
|5         |Impressions             |5585 Westcott Ct.   |1             |
|6         |Furniture Gallery       |325 Flatiron Dr.    |0             |
|7         |Period Furniture        |394 Rainbow Dr.     |0             |
|8         |California Classics     |816 Peach Rd.       |1             |
|9         |M and H Casual Furniture|3709 First Street   |0             |
|10        |Seminole Interiors      |2400 Rocky Point Dr.|0             |
|11        |American Euro Lifestyles|2