In [424]:
import pandas as pd
import pyspark
import numpy as np
from pydataset import data
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, expr

## *1.) Create a spark data frame that contains your favorite programming languages.*

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [425]:
pandas_dataframe = pd.DataFrame(dict(n=np.arange(20), language=np.random.choice(list(['python', 'r', 'java', 'javascript', 'C++']), 20)))
pandas_dataframe.drop(columns = 'n', inplace = True)
df = spark.createDataFrame(pandas_dataframe)
df.show()

+----------+
|  language|
+----------+
|      java|
|      java|
|       C++|
|       C++|
|javascript|
|      java|
|      java|
|javascript|
|    python|
|      java|
|javascript|
|      java|
|         r|
|       C++|
|         r|
|    python|
|       C++|
|      java|
|    python|
|       C++|
+----------+



In [426]:
(df.count(), len(df.columns))

(20, 1)

In [427]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [428]:
df.show(5)

+----------+
|  language|
+----------+
|      java|
|      java|
|       C++|
|       C++|
|javascript|
+----------+
only showing top 5 rows



## *2.) Load the mpg dataset as a spark dataframe.*

- Create 1 column of output that contains a message like the one below:
    - The 1999 audi a4 has a 4 cylinder engine. For each vehicle.

- Transform the trans column so that it only contains either manual or auto.

In [429]:
pandas_df = data('mpg')

In [430]:
df = spark.createDataFrame(pandas_df)
df.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [431]:
df.select(concat(lit('The '), df.year,lit(' '),df.manufacturer,lit(' '), df.model,lit(' has a '), df.cyl, lit(' cylinder engine.')).alias('car')).show(truncate = False)

+--------------------------------------------------------------+
|car                                                           |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [433]:
# multiple ways to do this
df.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(
        df.trans.like("auto%"), "auto"
    ).otherwise("manual").alias("when + like")
).show()

NameError: name 'regexp_extract' is not defined

## *3.) Load the tips dataset as a spark dataframe.*

- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.

In [435]:
tips = data('tips')
tips_df = spark.createDataFrame(tips)

In [436]:
tips_df.describe()

DataFrame[summary: string, total_bill: string, tip: string, sex: string, smoker: string, day: string, time: string, size: string]

In [437]:
tips_df.select(tips_df.smoker).count()

244

In [438]:
#percentage of obseervations are smokers
round((tips_df.filter(tips_df.smoker == 'Yes').count() / tips_df.select(tips_df.smoker).count()) *100,2)

38.11

In [439]:
#have to use cast to change dtype

tips_df.select(tips_df.tip.cast("float")).printSchema()

tips_df.select(tips_df.total_bill.cast("float")).printSchema()

root
 |-- tip: float (nullable = true)

root
 |-- total_bill: float (nullable = true)



In [440]:
col = tips_df.select((tips_df.tip / tips_df.total_bill).alias('tip_percentage'))

In [441]:
avg_column = (tips_df.tip / tips_df.total_bill)


In [442]:
# tip percentage for combination of sex and smoker.
tips_df.select(tips_df.sex, tips_df.smoker,avg_column).show(truncate = False)

+------+------+-------------------+
|sex   |smoker|(tip / total_bill) |
+------+------+-------------------+
|Female|No    |0.05944673337257211|
|Male  |No    |0.16054158607350097|
|Male  |No    |0.16658733936220846|
|Male  |No    |0.1397804054054054 |
|Female|No    |0.14680764538430255|
|Male  |No    |0.18623962040332148|
|Male  |No    |0.22805017103762829|
|Male  |No    |0.11607142857142858|
|Male  |No    |0.13031914893617022|
|Male  |No    |0.2185385656292287 |
|Male  |No    |0.1665043816942551 |
|Female|No    |0.14180374361883155|
|Male  |No    |0.10181582360570687|
|Male  |No    |0.16277807921866522|
|Female|No    |0.20364126770060686|
|Male  |No    |0.18164967562557924|
|Female|No    |0.1616650532429816 |
|Male  |No    |0.22774708410067526|
|Female|No    |0.20624631703005306|
|Male  |No    |0.16222760290556903|
+------+------+-------------------+
only showing top 20 rows



In [446]:
tips_df.groupBy("smoker").count().withColumn(
    "percent",
    concat(round((col("count") / tips_df.count() * 100), 0).cast("int"), lit("%")),
).show()

TypeError: 'DataFrame' object is not callable

*# 4.) Use the seattle weather dataset referenced in the lesson to answer the questions below.*

- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
What percentage of days were rainy in q3 of 2015?
For each year, find what percentage of days it rained (had non-zero precipitation).

In [447]:
from vega_datasets import data
from pyspark.sql.functions import month, year, quarter

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(weather)
df.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [448]:
df.select(df.temp_max.cast("float")).printSchema

df.select(df.temp_min.cast("float")).printSchema()

root
 |-- temp_min: float (nullable = true)



In [449]:
avg_column = (tips_df.tip / tips_df.total_bill)

In [268]:
#converting to degrees f
temp_max_f = ((df.temp_max * 9/5) + 32).alias('temp_max_f')
temp_min_f = ((df.temp_min * 9/5) + 32).alias('temp_min_f')

In [328]:
df2 = df.select(df.date, df.precipitation,temp_max_f, temp_min_f, df.wind, df.weather)

In [337]:
df2.show()

+----------+-------------+------------------+------------------+----+-------+
|      date|precipitation|        temp_max_f|        temp_min_f|wind|weather|
+----------+-------------+------------------+------------------+----+-------+
|2012-01-01|          0.0|             55.04|              41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05|          1.3|             48.02|             37.04| 6.1|   rain|
|2012-01-06|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09|          4.3|             48.92|              41.0| 3.4|   rain|
|2012-01-10|          1.0|42.980000000000004|             33.08|

In [270]:
#most rain was month 11
(
    df.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+



In [348]:
#2012 was the windiest
df.withColumn("year", year('date')).groupBy("year").agg(sum("wind").alias("windspeed")).sort("year").show()

+----+------------------+
|year|         windspeed|
+----+------------------+
|2012|1244.6999999999998|
|2013|1100.8000000000002|
|2014|1236.5000000000005|
|2015|            1153.3|
+----+------------------+



In [314]:
#most freuqent type of weather for january
df.withColumn("month", month("date")).groupby('weather').agg(count("weather")).show()

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|    fog|           411|
|drizzle|            54|
|   rain|           259|
|    sun|           714|
|   snow|            23|
+-------+--------------+



In [321]:
#most freuqent type of weather for january is fog
df.filter(month("date") == 1).withColumn('month', month('date')).groupby('month', 'weather').agg(count("weather")).show(truncate = False)

+-----+-------+--------------+
|month|weather|count(weather)|
+-----+-------+--------------+
|1    |drizzle|10            |
|1    |sun    |33            |
|1    |snow   |8             |
|1    |rain   |35            |
|1    |fog    |38            |
+-----+-------+--------------+



In [341]:
df2.printSchema()

root
 |-- date: string (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- temp_max_f: double (nullable = true)
 |-- temp_min_f: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- weather: string (nullable = true)



In [361]:
#What is the average high and low temperature on sunny days in July in 2013 and 2014?
df2.filter(df2.weather == 'sun').withColumn('year', year('date')).groupby('year').agg(count('weather')).show()

+----+--------------+
|year|count(weather)|
+----+--------------+
|2015|           180|
|2013|           205|
|2014|           211|
|2012|           118|
+----+--------------+



In [364]:
df2.show()

+----------+-------------+------------------+------------------+----+-------+
|      date|precipitation|        temp_max_f|        temp_min_f|wind|weather|
+----------+-------------+------------------+------------------+----+-------+
|2012-01-01|          0.0|             55.04|              41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05|          1.3|             48.02|             37.04| 6.1|   rain|
|2012-01-06|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09|          4.3|             48.92|              41.0| 3.4|   rain|
|2012-01-10|          1.0|42.980000000000004|             33.08|

In [372]:
#What percentage of days were rainy in q3 of 2015?#filter weather filter quarter
df2.filter(year('date') == 2015).withColumn("quarter", quarter("date")).filter(quarter('date') ==3).filter(df.weather == 'rain').agg(count('weather')).show()

+--------------+
|count(weather)|
+--------------+
|             2|
+--------------+



In [373]:
df2.filter(year('date') == 2015).withColumn("quarter", quarter("date")).filter(quarter('date') ==3).filter(df.weather != 'rain').agg(count('weather')).show()

+--------------+
|count(weather)|
+--------------+
|            90|
+--------------+



In [375]:
#2 percent of days in 2015 were rainy
2/90 * 100

2.2222222222222223

In [394]:
from pyspark.sql.functions import when

In [415]:
#For each year, find what percentage of days it rained (had non-zero precipitation).
df2.withColumn("year", year("date")).select(when(df2.weather != 'rain', 0).otherwise(1).alias('rain'), 'year').groupby("year").agg(mean("rain")).show()

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015| 0.0136986301369863|
|2013| 0.1643835616438356|
|2014|0.00821917808219178|
|2012| 0.5218579234972678|
+----+-------------------+



In [413]:
# measure a rainy day by precipitation > 0
(df2.withColumn("year", year("date")).select(when(df2.precipitation > 0, 1).otherwise(0).alias("rain"), "year").groupby("year").agg(mean("rain")).show())

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

