# Tutorial
Spark's Python wrapper (PySpark) lets us work with data in a way that closely resembles Pandas, which should be familiar to most Python users. In this notebook you will learn the basic functionality of the wrapper and visualize the data you will be working with for the project. Make sure you have downloaded and unzipped the data to the correct location before running the code.

In [None]:
# import necessary libraries
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession, dataframe
import plotly.express as px
geojson = px.data.gapminder()
# create sparksession
spark = SparkSession \
    .builder \
    .appName("CS236") \
    .getOrCreate()

In [None]:
# Utility function to write query plans to a file
# you will be using this to understand how your queries are being processed
def write_explain(df: dataframe.DataFrame, output_path: str = "out.txt"):
    from contextlib import redirect_stdout
    with open(output_path, "w") as f:
        with redirect_stdout(f):
            df.explain(extended=True)
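
The utility above redirects standard output because `df.explain()` prints the plan and returns `None`, so there is nothing to write to a file directly. The sketch below shows the same pattern without Spark. `noisy_report` is a hypothetical stand-in for `df.explain()`, and the temporary output path is an assumption made only for this illustration.

```python
import os
import tempfile
from contextlib import redirect_stdout


def noisy_report():
    """Hypothetical stand-in for df.explain(): prints text and returns None."""
    print("== Physical Plan ==")
    print("(plan text would appear here)")


# Write to a throwaway location so the example does not clobber real files
output_path = os.path.join(tempfile.mkdtemp(), "out.txt")

with open(output_path, "w") as f:
    # Everything printed inside this block goes to the file, not the notebook
    with redirect_stdout(f):
        result = noisy_report()

with open(output_path) as f:
    captured = f.read()

print(result is None)
print(captured.splitlines()[0])
```

Nothing appears in the notebook while the redirect is active, which is why the later cell calls `explain()` a second time to display the plan.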

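Read a CSV file into a Spark dataframe, then return the column names. The first cell only times the read: variables assigned inside a `%%timeit` cell are not kept, so the second cell reads the file again.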

In [None]:
%%timeit
sdf = spark.read.csv("../data/StateAndCountyData.csv", header=True)

In [None]:
sdf = spark.read.csv("../data/StateAndCountyData.csv", header=True)
sdf.columns

Show the first 20 rows of the Spark dataframe

In [None]:
sdf.show()

In [None]:
sdf.createOrReplaceTempView('state_county')
# run your SQL query as you would with any database
my_df = spark.sql(
'''
select 
  state
  , avg(value) as avg
from state_county
where variable_code = 'PCT_LACCESS_POP15' 
group by state
order by state
'''
)
my_df.show()

In [None]:
# write the extended query plan to out.txt
write_explain(my_df)
# print the physical query plan in the notebook
my_df.explain()

## Visualizing with Choropleths
We will be using Plotly Express to easily visualize the data you will be working with. The most important arguments besides the dataframe itself are `locations` and `color`.
- `locations` - the name of the column that identifies which state each row belongs to (two-letter state abbreviations when `locationmode='USA-states'`)
- `color` - the name of the column that contains the values to be displayed

In [None]:
# convert the Spark dataframe to Pandas before handing it to Plotly Express
fig = px.choropleth(my_df.toPandas(),
                    locations='state',
                    color='avg',
                    color_continuous_scale='spectral_r',
                    locationmode='USA-states',
                    scope='usa')
fig.update_geos(
    visible=True, 
    scope="usa",
)
fig.show()

# Reading into RDDs
An RDD (Resilient Distributed Dataset) gives you finer control over how your jobs are parallelized.
The thought process is very similar to Hadoop MapReduce, but with less boilerplate.

Note that there are many (probably better) ways to do the same task in Spark. Experiment with different methods when doing your project.

In [None]:
data = spark.sparkContext.textFile("../data/StateAndCountyData.csv")
header = data.first()
# Remove the header from the data. Dropping only the first row is not enough,
# since some Spark applications read multiple files, each with its own header.
data = data.filter(lambda row: row != header)
data.first()

In [None]:
# Get the column names to make it easier to reference column values later on
cols_dict = dict([(j, i) for i, j in enumerate(header.split(","))])
cols_dict

In [None]:
# Split lines into column values
cols_rdd = data.map(lambda row: row.split(","))
cols_rdd.first()
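
Splitting on `","` works only while no field contains a comma of its own. The sketch below shows how a quoted field would break the naive split and how Python's standard `csv` module handles it. The sample row is hypothetical and is not taken from the data file, and `parse_csv_line` is a helper name introduced here for illustration.

```python
import csv


def parse_csv_line(line):
    """Parse one CSV line, honoring double-quoted fields."""
    return next(csv.reader([line]))


# Hypothetical row: the third field contains a comma inside quotes
line = '01001,AL,"Autauga, County",PCT_LACCESS_POP15,32.06'

naive = line.split(",")        # splits inside the quoted field too
parsed = parse_csv_line(line)  # keeps the quoted field in one piece

print(len(naive), len(parsed))
print(parsed[2])
```

If the data did contain quoted commas, a function like this could be passed to `data.map(...)` in place of the lambda above, at the cost of creating a small reader object per row.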

In [None]:
# Filter values to the variable code we want
pct_laccess_pop_15_rdd = cols_rdd.filter(lambda row: row[cols_dict["Variable_Code"]] == "PCT_LACCESS_POP15")
pct_laccess_pop_15_rdd.first()

In [None]:
# Map states to their respective values
states_values_rdd = pct_laccess_pop_15_rdd.map(
    lambda row: (row[cols_dict["State"]], float(row[cols_dict["Value"]]))
)
states_values_rdd.first()

In [None]:
# Get the sum and count of values for each state
reduce_rdd = states_values_rdd.aggregateByKey(
    # starting (sum, count) accumulator for each state
    (0, 0),
    # folds one value (think: one row) into the (sum, count) accumulator
    # this one runs within each partition
    lambda accum, value: (accum[0] + value, accum[1] + 1),
    # merges the (sum, count) accumulators of the same state across partitions
    lambda accum_1, accum_2: (accum_1[0] + accum_2[0], accum_1[1] + accum_2[1]))
reduce_rdd.first()
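
To see why `aggregateByKey` takes two functions, it can help to trace the same logic in plain Python. The sketch below is a simplified stand-in, not Spark's implementation: `aggregate_by_key` is a hypothetical helper, the partitions are ordinary lists, and the (state, value) pairs are made-up numbers.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Pure-Python illustration of the two-stage aggregation (not Spark code)."""
    per_partition = []
    for part in partitions:
        accums = {}
        for key, value in part:
            # Stage 1: fold each value into its key's accumulator within a partition
            accums[key] = seq_op(accums.get(key, zero), value)
        per_partition.append(accums)

    merged = {}
    for accums in per_partition:
        for key, accum in accums.items():
            # Stage 2: merge accumulators for the same key across partitions
            merged[key] = comb_op(merged[key], accum) if key in merged else accum
    return merged


# Made-up (state, value) pairs split across two partitions
partitions = [
    [("CA", 10.0), ("TX", 4.0), ("CA", 20.0)],
    [("CA", 30.0), ("TX", 6.0)],
]

sums_counts = aggregate_by_key(
    partitions,
    (0.0, 0),
    lambda accum, value: (accum[0] + value, accum[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {state: total / count for state, (total, count) in sums_counts.items()}
print(sums_counts)
print(averages)
```

Carrying a (sum, count) pair instead of a running average is what makes the merge step correct: averages of partitions cannot be combined directly when the partitions hold different numbers of rows.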

In [None]:
# calculate average by doing sum / count
average_rdd = reduce_rdd.mapValues(lambda accum: accum[0] / accum[1])
cols = ["state", "avg"]
# convert to a Pandas dataframe to visualize
df = pd.DataFrame(average_rdd.collect(), columns=cols)
df.head()

In [None]:
fig = px.choropleth(df,
                    locations='state',
                    color='avg',
                    color_continuous_scale='spectral_r',
                    locationmode='USA-states',
                    scope='usa')
fig.update_geos(
    visible=True, 
    scope="usa",
)
fig.show()