# Spark API

In [1]:
import pyspark

# Get the active SparkSession, or create one if none exists yet.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [2]:
# Evaluate the session as the cell's last expression so Jupyter displays its repr.
spark

For demonstration, we'll create a Spark DataFrame from a pandas DataFrame (the `tips` dataset from `pydataset`).

In [3]:
import numpy as np
import pandas as pd
import pydataset

In [54]:
# Load the `tips` example dataset (one row per restaurant bill) as a pandas DataFrame.
tips = pydataset.data('tips')

In [56]:
# Peek at the first five rows of the pandas DataFrame.
tips.head()

Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
1,16.99,1.01,Female,No,Sun,Dinner,2
2,10.34,1.66,Male,No,Sun,Dinner,3
3,21.01,3.5,Male,No,Sun,Dinner,3
4,23.68,3.31,Male,No,Sun,Dinner,2
5,24.59,3.61,Female,No,Sun,Dinner,4


In [4]:
# Convert the pandas DataFrame to a Spark DataFrame (the pandas index is not carried over).
df = spark.createDataFrame(tips)
# show() prints the first rows as a text table (20 by default).
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [5]:
# A Spark DataFrame's repr shows only its schema (column names and types), not its rows.
df

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [6]:
# head() with no argument returns the first row as a single Row object.
df.head()

Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2)

## DataFrame Basics

In [7]:
# Print only the first 5 rows.
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [9]:
# vertical=True prints one block per record, which is easier to read for wide rows.
df.show(5, vertical=True)

-RECORD 0------------
 total_bill | 16.99  
 tip        | 1.01   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 1------------
 total_bill | 10.34  
 tip        | 1.66   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 2------------
 total_bill | 21.01  
 tip        | 3.5    
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 3      
-RECORD 3------------
 total_bill | 23.68  
 tip        | 3.31   
 sex        | Male   
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 2      
-RECORD 4------------
 total_bill | 24.59  
 tip        | 3.61   
 sex        | Female 
 smoker     | No     
 day        | Sun    
 time       | Dinner 
 size       | 4      
only showing top 5 rows



In [8]:
# head(n) returns a Python list with the first n Row objects.
df.head(5)

[Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=10.34, tip=1.66, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=21.01, tip=3.5, sex='Male', smoker='No', day='Sun', time='Dinner', size=3),
 Row(total_bill=23.68, tip=3.31, sex='Male', smoker='No', day='Sun', time='Dinner', size=2),
 Row(total_bill=24.59, tip=3.61, sex='Female', smoker='No', day='Sun', time='Dinner', size=4)]

In [14]:
# Index into the returned list to get a single Row.
df.head(5)[0]

Row(total_bill=16.99, tip=1.01, sex='Female', smoker='No', day='Sun', time='Dinner', size=2)

In [13]:
# Row fields can be read as attributes.
df.head(5)[0].time

'Dinner'

In [10]:
# Don't do this!
# .show() only prints rows and returns None, so df2 ends up as None.
# Use .show() just to view a DataFrame's contents.
df2 = df.show(10)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 10 rows



In [11]:
# Confirms that assigning the result of .show() gives None.
type(df2)

NoneType

### Selecting Columns

In [15]:
# select picks columns by name, in the order given.
df.select('total_bill', 'tip', 'size', 'day').show()

+----------+----+----+---+
|total_bill| tip|size|day|
+----------+----+----+---+
|     16.99|1.01|   2|Sun|
|     10.34|1.66|   3|Sun|
|     21.01| 3.5|   3|Sun|
|     23.68|3.31|   2|Sun|
|     24.59|3.61|   4|Sun|
|     25.29|4.71|   4|Sun|
|      8.77| 2.0|   2|Sun|
|     26.88|3.12|   4|Sun|
|     15.04|1.96|   2|Sun|
|     14.78|3.23|   2|Sun|
|     10.27|1.71|   2|Sun|
|     35.26| 5.0|   4|Sun|
|     15.42|1.57|   2|Sun|
|     18.43| 3.0|   4|Sun|
|     14.83|3.02|   2|Sun|
|     21.58|3.92|   2|Sun|
|     10.33|1.67|   3|Sun|
|     16.29|3.71|   3|Sun|
|     16.97| 3.5|   3|Sun|
|     20.65|3.35|   3|Sat|
+----------+----+----+---+
only showing top 20 rows



In [16]:
# Without an action, select('*') just returns a new DataFrame (its repr shows the schema).
df.select('*')

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

In [17]:
# Column arithmetic builds an expression; the output column is named after the expression.
df.select(df.tip / df.total_bill).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [18]:
# A column expression can be stored in a variable and reused in several selects.
# NOTE: a later cell imports `col` from pyspark.sql.functions, which rebinds this
# name; cells using this variable will break if re-run after that import.
col = df.tip / df.total_bill
col

Column<'(tip / total_bill)'>

In [20]:
# Select using the stored column expression.
df.select(col).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [21]:
# Column names and Column expressions can be mixed in one select.
df.select('tip', 'total_bill',col).show(5)

+----+----------+-------------------+
| tip|total_bill| (tip / total_bill)|
+----+----------+-------------------+
|1.01|     16.99|0.05944673337257211|
|1.66|     10.34|0.16054158607350097|
| 3.5|     21.01|0.16658733936220846|
|3.31|     23.68| 0.1397804054054054|
|3.61|     24.59|0.14680764538430255|
+----+----------+-------------------+
only showing top 5 rows



In [19]:
# select returns a NEW DataFrame with the extra column; df itself is not modified,
# so assign the result to a variable if you want to keep it.
df.select('*', col.alias('tip_pct')).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [22]:
# Keep the DataFrame with the added tip_pct column under a new name.
df_with_tip_pct = df.select('*', col.alias('tip_pct'))

In [24]:
# df is unchanged: it still has only the original columns.
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [23]:
# The new DataFrame includes the tip_pct column.
df_with_tip_pct.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



### Selecting with Built-in Functions

In [25]:
# NOTE: importing `sum` this way shadows Python's built-in sum for the rest of the
# notebook. A common alternative is `from pyspark.sql import functions as F`.
from pyspark.sql.functions import sum, mean, concat, lit, regexp_extract, regexp_replace, when

In [27]:
# Spark functions return Column expressions; nothing is computed yet.
mean(df.tip)

Column<'avg(tip)'>

In [29]:
# Aggregates in select produce a DataFrame of aggregated values; without an action
# only its schema is displayed.
df.select(mean(df.tip), sum(df.total_bill))

DataFrame[avg(tip): double, sum(total_bill): double]

In [30]:
# show() is the action that actually computes the aggregates.
df.select(mean(df.tip), sum(df.total_bill)).show()

+----------------+-----------------+
|        avg(tip)|  sum(total_bill)|
+----------------+-----------------+
|2.99827868852459|4827.769999999999|
+----------------+-----------------+



In [32]:
# concat joins string columns; lit wraps a Python literal (here a space) as a Column.
df.select(concat('day', lit(' '), 'time')).show(5)

+--------------------+
|concat(day,  , time)|
+--------------------+
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
|          Sun Dinner|
+--------------------+
only showing top 5 rows



In [33]:
# 'Dinner' cannot be parsed as an integer, so the cast yields null rather than
# raising an error (as the output shows).
df.select(df.time.cast('int')).show(5)

+----+
|time|
+----+
|null|
|null|
|null|
|null|
|null|
+----+
only showing top 5 rows



In [37]:
# In the previous cell, .cast is the (lazy) transformation that only describes a new
# column, and .show is the action that triggers the actual computation.

In [38]:
# Casting the bigint size column to string succeeds for every row.
df.select(df.size.cast('string')).show(5)

+----+
|size|
+----+
|   2|
|   3|
|   3|
|   2|
|   4|
+----+
only showing top 5 rows



In [39]:
# count() is an action: it returns the number of rows as a Python int.
df.select(df.size.cast('string')).count()

244

In [40]:
# Add tip_pct (tip as a fraction of the bill) and rebind df to the result.
# withColumn appends the column just like select('*', ....alias('tip_pct')), but it
# *replaces* an existing column with the same name. That makes this cell safe to
# re-run: it never produces a duplicate tip_pct column.
df = df.withColumn('tip_pct', df.tip / df.total_bill)

In [41]:
# The schema now ends with the new tip_pct column.
df

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint, tip_pct: double]

### When / Otherwise

In [43]:
# when(...) builds a CASE WHEN expression; without .otherwise() it has no ELSE branch.
when(df.tip_pct > .2, 'good tip')

Column<'CASE WHEN (tip_pct > 0.2) THEN good tip END'>

In [45]:
# Rows that don't satisfy the condition get null because there is no ELSE branch.
df.select(
    'tip_pct',
    when(df.tip_pct > .2, 'good tip')
).show(5)

+-------------------+-------------------------------------------+
|            tip_pct|CASE WHEN (tip_pct > 0.2) THEN good tip END|
+-------------------+-------------------------------------------+
|0.05944673337257211|                                       null|
|0.16054158607350097|                                       null|
|0.16658733936220846|                                       null|
| 0.1397804054054054|                                       null|
|0.14680764538430255|                                       null|
+-------------------+-------------------------------------------+
only showing top 5 rows



In [47]:
# Same as above but showing the default 20 rows, so some 'good tip' matches appear.
df.select(
    'tip_pct',
    when(df.tip_pct > .2, 'good tip')
).show()

+-------------------+-------------------------------------------+
|            tip_pct|CASE WHEN (tip_pct > 0.2) THEN good tip END|
+-------------------+-------------------------------------------+
|0.05944673337257211|                                       null|
|0.16054158607350097|                                       null|
|0.16658733936220846|                                       null|
| 0.1397804054054054|                                       null|
|0.14680764538430255|                                       null|
|0.18623962040332148|                                       null|
|0.22805017103762829|                                   good tip|
|0.11607142857142858|                                       null|
|0.13031914893617022|                                       null|
| 0.2185385656292287|                                   good tip|
| 0.1665043816942551|                                       null|
|0.14180374361883155|                                       null|
|0.1018158

In [52]:
# .otherwise supplies the ELSE value; note the long auto-generated column name.
df.select(
    'tip_pct',
    (when(df.tip_pct > .2, 'good tip').otherwise('not good tip'))).show(25)

+-------------------+-------------------------------------------------------------+
|            tip_pct|CASE WHEN (tip_pct > 0.2) THEN good tip ELSE not good tip END|
+-------------------+-------------------------------------------------------------+
|0.05944673337257211|                                                 not good tip|
|0.16054158607350097|                                                 not good tip|
|0.16658733936220846|                                                 not good tip|
| 0.1397804054054054|                                                 not good tip|
|0.14680764538430255|                                                 not good tip|
|0.18623962040332148|                                                 not good tip|
|0.22805017103762829|                                                     good tip|
|0.11607142857142858|                                                 not good tip|
|0.13031914893617022|                                                 not go

In [51]:
# .alias gives the CASE WHEN column a readable name.
df.select(
    'tip_pct',
    (when(df.tip_pct > .2, 'good tip')
     .otherwise('not good tip')
     .alias('tip_desc'))
).show(25)

+-------------------+------------+
|            tip_pct|    tip_desc|
+-------------------+------------+
|0.05944673337257211|not good tip|
|0.16054158607350097|not good tip|
|0.16658733936220846|not good tip|
| 0.1397804054054054|not good tip|
|0.14680764538430255|not good tip|
|0.18623962040332148|not good tip|
|0.22805017103762829|    good tip|
|0.11607142857142858|not good tip|
|0.13031914893617022|not good tip|
| 0.2185385656292287|    good tip|
| 0.1665043816942551|not good tip|
|0.14180374361883155|not good tip|
|0.10181582360570687|not good tip|
|0.16277807921866522|not good tip|
|0.20364126770060686|    good tip|
|0.18164967562557924|not good tip|
| 0.1616650532429816|not good tip|
|0.22774708410067526|    good tip|
|0.20624631703005306|    good tip|
|0.16222760290556903|not good tip|
|0.22767857142857142|    good tip|
|0.13553474618038444|not good tip|
|0.14140773620798985|not good tip|
|0.19228817858954844|not good tip|
|0.16044399596367306|not good tip|
+-------------------

### Regex

In [53]:
df.select(
    'time',
    # extract capture group 1 of the pattern: the first word character
    regexp_extract('time', r'(\w).*', 1).alias('first_letter'),
    # replace every lowercase vowel with 'X'
    regexp_replace('time', r'[aeiou]', 'X')
).show(5)

+------+------------+-----------------------------------+
|  time|first_letter|regexp_replace(time, [aeiou], X, 1)|
+------+------------+-----------------------------------+
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
|Dinner|           D|                             DXnnXr|
+------+------------+-----------------------------------+
only showing top 5 rows



## Transforming Rows

In [None]:
# Reminder of the current df, which now includes tip_pct.
df.show()

### Sorting

In [None]:
# orderBy sorts ascending by default.
df.orderBy(df.total_bill).show()

In [None]:
# sort is an alias for orderBy: sort by day, then by size within each day.
df.sort(df.day, df.size).show()

In [None]:
# NOTE: this rebinds `col`, which earlier held the tip / total_bill Column expression.
from pyspark.sql.functions import asc, desc, col

In [None]:
# asc/desc set the direction per column; df.day uses the default ascending order.
df.sort(df.day, asc('time'), desc('size')).show()

In [None]:
# Column.asc() builds an ascending sort expression (a Column); nothing is evaluated.
col('size').asc()

In [None]:
# Sort by size descending, then by time ascending.
df.sort(col('size').desc(), col('time')).show()

### Filtering

In [None]:
# where keeps the rows matching a boolean condition (it is an alias for filter).
df.where(df.tip < 4).show()

In [None]:
# A condition is just a boolean Column, so it can be stored and reused.
mask = df.tip < 4
df.where(mask).show()

In [None]:
# Keep rows that are dinners OR have a tip of at most 2, then sort by tip.
# Each comparison needs parentheses because | binds tighter than == and <=.
(
    df
    .filter((df.time == "Dinner") | (df.tip <= 2))
    .sort('tip')
    .show()
)

In [None]:
# Chaining where calls keeps only rows that satisfy both conditions.
(
    df
    .where(df.smoker == "Yes")
    .where(df.day == "Sat")
    .show()
)

## Aggregating

In [None]:
# NOTE: min and max shadow Python's built-in min/max from here on.
from pyspark.sql.functions import mean, min, max

In [None]:
# Reminder of the data before aggregating.
df.show(5)

In [None]:
# Average tip for each meal time.
(
    df
    .groupBy('time')
    .agg(mean('tip'))
    .show()
)

In [None]:
# Several aggregates can be requested in a single agg call.
(
    df
    .groupBy('time')
    .agg(min('tip'), mean('tip'), max('tip'))
    .show()
)

In [None]:
# alias renames the aggregate's output column to avg_tip.
(
    df
    .groupBy('time')
    .agg(mean('tip').alias('avg_tip'))
    .show()
)

In [None]:
# Group by two keys: one output row per (time, day) combination present in the data.
(
    df
    .groupBy('time', 'day')
    .agg(mean('total_bill'))
    .show()
)

In [None]:
# Count rows for every (time, day) combination.
df.crosstab('time', 'day').show()

In [None]:
# pivot turns each distinct day into its own column, holding the mean total_bill
# for that (time, day) cell.
(
    df
    .groupBy('time')
    .pivot('day')
    .agg(mean('total_bill'))
    .show()
)

`.crosstab` only computes counts. To summarize groups in other ways, use `.groupBy` (optionally combined with `.pivot`) followed by `.agg`.

## Additional Features

### Spark SQL

In [None]:
# Register df as a temporary view named 'tips' so spark.sql queries can refer to it.
df.createOrReplaceTempView('tips')

In [None]:
# spark.sql runs a query against registered views and returns a DataFrame.
spark.sql('''
SELECT *
FROM tips
''').show()

In [None]:
# Show the tip, total_bill, and day of every bill from the day with the highest
# total sales. The scalar subquery finds that day by summing total_bill per day and
# keeping the top one (if two days tie, LIMIT 1 picks one of them arbitrarily).
spark.sql('''
SELECT tip, total_bill, day
FROM tips
WHERE day = (
    SELECT day
    FROM tips
    GROUP BY day
    ORDER BY sum(total_bill) DESC
    LIMIT 1
)
''').show()

### More Spark DataFrame Manipulation

In [None]:
# Filter first, then add the column. Compare the plan printed here with the next
# cell's (which adds the column first) to see how Spark's optimizer orders the
# filter and the projection.
# df already has a tip_pct column, so use withColumn (which replaces it) rather than
# select('*', ....alias('tip_pct')), which would yield two columns named tip_pct.
df.where(
    df.time == 'Dinner'
).withColumn(
    'tip_pct', df.tip / df.total_bill
).explain()

In [None]:
# Add the column first, then filter; compare this plan with the previous cell's.
# df already has a tip_pct column, so use withColumn (which replaces it) rather than
# select('*', ....alias('tip_pct')), which would yield two columns named tip_pct.
df.withColumn(
    'tip_pct', df.tip / df.total_bill
).where(
    df.time == 'Dinner'
).explain()

### Mixing in SQL Expressions

In [None]:
from pyspark.sql.functions import expr

`expr` lets us mix snippets of SQL into DataFrame operations.

In [None]:
# Build tip_pct with a SQL expression (including its SQL alias) and filter with a
# SQL predicate. df already has a tip_pct column, so drop it first to avoid two
# columns of the same name; drop is a no-op if the column is absent.
df.drop('tip_pct').select(
    '*',
    expr('tip / total_bill as tip_pct')
).where(
    expr('day = "Sun" AND time = "Dinner"')
).show()

## Joins

In [None]:
# Two small DataFrames to join:
# - df1: 100 rows with a random value x and a random group_id in 1..6
# - df2: maps each group id to a letter label (this rebinds the earlier df2)
# A fixed seed makes the random data reproducible across kernel restarts.
rng = np.random.default_rng(42)
df1 = spark.createDataFrame(pd.DataFrame({
    'id': np.arange(100) + 1,
    'x': rng.standard_normal(100).round(3),
    # integers' upper bound is exclusive, so this draws from 1..6
    'group_id': rng.integers(1, 7, size=100),
}))
df2 = spark.createDataFrame(pd.DataFrame({
    'id': range(1, 7),
    'group': list('abcdef')
}))
df1.show(5)
df2.show()

In [None]:
# Join on differently named key columns. Both `id` columns (df1's and df2's) are
# kept in the result, so a bare reference to `id` on df_merged would be ambiguous.
df_merged = df1.join(
    df2,
    on=df1.group_id == df2.id,
)
df_merged.show(5)

In [None]:
# Renaming df2's key to match lets us join on a column name, which keeps a single
# group_id column in the output instead of two key columns.
(
    df1
    .join(df2.withColumnRenamed('id', 'group_id'), on='group_id')
    .show(5)
)