In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col,format_number, concat,count, lit, when, regexp_replace, regexp_extract, countDistinct, sum, round,first, avg, month, year
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pydataset import data as pydata
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
import seaborn as sns

## Create a spark data frame that contains your favorite programming languages.

In [2]:
spark = SparkSession.builder \
    .appName("FavoriteProgrammingLanguages") \
    .getOrCreate()


data = [("Python",),
        ("Java",),
        ("JavaScript",),
        ("C++",),
        ("SQL",),
        ("C",),
        ("R",)]

columns = ["language"]
df = spark.createDataFrame(data, columns)

df.printSchema()

num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape of the DataFrame: {num_rows} rows, {num_cols} columns")

df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/22 12:40:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/22 12:40:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


root
 |-- language: string (nullable = true)



                                                                                

Shape of the DataFrame: 7 rows, 1 columns
+----------+
|  language|
+----------+
|    Python|
|      Java|
|JavaScript|
|       C++|
|       SQL|
+----------+
only showing top 5 rows



## The name of the column should be language

In [3]:
df.select('language').show()

+----------+
|  language|
+----------+
|    Python|
|      Java|
|JavaScript|
|       C++|
|       SQL|
|         C|
|         R|
+----------+



## View the schema of the dataframe

In [4]:
df.printSchema()


root
 |-- language: string (nullable = true)



## Output the shape of the dataframe

In [5]:
num_rows = df.count()
num_cols = len(df.columns)

In [6]:
print(f'({num_cols},{num_rows})')

(1,7)


## Show the first 5 records in the dataframe

In [7]:
df.select('language').show(5)

+----------+
|  language|
+----------+
|    Python|
|      Java|
|JavaScript|
|       C++|
|       SQL|
+----------+
only showing top 5 rows



## Load the mpg dataset as a spark dataframe.

In [8]:
mpg = pydata('mpg')

## Create 1 column of output that contains a message like the one below:

#### "The 1999 audi a4 has a 4 cylinder engine."

### For each vehicle.

In [9]:
spark_mpg_df = spark.createDataFrame(mpg)

spark_mpg_df = spark_mpg_df.withColumn(
    "message",
    concat(
        lit("The "),
        col("manufacturer"), lit(" "),
        col("model"), lit(" has a "),
        col("cyl"), lit(" cylinder engine.")
    )
)

spark_mpg_df.show(truncate=False)




+------------+------------------+-----+----+---+----------+---+---+---+---+-------+---------------------------------------------------------+
|manufacturer|model             |displ|year|cyl|trans     |drv|cty|hwy|fl |class  |message                                                  |
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+---------------------------------------------------------+
|audi        |a4                |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The audi a4 has a 4 cylinder engine.                     |
|audi        |a4                |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The audi a4 has a 4 cylinder engine.                     |
|audi        |a4                |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The audi a4 has a 4 cylinder engine.                     |
|audi        |a4                |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The audi a4 has a 4 cylinder engine.                     |
|audi 

## Transform the trans column so that it only contains either manual or auto.

In [10]:
transformed_df = spark_mpg_df.withColumn(
    "trans",
    regexp_extract(col("trans"), r"(\w+)", 1)
)


In [11]:
transformed_df.show()

+------------+------------------+-----+----+---+------+---+---+---+---+-------+--------------------+
|manufacturer|             model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|             message|
+------------+------------------+-----+----+---+------+---+---+---+---+-------+--------------------+
|        audi|                a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|The audi a4 has a...|
|        audi|                a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|The audi a4 has a...|
|        audi|                a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|The audi a4 has a...|
|        audi|                a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|The audi a4 has a...|
|        audi|                a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|The audi a4 has a...|
|        audi|                a4|  2.8|1999|  6|manual|  f| 18| 26|  p|compact|The audi a4 has a...|
|        audi|                a4|  3.1|2008|  6|  auto|  f| 18| 27|  p|compact|The audi a4 

## Load the tips dataset as a spark dataframe.

In [12]:
tips = pydata('tips')

## What percentage of observations are smokers?

In [13]:
tips_df = spark.createDataFrame(tips)

In [14]:
tips_df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [15]:
smoker_counts = tips_df.groupBy('smoker').count().show()

[Stage 18:>                                                         (0 + 8) / 8]

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



                                                                                

In [16]:
total_count = tips_df.count()

In [17]:
total_count

244

In [18]:
smoker_count = tips_df.filter(col("smoker") == "Yes").count()

smoker_percentage = (smoker_count / total_count) * 100

In [19]:
smoker_percentage

38.114754098360656

## Create a column that contains the tip percentage

In [20]:
tips_df = tips_df.withColumn(
     "tip_percentage",
    round((col("tip") / col("total_bill")) * 100,2)
)



In [21]:
tips_df.show(5)

+----------+----+------+------+---+------+----+--------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_percentage|
+----------+----+------+------+---+------+----+--------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|          5.94|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|         16.05|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|         16.66|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|         13.98|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|         14.68|
+----------+----+------+------+---+------+----+--------------+
only showing top 5 rows



## Calculate the average tip percentage for each combination of sex and smoker.

In [22]:
average_tip_percentage_df = tips_df.groupBy("sex", "smoker").agg(avg("tip_percentage").alias("avg_tip_percentage"))

average_tip_percentage_df.show(truncate=False)

+------+------+------------------+
|sex   |smoker|avg_tip_percentage|
+------+------+------------------+
|Male  |No    |16.06659793814433 |
|Female|No    |15.69111111111111 |
|Male  |Yes   |15.276666666666667|
|Female|Yes   |18.214545454545455|
+------+------+------------------+



                                                                                

## Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [23]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)

In [24]:
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



## Convert the temperatures to fahrenheit.

In [25]:
weather = weather.withColumn(
     "f_max",
    round((col("temp_max") * 1.8 + 32),2)
)


In [26]:
weather = weather.withColumn(
     "f_min",
    round((col("temp_min") * 1.8 + 32),2)
)


In [27]:
weather.show(5)

+----------+-------------+--------+--------+----+-------+-----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|f_max|f_min|
+----------+-------------+--------+--------+----+-------+-----+-----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.04| 41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|51.08|37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|53.06|44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|53.96|42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.02|37.04|
+----------+-------------+--------+--------+----+-------+-----+-----+
only showing top 5 rows



## Which month has the most rain, on average?

In [28]:
average_rain_by_month_df = weather.groupBy(month("date").alias("month")) \
    .agg(avg("precipitation").alias("avg_precipitation"))


most_rain_month = average_rain_by_month_df.orderBy(col("avg_precipitation").desc()).first()


In [29]:
average_rain_by_month_df.show(13)

+-----+-------------------+
|month|  avg_precipitation|
+-----+-------------------+
|    1| 3.7580645161290316|
|    6| 1.1075000000000002|
|    3|  4.888709677419355|
|    5| 1.6733870967741935|
|    4|  3.128333333333333|
|    2|  3.734513274336283|
|   12|  5.021774193548389|
|    9| 1.9624999999999997|
|    8| 1.3201612903225806|
|    7|0.38870967741935486|
|   10|  4.059677419354839|
|   11|  5.354166666666667|
+-----+-------------------+



In [30]:
most_rain_month

Row(month=11, avg_precipitation=5.354166666666667)

## Which year was the windiest?

In [31]:
average_wind_by_year_df = weather.groupBy(year("date").alias("year")) \
    .agg(avg("wind").alias("avg_wind"))

In [32]:
most_windy_month = average_wind_by_year_df.orderBy(col("avg_wind").desc()).first()

In [33]:
average_wind_by_year_df.show()

+----+------------------+
|year|          avg_wind|
+----+------------------+
|2012| 3.400819672131148|
|2013|3.0158904109589058|
|2014| 3.387671232876714|
|2015| 3.159726027397261|
+----+------------------+



In [34]:
most_windy_month

Row(year=2012, avg_wind=3.400819672131148)

## What is the most frequent type of weather in January?

In [35]:
january_weather_df = weather.filter(month('date') == 1)

In [36]:
january_weather_counts = weather.groupBy('weather') \
    .count()


most_frequent_weather_january = january_weather_counts.orderBy(col("count").desc()).first()



In [37]:
january_weather_df.show()

+----------+-------------+--------+--------+----+-------+-----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|f_max|f_min|
+----------+-------------+--------+--------+----+-------+-----+-----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.04| 41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|51.08|37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|53.06|44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|53.96|42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.02|37.04|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|39.92|35.96|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|44.96|37.04|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun| 50.0|37.04|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|48.92| 41.0|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|42.98|33.08|
|2012-01-11|          0.0|     6.1|    -1.1| 5.1|    sun|42.98|30.02|
|2012-01-12|        

In [38]:
january_weather_counts.show()

+-------+-----+
|weather|count|
+-------+-----+
|drizzle|   54|
|   rain|  259|
|    sun|  714|
|   snow|   23|
|    fog|  411|
+-------+-----+



## What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [39]:
july_2013_2014_df = weather.filter((month('date') == 7) & (year('date').isin([2013, 2014])) & (col('weather') == 'sun')
)

In [40]:
july_2013_2014_df.show(5)

+----------+-------------+--------+--------+----+-------+-----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|f_max|f_min|
+----------+-------------+--------+--------+----+-------+-----+-----+
|2013-07-01|          0.0|    31.7|    18.3| 2.3|    sun|89.06|64.94|
|2013-07-02|          0.0|    28.3|    15.6| 3.0|    sun|82.94|60.08|
|2013-07-03|          0.0|    26.1|    16.7| 3.2|    sun|78.98|62.06|
|2013-07-05|          0.0|    23.3|    13.9| 2.6|    sun|73.94|57.02|
|2013-07-06|          0.0|    26.1|    13.3| 2.2|    sun|78.98|55.94|
+----------+-------------+--------+--------+----+-------+-----+-----+
only showing top 5 rows



In [41]:
average_high_temp_df = july_2013_2014_df.groupBy(year('date').alias("year")) \
    .agg(avg("temp_max").alias("avg_high_temp"))

In [42]:
average_high_temp_df.show()

+----+------------------+
|year|     avg_high_temp|
+----+------------------+
|2013|26.585185185185193|
|2014|            27.092|
+----+------------------+



## What percentage of days were rainy in q3 of 2015?

In [43]:
q3_df = weather.filter((year('date') == 2015) & (month('date').isin([9,10,11,12])))


In [44]:
q3_df.show()

+----------+-------------+--------+--------+----+-------+-----+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|f_max|f_min|
+----------+-------------+--------+--------+----+-------+-----+-----+
|2015-09-01|          5.8|    19.4|    13.9| 5.0|    fog|66.92|57.02|
|2015-09-02|          0.0|    19.4|    11.1| 3.8|    sun|66.92|51.98|
|2015-09-03|          0.0|    18.3|    10.6| 2.9|    sun|64.94|51.08|
|2015-09-04|          0.0|    18.3|    10.0| 2.9|    sun|64.94| 50.0|
|2015-09-05|          0.3|    20.6|     8.9| 3.5|    sun|69.08|48.02|
|2015-09-06|          5.3|    16.1|    11.7| 2.4|    fog|60.98|53.06|
|2015-09-07|          0.3|    21.1|    13.3| 1.5|    fog|69.98|55.94|
|2015-09-08|          0.0|    22.8|    13.3| 2.4|    sun|73.04|55.94|
|2015-09-09|          0.0|    24.4|    13.9| 3.3|    sun|75.92|57.02|
|2015-09-10|          0.0|    25.0|    14.4| 3.6|    fog| 77.0|57.92|
|2015-09-11|          0.0|    27.2|    15.0| 3.1|    sun|80.96| 59.0|
|2015-09-12|        

In [45]:
total_count = q3_df.count()

In [46]:
total_count

122

In [47]:
rain_count = q3_df.filter(col("weather") == "rain").count()

q3_rain_percentage = rain_count / total_count * 100

In [48]:
q3_rain_percentage

0.819672131147541

## For each year, find what percentage of days it rained (had non-zero precipitation).

In [49]:
rainy_days_df = weather.groupBy(year('date').alias('year')) \
    .agg(sum(when(weather.precipitation > 0, 1).otherwise(0)).alias('rainy_days'))

In [50]:
rainy_days_df.show(2)

+----+----------+
|year|rainy_days|
+----+----------+
|2012|       177|
|2013|       152|
+----+----------+
only showing top 2 rows



In [51]:
total_days_df = weather.groupBy(year('date').alias('year')) \
    .agg(count('*').alias('total_days'))

In [52]:
total_days_df.show(5)

+----+----------+
|year|total_days|
+----+----------+
|2012|       366|
|2013|       365|
|2014|       365|
|2015|       365|
+----+----------+



In [53]:
joined_df = rainy_days_df.join(total_days_df, 'year', 'inner')

In [54]:
joined_df

DataFrame[year: int, rainy_days: bigint, total_days: bigint]

In [55]:
percentage_rainy_days_df = joined_df.withColumn(
    'rainy_days_percentage', round((joined_df.rainy_days / joined_df.total_days) * 100, 2)
)

In [56]:
percentage_rainy_days_df.show(truncate=False)

+----+----------+----------+---------------------+
|year|rainy_days|total_days|rainy_days_percentage|
+----+----------+----------+---------------------+
|2012|177       |366       |48.36                |
|2013|152       |365       |41.64                |
|2014|150       |365       |41.1                 |
|2015|144       |365       |39.45                |
+----+----------+----------+---------------------+

