In [1]:
from pyspark import SparkContext, SparkConf
import os
import getpass
import pandas as pd

In [2]:
import csv

In [3]:
import us

In [4]:
# Pull the hadoop-aws (S3A connector) package into the JVM that PySpark launches.
# This only takes effect if it is set before the SparkContext below is created.
os.environ.update({'PYSPARK_SUBMIT_ARGS': '--packages "org.apache.hadoop:hadoop-aws:2.7.4" pyspark-shell'})

In [5]:
# Reuse a running SparkContext if there is one; otherwise start one with a
# descriptive app name so the job is identifiable in the Spark UI.
# Note: getOrCreate ignores `conf` when a context already exists.
sc = SparkContext.getOrCreate(conf=SparkConf().setAppName('mean_zri_by_state_year'))

In [6]:
# Never hardcode AWS credentials in a notebook. The key ID previously committed
# here should be treated as leaked and rotated. Read it from the standard AWS
# environment variable when available; otherwise prompt for it without echoing.
access_key = os.environ.get('AWS_ACCESS_KEY_ID') or getpass.getpass('AWS access key ID: ')

In [7]:
# Prefer the standard AWS environment variable so the notebook can run
# non-interactively (e.g. Restart & Run All); fall back to a hidden prompt.
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') or getpass.getpass('AWS secret access key: ')

 ········································


In [8]:
# Configure S3A on the SparkContext's Hadoop configuration.
# fs.s3.impl only routes s3:// URIs through S3A; the s3a:// paths read below
# resolve through fs.s3a.impl, which this cell leaves untouched.
_hadoop_conf = sc._jsc.hadoopConfiguration()
for _key, _value in (
    ("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ('fs.s3a.access.key', access_key),
    ('fs.s3a.secret.key', secret_key),
):
    _hadoop_conf.set(_key, _value)

In [9]:
# Raw Zillow metro ZRI CSV: one RDD element per text line, header line included.
# Gzip input is not splittable, so Spark will likely read it as a single partition.
rdd_rent = sc.textFile(name='s3a://msds-durian-candy/rent/Metro_Zri_AllHomesPlusMultifamily.csv.gz')

Two main steps:

1. Unpivot the data from wide to long format
2. Group by State and Year, then calculate the mean Zillow Rent Index (ZRI) for 2014-2016

## Unpivot the data from wide to long format

In [10]:
# Parse the header with the csv module (not a plain split) so quoted fields are handled.
header_raw = rdd_rent.first()
header_temp = next(csv.reader([header_raw]))

# Schema of the long-format rows produced by unpivot_widerow_to_longrows.
# The original RegionName value ("<name>, <state code>") is split into the
# RegionName and StateCode columns. The wide "<year>-<month>" date columns
# (index 3 onwards) are unpivoted into Year / Month / ZillowRentIndex.
new_header = [
    header_temp[0],  # RegionID
    'RegionName',
    'StateCode',
    header_temp[2],  # presumably SizeRank -- confirm against the source file
    'Year',
    'Month',
    'ZillowRentIndex',
]

In [11]:
# The first line after the header is the national (entire-US) aggregate, not a
# region within an individual state, so capture it here and filter it out later.
US_row = (
    rdd_rent
    .filter(lambda raw_line: raw_line != header_raw)
    .first()
)

In [12]:
def unpivot_widerow_to_longrows(row, header_original):
    """Unpivot one wide Zillow row into one long row per date column.

    Parameters
    ----------
    row : list of str
        A parsed CSV row: ``[RegionID, RegionName, <col 2>, value_1, value_2, ...]``
        where ``RegionName`` looks like ``"<name>, <state code>"``.
    header_original : list of str
        The parsed CSV header; entries from index 3 onwards are ``"<year>-<month>"``.

    Returns
    -------
    list of list of str
        One row per date column, always with the same 7-column layout:
        ``[RegionID, RegionName, StateCode, <col 2>, Year, Month, value]``.
        Values stay strings; missing values are ``''``.
    """
    # Split on the LAST comma only, so a name that itself contains commas still
    # yields exactly (name, state code). A name without a comma gets an empty
    # state code. Either way, later column positions never shift.
    region_parts = [part.strip() for part in row[1].rsplit(',', 1)]
    if len(region_parts) == 1:
        region_parts.append('')
    new_row_base = [row[0]] + region_parts + [row[2]]

    year_months = [date_col.split('-') for date_col in header_original[3:]]

    # A short row (fewer value fields than date columns) is padded with '' so
    # absent trailing values are treated as missing instead of raising IndexError.
    prices = row[3:]
    prices = prices + [''] * (len(year_months) - len(prices))

    return [new_row_base + year_month + [price]
            for year_month, price in zip(year_months, prices)]

In [13]:
# Drop the header and the national aggregate line, parse each remaining CSV
# line into fields, then explode every wide row into one long row per month.
unpivoted_rent = (
    rdd_rent
    .filter(lambda raw_line: raw_line not in (header_raw, US_row))
    .map(lambda raw_line: next(csv.reader([raw_line])))
    .flatMap(lambda fields: unpivot_widerow_to_longrows(fields, header_temp))
)

## Aggregate (average) Zillow Rent Index by State and Year

In [14]:
# Sanity check: the chain of lazy transformations above yields a PipelinedRDD.
unpivoted_rent.__class__

pyspark.rdd.PipelinedRDD

In [15]:
# Column positions in the long-format rows produced by unpivot_widerow_to_longrows.
STATE_COL, YEAR_COL, ZRI_COL = 2, 4, 6
# Inclusive range of years to aggregate.
YEAR_FROM, YEAR_TO = 2014, 2016


def _parse_long_row(fields):
    """Return ``(state_code, year, zri)`` from one long-format row.

    ``year`` is parsed to ``int`` and ``zri`` to ``float``. Raises ValueError if
    either field is not numeric, so call this only after dropping empty ZRI values.
    """
    return (fields[STATE_COL],
            int(fields[YEAR_COL].strip()),
            float(fields[ZRI_COL].strip()))


# Result rows are (StateCode, Year, mean ZRI) tuples.
mean_zri = (
    unpivoted_rent
    .filter(lambda fields: fields[ZRI_COL].strip() != '')  # drop missing ZRI (incl. whitespace-only)
    .map(_parse_long_row)
    .filter(lambda rec: YEAR_FROM <= rec[1] <= YEAR_TO)
    # key = (state, year), value = (ZRI, 1): sum both, then divide to get the mean
    .map(lambda rec: ((rec[0], rec[1]), (rec[2], 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda total_count: total_count[0] / total_count[1])
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))
)

In [16]:
# Bring the per-(state, year) means to the driver, sorted by (StateCode, Year).
# reduceByKey gives no ordering guarantee, so sorting makes the CSV reproducible.
mean_zri_list = sorted(mean_zri.collect())

In [21]:
filename = 'output/mean_zri_by_state_year.csv'
output_dir = os.path.dirname(filename)
if output_dir:  # os.makedirs('') raises, so only create a directory when there is one
    os.makedirs(output_dir, exist_ok=True)

# newline='' is required by the csv module so it controls line endings itself
# (otherwise blank lines appear between rows on Windows).
with open(filename, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    header = ['StateCode', 'Year', 'Mean_ZRI']
    writer.writerow(header)
    # Each element is a (StateCode, Year, mean ZRI) tuple.
    writer.writerows(mean_zri_list)

In [19]:
# Shut down the SparkContext now that the results have been written to disk.
sc.stop()