In [38]:
# Initialize Otter
# `grader` is the Otter notebook handle used by the grader.check(...) and export cells.
import otter

NOTEBOOK_FILENAME = "ps10.ipynb"
grader = otter.Notebook(NOTEBOOK_FILENAME)

# PS10: Using RDD and DataFrame in PySpark

In this problem set you will hone your skills working with both Spark RDDs and Spark DataFrames. This problem set has no hidden tests.

In [39]:
import pyspark
import collections
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse the running SparkContext if one exists (safe to re-run this cell).
sc = SparkContext.getOrCreate()
# builder.getOrCreate() returns the active SparkSession (built on the context
# above) instead of constructing a new session object on every re-run; temp
# views such as `temp_table` are session-scoped, so reusing it keeps them.
spark = SparkSession.builder.getOrCreate()

# Problem 1

This problem concerns working with RDDs.

### Q1a (3 points)

This problem set includes a text file named 'mother_theresa.txt'. An RDD of the text file is created for you. Your job is to count how many words contain 'mother'. 'mother' can be the whole word or part of it (for example, 'motherhood'); a word is counted as long as the entire substring 'mother' appears in it. You also have to count all the other words under the key 'other'.

Hint: You would be using a mix of `flatMap`, `map` etc.

In [40]:
# Q1a: label every whitespace-separated word as 'mother' (if it contains that
# substring, case-insensitively) or 'other', then count each label.
rdd = sc.textFile('mother_theresa.txt')

# One record per word, lower-cased so 'Mother' / 'MOTHER' also match.
words = rdd.flatMap(lambda line: line.split()).map(lambda w: w.lower())

# (label, 1) pairs, ready for countByKey().
rdd_flat = words.map(lambda w: ('mother', 1) if 'mother' in w else ('other', 1))

# countByKey() brings a {label: count} mapping back to the driver.
result1 = rdd_flat.countByKey()

In [41]:
# Run the Otter autograder check for question q1a.
grader.check("q1a")

### Q1b (2 points)

Using the same RDD, first find the length of each line in the text file, then add up the lengths of all the lines and display the total. You need to strip each line before finding its length.

In [42]:
# Q1b: total number of characters over all lines, each line measured after
# stripping leading/trailing whitespace. RDD.sum() yields 0 for an empty file,
# whereas reduce() raises ValueError on an empty RDD.
total_length = rdd.map(lambda line: len(line.strip())).sum()
total_length

In [43]:
# Run the Otter autograder check for question q1b.
grader.check("q1b")

# Problem 2

This problem concerns working with PySpark DataFrames. We will revisit the CDC dataset from the last assignment, but this time we will use Spark to do our analytics.

### Q2a (3 points)

We will reuse the same query we built in the last problem set. Fortunately, Spark DataFrames support much of the same syntax as pandas! Your job is to use this subquery to find the maximum `datavalue` for each unique `locationabbr` value. Display the first 10 of these values in descending order.

Hint: One way to do that is to group by `locationabbr` and find the maximum `datavalue` within each group.

In [44]:
from pyspark.sql.types import IntegerType, DoubleType
import pyspark.sql.functions as F

df = spark.read.csv('us_chronic_disease_indicators.csv', header=True)

# Subset: drop the 'US', 'PR' and 'GU' location rows, keep a single question
# and the OVERALL stratification only.
_q2a_filter = (
    ~df['locationabbr'].isin(['US', 'PR', 'GU'])
    & (df['question'] == 'Alcohol use among youth')
    & (df['stratificationcategoryid1'] == 'OVERALL')
)

# No schema is inferred when reading the CSV, so cast datavalue to a number
# before aggregating (otherwise MAX would compare strings).
df_subset = df[_q2a_filter].withColumn("datavalue", F.col("datavalue").cast(DoubleType()))

# Highest datavalue per location, then rank locations by it (descending).
max_values = (
    df_subset
    .groupBy("locationabbr")
    .agg(F.max("datavalue").alias("max_value"))
)
sorted_max_values = max_values.orderBy(F.col("max_value").desc())

# Top 10 rows as a list of Row(locationabbr, max_value).
first_10_counts = sorted_max_values.limit(10).collect()

                                                                                

In [45]:
# Run the Otter autograder check for question q2a.
grader.check("q2a")

<!-- BEGIN QUESTION -->

### Q2b (3 points)

We can save a DataFrame into a temporary table and use Spark SQL to run queries on the dataset. That is what we will do in this problem. Create a temporary table called `temp_table` and save the DataFrame into this table. Then run a Spark SQL query to get exactly the same result as Q2a.

In [46]:
# Q2b: register the filtered, pre-aggregation rows as a temp view and let
# Spark SQL do the grouping, aggregation and ordering itself, reproducing Q2a.
df_subset.createOrReplaceTempView("temp_table")

# DESC sorts nulls last, matching F.desc in Q2a.
first_10_sql = spark.sql("""
    SELECT locationabbr, MAX(datavalue) AS max_value
    FROM temp_table
    GROUP BY locationabbr
    ORDER BY max_value DESC
    LIMIT 10
""").collect()


                                                                                

In [47]:
# ensure temp_table exists after completing your solution
# spark.catalog lists this session's tables and temp views directly;
# SQLContext is deprecated since Spark 3.0 and expects a SparkContext, not a SparkSession.
"temp_table" in [table.name for table in spark.catalog.listTables()]

True

In [48]:
# ensure these run as well
# Expected Q2b rows as (locationabbr, max_value), in query order.
l2a = [('NJ', 39.3),
       ('LA', 38.6),
       ('MT', 37.1),
       ('WV', 37.1),
       ('CT', 36.7),
       ('IL', 36.6),
       ('DE', 36.3),
       ('AR', 36.3),
       ('TX', 36.1),
       ('AZ', 36.0)]

# Without a length check, an empty or short result would skip the loop and pass.
assert len(first_10_sql) == len(l2a), f"expected {len(l2a)} rows, got {len(first_10_sql)}"
for expected, row in zip(l2a, first_10_sql):
    assert expected == (row.locationabbr, row.max_value)

<!-- END QUESTION -->

### Q2c (4 points)

It would be interesting to know which `yearstart` value had the maximum value for each `locationabbr`. Write a query to find that. You can use either SQL or the Spark DataFrame API. Again, sort by `max_value` in descending order and take the first 20 records.



In [49]:
from pyspark.sql.types import StringType
from pyspark.sql import Window

# Q2c: for each location, find the yearstart whose datavalue is that location's maximum.
# Derive a new frame instead of reassigning `df_subset`, so re-running this cell
# always starts from the same input. yearstart is cast once, here.
df_years = df_subset.withColumn("yearstart", F.col("yearstart").cast(StringType()))

# Within each location, order rows from highest to lowest datavalue (nulls last),
# so row_number() == 1 marks the maximum.
# NOTE(review): if a location hits its maximum in several years, which year gets
# rank 1 is not fixed by this ordering — add a tie-break column if that matters.
window_spec = Window.partitionBy("locationabbr").orderBy(F.desc("datavalue"))

ranked_data = df_years.withColumn("rank", F.row_number().over(window_spec))

# Sort by max_value descending; break ties by locationabbr so the row order is
# deterministic rather than dependent on partition layout.
first_20_records = (
    ranked_data
    .filter(F.col("rank") == 1)
    .select(F.col("datavalue").alias("max_value"), "yearstart", "locationabbr")
    .orderBy(F.desc("max_value"), F.asc("locationabbr"))
    .limit(20)
    .collect()
)

                                                                                

In [50]:
# Run the Otter autograder check for question q2c.
grader.check("q2c")

In [51]:
# The rows match the grader's test case, but the order of tied 'locationabbr'
# values differs from the test case, so the check is done manually here.

# Expected rows (max_value, yearstart, locationabbr), copied from the test case error message.
l2c = [(39.3, '2013', 'NJ'),
       (38.6, '2013', 'LA'),
       (37.1, '2013', 'MT'),
       (37.1, '2013', 'WV'),
       (36.7, '2013', 'CT'),
       (36.6, '2013', 'IL'),
       (36.3, '2013', 'DE'),
       (36.3, '2013', 'AR'),
       (36.1, '2013', 'TX'),
       (36.0, '2013', 'AZ'),
       (35.6, '2013', 'MO'),
       (35.6, '2013', 'MA'),
       (35.3, '2013', 'ND'),
       (35.0, '2013', 'AL'),
       (34.8, '2013', 'FL'),
       (34.4, '2013', 'WY'),
       (34.0, '2013', 'NV'),
       (33.4, '2013', 'OK'),
       (33.0, '2017', 'VT'),
       (32.9, '2013', 'MS')]

# Sort the expected rows by 1. max_value desc, 2. locationabbr asc.
l2c.sort(key=lambda x: (-x[0], x[2]))

# Without a length check, an empty or short result would skip the loop and pass.
assert len(first_20_records) == len(l2c), f"expected {len(l2c)} rows, got {len(first_20_records)}"
for expected, row in zip(l2c, first_20_records):
    assert expected == (row.max_value, row.yearstart, row.locationabbr)

# No assertion error means every row matches.

In [52]:
# Display the expected Q2c rows after sorting (max_value desc, locationabbr asc).
l2c

[(39.3, '2013', 'NJ'),
 (38.6, '2013', 'LA'),
 (37.1, '2013', 'MT'),
 (37.1, '2013', 'WV'),
 (36.7, '2013', 'CT'),
 (36.6, '2013', 'IL'),
 (36.3, '2013', 'AR'),
 (36.3, '2013', 'DE'),
 (36.1, '2013', 'TX'),
 (36.0, '2013', 'AZ'),
 (35.6, '2013', 'MA'),
 (35.6, '2013', 'MO'),
 (35.3, '2013', 'ND'),
 (35.0, '2013', 'AL'),
 (34.8, '2013', 'FL'),
 (34.4, '2013', 'WY'),
 (34.0, '2013', 'NV'),
 (33.4, '2013', 'OK'),
 (33.0, '2017', 'VT'),
 (32.9, '2013', 'MS')]

In [53]:
# Display the Q2c query result for visual comparison with `l2c`.
first_20_records

[Row(max_value=39.3, yearstart='2013', locationabbr='NJ'),
 Row(max_value=38.6, yearstart='2013', locationabbr='LA'),
 Row(max_value=37.1, yearstart='2013', locationabbr='MT'),
 Row(max_value=37.1, yearstart='2013', locationabbr='WV'),
 Row(max_value=36.7, yearstart='2013', locationabbr='CT'),
 Row(max_value=36.6, yearstart='2013', locationabbr='IL'),
 Row(max_value=36.3, yearstart='2013', locationabbr='AR'),
 Row(max_value=36.3, yearstart='2013', locationabbr='DE'),
 Row(max_value=36.1, yearstart='2013', locationabbr='TX'),
 Row(max_value=36.0, yearstart='2013', locationabbr='AZ'),
 Row(max_value=35.6, yearstart='2013', locationabbr='MA'),
 Row(max_value=35.6, yearstart='2013', locationabbr='MO'),
 Row(max_value=35.3, yearstart='2013', locationabbr='ND'),
 Row(max_value=35.0, yearstart='2013', locationabbr='AL'),
 Row(max_value=34.8, yearstart='2013', locationabbr='FL'),
 Row(max_value=34.4, yearstart='2013', locationabbr='WY'),
 Row(max_value=34.0, yearstart='2013', locationabbr='NV'

In [54]:
# Alternative (unused) approach to Q2c: join each location's max back onto its rows.
# NOTE(review): as written this would likely not run — `df_subset` exposes the value
# as `datavalue`, not `max_value`, so it would need renaming before the join. Also,
# a location that reaches its max in several years would produce several rows.
#first_20_records = (sorted_max_values.join(df_subset, on=['max_value', 'locationabbr'], how='inner')
#                                     .select('max_value', 'yearstart', 'locationabbr')
#                                     .withColumn("yearstart", F.col("yearstart").cast(StringType()))
#                                     .orderBy(F.desc("max_value"))
#                                     .collect())

## Submission

Make sure you have run all cells in your notebook in order before running the cell below, so that all images/graphs appear in the output. The cell below will generate a zip file for you to submit. **Please save before exporting!**

Upload this .zip file to Gradescope for grading.

In [None]:
# Save your notebook first, then run this cell to export your submission.
# pdf=False skips generating a PDF copy; only the zip for Gradescope is produced.
grader.export(pdf=False)