## Exercises

Using the [repo setup directions](https://ds.codeup.com/fundamentals/git/), setup a new local and remote repository named `spark-exercises`. The local version of your repo should live inside of `~/codeup-data-science`. This repo should be named `spark-exercises`

Save this work in your `spark-exercises` repo. Then add, commit, and push your changes.

Create a jupyter notebook or python script named `spark101` for this exercise.

1. Create a spark data frame that contains your favorite programming languages.

    - The name of the column should be `language`
    - View the schema of the dataframe
    - Output the shape of the dataframe
    - Show the first 5 records in the dataframe

1. Load the `mpg` dataset as a spark dataframe.

    1. Create 1 column of output that contains a message like the one below:

            The 1999 audi a4 has a 4 cylinder engine.

        For each vehicle.

    1. Transform the `trans` column so that it only contains either `manual` or `auto`.

1. Load the `tips` dataset as a spark dataframe.

    1. What percentage of observations are smokers?
    1. Create a column that contains the tip percentage
    1. Calculate the average tip percentage for each combination of sex and smoker.

1. Use the seattle weather dataset referenced in the lesson to answer the questions below.

    - Convert the temperatures to fahrenheit.
    - Which month has the most rain, on average?
    - Which year was the windiest?
    - What is the most frequent type of weather in January?
    - What is the average high and low temperature on sunny days in July in 2013 and 2014?
    - What percentage of days were rainy in q3 of 2015?
    - For each year, find what percentage of days it rained (had non-zero precipitation).

In [60]:
import pyspark
from pydataset import data
from pyspark.sql.functions import col, expr, lit, regexp_extract, regexp_replace
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean, when
from pyspark.sql.functions import asc, desc, month, year, quarter
import pyspark.sql.functions as F
import pandas as pd
import numpy as np

1. Create a spark data frame that contains your favorite programming languages.

    - The name of the column should be `language`
    - View the schema of the dataframe
    - Output the shape of the dataframe
    - Show the first 5 records in the dataframe

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/20 14:35:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/20 14:36:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/20 14:36:00 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [6]:
df = pd.DataFrame(
    {
        'language' : ['python', 'C++', 'Java', 'JavaScript', 'SQL', 'TypeScript', 'MATLAB', 'BASIC']
    }
)

In [7]:
df

Unnamed: 0,language
0,python
1,C++
2,Java
3,JavaScript
4,SQL
5,TypeScript
6,MATLAB
7,BASIC


In [8]:
fpl = spark.createDataFrame(df)

In [9]:
fpl.show(5)

                                                                                

+----------+
|  language|
+----------+
|    python|
|       C++|
|      Java|
|JavaScript|
|       SQL|
+----------+
only showing top 5 rows



In [10]:
fpl.count()

                                                                                

8

In [12]:
fpl.printSchema()

root
 |-- language: string (nullable = true)



In [58]:
fpl.schema

StructType([StructField('language', StringType(), True)])

1. Load the `mpg` dataset as a spark dataframe.

    1. Create 1 column of output that contains a message like the one below:

            The 1999 audi a4 has a 4 cylinder engine.

        For each vehicle.

    1. Transform the `trans` column so that it only contains either `manual` or `auto`.

In [13]:
mpg = spark.createDataFrame(data('mpg'))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [18]:
mpg.select(concat(lit('The '), mpg.year, lit(' '), mpg.manufacturer, lit(' '), mpg.model, lit(' has a '), mpg.cyl, 
                  lit(' cylinder engine.')).alias('description')).show(5, truncate = False)

+-----------------------------------------+
|description                              |
+-----------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.|
|The 1999 audi a4 has a 4 cylinder engine.|
|The 2008 audi a4 has a 4 cylinder engine.|
|The 2008 audi a4 has a 4 cylinder engine.|
|The 1999 audi a4 has a 6 cylinder engine.|
+-----------------------------------------+
only showing top 5 rows



In [59]:
# mpg.select(
    # "trans",
    # regexp_extract("trans", r"^[ma]", 1),
# ).show(truncate=False)

mpg.withColumn('trans',
               when(mpg.trans.like('auto%'), 'auto'
                   ).otherwise(
                   'manual'
               )
              ).select('trans','tran').show(5)

AnalysisException: Column 'tran' does not exist. Did you mean one of the following? [trans, drv, year, class, cty, cyl, fl, hwy, displ, model, manufacturer];
'Project [trans#441, 'tran]
+- Project [manufacturer#13, model#14, displ#15, year#16L, cyl#17L, CASE WHEN trans#18 LIKE auto% THEN auto ELSE manual END AS trans#441, drv#19, cty#20L, hwy#21L, fl#22, class#23]
   +- LogicalRDD [manufacturer#13, model#14, displ#15, year#16L, cyl#17L, trans#18, drv#19, cty#20L, hwy#21L, fl#22, class#23], false


1. Load the `tips` dataset as a spark dataframe.

    1. What percentage of observations are smokers?
    1. Create a column that contains the tip percentage
    1. Calculate the average tip percentage for each combination of sex and smoker.

In [24]:
tips = spark.createDataFrame(data('tips'))
tips.show(5)

                                                                                

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [40]:
round((tips.filter(tips.smoker == 'Yes').count()/tips.count())*100)

38

In [61]:
tips.groupby('smoker').count().withColumn('percent', F.round(
    F.col('count') / tips.count() * 100)).show()

[Stage 62:>                                                         (0 + 8) / 8]

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|   62.0|
|   Yes|   93|   38.0|
+------+-----+-------+



                                                                                

In [57]:
tips.select(
    ((tips.tip / tips.total_bill)).alias('tip_percentage')).show(5)

+-------------------+
|     tip_percentage|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [64]:
tips.withColumn('tip_percentage', tips.tip / tips.total_bill).groupby('sex', 'smoker').agg(F.round(tips.tip/tips.total_bill), 4).alias('avg_tip_percent').show(5)

AssertionError: all exprs should be Column

In [63]:
tips.select(tips.tip, tips.total_bill, F.round(tips.tip/tips.total_bill), 4).alias('avg_tip_percent').show(5)

TypeError: Invalid argument, not a string or column: 4 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [65]:
tips.groupby('sex').pivot('smoker').agg(F.round(F.mean(tips.tip/tips.total_bill), 4)).show()

[Stage 76:>                                                         (0 + 1) / 1]

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|Female|0.1569|0.1822|
|  Male|0.1607|0.1528|
+------+------+------+



                                                                                

1. Use the seattle weather dataset referenced in the lesson to answer the questions below.

    - Convert the temperatures to fahrenheit.
    - Which month has the most rain, on average?
    - Which year was the windiest?
    - What is the most frequent type of weather in January?
    - What is the average high and low temperature on sunny days in July in 2013 and 2014?
    - What percentage of days were rainy in q3 of 2015?
    - For each year, find what percentage of days it rained (had non-zero precipitation).

In [70]:
from vega_datasets import data

In [71]:
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



                                                                                

In [66]:
# c to f: (0 C x 9/5) +32 = 32 F

In [73]:
weather = weather.withColumn(
    'temp_max', (F.col('temp_max') * 9/5 + 32)
).withColumn('temp_min', (F.col('temp_min') * 9/5 + 32))

weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|   55.04|    41.0| 4.7|drizzle|
|2012-01-02|         10.9|   51.08|   37.04| 4.5|   rain|
|2012-01-03|          0.8|   53.06|   44.96| 2.3|   rain|
|2012-01-04|         20.3|   53.96|   42.08| 4.7|   rain|
|2012-01-05|          1.3|   48.02|   37.04| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [74]:
weather.withColumn(
    'month', F.month(weather.date)
).groupby(
    F.col('month')
).agg(
    F.mean(
        weather.precipitation
    ).alias('avg_rainfall')
).sort(
    F.col('avg_rainfall').desc()
).show()

[Stage 82:>                                                         (0 + 8) / 8]

+-----+-------------------+
|month|       avg_rainfall|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548389|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



                                                                                

In [75]:
weather.withColumn(
    'year', F.year(weather.date)
).groupby(
    F.col('year')
).agg(
    F.mean(
        weather.wind
    ).alias('avg_wind')
).sort(
    F.col('avg_wind').desc()
).show()

[Stage 85:>                                                         (0 + 8) / 8]

+----+------------------+
|year|          avg_wind|
+----+------------------+
|2012| 3.400819672131148|
|2014| 3.387671232876714|
|2015| 3.159726027397261|
|2013|3.0158904109589058|
+----+------------------+



                                                                                

In [76]:
weather.filter(
    F.month(weather.date) == 1
).groupby(
    weather.weather
).count().sort(
    F.col('count').desc()
).show()

[Stage 88:>                                                         (0 + 8) / 8]

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



                                                                                

In [77]:
(
    weather.filter(F.month('date') == 7)
    .filter(F.year('date') > 2012)
    .filter(F.year('date') < 2015)
    .filter(F.col('weather') == F.lit('sun'))
    .agg(
        F.avg('temp_max').alias('average_high_temp'),
        F.avg('temp_min').alias('average_low_temp'),
    ).show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



In [78]:
(
    weather.filter(F.year('date') == 2015)
    .filter(F.quarter('date') == 3)
    .select(F.when(F.col('weather') == 'rain', 1).otherwise(0).alias('rain'))
    .agg(F.mean('rain'))
    .show()
)

[Stage 94:>                                                         (0 + 8) / 8]

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



                                                                                