# Spark API

In [1]:
import pyspark

# Reuse the active SparkSession if there is one; otherwise start a new one.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
spark

For demonstration, we'll create a spark dataframe from a pandas dataframe.

In [3]:
import numpy as np
import pandas as pd
import pydataset

In [4]:
# Load the tips dataset as a pandas DataFrame, then convert it to a Spark DataFrame.
tips = pydataset.data('tips')
df = spark.createDataFrame(tips)
# With no arguments, .show() prints the first 20 rows.
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

## DataFrame Basics

In [5]:
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [7]:
# Print the first 5 rows one record at a time (one line per column).
df.show(n=5, vertical=True)

-RECORD 0------------
 total_bill | 16.99  
 tip        | 1.01   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 1------------
 total_bill | 10.34  
 tip        | 1.66   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 2------------
 total_bill | 21.01  
 tip        | 3.5    
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 3------------
 total_bill | 23.68  
 tip        | 3.31   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 4------------
 total_bill | 24.59  
 tip        | 3.61   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 4      
only showing top 5 rows



In [6]:
# Unlike .show, .head(n) returns the first n rows to Python as a list of Row objects.
df.head(5)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=23.68, tip=3.31, sex='Male', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=24.59, tip=3.61, sex='Female', smoker='No', day='Sun', time='Dinner', size=4)]

In [8]:
# Don't do this! .show() only prints the rows and returns None, so df2 ends up
# holding None rather than a DataFrame (see the type check in the next cell).
# Use .show() purely for viewing a DataFrame's contents.
df2 = df.show(10)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 10 rows



In [9]:
type(df2)

NoneType

### Selecting Columns

In [10]:
df.select('total_bill', 'tip', 'size', 'day').show()

+----------+----+----+---+
|total_bill| tip|size|day|
+----------+----+----+---+
|     16.99|1.01|   2|Sun|
|     10.34|1.66|   3|Sun|
|     21.01| 3.5|   3|Sun|
|     23.68|3.31|   2|Sun|
|     24.59|3.61|   4|Sun|
|     25.29|4.71|   4|Sun|
|      8.77| 2.0|   2|Sun|
|     26.88|3.12|   4|Sun|
|     15.04|1.96|   2|Sun|
|     14.78|3.23|   2|Sun|
|     10.27|1.71|   2|Sun|
|     35.26| 5.0|   4|Sun|
|     15.42|1.57|   2|Sun|
|     18.43| 3.0|   4|Sun|
|     14.83|3.02|   2|Sun|
|     21.58|3.92|   2|Sun|
|     10.33|1.67|   3|Sun|
|     16.29|3.71|   3|Sun|
|     16.97| 3.5|   3|Sun|
|     20.65|3.35|   3|Sat|
+----------+----+----+---+
only showing top 20 rows



In [11]:
df.select('*')

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [12]:
df.select(df.tip / df.total_bill).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [13]:
# Arithmetic on columns builds a Column expression; it only describes the
# calculation and is evaluated when used in a DataFrame operation such as .select.
col = df.tip / df.total_bill
col

Column<'(tip / total_bill)'>

In [14]:
# .alias names the resulting column, like `<expression> AS tip_pct` in SQL.
df.select('*', col.alias('tip_pct')).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [15]:
# .select returns a new DataFrame; assign the result to keep the added column.
df_with_tip_pct = df.select('*', col.alias('tip_pct'))

In [16]:
df_with_tip_pct.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



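Spark dataframes are immutable: `.select` returns a new dataframe rather than changing the original, so to keep an added column we need to assign the result to a variable.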

### Selecting w/ Built In Functions

In [17]:
# NOTE: `sum` imported here is Spark's column aggregate; it replaces Python's
# built-in `sum` for the rest of the notebook.
from pyspark.sql.functions import sum, mean, concat, lit, regexp_extract, regexp_replace, when

In [19]:
mean(df.tip)

Column<'avg(tip)'>

In [18]:
df.select(mean(df.tip), sum(df.total_bill)).show()

+------------------+-----------------+
|          avg(tip)|  sum(total_bill)|
+------------------+-----------------+
|2.9982786885245907|4827.769999999999|
+------------------+-----------------+



In [20]:
# concat treats a plain string argument as a column name, so a bare ' ' would be
# looked up as a column called ' ' (space). Wrap constant values in lit() so
# they are passed as literals instead.
df.select(concat('day', lit(' '), 'time')).show(5)

+--------------------+
|concat(day,  , time)|
+--------------------+
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
+--------------------+
only showing top 5 rows



In [21]:
# Casting a non-numeric string column to int does not raise an error here;
# the values that cannot be converted come back as null (see output).
df.select(df.time.cast('int')).show(5)

+----+
|time|
+----+
|null|
|null|
|null|
|null|
|null|
+----+
only showing top 5 rows



In [22]:
# Add tip_pct (tip as a fraction of the total bill) to df.
# withColumn replaces an existing column of the same name, so re-running this
# cell leaves a single tip_pct column instead of appending another one, which
# is what select('*', ...) would do on a second run.
df = df.withColumn('tip_pct', df.tip / df.total_bill)

In [23]:
df

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint, tip_pct: double]

### When / Otherwise

In [25]:
# `when` on its own is just a Column expression (a CASE WHEN with no ELSE branch).
# NOTE(review): tip_pct is a fraction (tip / total_bill), so `> 2` looks like a
# typo for the `> .2` used in the following cell — confirm.
when(df.tip_pct > 2, 'good tip')

Column<'CASE WHEN (tip_pct > 2) THEN good tip END'>

In [28]:
# Label each row: tips above 20% of the bill are 'good tip', all others
# 'not good tip'. .otherwise supplies the ELSE branch of the CASE expression.
df.select(
    'tip_pct',
    when(df['tip_pct'] > 0.2, 'good tip').otherwise('not good tip').alias('tip_desc'),
).show(n=10)

+-------------------+------------+
|            tip_pct|    tip_desc|
+-------------------+------------+
|0.05944673337257211|not good tip|
|0.16054158607350097|not good tip|
|0.16658733936220846|not good tip|
| 0.1397804054054054|not good tip|
|0.14680764538430255|not good tip|
|0.18623962040332148|not good tip|
|0.22805017103762829|    good tip|
|0.11607142857142858|not good tip|
|0.13031914893617022|not good tip|
| 0.2185385656292287|    good tip|
+-------------------+------------+
only showing top 10 rows



### Regex

In [29]:
df.select(
    'time',
    # Capture group 1 is the first word character of the value.
    regexp_extract('time', r'(\w).*', 1).alias('first_letter'),
    # Replace every lowercase vowel with 'X' (no alias, so Spark generates the column name).
    regexp_replace('time', r'[aeiou]', 'X')
).show(5)

+------+------------+-----------------------------------+
|  time|first_letter|regexp_replace(time, [aeiou], X, 1)|
+------+------------+-----------------------------------+
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
+------+------------+-----------------------------------+
only showing top 5 rows



## Transforming Rows

In [30]:
df.show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

### Sorting

In [31]:
df.orderBy(df.total_bill).show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|      3.07| 1.0|Female|   Yes| Sat|Dinner|   1|0.32573289902280134|
|      5.75| 1.0|Female|   Yes| Fri|Dinner|   2|0.17391304347826086|
|      7.25| 1.0|Female|    No| Sat|Dinner|   1|0.13793103448275862|
|      7.25|5.15|  Male|   Yes| Sun|Dinner|   2|  0.710344827586207|
|      7.51| 2.0|  Male|    No|Thur| Lunch|   2| 0.2663115845539281|
|      7.56|1.44|  Male|    No|Thur| Lunch|   2|0.19047619047619047|
|      7.74|1.44|  Male|   Yes| Sat|Dinner|   2|0.18604651162790697|
|      8.35| 1.5|Female|    No|Thur| Lunch|   2|0.17964071856287425|
|      8.51|1.25|Female|    No|Thur| Lunch|   2|0.14688601645123384|
|      8.52|1.48|  Male|    No|Thur| Lunch|   2|0.17370892018779344|
|      8.58|1.92|  Male|   Yes| Fri| Lunch|   1|0.22377622377622378|
|      8.77| 2.0|  Male|    No| Su

In [32]:
# Sort by day, breaking ties by party size (both ascending).
df.orderBy('day', 'size').show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|      8.58|1.92|  Male|   Yes|Fri| Lunch|   1|0.22377622377622378|
|     12.46| 1.5|  Male|    No|Fri|Dinner|   2| 0.1203852327447833|
|     12.03| 1.5|  Male|   Yes|Fri|Dinner|   2|0.12468827930174564|
|     16.32| 4.3|Female|   Yes|Fri|Dinner|   2|0.26348039215686275|
|     13.42|1.58|  Male|   Yes|Fri| Lunch|   2|0.11773472429210134|
|     15.38| 3.0|Female|   Yes|Fri|Dinner|   2|0.19505851755526657|
|     11.35| 2.5|Female|   Yes|Fri|Dinner|   2|0.22026431718061676|
|     28.97| 3.0|  Male|   Yes|Fri|Dinner|   2|0.10355540214014498|
|     27.28| 4.0|  Male|   Yes|Fri|Dinner|   2|0.14662756598240467|
|      5.75| 1.0|Female|   Yes|Fri|Dinner|   2|0.17391304347826086|
|     22.49| 3.5|  Male|    No|Fri|Dinner|   2|0.15562472209871056|
|     21.01| 3.0|  Male|   Yes|Fri|Dinner|   2| 

In [33]:
# NOTE: this rebinds `col`, which an earlier cell assigned to the
# `df.tip / df.total_bill` expression; from here on `col` is the Spark function.
from pyspark.sql.functions import asc, desc, col

In [34]:
# Sort by day and time ascending, then by party size descending.
df.orderBy(['day', 'time', 'size'], ascending=[True, True, False]).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     40.17|4.73|  Male|   Yes|Fri|Dinner|   4| 0.1177495643515061|
|     28.97| 3.0|  Male|   Yes|Fri|Dinner|   2|0.10355540214014498|
|     11.35| 2.5|Female|   Yes|Fri|Dinner|   2|0.22026431718061676|
|     27.28| 4.0|  Male|   Yes|Fri|Dinner|   2|0.14662756598240467|
|     21.01| 3.0|  Male|   Yes|Fri|Dinner|   2| 0.1427891480247501|
|     12.03| 1.5|  Male|   Yes|Fri|Dinner|   2|0.12468827930174564|
|     12.46| 1.5|  Male|    No|Fri|Dinner|   2| 0.1203852327447833|
|     16.32| 4.3|Female|   Yes|Fri|Dinner|   2|0.26348039215686275|
|     15.38| 3.0|Female|   Yes|Fri|Dinner|   2|0.19505851755526657|
|     22.49| 3.5|  Male|    No|Fri|Dinner|   2|0.15562472209871056|
|      5.75| 1.0|Female|   Yes|Fri|Dinner|   2|0.17391304347826086|
|     22.75|3.25|Female|    No|Fri|Dinner|   2|0

In [35]:
col('size').asc()

Column<'size ASC NULLS FIRST'>

In [36]:
# Largest parties first; ties ordered by time ascending.
df.orderBy(desc('size'), asc('time')).show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|     48.17| 5.0|  Male|    No| Sun|Dinner|   6|0.10379904504878555|
|     27.05| 5.0|Female|    No|Thur| Lunch|   6|0.18484288354898337|
|      29.8| 4.2|Female|    No|Thur| Lunch|   6|0.14093959731543623|
|      34.3| 6.7|  Male|    No|Thur| Lunch|   6|0.19533527696793004|
|     29.85|5.14|Female|    No| Sun|Dinner|   5| 0.1721943048576214|
|     28.15| 3.0|  Male|   Yes| Sat|Dinner|   5|0.10657193605683837|
|     20.69| 5.0|  Male|    No| Sun|Dinner|   5| 0.2416626389560174|
|     30.46| 2.0|  Male|   Yes| Sun|Dinner|   5|0.06565988181221273|
|     41.19| 5.0|  Male|    No|Thur| Lunch|   5|0.12138868657441128|
|     17.81|2.34|  Male|    No| Sat|Dinner|   4|0.13138686131386862|
|      32.4| 6.0|  Male|    No| Sun|Dinner|   4| 0.1851851851851852|
|     24.59|3.61|Female|    No| Su

### Filtering

In [37]:
df.tip < 4

Column<'(tip < 4)'>

In [38]:
# pandas equivalent: df[df.tip < 4]
df.filter(df['tip'] < 4).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|0.10181582360570687|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|0

In [40]:
# Keep rows that satisfy both conditions; each comparison needs its own
# parentheses when combined with & (AND).
df.where((df.tip < 4) & (df.smoker == 'Yes')).show(5)

+----------+----+----+------+---+------+----+-------------------+
|total_bill| tip| sex|smoker|day|  time|size|            tip_pct|
+----------+----+----+------+---+------+----+-------------------+
|     38.01| 3.0|Male|   Yes|Sat|Dinner|   4|0.07892659826361484|
|     11.24|1.76|Male|   Yes|Sat|Dinner|   2|0.15658362989323843|
|     20.29|3.21|Male|   Yes|Sat|Dinner|   2| 0.1582060128141942|
|     13.81| 2.0|Male|   Yes|Sat|Dinner|   2| 0.1448225923244026|
|     11.02|1.98|Male|   Yes|Sat|Dinner|   2|0.17967332123411978|
+----------+----+----+------+---+------+----+-------------------+
only showing top 5 rows



In [41]:
df.where((df.tip<4) | (df.smoker == 'Yes')).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



Sidebar: Spark Performance

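A *shuffle* is required when an operation (for example a sort, join, or group-by) has to look across all the partitions and move data between them, instead of working on each partition independently.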

In [42]:
# The condition is an ordinary Column expression, so it can be stored in a
# variable and passed to .where later.
mask = df.tip < 4
df.where(mask).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|0.10181582360570687|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|0

In [43]:
df.filter((df.time == "Dinner") | (df.tip <= 2)).sort('tip').show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|      12.6| 1.0|  Male|   Yes| Sat|Dinner|   2|0.07936507936507936|
|      5.75| 1.0|Female|   Yes| Fri|Dinner|   2|0.17391304347826086|
|      3.07| 1.0|Female|   Yes| Sat|Dinner|   1|0.32573289902280134|
|      7.25| 1.0|Female|    No| Sat|Dinner|   1|0.13793103448275862|
|     16.99|1.01|Female|    No| Sun|Dinner|   2|0.05944673337257211|
|      12.9| 1.1|Female|   Yes| Sat|Dinner|   2|0.08527131782945736|
|     32.83|1.17|  Male|   Yes| Sat|Dinner|   2|0.03563813585135547|
|      8.51|1.25|Female|    No|Thur| Lunch|   2|0.14688601645123384|
|     10.07|1.25|  Male|    No| Sat|Dinner|   2|0.12413108242303872|
|     10.51|1.25|  Male|    No| Sat|Dinner|   2|0.11893434823977164|
|      9.68|1.32|  Male|    No| Sun|Dinner|   2|0.13636363636363638|
|     18.64|1.36|Female|    No|Thu

Chaining `.where` will implicitly AND the conditions together

In [44]:
df.where(df.smoker == "Yes").where(df.day == "Sat").show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     38.01| 3.0|  Male|   Yes|Sat|Dinner|   4|0.07892659826361484|
|     11.24|1.76|  Male|   Yes|Sat|Dinner|   2|0.15658362989323843|
|     20.29|3.21|  Male|   Yes|Sat|Dinner|   2| 0.1582060128141942|
|     13.81| 2.0|  Male|   Yes|Sat|Dinner|   2| 0.1448225923244026|
|     11.02|1.98|  Male|   Yes|Sat|Dinner|   2|0.17967332123411978|
|     18.29|3.76|  Male|   Yes|Sat|Dinner|   4|0.20557681793329688|
|      3.07| 1.0|Female|   Yes|Sat|Dinner|   1|0.32573289902280134|
|     15.01|2.09|  Male|   Yes|Sat|Dinner|   2|0.13924050632911392|
|     26.86|3.14|Female|   Yes|Sat|Dinner|   2|0.11690245718540582|
|     25.28| 5.0|Female|   Yes|Sat|Dinner|   2|0.19778481012658228|
|     17.92|3.08|  Male|   Yes|Sat|Dinner|   2|           0.171875|
|      44.3| 2.5|Female|   Yes|Sat|Dinner|   3|0

`.filter` and `.where` are the same function

## Aggregating

In [51]:
# NOTE: `min` and `max` now refer to Spark's column aggregates, not Python's
# built-ins (`mean` was already imported in an earlier cell).
from pyspark.sql.functions import mean, min, max, count

In [46]:
df.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [48]:
# This is a transformation: it returns a new DataFrame describing the result,
# and no rows are displayed.
df.groupBy('time').agg(mean('tip'))

DataFrame[time: string, avg(tip): double]

In [49]:
# This is an action: .show() runs the aggregation and prints the result.
df.groupBy('time').agg(mean('tip')).show()

+------+------------------+
|  time|          avg(tip)|
+------+------------------+
| Lunch|2.7280882352941176|
|Dinner| 3.102670454545455|
+------+------------------+



In [52]:
df.groupBy('time').agg(count('*')).show()

+------+--------+
|  time|count(1)|
+------+--------+
| Lunch|      68|
|Dinner|     176|
+------+--------+



In [53]:
df.groupBy('time').agg(min('tip'), mean('tip'), max('tip')).show()

+------+--------+------------------+--------+
|  time|min(tip)|          avg(tip)|max(tip)|
+------+--------+------------------+--------+
| Lunch|    1.25|2.7280882352941176|     6.7|
|Dinner|     1.0| 3.102670454545455|    10.0|
+------+--------+------------------+--------+



In [54]:
df.groupBy('time').agg(mean('tip').alias('avg_tip')).show()

+------+------------------+
|  time|           avg_tip|
+------+------------------+
| Lunch|2.7280882352941176|
|Dinner| 3.102670454545455|
+------+------------------+



In [55]:
df.groupBy('time', 'day').agg(mean('total_bill')).show()

+------+----+------------------+
|  time| day|   avg(total_bill)|
+------+----+------------------+
| Lunch|Thur|17.664754098360653|
|Dinner|Thur|             18.78|
| Lunch| Fri|12.845714285714285|
|Dinner| Fri| 19.66333333333333|
|Dinner| Sun|21.409999999999997|
|Dinner| Sat|20.441379310344825|
+------+----+------------------+



In [56]:
df.crosstab('time', 'day').show()

+--------+---+---+---+----+
|time_day|Fri|Sat|Sun|Thur|
+--------+---+---+---+----+
|   Lunch|  7|  0|  0|  61|
|  Dinner| 12| 87| 76|   1|
+--------+---+---+---+----+



In [58]:
# `sum` is the Spark aggregate imported from pyspark.sql.functions, not Python's built-in.
# Sorting by the grouping columns makes the row order predictable.
df.groupBy('time', 'day').agg(sum('total_bill')).sort('time', 'day').show()

+------+----+------------------+
|  time| day|   sum(total_bill)|
+------+----+------------------+
|Dinner| Fri|235.95999999999998|
|Dinner| Sat|1778.3999999999996|
|Dinner| Sun|1627.1599999999999|
|Dinner|Thur|             18.78|
| Lunch| Fri|             89.92|
| Lunch|Thur|           1077.55|
+------+----+------------------+



In [59]:
# Same aggregation as the previous cell, but the second grouping column ('day')
# moves into .pivot, so each day becomes its own column instead of its own row.

df.groupBy('time').pivot('day').agg(sum('total_bill')).show()

+------+------------------+------------------+------------------+-------+
|  time|               Fri|               Sat|               Sun|   Thur|
+------+------------------+------------------+------------------+-------+
| Lunch|             89.92|              null|              null|1077.55|
|Dinner|235.95999999999998|1778.3999999999996|1627.1599999999999|  18.78|
+------+------------------+------------------+------------------+-------+



In [57]:
df.groupBy('time').pivot('day').agg(mean('total_bill')).show()

+------+------------------+------------------+------------------+------------------+
|  time|               Fri|               Sat|               Sun|              Thur|
+------+------------------+------------------+------------------+------------------+
| Lunch|12.845714285714285|              null|              null|17.664754098360653|
|Dinner| 19.66333333333333|20.441379310344825|21.409999999999997|             18.78|
+------+------------------+------------------+------------------+------------------+



`.crosstab` is just for counts, for other methods of summarizing groups, use `.groupBy` (maybe in combination with `.pivot`) + `.agg`.

## Additional Features

### Spark SQL

In [60]:
# Register df with the running Spark session as a temporary view named 'tips'.
# This has to happen before spark.sql can refer to `tips` in a query.
df.createOrReplaceTempView('tips')

In [61]:
spark.sql('''
SELECT *
FROM tips
''').show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [62]:
# Find the day with the highest total sales (sum of total_bill), then list the
# tip, total_bill, and day of every row from that day.
spark.sql('''
SELECT tip, total_bill, day
FROM tips
WHERE day = (
    SELECT day
    FROM tips
    GROUP BY day
    ORDER BY sum(total_bill) DESC
    LIMIT 1
)    
''').show()

+----+----------+---+
| tip|total_bill|day|
+----+----------+---+
|3.35|     20.65|Sat|
|4.08|     17.92|Sat|
|2.75|     20.29|Sat|
|2.23|     15.77|Sat|
|7.58|     39.42|Sat|
|3.18|     19.82|Sat|
|2.34|     17.81|Sat|
| 2.0|     13.37|Sat|
| 2.0|     12.69|Sat|
| 4.3|      21.7|Sat|
| 3.0|     19.65|Sat|
|1.45|      9.55|Sat|
| 2.5|     18.35|Sat|
| 3.0|     15.06|Sat|
|2.45|     20.69|Sat|
|3.27|     17.78|Sat|
| 3.6|     24.06|Sat|
| 2.0|     16.31|Sat|
|3.07|     16.93|Sat|
|2.31|     18.69|Sat|
+----+----------+---+
only showing top 20 rows



### More Spark Dataframe Manipulation

In [64]:
# Filter first, then add the computed column, and print the physical plan.
# NOTE(review): df already carries a tip_pct column from an earlier cell, so
# the projection in the plan lists tip_pct twice.
(
    df
    .filter(df['time'] == 'Dinner')
    .select('*', (df['tip'] / df['total_bill']).alias('tip_pct'))
    .explain()
)

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#285, (tip#1 / total_bill#0) AS tip_pct#1527]
+- *(1) Filter (isnotnull(time#5) AND (time#5 = Dinner))
   +- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




In [65]:
# Add the computed column first, then filter, and print the physical plan.
# NOTE(review): df already carries a tip_pct column from an earlier cell, so
# the projection in the plan lists tip_pct twice.
(
    df
    .select('*', (df['tip'] / df['total_bill']).alias('tip_pct'))
    .filter(df['time'] == 'Dinner')
    .explain()
)

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#285, (tip#1 / total_bill#0) AS tip_pct#1537]
+- *(1) Filter (isnotnull(time#5) AND (time#5 = Dinner))
   +- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




### Mixing in SQL Expressions

In [66]:
from pyspark.sql.functions import expr

`expr` lets us mix pieces of SQL into our dataframe operations.

You can use little bits of sql using `expr('put expression here')`

In [67]:
# expr() turns a SQL snippet into a Column, so it is used here both for the
# computed column and for the row filter.
# NOTE(review): df already has a tip_pct column, so the output shows two
# columns named tip_pct.
df.select(
    '*',
    expr('tip / total_bill as tip_pct')
).where(
    expr('day = "Sun" AND time = "Dinner"')
).show()

+----------+----+------+------+---+------+----+-------------------+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|0.11607142857142858|
|     15.04|1.96|  Male|    No|S

## Joins

In [68]:
# Seed the generator so the random columns (and every join output derived from
# them) are the same on each Restart & Run All.
rng = np.random.default_rng(42)

# 100 observations, each assigned to one of six groups through group_id (1-6).
df1 = spark.createDataFrame(pd.DataFrame({
    'id': np.arange(100) + 1,
    'x': rng.standard_normal(100).round(3),
    'group_id': rng.integers(1, 7, size=100),  # upper bound is exclusive
}))

# Lookup table mapping each group id (1-6) to a letter label.
df2 = spark.createDataFrame(pd.DataFrame({
    'id': range(1, 7),
    'group': list('abcdef')
}))
df1.show(5)
df2.show()

+---+------+--------+
| id|     x|group_id|
+---+------+--------+
|  1| 1.064|       3|
|  2|-0.153|       2|
|  3| 1.527|       3|
|  4|-0.338|       6|
|  5| 0.648|       6|
+---+------+--------+
only showing top 5 rows

+---+-----+
| id|group|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
|  4|    d|
|  5|    e|
|  6|    f|
+---+-----+



In [69]:
# Join on an explicit condition because the key columns have different names
# (df1.group_id vs df2.id). Both input `id` columns are kept in the result.
df_merged = df1.join(df2, df1.group_id == df2.id)
df_merged.show(5)

+---+------+--------+---+-----+
| id|     x|group_id| id|group|
+---+------+--------+---+-----+
|  4|-0.338|       6|  6|    f|
|  5| 0.648|       6|  6|    f|
|  7|-0.862|       6|  6|    f|
| 11|-1.598|       6|  6|    f|
| 15|-0.585|       6|  6|    f|
+---+------+--------+---+-----+
only showing top 5 rows



Notice that the result has two columns with the same name, `id`.

Try to avoid getting into this situation: with two `id` columns, any later reference to `id` is ambiguous.

In [70]:
# Rename df2's key to group_id so the join can be made on the shared column
# name; the result then has one group_id column and only df1's id.
df1.join(df2.withColumnRenamed('id', 'group_id'), on='group_id').show(n=5)

+--------+---+------+-----+
|group_id| id|     x|group|
+--------+---+------+-----+
|       6|  4|-0.338|    f|
|       6|  5| 0.648|    f|
|       6|  7|-0.862|    f|
|       6| 11|-1.598|    f|
|       6| 15|-0.585|    f|
+--------+---+------+-----+
only showing top 5 rows



can also 

## `.explain`

In [71]:
df.explain()

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#285]
+- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




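`.explain` prints the physical plan: the steps Spark will run to produce the dataframe. Read it from the bottom up.

Scan - read the rows from the underlying data source

Project - select or compute the output columns (e.g. `tip / total_bill AS tip_pct`)

Filter - keep only the rows that satisfy a condition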


In [72]:
# Rebuild df from the raw tips data (without the tip_pct column added earlier)
# so the plans below start from a bare scan.
df = spark.createDataFrame(pydataset.data('tips'))

In [73]:
df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[total_bill#1681,tip#1682,sex#1683,smoker#1684,day#1685,time#1686,size#1687L]




In [74]:
df.filter(df.time == 'Lunch').explain()

== Physical Plan ==
*(1) Filter (isnotnull(time#1686) AND (time#1686 = Lunch))
+- *(1) Scan ExistingRDD[total_bill#1681,tip#1682,sex#1683,smoker#1684,day#1685,time#1686,size#1687L]




In [76]:
# Same predicate written with .where (an alias of .filter); the plan is unchanged.
df.where(df['time'] == 'Lunch').explain()

== Physical Plan ==
*(1) Filter (isnotnull(time#1686) AND (time#1686 = Lunch))
+- *(1) Scan ExistingRDD[total_bill#1681,tip#1682,sex#1683,smoker#1684,day#1685,time#1686,size#1687L]




In [78]:
df.filter(df.time == 'Lunch').filter(df.size > 1).explain()

== Physical Plan ==
*(1) Filter (((isnotnull(time#1686) AND isnotnull(size#1687L)) AND (time#1686 = Lunch)) AND (size#1687L > 1))
+- *(1) Scan ExistingRDD[total_bill#1681,tip#1682,sex#1683,smoker#1684,day#1685,time#1686,size#1687L]




In [79]:
# Two filters followed by a computed column; the plan shows both filters merged
# into one Filter step that runs beneath the Project.
(
    df
    .filter(df.time == 'Lunch')
    .filter(df.size > 1)
    .select('*', expr('tip / total_bill as tip_pct'))
    .explain()
)

== Physical Plan ==
*(1) Project [total_bill#1681, tip#1682, sex#1683, smoker#1684, day#1685, time#1686, size#1687L, (tip#1682 / total_bill#1681) AS tip_pct#1695]
+- *(1) Filter (((isnotnull(time#1686) AND isnotnull(size#1687L)) AND (time#1686 = Lunch)) AND (size#1687L > 1))
   +- *(1) Scan ExistingRDD[total_bill#1681,tip#1682,sex#1683,smoker#1684,day#1685,time#1686,size#1687L]




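Because Spark is lazy, it can optimize the whole plan before running anything. Here it does the filter first and the project second, so the `tip_pct` math is only done for the rows that survive the filter (i.e. less work).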