# Spark 101

In this lesson we will cover the basics of working with spark dataframes, and show how spark dataframes are different from the pandas dataframes we have been working with.

While spark dataframes might superficially look like pandas dataframes, and even share some of the same methods and syntax, it is important to keep in mind that they are two separate types of objects. Spark and pandas code may look alike, but the two tend to be semantically very different.

We'll begin by creating the spark session:

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Creating Dataframes

Spark can convert any pandas dataframe into a spark dataframe with a simple method call. For this lesson, we will use this functionality to demonstrate the differences between spark and pandas dataframes and explore how to work with spark dataframes.

In [2]:
import pandas as pd
import numpy as np

np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c
5,5,c
6,6,a
7,7,b
8,8,a
9,9,b


Here we start with a simple pandas dataset, and now we will convert it to a spark dataframe:

In [3]:
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

Notice that, while we do see the column names, we don't see the data in the dataframe like we would with a pandas dataframe. This is because spark is lazy, in that it won't show us values until it has to. For the purposes of looking at the first few rows of our data, we can use the `.show` method.

In [4]:
df.show(5)

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



Like pandas dataframes, spark dataframes have a `.describe` method:

In [5]:
df.describe()

DataFrame[summary: string, n: string, group: string]

Which, also like pandas, returns another dataframe. However, since this is a spark dataframe, we have to explicitly show it.

In [6]:
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



By default spark will show the first 20 rows, but we can specify how many we want by passing a number to `.show`.

Observing the shape of our spark dataframe does require a little bit of creativity:

In [7]:
print("DataFrame shape: ", df.count(), " x ", len(df.columns))

DataFrame shape:  20  x  2


Let's use some different data so that we have a more robust dataset:

In [8]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



Let's look at another difference from pandas:

In [9]:
mpg.hwy

Column<'hwy'>

While this expression would produce a Series of values from a pandas dataframe, for a spark dataframe this produces a Column object, which is an object that represents a vertical slice of a dataframe, but does not contain the data itself.

One way to use our column objects is to use them in combination with the `.select` method. `.select` is very powerful, and lets us specify what data we want to see in the resulting dataframe.

In [10]:
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

Again, notice that we don't see any data; instead we see the new dataframe that is produced. To see the actual data, we'll again need to use `.show`:

In [11]:
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

+---+---+----------+
|hwy|cty|     model|
+---+---+----------+
| 29| 18|        a4|
| 29| 21|        a4|
| 31| 20|        a4|
| 30| 21|        a4|
| 26| 16|        a4|
| 26| 18|        a4|
| 27| 18|        a4|
| 26| 18|a4 quattro|
| 25| 16|a4 quattro|
| 28| 20|a4 quattro|
+---+---+----------+
only showing top 10 rows



Our column objects support a number of operations, including the arithmetic operators:

In [12]:
mpg.hwy + 1

Column<'(hwy + 1)'>

Here we get back a column that represents the values from the original hwy column with 1 added to them. To actually see this data, we'd need to select it and show the dataframe.

In [13]:
mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



Once we have a column object, we can use the `.alias` method to rename it:

In [14]:
mpg.select(mpg.hwy.alias("highway_mileage")).show(5)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



We can also store column objects in variables and reference them.

In [15]:
col1 = mpg.hwy.alias("highway_mileage")
col1

Column<'hwy AS highway_mileage'>

In [16]:
col2 = (mpg.hwy / 2).alias("highway_mileage_halved")
mpg.select(col1, col2).show(5)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
|             30|                  15.0|
|             26|                  13.0|
+---------------+----------------------+
only showing top 5 rows



## Other ways to create columns

In addition to the syntax we've seen above, we can create columns with the `col` and `expr` functions from the `pyspark.sql.functions` module.

In [17]:
from pyspark.sql.functions import col, expr

In [18]:
col("hwy")

Column<'hwy'>

The output above tells us very little. Until this column is attached to a dataframe, it is just a lazy reference to a column named `hwy`:

In [19]:
test_col = col("hwy")

In [20]:
test_col

Column<'hwy'>

We can mix and match the syntax we use, and the column object produced by the `col` function is the same as the previous column object we saw.

Here `col("hwy")` finds a match in the mpg data when we use `.select()`:

In [21]:
avg_column = (col("hwy") + col("cty")) / 2

mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



We created a variable named `avg_column` that represents the average of the highway and city mileage of each vehicle. This variable is created by using the `col` function to produce pyspark Column objects and using the arithmetic operators to combine them.

Next we select the original highway and city mileage columns, in addition to our new average mileage column. We demonstrate the `col` function to select the `hwy` column and refer to the city mileage column with the `mpg.cty` syntax we saw previously. We also give all of our columns more readable aliases before showing the resulting dataframe.

Notice something curious when we display `avg_column` by itself:

In [22]:
avg_column

Column<'((hwy + cty) / 2)'>

It's like a recipe or formula for spark to use when it has something to match `hwy` and `cty` to. Again, Spark is **lazy** and tries to limit the amount of information stored in memory. It accomplishes this by storing the instructions and only deploying those instructions when absolutely necessary.

## `expr`
The `expr` function is more powerful than `col`. It does everything `col` does and more. `expr` returns the same type of column object, but allows us to express manipulations to the column within the string that defines the column.

In [23]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



There's a lot of redundancy in Spark...

In [24]:
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



Note that all the columns created above are identical, and which syntax to use is merely a style choice.

## Type Casting

We can view the types of the columns in our dataframe in one of two ways:

In [25]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [26]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



Both provide the same information.

To convert from one type to another, we can use the `.cast` method on a column.

In [27]:
mpg.select(mpg.hwy.cast("string")).printSchema()

root
 |-- hwy: string (nullable = true)



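Note that if a value cannot be converted, it is replaced with null (this is the behavior when Spark's ANSI SQL mode is off, as in the output below; with ANSI mode on, an invalid cast raises an error instead):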

In [28]:
mpg.select(mpg.model, mpg.model.cast("int")).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



# ------------------------------------------------------------------------
## Exercise 1:

Within your `codeup-data-science` directory, create a new repo named `spark-exercises`. This will be where you do your work for this module. Create a repository on GitHub with the same name, and link your local repository to GitHub.

Save this work in your `spark-exercises` repo. Then add, commit, and push your changes.

Create a jupyter notebook or python script named `spark101` for this exercise.

Create a spark data frame that contains your favorite programming languages.

- Create a dataframe with one column named `language`
> Hint: Start with a pandas dataframe. Maybe use a dictionary?
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

# ------------------------------------------------------------------------

In [29]:
# import pandas as pd

# import pyspark
# from pyspark.sql.functions import *

# spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [30]:
pd_lang_df = pd.DataFrame(
    {"language": ["LOLCODE", "Rockstar", "TrumpScript", "Chicken", "Python"]})

spk_lang_df = spark.createDataFrame(pd_lang_df)

In [31]:
spk_lang_df.printSchema()

root
 |-- language: string (nullable = true)



In [32]:
print("DataFrame shape: ", spk_lang_df.count(), " by ", len(spk_lang_df.columns))

DataFrame shape:  5  by  1


In [33]:
spk_lang_df.show(5)

+-----------+
|   language|
+-----------+
|    LOLCODE|
|   Rockstar|
|TrumpScript|
|    Chicken|
|     Python|
+-----------+



## Spark SQL

As we've seen through the column definitions, spark is very flexible and allows us many different ways to express ourselves. Another way that is fairly different than what we've seen above is through spark SQL, which lets us write SQL queries against our spark dataframes.

In order to start using spark SQL, we'll first "register" the table with spark:




In [34]:
mpg.createOrReplaceTempView("mpg")

Now we can write a sql query against the `mpg` table:

In [35]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
)

DataFrame[hwy: bigint, cty: bigint, avg: double]

Notice that the resulting value is another dataframe. As we know, in order to view the values in a dataframe, we need to use `.show`:

In [36]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



It is worth noting that all of the methods for creating / manipulating dataframes outlined above are the same in terms of performance as well. All of the resulting dataframes get turned into the same spark code that gets executed on the JVM, so it really is just a style choice as to which to use.

## Basic Built-in Functions

We've used the `col` and `expr` functions, but there are many other functions within the `pyspark.sql.functions` module, all of which operate on pyspark dataframe columns. Here we'll demonstrate several:

- `concat`: to concatenate strings
- `sum`: to sum a group
- `avg`: to take the average of a group
- `min`: to find the minimum
- `max`: to find the maximum

### WARNING:
**Note that importing the `sum` function directly will shadow python's built-in `sum` function.**

This means you will get an error if you try to `sum` a list of numbers, because `sum` will reference the **pyspark** `sum` function, which works with pyspark dataframe columns, while the built-in `sum` function works with lists of numbers. **The same holds true for the built-in `min` and `max` functions.**

In [37]:
# Note: The pyspark avg and mean functions are aliases of each other
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In this lesson we will explicitly import any functions from `pyspark.sql.functions` that we use, but it is very common to see something like:

> `from pyspark.sql.functions import *`

which will import *all* of the functions from the `pyspark.sql.functions` module.

In [38]:
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+-----------------+-----------------+--------+--------+
|        average_1|        average_2|min(hwy)|max(hwy)|
+-----------------+-----------------+--------+--------+
|23.44017094017094|23.44017094017094|      12|      44|
+-----------------+-----------------+--------+--------+



In [39]:
mpg.columns

['manufacturer',
 'model',
 'displ',
 'year',
 'cyl',
 'trans',
 'drv',
 'cty',
 'hwy',
 'fl',
 'class']

### `concat`

We can create custom values by concatenating columns together:

In [40]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



What if we want to concatenate a string that's not already in our data? In order to use a string literal as part of our select, we'll need to use the `lit` function, otherwise spark will try to resolve our string as a column.

In [41]:
from pyspark.sql.functions import lit

In [42]:
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



In the example above, we select the concatenation of the number of cylinders (the value from the `cyl` column) and the string literal " cylinders".

In [43]:
# What happens if we don't use `lit()`?
mpg.select(concat(mpg.cyl, " cylinders")).show(5)

AnalysisException: cannot resolve '` cylinders`' given input columns: [class, cty, cyl, displ, drv, fl, hwy, manufacturer, model, trans, year];
'Project [unresolvedalias(concat(cyl#297L, ' cylinders), Some(org.apache.spark.sql.Column$$Lambda$3380/0x0000000801425840@2ea166c0))]
+- LogicalRDD [manufacturer#293, model#294, displ#295, year#296L, cyl#297L, trans#298, drv#299, cty#300L, hwy#301L, fl#302, class#303], false


## More `pyspark` Functions for String Manipulation

Let's take a look at a couple more functions for string manipulation.

In [44]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In order to demonstrate these functions we'll create a dataframe with some text data.

In [45]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



The `regexp_extract` function lets us specify a regular expression with at least one capture group, and create a new column based on the contents of a capture group.

In [46]:
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



In the example above, the first argument to `regexp_extract` is the name of the string column to extract from, the second argument is the regular expression itself, and the last argument specifies which capture group we want to use. If, for example, our regular expression had 2 capture groups in it and we wanted the contents of the 2nd group, we would specify a 2 here.
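
To make the capture-group numbering concrete, here is a small sketch that uses python's built-in `re` module as a stand-in. Spark evaluates its patterns with Java's regular expression engine, so this is an analogy rather than the same code path, but for a simple pattern like this one the groups are assumed to match the same text.

```python
import re

address = "600 Navarro St ste 600, San Antonio, TX 78205"

# Two capture groups: (1) the street number, (2) the street name
pattern = r"^(\d+)\s([\w\s]+?),"
match = re.search(pattern, address)

street_no = match.group(1)  # like regexp_extract("address", pattern, 1)
street = match.group(2)     # like regexp_extract("address", pattern, 2)

print(street_no, "|", street)
```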

In addition to `regexp_extract`, `regexp_replace` lets us make substitutions based on a regular expression:

In [47]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



In our example above, we obtain just the city, state, and zip code of the address by replacing everything up to the first comma with an empty string.

## `.filter` and `.where`

Spark provides two dataframe methods, `.filter` and `.where`, which both allow us to select a subset of the rows of our dataframe.

In [48]:
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

In [49]:
mpg.filter(mpg.cyl == 4).filter(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

In [50]:
mpg.where(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

They give the same result: `.where` is an alias for `.filter`, so there is no difference between them. `filter` is the standard Scala name for such a function, and `where` is more familiar to people who use SQL.

## `when()` and `.otherwise()`

Similar to:
- an `IF` in Excel,
- `CASE...WHEN` in SQL, 
- `np.where` in NumPy,

Spark provides a `when()` function.

The `when` function lets us specify a condition, and **a value to produce** if that condition is true:

In [51]:
from pyspark.sql.functions import when

In [52]:
mpg.select(mpg.hwy, when(mpg.hwy > 25, "good_mileage").alias("mpg_desc")).show(
    12
)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



Notice here that if the condition we specified is false, `null` will be produced. Instead of null, we can specify a value to use if our condition is false with the `.otherwise` method.

In [53]:
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage")
    .otherwise("bad_mileage")
    .alias("mpg_desc"),
).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



To specify multiple conditions, we can chain `.when` calls. The value paired with the first condition that is met is used, and if none of the conditions are met, the value given to `.otherwise` is used (or `null` if you don't provide an `.otherwise`).

In [54]:
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



Notice here that a car with a `displ` of 1.8 matches both conditions we specified, but `small` is produced because it is associated with the first matching condition. Any value of at least 2 and less than 3 produces `medium`, and any value of 3 or more produces `large`.
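
The first-match-wins behavior is the same as an `if` / `elif` / `else` chain in plain python. The function below is only an analogy to make the order of evaluation explicit (`engine_size` is a name of our own, and it does not handle nulls the way spark does); it is not how spark evaluates the column.

```python
def engine_size(displ):
    """Plain-python mirror of the chained when / otherwise expression."""
    if displ < 2:      # when(mpg.displ < 2, "small")
        return "small"
    elif displ < 3:    # .when(mpg.displ < 3, "medium")
        return "medium"
    else:              # .otherwise("large")
        return "large"


labels = {displ: engine_size(displ) for displ in [1.8, 2.0, 2.8, 3.0, 3.1]}
print(labels)
```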

In [55]:
mpg.select(
    mpg.model,
    mpg.displ,
    (
        when((mpg.displ < 2) & (col('model') == 'audi'), "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).where((col('model') == 'tiburon') | (mpg.model == 'civic')).show(50)

+-------+-----+-----------+
|  model|displ|engine_size|
+-------+-----+-----------+
|  civic|  1.6|     medium|
|  civic|  1.6|     medium|
|  civic|  1.6|     medium|
|  civic|  1.6|     medium|
|  civic|  1.6|     medium|
|  civic|  1.8|     medium|
|  civic|  1.8|     medium|
|  civic|  1.8|     medium|
|  civic|  2.0|     medium|
|tiburon|  2.0|     medium|
|tiburon|  2.0|     medium|
|tiburon|  2.0|     medium|
|tiburon|  2.0|     medium|
|tiburon|  2.7|     medium|
|tiburon|  2.7|     medium|
|tiburon|  2.7|     medium|
+-------+-----+-----------+

Notice that every `civic` with a `displ` below 2 is labeled `medium` rather than `small`. The first condition also requires `model` to equal `audi`, and none of these rows has that model (in the rows we saw earlier, `audi` is a value in the `manufacturer` column). Because both parts of an `&` condition must be true, these rows fall through to the second `.when`.



# ------------------------------------------------------------------------
## Exercise 2:

Load the `mpg` dataset as a spark dataframe.

a. Create 1 column of output that contains a message like the one below for each record:

    The 1999 audi a4 has a 4 cylinder engine.

> Hint: You will need to concatenate values that already exist in the data with string literals

b. Transform the `trans` column so that it only contains either `manual` or `auto`.

> Hint: Consider spark string methods and `when().otherwise()` chaining
# ------------------------------------------------------------------------

In [56]:
# Create 1 column of output that contains a message like the one below for each record:
# The 1999 audi a4 has a 4 cylinder engine.
mpg.select(
    concat(
        lit("The "),
        col("year"),
        lit(" "),
        col("manufacturer"),
        lit(" "),
        col("model"),
        lit(" has a "),
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("vehicle_cylinder_desc")
).show(truncate=False)

+--------------------------------------------------------------+
|vehicle_cylinder_desc                                         |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [57]:
# Transform the trans column so that it only contains either manual or auto.
mpg.show(10)

+------------+----------+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+----------+-----+----+---+----------+---+---+---+---+-------+
|        audi|        a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|        a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|        a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|        a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|        a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
|        audi|        a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
|        audi|        a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|  auto(l5)|  4| 16| 25|  p|compact|
|        audi|a4 quattro|  2.0|2008|  4|manual(m6)|  4| 20| 28|  p|compact|
+-----------

In [58]:
# multiple ways to do this
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(
        mpg.trans.like("a%"), "auto")
    .otherwise("manual").alias("when + like")
).show()

+----------+--------------+--------------+-----------+
|     trans|regexp_extract|regexp_replace|when + like|
+----------+--------------+--------------+-----------+
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|manual(m6)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(l5)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(l5
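
To see why all three approaches give the same answer, the logic can be checked outside of spark. The sketch below is plain Python using the standard `re` module, not spark code. The helper names are made up for illustration, and it assumes the two patterns behave the same in Python's regex engine as in spark's Java-based one, which should hold for simple patterns like these.

```python
import re

# A few values in the same shape as the trans column.
samples = ["auto(l5)", "manual(m5)", "auto(av)", "manual(m6)"]


def extract_leading_word(value):
    # Mirrors regexp_extract("trans", r"^(\w+)", 1): return capture group 1,
    # or an empty string when the pattern does not match.
    match = re.search(r"^(\w+)", value)
    return match.group(1) if match else ""


def strip_parenthetical(value):
    # Mirrors regexp_replace("trans", r"\(.+$", ""): delete everything from
    # the first "(" to the end of the string.
    return re.sub(r"\(.+$", "", value)


def auto_or_manual(value):
    # Mirrors when(trans.like("a%"), "auto").otherwise("manual").
    return "auto" if value.startswith("a") else "manual"


for value in samples:
    print(value, extract_leading_word(value), strip_parenthetical(value), auto_or_manual(value))
```

The `when` + `like` version only works because every value that starts with "a" is an automatic transmission; the two regex versions make no such assumption about the data.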

## `withColumn()`

We can add a new column to our dataframe using the `.withColumn()` method. The first argument is the name of the new column, and the second argument is a Column expression that produces the values for that column. A constant value has to be wrapped in `lit()`, as in the example below.

In [59]:
mpg.withColumn("is_car", lit("yes")).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|is_car|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|   yes|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|   yes|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|   yes|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|   yes|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|   yes|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|   yes|
|        audi|                a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|   yes|
|        audi|        a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|   yes|

We can also use `.withColumn()` to perform operations using multiple columns as input:

In [60]:
mpg.select(mpg.manufacturer, mpg.model, mpg.hwy, mpg.cty).withColumn("ratio", col('hwy') / col('cty')).show()

+------------+------------------+---+---+------------------+
|manufacturer|             model|hwy|cty|             ratio|
+------------+------------------+---+---+------------------+
|        audi|                a4| 29| 18|1.6111111111111112|
|        audi|                a4| 29| 21| 1.380952380952381|
|        audi|                a4| 31| 20|              1.55|
|        audi|                a4| 30| 21|1.4285714285714286|
|        audi|                a4| 26| 16|             1.625|
|        audi|                a4| 26| 18|1.4444444444444444|
|        audi|                a4| 27| 18|               1.5|
|        audi|        a4 quattro| 26| 18|1.4444444444444444|
|        audi|        a4 quattro| 25| 16|            1.5625|
|        audi|        a4 quattro| 28| 20|               1.4|
|        audi|        a4 quattro| 27| 19|1.4210526315789473|
|        audi|        a4 quattro| 25| 15|1.6666666666666667|
|        audi|        a4 quattro| 25| 17|1.4705882352941178|
|        audi|        a4

**NOTE:** If we use `.select()` before `.withColumn()`, the selection must include every column that the `.withColumn()` expression refers to. Otherwise spark raises an `AnalysisException`:

In [61]:
mpg.select(mpg.manufacturer, mpg.model).withColumn("ratio", mpg.hwy / mpg.cty).show()

AnalysisException: Resolved attribute(s) hwy#301L,cty#300L missing from manufacturer#293,model#294 in operator !Project [manufacturer#293, model#294, (cast(hwy#301L as double) / cast(cty#300L as double)) AS ratio#986].;
!Project [manufacturer#293, model#294, (cast(hwy#301L as double) / cast(cty#300L as double)) AS ratio#986]
+- Project [manufacturer#293, model#294]
   +- LogicalRDD [manufacturer#293, model#294, displ#295, year#296L, cyl#297L, trans#298, drv#299, cty#300L, hwy#301L, fl#302, class#303], false


## Sorting and Ordering

Spark lets us sort the rows in our dataframe by one or more columns with two methods: `.sort` and `.orderBy`. The two are aliases of each other and do exactly the same thing. Like other methods we've seen, `.sort` takes a Column object or a string that is the name of a column.

In [62]:
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|       dodge|        durango 4wd|  5.9|1999|  8|  auto(l4)|  4| 11| 15|  r|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+

By default, values are sorted in ascending order. To sort in descending order, we can use the `.desc` method on any Column object, or the `desc` function from `pyspark.sql.functions`.

In [63]:
from pyspark.sql.functions import asc, desc

In [64]:
# Using the .desc() method
mpg.sort(mpg.hwy.desc()).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [65]:
# Using the col() function with .desc() method
mpg.sort(col("hwy").desc()).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [66]:
# Using the desc() function
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



To specify sorting by multiple columns, we provide each column as a separate argument to `.sort`.

Here we will sort first by the vehicle's class in reverse alphabetical order, then by the number of cylinders from lowest to highest, and finally by the vehicle's highway mileage from highest to lowest:

In [67]:
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|        jeep|gr
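
The order of the arguments sets their priority: later columns only break ties left by earlier ones. The sketch below imitates that ordering in plain Python on a few made-up rows (not taken from `mpg`). It relies on Python's sort being stable, and is meant as an illustration of the semantics rather than of how spark sorts internally.

```python
from operator import itemgetter

# Made-up rows with the three columns used in the sort above.
vehicles = [
    {"class": "suv", "cyl": 6, "hwy": 22},
    {"class": "suv", "cyl": 4, "hwy": 25},
    {"class": "compact", "cyl": 4, "hwy": 29},
    {"class": "suv", "cyl": 4, "hwy": 27},
    {"class": "pickup", "cyl": 8, "hwy": 17},
]

# Because the sort is stable, sorting by the lowest-priority key first and
# the highest-priority key last gives the same result as one multi-key sort.
ordered = sorted(vehicles, key=itemgetter("hwy"), reverse=True)  # hwy, descending
ordered = sorted(ordered, key=itemgetter("cyl"))                 # cyl, ascending
ordered = sorted(ordered, key=itemgetter("class"), reverse=True) # class, descending

for vehicle in ordered:
    print(vehicle["class"], vehicle["cyl"], vehicle["hwy"])
```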

## Grouping and Aggregating

To aggregate our data by group, we can use the `.groupBy` method. Like with `.select`, we can pass either Column objects or strings that are column names to `.groupBy`. All of the expressions below are equivalent.

In [68]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

<pyspark.sql.group.GroupedData at 0x7fc544ee93a0>

Once the data is grouped, we need to specify an aggregation. We can use one of the aggregate functions we imported earlier (`sum`, `avg`, `min`, `max`, etc.) along with a column:

In [69]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
|  5|              20.5|            28.75|
+---+------------------+-----------------+



To group by multiple columns, pass each of the columns as a separate argument to `.groupBy` (note that this is different from pandas, where we would need to pass a list).

In [70]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|   midsize|              16.0|              24.0|
|  6|   compact|16.923076923076923|25.307692307692307|
|  4|   compact|            21.375|          29.46875|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  4|   minivan|              18.0|              24.0|
|  6|    pickup|              14.5|              17.9|
|  8|    pickup|              11.8|              15.8|
|  6|   minivan|              15.6|              22.2|
|  6|       suv|              14.5|              18.5|
|  6|subcompact|              17.0|24.714285714285715|
|  8|subcompact|              14.8|              21.6|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  4|     
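
For comparison, this is roughly what the pandas side of that note looks like. The small `cars` frame is made up for illustration; the point is only that pandas takes the grouping columns as a single list, while spark takes them as separate arguments.

```python
import pandas as pd

# Made-up data with the same column names as mpg.
cars = pd.DataFrame(
    {
        "cyl": [4, 4, 6, 6],
        "class": ["compact", "compact", "suv", "suv"],
        "cty": [21, 20, 16, 14],
        "hwy": [29, 31, 22, 18],
    }
)

# pandas: the grouping columns go in one list.
averages = cars.groupby(["cyl", "class"])[["cty", "hwy"]].mean().reset_index()
print(averages)
```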

In addition to `.groupBy`, we can use `.rollup`, which will do the same aggregations, but will also include the overall total:

In [71]:
mpg.rollup("cyl").count().sort("cyl").show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



Here the null value in `cyl` indicates the total count.

In [72]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



And in the example above, the null row represents the overall average highway mileage.

In [73]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows
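
The null pattern in this output follows from how a rollup is built: every row contributes to its own group, to the subtotal for its first grouping column, and to the grand total. A plain-Python sketch of that idea, using made-up `(cyl, class)` pairs rather than the real data, with `None` playing the role of spark's null:

```python
from collections import Counter

# Made-up (cyl, class) pairs standing in for rows of mpg.
pairs = [(4, "compact"), (4, "compact"), (4, "suv"), (6, "suv"), (8, "suv")]

rollup_counts = Counter()
for cyl, vehicle_class in pairs:
    rollup_counts[(cyl, vehicle_class)] += 1  # the regular (cyl, class) group
    rollup_counts[(cyl, None)] += 1           # subtotal for this cyl
    rollup_counts[(None, None)] += 1          # grand total

for key, count in rollup_counts.items():
    print(key, count)
```

Note that there is no `(None, class)` entry: a rollup works from left to right through the grouping columns, so it produces subtotals per `cyl` but not per `class`.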



## Crosstabs and Pivot Tables

In addition to `.groupBy`, spark provides a couple of other ways to do aggregation. One of these is `.crosstab`. It is very similar to the pandas `crosstab` function, in that it counts the number of occurrences of each combination of unique values from the two columns passed:

In [74]:
mpg.crosstab("class", "cyl").show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



`.crosstab` only does counts. If we want a different aggregation, we can use `.pivot`. For example, to find the average highway mileage for each combination of car class and number of cylinders, we could write the following:

In [75]:
mpg.groupby("class").pivot("cyl").mean("hwy").show()

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



Here the unique values from the column we group by will be the rows in the resulting dataframe, and the unique values from the column we pivot on will become the columns. The values in each cell will be equal to the aggregation we specified over the group of values defined by the intersection of the rows and the columns.
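
The same row/column/cell logic can be written out by hand. The sketch below is plain Python on a handful of made-up observations (not the real `mpg` values), and it only illustrates what ends up in each cell, not how spark computes it. Combinations that never occur are left as `None`, matching the nulls in the output above.

```python
import statistics
from collections import defaultdict

# Made-up (class, cyl, hwy) observations.
observations = [
    ("compact", 4, 29),
    ("compact", 4, 31),
    ("compact", 6, 26),
    ("suv", 6, 18),
    ("suv", 8, 17),
    ("suv", 8, 15),
]

# Collect the hwy values that fall into each (class, cyl) intersection.
cells = defaultdict(list)
for vehicle_class, cyl, hwy in observations:
    cells[(vehicle_class, cyl)].append(hwy)

row_labels = sorted({vehicle_class for vehicle_class, _, _ in observations})
column_labels = sorted({cyl for _, cyl, _ in observations})

# One row per class, one column per cyl, the mean of hwy in each cell.
pivot_table = {
    row: {
        column: statistics.mean(cells[(row, column)]) if (row, column) in cells else None
        for column in column_labels
    }
    for row in row_labels
}

for row, values in pivot_table.items():
    print(row, values)
```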

# ------------------------------------------------------------------------
## Exercise 3: 

Load the `tips` dataset as a spark dataframe.

a. What percentage of observations are smokers?
> Hint: `.groupBy()` and `.withColumn()` are useful functions here

b. Create a column that contains the tip percentage
> Hint: `.withColumn()` is useful here

c. Calculate the average tip percentage for each combination of sex and smoker.
> Hint: Chain additional functions off the answer to part b 

# ------------------------------------------------------------------------

In [76]:
# Load the tips dataset

tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [77]:
tips.groupBy("smoker").count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [78]:
from pyspark.sql.functions import *

In [79]:
tips.groupBy("smoker").count().withColumn(
    "percent", round(col("count") / tips.count() * 100, 0)
).show()

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|   62.0|
|   Yes|   93|   38.0|
+------+-----+-------+



In [80]:
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [81]:
tips.withColumn("tip_percentage", col('tip') / col('total_bill')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [82]:
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker")
    .agg(mean("tip_percentage"))
    .show()
)

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941035|
|  Male|0.1606687151291298| 0.1527711752024851|
+------+------------------+-------------------+



## Handling Missing Data

Let's take a look at how spark handles missing data. First we'll create a dataframe that has a few missing values:

In [83]:
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



Spark provides two main ways to deal with missing values:

- `.fill`: to replace missing values with a specified value
- `.drop`: to drop rows containing missing values

Both methods are accessed through the `.na` property. We'll look at some examples below:

In [84]:
df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [85]:
df.na.fill(0).show()

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



For both methods, we can restrict the operation to specific columns with the `subset` argument:

In [86]:
df.na.fill(0, subset="x").show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|NaN|
+---+---+



Notice that the missing values in the `x` column were filled with 0, but the missing values in `y` were left alone.

In [87]:
df.na.drop(subset="y").show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In the example above, the rows that had an na value for the y column were dropped, but the rows with na values for only the x column are still present.
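
To make the role of `subset` concrete, here is a plain-Python model of the four calls above. It is only a sketch of the behaviour seen in the outputs: the function names are made up, the rows are represented as dictionaries, and both `None` and `NaN` are treated as missing.

```python
import math

nan = float("nan")

# The same values as the example dataframe, one dictionary per row.
records = [
    {"x": 1.0, "y": nan},
    {"x": 2.0, "y": 0.0},
    {"x": nan, "y": 0.0},
    {"x": 4.0, "y": 3.0},
    {"x": 5.0, "y": 1.0},
    {"x": nan, "y": nan},
]


def is_missing(value):
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_missing(rows, subset=None):
    # Keep a row only if none of the considered columns is missing.
    return [
        row for row in rows
        if not any(is_missing(row[name]) for name in (subset or row))
    ]


def fill_missing(rows, value, subset=None):
    # Replace missing values, but only in the considered columns.
    return [
        {
            name: value if is_missing(cell) and (subset is None or name in subset) else cell
            for name, cell in row.items()
        }
        for row in rows
    ]


print(drop_missing(records))                # like df.na.drop()
print(fill_missing(records, 0.0, ["x"]))    # like df.na.fill(0, subset="x")
print(drop_missing(records, subset=["y"]))  # like df.na.drop(subset="y")
```

In spark itself, `subset` can be a single column name, as in the lesson, or a list of column names when more than one column should be considered.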

## Explaining DataFrame Transformations

The `.explain` method prints the physical plan, that is, the steps spark will execute to produce our dataframe.

For our basic example, we see that there is only a single step:

In [88]:
mpg.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




When we add a `.select` call, there is another step after "Scan ExistingRDD": a "Project" that contains the names of the columns we selected:

In [89]:
mpg.select(mpg.cyl, mpg.hwy).explain()

== Physical Plan ==
*(1) Project [cyl#297L, hwy#301L]
+- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




Here we are doing a more advanced select calculation, but this is still just a single step to spark.

In [90]:
mpg.select(((mpg.cyl + mpg.hwy) / 2).alias("avg_mpg")).explain()

== Physical Plan ==
*(1) Project [(cast((cyl#297L + hwy#301L) as double) / 2.0) AS avg_mpg#2017]
+- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




A filter is also a single step:

In [91]:
mpg.filter(mpg.cyl == 6).explain()

== Physical Plan ==
*(1) Filter (isnotnull(cyl#297L) AND (cyl#297L = 6))
+- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




Without reading ahead, do you think the execution plan for the two dataframes below will be the same or not?

In [92]:
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()

== Physical Plan ==
*(1) Project [cyl#297L, hwy#301L]
+- *(1) Filter (isnotnull(cyl#297L) AND (cyl#297L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]


== Physical Plan ==
*(1) Project [cyl#297L, hwy#301L]
+- *(1) Filter (isnotnull(cyl#297L) AND (cyl#297L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




Notice that even though we specified the transformations (`.select` and `.filter`) in a different order, we end up with the same output when we call `.explain`. This is because spark does not execute our transformations literally in the order we wrote them: its query optimizer rewrites them into an equivalent, optimized plan, and in both cases here it applies the filter before the projection.

In [93]:
mpg.selectExpr("cyl + 3 * 16 / 4 + 19 AS unused", "hwy").select(
    "hwy"
).explain()

== Physical Plan ==
*(1) Project [hwy#301L]
+- *(1) Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




Notice above that we had two separate selects, but spark condensed them into a single Project. It is smart enough to realize that it doesn't need to do the arithmetic we specified in the first select, since we aren't using that value later on.

Now our execution plan gets more complicated:

In [94]:
mpg.select(min(mpg.cyl)).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(cyl#297L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#1559]
      +- HashAggregate(keys=[], functions=[partial_min(cyl#297L)])
         +- Project [cyl#297L]
            +- Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




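This is because the transformations in the previous steps applied to each row individually. To calculate a minimum, spark has to look at all the rows in the dataset: each partition first computes its own minimum (`partial_min`), the partial results are moved to a single partition (`Exchange SinglePartition`), and a final `min` combines them.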
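
The two `HashAggregate` steps in this plan can be imitated in plain Python. This is a sketch of the idea only: the split of values into three partitions is made up, and `builtins.min` is used explicitly because the lesson's wildcard import from `pyspark.sql.functions` replaces the built-in `min` in the notebook.

```python
import builtins

# Made-up split of the cyl column across three partitions.
partitions = [[4, 6, 8], [6, 5, 8], [8, 6]]

# partial_min: each partition is reduced on its own, with no data movement.
partial_minimums = [builtins.min(values) for values in partitions]

# Exchange SinglePartition + final min: only the small partial results are
# brought together, and one more reduction gives the overall answer.
overall_minimum = builtins.min(partial_minimums)

print(partial_minimums, overall_minimum)
```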

In [95]:
mpg.groupby(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[cyl#297L], functions=[min(hwy#301L), max(hwy#301L)])
   +- Exchange hashpartitioning(cyl#297L, 200), ENSURE_REQUIREMENTS, [id=#1576]
      +- HashAggregate(keys=[cyl#297L], functions=[partial_min(hwy#301L), partial_max(hwy#301L)])
         +- Project [cyl#297L, hwy#301L]
            +- Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]
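
With a group by, the `Exchange` step changes from `SinglePartition` to `hashpartitioning(cyl, 200)`: partial results are routed by key so that everything for one `cyl` value ends up in the same place. The sketch below imitates this in plain Python under several simplifying assumptions: the data is made up, there are 4 shuffle partitions instead of 200, and a simple modulo stands in for spark's hash function.

```python
import builtins
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 4  # the plan above uses 200

# Made-up (cyl, hwy) rows spread over two input partitions.
input_partitions = [
    [(4, 29), (6, 26), (8, 17)],
    [(4, 31), (6, 18), (8, 15)],
]

# Step 1, partial aggregation: a (min, max) pair per key within each partition.
partials = []
for rows_in_partition in input_partitions:
    local = {}
    for cyl, hwy in rows_in_partition:
        low, high = local.get(cyl, (hwy, hwy))
        local[cyl] = (builtins.min(low, hwy), builtins.max(high, hwy))
    partials.append(local)

# Step 2, exchange: send each partial result to the partition chosen by its key.
shuffled = defaultdict(list)
for local in partials:
    for cyl, stats in local.items():
        shuffled[cyl % NUM_SHUFFLE_PARTITIONS].append((cyl, stats))

# Step 3, final aggregation: merge the partial results for each key.
final = {}
for bucket in shuffled.values():
    for cyl, (low, high) in bucket:
        previous_low, previous_high = final.get(cyl, (low, high))
        final[cyl] = (builtins.min(previous_low, low), builtins.max(previous_high, high))

print(final)
```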




In [96]:
(
    mpg.select(col("cyl"), expr("(cty + hwy) / 2 AS avg_mpg"))
    .filter(expr('class == "compact"'))
    .groupby("cyl")
    .agg(min("avg_mpg"), avg("avg_mpg"), max("avg_mpg"))
    .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[cyl#297L], functions=[min(avg_mpg#2054), avg(avg_mpg#2054), max(avg_mpg#2054)])
   +- Exchange hashpartitioning(cyl#297L, 200), ENSURE_REQUIREMENTS, [id=#1597]
      +- HashAggregate(keys=[cyl#297L], functions=[partial_min(avg_mpg#2054), partial_avg(avg_mpg#2054), partial_max(avg_mpg#2054)])
         +- Project [cyl#297L, (cast((cty#300L + hwy#301L) as double) / 2.0) AS avg_mpg#2054]
            +- Filter (isnotnull(class#303) AND (class#303 = compact))
               +- Scan ExistingRDD[manufacturer#293,model#294,displ#295,year#296L,cyl#297L,trans#298,drv#299,cty#300L,hwy#301L,fl#302,class#303]




## More Dataframe Manipulation Examples

Let's take a look at some more examples of working with spark dataframes. For these examples, we'll be working with a dataset of observations of the weather in Seattle.

In [97]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



Let's print out the number of rows and columns in our dataset:

In [98]:
print(weather.count(), "rows", len(weather.columns), "columns")

1461 rows 6 columns


In [99]:
weather.printSchema()

root
 |-- date: string (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- weather: string (nullable = true)



Let's first find the dates where the data starts and stops:

In [100]:
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

('2012-01-01', '2015-12-31')

We used `.select` to select the minimum date and the maximum date. `.first` returns the first row of our results, which consists of two values and so can be unpacked into the `min_date` and `max_date` variables.
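
Taking the minimum and maximum of `date` works here even though the column holds strings, because dates written as `YYYY-MM-DD` sort the same way as text as they do chronologically. A small plain-Python check of that property, using made-up dates and a made-up counterexample format:

```python
import datetime

iso_dates = ["2013-07-04", "2012-01-01", "2015-12-31", "2014-02-28"]

# Sorting as text and sorting as real dates give the same order.
ordered_as_text = sorted(iso_dates)
ordered_as_dates = sorted(iso_dates, key=datetime.date.fromisoformat)
print(ordered_as_text == ordered_as_dates)

# The same dates written month/day/year do not have this property.
us_style = ["07/04/2013", "01/01/2012", "12/31/2015", "02/28/2014"]
us_sorted_as_text = sorted(us_style)
us_sorted_as_dates = sorted(
    us_style, key=lambda text: datetime.datetime.strptime(text, "%m/%d/%Y")
)
print(us_sorted_as_text == us_sorted_as_dates)
```

If the dates were stored in a format like the second one, they would need to be converted to a date type before taking the minimum and maximum.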

Next we will combine the temp max and min columns into a single column, `temp_avg`:

In [101]:
weather = weather.withColumn(
    "temp_avg", expr("ROUND(temp_min + temp_max) / 2")
).drop("temp_max", "temp_min")
weather.show(6)

+----------+-------------+----+-------+--------+
|      date|precipitation|wind|weather|temp_avg|
+----------+-------------+----+-------+--------+
|2012-01-01|          0.0| 4.7|drizzle|     9.0|
|2012-01-02|         10.9| 4.5|   rain|     6.5|
|2012-01-03|          0.8| 2.3|   rain|     9.5|
|2012-01-04|         20.3| 4.7|   rain|     9.0|
|2012-01-05|          1.3| 6.1|   rain|     6.0|
|2012-01-06|          2.5| 2.2|   rain|     3.5|
+----------+-------------+----+-------+--------+
only showing top 6 rows
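
One detail worth noticing in the expression above: `ROUND` is applied to the sum, before the division by 2, which is why every `temp_avg` value is a multiple of 0.5. The plain-Python sketch below uses the first row of the data to show the difference from averaging first and rounding afterwards. Which of the two is wanted is a judgment call; the lesson uses the first. Python's `round` and spark's `ROUND` treat exact ties differently, but no tie occurs in this example.

```python
import builtins  # the notebook's wildcard pyspark import shadows round

# First row of the weather data shown earlier.
temp_min, temp_max = 5.0, 12.8

# What the lesson's expression does: round the sum, then halve it.
rounded_sum_then_halved = builtins.round(temp_min + temp_max) / 2

# Alternative: take the average first, then round to one decimal place.
halved_then_rounded = builtins.round((temp_min + temp_max) / 2, 1)

print(rounded_sum_then_halved, halved_then_rounded)
```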



Now we will calculate the total amount of rainfall for each month. We'll do this by first creating a month column, then grouping by the month, and finally, aggregating by taking the sum of the precipitation. To do this we will need to use the `month` function.

In [102]:
from pyspark.sql.functions import month, year, quarter

In [103]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4| 375.3999999999999|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+



The `.sort` at the end isn't necessary for the calculation, but it presents the data in a friendlier way.

Let's now take a look at the average temperature for each type of weather in December 2013:

In [104]:
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

+-------+-----------------+
|weather|    avg(temp_avg)|
+-------+-----------------+
|    fog|7.555555555555555|
|    sun|2.977272727272727|
+-------+-----------------+



Here we first have a couple of `.filter` calls in order to restrict our data to December of 2013. We then group by the weather column, and lastly, aggregate by taking the average of our `temp_avg` column. The combination of group by and agg will calculate the average temperature for each unique value of the `weather` column.

Let's now find out how many days had freezing temperatures in each month of 2013.

In [105]:
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

+-----+------------------------------+
|month|no_of_days_with_freezing_temps|
+-----+------------------------------+
|    1|                             3|
|    2|                             0|
|    3|                             0|
|    4|                             0|
|    5|                             0|
|    6|                             0|
|    7|                             0|
|    8|                             0|
|    9|                             0|
|   10|                             0|
|   11|                             0|
|   12|                             5|
+-----+------------------------------+
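
The counting trick used here is to turn the condition into 0s and 1s and then add them up. In plain Python, with made-up temperatures, the same idea looks like this (`builtins.sum` is spelled out because the notebook's wildcard import replaces the built-in `sum` with spark's):

```python
import builtins

# Made-up daily average temperatures for part of one month.
daily_temp_avg = [3.5, -1.0, 0.0, 2.0, -0.5]

# The comparison gives True/False; int() turns that into 1/0,
# like .cast("int") does for the spark column.
freezing_flags = [int(temperature <= 0) for temperature in daily_temp_avg]

# Summing the flags counts the days that met the condition.
days_with_freezing_temps = builtins.sum(freezing_flags)

print(freezing_flags, days_with_freezing_temps)
```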



As one last example, let's calculate the average temperature for each quarter of each year:

In [106]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)

+----+-------+------------------+
|year|quarter|          temp_avg|
+----+-------+------------------+
|2012|      1| 5.587912087912088|
|2012|      2|12.675824175824175|
|2012|      3|            18.375|
|2012|      4| 8.581521739130435|
|2013|      1| 6.405555555555556|
|2013|      2|14.505494505494505|
|2013|      3| 19.47826086956522|
|2013|      4| 8.032608695652174|
|2014|      1| 7.205555555555556|
|2014|      2|14.296703296703297|
|2014|      3|19.858695652173914|
|2014|      4|  9.88586956521739|
|2015|      1| 8.972222222222221|
|2015|      2|15.258241758241759|
|2015|      3|19.407608695652176|
|2015|      4| 8.956521739130435|
+----+-------+------------------+



Here we created the `quarter` and `year` columns, then grouped by these two new columns, and took the average temperature as our aggregate. Lastly, we sorted by the year and quarter for presentation purposes.

We could also use a pivot table like this:

In [107]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg"))
    .sort("quarter")
    .show()
)

+-------+-----+-----+-----+-----+
|quarter| 2012| 2013| 2014| 2015|
+-------+-----+-----+-----+-----+
|      1| 5.59| 6.41| 7.21| 8.97|
|      2|12.68|14.51| 14.3|15.26|
|      3|18.38|19.48|19.86|19.41|
|      4| 8.58| 8.03| 9.89| 8.96|
+-------+-----+-----+-----+-----+



Here, instead of grouping by two columns, we grouped by `quarter` and pivoted on `year`, so each year becomes its own column.

## Joins

Like pandas and SQL, spark lets us combine two tabular datasets with an operation known as a join.

We'll start by creating some data that we can join together:

In [108]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

--- users ---
+---+-----+-------+
| id| name|role_id|
+---+-----+-------+
|  1|  bob|    1.0|
|  2|  joe|    2.0|
|  3|sally|    3.0|
|  4| adam|    3.0|
|  5| jane|    NaN|
|  6| mike|    NaN|
+---+-----+-------+

--- roles ---
+---+---------+
| id|     name|
+---+---------+
|  1|    admin|
|  2|   author|
|  3| reviewer|
|  4|commenter|
+---+---------+



To join two dataframes together, we'll need to call the `.join` method on one of them and supply the other as an argument. In addition, we'll need to supply the condition on which we are joining. In our case, we are joining where the `role_id` column on the users table is equal to the `id` column on the roles table.

In [109]:
users.join(roles, on=users.role_id == roles.id).show()

+---+-----+-------+---+--------+
| id| name|role_id| id|    name|
+---+-----+-------+---+--------+
|  1|  bob|    1.0|  1|   admin|
|  2|  joe|    2.0|  2|  author|
|  3|sally|    3.0|  3|reviewer|
|  4| adam|    3.0|  3|reviewer|
+---+-----+-------+---+--------+



By default, spark will perform an inner join, meaning that only records that have a match in the other dataframe are kept. We can also specify either a left or a right join, which will keep all of the records from the left or right side respectively, even if those records don't have a match in the other dataframe.

In [110]:
users.join(roles, on=users.role_id == roles.id, how="left").show()

+---+-----+-------+----+--------+
| id| name|role_id|  id|    name|
+---+-----+-------+----+--------+
|  1|  bob|    1.0|   1|   admin|
|  2|  joe|    2.0|   2|  author|
|  3|sally|    3.0|   3|reviewer|
|  4| adam|    3.0|   3|reviewer|
|  5| jane|    NaN|null|    null|
|  6| mike|    NaN|null|    null|
+---+-----+-------+----+--------+



In [111]:
users.join(roles, on=users.role_id == roles.id, how="right").show()

+----+-----+-------+---+---------+
|  id| name|role_id| id|     name|
+----+-----+-------+---+---------+
|   1|  bob|    1.0|  1|    admin|
|   2|  joe|    2.0|  2|   author|
|   3|sally|    3.0|  3| reviewer|
|   4| adam|    3.0|  3| reviewer|
|null| null|   null|  4|commenter|
+----+-----+-------+---+---------+



Notice that the examples above have duplicate `id` and `name` columns. There are several ways we could go about dealing with this:

- alias each dataframe and explicitly select columns after joining (this could also be implemented with spark SQL)
- rename duplicated columns before joining
- drop duplicated columns after the join (`.drop(right.id)`)

## Visualization (or Lack Thereof)

Spark does not provide a way to visualize its dataframes. To visualize data from spark, you should use the `.toPandas` method on a spark dataframe to convert it to a pandas dataframe, then visualize as you normally would.

> Converting a spark dataframe to a pandas dataframe will pull all the data into memory, so make sure you have enough available memory to do so.

# ------------------------------------------------------------------------
## Exercise 4:

Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [112]:
from vega_datasets import data

weather = data.seattle_weather()
weather = spark.createDataFrame(weather)
weather.show(4)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 4 rows



Convert temperatures from Celsius to Fahrenheit using °F = (°C × 9/5) + 32. For example, (0°C × 9/5) + 32 = 32°F.

In [113]:
# pandas equivalent -- df.temp_max = df.temp_max * 9 / 5 + 32

weather = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))
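As a quick sanity check of the conversion formula used above, here it is in plain Python, applied to a few reference points. The helper name `celsius_to_fahrenheit` is ours and is not part of spark:

```python
def celsius_to_fahrenheit(celsius):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return celsius * 9 / 5 + 32


print(celsius_to_fahrenheit(0))    # freezing point of water
print(celsius_to_fahrenheit(100))  # boiling point of water
print(celsius_to_fahrenheit(5.0))  # temp_min on 2012-01-01 in the weather data
```

The last call should agree with the converted `temp_min` value for 2012-01-01 once the spark dataframe is shown.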

In [124]:
weather.show(15)

+-------------------+-------------+------------------+------------------+----+-------+
|               date|precipitation|          temp_max|          temp_min|wind|weather|
+-------------------+-------------+------------------+------------------+----+-------+
|2012-01-01 00:00:00|          0.0|             55.04|              41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|             48.02|             37.04| 6.1|   rain|
|2012-01-06 00:00:00|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07 00:00:00|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08 00:00:00|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09 00:00:00|          4.3|        

In [128]:
weather.count()

1461

In [129]:
weather.filter(col('weather') == 'rain').count()

259

Which month has the most rain, on average?

In [141]:
(
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .show()
)

+-----+----+---------------------------+
|month|year|total_monthly_precipitation|
+-----+----+---------------------------+
|    2|2012|                       92.3|
|    1|2012|         173.29999999999998|
|    3|2012|                      183.0|
|    4|2012|          68.09999999999998|
|    8|2012|                        0.0|
|    6|2012|                       75.1|
|    5|2012|         52.199999999999996|
|    7|2012|                       26.3|
|   12|2012|                      174.0|
|   10|2012|         170.29999999999998|
|    9|2012|         0.8999999999999999|
|   11|2012|                      210.5|
|    1|2013|         105.69999999999997|
|    2|2013|         40.300000000000004|
|    3|2013|                       69.7|
|    4|2013|         149.60000000000002|
|    8|2013|                       34.4|
|    6|2013|                       33.1|
|    7|2013|                        0.0|
|    5|2013|          60.49999999999999|
+-----+----+---------------------------+
only showing top

In [115]:
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

Row(month=11, avg_monthly_rain=160.625)

In [116]:
row.avg_monthly_rain

160.625

Which year was the windiest?

In [144]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_winds"))
    .sort(col("total_winds").desc())
    .head(5)
)

[Row(year=2012, total_winds=1244.7000000000003),
 Row(year=2014, total_winds=1236.5),
 Row(year=2015, total_winds=1153.3),
 Row(year=2013, total_winds=1100.7999999999997)]

What is the most frequent type of weather in January?

In [118]:
(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [119]:
(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



What percentage of days were rainy in q3 of 2015?

In [136]:
# in pandas -- (df.weather == "rain").mean()
# measure a rainy day by weather == rain
rainy_q3_2015 = (
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
)

In [137]:
# Display the result
rainy_q3_2015.show()

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [139]:
# Renaming column
rainy_q3_2015.select(col('avg(rain)').alias('percent_rain_q3_2015')).show()

+--------------------+
|percent_rain_q3_2015|
+--------------------+
|0.021739130434782608|
+--------------------+



For each year, find what percentage of days it rained (had non-zero precipitation).

In [121]:
# measure a rainy day by precipitation > 0
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("rain"), "year")
    .groupby("year")
    .agg(mean("rain"))
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

