# Big Data Analytics Homework 02

*Complete this assignment in Google Colab. Prior to submitting a copy of this notebook (.ipynb format), run every cell and ensure you have corrected all runtime errors. Be sure to fill in your Name and SUID in the following cell. As always, you must do your own work. This means you may not use answers to the following questions generated by any other person or a generative AI tool such as ChatGPT. You may, however, discuss this assignment with others in a general way and seek help when you need it, but, again, you must do your own work.*

Name:

SUID:

## Setup

In [1]:
! pip install pyspark -q



In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as fn

sc = SparkContext.getOrCreate()

spark = SparkSession\
    .builder\
    .appName('Homework 02')\
    .getOrCreate()

## RDDs

### Q1

Create a single-dimensional PySpark RDD named `bernoulli_rdd` that contains 1,000 Bernoulli probability distribution data points consisting of integers 0 or 1 with P(0) = P(1) = 0.5.

Use only PySpark RDDs and `map` to complete this question.

Hint: Use the [`RandomRDDs.uniformRDD`](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.mllib.random.RandomRDDs.html#pyspark.mllib.random.RandomRDDs.uniformRDD) function for your sampling step. But you will still need to map over this RDD with a custom function that creates 1's and 0's from the initial output.

In [3]:
# your code here
from pyspark.mllib.random import RandomRDDs

# Use the RandomRDDs.uniformRDD function to draw 1,000 values uniformly from [0, 1).
# uniformRDD expects the SparkContext (`sc` from the setup cell), not a SparkSession.
random_uniform_values = RandomRDDs.uniformRDD(sc, size=1000)

# Define a function to map uniform values to Bernoulli values (0 or 1)
def map_to_bernoulli(value):
    if value < 0.5:
        return 0
    else:
        return 1

# Apply the mapping function to the uniform values RDD
bernoulli_rdd = random_uniform_values.map(map_to_bernoulli)

In [4]:
# do not modify
bernoulli_rdd.take(15)

[0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0]

### Q2

Using only Spark, count and display the number of 1's and the number of 0's in `bernoulli_rdd`.

Your output must be of the form "There are X 1's and Y 0's in bernoulli_rdd".

In [5]:
# your code here
# Count the number of 1's and 0's in bernoulli_rdd
count_ones = bernoulli_rdd.filter(lambda x: x == 1).count()
count_zeros = bernoulli_rdd.filter(lambda x: x == 0).count()

# Display the results
print(f"There are {count_ones} 1's and {count_zeros} 0's in bernoulli_rdd.")

There are 475 1's and 525 0's in bernoulli_rdd.
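
As an aside, both counts can also come from a single pass over the data. PySpark's `RDD.countByValue()` returns a dictionary-like mapping from each distinct value to its count. The sketch below shows the same idea with `collections.Counter` on a plain Python list, so it runs without a Spark session; the list is a stand-in for the RDD and holds only the 15 values displayed by `take(15)` above.

```python
from collections import Counter

# Stand-in for bernoulli_rdd: the 15 values displayed by take(15) above.
values = [0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0]

# One pass over the data tallies every distinct value at once.
counts = Counter(values)

message = f"There are {counts[1]} 1's and {counts[0]} 0's in the first {len(values)} values."
print(message)
```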


### Q3

Create two new 2-dimensional RDDs named `bernoulli_sample_rdd_1` and `bernoulli_sample_rdd_2` that each contain sample data from `bernoulli_rdd`. Each element of these RDDs should contain 10 samples (with replacement) from the original 1,000. The length of each RDD should be the number of samples, which should be 50. In addition to the samples themselves, each data element in each RDD should contain `sample_number`, which should be calculated from each sample, not "hard-coded" as 10.

A sample element of the result will be of the form `[(sample_number, [sample])]`.

`bernoulli_sample_rdd_1` should be created using the [`sample`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html) method and `bernoulli_sample_rdd_2` should be created using the [`takeSample`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.takeSample.html) method.

In [6]:
# your code here
# Create bernoulli_sample_rdd_1 using the sample method
sample_size = 10
num_samples = 50

bernoulli_sample_rdd_1 = bernoulli_rdd.sample(withReplacement=True, fraction=sample_size / bernoulli_rdd.count())
# Add the sample_number to each element
bernoulli_sample_rdd_1 = bernoulli_sample_rdd_1.map(lambda sample: (sample, [sample_number for sample_number in range(num_samples)]))

# Create bernoulli_sample_rdd_2 using the takeSample method
bernoulli_sample_rdd_2 = bernoulli_rdd.takeSample(True, int(sample_size * bernoulli_rdd.count()))

# Add the sample_number to each element
bernoulli_sample_rdd_2 = sc.parallelize(bernoulli_sample_rdd_2).map(lambda sample: (sample, [sample_number for sample_number in range(num_samples)]))

In [7]:
# do not modify
bernoulli_sample_rdd_1.take(10)

[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]),
 (0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]),
 (1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43,

In [8]:
# do not modify
bernoulli_sample_rdd_2.take(10)

[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]),
 (1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]),
 (0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43,

### Q4

Explain the key difference between `bernoulli_sample_rdd_1` and `bernoulli_sample_rdd_2` and the reason for it.

*answer here*
The key difference between `bernoulli_sample_rdd_1` and `bernoulli_sample_rdd_2` lies in the method used to create them: `sample` for `bernoulli_sample_rdd_1` and `takeSample` for `bernoulli_sample_rdd_2`.

**bernoulli_sample_rdd_1 - sample method - withReplacement = True**
`sample` is a transformation: it returns a new RDD and is evaluated lazily. Because it was called with replacement, the same element of the original RDD can be selected more than once. The `fraction` argument sets only the expected size of the result, so the number of elements returned is approximate and can differ from run to run.

**bernoulli_sample_rdd_2 - takeSample method - withReplacement = True**
`takeSample` is an action: it runs immediately and returns a Python list of exactly the requested number of elements to the driver, which is why the result had to be passed through `sc.parallelize` to become an RDD again. Because it was also called with replacement, the same element can again be selected more than once.

Another difference between these two RDDs is in how the sample size is determined: `bernoulli_sample_rdd_1` uses a fraction of the total, while `bernoulli_sample_rdd_2` uses a fixed number of elements.
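
The fraction-versus-fixed-size distinction can be illustrated without Spark. The sketch below is plain Python and rests on an assumption: that sampling with replacement by fraction behaves like drawing each element a Poisson-distributed number of times with mean `fraction`, which is my understanding of how `RDD.sample` works. The helper names (`poisson_draw`, `fraction_sample`, `fixed_size_sample`) are hypothetical and are not part of PySpark.

```python
import math
import random


def poisson_draw(rng, lam):
    """Draw one Poisson(lam) value using Knuth's multiplication method."""
    threshold = math.exp(-lam)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= threshold:
            return k
        k += 1


def fraction_sample(data, fraction, rng):
    """Keep each element a Poisson(fraction) number of times (size varies)."""
    out = []
    for x in data:
        out.extend([x] * poisson_draw(rng, fraction))
    return out


def fixed_size_sample(data, n, rng):
    """Draw exactly n elements with replacement (size is always n)."""
    return rng.choices(data, k=n)


rng = random.Random(0)
population = [rng.randint(0, 1) for _ in range(1000)]

# Five repetitions of each approach, both aiming for about 10 elements.
fraction_sizes = [len(fraction_sample(population, 10 / 1000, rng)) for _ in range(5)]
fixed_sizes = [len(fixed_size_sample(population, 10, rng)) for _ in range(5)]

print("fraction-based sizes:", fraction_sizes)
print("fixed-size sizes:    ", fixed_sizes)
```

The fraction-based sizes hover around 10 without being guaranteed to equal it, while the fixed-size draws always contain exactly 10 elements.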

### Q5

Re-using code from above that created `bernoulli_sample_rdd_2`, create `bernoulli_sample_rdd_3` which has 100 observations per sample.

Using PySpark `map`, create a new RDD named `bernoulli_sample_mean_rdd` that contains the sampling distribution of the means of the samples contained in `bernoulli_sample_rdd_3`.

In [9]:
# your code here
# Create bernoulli_sample_rdd_3 using the takeSample method with 100 observations per sample
sample_size = 100
bernoulli_sample_rdd_3 = bernoulli_rdd.takeSample(True, sample_size * num_samples)

# Add the sample_number to each element
bernoulli_sample_rdd_3 = sc.parallelize(bernoulli_sample_rdd_3).map(lambda sample: (sample, [sample_number for sample_number in range(num_samples)]))

# Calculate the mean of each sample and create bernoulli_sample_mean_rdd
bernoulli_sample_mean_rdd = bernoulli_sample_rdd_3.map(lambda sample: (sample[0], sum(sample[1]) / num_samples))

In [10]:
# do not modify
bernoulli_sample_mean_rdd.take(10)

[(1, 24.5),
 (0, 24.5),
 (1, 24.5),
 (0, 24.5),
 (0, 24.5),
 (1, 24.5),
 (0, 24.5),
 (1, 24.5),
 (0, 24.5),
 (1, 24.5)]
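
For reference, the mean of a sample of 0/1 values is simply the fraction of 1's in it, so every entry of a sampling distribution of means built from `bernoulli_rdd` should lie between 0 and 1 and cluster near 0.5. The plain-Python sketch below (no Spark; all names are illustrative) builds 50 samples of 100 observations each as `(sample_number, [sample])` pairs and maps each pair to its mean. It reads `sample_number` as the number of observations computed from the sample itself, which is one interpretation of the Q3 wording.

```python
import random

rng = random.Random(42)

# Stand-in for bernoulli_rdd: 1,000 fair 0/1 draws.
population = [rng.randint(0, 1) for _ in range(1000)]

num_samples = 50
observations_per_sample = 100

# Each element is (sample_number, [sample]); sample_number is computed
# from the sample with len() rather than hard-coded.
samples = []
for _ in range(num_samples):
    draw = rng.choices(population, k=observations_per_sample)
    samples.append((len(draw), draw))

# Map every sample to its mean: the sum of the 0/1 values over the count.
sample_means = [sum(draw) / n for n, draw in samples]

print(len(sample_means), min(sample_means), max(sample_means))
```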

## DataFrames

In this section we will work with data from the U.S. Environmental Protection Agency (EPA). There are two data sets. The first data set consists of daily **temperatures** collected at the U.S. **city** level. The second data set consists of daily **air quality** data collected at the U.S. **county** level. These measurements were taken for the full year 2021.

In [11]:
%%bash
# download the temperature and aqi data sets

if [[ ! -f us-daily-temperatures-2021.csv ]]; then
 wget https://syr-bda.s3.us-east-2.amazonaws.com/us-daily-temperatures-2021.csv -q
fi

if [[ ! -f us-daily-aqi-2021.csv ]]; then
 wget https://syr-bda.s3.us-east-2.amazonaws.com/us-daily-aqi-2021.csv -q
fi

### Q6

Load the temperature data (using Spark) into a data frame called `temperature`. Load the air quality data into a data frame called `aqi`. Print the schema and the number of rows for each data set.

In [12]:
# your code here
# Load temperature data into a DataFrame
temperature = spark.read.csv("us-daily-temperatures-2021.csv", header=True, inferSchema=True)

# Load air quality data into a DataFrame
aqi = spark.read.csv("us-daily-aqi-2021.csv", header=True, inferSchema=True)

# Print the schema and row counts for each DataFrame
print("Temperature Data Schema:")
temperature.printSchema()
print("Number of Rows in Temperature Data: ", temperature.count())

print("Air Quality Data Schema:")
aqi.printSchema()
print("Number of Rows in Air Quality Data: ", aqi.count())

Temperature Data Schema:
root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- city: string (nullable = true)
 |-- sites_reporting: integer (nullable = true)
 |-- mean_temperature_f: double (nullable = true)
 |-- max_temp_f: double (nullable = true)
 |-- max_temp_hour: integer (nullable = true)

Number of Rows in Temperature Data:  206292
Air Quality Data Schema:
root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)
 |-- aqi: integer (nullable = true)
 |-- category: string (nullable = true)

Number of Rows in Air Quality Data:  130922


The `temperature` data is reported at the city level, but the `aqi` data is at the county level. We want the "grain" of these data sets to match. This means we need to aggregate the `temperature` data to the county level. This is tricky because we could possibly have counties in different states with the same name. This means you'll want to aggregate not just at the county level (by date), but at the state *and* county level (by date). We also have 4 different metrics to deal with: the total number of sites, a mean, a max, and one value, `max_temp_hour`, that corresponds to the value of another metric, `max_temp_f`.
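
To see why the state has to be part of the grouping key, consider Jefferson County, a name that exists in both Alabama and Kentucky. The plain-Python sketch below uses made-up temperature readings and a hypothetical helper, `group_means`, to compare grouping by `(date, county)` with grouping by `(date, state, county)`.

```python
from collections import defaultdict

# (date, state, county, mean_temperature_f) -- temperatures are made up.
rows = [
    ("2021-01-01", "Alabama", "Jefferson", 66.0),
    ("2021-01-01", "Alabama", "Jefferson", 62.0),
    ("2021-01-01", "Kentucky", "Jefferson", 40.0),
]


def group_means(rows, key):
    """Average the temperature of all rows that share the same key."""
    groups = defaultdict(list)
    for row in rows:
        groups[key(row)].append(row[3])
    return {k: sum(v) / len(v) for k, v in groups.items()}


# Grouping by county name alone blends two different counties together.
by_county = group_means(rows, key=lambda r: (r[0], r[2]))

# Adding the state keeps the two Jefferson counties apart.
by_state_county = group_means(rows, key=lambda r: (r[0], r[1], r[2]))

print(by_county)
print(by_state_county)
```

Grouping without the state yields a single blended value of 56.0, whereas the state-and-county grouping yields 64.0 for Alabama and 40.0 for Kentucky.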

### Q7

Create a new data frame called `temperature_county` that contains the **mean** temperature, the **max** temperature, and the **total** sites reporting, for each unique `date`, `state`, and `county` in the `temperature` data frame. The columns `mean_temperature_f`, `max_temp_f`, and `sites_reporting` should retain their names. Additionally, round the **mean** and **max** columns to the nearest whole number and cast them as integers.

In [13]:
# your code
from pyspark.sql.functions import round, mean, max, count
from pyspark.sql.types import IntegerType

# Group the "temperature" DataFrame by the columns "date," "state," and "county"
temperature_county = temperature.groupBy("date", "state", "county") \
    .agg(
        # Round the mean of "mean_temperature_f" to the nearest whole number
        round(mean("mean_temperature_f")).cast(IntegerType()).alias("mean_temperature_f"),
        # Round the maximum value of "max_temp_f" to the nearest whole number
        round(max("max_temp_f")).cast(IntegerType()).alias("max_temp_f"),
        # Count the number of records in each group and create a column "sites_reporting"
        count("sites_reporting").alias("sites_reporting"))

temperature_county.show()

+----------+--------------+--------------+------------------+----------+---------------+
|      date|         state|        county|mean_temperature_f|max_temp_f|sites_reporting|
+----------+--------------+--------------+------------------+----------+---------------+
|2021-01-01|       Montana|        Fergus|                34|        43|              1|
|2021-01-01|         Texas|         Titus|                40|        46|              1|
|2021-01-02|          Iowa|       Johnson|                18|        22|              1|
|2021-01-02|      Oklahoma|      Garfield|                34|        42|              1|
|2021-01-02|  South Dakota|        Custer|                37|        42|              1|
|2021-01-03| West Virginia|         Mason|                41|        44|              1|
|2021-01-04|North Carolina|       Forsyth|                43|        50|              1|
|2021-01-04|         Texas|          Rusk|                54|        67|              1|
|2021-01-04|         

In [14]:
# do not modify
print('Rows in temperature_county:', temperature_county.count())
temperature_county.orderBy('date', 'state', 'county').show(10)

Rows in temperature_county: 138555
+----------+--------+--------------------+------------------+----------+---------------+
|      date|   state|              county|mean_temperature_f|max_temp_f|sites_reporting|
+----------+--------+--------------------+------------------+----------+---------------+
|2021-01-01| Alabama|            Escambia|                64|        69|              1|
|2021-01-01| Alabama|           Jefferson|                66|        74|              1|
|2021-01-01|  Alaska|              Denali|                -1|         6|              1|
|2021-01-01|  Alaska|Fairbanks North Star|                -9|        -1|              2|
|2021-01-01| Arizona|             Cochise|                40|        49|              1|
|2021-01-01| Arizona|            Coconino|                29|        35|              1|
|2021-01-01| Arizona|            Maricopa|                49|        65|              1|
|2021-01-01| Arizona|              Navajo|                30|        42|   

### Q8

Create a new data frame called `county_max_temp_hour` that reports the `max_temp_hour` at the same level of aggregation as `temperature_county` in the previous step. This means it should have the **same number of rows** as `temperature_county` and contain the same grouping fields, but only one metric, `max_temp_hour`. Once you have created this data frame, **left join** it to `temperature_county` as a new data frame called `temperature_county_final`.

I've provided some starter code for you below. Fill in where you see `???` in order to complete the answer.

In [15]:
# your code
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Define a window spec
window_spec = Window.partitionBy("date", "state", "county")\
  .orderBy(desc("max_temp_f"))

# Add row number for each group
df = temperature\
  .withColumn('row_num', row_number().over(window_spec))

# Filter to the first row using row_num
df2 = df.where(df["row_num"] == 1)

# Keep only the grouping columns and max_temp_hour
county_max_temp_hour = df2.select("date", "state", "county", "max_temp_hour")

# Left join county_max_temp_hour to temperature_county
temperature_county_final = temperature_county.join(county_max_temp_hour, ["date", "state", "county"], "left")


In [16]:
# do not modify
print('Rows in county_max_temp_hour:', county_max_temp_hour.count())
county_max_temp_hour.orderBy('date', 'state', 'county').show(10)

Rows in county_max_temp_hour: 138555
+----------+--------+--------------------+-------------+
|      date|   state|              county|max_temp_hour|
+----------+--------+--------------------+-------------+
|2021-01-01| Alabama|            Escambia|            0|
|2021-01-01| Alabama|           Jefferson|           13|
|2021-01-01|  Alaska|              Denali|            4|
|2021-01-01|  Alaska|Fairbanks North Star|           12|
|2021-01-01| Arizona|             Cochise|           12|
|2021-01-01| Arizona|            Coconino|           12|
|2021-01-01| Arizona|            Maricopa|           13|
|2021-01-01| Arizona|              Navajo|           14|
|2021-01-01| Arizona|                Pima|           15|
|2021-01-01|Arkansas|             Pulaski|           10|
+----------+--------+--------------------+-------------+
only showing top 10 rows
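
The window-and-`row_number` pattern used here amounts to "within each group, find the row with the highest `max_temp_f` and read `max_temp_hour` from that same row". A plain-Python sketch of that logic, using made-up readings for three Arizona cities, might look like this:

```python
# (date, state, county, city, max_temp_f, max_temp_hour) -- made-up values.
rows = [
    ("2021-07-01", "Arizona", "Maricopa", "Phoenix", 110.0, 15),
    ("2021-07-01", "Arizona", "Maricopa", "Mesa", 112.0, 16),
    ("2021-07-01", "Arizona", "Pima", "Tucson", 105.0, 14),
]

# For every (date, state, county) group, remember the hottest row seen so far.
best = {}
for date, state, county, _city, max_temp_f, max_temp_hour in rows:
    key = (date, state, county)
    if key not in best or max_temp_f > best[key][0]:
        best[key] = (max_temp_f, max_temp_hour)

# Keep only the hour that belongs to the hottest row of each group.
county_max_temp_hour = {key: hour for key, (_temp, hour) in best.items()}

print(county_max_temp_hour)
```

Taking `max(max_temp_hour)` per group would not be equivalent, because the latest hour does not necessarily belong to the hottest reading.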



In [17]:
# do not modify
print('Rows in temperature_county_final:', temperature_county_final.count())
temperature_county_final.orderBy('date', 'state', 'county').show(10)

Rows in temperature_county_final: 138555
+----------+--------+--------------------+------------------+----------+---------------+-------------+
|      date|   state|              county|mean_temperature_f|max_temp_f|sites_reporting|max_temp_hour|
+----------+--------+--------------------+------------------+----------+---------------+-------------+
|2021-01-01| Alabama|            Escambia|                64|        69|              1|            0|
|2021-01-01| Alabama|           Jefferson|                66|        74|              1|           13|
|2021-01-01|  Alaska|              Denali|                -1|         6|              1|            4|
|2021-01-01|  Alaska|Fairbanks North Star|                -9|        -1|              2|           12|
|2021-01-01| Arizona|             Cochise|                40|        49|              1|           12|
|2021-01-01| Arizona|            Coconino|                29|        35|              1|           12|
|2021-01-01| Arizona|           

### Q9

Join `aqi` to `temperature_county_final` and call the resulting data frame `daily_county_measurements`. Then write code that produces the `date`, name of the `county`, `state`, and `aqi` value where the **highest recorded `aqi`** occurred in 2021. In the event of a tie, take the first instance. Your answer should take the following form:

"The highest recorded AQI value in 2021 occurred on [date], in [county] County, [state], and had a value of [aqi]."

I have included some comments and starter code to help you out.

In [18]:
# Join aqi to temperature_county_final and create daily_county_measurements
daily_county_measurements = temperature_county_final.join(aqi, ["date", "state", "county"], "inner")

# Find the maximum AQI value in 2021 (filter on the year of the date column)
max_aqi = daily_county_measurements.filter(fn.year("date") == 2021) \
    .agg(max("aqi").alias("max_aqi"))

# Keep the rows that match the maximum AQI; ordering by date before limit(1)
# makes a tie resolve to the earliest instance instead of an arbitrary row
highest_aqi = daily_county_measurements.join(max_aqi,
                                           [max_aqi["max_aqi"] == daily_county_measurements["aqi"]],
                                           "inner").orderBy("date").limit(1)

# Extract values from the data frame
row = highest_aqi.collect()[0]
date = row['date']
county = row['county']
state = row['state']
aqi_value = row['max_aqi']

# Print the output as specified
print(f"The highest recorded AQI value in 2021 occurred on {date}, in {county} County, {state}, and had a value of {aqi_value}.")


The highest recorded AQI value in 2021 occurred on 2021-09-14, in Tulare County, California, and had a value of 537.


### Q10

Using a process similar to Q8, create a new data frame called `highest_temperates_by_state_2021` that contains one row per state, and shows the `date`, `state`, and `max_temp_f` for the **highest recorded temperature** in that state in 2021. In the case of ties, pick the earliest day of the year.

In [19]:
# your code here
# Define a window spec
window_spec = Window.partitionBy("state", "date")\
  .orderBy(desc("max_temp_f"))

# Add row number for each group
df = temperature.filter(temperature["date"].like("2021%"))\
  .withColumn('row_num', row_number().over(window_spec))

# Filter to the first row using row_num
df2 = df.where(df["row_num"] == 1)

# Keep only the grouping columns and max_temp_f
highest_temperates_by_state_2021 = df2.select("date", "state", "max_temp_f")

In [20]:
# do not modify
highest_temperates_by_state_2021\
  .select('date', 'state', 'max_temp_f')\
  .orderBy(desc('max_temp_f'))\
  .show()

+----------+-----------------+----------+
|      date|            state|max_temp_f|
+----------+-----------------+----------+
|2021-11-20|       California|     129.0|
|2021-07-10|       California|     124.0|
|2021-06-17|Country Of Mexico|     123.1|
|2021-07-11|       California|     123.0|
|2021-06-18|Country Of Mexico|     122.2|
|2021-07-09|       California|     121.0|
|2021-07-12|       California|     121.0|
|2021-08-04|Country Of Mexico|     120.9|
|2021-06-19|Country Of Mexico|     120.2|
|2021-06-17|       California|     120.0|
|2021-07-08|       California|     120.0|
|2021-08-04|       California|     120.0|
|2021-08-27|Country Of Mexico|     119.7|
|2021-06-18|       California|     119.3|
|2021-06-15|Country Of Mexico|     119.3|
|2021-08-03|Country Of Mexico|     119.1|
|2021-06-16|       California|     119.0|
|2021-06-19|       California|     119.0|
|2021-07-07|       California|     119.0|
|2021-06-17|          Arizona|     118.0|
+----------+-----------------+----
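
One way to obtain exactly one row per state is to rank the rows within each state by temperature (highest first) and then by date (earliest first), keeping only the top-ranked row. In PySpark this would presumably correspond to a window partitioned by `state` alone and ordered by `desc("max_temp_f")` followed by `date`. The plain-Python sketch below shows the ranking logic on made-up rows, not on values from the EPA data.

```python
from datetime import date

# (date, state, max_temp_f) -- made-up values for illustration only.
rows = [
    (date(2021, 7, 10), "California", 124.0),
    (date(2021, 6, 17), "California", 124.0),  # tie: the earlier date should win
    (date(2021, 6, 17), "Arizona", 118.0),
    (date(2021, 6, 20), "Arizona", 117.0),
]

# Sort by state, then temperature descending, then date ascending.
ranked = sorted(rows, key=lambda r: (r[1], -r[2], r[0]))

# The first row encountered for each state is its top-ranked row.
highest_by_state = {}
for day, state, temp in ranked:
    highest_by_state.setdefault(state, (day, temp))

for state, (day, temp) in highest_by_state.items():
    print(state, day, temp)
```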