# GRADTDA5622 - Big Data Computing Foundations 2
## Homework 4: Manipulating Data with PySpark SQL
- Semester: Spring 2023
- Instructor: Tom Bihari
- Section: N/A
- Student Name: Able Baker **(fill in)**
- Student Email: baker.12345@osu.edu **(fill in)**
- Student ID: 123456789 **(fill in)**
***

***
# Section: Overview
***

**The Objectives of This Assignment are:**
1. To gain an understanding of how to set up a PySpark program.
2. To gain experience using PySpark SQL to manipulate data to solve problems and answer questions.
3. To get a small preview of Spark DataFrame functions we will use in the future.

**Overview:**
- The main purpose of this homework is to walk through a PySpark program.  The main tasks in this assignment (see below) are almost exactly the same as those in the previous SQLite assignment.  In this assignment, you will create SQL statements and run them in Spark SQL instead of SQLite.  Spoiler alert: The queries and results should be almost identical to those in HW2.  
- Tasks 1 and 2 are done for you.  You can use them as examples when doing Task 3.
- I also included a short example (in Task 1) of how to use Spark DataFrame functions to get the same result as a SQL query.  We will cover this more in future modules.  For a tutorial covering many PySpark functions, see: https://sparkbyexamples.com/pyspark-tutorial/
- NOTE: To create tables in Spark SQL, we need Hive support enabled in the environment, which I have not done for now.  For the tasks below, I have used TEMP VIEWs instead of TABLEs.  The text below still uses the term "table" (carried over from the SQLite assignment) where it actually means "view".

**Instructions:**
- **Follow the instructions** in each section.
- **Fill in** the **Conclusions** section.



---



# Section: Setup
- Add any needed imports, helper functions, etc., here.

In [1]:
# USUALLY DO NOT NEED TO INSTALL PYSPARK IF YOU ARE ON "Jupyter + Spark" on OSC.
# If you do not have PySpark installed on your machine, install using pip. (Uncomment next line and run it.)
# !pip install pyspark

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as SqlF
pyspark.__version__

'3.0.1'

In [3]:
# Identify the location of the shared data folder
shared_data_directory = "../shared_Sp23/"

# Section: Initialize Spark
- Create a Spark session if it does not already exist.  This also sets up a Spark Context.
- See: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/SparkSession.Builder.html

In [4]:
spark = SparkSession.builder.master("local[*]") \
                    .appName('MyApp') \
                    .getOrCreate()
sc = spark.sparkContext  # Get the context, so we have a short name for it if we need it.
print(sc.appName)

MyApp


# Section: Create DataFrames and Tables from CSV Files

For each table you want to load from a CSV file, do the following:
1. Create a Spark DataFrame from the CSV file.
2. Register the DataFrame as a SQL temp view (give it a table name that SQL queries can use).

In [5]:
abbrevs_df = spark.read.format('csv').load(shared_data_directory + "state-abbrevs.csv", header=True, inferSchema=True)
abbrevs_df.printSchema()
abbrevs_df.show(5, truncate=False)
abbrevs_df.createOrReplaceTempView("abbrevs_table")  # registerTempTable is deprecated since Spark 2.0

root
 |-- state: string (nullable = true)
 |-- abbreviation: string (nullable = true)

+----------+------------+
|state     |abbreviation|
+----------+------------+
|Alabama   |AL          |
|Alaska    |AK          |
|Arizona   |AZ          |
|Arkansas  |AR          |
|California|CA          |
+----------+------------+
only showing top 5 rows



In [6]:
areas_df = spark.read.format('csv').load(shared_data_directory + "state-areas.csv", header=True, inferSchema=True)
areas_df.printSchema()
areas_df.show(5, truncate=False)
areas_df.createOrReplaceTempView("areas_table")

root
 |-- state: string (nullable = true)
 |-- area: integer (nullable = true)

+----------+------+
|state     |area  |
+----------+------+
|Alabama   |52423 |
|Alaska    |656425|
|Arizona   |114006|
|Arkansas  |53182 |
|California|163707|
+----------+------+
only showing top 5 rows



In [7]:
populations_df = spark.read.format('csv').load(shared_data_directory + "state-populations.csv", header=True, inferSchema=True)
populations_df.printSchema()
populations_df.show(5, truncate=False)
populations_df.createOrReplaceTempView("populations_table")

root
 |-- state: string (nullable = true)
 |-- ages: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- population: double (nullable = true)

+-----+-------+----+----------+
|state|ages   |year|population|
+-----+-------+----+----------+
|AL   |under18|2012|1117489.0 |
|AL   |total  |2012|4817528.0 |
|AL   |under18|2010|1130966.0 |
|AL   |total  |2010|4785570.0 |
|AL   |under18|2011|1125763.0 |
+-----+-------+----+----------+
only showing top 5 rows



---
# Task 1: Create a Table of Summarized State Populations
- Get the min, average, and max total and under18 populations for each of the states over all years.
- Call the table **summarized_state_populations**.
- The table should have the following fields:
  - state, ages, min_pop, avg_pop, max_pop
- The table should be sorted by **state** (from A to Z), and within each state, by the **under18** age range first, then the **total** age range.
---

In [8]:
spark.sql("DROP VIEW IF EXISTS summarized_state_populations")

spark.sql("""
    CREATE TEMP VIEW summarized_state_populations
    AS SELECT state, ages, min(population) AS min_pop, avg(population) AS avg_pop, max(population) AS max_pop
    FROM populations_table
    GROUP BY state, ages
    ORDER BY state ASC, ages ASC
                """)

spark.sql("SELECT * from summarized_state_populations").show(3)

+-----+-------+---------+------------------+---------+
|state|   ages|  min_pop|           avg_pop|  max_pop|
+-----+-------+---------+------------------+---------+
|   AK|  total| 553290.0| 646204.8333333334| 735132.0|
|   AK|under18| 177502.0|186672.95833333334| 192636.0|
|   AL|  total|4050055.0|       4484527.875|4833722.0|
+-----+-------+---------+------------------+---------+
only showing top 3 rows



In [9]:
# A LOOK AHEAD: We can also do this with PySpark DataFrame functions directly.

summarized_state_populations_df = (populations_df
    .groupBy("state", "ages")
    .agg(SqlF.min("population").alias("min_pop"),
         SqlF.avg("population").alias("avg_pop"),
         SqlF.max("population").alias("max_pop"))
    .orderBy("state", "ages"))
    
summarized_state_populations_df.show(3)

+-----+-------+---------+------------------+---------+
|state|   ages|  min_pop|           avg_pop|  max_pop|
+-----+-------+---------+------------------+---------+
|   AK|  total| 553290.0| 646204.8333333334| 735132.0|
|   AK|under18| 177502.0|186672.95833333334| 192636.0|
|   AL|  total|4050055.0|       4484527.875|4833722.0|
+-----+-------+---------+------------------+---------+
only showing top 3 rows



---
# Task 2: Print the Top 3 States with the Largest Normalized Population Swings
- Consider the difference between max and min "total" populations for each state.
- Normalize by dividing by the average population for the state.
- You can use the table from the previous query.
---

In [10]:
result = spark.sql('''
    SELECT state, min_pop, max_pop, (max_pop - min_pop) AS pop_swing, ((max_pop - min_pop) / avg_pop) AS norm_pop_swing
    FROM summarized_state_populations
    WHERE ages = "total"
    ORDER BY norm_pop_swing DESC
    ''')

result.show(3)

+-----+---------+---------+---------+------------------+
|state|  min_pop|  max_pop|pop_swing|    norm_pop_swing|
+-----+---------+---------+---------+------------------+
|   PR|3615086.0|      NaN|      NaN|               NaN|
|   NV|1220695.0|2790136.0|1569441.0| 0.748474068327076|
|   AZ|3684097.0|6626624.0|2942527.0|0.5557600371372102|
+-----+---------+---------+---------+------------------+
only showing top 3 rows



---
# Task 3: Create a Table of Population Densities
- Follow each of the individual steps below.  There likely are shorter ways to do this task, but this way the steps are explicit.
- I have given the shells of some of the queries, so you can "fill in the blanks".
- I have included the expected outputs, so you can see if yours matches.
- Name the table **population_densities_table**
- The table should have the following fields:
  - state_abbrev
  - state_name
  - area
  - year
  - population_total
  - population_under_18
  - population_18_and_over
  - density_total
  - density_under_18
  - density_18_and_over
---

## Create a "pop_and_name_table"
- Join the populations_table and abbrevs_table.
- Table should have fields: state_abbrev, state_name, ages, year, population.

In [11]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create a "pop_name_area_density_table"
- Join the pop_and_name_table and areas_table.
- Table should have fields: state_abbrev, state_name, ages, year, population, area, density.

In [12]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create an "under18_table"
- Select only the "under18" ages records from the pop_name_area_density_table.
- Table should have fields: state_abbrev, state_name, year, population, area, density.

In [13]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create a "total_table"
- Select only the "total" ages records from the pop_name_area_density_table.
- Table should have fields: state_abbrev, state_name, year, population, area, density.

In [14]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create a "total_and_under18_table"
- Join the under18_table and total_table.
- Table should have fields: state_abbrev, state_name, year, area, total_pop, total_density, under18_pop, under18_density.

In [15]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create a "total_under18_over18_table"
- Start with the total_and_under18_table and calculate columns for the over18_pop, over18_density.
- Actually, we mean "18 and over", not "over 18".
- Table should have fields: state_abbrev, state_name, year, area, total_pop, total_density, under18_pop, under18_density, over18_pop, over18_density.

In [16]:
# Fill this in, using the examples from Tasks 1 and 2.

## Create the final "population_densities_table"
- Start with the total_under18_over18_table.
- Rename and reorder the columns to match the original request.
- Sort by year and state abbreviation, ascending.
- Table should have fields: state_abbrev, state_name, area, year, population_total, population_under_18, population_18_and_over, density_total, density_under_18, density_18_and_over.

In [17]:
# Fill this in, using the examples from Tasks 1 and 2.

---
# Cleanup and Exit
---

In [18]:
# Stop the SparkSession. USUALLY NOT NEEDED.
spark.stop()

***
# Section: Conclusions
- What did you learn from this exercise?
***

In [None]:
# Fill this in.