In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, expr

In [2]:
spark = SparkSession.builder.appName("Queries Demo").getOrCreate()

21/07/30 18:58:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark

In [4]:
orders_df = spark.read.csv("orders_dataset.csv", inferSchema=True, header=True)

In [5]:
customers_df = spark.read.csv("customers_dataset.csv", inferSchema=True, header=True)

In [6]:
employees_df = spark.read.csv("employees_dataset.csv", inferSchema=True, header=True)

In [30]:
orders_df.createOrReplaceTempView("orders")

In [31]:
customers_df.createOrReplaceTempView("customers")

In [32]:
employees_df.createOrReplaceTempView("employees")

In [34]:
orders_df.show(5)
customers_df.show(5)
employees_df.show(5)
employees_df.select(year(to_date("hire_date", "MM/dd/yyyy")).alias("Hiring Year")).show()

+--------+-----------+---------+-----------+----------+
|order_id|customer_id|   status|salesman_id|order_date|
+--------+-----------+---------+-----------+----------+
|       1|       1015|  pending|        109|02/07/2020|
|       2|       1012|  pending|        112|04/16/2021|
|       3|       1007|  shipped|        115|09/28/2019|
|       4|       1011|  shipped|        106|09/18/2020|
|       5|       1013|cancelled|        106|09/18/2019|
+--------+-----------+---------+-----------+----------+
only showing top 5 rows

+-----------+-----------------+-------------------+--------------------+------------+
|customer_id|             name|            address|             website|credit_limit|
+-----------+-----------------+-------------------+--------------------+------------+
|       1001|   Garold Greaser|    431 Basil Court|ggreaser0@people....|        4523|
|       1002|       Dion Halms|   9 Sheridan Place|     dhalms1@psu.edu|        1087|
|       1003|Phillida Stickney|  53169 Or

### Create a pivot that shows, how many orders each customer placed per year. Give the customer_id and name of each customer. Sort the resulting DataFrame by the customer name in ascending order and replace all null values by 0. Use the DataFrane API.

In [9]:
# VERY BAD QUERY
orders_df.groupBy("customer_id", year(to_date("order_date", "MM/dd/yyyy")).alias("year")).count().alias("o")\
.join(customers_df.alias("c"), expr("o.customer_id = c.customer_id"), "right")\
.select("c.customer_id", "name", "year", "count")\
.sort("customer_id", ascending=True).na.fill(0).toPandas()

                                                                                

Unnamed: 0,customer_id,name,year,count
0,1001,Garold Greaser,0,0
1,1002,Dion Halms,0,0
2,1003,Phillida Stickney,0,0
3,1004,Clare Kilfoyle,0,0
4,1005,Arther Hallstone,0,0
5,1006,Granville Brosetti,2021,1
6,1006,Granville Brosetti,2020,1
7,1007,Brandice Alleway,2018,3
8,1007,Brandice Alleway,2020,1
9,1007,Brandice Alleway,2019,3


In [10]:
# Looks OKAY tho can drop null column
orders_df.alias("o").join(customers_df.alias("c"), expr("o.customer_id = c.customer_id"), "right")\
.withColumn("year", year(to_date("order_date", "MM/dd/yyyy"))).selectExpr("c.customer_id","name", "year")\
.groupBy("name", "c.customer_id").pivot("year").agg({"year": "count"})\
.orderBy("c.customer_id", ascending=True).na.fill(0).toPandas()

                                                                                

Unnamed: 0,name,customer_id,null,2017,2018,2019,2020,2021
0,Garold Greaser,1001,0,0,0,0,0,0
1,Dion Halms,1002,0,0,0,0,0,0
2,Phillida Stickney,1003,0,0,0,0,0,0
3,Clare Kilfoyle,1004,0,0,0,0,0,0
4,Arther Hallstone,1005,0,0,0,0,0,0
5,Granville Brosetti,1006,0,0,0,0,1,1
6,Brandice Alleway,1007,0,0,3,3,1,0
7,Caryn Madre,1008,0,1,0,1,2,0
8,Adolph Cisco,1009,0,0,1,0,1,0
9,Emmeline Kettley,1010,0,2,0,0,2,0


### Evaluation Plan Q3 (a)

In [18]:
employees_df.alias("m").join(employees_df.alias("e"), expr("m.employee_id = e.manager_id"), "inner")\
.select("e.employee_id", "e.first_name", "e.last_name", "e.job_title").explain()

== Physical Plan ==
*(2) Project [employee_id#646, first_name#647, last_name#648, job_title#653]
+- *(2) BroadcastHashJoin [employee_id#68], [manager_id#652], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#439]
   :  +- *(1) Filter isnotnull(employee_id#68)
   :     +- FileScan csv [employee_id#68] Batched: false, DataFilters: [isnotnull(employee_id#68)], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<employee_id:int>
   +- *(2) Filter isnotnull(manager_id#652)
      +- FileScan csv [employee_id#646,first_name#647,last_name#648,manager_id#652,job_title#653] Batched: false, DataFilters: [isnotnull(manager_id#652)], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [IsNotNull(manager_id)], Read

In [38]:
spark.sql("""
SELECT e.employee_id, e.first_name, e.last_name, e.job_title 
FROM employees m 
INNER JOIN employees e ON m.employee_id = e.manager_id
""").explain()

== Physical Plan ==
*(2) Project [employee_id#840, first_name#841, last_name#842, job_title#847]
+- *(2) BroadcastHashJoin [employee_id#68], [manager_id#846], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#640]
   :  +- *(1) Filter isnotnull(employee_id#68)
   :     +- FileScan csv [employee_id#68] Batched: false, DataFilters: [isnotnull(employee_id#68)], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<employee_id:int>
   +- *(2) Filter isnotnull(manager_id#846)
      +- FileScan csv [employee_id#840,first_name#841,last_name#842,manager_id#846,job_title#847] Batched: false, DataFilters: [isnotnull(manager_id#846)], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [IsNotNull(manager_id)], Read

### Evaluation Plan Q3 (b)

In [50]:
employees_df.join(orders_df, expr("employee_id = salesman_id"), "left")\
.groupBy("employee_id").agg({"order_id": "count"}).sort("count(order_id)", ascending=False).explain()

== Physical Plan ==
*(4) Sort [count(order_id)#1163L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count(order_id)#1163L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#1282]
   +- *(3) HashAggregate(keys=[employee_id#68], functions=[count(order_id#16)])
      +- Exchange hashpartitioning(employee_id#68, 200), ENSURE_REQUIREMENTS, [id=#1278]
         +- *(2) HashAggregate(keys=[employee_id#68], functions=[partial_count(order_id#16)])
            +- *(2) Project [employee_id#68, order_id#16]
               +- *(2) BroadcastHashJoin [employee_id#68], [salesman_id#19], LeftOuter, BuildRight, false
                  :- FileScan csv [employee_id#68] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<employee_id:int>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [id=#1272]
 

In [51]:
spark.sql("""
SELECT employee_id, COUNT(order_id) 
FROM employees 
LEFT JOIN orders ON employee_id = salesman_id 
GROUP BY employee_id
ORDER BY count(order_id) DESC
""").explain()

== Physical Plan ==
*(4) Sort [count(order_id)#1169L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count(order_id)#1169L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#1345]
   +- *(3) HashAggregate(keys=[employee_id#68], functions=[count(order_id#16)])
      +- Exchange hashpartitioning(employee_id#68, 200), ENSURE_REQUIREMENTS, [id=#1341]
         +- *(2) HashAggregate(keys=[employee_id#68], functions=[partial_count(order_id#16)])
            +- *(2) Project [employee_id#68, order_id#16]
               +- *(2) BroadcastHashJoin [employee_id#68], [salesman_id#19], LeftOuter, BuildRight, false
                  :- FileScan csv [employee_id#68] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Queries/employees_dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<employee_id:int>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [id=#1335]
 