# Running rosetta queries on SparkSQL (pyspark)

## Input Data 

In [121]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode ,col
from pyspark.sql import functions as functions
import pandas as pd
from beakerx import *

pd.set_option("display.max_colwidth", 10000)
spark = SparkSession.builder.getOrCreate() 
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

customers = spark.read.format('avro').load('customers.avro')
orders = spark.read.format('avro').load('orders.avro') 

orders.toPandas()



Unnamed: 0,orderno,custid,order_date,ship_date,items
0,1001,C41,2017-04-29,2017-05-03,"[(347, 5, 19.99), (193, 2, 28.89)]"
1,1002,C13,2017-05-01,2017-05-03,"[(460, 95, 100.99), (680, 150, 8.75)]"
2,1003,C31,2017-06-15,2017-06-16,"[(120, 2, 88.99), (460, 3, 99.99)]"
3,1004,C35,2017-07-10,2017-07-15,"[(680, 6, 9.99), (195, 4, 35.0)]"
4,1005,C37,2017-08-30,,"[(460, 2, 99.98), (347, 120, 22.0), (780, 1, 1500.0), (375, 2, 149.98)]"
5,1006,C41,2017-09-02,2017-09-04,"[(680, 51, 25.98), (120, 65, 85.0), (460, 120, 99.98)]"
6,1007,C13,2017-09-13,2017-09-20,"[(185, 5, 21.99), (680, 1, 20.5)]"
7,1008,C13,2017-10-13,,"[(460, 20, 99.99)]"


In [122]:
customers.toPandas()

Unnamed: 0,custid,name,address,rating
0,C13,T. Cruise,"(201 Main St., St. Louis, MO, 63101)",750.0
1,C25,M. Streep,"(690 River St., Hanover, MA, 02340)",690.0
2,C31,B. Pitt,"(360 Mountain Ave., St. Louis, MO, 63101)",
3,C35,J. Roberts,"(420 Green St., Boston, MA, 02115)",565.0
4,C37,T. Hanks,"(120 Harbor Blvd., Boston, MA, 02115)",750.0
5,C41,R. Duvall,"(150 Market St., St. Louis, MO, 63101)",640.0
6,C47,S. Loren,"(Via del Corso, Rome, Italy, None)",625.0


## Query1: BASIC

In [54]:
customers.selectExpr("custid","name","address.zipcode", "rating").orderBy("custid").limit(2).show()

+------+---------+-------+------+
|custid|     name|zipcode|rating|
+------+---------+-------+------+
|   C13|T. Cruise|  63101|   750|
|   C25|M. Streep|  02340|   690|
+------+---------+-------+------+



## Query2: VALUE (ObjectContructor, ArrayConstructor)

### Q2.1

In [19]:
customers.select(col("name")).filter(col("rating") > 650 ).agg(functions.collect_list("name").alias("value")).show(20,False);

+--------------------------------+
|value                           |
+--------------------------------+
|[T. Cruise, M. Streep, T. Hanks]|
+--------------------------------+



### Q2.2

In [44]:
customerByRating = customers.selectExpr("custid", "name", "rating").filter(col("rating") > 650 ).orderBy("rating").withColumn("highratedrate", functions.struct(col("custid"), col("name"), col("rating")))

customerByzipcode = customers.selectExpr("custid", "name", "address.zipcode").filter(col("rating")> 650).orderBy(col("address.zipcode")).withColumn("highratedzip", functions.struct(col("custid"), col("name"), col("zipcode")))

result = customerByRating.join(customerByzipcode,"custid" ).agg(functions.collect_list("highratedzip").alias("high-rated customers, ordered by zipcode"),functions.collect_list("highratedrate").alias("high-rated customers, ordered by rating"))

result.toPandas()

Unnamed: 0,"high-rated customers, ordered by zipcode","high-rated customers, ordered by rating"
0,"[(C37, T. Hanks, 02115), (C25, M. Streep, 02340), (C13, T. Cruise, 63101)]","[(C37, T. Hanks, 750), (C25, M. Streep, 690), (C13, T. Cruise, 750)]"


### Q2.3

In [49]:
res = customers.select(col("address.zipcode"), col("rating")).filter(col("address.zipcode").isNotNull()).groupBy("zipcode").agg(functions.round(functions.avg(col("rating"))).alias("rating")).orderBy("zipcode");

res.select(functions.create_map(col("zipcode"), col("rating")).alias("Average credit rating by zipcode")).toPandas()

Unnamed: 0,Average credit rating by zipcode
0,{'02115': 658.0}
1,{'02340': 690.0}
2,{'63101': 695.0}


## Q3: * (AutoMap) and augmentation

In [51]:
 customers.withColumn("credit",functions.when(col("rating")> 650, "Good").when(col("rating").between(500, 650),"Fair").otherwise("Poor")).filter(col("address.zipcode") == "02115")



custid,name,address,rating,credit
C35,J. Roberts,"[420 Green St., B...",565,Fair
C37,T. Hanks,[120 Harbor Blvd....,750,Good


## Q4:UNNEST

In [53]:
res = orders.select(col("orderno"), col("order_date"),functions.explode(col("items")).alias("i"));

res.filter(col("order_date")== "2017-05-01").groupBy(col("orderno"), col("i.itemno").alias("item_number")).agg(functions.sum('i.qty').alias('quantity')).orderBy("orderno", "item_number").limit(1)

orderno,item_number,quantity
1002,460,95


## Q5: GROUP BY/ GROUP AS

### Q5.1

In [187]:
res = orders.select(col("*"), functions.explode(col("items")).alias("i")).filter(col("order_date") == "2017-05-01")

res2 = res.groupBy(col("orderno").alias("ord"), col("i.itemno").alias("item_number")).agg(functions.sum("i.qty").alias("quantity"));

res3 = res2.join(res).filter(col("item_number") == col("i.itemno")).withColumn("o", functions.struct(col("orderno"), col("custid"), col("order_date"),col("ship_date"), col("items")))
result = res3.withColumn("od", functions.struct(col("o"), col("i"))).selectExpr("orderno", "item_number", "quantity", "od")

result.toPandas()

Unnamed: 0,orderno,item_number,quantity,od
0,1002,460,95,"((1002, C13, 2017-05-01, 2017-05-03, [Row(itemno=460, qty=95, price=100.99), Row(itemno=680, qty=150, price=8.75)]), (460, 95, 100.99))"
1,1002,680,150,"((1002, C13, 2017-05-01, 2017-05-03, [Row(itemno=460, qty=95, price=100.99), Row(itemno=680, qty=150, price=8.75)]), (680, 150, 8.75))"


## Q5.2

In [190]:
groupedcustomers = customers.selectExpr("*").groupBy("address.zipcode").agg(functions.max(col("rating")).alias("best_rating"))

res = customers.join(groupedcustomers, groupedcustomers.zipcode == customers.address.zipcode).filter(col("rating")== col("best_rating")).withColumn("Customers", functions.struct(col("custid"), col("name")))
res.groupBy("zipcode", "best_rating").agg(functions.collect_list("Customers").alias("best_Customers"))



zipcode,best_rating,best_Customers
2340,690,"[[C25, M. Streep]]"
2115,750,"[[C37, T. Hanks]]"
63101,750,"[[C13, T. Cruise]]"


## Q6: SUBQUERIES


In [107]:

    
customers.createOrReplaceTempView("c");
spark.sql("select name from c where c.rating in (select max(c2.rating) from c c2) ")


name
T. Cruise
T. Hanks


In [106]:
spark.sql("select name from c where c.rating > (select avg(c2.rating) from c c2) ")


name
T. Cruise
M. Streep
T. Hanks


## Q7: QUANTIFIED EXPRESSIONS

In [102]:
customers.createOrReplaceTempView("c");
spark.sql("select c.name from c where c.rating in (select max(c2.rating) from c c2 where c2.rating  IS NOT NULL ) ")

name
T. Cruise
T. Hanks


## Q8: INNER JOIN

In [126]:
customerfiltred = customers.selectExpr("*").filter(col("rating")> 500);
res = orders.select(col("custid"), col("orderno").alias("id"),functions.explode(col("items")).alias("exploded"))
res = res.join(customerfiltred, "custid").orderBy("rating")
res = res.withColumn("Item",functions.struct(col("exploded.itemno").alias("id"),(col("exploded.price") * col("exploded.price")).alias("total_price")))
res = res.select(col("id"), col("name"), col("Item").alias("ordered_item")).groupBy("id", "name").agg(functions.collect_list("ordered_item").alias("ordered_item"))
res.toPandas()

Unnamed: 0,id,name,ordered_item
0,1001,R. Duvall,"[(347, 399.60009999999994), (193, 834.6321)]"
1,1005,T. Hanks,"[(460, 9996.0004), (347, 484.0), (780, 2250000.0), (375, 22494.000399999997)]"
2,1006,R. Duvall,"[(680, 674.9604), (120, 7225.0), (460, 9996.0004)]"
3,1007,T. Cruise,"[(185, 483.5600999999999), (680, 420.25)]"
4,1002,T. Cruise,"[(460, 10198.980099999999), (680, 76.5625)]"
5,1004,J. Roberts,"[(680, 99.8001), (195, 1225.0)]"
6,1008,T. Cruise,"[(460, 9998.0001)]"


## Q9: MULTIPLE JOINS

In [132]:
res = orders.join(customers, "custid").select(col("orderno").alias("order no"), col("name"), col("address"),col("order_date"), functions.explode(col("items")).alias("i"))

res.groupBy("order no", "name", "address", "order_date").agg(functions.sum(col("i.qty") * col("i.price")).alias("amount_due")).orderBy("order no")

order no,name,address,order_date,amount_due
1001,R. Duvall,"[150 Market St., ...",2017-04-29,157.73
1002,T. Cruise,"[201 Main St., St...",2017-05-01,10906.55
1003,B. Pitt,[360 Mountain Ave...,2017-06-15,477.94999999999993
1004,J. Roberts,"[420 Green St., B...",2017-07-10,199.94
1005,T. Hanks,[120 Harbor Blvd....,2017-08-30,4639.92
1006,R. Duvall,"[150 Market St., ...",2017-09-02,18847.58
1007,T. Cruise,"[201 Main St., St...",2017-09-13,130.45
1008,T. Cruise,"[201 Main St., St...",2017-10-13,1999.8


## Q10: LEFT OUTER JOIN

In [150]:
res = customers.join(orders, "custid", how="left_outer")
res =res.select(col("custid"),  col("name"),col("orderno"),col("order_date") ).orderBy("custid", "order_date");
res

custid,name,orderno,order_date
C13,T. Cruise,1002.0,2017-05-01
C13,T. Cruise,1007.0,2017-09-13
C13,T. Cruise,1008.0,2017-10-13
C25,M. Streep,,
C31,B. Pitt,1003.0,2017-06-15
C35,J. Roberts,1004.0,2017-07-10
C37,T. Hanks,1005.0,2017-08-30
C41,R. Duvall,1001.0,2017-04-29
C41,R. Duvall,1006.0,2017-09-02
C47,S. Loren,,


## Q11: DISTINCT

In [156]:
res = customers.filter(col("name")=="T. Cruise");
res = res.join(orders, "custid").select(functions.explode(col("items")).alias("i"))
res.selectExpr("i.itemno").distinct()

itemno
460
185
680


## Q12: LET




In [165]:
orders.filter(col("ship_date").isNotNull()).select(col("orderno"), ((functions.dayofmonth(col("ship_date"))-
              functions.dayofmonth(col("order_date")))- 2).alias("days_late"))  .filter(col("days_late") > 2 ).orderBy(functions.col("days_late").desc())        
 

orderno,days_late
1007,5
1004,3


In [169]:
res = orders.withColumn("i",functions.explode(col("items")));
res.groupBy(col("orderno")).agg(functions.sum(col("i.qty") * col("i.price")).alias("revenue")).where("revenue >1000").orderBy(functions.col("revenue").desc())

orderno,revenue
1006,18847.58
1002,10906.55
1005,4639.92
1008,1999.8


In [175]:
res = customers.groupBy("address.zipcode").agg(functions.max("rating").alias("best_rating"));

res2 = res.join(customers, res.zipcode == customers.address.zipcode).filter(col("rating")== col("best_rating")).withColumn("best_customers", functions.struct(col("custid"),col("name"))).orderBy("custid")

res2.select(col("zipcode").alias("zip"), col("best_rating"), col("best_customers")).orderBy("zip")


zip,best_rating,best_customers
2115,750,"[C37, T. Hanks]"
2340,690,"[C25, M. Streep]"
63101,750,"[C13, T. Cruise]"


## Q13: BUILT-IN FUNCTIONS

In [178]:
res = orders.select(functions.month(col("order_date")).alias("month")).filter(functions.year(col("order_date"))== 2017)
res.groupBy("month").agg(functions.count(col("*")).alias("order_count")).orderBy("month")

month,order_count
4,1
5,1
6,1
7,1
8,1
9,2
10,1


In [181]:
res = orders.withColumn("i", functions.explode(col("items")))
res2 = res.groupBy(col("orderno").alias("ord")).agg(functions.sum(col("i.qty") * col("i.price")).alias("revenue"))
res3 = res2.join(orders, res2.ord == orders.orderno).filter(col("revenue") > 10000).orderBy(functions.col("revenue").desc())
res3.select(col("revenue"),col("orderno"), col("order_date"))

revenue,orderno,order_date
18847.58,1006,2017-09-02
10906.55,1002,2017-05-01


## Q14: MODULARITY

In [186]:
res = orders.withColumn("i", functions.explode(col("items")));
res = res.filter(functions.year(col("order_date"))== "2017").groupBy("orderno").agg(functions.sum(col("i.qty") * col("i.price")).alias("revenu"))
res.select(functions.round(functions.min("revenu")).alias("minimum"),functions.round(functions.max("revenu")).alias("maximum"),functions.round(functions.avg("revenu")).alias("average"))

minimum,maximum,average
130.0,18848.0,4670.0
