### PySpark made simple

Spark is a data analytics framework created by a group of scientists at UC Berkeley and predominantly used for big data processing. <br> PySpark is the Python API for Spark, which lets you drive Spark from Python.

#### Setup environment

In [1]:
#install pyspark
!pip install pyspark




In [2]:
#import pyspark
import pyspark

#import sparksession
from pyspark.sql import SparkSession

In [3]:
#import necessary libraries
#NOTE: the wildcard import brings Spark column functions such as sum and min into the
#notebook namespace, where they shadow the Python built-ins of the same name;
#later cells call sum(...) and min(...) expecting the Spark versions
from pyspark.sql.functions import *
from pyspark.sql.types import StructField,StructType,TimestampType

In [4]:
#build a SparkSession named "pysparkbasics", or reuse the one that already exists
spark = (
    SparkSession.builder
    .appName("pysparkbasics")
    .getOrCreate()
)

#### Create Dataframes using createDataFrame()

In [5]:
#customers dataframe: one row per customer (id, first name, last name, city)
c_list = [
    [342, "Emma", "Clarke", "Watford"],
    [567, "James", "Thomas", "Durham"],
    [23, "Gregson", "White", "Lampeter"],
    [45, "Hannah", "Abernathy", "Colchester"],
]

#column types are inferred from the values; only the column names are supplied
customers = spark.createDataFrame(
    c_list,
    schema=["id", "first_name", "last_name", "city"],
)

In [6]:
#print the DataFrame as a text table (show() prints at most 20 rows by default)
customers.show()

+---+----------+---------+----------+
| id|first_name|last_name|      city|
+---+----------+---------+----------+
|342|      Emma|   Clarke|   Watford|
|567|     James|   Thomas|    Durham|
| 23|   Gregson|    White|  Lampeter|
| 45|    Hannah|Abernathy|Colchester|
+---+----------+---------+----------+



In [7]:
#print the inferred schema: column names, data types and nullability
customers.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- city: string (nullable = true)



In [8]:
#orders dataframe: one row per order (id, customer id, date as text, cost)
o_list = [
    [2, 567, "2021-5-10", 250],
    [3, 567, "2021-5-14", 100],
    [4, 23, "2021-5-23", 50],
    [5, 45, "2021-5-25", 80],
    [6, 342, "2021-5-30", 75],
]

orders = spark.createDataFrame(
    o_list,
    schema=["id", "cust_id", "date", "order_cost"],
)

#print the rows first, then the inferred schema
orders.show()
orders.printSchema()

+---+-------+---------+----------+
| id|cust_id|     date|order_cost|
+---+-------+---------+----------+
|  2|    567|2021-5-10|       250|
|  3|    567|2021-5-14|       100|
|  4|     23|2021-5-23|        50|
|  5|     45|2021-5-25|        80|
|  6|    342|2021-5-30|        75|
+---+-------+---------+----------+

root
 |-- id: long (nullable = true)
 |-- cust_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- order_cost: long (nullable = true)



#### View Data using select()

In [119]:
#select columns by passing their names as strings
orders.select('date', 'order_cost').show()

+---------+----------+
|     date|order_cost|
+---------+----------+
|2021-5-10|       250|
|2021-5-14|       100|
|2021-5-23|        50|
|2021-5-25|        80|
|2021-5-30|        75|
+---------+----------+



In [127]:
#select columns using col(), which turns a column name into a Column expression
orders.select(col("date"),col("order_cost")).show()

+---------+----------+
|     date|order_cost|
+---------+----------+
|2021-5-10|       250|
|2021-5-14|       100|
|2021-5-23|        50|
|2021-5-25|        80|
|2021-5-30|        75|
+---------+----------+



In [128]:
#select columns by position: slice the list of column names (positions 2 and 3)
#and pass the resulting list to select()
orders.select(orders.columns[2:4]).show()  

+---------+----------+
|     date|order_cost|
+---------+----------+
|2021-5-10|       250|
|2021-5-14|       100|
|2021-5-23|        50|
|2021-5-25|        80|
|2021-5-30|        75|
+---------+----------+



In [122]:
#select all columns by unpacking the list of column names
#(stored as order_columns rather than `list`, so the built-in list type is not shadowed)
order_columns = orders.columns
orders.select(*order_columns).show()

+--------+-------+---------+----------+
|order_id|cust_id|     date|order_cost|
+--------+-------+---------+----------+
|       2|    567|2021-5-10|       250|
|       3|    567|2021-5-14|       100|
|       4|     23|2021-5-23|        50|
|       5|     45|2021-5-25|        80|
|       6|    342|2021-5-30|        75|
+--------+-------+---------+----------+



In [125]:
#select a single column and print it with truncate=False so long values are not cut off
#NOTE(review): "date" is a plain string column in this dataset, not a StructType, so this
#cell does not actually demonstrate nested-struct selection - TODO confirm the intent
orders.select("date").show(truncate=False)

+---------+
|date     |
+---------+
|2021-5-10|
|2021-5-14|
|2021-5-23|
|2021-5-25|
|2021-5-30|
+---------+



### Clean data
#### Rename columns using withColumnRenamed()

In [10]:
#give the generic "id" column of each table a table-specific name
customers = customers.withColumnRenamed("id", "cust_id")
orders = orders.withColumnRenamed("id", "order_id")

#print both renamed tables, customers first
customers.show()
orders.show()

+-------+----------+---------+----------+
|cust_id|first_name|last_name|      city|
+-------+----------+---------+----------+
|    342|      Emma|   Clarke|   Watford|
|    567|     James|   Thomas|    Durham|
|     23|   Gregson|    White|  Lampeter|
|     45|    Hannah|Abernathy|Colchester|
+-------+----------+---------+----------+

+--------+-------+---------+----------+
|order_id|cust_id|     date|order_cost|
+--------+-------+---------+----------+
|       2|    567|2021-5-10|       250|
|       3|    567|2021-5-14|       100|
|       4|     23|2021-5-23|        50|
|       5|     45|2021-5-25|        80|
|       6|    342|2021-5-30|        75|
+--------+-------+---------+----------+



In [11]:
#cast order_id to integer; the remaining columns keep their inferred types
orders = orders.withColumn(
    "order_id",
    orders.order_id.cast("Integer"),
)
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- cust_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- order_cost: long (nullable = true)



#### Find all customer details 

In [12]:
#inner-join customers and orders on the shared cust_id column
#sort the joined records by the customer's last name in ascending order
customer_details = (
    customers
    .join(orders, on="cust_id", how="inner")
    .sort(asc("last_name"))
)
customer_details.show()

+-------+----------+---------+----------+--------+---------+----------+
|cust_id|first_name|last_name|      city|order_id|     date|order_cost|
+-------+----------+---------+----------+--------+---------+----------+
|     45|    Hannah|Abernathy|Colchester|       5|2021-5-25|        80|
|    342|      Emma|   Clarke|   Watford|       6|2021-5-30|        75|
|    567|     James|   Thomas|    Durham|       2|2021-5-10|       250|
|    567|     James|   Thomas|    Durham|       3|2021-5-14|       100|
|     23|   Gregson|    White|  Lampeter|       4|2021-5-23|        50|
+-------+----------+---------+----------+--------+---------+----------+



#### Find unique customer details (no duplicates) - distinct()

In [102]:
#keep the list of column names of customer_details
#the bare `cols` on the last line displays the list as the cell output
cols=customer_details.columns
cols

['cust_id',
 'first_name',
 'last_name',
 'city',
 'order_id',
 'date',
 'order_cost']

In [107]:
#select all columns using columns list object
#use distinct() to drop duplicate rows
#show() only prints and returns None, so the DataFrame is stored first and displayed separately
answer = customer_details.select(*cols).distinct()
answer.show()

+-------+----------+---------+----------+--------+---------+----------+
|cust_id|first_name|last_name|      city|order_id|     date|order_cost|
+-------+----------+---------+----------+--------+---------+----------+
|     23|   Gregson|    White|  Lampeter|       4|2021-5-23|        50|
|     45|    Hannah|Abernathy|Colchester|       5|2021-5-25|        80|
|    342|      Emma|   Clarke|   Watford|       6|2021-5-30|        75|
|    567|     James|   Thomas|    Durham|       2|2021-5-10|       250|
|    567|     James|   Thomas|    Durham|       3|2021-5-14|       100|
+-------+----------+---------+----------+--------+---------+----------+



#### Find Total Cost Of Orders

- join() - combine two DataFrames
- groupBy() - group the data so that aggregate functions can be applied to each group
- orderBy() - sort a DataFrame by the values of one or more columns

In [13]:
#join both tables on customer id
#group the joined records by customer id and first name
#use sum() (the Spark aggregate, not the Python built-in) to total each customer's order cost
#sort the result by first name in ascending order
total_order_cost = (
    customers
    .join(orders, "cust_id")
    .groupby("cust_id", "first_name")
    .agg(sum("order_cost"))
    .orderBy("first_name", ascending=True)
)
total_order_cost.show()

+-------+----------+---------------+
|cust_id|first_name|sum(order_cost)|
+-------+----------+---------------+
|    342|      Emma|             75|
|     23|   Gregson|             50|
|     45|    Hannah|             80|
|    567|     James|            350|
+-------+----------+---------------+



#### Find the details of the customers who made purchases higher than 100
- when() - return a value for the rows that satisfy a condition (null for the other rows unless otherwise() is supplied)

In [142]:
#show every column plus a flag column built with when():
#'high_order_cost' where order_cost > 100, null elsewhere (no otherwise() branch is given)
customer_details.select(
    'cust_id',
    'first_name',
    'last_name',
    'city',
    'order_id',
    'date',
    'order_cost',
    when(customer_details.order_cost > 100, 'high_order_cost'),
).show()

+-------+----------+---------+----------+--------+---------+----------+-----------------------------------------------------+
|cust_id|first_name|last_name|      city|order_id|     date|order_cost|CASE WHEN (order_cost > 100) THEN high_order_cost END|
+-------+----------+---------+----------+--------+---------+----------+-----------------------------------------------------+
|     45|    Hannah|Abernathy|Colchester|       5|2021-5-25|        80|                                                 null|
|    342|      Emma|   Clarke|   Watford|       6|2021-5-30|        75|                                                 null|
|    567|     James|   Thomas|    Durham|       2|2021-5-10|       250|                                      high_order_cost|
|    567|     James|   Thomas|    Durham|       3|2021-5-14|       100|                                                 null|
|     23|   Gregson|    White|  Lampeter|       4|2021-5-23|        50|                                               

#### Find the customer with the lowest order cost
- min() - an aggregate function that returns the minimum value of the expression in a group

In [68]:
# minimum of a column via the dictionary form of agg(): {column name: aggregate name}
# collect() returns the result rows as a list; [0] picks the single Row it contains

customer_details.agg({'order_cost':'min'}).collect()[0]

Row(min(order_cost)=50)

In [69]:
# minimum via select(): alias the aggregate as MIN so the value can be read
# as an attribute of the collected Row
(
    customer_details
    .select(min(col("order_cost")).alias("MIN"))
    .limit(1)
    .collect()[0]
    .MIN
)

50

In [70]:
# head() returns the first Row of the aggregate result; [0] extracts its first field
customer_details.agg({"order_cost": "min"}).head()[0]

50

In [71]:
# select() also accepts a list of Column expressions; show() prints the one-row result as a table
customer_details.select([min("order_cost")]).show()

+---------------+
|min(order_cost)|
+---------------+
|             50|
+---------------+



In [72]:
# first() returns the first Row of the aggregate result; [0] extracts its first field
customer_details.agg({"order_cost": "min"}).first()[0]

50

In [83]:
# access customer id, first name and last name of the customer with the lowest order cost
# compute each customer's minimum order cost, then sort ascending on it before show(1);
# without the explicit sort, show(1) prints whichever group Spark happens to return first

customer_details.groupby("cust_id", "first_name", "last_name").agg(
    min("order_cost").alias("minimum order cost")
).orderBy("minimum order cost", ascending=True).show(1)

+-------+----------+---------+------------------+
|cust_id|first_name|last_name|minimum order cost|
+-------+----------+---------+------------------+
|     23|   Gregson|    White|                50|
+-------+----------+---------+------------------+
only showing top 1 row



#### Find the top customers who are responsible for multiple orders 

- distinct() - return only the unique rows of a DataFrame <br>
- dropDuplicates() - remove duplicate rows, optionally comparing only a subset of columns <br>
- orderBy() - order the rows by the values in a column

In [157]:
#keep one row per (first_name, last_name) pair, then show the first name with that row's order id
#(a customer with several orders appears only once, so only one of their order ids is listed)
(
    customer_details
    .dropDuplicates(["first_name", "last_name"])
    .select("first_name", "order_id")
    .show()
)

+----------+--------+
|first_name|order_id|
+----------+--------+
|    Hannah|       5|
|      Emma|       6|
|     James|       2|
|   Gregson|       4|
+----------+--------+



In [158]:
#unique first names via distinct(), sorted alphabetically
(
    customer_details
    .select("first_name")
    .distinct()
    .orderBy("first_name", ascending=True)
    .show()
)

+----------+
|first_name|
+----------+
|      Emma|
|   Gregson|
|    Hannah|
|     James|
+----------+



In [162]:
#unique first names via dropDuplicates(): deduplicate on first_name,
#keep only that column and sort it alphabetically
(
    customer_details
    .dropDuplicates(["first_name"])
    .select("first_name")
    .orderBy("first_name", ascending=True)
    .show()
)

+----------+
|first_name|
+----------+
|      Emma|
|   Gregson|
|    Hannah|
|     James|
+----------+



In [148]:
#unique (first_name, last_name) combinations via distinct(), sorted by first name
(
    customer_details
    .select("first_name", "last_name")
    .distinct()
    .orderBy("first_name", ascending=True)
    .show()
)

+----------+---------+
|first_name|last_name|
+----------+---------+
|      Emma|   Clarke|
|   Gregson|    White|
|    Hannah|Abernathy|
|     James|   Thomas|
+----------+---------+

