In [1]:
## ignoring warning mesasges
import warnings

warnings.filterwarnings("ignore")

# SPARK101 EXERCISES

## 1. **Create a spark data frame that contains your favorite programming languages.**

In [2]:
import pyspark

In [3]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/26 09:12:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

* The name of the column should be language  

In [5]:
# created a spark session
spark = pyspark.sql.SparkSession.builder.appName("ProgrammingLanguages").getOrCreate()

23/11/26 09:12:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
# Created a list of my favorite programming languages
fav_languages = ["Python", "C++", "Java", "C#", "R"]
fav_languages

['Python', 'C++', 'Java', 'C#', 'R']

In [7]:
# imported sparksession and row for creating asaprk df column
from pyspark.sql import SparkSession
from pyspark.sql import Row

prgm_lang_df = spark.createDataFrame([Row(Languages=lang) for lang in fav_languages])

In [8]:
prgm_lang_df.show()

                                                                                

+---------+
|Languages|
+---------+
|   Python|
|      C++|
|     Java|
|       C#|
|        R|
+---------+



* View the schema of the dataframe  

In [9]:
# Viewing Schema
prgm_lang_df.printSchema()

root
 |-- Languages: string (nullable = true)



* Output the shape of the dataframe  

In [10]:
# getting rows count and columns because there is shape attribute

# number of rows
num_rows = prgm_lang_df.count() 

# number of columns
columns = prgm_lang_df.columns
num_columns = len(columns)

                                                                                

In [11]:
# printing out shape

print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_columns}")

Number of Rows: 5
Number of Columns: 1


* Show the first 5 records in the dataframe

In [12]:
prgm_lang_df.show(5)

+---------+
|Languages|
+---------+
|   Python|
|      C++|
|     Java|
|       C#|
|        R|
+---------+



In [13]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pydataset import data
# from vega_datasets import data

# Note: The pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import col, expr, concat, sum, avg, min, max, count, mean, lit, regexp_extract, regexp_replace, when, asc, desc, month, year, quarter

### Visualization (or Lack Therof)

Spark does not provide a way to do visualization with their dataframes. To
visualize data from spark, you should use the `.toPandas` method on a spark
dataframe to convert it to a pandas dataframe, then visualize as you normally
would.

!!!warning "Converting to A Pandas Dataframe"
    Converting a spark dataframe to a pandas dataframe will pull all the data into memory, so make sure you have enough available memory to do so.

## References

- [PySpark API Docs](https://spark.apache.org/docs/latest/api/python/index.html)
- [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) -- Note that the docs here show examples in many different programming languages, make sure you choose Python.
- [DataFrame class](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
- [Column class](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column)
- [pyspark.sql.functions module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)
- `df.na`: [DataFrameNaFunctions class](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions)

In [14]:
spark = SparkSession.builder.getOrCreate()

## demo code to populate mpg

In [15]:
# mpg = spark.createDataFrame(data("mpg"))

# mpg.write.json("data/mpg_json", mode="overwrite")

# # like much else in spark, there's multiple ways we could do this:
# (
#     mpg.write.format("csv")
#     .mode("overwrite")
#     .option("header", "true")
#     .save("data/mpg_csv")
# )

## 2. **Load the mpg dataset as a spark dataframe.**

$a.$ Create 1 column of output that contains a message like the one below:

 `The 1999 audi a4 has a 4 cylinder engine.`
 
For each vehicle.

In [16]:
# displaying top 20 of the vehicles
# concat can be used to create a message like inthe example.

# load mpg dataset
mpg = spark.createDataFrame(data("mpg"))

# show the data
mpg.show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
|        audi|                a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|  auto(l5)|  4| 16| 25|  p|c

In [17]:
from pyspark.sql import functions as F

def create_message(year, make, model, cylinders):
    return f"The {year} {make} {model} has a {cylinders} cylinder engine."

create_message_udf = F.udf(create_message)

mpg_df = mpg.withColumn(
    "output_message",
    create_message_udf(
        F.col("year"), F.col("manufacturer"), F.col("model"), F.col("cyl")
    )
)

mpg_df.select("output_message").show(truncate=False)

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------------------------------------------------+
|output_message                                                |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

                                                                                

$b.$ Transform the trans column so that it only contains either manual or auto.

In [18]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import StringType

In [19]:
mpg.select(mpg.trans).show(5)

+----------+
|     trans|
+----------+
|  auto(l5)|
|manual(m5)|
|manual(m6)|
|  auto(av)|
|  auto(l5)|
+----------+
only showing top 5 rows



In [20]:
mpg_transformed = mpg.withColumn("trans", regexp_extract(mpg["trans"], r"(manual|auto)", 1).cast(StringType()))

mpg_transformed.select("trans").show(5)

+------+
| trans|
+------+
|  auto|
|manual|
|manual|
|  auto|
|  auto|
+------+
only showing top 5 rows



In [21]:
mpg.select(
    "trans",
    regexp_extract("trans", r"^(\w+)", 1).alias("trans_new"),
).show(truncate=False)

+----------+---------+
|trans     |trans_new|
+----------+---------+
|auto(l5)  |auto     |
|manual(m5)|manual   |
|manual(m6)|manual   |
|auto(av)  |auto     |
|auto(l5)  |auto     |
|manual(m5)|manual   |
|auto(av)  |auto     |
|manual(m5)|manual   |
|auto(l5)  |auto     |
|manual(m6)|manual   |
|auto(s6)  |auto     |
|auto(l5)  |auto     |
|manual(m5)|manual   |
|auto(s6)  |auto     |
|manual(m6)|manual   |
|auto(l5)  |auto     |
|auto(s6)  |auto     |
|auto(s6)  |auto     |
|auto(l4)  |auto     |
|auto(l4)  |auto     |
+----------+---------+
only showing top 20 rows



## 3. **Load the tips dataset as a spark dataframe.**

In [22]:
# utilized demo code to populate mpg
tips = spark.createDataFrame(data("tips"))

tips.write.json("data/tips_json", mode="overwrite")

# like much else in spark, there's multiple ways we could do this:
(
    mpg.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("data/tips_csv")
)

                                                                                

$a.$ What percentage of observations are smokers?  

In [23]:
# showing statistical info on tips df
tips.describe().show()

23/11/26 09:12:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+------------------+------+------+----+------+------------------+
|summary|        total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+------------------+------------------+------+------+----+------+------------------+
|  count|               244|               244|   244|   244| 244|   244|               244|
|   mean|19.785942622950813|2.9982786885245907|  NULL|  NULL|NULL|  NULL| 2.569672131147541|
| stddev| 8.902411954856856| 1.383638189001182|  NULL|  NULL|NULL|  NULL|0.9510998047322344|
|    min|              3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+------------------+------+------+----+------+------------------+



In [24]:
# printing entire tips df to sere the columns
tips.show(244)

+----------+----+------+------+----+------+----+
|total_bill| tip|   sex|smoker| day|  time|size|
+----------+----+------+------+----+------+----+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|
|     24.59|3.61|Female|    No| Sun|Dinner|   4|
|     25.29|4.71|  Male|    No| Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No| Sun|Dinner|   2|
|     26.88|3.12|  Male|    No| Sun|Dinner|   4|
|     15.04|1.96|  Male|    No| Sun|Dinner|   2|
|     14.78|3.23|  Male|    No| Sun|Dinner|   2|
|     10.27|1.71|  Male|    No| Sun|Dinner|   2|
|     35.26| 5.0|Female|    No| Sun|Dinner|   4|
|     15.42|1.57|  Male|    No| Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No| Sun|Dinner|   4|
|     14.83|3.02|Female|    No| Sun|Dinner|   2|
|     21.58|3.92|  Male|    No| Sun|Dinner|   2|
|     10.33|1.67|Female|    No| Sun|Dinner|   3|
|     16.29|3.71|  M

* **To calculate the percentage of smokers we need to filter by column and set count the number of of smokers present in the spark dataframe.**

* **Then we assign the total number of observations to a variable and do some math to find a percentagr. and we can print a prettu statement with the value rounded**

In [25]:
# filter using col and set smoker equal to Yes to retrieve all smokers
smokers_count = tips.filter(col("smoker") == "Yes").count()
# count of all observations
total_observations = tips.count()

# calculate the percentage of smokers in tips df
percentage_smokers = (smokers_count / total_observations) * 100
#printing the perecenatge of individuals that smoke
print(f"The percentage of observations where individuals are smokers is: {percentage_smokers:.0f}%")

The percentage of observations where individuals are smokers is: 38%


$b.$ Create a column that contains the tip percentage  

In [26]:
# in pyspark we can use round or format_number to achieve a rounded output
from pyspark.sql.functions import round

tips = tips.withColumn("tip_percentage", round((col("tip") / col("total_bill")) * 100, 0))

tips.show(244)

+----------+----+------+------+----+------+----+--------------+
|total_bill| tip|   sex|smoker| day|  time|size|tip_percentage|
+----------+----+------+------+----+------+----+--------------+
|     16.99|1.01|Female|    No| Sun|Dinner|   2|           6.0|
|     10.34|1.66|  Male|    No| Sun|Dinner|   3|          16.0|
|     21.01| 3.5|  Male|    No| Sun|Dinner|   3|          17.0|
|     23.68|3.31|  Male|    No| Sun|Dinner|   2|          14.0|
|     24.59|3.61|Female|    No| Sun|Dinner|   4|          15.0|
|     25.29|4.71|  Male|    No| Sun|Dinner|   4|          19.0|
|      8.77| 2.0|  Male|    No| Sun|Dinner|   2|          23.0|
|     26.88|3.12|  Male|    No| Sun|Dinner|   4|          12.0|
|     15.04|1.96|  Male|    No| Sun|Dinner|   2|          13.0|
|     14.78|3.23|  Male|    No| Sun|Dinner|   2|          22.0|
|     10.27|1.71|  Male|    No| Sun|Dinner|   2|          17.0|
|     35.26| 5.0|Female|    No| Sun|Dinner|   4|          14.0|
|     15.42|1.57|  Male|    No| Sun|Dinn

$c.$ Calculate the average tip percentage for each combination of sex and smoker.  

In [27]:
# assigning a variable with a groupby of valyes and aggregattion of the avg tip percentage.
tips_avg_tip_percentage = (
    tips.groupBy("sex", "smoker")
    .agg(
        round(avg(col("tip_percentage")), 2).alias("avg_tip_percentage")
    )
)    

In [28]:
tips_avg_tip_percentage.show()



+------+------+------------------+
|   sex|smoker|avg_tip_percentage|
+------+------+------------------+
|  Male|    No|              16.1|
|Female|    No|             15.69|
|  Male|   Yes|             15.28|
|Female|   Yes|             18.24|
+------+------+------------------+



                                                                                

## 4. **Use the seattle weather dataset referenced in the lesson to answer the questions below.**

In [29]:
# need vega_datasets imoprt here
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



* Convert the temperatures to fahrenheit.

In [30]:
weather.describe().show()

[Stage 35:>                                                         (0 + 8) / 8]

+-------+----------+-----------------+------------------+-----------------+------------------+-------+
|summary|      date|    precipitation|          temp_max|         temp_min|              wind|weather|
+-------+----------+-----------------+------------------+-----------------+------------------+-------+
|  count|      1461|             1461|              1461|             1461|              1461|   1461|
|   mean|      NULL| 3.02943189596167|16.439082819986314|8.234770704996578|3.2411362080766604|   NULL|
| stddev|      NULL|6.680194322314738| 7.349758097360176|5.023004179961266|1.4378250588746198|   NULL|
|    min|2012-01-01|              0.0|              -1.6|             -7.1|               0.4|drizzle|
|    max|2015-12-31|             55.9|              35.6|             18.3|               9.5|    sun|
+-------+----------+-----------------+------------------+-----------------+------------------+-------+



                                                                                

In [31]:
weather

DataFrame[date: string, precipitation: double, temp_max: double, temp_min: double, wind: double, weather: string]

* Which month has the most rain, on average?

* Which year was the windiest?

* What is the most frequent type of weather in January?
  

* What is the average high and low temperature on sunny days in July in 2013 and 2014?

  
* What percentage of days were rainy in q3 of 2015?
  

* For each year, find what percentage of days it rained (had non-zero precipitation).