In [28]:
# Initialize Otter
import otter
grader = otter.Notebook("ps9.ipynb")

# PS10: Using RDD and DataFrame in PySpark

In this problem set you will hone your skills working with both RDDs and Spark DataFrames. This problem set has no hidden tests.

**Note: This assignment does not work on Gradescope. Please ensure the test cases pass on your local machine and then submit on Gradescope. We will grade your submissions manually.**

In [29]:
import pyspark
import collections
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

# Problem 1

This problem concerns working with RDDs.

### Q1a (3 points)

This problem set includes a text file named 'mother_theresa.txt'. An RDD of the text file is created for you. Your job is to count how many words contain 'mother'. 'mother' may be the whole word or only part of it; the word is counted as long as the entire string 'mother' appears in it. Count all remaining words under the key 'other'.

Hint: You will likely need a mix of `flatMap`, `map`, etc.
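
The counting logic can be sanity-checked in plain Python before running it on the RDD. The sketch below mirrors a `flatMap` (split lines into words), a `map` (label each word) and a `reduceByKey` (sum per label) using only the standard library. The helper name `classify_words` and the sample lines are made up for illustration, and the tokenization rule (lowercase, runs of word characters) is an assumption rather than something the question prescribes.

```python
import re
from collections import Counter

def classify_words(lines):
    """Count words containing 'mother' versus all other words."""
    counts = Counter()
    for line in lines:
        # flatMap step: one line becomes many lowercase words
        for word in re.findall(r"\w+", line.lower()):
            # map step: label the word; the Counter plays the role of reduceByKey
            key = "mother" if "mother" in word else "other"
            counts[key] += 1
    return dict(counts)

# Hypothetical sample lines, not taken from mother_theresa.txt
sample = ["Mother Teresa loved her grandmother.", "Motherhood is hard work"]
result = classify_words(sample)
print(result)
```

Here 'grandmother' and 'motherhood' both land under the 'mother' key because the substring test does not require a whole-word match.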

In [30]:
import re  # the standard-library module is enough for this pattern
rdd = sc.textFile('mother_theresa.txt')
rdd.take(1) # similar to head

["Mother Teresa was the founder of the Order of the Missionaries of Charity, a Roman Catholic congregation of women dedicated to helping the poor. Considered one of the 20th Century's greatest humanitarians, she was canonized as Saint Teresa of Calcutta in 2016.    "]

In [31]:
counts = rdd.flatMap(lambda line: re.findall(r'\b\w+\b', line.lower())) \
            .filter(lambda word: word.isalpha()) \
            .map(lambda word: ('mother', 1) if 'mother' in word else ('other', 1)) \
            .reduceByKey(lambda a, b: a + b) \
            .collectAsMap()

print(counts)

{'other': 2078, 'mother': 47}


In [32]:
grader.check("q1a")

### Q1b (2 points)

Using the same RDD, first find the length of each line in the text file, then add up the lengths and display the total length of all lines. Strip each line before finding its length.
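
As a rough illustration of why the stripping step matters, the plain-Python sketch below computes the same kind of total on a few made-up lines. The helper name `total_stripped_length` and the sample data are hypothetical; on the RDD the generator corresponds to a `map` and `sum` to a `reduce`.

```python
def total_stripped_length(lines):
    """Sum the lengths of lines after removing leading/trailing whitespace."""
    # one length per line (map), then a single total (reduce)
    return sum(len(line.strip()) for line in lines)

# Hypothetical sample lines with padding, a trailing newline, and an empty line
sample = ["  hello  ", "world\n", ""]
total = total_stripped_length(sample)
unstripped = sum(len(line) for line in sample)
print(total, unstripped)
```

The two totals differ whenever a line carries leading or trailing whitespace, which is why the order of operations (strip, then measure) is spelled out in the question.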

In [33]:

# One length per line, so map is sufficient; reduce adds the lengths up
total_length = rdd.map(lambda line: len(line.strip())) \
                  .reduce(lambda a, b: a + b)

total_length

12630

In [34]:
grader.check("q1b")

# Problem 2

This problem concerns working with PySpark DataFrames. We will use the CDC dataset provided with this assignment.

This data corresponds to the open dataset provided by the U.S. Department of Health & Human Services:

https://catalog.data.gov/dataset/u-s-chronic-disease-indicators

It has been cleaned a bit for our analysis. You can learn more about the meaning of the various indicators from the website provided.

### Q2a (3 points)

A query is already provided that gets you the subset of the data pertaining to alcohol use among youth in the 50 states (we remove the rows marked as US, PR and GU in this analysis). Your job is to use this subquery to find the max `datavalue` for each unique `locationabbr` value. Display the top 10 values in descending order. If there is a tie, order the state codes in descending order as well.

Hint: One way to do that is to group by `locationabbr` and find the max `datavalue` within each group.
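
The group-then-sort logic can be sketched in plain Python on a handful of rows. This is only an illustration of the ordering rule, not the Spark solution: the helper name `top_max_by_state` and the sample rows are made up, and the values are passed as strings because a CSV read without a schema yields string columns.

```python
def top_max_by_state(rows, n=10):
    """Return the n largest per-state maxima, ties broken by state code descending."""
    best = {}
    for state, value in rows:
        value = float(value)  # cast first, otherwise "9.5" > "39.3" as strings
        if state not in best or value > best[state]:
            best[state] = value
    # Sorting on (value, state) with reverse=True puts both keys in descending order
    ranked = sorted(best.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
    return ranked[:n]

# Hypothetical sample rows: (locationabbr, datavalue)
rows = [("MT", "37.1"), ("WV", "37.1"), ("WV", "30.0"), ("NJ", "39.3"), ("NJ", "9.5")]
top = top_max_by_state(rows, n=3)
print(top)
```

With a tie at 37.1, 'WV' sorts ahead of 'MT' because the state code is also ordered descending.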

In [35]:
from pyspark.sql.types import IntegerType, DoubleType
import pyspark.sql.functions as F

df = spark.read.csv('us_chronic_disease_indicators.csv', header = True)

# you can use this expression to get a subset of data
df[(~df['locationabbr'].isin(['US','PR', 'GU'])) &
                (df['question'] == 'Alcohol use among youth')  &
                (df['stratificationcategoryid1'] == 'OVERALL')]

first_10_counts = ...

...

Ellipsis

In [36]:
alcohol_df = df[(~df['locationabbr'].isin(['US', 'PR', 'GU'])) &
                 (df['question'] == 'Alcohol use among youth') &
                 (df['stratificationcategoryid1'] == 'OVERALL')]
# alcohol_df.collect()

first_10_counts = alcohol_df.withColumn('datavalue', alcohol_df['datavalue'].cast(DoubleType())) \
                            .groupBy('locationabbr') \
                            .agg(F.max("datavalue").alias("max_value")) \
                            .sort(F.desc("max_value"), F.desc("locationabbr")) \
                            .limit(10)
first_10_counts = first_10_counts.collect()

                                                                                

In [37]:
l2a = [('NJ', 39.3),
       ('LA', 38.6),
       ('WV', 37.1),
       ('MT', 37.1),
       ('CT', 36.7),
       ('IL', 36.6),
       ('DE', 36.3),
       ('AR', 36.3),
       ('TX', 36.1),
       ('AZ', 36.0)]

# Each returned row must match the expected (state, max value) pair
for i, x in enumerate(first_10_counts):
    assert l2a[i] == (x.locationabbr, x.max_value)

In [38]:
first_10_counts

[Row(locationabbr='NJ', max_value=39.3),
 Row(locationabbr='LA', max_value=38.6),
 Row(locationabbr='WV', max_value=37.1),
 Row(locationabbr='MT', max_value=37.1),
 Row(locationabbr='CT', max_value=36.7),
 Row(locationabbr='IL', max_value=36.6),
 Row(locationabbr='DE', max_value=36.3),
 Row(locationabbr='AR', max_value=36.3),
 Row(locationabbr='TX', max_value=36.1),
 Row(locationabbr='AZ', max_value=36.0)]

In [39]:
grader.check("q2a")

<!-- BEGIN QUESTION -->

### Q2b (3 points)

We can save the DataFrame into a temporary table and use Spark SQL to run queries on the dataset. That is what we will do in this problem. Create a temporary table called `temp_table` and save the DataFrame into this table. Then run Spark SQL to get the same result as Q2a, except that if there is a tie, the state codes are shown in ascending order.
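
The shape of such a query can be tried out without a Spark session. The sketch below uses the standard-library `sqlite3` module as a stand-in for Spark SQL, so it is an approximation under stated assumptions: the sample rows are made up, the table holds only two columns, and SQLite's `REAL` takes the place of Spark's `DOUBLE` in the cast.

```python
import sqlite3

# In-memory database standing in for the Spark temporary view (assumption)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_table (locationabbr TEXT, datavalue TEXT)")
conn.executemany(
    "INSERT INTO temp_table VALUES (?, ?)",
    [("WV", "37.1"), ("MT", "37.1"), ("NJ", "39.3"), ("NJ", "9.5")],  # hypothetical rows
)

# Max per state, largest first; ties broken by state code ascending
query = """
    SELECT locationabbr, MAX(CAST(datavalue AS REAL)) AS max_value
    FROM temp_table
    GROUP BY locationabbr
    ORDER BY max_value DESC, locationabbr ASC
    LIMIT 10
"""
rows = conn.execute(query).fetchall()
conn.close()
print(rows)
```

Compared with the descending tie-break in Q2a, the only change is the `ASC` on `locationabbr`, which places 'MT' ahead of 'WV'.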

In [40]:
# print(first_10_counts)
alcohol_df.createOrReplaceTempView("temp_table")
first_10_sql = spark.sql("""
    SELECT locationabbr, MAX(CAST(datavalue AS DOUBLE)) AS max_value
    FROM temp_table
    GROUP BY locationabbr
    ORDER BY max_value DESC, locationabbr ASC
    LIMIT 10
""").collect()
first_10_sql

                                                                                

[Row(locationabbr='NJ', max_value=39.3),
 Row(locationabbr='LA', max_value=38.6),
 Row(locationabbr='MT', max_value=37.1),
 Row(locationabbr='WV', max_value=37.1),
 Row(locationabbr='CT', max_value=36.7),
 Row(locationabbr='IL', max_value=36.6),
 Row(locationabbr='AR', max_value=36.3),
 Row(locationabbr='DE', max_value=36.3),
 Row(locationabbr='TX', max_value=36.1),
 Row(locationabbr='AZ', max_value=36.0)]

In [41]:
# ensure temp_table exists after completing your solution
# (the catalog lists temporary views as well; SQLContext is deprecated)
"temp_table" in [t.name for t in spark.catalog.listTables()]



True

In [42]:
l2a = [('NJ', 39.3),
 ('LA', 38.6),
 ('MT', 37.1),
 ('WV', 37.1),
 ('CT', 36.7),
 ('IL', 36.6),
 ('AR', 36.3),
 ('DE', 36.3),
 ('TX', 36.1),
 ('AZ', 36.0)]
for i, x in enumerate(first_10_sql):
    assert l2a[i] == (x.locationabbr, x.max_value)

<!-- END QUESTION -->

### Q2c (4 points)

It would be interesting to know which `yearstart` value had the maximum value for each `locationabbr`. Write a query to find that. You can use either SQL or the Spark DataFrame API to get the results. Again, sort by `max_value` in descending order and return the first 20 records. If there is a tie, show the states in ascending order.
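
Finding the year of the maximum is an "argmax per group" problem: keep the whole best row for each state, not just the best value. The plain-Python sketch below illustrates this on made-up rows. The helper name `year_of_max` is hypothetical, and the tie-break within a state (earliest year wins) is an assumption, since the question does not specify one.

```python
def year_of_max(rows, n=20):
    """For each state, return (max_value, yearstart, state) for its largest value."""
    best = {}  # state -> (value, year)
    for state, year, value in rows:
        value = float(value)  # compare numerically, not as strings
        current = best.get(state)
        # Keep the larger value; on an equal value keep the earlier year (assumption)
        if current is None or value > current[0] or (value == current[0] and year < current[1]):
            best[state] = (value, year)
    # Largest value first, then state code ascending
    ranked = sorted(
        ((value, year, state) for state, (value, year) in best.items()),
        key=lambda r: (-r[0], r[2]),
    )
    return ranked[:n]

# Hypothetical sample rows: (locationabbr, yearstart, datavalue)
rows = [("VT", "2013", "9.5"), ("VT", "2017", "33.0"),
        ("NJ", "2013", "39.3"), ("NJ", "2015", "30.7")]
records = year_of_max(rows)
print(records)
```

Without the `float` conversion, the string "9.5" would compare greater than "33.0" and the wrong year would be reported for 'VT'.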



In [43]:
df.describe()

DataFrame[summary: string, yearstart: string, yearend: string, locationabbr: string, locationdesc: string, datasource: string, topic: string, question: string, datavalueunit: string, datavaluetype: string, datavalue: string, lowconfidencelimit: string, highconfidencelimit: string, stratificationcategory1: string, stratification1: string, geolocation: string, topicid: string, questionid: string, datavaluetypeid: string, stratificationcategoryid1: string, stratificationid1: string]

In [44]:
# datavalue is stored as a string, so cast it to DOUBLE before taking the
# max or ordering; otherwise values are compared lexicographically.
query = """
SELECT max_value, yearstart, locationabbr
FROM (
    SELECT locationabbr, yearstart,
        MAX(CAST(datavalue AS DOUBLE)) OVER (PARTITION BY locationabbr) AS max_value,
        RANK() OVER (PARTITION BY locationabbr
                     ORDER BY CAST(datavalue AS DOUBLE) DESC, yearstart ASC) AS rank
    FROM temp_table
) ranked
WHERE rank = 1
ORDER BY
    max_value DESC,
    locationabbr ASC
LIMIT 20
"""
first_20_records = spark.sql(query).collect()
first_20_records


                                                                                

[Row(max_value=39.3, yearstart='2013', locationabbr='NJ'),
 Row(max_value=38.6, yearstart='2013', locationabbr='LA'),
 Row(max_value=37.1, yearstart='2013', locationabbr='MT'),
 Row(max_value=37.1, yearstart='2013', locationabbr='WV'),
 Row(max_value=36.7, yearstart='2013', locationabbr='CT'),
 Row(max_value=36.6, yearstart='2013', locationabbr='IL'),
 Row(max_value=36.3, yearstart='2013', locationabbr='AR'),
 Row(max_value=36.3, yearstart='2013', locationabbr='DE'),
 Row(max_value=36.1, yearstart='2013', locationabbr='TX'),
 Row(max_value=36.0, yearstart='2013', locationabbr='AZ'),
 Row(max_value=35.6, yearstart='2013', locationabbr='MA'),
 Row(max_value=35.6, yearstart='2013', locationabbr='MO'),
 Row(max_value=35.3, yearstart='2013', locationabbr='ND'),
 Row(max_value=35.0, yearstart='2013', locationabbr='AL'),
 Row(max_value=34.8, yearstart='2013', locationabbr='FL'),
 Row(max_value=34.4, yearstart='2013', locationabbr='WY'),
 Row(max_value=34.0, yearstart='2013', locationabbr='NV'

In [45]:
l2c = [(39.3, '2013', 'NJ'),
         (38.6, '2013', 'LA'),
         (37.1, '2013', 'MT'),
         (37.1, '2013', 'WV'),
         (36.7, '2013', 'CT'),
         (36.6, '2013', 'IL'),
         (36.3, '2013', 'AR'),
         (36.3, '2013', 'DE'),
         (36.1, '2013', 'TX'),
         (36.0, '2013', 'AZ'),
         (35.6, '2013', 'MA'),
         (35.6, '2013', 'MO'),
         (35.3, '2013', 'ND'),
         (35.0, '2013', 'AL'),
         (34.8, '2013', 'FL'),
         (34.4, '2013', 'WY'),
         (34.0, '2013', 'NV'),
         (33.4, '2013', 'OK'),
         (33.0, '2017', 'VT'),
         (32.9, '2013', 'MS')]


for i, x in enumerate(first_20_records):
    if( l2c[i] == (x.max_value, x.yearstart, x.locationabbr)):
        print(x, '\n')
        print('True')

Row(max_value=39.3, yearstart='2013', locationabbr='NJ') 

True
Row(max_value=38.6, yearstart='2013', locationabbr='LA') 

True
Row(max_value=37.1, yearstart='2013', locationabbr='MT') 

True
Row(max_value=37.1, yearstart='2013', locationabbr='WV') 

True
Row(max_value=36.7, yearstart='2013', locationabbr='CT') 

True
Row(max_value=36.6, yearstart='2013', locationabbr='IL') 

True
Row(max_value=36.3, yearstart='2013', locationabbr='AR') 

True
Row(max_value=36.3, yearstart='2013', locationabbr='DE') 

True
Row(max_value=36.1, yearstart='2013', locationabbr='TX') 

True
Row(max_value=36.0, yearstart='2013', locationabbr='AZ') 

True
Row(max_value=35.6, yearstart='2013', locationabbr='MA') 

True
Row(max_value=35.6, yearstart='2013', locationabbr='MO') 

True
Row(max_value=35.3, yearstart='2013', locationabbr='ND') 

True
Row(max_value=35.0, yearstart='2013', locationabbr='AL') 

True
Row(max_value=34.8, yearstart='2013', locationabbr='FL') 

True
Row(max_value=34.4, yearstart='2013', lo

In [46]:
grader.check("q2c")

## Submission

Make sure you have run all cells in your notebook in order before running the cell below, so that all images/graphs appear in the output. The cell below will generate a zip file for you to submit. **Please save before exporting!**

Upload this .zip file to Gradescope for grading.

In [47]:
# Save your notebook first, then run this cell to export your submission.
grader.export(pdf=False)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 38194)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/conda/lib/python3.11/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/conda/lib/python3.11/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
       ^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    num_u