In [17]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# 1

Create a spark data frame that contains your favorite programming languages.

    The name of the column should be language
    View the schema of the dataframe
    Output the shape of the dataframe
    Show the first 5 records in the dataframe



In [2]:
df = pd.DataFrame({'language':['python', 'numpy', 'C++', 'candy', 'java', 'French']})

In [3]:
df

Unnamed: 0,language
0,python
1,numpy
2,C++
3,candy
4,java
5,French


In [4]:
dfs = spark.createDataFrame(df)

In [5]:
dfs

DataFrame[language: string]

In [6]:
dfs.show(5)

                                                                                

+--------+
|language|
+--------+
|  python|
|   numpy|
|     C++|
|   candy|
|    java|
+--------+
only showing top 5 rows



In [7]:
dfs.describe()

                                                                                

DataFrame[summary: string, language: string]

In [8]:
print(dfs.count(), 'rows', len(dfs.columns), 'columns')

6 rows 1 columns


In [9]:
dfs.printSchema()

root
 |-- language: string (nullable = true)



# 2
Load the mpg dataset as a spark dataframe.

    Create 1 column of output that contains a message like the one below:

The 1999 audi a4 has a 4 cylinder engine.

For each vehicle.

Transform the trans column so that it only contains either manual or auto.


In [10]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [11]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [12]:
#The 1999 audi a4 has a 4 cylinder engine.

print('The', mpg.year, mpg.manufacturer, mpg.model, 'has a', mpg.cyl, 'cylinder engine')

The Column<'year'> Column<'manufacturer'> Column<'model'> has a Column<'cyl'> cylinder engine


In [20]:
#The 1999 audi a4 has a 4 cylinder engine.

#col1 = ('The', mpg.year, mpg.manufacturer, mpg.model, 'has a', mpg.cyl, 'cylinder engine').alias('message')


#mpg.select('The', mpg.year, mpg.manufacturer, mpg.model, 'has a', mpg.cyl, 'cylinder engine').alias('message').show(5)


# I needed to specify 'lit'

In [18]:
# from class review
mpg.select(
    concat(
        lit("The "),
        col("year"),
        lit(" "),
        col("manufacturer"),
        lit(" "),
        col("model"),
        lit(" has a "),
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("vehicle_cylinder_desc")
).show(truncate=False)

+--------------------------------------------------------------+
|vehicle_cylinder_desc                                         |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

# 2b

In [None]:
from pyspark.sql.functions import when

In [21]:
mpg.select(mpg.trans, when(mpg.trans.contains('auto'), 'auto').otherwise('manual')).alias('testcol').show(5)


+----------+---------------------------------------------------------+
|     trans|CASE WHEN contains(trans, auto) THEN auto ELSE manual END|
+----------+---------------------------------------------------------+
|  auto(l5)|                                                     auto|
|manual(m5)|                                                   manual|
|manual(m6)|                                                   manual|
|  auto(av)|                                                     auto|
|  auto(l5)|                                                     auto|
+----------+---------------------------------------------------------+
only showing top 5 rows



In [22]:
# other methods from class review 
# multiple ways to do this, here's 3 of them
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(
        mpg.trans.like("auto%"), "auto"
    ).otherwise("manual").alias("when + like")
).show()

+----------+--------------+--------------+-----------+
|     trans|regexp_extract|regexp_replace|when + like|
+----------+--------------+--------------+-----------+
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|manual(m6)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(l5)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(l5

# 3

Load the tips dataset as a spark dataframe.

    What percentage of observations are smokers?
    Create a column that contains the tip percentage
    Calculate the average tip percentage for each combination of sex and smoker.



In [50]:
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [24]:
tips.groupBy(tips.smoker)

<pyspark.sql.group.GroupedData at 0x12745a520>

In [25]:
tips.groupBy(tips.smoker).count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



[Stage 14:=====>                                                   (1 + 9) / 10]                                                                                

In [26]:
m = tips.groupBy(tips.smoker).count()

In [27]:
m.show()

[Stage 17:>                                                       (0 + 10) / 10]

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



                                                                                

In [28]:
tips.count()

244

In [29]:
m.select(m['count']/tips.count()).show()

+-------------------+
|      (count / 244)|
+-------------------+
| 0.6188524590163934|
|0.38114754098360654|
+-------------------+



In [36]:
# pretty version from review
tips.groupby('smoker').count().withColumn('percent', col('count') / tips.count()).show()

+------+-----+-------------------+
|smoker|count|            percent|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+



In [30]:
#tip pecentage
col2 = (tips.tip/tips.total_bill).alias('per')

In [31]:
tips.select(col2).show(5)

+-------------------+
|                per|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



 Calculate the average tip percentage for each combination of sex and smoker.

In [32]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [33]:
from pyspark.sql.functions import col, expr, avg

In [None]:
#verify groupby works

In [51]:
tips.groupBy('sex', 'smoker').agg(round(avg(tips.tip), 3)).show(5)

+------+------+------------------+
|   sex|smoker|round(avg(tip), 3)|
+------+------+------------------+
|  Male|    No|             3.113|
|Female|    No|             2.774|
|  Male|   Yes|             3.051|
|Female|   Yes|             2.932|
+------+------+------------------+



In [52]:
tips.withColumn('per', tips.tip/tips.total_bill).show(5) #check it worked

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|                per|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [55]:
tips = tips.withColumn('per', tips.tip/tips.total_bill) #dont show 5 or itll only equal the first five rows duh

In [57]:
tips.show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|                per|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [58]:
tips.groupBy('sex', 'smoker').agg(round(avg(tips.per), 3)).show(5)

+------+------+------------------+
|   sex|smoker|round(avg(per), 3)|
+------+------+------------------+
|  Male|    No|             0.161|
|Female|    No|             0.157|
|  Male|   Yes|             0.153|
|Female|   Yes|             0.182|
+------+------+------------------+



In [37]:
# from review
tips.withColumn("tip_percentage", col('tip') / col('total_bill'))\
    .groupby("sex")\
    .pivot("smoker")\
    .agg(round(mean("tip_percentage"), 4))\
    .show()

[Stage 51:>                                                       (0 + 10) / 10]

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|Female|0.1569|0.1822|
|  Male|0.1607|0.1528|
+------+------+------+



                                                                                

# 4

Use the seattle weather dataset referenced in the lesson to answer the questions below.

    Convert the temperatures to fahrenheit.
    Which month has the most rain, on average?
    Which year was the windiest?
    What is the most frequent type of weather in January?
    What is the average high and low temperature on sunny days in July in 2013 and 2014?
    What percentage of days were rainy in q3 of 2015?
    For each year, find what percentage of days it rained (had non-zero precipitation).



In [63]:
from vega_datasets import data

weather = data.seattle_weather()
weather = spark.createDataFrame(weather)
weather.show(4)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 4 rows



In [64]:
weather = weather.withColumn(
    'max_F', (weather.temp_max * 9 / 5 + 32)
).withColumn('min_F', (weather.temp_min * 9 / 5 + 32))
weather

DataFrame[date: timestamp, precipitation: double, temp_max: double, temp_min: double, wind: double, weather: string, max_F: double, moin_F: double]

In [65]:
weather.show(5)

+-------------------+-------------+--------+--------+----+-------+-----+------+
|               date|precipitation|temp_max|temp_min|wind|weather|max_F|moin_F|
+-------------------+-------------+--------+--------+----+-------+-----+------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|55.04|  41.0|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|51.08| 37.04|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|53.06| 44.96|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|53.96| 42.08|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|48.02| 37.04|
+-------------------+-------------+--------+--------+----+-------+-----+------+
only showing top 5 rows



In [66]:
# class review
weather_review = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))
weather

DataFrame[date: timestamp, precipitation: double, temp_max: double, temp_min: double, wind: double, weather: string, max_F: double, moin_F: double]

In [67]:
weather_review.show(5)

+-------------------+-------------+--------+--------+----+-------+-----+------+
|               date|precipitation|temp_max|temp_min|wind|weather|max_F|moin_F|
+-------------------+-------------+--------+--------+----+-------+-----+------+
|2012-01-01 00:00:00|          0.0|   55.04|    41.0| 4.7|drizzle|55.04|  41.0|
|2012-01-02 00:00:00|         10.9|   51.08|   37.04| 4.5|   rain|51.08| 37.04|
|2012-01-03 00:00:00|          0.8|   53.06|   44.96| 2.3|   rain|53.06| 44.96|
|2012-01-04 00:00:00|         20.3|   53.96|   42.08| 4.7|   rain|53.96| 42.08|
|2012-01-05 00:00:00|          1.3|   48.02|   37.04| 6.1|   rain|48.02| 37.04|
+-------------------+-------------+--------+--------+----+-------+-----+------+
only showing top 5 rows



Which month has the most rain, on average? 

In [72]:
rainy = (
    weather.withColumn('month', month('date'))
    .withColumn('year', year('date'))
    .groupBy('month', 'year')
    .agg(sum('precipitation').alias('avg_mon')))

In [73]:
rainy.show(5)

[Stage 88:>                                                       (0 + 10) / 10]

+-----+----+------------------+
|month|year|           avg_mon|
+-----+----+------------------+
|    2|2012|              92.3|
|    5|2012|              52.2|
|    1|2012|173.29999999999998|
|    3|2012|             183.0|
|    4|2012| 68.09999999999998|
+-----+----+------------------+
only showing top 5 rows



                                                                                

In [76]:
rainy.groupBy('month').agg(avg('avg_mon').alias('tot_monthly_avg')).show(5)

+-----+------------------+
|month|   tot_monthly_avg|
+-----+------------------+
|   12|           155.675|
|    1|116.49999999999999|
|    6|            33.225|
|    3|            151.55|
|    5|            51.875|
+-----+------------------+
only showing top 5 rows



In [78]:
monthly_rain = rainy.groupBy('month').agg(avg('avg_mon').alias('tot_monthly_avg'))

In [86]:
monthly_rain.sort(col('tot_monthly_avg').desc()).show(1)

[Stage 130:>                                                      (0 + 10) / 10]

+-----+---------------+
|month|tot_monthly_avg|
+-----+---------------+
|   11|        160.625|
+-----+---------------+
only showing top 1 row



                                                                                

In [68]:
# From Review
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

                                                                                

Row(month=11, avg_monthly_rain=160.625)

In [None]:
# windiest year

In [88]:
(
    weather.withColumn('year', year('date'))
    .groupBy('year')
    .agg(sum('wind').alias('yrly_wind'))
    .sort(col('yrly_wind').desc())
    .head(3)
)

[Row(year=2012, yrly_wind=1244.7),
 Row(year=2014, yrly_wind=1236.5),
 Row(year=2015, yrly_wind=1153.2999999999997)]

In [None]:
# rest is from review

In [None]:
# whats Jan like

In [89]:
(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



In [90]:
# .value_counts for spark
weather.groupBy('weather').count().show()

+-------+-----+
|weather|count|
+-------+-----+
|drizzle|   54|
|   rain|  259|
|    sun|  714|
|   snow|   23|
|    fog|  411|
+-------+-----+



In [None]:
# avg high low

In [91]:
(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+------------------+-----------------+
| average_high_temp| average_low_temp|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



In [None]:
# What percentage of days were rainy in q3 of 2015?

In [None]:
# in pandas -- (df.weather == "rain").mean()
# measure a rainy day by weather == rain
(
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
    .show()
)

In [None]:
#  For each year, find what percentage of days it rained (had non-zero precipitation).

In [None]:
# measure a rainy day by precipitation > 0
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("did_rain"), "year")
    .groupby("year")
    .agg(mean("did_rain"))
    .show()
)