In [1]:
import pandas as pd

data = pd.read_excel("superstore.xls")
superstore = spark.createDataFrame(data)



In [2]:
#df.printSchema() shows the schema

superstore.printSchema()

root
 |-- Row ID: long (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: timestamp (nullable = true)
 |-- Ship Date: timestamp (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [3]:
#df.selectExpr("`col name` as id") gives a df with col name renamed in display

superstore.selectExpr("`Order ID` as id").show(5)

+--------------+
|            id|
+--------------+
|CA-2017-152156|
|CA-2017-152156|
|CA-2017-138688|
|US-2016-108966|
|US-2016-108966|
+--------------+
only showing top 5 rows



In [4]:
#df.where("filter condition") is used to filter the data based on a given logical condition

superstore.where("Quantity > 10")\
          .selectExpr("`Order ID`", "`Postal Code`", "`Quantity`")\
          .show(5)

+--------------+-----------+--------+
|      Order ID|Postal Code|Quantity|
+--------------+-----------+--------+
|CA-2015-115259|    43229.0|      14|
|CA-2017-145583|    95661.0|      14|
|CA-2017-114489|    53132.0|      11|
|CA-2017-145625|    92037.0|      13|
|CA-2015-122336|    19140.0|      13|
+--------------+-----------+--------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import column

#df.sort(col("col name")).desc() orders the dataframe in descending order

superstore.where(column("Category") == "Furniture")\
          .select("Order ID", "Postal Code", "Quantity")\
          .sort(column("Quantity").desc())\
          .show(5)

+--------------+-----------+--------+
|      Order ID|Postal Code|Quantity|
+--------------+-----------+--------+
|CA-2015-120768|    35630.0|      14|
|CA-2018-152702|    61107.0|      14|
|CA-2017-145583|    95661.0|      14|
|CA-2015-163447|    10011.0|      14|
|CA-2016-104241|    22304.0|      14|
+--------------+-----------+--------+
only showing top 5 rows



In [6]:
#multiple df.where fucntions can be used to filter more conditions

superstore.where(column("Category") != "Furniture")\
          .where(column("Region") == "West")\
          .selectExpr("Category", "Region", "Quantity")\
          .show(5)

+---------------+------+--------+
|       Category|Region|Quantity|
+---------------+------+--------+
|Office Supplies|  West|       2|
|Office Supplies|  West|       4|
|     Technology|  West|       6|
|Office Supplies|  West|       3|
|Office Supplies|  West|       5|
+---------------+------+--------+
only showing top 5 rows



In [7]:
#df.createOrReplaceTempView("view name") can be used to create temp view which can be used to query tables using spark.sql

superstore.createOrReplaceTempView("superstore_view")

spark.sql("""DESCRIBE FORMATTED superstore_view""").show(21)

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       Row ID|   bigint|   null|
|     Order ID|   string|   null|
|   Order Date|timestamp|   null|
|    Ship Date|timestamp|   null|
|    Ship Mode|   string|   null|
|  Customer ID|   string|   null|
|Customer Name|   string|   null|
|      Segment|   string|   null|
|      Country|   string|   null|
|         City|   string|   null|
|        State|   string|   null|
|  Postal Code|   double|   null|
|       Region|   string|   null|
|   Product ID|   string|   null|
|     Category|   string|   null|
| Sub-Category|   string|   null|
| Product Name|   string|   null|
|        Sales|   double|   null|
|     Quantity|   bigint|   null|
|     Discount|   double|   null|
|       Profit|   double|   null|
+-------------+---------+-------+



In [8]:
#spark.sql("SQL QUERY") can be used to query temp views

spark.sql("SELECT Category, Region, Quantity FROM superstore_view").show(5)

+---------------+------+--------+
|       Category|Region|Quantity|
+---------------+------+--------+
|      Furniture| South|       2|
|      Furniture| South|       3|
|Office Supplies|  West|       2|
|      Furniture| South|       5|
|Office Supplies| South|       2|
+---------------+------+--------+
only showing top 5 rows



In [9]:
spark.sql("""SELECT  Category, Region, SUM(Quantity)
FROM superstore_view 
GROUP BY Category, Region
ORDER BY SUM(Quantity) DESC LIMIT 10""")

DataFrame[Category: string, Region: string, sum(Quantity): bigint]

In [10]:
spark.sql(""" SELECT Category, (Region = "Furniture" AND Quantity > 10 OR SALES > 1000) as test
FROM superstore_view WHERE Region = "Furniture" AND Quantity > 10 OR SALES > 1000""").show(5)

+----------+----+
|  Category|test|
+----------+----+
| Furniture|true|
| Furniture|true|
| Furniture|true|
|Technology|true|
|Technology|true|
+----------+----+
only showing top 5 rows



In [11]:
#df.describe() can be used to display the summary of the dataframe
#df.toPandas() can be used to convert a spark dataframe into pandas dataframe

superstore.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Row ID,9994,4997.5,2885.1636290974325,1,9994
Order ID,9994,,,CA-2015-100006,US-2018-169551
Ship Mode,9994,,,First Class,Standard Class
Customer ID,9994,,,AA-10315,ZD-21925
Customer Name,9994,,,Aaron Bergman,Zuschuss Donatelli
Segment,9994,,,Consumer,Home Office
Country,9994,,,United States,United States
City,9994,,,Aberdeen,Yuma
State,9994,,,Alabama,Wyoming


In [12]:
#df.distinct() gives the distinct values in a selected column

superstore.select("State").distinct().sort("State").show(5)

+----------+
|     State|
+----------+
|   Alabama|
|   Arizona|
|  Arkansas|
|California|
|  Colorado|
+----------+
only showing top 5 rows



In [13]:
#df.sample() can be used to randomly select a sample of data from a dataframe

seed = 5
withReplacement = False
fraction = 0.3

superstore.sample(withReplacement, fraction, seed).count()

3002

In [14]:
#df.randomSplit() can be used to split the data into two fractions

df = superstore.randomSplit([0.25, 0.75], seed = 5)

In [15]:
print("Test: ", df[0].count(),"\n","Train: ", df[1].count())

Test:  2527 
 Train:  7467


In [16]:
#df.union(df2) can be used to join two tables of same columns

test = df[0]
train = df[1]

new = test.union(train)
new.count()

9994

In [17]:
from pyspark.sql.functions import desc, asc, col

newdf = superstore.select("State").distinct()

In [18]:
newdf.count()

49

In [19]:
newdf.show(5)

+---------+
|    State|
+---------+
|     Utah|
|Minnesota|
|     Ohio|
|   Oregon|
| Arkansas|
+---------+
only showing top 5 rows



In [20]:
newdf.sort("State").show(5)

+----------+
|     State|
+----------+
|   Alabama|
|   Arizona|
|  Arkansas|
|California|
|  Colorado|
+----------+
only showing top 5 rows



In [21]:
#df.orderBy(col("col name")) can also be used to sort a dataframe column

from pyspark.sql.functions import desc

newdf.orderBy(col("State").desc()).show(5)

+-------------+
|        State|
+-------------+
|      Wyoming|
|    Wisconsin|
|West Virginia|
|   Washington|
|     Virginia|
+-------------+
only showing top 5 rows



In [22]:
superstore.rdd.getNumPartitions()

4

In [23]:
newdf = superstore.repartition(8)

print(newdf.rdd.getNumPartitions())

8


In [24]:
newdf = newdf.coalesce(2)

print(newdf.rdd.getNumPartitions())

2


In [25]:
superstore.rdd.getNumPartitions()

4

In [26]:
superstore.where(col("State") != "Washington")\
.select("State", "Quantity")\
.groupBy("State")\
.sum("Quantity")\
.withColumnRenamed("sum(Quantity)", "Quantity")\
.orderBy(col("Quantity").desc())\
.show(5)

+------------+--------+
|       State|Quantity|
+------------+--------+
|  California|    7667|
|    New York|    4224|
|       Texas|    3724|
|Pennsylvania|    2153|
|    Illinois|    1845|
+------------+--------+
only showing top 5 rows



In [27]:
quantFilter = col("Quantity") > 5
catFilter = col("Category") != "Furniture"
cols = ["State", "Ship Mode", "Quantity"]

superstore.where(quantFilter | catFilter)\
.select("State", "Ship Mode", "Quantity")\
.groupBy("State", "Ship Mode")\
.sum("Quantity")\
.withColumnRenamed("sum(Quantity)", "Quantity")\
.orderBy(cols, ascending = True)\
.show(5)

+-------+--------------+--------+
|  State|     Ship Mode|Quantity|
+-------+--------------+--------+
|Alabama|   First Class|      33|
|Alabama|      Same Day|       2|
|Alabama|  Second Class|      77|
|Alabama|Standard Class|     133|
|Arizona|   First Class|     149|
+-------+--------------+--------+
only showing top 5 rows



In [28]:
from pyspark.sql.functions import corr

superstore.stat.corr("Quantity", "Profit")
superstore.select(corr("Quantity", "Profit")).show()

+----------------------+
|corr(Quantity, Profit)|
+----------------------+
|   0.06625318912428482|
+----------------------+



In [29]:
superstore.select("Sales", "Profit", "Quantity", "Discount").describe().show()

+-------+-------------------+------------------+------------------+-------------------+
|summary|              Sales|            Profit|          Quantity|           Discount|
+-------+-------------------+------------------+------------------+-------------------+
|  count|               9994|              9994|              9994|               9994|
|   mean| 229.85800083049847|  28.6568963077847| 3.789573744246548|0.15620272163297735|
| stddev|   623.245100508681| 234.2601076909574|2.2251096911414012|0.20645196782571612|
|    min|0.44399999999999995|-6599.978000000001|                 1|                0.0|
|    max|           22638.48| 8399.975999999999|                14|                0.8|
+-------+-------------------+------------------+------------------+-------------------+



In [30]:
colName = "Sales"
quantileProbs = [0.25, 0.5, 0.75]
relError = 0.05

for i in superstore.stat.approxQuantile(colName, quantileProbs, relError):
    print(round(i, 3))

17.31
49.848
206.112


In [31]:
from pyspark.sql.functions import monotonically_increasing_id
superstore.select(monotonically_increasing_id()).withColumnRenamed("monotonically_increasing_id()", "ROW_ID").show(5)

+------+
|ROW_ID|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



In [32]:
from pyspark.sql.functions import initcap, lower, upper, lit

superstore.select(initcap(lit("test row")), upper(lit("test row")), lower(lit("TEST ROW"))).show(5)

+-----------------+---------------+---------------+
|initcap(test row)|upper(test row)|lower(TEST ROW)|
+-----------------+---------------+---------------+
|         Test Row|       TEST ROW|       test row|
|         Test Row|       TEST ROW|       test row|
|         Test Row|       TEST ROW|       test row|
|         Test Row|       TEST ROW|       test row|
|         Test Row|       TEST ROW|       test row|
+-----------------+---------------+---------------+
only showing top 5 rows



In [33]:
from pyspark.sql.functions import ltrim, rtrim, lpad, rpad, trim

superstore.select(ltrim(lit("          hello         ")).alias("ltrim"),
                 rtrim(lit("         hello         ")).alias("rtrim"),
                 trim(lit("           hello        ")).alias("trim"),
                 rpad(lit("hello"), 10, " ").alias("rpad"),
                 lpad(lit("hello"), 3, " ").alias("lpad")).show(5)

+--------------+--------------+-----+----------+----+
|         ltrim|         rtrim| trim|      rpad|lpad|
+--------------+--------------+-----+----------+----+
|hello         |         hello|hello|hello     | hel|
|hello         |         hello|hello|hello     | hel|
|hello         |         hello|hello|hello     | hel|
|hello         |         hello|hello|hello     | hel|
|hello         |         hello|hello|hello     | hel|
+--------------+--------------+-----+----------+----+
only showing top 5 rows



In [34]:
from pyspark.sql.functions import regexp_replace

regex_string = "2017|2018"

superstore.select(regexp_replace(col("Order ID"), regex_string, "latest").alias("latest orders")).show(5)

+----------------+
|   latest orders|
+----------------+
|CA-latest-152156|
|CA-latest-152156|
|CA-latest-138688|
|  US-2016-108966|
|  US-2016-108966|
+----------------+
only showing top 5 rows



In [35]:
from pyspark.sql.functions import translate

superstore.select(translate(col("State"), "AEIOUaeiou", "0123456789").alias("translated state")).show(5)

+----------------+
|translated state|
+----------------+
|        K6nt9cky|
|        K6nt9cky|
|      C5l7f8rn75|
|         Fl8r7d5|
|         Fl8r7d5|
+----------------+
only showing top 5 rows



In [36]:
from pyspark.sql.functions import current_date, current_timestamp

dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())

dateDF.show(5, False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2019-03-20|2019-03-20 14:32:46.275|
|1  |2019-03-20|2019-03-20 14:32:46.275|
|2  |2019-03-20|2019-03-20 14:32:46.275|
|3  |2019-03-20|2019-03-20 14:32:46.275|
|4  |2019-03-20|2019-03-20 14:32:46.275|
+---+----------+-----------------------+
only showing top 5 rows



In [37]:
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [38]:
type(dateDF)

pyspark.sql.dataframe.DataFrame

In [39]:
dateDF.columns

['id', 'today', 'now']

In [40]:
dateDF.rdd.getNumPartitions()

4

In [41]:
from pyspark.sql.functions import date_add, date_sub, date_trunc

dateDF.select(date_add(col("today"), 5), 
              date_sub(col("today"), 5), 
              date_trunc("yyyy", col("today"))).show(5)

+------------------+------------------+-----------------------+
|date_add(today, 5)|date_sub(today, 5)|date_trunc(yyyy, today)|
+------------------+------------------+-----------------------+
|        2019-03-25|        2019-03-15|    2019-01-01 00:00:00|
|        2019-03-25|        2019-03-15|    2019-01-01 00:00:00|
|        2019-03-25|        2019-03-15|    2019-01-01 00:00:00|
|        2019-03-25|        2019-03-15|    2019-01-01 00:00:00|
|        2019-03-25|        2019-03-15|    2019-01-01 00:00:00|
+------------------+------------------+-----------------------+
only showing top 5 rows



In [42]:
from pyspark.sql.functions import to_date

spark.range(5).withColumn("date", lit("2019-01-01")).select(to_date(col("date"))).show()

+---------------+
|to_date(`date`)|
+---------------+
|     2019-01-01|
|     2019-01-01|
|     2019-01-01|
|     2019-01-01|
|     2019-01-01|
+---------------+



In [43]:
from pyspark.sql.functions import datediff, months_between, to_date

dateDF.withColumn("week_ago", date_sub(col("today"), 7)).select(datediff(col("week_ago"), col("today"))).show(5)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
|                       -7|
|                       -7|
|                       -7|
|                       -7|
+-------------------------+
only showing top 5 rows



In [44]:
dateDF.select(to_date(lit("2016-04-01")).alias("start"),
              to_date(lit("2016-05-21")).alias("end"))\
              .select(months_between(col("start"), col("end"))).show(5)

+--------------------------+
|months_between(start, end)|
+--------------------------+
|               -1.64516129|
|               -1.64516129|
|               -1.64516129|
|               -1.64516129|
|               -1.64516129|
+--------------------------+
only showing top 5 rows



In [45]:
dateDF.select(to_date(lit("01-04-1991"), "dd-MM-yyyy").alias("start"),
             to_date(lit("02-01-2019"), "dd-MM-yyyy").alias("end"))\
            .select(months_between(col("end"), col("start"))).show(5)

+--------------------------+
|months_between(end, start)|
+--------------------------+
|              333.03225806|
|              333.03225806|
|              333.03225806|
|              333.03225806|
|              333.03225806|
+--------------------------+
only showing top 5 rows



In [46]:
from pyspark.sql.functions import coalesce

superstore.select(coalesce(col("City"), col("Product Name"))).show(5)

+----------------------------+
|coalesce(City, Product Name)|
+----------------------------+
|                   Henderson|
|                   Henderson|
|                 Los Angeles|
|             Fort Lauderdale|
|             Fort Lauderdale|
+----------------------------+
only showing top 5 rows



In [47]:
superstore.na.drop().show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [48]:
superstore.na.drop("all").show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [49]:
superstore.na.drop("all").show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [50]:
superstore.na.drop("all", subset = ["Order Id", "Order Date"]).show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [51]:
superstore.na.fill("All null values become this string").show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [52]:
superstore.na.replace([" "], ["UNKNOWN"], "Description").show(5)

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [53]:
#struct funtion is used to create a complex column by combining multiple columns so that they can be later queried

from pyspark.sql.functions import struct

df_test = superstore.select(struct("Row ID", "Order ID").alias("complex"))

In [54]:
df_test.select("complex.Order ID").show(5)

+--------------+
|      Order ID|
+--------------+
|CA-2017-152156|
|CA-2017-152156|
|CA-2017-138688|
|US-2016-108966|
|US-2016-108966|
+--------------+
only showing top 5 rows



In [55]:
#split function is used to split rows of a column into arrays

from pyspark.sql.functions import split

superstore.select(split(col("Customer Name"), " ").alias("First_and_Last_Names"))\
.selectExpr("First_and_Last_Names[1]")\
.show(5)

+-----------------------+
|First_and_Last_Names[1]|
+-----------------------+
|                   Gute|
|                   Gute|
|                    Van|
|              O'Donnell|
|              O'Donnell|
+-----------------------+
only showing top 5 rows



In [56]:
#size function can be used to find the size of the array

from pyspark.sql.functions import size, array_contains

superstore.select(size(split(col("Customer Name"), " ")).alias("name_split")).show(5)

+----------+
|name_split|
+----------+
|         2|
|         2|
|         3|
|         2|
|         2|
+----------+
only showing top 5 rows



In [57]:
#array_contains can be used to check whether the array contains a given value

superstore.select(array_contains(split(col("Customer Name"), " "), "Hoffman").alias("is_hoffman")).show(5)

+----------+
|is_hoffman|
+----------+
|     false|
|     false|
|     false|
|     false|
|     false|
+----------+
only showing top 5 rows



In [58]:
#explode function can be used to create new rows from the indicidual values of an array

from pyspark.sql.functions import explode

superstore.withColumn("splitted", split(col("Customer Name"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Customer Name","splitted", "exploded").show(5)

+---------------+-------------------+--------+
|  Customer Name|           splitted|exploded|
+---------------+-------------------+--------+
|    Claire Gute|     [Claire, Gute]|  Claire|
|    Claire Gute|     [Claire, Gute]|    Gute|
|    Claire Gute|     [Claire, Gute]|  Claire|
|    Claire Gute|     [Claire, Gute]|    Gute|
|Darrin Van Huff|[Darrin, Van, Huff]|  Darrin|
+---------------+-------------------+--------+
only showing top 5 rows



In [59]:
#map function can be used to create key value pairs of columns

from pyspark.sql.functions import create_map

superstore.select(create_map(col("Customer Name"), col("Order ID")).alias("mapped")).show(5, False)

+-----------------------------------+
|mapped                             |
+-----------------------------------+
|[Claire Gute -> CA-2017-152156]    |
|[Claire Gute -> CA-2017-152156]    |
|[Darrin Van Huff -> CA-2017-138688]|
|[Sean O'Donnell -> US-2016-108966] |
|[Sean O'Donnell -> US-2016-108966] |
+-----------------------------------+
only showing top 5 rows



In [60]:
#maps can be queried

superstore.select(create_map(col("Customer Name"), col("Order ID")).alias("mapped"))\
.selectExpr("mapped['Claire Gute']").show(5)

+-------------------+
|mapped[Claire Gute]|
+-------------------+
|     CA-2017-152156|
|     CA-2017-152156|
|               null|
|               null|
|               null|
+-------------------+
only showing top 5 rows



In [61]:
#handling jason data

jsondf = spark.range(1).selectExpr(""" '{"myJsonKey": 
                                                {"myJsonValues": [1, 2, 3]}}' 
                                                    as jsonString """)

In [62]:
from pyspark.sql.functions import get_json_object, json_tuple

jsondf.select(
    get_json_object(col("jsonString"), "$.myJsonKey.myJsonValues[0]").alias("column"), 
    json_tuple(col("jsonString"), "myJsonKey").alias("jsonKey")).show(10, False)

+------+------------------------+
|column|jsonKey                 |
+------+------------------------+
|1     |{"myJsonValues":[1,2,3]}|
+------+------------------------+



In [63]:
from pyspark.sql.functions import to_json

superstore.selectExpr("(`Order ID`, `Customer Name`) as myStruct")\
.select(to_json(col("myStruct"))).show(5, False)

+---------------------------------------------------------------+
|structstojson(myStruct)                                        |
+---------------------------------------------------------------+
|{"Order ID":"CA-2017-152156","Customer Name":"Claire Gute"}    |
|{"Order ID":"CA-2017-152156","Customer Name":"Claire Gute"}    |
|{"Order ID":"CA-2017-138688","Customer Name":"Darrin Van Huff"}|
|{"Order ID":"US-2016-108966","Customer Name":"Sean O'Donnell"} |
|{"Order ID":"US-2016-108966","Customer Name":"Sean O'Donnell"} |
+---------------------------------------------------------------+
only showing top 5 rows



# Aggregations

In [64]:
#Count of the Order IDs

from pyspark.sql.functions import count

superstore.select(count("Order ID")).show()

+---------------+
|count(Order ID)|
+---------------+
|           9994|
+---------------+



In [65]:
#Count of distinct Order ID

from pyspark.sql.functions import countDistinct

superstore.select(countDistinct("Order ID")).show()

+------------------------+
|count(DISTINCT Order ID)|
+------------------------+
|                    5009|
+------------------------+



In [66]:
#find only a certain degree of of count distinct

from pyspark.sql.functions import approx_count_distinct

superstore.select(approx_count_distinct("Order ID", 0.01)).show()

+-------------------------------+
|approx_count_distinct(Order ID)|
+-------------------------------+
|                           4967|
+-------------------------------+



In [67]:
#find the first and the last items in a columns

from pyspark.sql.functions import first, last

superstore.select(first("Order ID"), last("Order ID")).show()

+----------------------+---------------------+
|first(Order ID, false)|last(Order ID, false)|
+----------------------+---------------------+
|        CA-2017-152156|       CA-2018-119914|
+----------------------+---------------------+



In [68]:
#find the min and max for a column

from pyspark.sql.functions import min, max

superstore.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|            1|           14|
+-------------+-------------+



In [69]:
#find the sum of a numerical column

from pyspark.sql.functions import sum

superstore.select(sum("Sales")).show()

+------------------+
|        sum(Sales)|
+------------------+
|2297200.8603000017|
+------------------+



In [70]:
# find the disticnt sum of a column

from pyspark.sql.functions import sumDistinct

superstore.select(sumDistinct("Sales")).show()

+-------------------+
|sum(DISTINCT Sales)|
+-------------------+
| 1831956.4445000002|
+-------------------+



In [71]:
#find the sum, max, min, expr of a column

from pyspark.sql.functions import avg

superstore.select(sum("Quantity").alias("total_products_sold"), 
                  countDistinct("Customer ID").alias("distinct_products_sold"),
                 sum("Sales").alias("total_sales_amount"),
                 avg("Sales").alias("mean_sales")).selectExpr("total_products_sold", 
                                                             "distinct_products_sold",
                                                             "total_sales_amount",
                                                             "mean_sales").show()

+-------------------+----------------------+------------------+------------------+
|total_products_sold|distinct_products_sold|total_sales_amount|        mean_sales|
+-------------------+----------------------+------------------+------------------+
|              37873|                   793|2297200.8603000003|229.85800083049833|
+-------------------+----------------------+------------------+------------------+



In [72]:
from pyspark.sql.functions import var_pop, stddev_pop, var_samp, stddev_samp

superstore.select(var_pop("Quantity"), var_samp("Quantity"), 
                  stddev_pop("Quantity"), stddev_samp("Quantity")).show()



+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|4.950617729052486| 4.951113137611382|  2.2249983660786103|   2.2251096911414012|
+-----------------+------------------+--------------------+---------------------+



In [73]:
from pyspark.sql.functions import skewness, kurtosis

superstore.select(skewness("Sales"), kurtosis("Sales")).show()

+------------------+-----------------+
|   skewness(Sales)|  kurtosis(Sales)|
+------------------+-----------------+
|12.970805179533508|305.1584268174984|
+------------------+-----------------+



In [74]:
#aggregarte complex types e.g aggregate the number of states

from pyspark.sql.functions import collect_list, collect_set

superstore.agg(collect_list("State"), collect_set("State")).show()

+--------------------+--------------------+
| collect_list(State)|  collect_set(State)|
+--------------------+--------------------+
|[Kentucky, Kentuc...|[Michigan, Vermon...|
+--------------------+--------------------+



In [75]:
#groupby condition on dataframes

from pyspark.sql.functions import expr

superstore.groupby("Region", "State").agg(expr("count('Order ID')").alias("Order Count"))\
.orderBy("Region", "State").show(50)

+-------+--------------------+-----------+
| Region|               State|Order Count|
+-------+--------------------+-----------+
|Central|            Illinois|        492|
|Central|             Indiana|        149|
|Central|                Iowa|         30|
|Central|              Kansas|         24|
|Central|            Michigan|        255|
|Central|           Minnesota|         89|
|Central|            Missouri|         66|
|Central|            Nebraska|         38|
|Central|        North Dakota|          7|
|Central|            Oklahoma|         66|
|Central|        South Dakota|         12|
|Central|               Texas|        985|
|Central|           Wisconsin|        110|
|   East|         Connecticut|         82|
|   East|            Delaware|         96|
|   East|District of Columbia|         10|
|   East|               Maine|          8|
|   East|            Maryland|        105|
|   East|       Massachusetts|        135|
|   East|       New Hampshire|         27|
|   East|  

In [76]:
#group by functions 

superstore.groupby("Order ID").agg(expr("max(Quantity)").alias("max_quantity"), stddev_pop("Quantity"))\
.orderBy("max_quantity", ascending = False).show()

+--------------+------------+--------------------+
|      Order ID|max_quantity|stddev_pop(Quantity)|
+--------------+------------+--------------------+
|US-2016-119312|          14|                 0.0|
|CA-2017-145583|          14|   4.060762972443398|
|CA-2015-154599|          14|   4.301162633521313|
|CA-2015-120768|          14|   5.354126134736337|
|CA-2016-104241|          14|                 0.0|
|CA-2016-149713|          14|  5.2493385826745405|
|CA-2017-142405|          14|   5.436502143433363|
|CA-2018-152702|          14|                 0.0|
|US-2017-103674|          14|   3.833259389999639|
|CA-2018-130036|          14|                 5.5|
|CA-2018-161410|          14|                 5.5|
|CA-2018-164028|          14|                 0.0|
|CA-2017-105732|          14|  3.3823069050575527|
|CA-2015-158337|          14|                 0.0|
|CA-2015-154165|          14|                 0.0|
|CA-2016-103135|          14|    4.06201920231798|
|CA-2017-140571|          14|  

In [77]:
#window functions ***IMPORTANT***

dfWithDate = superstore.withColumn("date", to_date(col("Order Date"), "MM/d/yyyy H:mm"))
dfWithDate = dfWithDate.select(col("Category"), col("date"), col("Sales"))

dfWithDate.createOrReplaceTempView("dfWithDate")

In [78]:
dfWithDate.show(3)

+---------------+----------+-----------------+
|       Category|      date|            Sales|
+---------------+----------+-----------------+
|      Furniture|2017-11-08|           261.96|
|      Furniture|2017-11-08|731.9399999999999|
|Office Supplies|2017-06-12|            14.62|
+---------------+----------+-----------------+
only showing top 3 rows



In [79]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window\
.partitionBy("date", "Category")\
.orderBy(desc("Sales"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [80]:
maxPurchaseQuantity = max(col("Sales")).over(windowSpec)

In [81]:
from pyspark.sql.functions import dense_rank, rank, row_number

purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)
rownum = row_number().over(windowSpec)

In [82]:
dfWithDate.where("`Category` IS NOT NULL").orderBy("date")\
.select(col("Category"), 
        col("date"), 
        col("Sales"), 
        purchaseRank.alias("rank"), 
        purchaseDenseRank.alias("dense_rank"), 
        maxPurchaseQuantity.alias("max_sales"),
       rownum.alias("row id")).show()

+---------------+----------+------------------+----+----------+------------------+------+
|       Category|      date|             Sales|rank|dense_rank|         max_sales|row id|
+---------------+----------+------------------+----+----------+------------------+------+
|Office Supplies|2015-01-03|            16.448|   1|         1|            16.448|     1|
|Office Supplies|2015-01-04|           272.736|   1|         1|           272.736|     1|
|Office Supplies|2015-01-04|            11.784|   2|         2|           272.736|     2|
|Office Supplies|2015-01-04| 3.539999999999999|   3|         3|           272.736|     3|
|Office Supplies|2015-01-05|            19.536|   1|         1|            19.536|     1|
|      Furniture|2015-01-06|           2573.82|   1|         1|           2573.82|     1|
|Office Supplies|2015-01-06|            609.98|   1|         1|            609.98|     1|
|Office Supplies|2015-01-06|             31.12|   2|         2|            609.98|     2|
|Office Su

In [83]:
#roll up

dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [84]:
dfNoNull.show(5)

+---------------+----------+------------------+
|       Category|      date|             Sales|
+---------------+----------+------------------+
|      Furniture|2017-11-08|            261.96|
|      Furniture|2017-11-08| 731.9399999999999|
|Office Supplies|2017-06-12|             14.62|
|      Furniture|2016-10-11|          957.5775|
|Office Supplies|2016-10-11|22.368000000000002|
+---------------+----------+------------------+
only showing top 5 rows



In [85]:
#grouping sets can be used to group data across multiple groups. Only available in SQL

spark.sql("SELECT Category, date, sum(Sales) FROM dfNoNull GROUP BY Category, date GROUPING SETS ((Category, date), ()) ORDER BY Category ASC, date DESC").show(10)

+---------+----------+------------------+
| Category|      date|        sum(Sales)|
+---------+----------+------------------+
|     null|      null|2297200.8603000017|
|Furniture|2018-12-30|           323.136|
|Furniture|2018-12-29|          2330.718|
|Furniture|2018-12-28| 551.2568000000001|
|Furniture|2018-12-25|           832.454|
|Furniture|2018-12-24|1393.4940000000001|
|Furniture|2018-12-23|           282.114|
|Furniture|2018-12-22| 4086.455999999999|
|Furniture|2018-12-21|             15.92|
|Furniture|2018-12-19|115.37800000000001|
+---------+----------+------------------+
only showing top 10 rows



In [86]:
#roll up function

rolledUpDF = dfNoNull.rollup("date", "Category")\
.agg(sum("Sales")).selectExpr("date", "Category", "`sum(Sales)` as total_sales")\
.orderBy("date")

rolledUpDF.show()

+----------+---------------+------------------+
|      date|       Category|       total_sales|
+----------+---------------+------------------+
|      null|           null|2297200.8603000017|
|2015-01-03|Office Supplies|            16.448|
|2015-01-03|           null|            16.448|
|2015-01-04|           null|            288.06|
|2015-01-04|Office Supplies|            288.06|
|2015-01-05|Office Supplies|            19.536|
|2015-01-05|           null|            19.536|
|2015-01-06|           null| 4407.099999999999|
|2015-01-06|      Furniture|           2573.82|
|2015-01-06|Office Supplies|            685.34|
|2015-01-06|     Technology|           1147.94|
|2015-01-07|Office Supplies|10.429999999999998|
|2015-01-07|      Furniture| 76.72800000000001|
|2015-01-07|           null|            87.158|
|2015-01-09|Office Supplies|             9.344|
|2015-01-09|     Technology|31.200000000000003|
|2015-01-09|           null|40.544000000000004|
|2015-01-10|           null|            

In [87]:
rolledUpDF.where("Category IS NULL").show()

+----------+--------+------------------+
|      date|Category|       total_sales|
+----------+--------+------------------+
|      null|    null|2297200.8603000017|
|2015-01-03|    null|            16.448|
|2015-01-04|    null|            288.06|
|2015-01-05|    null|            19.536|
|2015-01-06|    null| 4407.099999999999|
|2015-01-07|    null|            87.158|
|2015-01-09|    null|40.544000000000004|
|2015-01-10|    null|             54.83|
|2015-01-11|    null|              9.94|
|2015-01-13|    null|3553.7949999999996|
|2015-01-14|    null|             61.96|
|2015-01-15|    null|            149.95|
|2015-01-16|    null|           299.964|
|2015-01-18|    null|            64.864|
|2015-01-19|    null|378.59400000000005|
|2015-01-20|    null|           2673.87|
|2015-01-21|    null|            25.248|
|2015-01-23|    null|46.019999999999996|
|2015-01-26|    null|           1097.25|
|2015-01-27|    null|            426.67|
+----------+--------+------------------+
only showing top

In [88]:
rolledUpDF.where("date IS NULL").show()

+----+--------+------------------+
|date|Category|       total_sales|
+----+--------+------------------+
|null|    null|2297200.8603000017|
+----+--------+------------------+



In [89]:
#cube function

cubeDF = dfNoNull.cube("date", "Category", "Sales")\
.agg(max("Sales"))\
.selectExpr("date", "Category", "`max(Sales)` as highest_sales")\
.orderBy("date")

In [90]:
cubeDF.sort("date", ascending = False).show()

+----------+---------------+------------------+
|      date|       Category|     highest_sales|
+----------+---------------+------------------+
|2018-12-30|     Technology|             90.93|
|2018-12-30|Office Supplies|             3.024|
|2018-12-30|Office Supplies|             209.3|
|2018-12-30|           null|             90.93|
|2018-12-30|           null|            13.904|
|2018-12-30|           null|           323.136|
|2018-12-30|Office Supplies|            13.904|
|2018-12-30|           null|             20.72|
|2018-12-30|Office Supplies|             20.72|
|2018-12-30|           null|             3.024|
|2018-12-30|           null|             209.3|
|2018-12-30|      Furniture|           323.136|
|2018-12-30|      Furniture|           323.136|
|2018-12-30|Office Supplies|52.775999999999996|
|2018-12-30|           null|52.775999999999996|
|2018-12-30|     Technology|             90.93|
|2018-12-30|           null|           323.136|
|2018-12-30|Office Supplies|            

In [91]:
cubeDF.where("Category IS NULL").show()

+----+--------+------------------+
|date|Category|     highest_sales|
+----+--------+------------------+
|null|    null|2.8160000000000003|
|null|    null|              7.76|
|null|    null|              8.62|
|null|    null|            72.784|
|null|    null|374.37600000000003|
|null|    null|             47.82|
|null|    null|25.920000000000005|
|null|    null|             37.32|
|null|    null|             43.68|
|null|    null|            35.712|
|null|    null|              10.9|
|null|    null|           170.058|
|null|    null|333.09000000000003|
|null|    null| 56.82000000000001|
|null|    null|            95.992|
|null|    null|15.959999999999999|
|null|    null|435.16800000000006|
|null|    null|25.695999999999998|
|null|    null|            141.42|
|null|    null|           151.188|
+----+--------+------------------+
only showing top 20 rows



In [92]:
pivotDF = dfWithDate.groupby("date").pivot("Category").sum()

In [93]:
pivotDF.where("`date` > '2018-11-30'").orderBy("date", ascending = True).show(30)

+----------+------------------+------------------+------------------+
|      date|         Furniture|   Office Supplies|        Technology|
+----------+------------------+------------------+------------------+
|2018-12-01|          2542.292|1371.4499999999998|1417.4360000000001|
|2018-12-02|          3527.504|          2665.612|3758.0660000000003|
|2018-12-03|            591.84|           447.932|364.07000000000005|
|2018-12-04| 992.7819999999999|           199.138|          1447.718|
|2018-12-05|            321.48|            796.06|           335.596|
|2018-12-06|              null|             10.68|              null|
|2018-12-07|             82.38|          2279.774|            554.36|
|2018-12-08|3368.2650000000003|2632.7520000000004|1642.0240000000001|
|2018-12-09|1927.2330000000002|          1447.079|          2096.078|
|2018-12-10|          2102.264|           973.235| 798.0600000000002|
|2018-12-11|          1448.529| 899.7460000000001|475.69000000000005|
|2018-12-13|489.0159

In [94]:
from pyspark.sql.functions import monotonically_increasing_id, col, year

df1 = superstore.select(monotonically_increasing_id(), year("Order Date"))\
.withColumnRenamed("year(Order Date)", "date1")\
.withColumnRenamed("monotonically_increasing_id()", "ROW_ID")

In [95]:
df2 = superstore.select(monotonically_increasing_id(), year("Ship Date"))\
.withColumnRenamed("year(Ship Date)", "date2")\
.withColumnRenamed("monotonically_increasing_id()", "ROW_ID")

In [96]:
df3 = df2.join(df1, df1.ROW_ID == df2.ROW_ID)\
.withColumn("difference", df2.date2 - df1.date1)\
.selectExpr("date1", "date2", "difference")

In [97]:
df3.show(15)

+-----+-----+----------+
|date1|date2|difference|
+-----+-----+----------+
| 2017| 2017|         0|
| 2016| 2016|         0|
| 2015| 2015|         0|
| 2018| 2018|         0|
| 2017| 2017|         0|
| 2016| 2016|         0|
| 2017| 2017|         0|
| 2018| 2018|         0|
| 2016| 2016|         0|
| 2016| 2016|         0|
| 2017| 2017|         0|
| 2018| 2018|         0|
| 2018| 2018|         0|
| 2016| 2016|         0|
| 2017| 2017|         0|
+-----+-----+----------+
only showing top 15 rows



In [98]:
df3.where("difference > 0").orderBy("date1", ascending = False).show()

+-----+-----+----------+
|date1|date2|difference|
+-----+-----+----------+
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
| 2018| 2019|         1|
+-----+-----+----------+
only showing top 20 rows



In [99]:
df3.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
date1,9994,2016.722233340004,1.1235549110443188,2015,2018
date2,9994,2016.7374424654793,1.1261405941480775,2015,2019
difference,9994,0.015209125475285171,0.12238997837870935,0,1


# Pair RDD

## Pair RDD with tuples

In [100]:
spark

# Joins

In [101]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])]).toDF("id", "name", "graduate_course", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")]).toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")]).toDF("id", "status")

In [102]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

### Inner Join

In [103]:
joinCondition = person["graduate_course"] == graduateProgram["id"]

joinedDf = person.join(graduateProgram, joinCondition)
joinedDf.show()

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [104]:
spark.sql("SELECT * FROM person INNER JOIN graduateProgram ON person.graduate_course = graduateProgram.id").show()

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



### Outer Join

In [105]:
joinType = "outer"

joinedDf = person.join(graduateProgram, joinCondition, joinType)
joinedDf.show()

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [106]:
spark.sql("SELECT * FROM person FULL OUTER JOIN graduateProgram ON person.graduate_course = graduateProgram.id").show()

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



### Left Outer Join

In [107]:
joinType = "left_outer"

joinedDf = person.join(graduateProgram, joinCondition, joinType)
joinedDf.show()

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [108]:
spark.sql("SELECT * FROM person LEFT OUTER JOIN graduateProgram ON person.graduate_course = graduateProgram.id").show()

+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+---------------+---+-------+--------------------+-----------+



### Right Outer Join

In [109]:
joinType = "right_outer"

joinedDf = person.join(graduateProgram, joinCondition, joinType)
joinedDf.show()

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



In [110]:
spark.sql("SELECT * FROM person RIGHT OUTER JOIN graduateProgram ON person.graduate_course = graduateProgram.id").show()

+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_course|   spark_status| id| degree|          department|     school|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|              0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|              1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|              1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|           null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+---------------+---------------+---+-------+--------------------+-----------+



### Left Semi Joins

In [111]:
joinType = "left_semi"

joinedDf = graduateProgram.join(person, joinCondition, joinType)
joinedDf.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [112]:
spark.sql("SELECT * FROM graduateProgram LEFT SEMI JOIN person ON graduateProgram.id = person.graduate_course").show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



### Left Anti Join

In [113]:
joinType = "left_anti"

joinedDf = graduateProgram.join(person, joinCondition, joinType)
joinedDf.show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



In [114]:
spark.sql("SELECT * FROM graduateProgram LEFT ANTI JOIN person ON graduateProgram.id = person.graduate_course").show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



### Cross Join

In [None]:
"""
joinType = "cross"

joinedDf = person.join(graduateProgram, joinCondition, joinType)
joinedDf.show()
"""

In [None]:
"""
spark.sql("SELECT * FROM person CROSS JOIN graduateProgram").show()
"""

## Window Functions and Sub-Queries in Spark SQL

In [117]:
spark.sql("SELECT * FROM `superstore_view` LIMIT 5").show()

+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|         Order Date|          Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+-------------------+-------------------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2017-152156|2017-11-08 00:00:00|2017-11-11 00:00:00|  Second Class|   CG-12520|    Claire

In [118]:
#Maximum, Minimum and Avg Sales Per Order Date

spark.sql("SELECT `Customer Name`, `Order Date`, Sales, \
          MAX(Sales) OVER(PARTITION BY `Order Date`) as Max_Sales, \
          MIN(Sales) OVER(PARTITION BY `Order Date`) as Min_Sales,  \
          AVG(Sales) Over(PARTITION BY `Order Date`) as Avg_Sales \
          FROM superstore_view").show()

+----------------+-------------------+------------------+------------------+------------------+----------+
|   Customer Name|         Order Date|             Sales|         Max_Sales|         Min_Sales| Avg_Sales|
+----------------+-------------------+------------------+------------------+------------------+----------+
|   Brendan Sweed|2015-12-05 00:00:00|1113.0240000000001|1113.0240000000001|              15.0|185.227625|
|   Brendan Sweed|2015-12-05 00:00:00|167.96800000000002|1113.0240000000001|              15.0|185.227625|
|  David Kendrick|2015-12-05 00:00:00|24.816000000000003|1113.0240000000001|              15.0|185.227625|
|  David Kendrick|2015-12-05 00:00:00|408.74399999999997|1113.0240000000001|              15.0|185.227625|
|     Roy Collins|2015-12-05 00:00:00|             24.56|1113.0240000000001|              15.0|185.227625|
|       Nona Balk|2015-12-05 00:00:00|           348.488|1113.0240000000001|              15.0|185.227625|
|       Nona Balk|2015-12-05 00:00:00

In [119]:
#Days between orders for each customer sorted by descending order of days between orders

spark.sql("SELECT `Customer Name`, Sales, Current_Order_Date, Next_Order_Date, \
          DATEDIFF(Next_Order_Date, Current_Order_Date) as DaysBetweenOrders \
          FROM \
          \
          (SELECT `Customer Name`, Sales, `Order Date` as Current_Order_Date, \
          LEAD(`Order Date`, 1) OVER(PARTITION BY `Customer Name` ORDER BY `Order Date`) as Next_Order_Date \
          FROM superstore_view) \
          \
          ORDER BY DaysBetweenOrders DESC").show()

+-----------------+------------------+-------------------+-------------------+-----------------+
|    Customer Name|             Sales| Current_Order_Date|    Next_Order_Date|DaysBetweenOrders|
+-----------------+------------------+-------------------+-------------------+-----------------+
|Jennifer Halladay|22.776000000000003|2015-02-24 00:00:00|2018-11-19 00:00:00|             1364|
| Mitch Willingham|            617.97|2015-05-21 00:00:00|2018-11-16 00:00:00|             1275|
|    Beth Fritzler|           314.352|2015-03-22 00:00:00|2018-07-10 00:00:00|             1206|
|    Hilary Holden|             13.36|2015-08-29 00:00:00|2018-10-19 00:00:00|             1147|
|   David Kendrick|408.74399999999997|2015-12-05 00:00:00|2018-12-20 00:00:00|             1111|
|    Randy Bradley|            657.93|2015-11-17 00:00:00|2018-10-23 00:00:00|             1071|
|   Sharelle Roach|            212.94|2015-06-14 00:00:00|2018-04-17 00:00:00|             1038|
|   Vicky Freymann|           

In [120]:
#Top Customers in each city by total sales

spark.sql("SELECT City, `Customer Name`, Total_sales, \
          ROW_NUMBER() OVER(PARTITION BY CITY ORDER BY Total_sales DESC) as customer_rank_in_city \
          FROM \
          \
          (SELECT City, `Customer Name`, SUM(Sales) as Total_sales  \
          FROM superstore_view \
          GROUP BY `Customer Name`, City \
          ORDER BY City) \
          \
          ORDER BY Total_sales DESC").where(col("customer_rank_in_city") == 1).show(50)

+---------------+--------------------+------------------+---------------------+
|           City|       Customer Name|       Total_sales|customer_rank_in_city|
+---------------+--------------------+------------------+---------------------+
|   Jacksonville|         Sean Miller|         23661.228|                    1|
|      Lafayette|        Tamara Chand|18336.739999999994|                    1|
|        Seattle|        Raymond Buch|          14052.48|                    1|
|  New York City|        Tom Ashbrook|13723.498000000001|                    1|
|    San Antonio|        Becky Martin|10539.895999999999|                    1|
|         Newark|        Hunter Lopez|          10499.97|                    1|
|    Minneapolis|        Sanjit Chand|           9900.19|                    1|
|        Detroit|       Adrian Barton|           9892.74|                    1|
|       Lakewood|        Bill Shonely| 9135.189999999999|                    1|
|      Arlington|        Sanjit Engle|  

In [157]:
#Rank the top 10 products based on profit in each segment sorted from 1 to 10 under profit_rank
#Rank the products in each segment based on sales under a sales_rank column
#Find the difference in sales_ranks & profit_ranks for each product

spark.sql("SELECT *, (sales_rank - profit_rank) as rank_diff \
          FROM \
          \
              (SELECT Segment, `Product Name`,Total_sales, Total_profit, \
              ROW_NUMBER() OVER(PARTITION BY Segment ORDER BY Total_sales DESC) as sales_rank, \
              ROW_NUMBER() OVER(PARTITION BY Segment ORDER BY Total_profit DESC) as profit_rank \
              FROM \
          \
                  (SELECT Segment, `Product Name`, \
                  SUM(Sales) as Total_sales, \
                  SUM(Profit) as Total_profit  \
                  FROM superstore_view \
                  GROUP BY Segment, `Product Name` \
                  ORDER BY Segment) \
          \
          ORDER BY Segment, Total_profit DESC) \
          WHERE profit_rank <= 10").show(30)

+-----------+--------------------+------------------+------------------+----------+-----------+---------+
|    Segment|        Product Name|       Total_sales|      Total_profit|sales_rank|profit_rank|rank_diff|
+-----------+--------------------+------------------+------------------+----------+-----------+---------+
|   Consumer|Canon imageCLASS ...|32899.905999999995|12879.963199999998|         1|          1|        0|
|   Consumer|Ibico EPK-21 Elec...|           9449.95|4630.4755000000005|        12|          2|       10|
|   Consumer|HP Designjet T520...|         18374.895|4094.9765999999986|         2|          3|       -1|
|   Consumer|Ativa V4110MDD Mi...|           7699.89|3772.9460999999997|        15|          4|       11|
|   Consumer|Fellowes PB500 El...|         14489.286|          3685.871|         5|          5|        0|
|   Consumer|GBC Ibimaster 500...|          17122.05|2663.4300000000003|         4|          6|       -2|
|   Consumer|Canon PC1060 Pers...|          67