In [1]:
from pyspark.sql import SparkSession
import getpass

# OS user running the kernel; used below so the Hive warehouse lives under the
# current user's HDFS home instead of a single hard-coded account.
username = getpass.getuser()

spark = (
    SparkSession.builder
    # Legacy shuffle-block fetch protocol; per the Spark configuration docs this is for
    # compatibility with an older external shuffle service (presumably this cluster's).
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    # Port 0 asks for an ephemeral port so several users' UIs don't collide.
    .config('spark.ui.port', '0')
    # Per-user warehouse path (previously hard-coded to /user/itv015970/warehouse
    # even though `username` was computed for exactly this purpose).
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# DDL-formatted "name type" column list used as the explicit schema for the orders CSV.
order_schema = ", ".join([
    "order_id long",
    "order_date string",
    "customer_id long",
    "order_status string",
])

In [3]:
# Load the orders CSV with the explicit schema above; no header option is set,
# so every line of the file is read as a data row.
orders_df = (
    spark.read
    .format("csv")
    .schema(order_schema)
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [4]:
# Print the first 20 rows (show()'s default count); long values are truncated.
orders_df.show(20)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [5]:
# Register the DataFrame under the name "orders" so spark.sql queries can reference it.
orders_df.createOrReplaceTempView(name="orders")

In [6]:
# Returns a DataFrame over the "orders" view; the notebook renders it as the cell output.
spark.sql(sqlQuery="select * from orders")

order_id,order_date,customer_id,order_status
1,2013-07-25 00:00:...,11599,CLOSED
2,2013-07-25 00:00:...,256,PENDING_PAYMENT
3,2013-07-25 00:00:...,12111,COMPLETE
4,2013-07-25 00:00:...,8827,CLOSED
5,2013-07-25 00:00:...,11318,COMPLETE
6,2013-07-25 00:00:...,7130,COMPLETE
7,2013-07-25 00:00:...,4530,COMPLETE
8,2013-07-25 00:00:...,2911,PROCESSING
9,2013-07-25 00:00:...,5657,PENDING_PAYMENT
10,2013-07-25 00:00:...,5648,PENDING_PAYMENT


In [7]:
# Project a single column from the "orders" view.
spark.sql(sqlQuery="select order_id from orders")

order_id
1
2
3
4
5
6
7
8
9
10


In [8]:
# Outer filter over a subquery that already filters; the optimized plan shows
# both predicates merged into one Filter and the two projections collapsed.
nested_filter_query = """
select order_id, order_status from
(select order_id, customer_id, order_status from orders 
where order_id <500) where order_id <200
"""
# extended=True prints the parsed, analyzed and optimized logical plans plus the physical plan.
spark.sql(nested_filter_query).explain(extended=True)

== Parsed Logical Plan ==
'Project ['order_id, 'order_status]
+- 'Filter ('order_id < 200)
   +- 'SubqueryAlias __auto_generated_subquery_name
      +- 'Project ['order_id, 'customer_id, 'order_status]
         +- 'Filter ('order_id < 500)
            +- 'UnresolvedRelation [orders], [], false

== Analyzed Logical Plan ==
order_id: bigint, order_status: string
Project [order_id#0L, order_status#3]
+- Filter (order_id#0L < cast(200 as bigint))
   +- SubqueryAlias __auto_generated_subquery_name
      +- Project [order_id#0L, customer_id#2L, order_status#3]
         +- Filter (order_id#0L < cast(500 as bigint))
            +- SubqueryAlias orders
               +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv

== Optimized Logical Plan ==
Project [order_id#0L, order_status#3]
+- Filter ((isnotnull(order_id#0L) AND (order_id#0L < 500)) AND (order_id#0L < 200))
   +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv

== Physical Plan ==
*(1) Filter 

In [28]:
# DDL-formatted column list for the customers CSV (kept byte-identical, including
# the missing space before "zipcode", which the DDL parser tolerates).
customer_schema = (
    "customerid long, fname string, lname string, username string, "
    "password string, address string, city string, state string,"
    "zipcode string"
)

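## PREDICATE PUSHDOWN
### Spark tries to push filters down as early as possible in query execution, which helps it scan less (and only relevant) data. Related optimizations it applies:
### 1. Collapses multiple projections into one
### 2. Combines multiple filters into one (e.g. in the optimized plan of In [8], `order_id < 500` and `order_id < 200` become a single Filter)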

In [29]:
# Display the DDL schema string as the cell output to check it before reading the CSV.
customer_schema

'customerid long, fname string, lname string, username string, password string, address string, city string, state string,zipcode string'

In [30]:
# Load the customers directory as CSV using the explicit schema; no header option
# is set, so every line is read as a data row.
customers_df = (
    spark.read
    .format("csv")
    .schema(customer_schema)
    .load("/public/trendytech/retail_db/customers")
)

In [31]:
# Print the first 20 rows (show()'s default count); long values are truncated.
customers_df.show(20)

+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|customerid|      fname|    lname| username| password|             address|         city|state|zipcode|
+----------+-----------+---------+---------+---------+--------------------+-------------+-----+-------+
|         1|    Richard|Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|
|         2|       Mary|  Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|
|         3|        Ann|    Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|  00725|
|         4|       Mary|    Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|
|         5|     Robert|   Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|       Caguas|   PR|  00725|
|         6|       Mary|    Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ|  07055|
|         7|    Melissa|   Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High 

In [32]:
# Register the DataFrame under the name "customers" for use in spark.sql queries.
customers_df.createOrReplaceTempView(name="customers")

In [33]:
# Returns a DataFrame over the "customers" view for the notebook to render.
spark.sql(sqlQuery="select * from customers")

customerid,fname,lname,username,password,address,city,state,zipcode
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers...,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer...,Caguas,PR,725
4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River ...,Caguas,PR,725
6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail...,Passaic,NJ,7055
7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,725
8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest...,Lawrence,MA,1841
9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,725
10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beac...,Stafford,VA,22554


In [34]:
# In the analyzed plan the order_status Filter sits above the Join; compare it with
# the optimized plan to see where the optimizer moves that predicate.
closed_orders_join_query = """
select * from orders inner join
customers on orders.customer_id == customers.customerid
where order_status = 'CLOSED'
"""
spark.sql(closed_orders_join_query).explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('order_status = CLOSED)
   +- 'Join Inner, ('orders.customer_id = 'customers.customerid)
      :- 'UnresolvedRelation [orders], [], false
      +- 'UnresolvedRelation [customers], [], false

== Analyzed Logical Plan ==
order_id: bigint, order_date: string, customer_id: bigint, order_status: string, customerid: bigint, fname: string, lname: string, username: string, password: string, address: string, city: string, state: string, zipcode: string
Project [order_id#0L, order_date#1, customer_id#2L, order_status#3, customerid#660L, fname#661, lname#662, username#663, password#664, address#665, city#666, state#667, zipcode#668]
+- Filter (order_status#3 = CLOSED)
   +- Join Inner, (customer_id#2L = customerid#660L)
      :- SubqueryAlias orders
      :  +- Relation[order_id#0L,order_date#1,customer_id#2L,order_status#3] csv
      +- SubqueryAlias customers
         +- Relation[customerid#660L,fname#661,lname#662,username#663,password#664,add

In [35]:
# Print orders_df's column names, types and nullability as a tree.
orders_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [36]:
# Print customers_df's column names, types and nullability as a tree.
customers_df.printSchema()

root
 |-- customerid: long (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)

