## Spark 101

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 09:30:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import pandas as pd
import numpy as np

np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe


Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c
5,5,c
6,6,a
7,7,b
8,8,a
9,9,b


In [3]:
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

In [4]:
df.show(5)

                                                                                

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



In [6]:
df.describe().show()

[Stage 4:>                                                          (0 + 1) / 1]

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



                                                                                

In [7]:
from pydataset import data

mpg = spark.createDataFrame(data('mpg'))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [8]:
mpg.hwy

Column<'hwy'>

In [9]:
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

In [10]:
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

+---+---+----------+
|hwy|cty|     model|
+---+---+----------+
| 29| 18|        a4|
| 29| 21|        a4|
| 31| 20|        a4|
| 30| 21|        a4|
| 26| 16|        a4|
| 26| 18|        a4|
| 27| 18|        a4|
| 26| 18|a4 quattro|
| 25| 16|a4 quattro|
| 28| 20|a4 quattro|
+---+---+----------+
only showing top 10 rows



In [11]:
mpg.hwy + 1

Column<'(hwy + 1)'>

In [12]:
mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



In [13]:
mpg.select(mpg.hwy.alias("highway_mileage")).show(5)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



In [14]:
col1 = mpg.hwy.alias("highway_mileage")

In [15]:
col2 = (mpg.hwy / 2).alias('highway_mileage_halved')
mpg.select(col1, col2).show(5)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
|             30|                  15.0|
|             26|                  13.0|
+---------------+----------------------+
only showing top 5 rows



In [19]:
from pyspark.sql.functions import col, expr

In [20]:
col("hwy")

Column<'hwy'>

In [22]:
avg_column = (col('hwy') + col('cty')) / 2

mpg.select(
    col('hwy').alias('highway_mileage'),
    mpg.cty.alias('city_mileage'),
    avg_column.alias('avg_mileage')
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



In [23]:
mpg.select(
    expr('hwy'), # the same as 'col'
    expr('hwy + 1'), #an arithmetic expression
    expr('hwy AS highway_mileage'), # using an alias
    expr('hwy +1 AS highway_incremented'), # a combination of the above
).show(5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



In [24]:
mpg.select(
    mpg.hwy.alias("highway"),
    col('hwy').alias('highway'),
    expr("hwy").alias('highway'),
    expr('hwy AS highway'),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



### Spark SQL

In order to start using spark SQL, we'll first "register" the table with spark:



In [25]:
mpg.createOrReplaceTempView('mpg')

In [26]:
spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
    """
    )

DataFrame[hwy: bigint, cty: bigint, avg: double]

In [27]:
spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 AS avg
    FROM mpg
    """
    ).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



In [28]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [29]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [30]:
mpg.select(mpg.hwy.cast('string')).printSchema()

root
 |-- hwy: string (nullable = true)



In [32]:
# note that if an entry can't be cast to a type it will be replace with null
mpg.select(mpg.model, mpg.model.cast('int')).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



Note that importing the sum function directly will override the built-in sum function. This means you will get an error if you try to sum a list of numbers, because sum will refernce the pyspark sum function, which works with pyspark dataframe columns, while the built-in sum function works with lists of numbers. The same holds true for the built in min and max functions.

In [33]:
# Note: The pyspark avg and mean functions are aliases of each other
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean


In [34]:
# this can be used to import all pyspark functions directly
# the overwrite of the sum and min and max still applies unless
# we just use: from pyspark.sql import functions as F
from pyspark.sql.functions import *


In [36]:
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias('average_1'),
    avg(mpg.hwy).alias('average_2'),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+-----------------+-----------------+--------+--------+
|        average_1|        average_2|min(hwy)|max(hwy)|
+-----------------+-----------------+--------+--------+
|23.44017094017094|23.44017094017094|      12|      44|
+-----------------+-----------------+--------+--------+



In [37]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In order to use a string literal as part of our select, we'll need to use the lit function, otherwise spark will try to resolve our string as a column.



In [39]:
from pyspark.sql.functions import lit

In [38]:
mpg.select(concat(mpg.cyl, lit(' cylinders'))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



In [40]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [41]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)


+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



In [47]:
textdf.select(
    'address',
    regexp_extract("address", r"^(\d+)", 1).alias('street_no'),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



In [48]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)


+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



Spark provides two dataframe methods, .filter and .where, which both allow us to select a subset of the rows of our dataframe.



In [55]:
mpg.filter(mpg.cyl == 4).where(mpg['class'] == 'subcompact').show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

Similar to an IF in Excel, CASE...WHEN in SQL, or np.where in python, spark provides a when function.



In [56]:
from pyspark.sql.functions import when

In [57]:
mpg.select(mpg.hwy, when(mpg.hwy > 25, 'good_mileage').alias('mpg_desc')).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



To specify multiple conditions, we can chain .when calls. The first condition that is met will be the value that is used, and if none of the conditions are met the value specified in the .otherwise will be used (or null if you don't provide a .otherwise).



In [59]:
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, 'small')
        .when(mpg.displ < 3, 'medium')
        .otherwise('large')
        .alias('engine_size')
    )
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



Spark lets us sort the rows in our dataframe by one or multiple columns with two methods: .sort, and .orderBy. .sort and .orderBy are aliases of each other and do the exact same thing. Like other methods we've seen, .sort takes in a Column object or a string that is the name of a column.



In [60]:
mpg.sort(mpg.hwy).show(8)

[Stage 54:>                                                         (0 + 8) / 8]

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|        ford|    f150 pickup 4wd|  5.4|1999|  8|  auto(l4)|  4| 11| 15|  r|pickup|
+------------+-------------------+-----+----+---+----------+---+---+---+---+

                                                                                

By default, values are sorted in ascending order. To sort in descending order, we can use the .desc method on any Column object, or the desc function from pyspark.sql.functions.



In [61]:
from pyspark.sql.functions import asc, desc

In [62]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col('hwy').desc())
# is the same as
mpg.sort(desc('hwy')).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



To specify sorting by multiple columns, we provide each column as a separate argument to .sort.



In [63]:
mpg.sort(desc('class'), mpg.cyl.asc(), col('hwy').desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|      nissan|  

To aggregate our data by group, we can use the .groupBy method. Like with .select, we can pass either Column objects or strings that are column names to .groupBy. All of the expressions below are equivalent.



In [64]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(mpg.cyl)
mpg.groupBy('cyl')

<pyspark.sql.group.GroupedData at 0x11c4c4100>

Once the data is grouped, we need to specify an aggregation. We can use one of the aggregate functions we imported earlier, along with a column:



In [65]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

[Stage 57:>                                                         (0 + 8) / 8]

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
|  5|              20.5|            28.75|
+---+------------------+-----------------+



                                                                                

To group by multiple columns, pass each of the columns a a separate argument to .groupBy (Note that this is different from pandas, where we would need to pass a list).



In [66]:
mpg.groupBy('cyl', 'class').agg(avg(mpg.cty), avg(mpg.hwy)).show()

[Stage 60:>                                                         (0 + 8) / 8]

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|   midsize|              16.0|              24.0|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  4|   compact|            21.375|          29.46875|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  6|    pickup|              14.5|              17.9|
|  8|    pickup|              11.8|              15.8|
|  4|   midsize|              20.5|           29.1875|
|  6|   minivan|              15.6|              22.2|
|  4|   minivan|              18.0|              24.0|
|  6|       suv|              14.5|              18.5|
|  6|subcompact|              17.0|24.714285714285715|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|subcompact|              14.8|              21.6|
|  4|     

                                                                                

In addition to .groupBy, we can use .rollup, which will do the same aggregations, but will also include the overall total:



In [69]:
mpg.rollup('cyl').count().sort('cyl').show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



Here the null value in cyl indicates the total count.



In [70]:
mpg.rollup('cyl').agg(expr('avg(hwy)')).sort('cyl').show()

[Stage 72:>                                                         (0 + 8) / 8]

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



                                                                                

And in the example above, the null row represents the overall average highway mileage.



In [72]:
mpg.rollup('cyl', 'class').mean('hwy').sort(col('cyl'), col('class')).show()

[Stage 75:>                                                         (0 + 8) / 8]

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows



                                                                                

In addition to groupby, spark provides a couple other ways to do aggregation. One of which is .crosstab. This is very similary to pandas .crosstab function, in that it calculates the number of occurances of each unique value from the two passed columns:



In [73]:
mpg.crosstab('class', 'cyl').show()

                                                                                

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|subcompact| 21|  2|  7|  5|
|   compact| 32|  2| 13|  0|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   midsize| 16|  0| 23|  2|
|    pickup|  3|  0| 10| 20|
|   2seater|  0|  0|  0|  5|
+----------+---+---+---+---+



.crosstab simply does counts, if we want a different aggregation, we can use .pivot. For example, to find the average highway mileage for each combination of car class and number of cylinders, we could write the following:



In [74]:
mpg.groupby('class').pivot('cyl').mean('hwy').show()

[Stage 100:>                                                        (0 + 8) / 8]

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



23/05/19 15:36:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 307453 ms exceeds timeout 120000 ms
23/05/19 15:36:23 WARN SparkContext: Killing executors is not supported by current scheduler.
23/05/19 15:36:23 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at sc

### Handling Missing Data

In [None]:
df = spark.createDataFrame(
    pd.DataFrame(
        {'x': [1, 2, np.nan, 4, 5, np.nan]))