## Exercises

Using the repo setup directions, set up a new local and remote repository named spark-exercises. The local version of your repo should live inside of ~/codeup-data-science.

Save this work in your spark-exercises repo. Then add, commit, and push your changes.


Create a Jupyter notebook or Python script named spark101 for this exercise.


Create a Spark DataFrame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe


Load the mpg dataset as a Spark DataFrame.

- For each vehicle, create one column of output that contains a message like the one below:

    - The 1999 audi a4 has a 4 cylinder engine.

- Transform the trans column so that it contains only manual or auto.


Load the tips dataset as a Spark DataFrame.

- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.


Use the Seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to Fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in Q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).


In [86]:
import pandas as pd
import numpy as np
from pydataset import data

In [85]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [87]:
df = spark.createDataFrame(
    pd.DataFrame(
        {
            "language": [
                "python",
                "java",
                "c_plus_plus",
                "html",
            ]
        }
    )
)

df.show(truncate=False)

+-----------+
|language   |
+-----------+
|python     |
|java       |
|c_plus_plus|
|html       |
+-----------+



In [90]:
df.describe().show()

+-------+-----------+
|summary|   language|
+-------+-----------+
|  count|          4|
|   mean|       null|
| stddev|       null|
|    min|c_plus_plus|
|    max|     python|
+-------+-----------+



In [102]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [91]:
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [15]:
from pyspark.sql.functions import col, expr

In [17]:
avg_column = (col("hwy") + col("cty")) / 2

mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



In [18]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



In [19]:
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



In [20]:
mpg.createOrReplaceTempView("mpg")

In [21]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
)

DataFrame[hwy: bigint, cty: bigint, avg: double]

In [22]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



In [23]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [24]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [25]:
mpg.select(mpg.hwy.cast("string")).printSchema()

root
 |-- hwy: string (nullable = true)



In [26]:
mpg.select(mpg.model, mpg.model.cast("int")).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



In [27]:
# Note: The pyspark avg and mean functions are aliases of each other.
# Importing sum, min, and max this way shadows Python's built-in functions.
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In [28]:
mpg.select(
    (sum(mpg.hwy) / count(mpg.hwy)).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+-----------------+-----------------+--------+--------+
|        average_1|        average_2|min(hwy)|max(hwy)|
+-----------------+-----------------+--------+--------+
|23.44017094017094|23.44017094017094|      12|      44|
+-----------------+-----------------+--------+--------+



In [29]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In [30]:
from pyspark.sql.functions import lit

In [31]:
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



In [32]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [33]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



In [34]:
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



In [35]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



In [36]:
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

In [37]:
from pyspark.sql.functions import when

In [39]:
mpg.select(mpg.hwy, when(mpg.hwy > 25, "good_mileage").alias("mpg_desc")).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



In [40]:
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage")
    .otherwise("bad_mileage")
    .alias("mpg_desc"),
).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



In [41]:
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



In [42]:
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|       dodge|        durango 4wd|  5.9|1999|  8|  auto(l4)|  4| 11| 15|  r|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
only showing top 8 rows

In [43]:
from pyspark.sql.functions import asc, desc

In [44]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [45]:
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|      toyota|  

In [46]:
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")

<pyspark.sql.group.GroupedData at 0x11fc154f0>

In [47]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
|  5|              20.5|            28.75|
+---+------------------+-----------------+



In [48]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|   midsize|              16.0|              24.0|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  4|   compact|            21.375|          29.46875|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  6|    pickup|              14.5|              17.9|
|  8|    pickup|              11.8|              15.8|
|  4|   midsize|              20.5|           29.1875|
|  6|   minivan|              15.6|              22.2|
|  4|   minivan|              18.0|              24.0|
|  6|       suv|              14.5|              18.5|
|  6|subcompact|              17.0|24.714285714285715|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|subcompact|              14.8|              21.6|
|  4|     

In [49]:
mpg.rollup("cyl").count().sort("cyl").show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



In [50]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



In [51]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()


+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows




In [52]:
mpg.crosstab("class", "cyl").show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



In [53]:
mpg.groupby("class").pivot("cyl").mean("hwy").show()


+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+




In [54]:
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



In [55]:
df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [56]:
df.na.fill(0).show()

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



In [57]:
df.na.fill(0, subset="x").show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|NaN|
+---+---+



In [58]:
df.na.drop(subset="y").show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [59]:
mpg.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [60]:
mpg.select(mpg.cyl, mpg.hwy).explain()

== Physical Plan ==
*(1) Project [cyl#274L, hwy#278L]
+- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [61]:
mpg.select(((mpg.cyl + mpg.hwy) / 2).alias("avg_mpg")).explain()

== Physical Plan ==
*(1) Project [(cast((cyl#274L + hwy#278L) as double) / 2.0) AS avg_mpg#1345]
+- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [62]:
mpg.filter(mpg.cyl == 6).explain()

== Physical Plan ==
*(1) Filter (isnotnull(cyl#274L) AND (cyl#274L = 6))
+- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [63]:
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()

== Physical Plan ==
*(1) Project [cyl#274L, hwy#278L]
+- *(1) Filter (isnotnull(cyl#274L) AND (cyl#274L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]


== Physical Plan ==
*(1) Project [cyl#274L, hwy#278L]
+- *(1) Filter (isnotnull(cyl#274L) AND (cyl#274L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [64]:
mpg.selectExpr("cyl + 3 * 16 / 4 + 19 AS unused", "hwy").select(
    "hwy"
).explain()

== Physical Plan ==
*(1) Project [hwy#278L]
+- *(1) Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [65]:
mpg.select(min(mpg.cyl)).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(cyl#274L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#1011]
      +- HashAggregate(keys=[], functions=[partial_min(cyl#274L)])
         +- Project [cyl#274L]
            +- Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [67]:
mpg.groupby(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[cyl#274L], functions=[min(hwy#278L), max(hwy#278L)])
   +- Exchange hashpartitioning(cyl#274L, 200), ENSURE_REQUIREMENTS, [id=#1045]
      +- HashAggregate(keys=[cyl#274L], functions=[partial_min(hwy#278L), partial_max(hwy#278L)])
         +- Project [cyl#274L, hwy#278L]
            +- Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [68]:
(
    mpg.select(col("cyl"), expr("(cty + hwy) / 2 AS avg_mpg"))
    .filter(expr('class == "compact"'))
    .groupby("cyl")
    .agg(min("avg_mpg"), avg("avg_mpg"), max("avg_mpg"))
    .explain()
)


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[cyl#274L], functions=[min(avg_mpg#1387), avg(avg_mpg#1387), max(avg_mpg#1387)])
   +- Exchange hashpartitioning(cyl#274L, 200), ENSURE_REQUIREMENTS, [id=#1066]
      +- HashAggregate(keys=[cyl#274L], functions=[partial_min(avg_mpg#1387), partial_avg(avg_mpg#1387), partial_max(avg_mpg#1387)])
         +- Project [cyl#274L, (cast((cty#277L + hwy#278L) as double) / 2.0) AS avg_mpg#1387]
            +- Filter (isnotnull(class#280) AND (class#280 = compact))
               +- Scan ExistingRDD[manufacturer#270,model#271,displ#272,year#273L,cyl#274L,trans#275,drv#276,cty#277L,hwy#278L,fl#279,class#280]




In [69]:
# vega_datasets' `data` shadows the pydataset `data` imported earlier
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [70]:
print(weather.count(), "rows", len(weather.columns), "columns")

1461 rows 6 columns


In [71]:
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date


('2012-01-01', '2015-12-31')

In [72]:
# ROUND applies to the sum before halving, so temp_avg lands on the nearest 0.5 degree
weather = weather.withColumn(
    "temp_avg", expr("ROUND(temp_min + temp_max) / 2")
).drop("temp_max", "temp_min")
weather.show(6)

+----------+-------------+----+-------+--------+
|      date|precipitation|wind|weather|temp_avg|
+----------+-------------+----+-------+--------+
|2012-01-01|          0.0| 4.7|drizzle|     9.0|
|2012-01-02|         10.9| 4.5|   rain|     6.5|
|2012-01-03|          0.8| 2.3|   rain|     9.5|
|2012-01-04|         20.3| 4.7|   rain|     9.0|
|2012-01-05|          1.3| 6.1|   rain|     6.0|
|2012-01-06|          2.5| 2.2|   rain|     3.5|
+----------+-------------+----+-------+--------+
only showing top 6 rows



In [73]:
from pyspark.sql.functions import month, year, quarter

In [74]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12| 622.7000000000002|
+-----+------------------+



In [75]:
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

+-------+-----------------+
|weather|    avg(temp_avg)|
+-------+-----------------+
|    fog|7.555555555555555|
|    sun|2.977272727272727|
+-------+-----------------+



In [76]:
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

+-----+------------------------------+
|month|no_of_days_with_freezing_temps|
+-----+------------------------------+
|    1|                             3|
|    2|                             0|
|    3|                             0|
|    4|                             0|
|    5|                             0|
|    6|                             0|
|    7|                             0|
|    8|                             0|
|    9|                             0|
|   10|                             0|
|   11|                             0|
|   12|                             5|
+-----+------------------------------+



In [77]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)



+----+-------+------------------+
|year|quarter|          temp_avg|
+----+-------+------------------+
|2012|      1| 5.587912087912088|
|2012|      2|12.675824175824175|
|2012|      3|            18.375|
|2012|      4| 8.581521739130435|
|2013|      1| 6.405555555555556|
|2013|      2|14.505494505494505|
|2013|      3| 19.47826086956522|
|2013|      4| 8.032608695652174|
|2014|      1| 7.205555555555556|
|2014|      2|14.296703296703297|
|2014|      3|19.858695652173914|
|2014|      4|  9.88586956521739|
|2015|      1| 8.972222222222221|
|2015|      2|15.258241758241759|
|2015|      3|19.407608695652176|
|2015|      4| 8.956521739130435|
+----+-------+------------------+




In [78]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg"))
    .sort("quarter")
    .show()
)


+-------+-----+-----+-----+-----+
|quarter| 2012| 2013| 2014| 2015|
+-------+-----+-----+-----+-----+
|      1| 5.59| 6.41| 7.21| 8.97|
|      2|12.68|14.51| 14.3|15.26|
|      3|18.38|19.48|19.86|19.41|
|      4| 8.58| 8.03| 9.89| 8.96|
+-------+-----+-----+-----+-----+




In [79]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

--- users ---
+---+-----+-------+
| id| name|role_id|
+---+-----+-------+
|  1|  bob|    1.0|
|  2|  joe|    2.0|
|  3|sally|    3.0|
|  4| adam|    3.0|
|  5| jane|    NaN|
|  6| mike|    NaN|
+---+-----+-------+

--- roles ---
+---+---------+
| id|     name|
+---+---------+
|  1|    admin|
|  2|   author|
|  3| reviewer|
|  4|commenter|
+---+---------+



In [80]:
users.join(roles, on=users.role_id == roles.id).show()

+---+-----+-------+---+--------+
| id| name|role_id| id|    name|
+---+-----+-------+---+--------+
|  1|  bob|    1.0|  1|   admin|
|  2|  joe|    2.0|  2|  author|
|  3|sally|    3.0|  3|reviewer|
|  4| adam|    3.0|  3|reviewer|
+---+-----+-------+---+--------+



In [81]:
users.join(roles, on=users.role_id == roles.id, how="left").show()

+---+-----+-------+----+--------+
| id| name|role_id|  id|    name|
+---+-----+-------+----+--------+
|  1|  bob|    1.0|   1|   admin|
|  2|  joe|    2.0|   2|  author|
|  3|sally|    3.0|   3|reviewer|
|  4| adam|    3.0|   3|reviewer|
|  5| jane|    NaN|null|    null|
|  6| mike|    NaN|null|    null|
+---+-----+-------+----+--------+



In [82]:
users.join(roles, on=users.role_id == roles.id, how="right").show()

+----+-----+-------+---+---------+
|  id| name|role_id| id|     name|
+----+-----+-------+---+---------+
|   1|  bob|    1.0|  1|    admin|
|   2|  joe|    2.0|  2|   author|
|   4| adam|    3.0|  3| reviewer|
|   3|sally|    3.0|  3| reviewer|
|null| null|   null|  4|commenter|
+----+-----+-------+---+---------+



In [98]:
from vega_datasets import data

In [101]:
weather = spark.createDataFrame(data("weather"))

TypeError: field actual: Can not merge type <class 'pyspark.sql.types.MapType'> and <class 'pyspark.sql.types.DoubleType'>