In [3]:
# imports 
import pandas as pd
import numpy as np
import pyspark 
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import when
from pyspark.sql.functions import asc, desc
from vega_datasets import data
from pyspark.sql.functions import month, year, quarter
import pyspark.sql.functions as F


#### Create a spark data frame that contains your favorite programming languages.


In [28]:
from pyspark.sql import SparkSession

# Establish spark session 
spark = SparkSession.builder.getOrCreate()

# Create a spark dataframe 
languages = spark.sparkContext.parallelize(
    [
        (1, "python"),
        (2, "C++"),
        (3, "Ruby"),
        (4, "Julia"),
        (5, "Java"),
        (6, "Rust"),
        (7, "SQL"),
    ]
)
df = languages.toDF(["id", "language"])

In [29]:
df.show()

+---+--------+
| id|language|
+---+--------+
|  1|  python|
|  2|     C++|
|  3|    Ruby|
|  4|   Julia|
|  5|    Java|
|  6|    Rust|
|  7|     SQL|
+---+--------+



- The name of the column should be language

In [40]:
df.columns[1]

'language'

- View the schema of the dataframe


In [36]:
df.schema

StructType([StructField('id', LongType(), True), StructField('language', StringType(), True)])

- Output the shape of the dataframe


In [37]:
print((df.count(), len(df.columns)))

(7, 2)


- Show the first 5 records in the dataframe


In [179]:
df.select('language').show(5)

+--------+
|language|
+--------+
|  python|
|     C++|
|    Ruby|
|   Julia|
|    Java|
+--------+
only showing top 5 rows



#### Load the mpg dataset as a spark dataframe.


In [41]:
mpg_data = pd.read_csv('mpg.csv')
mpg = spark.createDataFrame(mpg_data)
mpg.show(5)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|Unnamed: 0|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|         1|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|         2|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|         3|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|         4|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|         5|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows




- Create 1 column of output that contains a message like the one below: (For each vehicle.)
    - The 1999 audi a4 has a 4 cylinder engine.



In [None]:
# order
# year\manufactuer\model\string literal\cyl\string literal

In [52]:
mpg.select(
    F.concat(
        lit("The "),
        mpg.year,
        lit(" "),
        mpg.manufacturer,
        lit(" "),
        mpg.model,
        lit(" has a "),
        mpg.cyl,
        lit(" cylinder engine"),
    )
).show(10, truncate=False)

+-----------------------------------------------------------------------------+
|concat(The , year,  , manufacturer,  , model,  has a , cyl,  cylinder engine)|
+-----------------------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 2008 audi a4 has a 4 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 has a 6 cylinder engine                                     |
|The 2008 audi a4 has a 6 cylinder engine                                     |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 1999 audi a4 quattro has a 4 cylinder engine                             |
|The 2008 audi a4 quattro has a 4 cylind

- Transform the trans column so that it only contains either manual or auto.

In [57]:
mpg.where(mpg["trans"].contains("auto")).show(3)


+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|Unnamed: 0|manufacturer|model|displ|year|cyl|   trans|drv|cty|hwy| fl|  class|
+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|         1|        audi|   a4|  1.8|1999|  4|auto(l5)|  f| 18| 29|  p|compact|
|         4|        audi|   a4|  2.0|2008|  4|auto(av)|  f| 21| 30|  p|compact|
|         5|        audi|   a4|  2.8|1999|  6|auto(l5)|  f| 16| 26|  p|compact|
+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
only showing top 3 rows



In [182]:
mpg.where(mpg["trans"].startswith("auto")).show(3)

+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|Unnamed: 0|manufacturer|model|displ|year|cyl|   trans|drv|cty|hwy| fl|  class|
+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
|         1|        audi|   a4|  1.8|1999|  4|auto(l5)|  f| 18| 29|  p|compact|
|         4|        audi|   a4|  2.0|2008|  4|auto(av)|  f| 21| 30|  p|compact|
|         5|        audi|   a4|  2.8|1999|  6|auto(l5)|  f| 16| 26|  p|compact|
+----------+------------+-----+-----+----+---+--------+---+---+---+---+-------+
only showing top 3 rows



In [60]:
mpg.where(mpg["trans"].contains("manual")).show(3)

+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|Unnamed: 0|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|         2|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|         3|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|         6|        audi|   a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
+----------+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [185]:
mpg.where(mpg["trans"].contains("manual").otherwise('auto')).alias('transsystem').show()

IllegalArgumentException: otherwise() can only be applied on a Column previously generated by when()

#### Load the tips dataset as a spark dataframe.

In [42]:
tip_data = pd.read_csv('tips.csv')
tips = spark.createDataFrame(tip_data)
tips.show(5)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+----------+----------+----+------+------+---+------+----+
|Unnamed: 0|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----------+----+------+------+---+------+----+
|         1|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|         2|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|         3|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|         4|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|         5|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----------+----+------+------+---+------+----+
only showing top 5 rows




- What percentage of observations are smokers?


In [186]:
count_no = tips.filter(tips.smoker == 'No').count()
count_yes = tips.filter(tips.smoker == 'Yes').count()

total_population = count_no + count_yes

percent_of_smokers = round(count_yes / (total_population) * 100, 2)
percent_of_non_smokers = round(count_no / (total_population) * 100, 2)

print(f'Percentage of smokers -> {percent_of_smokers}')
print(f'Percentage of NON smokers -> {percent_of_non_smokers}')

Percentage of smokers -> 38.11
Percentage of NON smokers -> 61.89


- Create a column that contains the tip percentage


In [87]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).show(5)

+------------+-----+-------+
|customer_tip|total|avg_tip|
+------------+-----+-------+
|        1.01|16.99|   5.94|
|        1.66|10.34|  16.05|
|         3.5|21.01|  16.66|
|        3.31|23.68|  13.98|
|        3.61|24.59|  14.68|
+------------+-----+-------+
only showing top 5 rows



- Calculate the average tip percentage for each combination of sex and smoker.


In [188]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).groupBy('sex', 'smoker').agg(F.mean(avg_column)).show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `sex` cannot be resolved. Did you mean one of the following? [`total`, `avg_tip`, `customer_tip`].;
'Aggregate ['sex, 'smoker], ['sex, 'smoker, avg(round((('tip / 'total_bill) * 100), 2)) AS avg(round(((tip / total_bill) * 100), 2))#3181]
+- Project [tip#258 AS customer_tip#3171, total_bill#257 AS total#3172, round(((tip#258 / total_bill#257) * cast(100 as double)), 2) AS avg_tip#3173]
   +- LogicalRDD [Unnamed: 0#256L, total_bill#257, tip#258, sex#259, smoker#260, day#261, time#262, size#263L], false


In [89]:
tips_smoker = tips.where(tips["smoker"] == 'Yes')
tips_non_smoker = tips.where(tips["smoker"] == 'No')
tips_male = tips.where(tips["sex"] == 'Male')
tips_female = tips.where(tips["sex"] == 'Female')

In [90]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips_smoker.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).show(5)

+------------+-----+-------+
|customer_tip|total|avg_tip|
+------------+-----+-------+
|         3.0|38.01|   7.89|
|        1.76|11.24|  15.66|
|        3.21|20.29|  15.82|
|         2.0|13.81|  14.48|
|        1.98|11.02|  17.97|
+------------+-----+-------+
only showing top 5 rows



In [91]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips_non_smoker.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).show(5)

+------------+-----+-------+
|customer_tip|total|avg_tip|
+------------+-----+-------+
|        1.01|16.99|   5.94|
|        1.66|10.34|  16.05|
|         3.5|21.01|  16.66|
|        3.31|23.68|  13.98|
|        3.61|24.59|  14.68|
+------------+-----+-------+
only showing top 5 rows



In [92]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips_male.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).show(5)

+------------+-----+-------+
|customer_tip|total|avg_tip|
+------------+-----+-------+
|        1.66|10.34|  16.05|
|         3.5|21.01|  16.66|
|        3.31|23.68|  13.98|
|        4.71|25.29|  18.62|
|         2.0| 8.77|  22.81|
+------------+-----+-------+
only showing top 5 rows



In [93]:
avg_column = F.round((col('tip') / col("total_bill")) * 100, 2)

tips_female.select(
    col("tip").alias("customer_tip"),
    tips.total_bill.alias("total"),
    avg_column.alias("avg_tip"),
).show(5)

+------------+-----+-------+
|customer_tip|total|avg_tip|
+------------+-----+-------+
|        1.01|16.99|   5.94|
|        3.61|24.59|  14.68|
|         5.0|35.26|  14.18|
|        3.02|14.83|  20.36|
|        1.67|10.33|  16.17|
+------------+-----+-------+
only showing top 5 rows



#### Use the Seattle weather dataset referenced in the lesson to answer the questions below.


In [95]:
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



- Convert the temperatures to fahrenheit.


In [107]:
weather = weather.withColumn("max_temp_in_fahrenheit", F.round((F.col("temp_max") * 9/5) + 32, 2))

weather.select(
    F.col("temp_max").alias("Max temp in Celsius"),
    F.col("max_temp_in_fahrenheit").alias("Max temp in Fahrenheit")
).show(5)

+-------------------+----------------------+
|Max temp in Celsius|Max temp in Fahrenheit|
+-------------------+----------------------+
|               12.8|                 55.04|
|               10.6|                 51.08|
|               11.7|                 53.06|
|               12.2|                 53.96|
|                8.9|                 48.02|
+-------------------+----------------------+
only showing top 5 rows



In [108]:
weather = weather.withColumn("min_temp_in_fahrenheit", F.round((F.col("temp_min") * 9/5) + 32, 2))

weather.select(
    F.col("temp_min").alias("Min temp in Celsius"),
    F.col("min_temp_in_fahrenheit").alias("Min temp in Fahrenheit")
).show(5)

+-------------------+----------------------+
|Min temp in Celsius|Min temp in Fahrenheit|
+-------------------+----------------------+
|                5.0|                  41.0|
|                2.8|                 37.04|
|                7.2|                 44.96|
|                5.6|                 42.08|
|                2.8|                 37.04|
+-------------------+----------------------+
only showing top 5 rows



- Which month has the most rain, on average?


In [122]:
weather_rain = weather.where(weather["weather"] == 'rain')
weather_rain.show(3)

+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|      date|precipitation|temp_max|temp_min|wind|weather|max_temp_in_fahrenheit|min_temp_in_fahrenheit|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|                 51.08|                 37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|                 53.06|                 44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|                 53.96|                 42.08|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
only showing top 3 rows



In [121]:
weather_rain.show(3)

+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|      date|precipitation|temp_max|temp_min|wind|weather|max_temp_in_fahrenheit|min_temp_in_fahrenheit|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|                 51.08|                 37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|                 53.06|                 44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|                 53.96|                 42.08|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
only showing top 3 rows



In [120]:
weather_rain.sort(weather_rain.precipitation.desc()).show(1)

+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|      date|precipitation|temp_max|temp_min|wind|weather|max_temp_in_fahrenheit|min_temp_in_fahrenheit|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
|2012-11-19|         54.1|    13.3|     8.3| 6.0|   rain|                 55.94|                 46.94|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+
only showing top 1 row



- Which year was the windiest?


In [128]:
weather.select('wind').show(2)

+----+
|wind|
+----+
| 4.7|
| 4.5|
+----+
only showing top 2 rows



- What is the most frequent type of weather in January?


In [193]:
weather_df = weather.withColumn('year', F.year(F.col('date'))).withColumn('month', F.month(F.col('date')))

weather_jan = weather_df.where(weather_df["month"] == 1)

weather_jan.groupBy("weather").count().show()




+-------+-----+
|weather|count|
+-------+-----+
|drizzle|   10|
|   rain|   35|
|    sun|   33|
|   snow|    8|
|    fog|   38|
+-------+-----+



                                                                                

- What is the average high and low temperature on sunny days in July in 2013 and 2014?


In [153]:
weather_sunny = weather_df.where(weather["weather"] == 'sun').show(3)

+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|max_temp_in_fahrenheit|min_temp_in_fahrenheit|year|month|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|                  50.0|                 37.04|2012|    1|
|2012-01-11|          0.0|     6.1|    -1.1| 5.1|    sun|                 42.98|                 30.02|2012|    1|
|2012-01-12|          0.0|     6.1|    -1.7| 1.9|    sun|                 42.98|                 28.94|2012|    1|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
only showing top 3 rows



In [176]:
weather_sunny.filter(weather_sunny['year'] == 2013).show()

AttributeError: 'NoneType' object has no attribute 'filter'

- What percentage of days were rainy in Q3 of 2015?


In [177]:
weather_rainy = weather_df.where(weather_df["weather"] == 'rain').show(3)

+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|max_temp_in_fahrenheit|min_temp_in_fahrenheit|year|month|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|                 51.08|                 37.04|2012|    1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|                 53.06|                 44.96|2012|    1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|                 53.96|                 42.08|2012|    1|
+----------+-------------+--------+--------+----+-------+----------------------+----------------------+----+-----+
only showing top 3 rows



In [178]:
rainy_year_2012 = weather_rainy.filter(weather_rainy["year"] == 2015)


AttributeError: 'NoneType' object has no attribute 'filter'

- For each year, find what percentage of days it rained (had non-zero precipitation).

In [160]:
value_counts = weather_df.groupBy("year").count().show()



+----+-----+
|year|count|
+----+-----+
|2012|  366|
|2013|  365|
|2014|  365|
|2015|  365|
+----+-----+



                                                                                

In [161]:
year_2012 = weather_df.filter(weather_df["year"] == 2012)
year_2013 = weather_df.filter(weather_df["year"] == 2013)
year_2014 = weather_df.filter(weather_df["year"] == 2014)
year_2015 = weather_df.filter(weather_df["year"] == 2015)

In [168]:
count_2012 = year_2012.where(year_2012['precipitation'] == 0).count()
count_2013 = year_2013.where(year_2013['precipitation'] == 0).count()
count_2014 = year_2014.where(year_2014['precipitation'] == 0).count()
count_2015 = year_2015.where(year_2015['precipitation'] == 0).count()

percent_2012 = round(count_2012 / year_2012.count(), 2)
percent_2013 = round(count_2013 / year_2013.count(), 2)
percent_2014 = round(count_2014 / year_2014.count(), 2)
percent_2015 = round(count_2015 / year_2015.count(), 2)

print(f'percent of 2012 -> {percent_2012}')
print(f'percent of 2013 -> {percent_2013}')
print(f'percent of 2014 -> {percent_2014}')
print(f'percent of 2015 -> {percent_2015}')

percent of 2012 -> 0.52
percent of 2013 -> 0.58
percent of 2014 -> 0.59
percent of 2015 -> 0.61
