# Spark API

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
spark

For demonstration, we'll create a spark dataframe from a pandas dataframe.

In [3]:
import numpy as np
import pandas as pd
import pydataset

In [4]:
tips = pydataset.data('tips')
df = spark.createDataFrame(tips)
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

## DataFrame Basics

In [5]:
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [7]:
df.show(5, vertical=True)

-RECORD 0------------
 total_bill | 16.99  
 tip        | 1.01   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 1------------
 total_bill | 10.34  
 tip        | 1.66   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 2------------
 total_bill | 21.01  
 tip        | 3.5    
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 3------------
 total_bill | 23.68  
 tip        | 3.31   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 4------------
 total_bill | 24.59  
 tip        | 3.61   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 4      
only showing top 5 rows



In [6]:
df.head(5)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=23.68, tip=3.31, sex='Male', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=24.59, tip=3.61, sex='Female', smoker='No', day='Sun', time='Dinner', size=4)]

In [8]:
# Don't do this!
# just use .show to view df contents, it's a print statement. It won't return anything
df2 = df.show(10)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 10 rows



In [9]:
type(df2)

NoneType

In [10]:
df.head(5)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=23.68, tip=3.31, sex='Male', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=24.59, tip=3.61, sex='Female', smoker='No', day='Sun', time='Dinner', size=4)]

In [11]:
df.head(5)[0].time

'Dinner'

### Selecting Columns

In [12]:
df.select('total_bill', 'tip', 'size', 'day').show()

+----------+----+----+---+
|total_bill| tip|size|day|
+----------+----+----+---+
|     16.99|1.01|   2|Sun|
|     10.34|1.66|   3|Sun|
|     21.01| 3.5|   3|Sun|
|     23.68|3.31|   2|Sun|
|     24.59|3.61|   4|Sun|
|     25.29|4.71|   4|Sun|
|      8.77| 2.0|   2|Sun|
|     26.88|3.12|   4|Sun|
|     15.04|1.96|   2|Sun|
|     14.78|3.23|   2|Sun|
|     10.27|1.71|   2|Sun|
|     35.26| 5.0|   4|Sun|
|     15.42|1.57|   2|Sun|
|     18.43| 3.0|   4|Sun|
|     14.83|3.02|   2|Sun|
|     21.58|3.92|   2|Sun|
|     10.33|1.67|   3|Sun|
|     16.29|3.71|   3|Sun|
|     16.97| 3.5|   3|Sun|
|     20.65|3.35|   3|Sat|
+----------+----+----+---+
only showing top 20 rows



In [13]:
df.select('*')

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [14]:
df.select(df.tip / df.total_bill).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [15]:
col = df.tip / df.total_bill
col

Column<'(tip / total_bill)'>

In [17]:
df.select(col).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [16]:
df.select('*', col.alias('tip_pct')).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [18]:
df_with_tip_pct = df.select('*', col.alias('tip_pct'))

In [19]:
df_with_tip_pct.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [22]:
#to add columns, you need to create a new variable, if you check df, it won't have the tip_pct

### Selecting w/ Built In Functions

In [23]:
from pyspark.sql.functions import sum, mean, concat, lit, regexp_extract, regexp_replace, when

In [25]:
mean(df.tip)

Column<'avg(tip)'>

In [24]:
df.select(mean(df.tip), sum(df.total_bill)).show()

+------------------+-----------------+
|          avg(tip)|  sum(total_bill)|
+------------------+-----------------+
|2.9982786885245907|4827.769999999999|
+------------------+-----------------+



In [None]:
#lit for literal, without it it's going to try and find a column named ' '

In [26]:
df.select(concat('day', lit(' '), 'time')).show(5)

+--------------------+
|concat(day,  , time)|
+--------------------+
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
+--------------------+
only showing top 5 rows



In [27]:
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [28]:
#objects cant be integers, so they'll just say null
df.select(df.time.cast('int')).show(5)

+----+
|time|
+----+
|null|
|null|
|null|
|null|
|null|
+----+
only showing top 5 rows



In [30]:
df.size.cast('string')

Column<'CAST(size AS STRING)'>

In [29]:
#lets display it
df.select(df.size.cast('string')).show(5)

+----+
|size|
+----+
|   2|
|   3|
|   3|
|   2|
|   4|
+----+
only showing top 5 rows



In [31]:
df = df.select(
    '*',
    (df.tip / df.total_bill).alias('tip_pct')
)

In [32]:
df

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint, tip_pct: double]

In [33]:
df.show(1)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
+----------+----+------+------+---+------+----+-------------------+
only showing top 1 row



### When / Otherwise

In [34]:
when

<function pyspark.sql.functions.when(condition, value)>

In [35]:
when(df.tip_pct > .2, 'good tip')

Column<'CASE WHEN (tip_pct > 0.2) THEN good tip END'>

In [36]:
df.select(
    'tip_pct',
    (when(df.tip_pct > .2, 'good tip')
     .otherwise('not good tip')
     .alias('tip_desc'))
).show(25)

+-------------------+------------+
|            tip_pct|    tip_desc|
+-------------------+------------+
|0.05944673337257211|not good tip|
|0.16054158607350097|not good tip|
|0.16658733936220846|not good tip|
| 0.1397804054054054|not good tip|
|0.14680764538430255|not good tip|
|0.18623962040332148|not good tip|
|0.22805017103762829|    good tip|
|0.11607142857142858|not good tip|
|0.13031914893617022|not good tip|
| 0.2185385656292287|    good tip|
| 0.1665043816942551|not good tip|
|0.14180374361883155|not good tip|
|0.10181582360570687|not good tip|
|0.16277807921866522|not good tip|
|0.20364126770060686|    good tip|
|0.18164967562557924|not good tip|
| 0.1616650532429816|not good tip|
|0.22774708410067526|    good tip|
|0.20624631703005306|    good tip|
|0.16222760290556903|not good tip|
|0.22767857142857142|    good tip|
|0.13553474618038444|not good tip|
|0.14140773620798985|not good tip|
|0.19228817858954844|not good tip|
|0.16044399596367306|not good tip|
+-------------------

### Regex

In [39]:
df.select(
    'time',
    #you need the parenthesis, as a capture group
    regexp_extract('time', r'(\w).*', 1).alias('first_letter'),
    regexp_replace('time', r'[aeiou]', 'X')
).show(5)

+------+------------+-----------------------------------+
|  time|first_letter|regexp_replace(time, [aeiou], X, 1)|
+------+------------+-----------------------------------+
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
+------+------------+-----------------------------------+
only showing top 5 rows



## Transforming Rows

In [40]:
df.show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

### Sorting

In [43]:
# no difference between orderby and sort

In [41]:
df.orderBy(df.total_bill).show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|      3.07| 1.0|Female|   Yes| Sat|Dinner|   1|0.32573289902280134|
|      5.75| 1.0|Female|   Yes| Fri|Dinner|   2|0.17391304347826086|
|      7.25| 1.0|Female|    No| Sat|Dinner|   1|0.13793103448275862|
|      7.25|5.15|  Male|   Yes| Sun|Dinner|   2|  0.710344827586207|
|      7.51| 2.0|  Male|    No|Thur| Lunch|   2| 0.2663115845539281|
|      7.56|1.44|  Male|    No|Thur| Lunch|   2|0.19047619047619047|
|      7.74|1.44|  Male|   Yes| Sat|Dinner|   2|0.18604651162790697|
|      8.35| 1.5|Female|    No|Thur| Lunch|   2|0.17964071856287425|
|      8.51|1.25|Female|    No|Thur| Lunch|   2|0.14688601645123384|
|      8.52|1.48|  Male|    No|Thur| Lunch|   2|0.17370892018779344|
|      8.58|1.92|  Male|   Yes| Fri| Lunch|   1|0.22377622377622378|
|      8.77| 2.0|  Male|    No| Su

In [42]:
df.sort(df.day, df.size).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|      8.58|1.92|  Male|   Yes|Fri| Lunch|   1|0.22377622377622378|
|     16.32| 4.3|Female|   Yes|Fri|Dinner|   2|0.26348039215686275|
|     13.42|1.58|  Male|   Yes|Fri| Lunch|   2|0.11773472429210134|
|     10.09| 2.0|Female|   Yes|Fri| Lunch|   2|0.19821605550049554|
|     12.16| 2.2|  Male|   Yes|Fri| Lunch|   2|0.18092105263157895|
|     12.46| 1.5|  Male|    No|Fri|Dinner|   2| 0.1203852327447833|
|     27.28| 4.0|  Male|   Yes|Fri|Dinner|   2|0.14662756598240467|
|     21.01| 3.0|  Male|   Yes|Fri|Dinner|   2| 0.1427891480247501|
|     11.35| 2.5|Female|   Yes|Fri|Dinner|   2|0.22026431718061676|
|     22.75|3.25|Female|    No|Fri|Dinner|   2|0.14285714285714285|
|     13.42|3.48|Female|   Yes|Fri| Lunch|   2| 0.2593144560357675|
|      5.75| 1.0|Female|   Yes|Fri|Dinner|   2|0

In [44]:
from pyspark.sql.functions import asc, desc, col

In [45]:
df.sort(df.day, asc('time'), desc('size')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     40.17|4.73|  Male|   Yes|Fri|Dinner|   4| 0.1177495643515061|
|     21.01| 3.0|  Male|   Yes|Fri|Dinner|   2| 0.1427891480247501|
|     12.03| 1.5|  Male|   Yes|Fri|Dinner|   2|0.12468827930174564|
|     11.35| 2.5|Female|   Yes|Fri|Dinner|   2|0.22026431718061676|
|      5.75| 1.0|Female|   Yes|Fri|Dinner|   2|0.17391304347826086|
|     22.75|3.25|Female|    No|Fri|Dinner|   2|0.14285714285714285|
|     12.46| 1.5|  Male|    No|Fri|Dinner|   2| 0.1203852327447833|
|     27.28| 4.0|  Male|   Yes|Fri|Dinner|   2|0.14662756598240467|
|     15.38| 3.0|Female|   Yes|Fri|Dinner|   2|0.19505851755526657|
|     28.97| 3.0|  Male|   Yes|Fri|Dinner|   2|0.10355540214014498|
|     22.49| 3.5|  Male|    No|Fri|Dinner|   2|0.15562472209871056|
|     16.32| 4.3|Female|   Yes|Fri|Dinner|   2|0

In [46]:
col('size').asc()

Column<'size ASC NULLS FIRST'>

In [47]:
df.sort(col('size').desc(), col('time')).show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|     48.17| 5.0|  Male|    No| Sun|Dinner|   6|0.10379904504878555|
|     27.05| 5.0|Female|    No|Thur| Lunch|   6|0.18484288354898337|
|      34.3| 6.7|  Male|    No|Thur| Lunch|   6|0.19533527696793004|
|      29.8| 4.2|Female|    No|Thur| Lunch|   6|0.14093959731543623|
|     29.85|5.14|Female|    No| Sun|Dinner|   5| 0.1721943048576214|
|     20.69| 5.0|  Male|    No| Sun|Dinner|   5| 0.2416626389560174|
|     30.46| 2.0|  Male|   Yes| Sun|Dinner|   5|0.06565988181221273|
|     28.15| 3.0|  Male|   Yes| Sat|Dinner|   5|0.10657193605683837|
|     41.19| 5.0|  Male|    No|Thur| Lunch|   5|0.12138868657441128|
|     29.93|5.07|  Male|    No| Sun|Dinner|   4| 0.1693952555963916|
|     26.88|3.12|  Male|    No| Sun|Dinner|   4|0.11607142857142858|
|     38.73| 3.0|  Male|   Yes| Sa

### Filtering

In [48]:
#sets boolean
df.tip<4

Column<'(tip < 4)'>

In [49]:
#return true values
df.where(df.tip < 4).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|0.10181582360570687|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|0

In [50]:
#and conditions
df.where(df.tip < 4).where(df.smoker=='Yes').show()

+----------+----+------+------+----+------+----+-------------------+
|total_bill| tip|   sex|smoker| day|  time|size|            tip_pct|
+----------+----+------+------+----+------+----+-------------------+
|     38.01| 3.0|  Male|   Yes| Sat|Dinner|   4|0.07892659826361484|
|     11.24|1.76|  Male|   Yes| Sat|Dinner|   2|0.15658362989323843|
|     20.29|3.21|  Male|   Yes| Sat|Dinner|   2| 0.1582060128141942|
|     13.81| 2.0|  Male|   Yes| Sat|Dinner|   2| 0.1448225923244026|
|     11.02|1.98|  Male|   Yes| Sat|Dinner|   2|0.17967332123411978|
|     18.29|3.76|  Male|   Yes| Sat|Dinner|   4|0.20557681793329688|
|      3.07| 1.0|Female|   Yes| Sat|Dinner|   1|0.32573289902280134|
|     15.01|2.09|  Male|   Yes| Sat|Dinner|   2|0.13924050632911392|
|     26.86|3.14|Female|   Yes| Sat|Dinner|   2|0.11690245718540582|
|     17.92|3.08|  Male|   Yes| Sat|Dinner|   2|           0.171875|
|     19.44| 3.0|  Male|   Yes|Thur| Lunch|   2|0.15432098765432098|
|     28.97| 3.0|  Male|   Yes| Fr

In [52]:
#or condition
df.where((df.tip < 4) | (df.smoker=='Yes')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|0.10181582360570687|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|0

In [53]:
mask = df.tip < 4
df.where(mask).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|0.10181582360570687|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|0

In [None]:
# filter and where are also the same thing

In [54]:
df.filter((df.time == "Dinner") & (df.tip <= 2)).sort('tip').show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|      3.07| 1.0|Female|   Yes|Sat|Dinner|   1|0.32573289902280134|
|      5.75| 1.0|Female|   Yes|Fri|Dinner|   2|0.17391304347826086|
|      7.25| 1.0|Female|    No|Sat|Dinner|   1|0.13793103448275862|
|      12.6| 1.0|  Male|   Yes|Sat|Dinner|   2|0.07936507936507936|
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|      12.9| 1.1|Female|   Yes|Sat|Dinner|   2|0.08527131782945736|
|     32.83|1.17|  Male|   Yes|Sat|Dinner|   2|0.03563813585135547|
|     10.51|1.25|  Male|    No|Sat|Dinner|   2|0.11893434823977164|
|     10.07|1.25|  Male|    No|Sat|Dinner|   2|0.12413108242303872|
|      9.68|1.32|  Male|    No|Sun|Dinner|   2|0.13636363636363638|
|      7.74|1.44|  Male|   Yes|Sat|Dinner|   2|0.18604651162790697|
|      9.55|1.45|  Male|    No|Sat|Dinner|   2| 

In [55]:
df.where(df.smoker == "Yes").where(df.day == "Sat").show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     38.01| 3.0|  Male|   Yes|Sat|Dinner|   4|0.07892659826361484|
|     11.24|1.76|  Male|   Yes|Sat|Dinner|   2|0.15658362989323843|
|     20.29|3.21|  Male|   Yes|Sat|Dinner|   2| 0.1582060128141942|
|     13.81| 2.0|  Male|   Yes|Sat|Dinner|   2| 0.1448225923244026|
|     11.02|1.98|  Male|   Yes|Sat|Dinner|   2|0.17967332123411978|
|     18.29|3.76|  Male|   Yes|Sat|Dinner|   4|0.20557681793329688|
|      3.07| 1.0|Female|   Yes|Sat|Dinner|   1|0.32573289902280134|
|     15.01|2.09|  Male|   Yes|Sat|Dinner|   2|0.13924050632911392|
|     26.86|3.14|Female|   Yes|Sat|Dinner|   2|0.11690245718540582|
|     25.28| 5.0|Female|   Yes|Sat|Dinner|   2|0.19778481012658228|
|     17.92|3.08|  Male|   Yes|Sat|Dinner|   2|           0.171875|
|      44.3| 2.5|Female|   Yes|Sat|Dinner|   3|0

## Aggregating

In [59]:
from pyspark.sql.functions import mean, min, max, count

In [57]:
df.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [61]:
df.groupBy('time').agg(count('*').alias('n')).show()

+------+---+
|  time|  n|
+------+---+
| Lunch| 68|
|Dinner|176|
+------+---+



In [62]:
df.groupBy('time').agg(mean('tip')).show()

+------+------------------+
|  time|          avg(tip)|
+------+------------------+
| Lunch|2.7280882352941176|
|Dinner| 3.102670454545455|
+------+------------------+



In [63]:
df.groupBy('time').agg(min('tip'), mean('tip'), max('tip')).show()

+------+--------+------------------+--------+
|  time|min(tip)|          avg(tip)|max(tip)|
+------+--------+------------------+--------+
| Lunch|    1.25|2.7280882352941176|     6.7|
|Dinner|     1.0| 3.102670454545455|    10.0|
+------+--------+------------------+--------+



In [64]:
df.groupBy('time').agg(mean('tip').alias('avg_tip')).show()

+------+------------------+
|  time|           avg_tip|
+------+------------------+
| Lunch|2.7280882352941176|
|Dinner| 3.102670454545455|
+------+------------------+



In [65]:
df.groupBy('time', 'day').agg(mean('total_bill')).show()

+------+----+------------------+
|  time| day|   avg(total_bill)|
+------+----+------------------+
| Lunch|Thur|17.664754098360653|
|Dinner|Thur|             18.78|
| Lunch| Fri|12.845714285714285|
|Dinner| Fri| 19.66333333333333|
|Dinner| Sun|21.409999999999997|
|Dinner| Sat|20.441379310344825|
+------+----+------------------+



In [66]:
df.crosstab('time', 'day').show()

+--------+---+---+---+----+
|time_day|Fri|Sat|Sun|Thur|
+--------+---+---+---+----+
|   Lunch|  7|  0|  0|  61|
|  Dinner| 12| 87| 76|   1|
+--------+---+---+---+----+



In [67]:
df.groupBy('time','day').agg(sum('total_bill')).sort('time','day').show()

+------+----+------------------+
|  time| day|   sum(total_bill)|
+------+----+------------------+
|Dinner| Fri|235.95999999999998|
|Dinner| Sat|1778.3999999999996|
|Dinner| Sun|1627.1599999999999|
|Dinner|Thur|             18.78|
| Lunch| Fri|             89.92|
| Lunch|Thur|           1077.55|
+------+----+------------------+



In [69]:
df.groupBy('time').pivot('day').agg(sum('total_bill')).show()

+------+------------------+------------------+------------------+-------+
|  time|               Fri|               Sat|               Sun|   Thur|
+------+------------------+------------------+------------------+-------+
| Lunch|             89.92|              null|              null|1077.55|
|Dinner|235.95999999999998|1778.3999999999996|1627.1599999999999|  18.78|
+------+------------------+------------------+------------------+-------+



In [68]:
df.groupBy('time').pivot('day').agg(mean('total_bill')).show()

+------+------------------+------------------+------------------+------------------+
|  time|               Fri|               Sat|               Sun|              Thur|
+------+------------------+------------------+------------------+------------------+
| Lunch|12.845714285714285|              null|              null|17.664754098360653|
|Dinner| 19.66333333333333|20.441379310344825|21.409999999999997|             18.78|
+------+------------------+------------------+------------------+------------------+



`.crosstab` is just for counts, for other methods of summarizing groups, use `.groupBy` (maybe in combination with `.pivot`) + `.agg`.

## Additional Features

### Spark SQL

In [70]:
df.createOrReplaceTempView('tips')

In [71]:
spark.sql('''
SELECT *
FROM tips
''').show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [72]:
# find the tip, total_bill, and day with the highest overall sales for that day
spark.sql('''
SELECT tip, total_bill, day
FROM tips
WHERE day = (
    SELECT day
    FROM tips
    GROUP BY day
    ORDER BY sum(total_bill) DESC
    LIMIT 1
)    
''').show()

+----+----------+---+
| tip|total_bill|day|
+----+----------+---+
|3.35|     20.65|Sat|
|4.08|     17.92|Sat|
|2.75|     20.29|Sat|
|2.23|     15.77|Sat|
|7.58|     39.42|Sat|
|3.18|     19.82|Sat|
|2.34|     17.81|Sat|
| 2.0|     13.37|Sat|
| 2.0|     12.69|Sat|
| 4.3|      21.7|Sat|
| 3.0|     19.65|Sat|
|1.45|      9.55|Sat|
| 2.5|     18.35|Sat|
| 3.0|     15.06|Sat|
|2.45|     20.69|Sat|
|3.27|     17.78|Sat|
| 3.6|     24.06|Sat|
| 2.0|     16.31|Sat|
|3.07|     16.93|Sat|
|2.31|     18.69|Sat|
+----+----------+---+
only showing top 20 rows



### More Spark Dataframe Manipulation

In [73]:
df.where(
    df.time == 'Dinner'
).select(
    '*',
    (df.tip / df.total_bill).alias('tip_pct'),
).explain()

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#357, (tip#1 / total_bill#0) AS tip_pct#1587]
+- *(1) Filter (isnotnull(time#5) AND (time#5 = Dinner))
   +- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




In [74]:
df.select(
    '*',
    (df.tip / df.total_bill).alias('tip_pct'),
).where(
    df.time == 'Dinner'
).explain()

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#357, (tip#1 / total_bill#0) AS tip_pct#1597]
+- *(1) Filter (isnotnull(time#5) AND (time#5 = Dinner))
   +- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




### Mixing in SQL Expressions

In [75]:
from pyspark.sql.functions import expr

Expr lets us mix in parts of SQL into our dataframes

In [76]:
df.select(
    '*',
    expr('tip / total_bill as tip_pct')
).where(
    expr('day = "Sun" AND time = "Dinner"')
).show()

+----------+----+------+------+---+------+----+-------------------+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|0.11607142857142858|
|     15.04|1.96|  Male|    No|S

## Joins

In [77]:
df1 = spark.createDataFrame(pd.DataFrame({
    'id': np.arange(100) + 1,
    'x': np.random.randn(100).round(3),
    'group_id': np.random.choice(range(1, 7), 100),
}))
df2 = spark.createDataFrame(pd.DataFrame({
    'id': range(1, 7),
    'group': list('abcdef')
}))
df1.show(5)
df2.show()

+---+------+--------+
| id|     x|group_id|
+---+------+--------+
|  1| 0.495|       4|
|  2| 0.621|       2|
|  3|-0.286|       4|
|  4| 1.195|       1|
|  5| 2.001|       4|
+---+------+--------+
only showing top 5 rows

+---+-----+
| id|group|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
|  4|    d|
|  5|    e|
|  6|    f|
+---+-----+



In [79]:
df1.join(df2, on =df1.group_id == df2.id).show()

+---+------+--------+---+-----+
| id|     x|group_id| id|group|
+---+------+--------+---+-----+
|  7| 1.953|       6|  6|    f|
| 21| 0.314|       6|  6|    f|
| 23| 0.801|       6|  6|    f|
| 26| 0.744|       6|  6|    f|
| 38| 0.178|       6|  6|    f|
| 51| 1.031|       6|  6|    f|
| 57|-0.331|       6|  6|    f|
| 60|  1.07|       6|  6|    f|
| 66|-0.305|       6|  6|    f|
| 67| 1.221|       6|  6|    f|
| 74|  0.29|       6|  6|    f|
| 79| 0.607|       6|  6|    f|
| 80| 0.197|       6|  6|    f|
| 85| 0.197|       6|  6|    f|
|100| -0.93|       6|  6|    f|
|  6|-0.741|       5|  5|    e|
| 19| 1.773|       5|  5|    e|
| 25| 0.528|       5|  5|    e|
| 37| 0.807|       5|  5|    e|
| 40|-0.219|       5|  5|    e|
+---+------+--------+---+-----+
only showing top 20 rows



In [80]:
#you must specify condition
df_merged = df1.join(df2, df1.group_id == df2.id)
df_merged.show(5)

+---+-----+--------+---+-----+
| id|    x|group_id| id|group|
+---+-----+--------+---+-----+
|  7|1.953|       6|  6|    f|
| 21|0.314|       6|  6|    f|
| 23|0.801|       6|  6|    f|
| 26|0.744|       6|  6|    f|
| 38|0.178|       6|  6|    f|
+---+-----+--------+---+-----+
only showing top 5 rows



In [81]:
df1.join(df2.withColumnRenamed('id', 'group_id'), 'group_id').show(5)

+--------+---+-----+-----+
|group_id| id|    x|group|
+--------+---+-----+-----+
|       6|  7|1.953|    f|
|       6| 21|0.314|    f|
|       6| 23|0.801|    f|
|       6| 26|0.744|    f|
|       6| 38|0.178|    f|
+--------+---+-----+-----+
only showing top 5 rows



#.explain

In [82]:
df.explain()

== Physical Plan ==
*(1) Project [total_bill#0, tip#1, sex#2, smoker#3, day#4, time#5, size#6L, (tip#1 / total_bill#0) AS tip_pct#357]
+- *(1) Scan ExistingRDD[total_bill#0,tip#1,sex#2,smoker#3,day#4,time#5,size#6L]




In [83]:
df = spark.createDataFrame(pydataset.data('tips'))

In [84]:
df.filter(df.time=='Lunch').explain()

== Physical Plan ==
*(1) Filter (isnotnull(time#1787) AND (time#1787 = Lunch))
+- *(1) Scan ExistingRDD[total_bill#1782,tip#1783,sex#1784,smoker#1785,day#1786,time#1787,size#1788L]




In [85]:
df.filter(df.time=='Lunch').filter(df.size>1).explain()

== Physical Plan ==
*(1) Filter (((isnotnull(time#1787) AND isnotnull(size#1788L)) AND (time#1787 = Lunch)) AND (size#1788L > 1))
+- *(1) Scan ExistingRDD[total_bill#1782,tip#1783,sex#1784,smoker#1785,day#1786,time#1787,size#1788L]




In [86]:
df.filter(df.time=='Lunch').filter(df.size>1).select('*',expr('tip/total_bill as tip_pct')).explain()

== Physical Plan ==
*(1) Project [total_bill#1782, tip#1783, sex#1784, smoker#1785, day#1786, time#1787, size#1788L, (tip#1783 / total_bill#1782) AS tip_pct#1796]
+- *(1) Filter (((isnotnull(time#1787) AND isnotnull(size#1788L)) AND (time#1787 = Lunch)) AND (size#1788L > 1))
   +- *(1) Scan ExistingRDD[total_bill#1782,tip#1783,sex#1784,smoker#1785,day#1786,time#1787,size#1788L]




In [87]:
df.filter(df.smoker=='Yes').select('smoker','total_bill', expr('tip/total_bill as tip_pct')).explain()

== Physical Plan ==
*(1) Project [smoker#1785, total_bill#1782, (tip#1783 / total_bill#1782) AS tip_pct#1805]
+- *(1) Filter (isnotnull(smoker#1785) AND (smoker#1785 = Yes))
   +- *(1) Scan ExistingRDD[total_bill#1782,tip#1783,sex#1784,smoker#1785,day#1786,time#1787,size#1788L]


