**Note:** this notebook has been abandoned; development now happens mostly on EMR.

### Starter code for group assignment 2
The rent data is handled separately because it is the only dataset in wide format, and it cannot be joined with the other datasets until it is converted to long format.

In [1]:
import ast

In [2]:
import contextlib
import csv
import getpass
import os
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [3]:
# Ask spark-submit to pull in the hadoop-aws package (provides the S3A filesystem used below).
# This has to be set before the SparkContext cell runs, because it is read when the JVM launches.
# NOTE(review): hadoop-aws should match the Hadoop version bundled with this Spark build — confirm 2.7.4.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.4" pyspark-shell'

In [4]:
# Spark configuration used to create the SparkContext: request 8 GB of driver memory.
# NOTE(review): Spark docs warn that spark.driver.memory set through SparkConf may be ignored once the
# driver JVM is running (client mode); verify the effective value in the Spark UI Environment tab.
conf = SparkConf().set("spark.driver.memory", "8g")

In [5]:
# Get the active SparkContext, or create one from `conf` if none exists.
# The previous form, SparkContext(conf=conf).getOrCreate(), constructed a brand-new context
# first, so re-running this cell failed (only one SparkContext may be active) before the
# trailing getOrCreate() could help. The classmethod makes the cell safe to re-run.
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
# SparkSession entry point for the DataFrame API; getOrCreate() reuses the SparkContext created above.
ss = SparkSession.builder.getOrCreate()

In [7]:
# AWS credentials for the S3A connector.
# Credentials must never be hard-coded in a notebook: they leak through version control and
# shared copies. Read them from the standard AWS environment variables and fall back to an
# interactive, non-echoing prompt when a variable is unset or empty.
# NOTE(review): the access key ID previously hard-coded here has been exposed and should be rotated.
access_key = os.environ.get('AWS_ACCESS_KEY_ID') or getpass.getpass('AWS access key ID: ')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') or getpass.getpass('AWS secret access key: ')

 ········································


In [8]:
# Map the s3:// scheme onto the S3A filesystem and give S3A the credentials collected above.
_hadoop_conf = sc._jsc.hadoopConfiguration()
for _key, _value in (
    ("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ('fs.s3a.access.key', access_key),
    ('fs.s3a.secret.key', secret_key),
):
    _hadoop_conf.set(_key, _value)

In [9]:
def guess_schema(spark_df):
    """Guess a StructType for ``spark_df`` by peeking at a single sample row.

    PySpark's ``inferSchema`` option is slow because it makes an extra pass over
    the whole data set. This helper instead parses the values of one row with
    ``ast.literal_eval`` and maps the resulting Python type to a Spark type.
    The second row is preferred, in case the frame was read without
    ``header=True`` and its first row is really the header. If only one row
    exists, that row is used. If the frame is empty, every column is a string.

    Args:
        spark_df: a Spark DataFrame (e.g. one read from CSV without a schema).

    Returns:
        StructType with one nullable StructField per column, in column order:
        IntegerType for ints that fit in 32 bits, LongType for larger ints,
        DoubleType for floats, StringType for everything else (bools, text,
        nulls, values that are not Python literals).
    """
    # Fetch the sample once; calling take() inside the loop launched one Spark job per column.
    sample_rows = spark_df.take(2)
    if len(sample_rows) > 1:
        sample_row = sample_rows[1]
    elif sample_rows:
        sample_row = sample_rows[0]
    else:
        sample_row = None

    struct_field_list = []
    for col in spark_df.columns:
        current_val = None if sample_row is None else sample_row[col]
        if isinstance(current_val, str):
            try:
                current_val = ast.literal_eval(current_val)
            except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
                # Not a Python literal (free text, malformed input): keep the raw string.
                pass

        # `type(...) is int` deliberately excludes bool, which falls through to StringType.
        if type(current_val) is int:
            if -2**31 <= current_val <= 2**31 - 1:
                spark_type = IntegerType()
            else:
                # Large identifiers would overflow a 32-bit IntegerType.
                spark_type = LongType()
        elif type(current_val) is float:
            spark_type = DoubleType()
        else:
            spark_type = StringType()
        struct_field_list.append(StructField(col, spark_type, True))

    return StructType(struct_field_list)

In [10]:
# Load every dataset except the rent data (handled separately below), using each file's
# first line as the header. Without schema inference every column is read as a string.
census_2015_df, county_2015_df, census_2017_df, county_2017_df = (
    ss.read.csv(census_path, header=True)
    for census_path in (
        's3a://msds-durian-candy/census/acs2015_census_tract_data.csv.gz',
        's3a://msds-durian-candy/census/acs2015_county_data.csv.gz',
        's3a://msds-durian-candy/census/acs2017_census_tract_data.csv.gz',
        's3a://msds-durian-candy/census/acs2017_county_data.csv.gz',
    )
)

(benefits_cost_sharing_df, business_rules_df, network_df,
 plan_attributes_df, rate_df, service_area_df) = (
    ss.read.csv(insurance_path, header=True)
    for insurance_path in (
        's3a://msds-durian-candy/insurance/BenefitsCostSharing.csv.gz',
        's3a://msds-durian-candy/insurance/BusinessRules.csv.gz',
        's3a://msds-durian-candy/insurance/Network.csv.gz',
        's3a://msds-durian-candy/insurance/PlanAttributes.csv.gz',
        's3a://msds-durian-candy/insurance/Rate.csv.gz',
        's3a://msds-durian-candy/insurance/ServiceArea.csv.gz',
    )
)

## Unpivot the data from wide to long format

The rent data is in wide format (one column per year-month); the code below converts it to long format (one row per region and year-month).

In [11]:
# Read the wide-format rent CSV as raw text lines. The lines are parsed with csv.reader
# later, so quoted fields that contain commas (such as the region names) stay intact.
rdd_rent = sc.textFile('s3a://msds-durian-candy/rent/Metro_Zri_AllHomesPlusMultifamily.csv.gz')

In [12]:
header_raw = rdd_rent.first()
# Parse the header line with csv.reader so quoted cells are handled; it yields exactly one row.
header_temp = list(csv.reader([header_raw]))[0]

# Long-format header. The region name (wide index 1) is split on ',' into RegionName and
# StateCode, wide index 2 is kept as-is, and the date columns from wide index 3 onwards
# become one (Year, Month, ZillowRentIndex) row each.
new_header = [
    header_temp[0],      # RegionID
    'RegionName',
    'StateCode',
    header_temp[2],
    'Year',
    'Month',
    'ZillowRentIndex',
]

In [None]:
# The first data row (the line right after the header) is for the entire US rather than an
# individual metro/state, so it is not useful here: remember it and filter it out when unpivoting.
US_row = rdd_rent.filter(lambda line: line != header_raw).first()

In [None]:
def unpivot_widerow_to_longrows(row,header_original):
    """Expand one wide-format rent row into one long-format row per date column.

    Args:
        row: list of parsed CSV fields for one region. Index 0 is the region ID,
            index 1 the region name with its state code after the last comma,
            index 2 is carried through unchanged, and every field from index 3 on
            is the value for the date column at the same position in
            ``header_original``.
        header_original: the parsed wide header; entries from index 3 on are
            '<year>-<month>' date labels. It is not modified.

    Returns:
        list of rows, one per date column, each laid out as
        [region_id, region_name, state_code, row[2], year, month, value].
        ``state_code`` is None when the name contains no comma, and ``value`` is
        None when the row has fewer value fields than there are date columns.
    """
    # Split on the LAST comma only, so every output row has exactly two name fields no
    # matter how many commas the name contains (a mismatch would misalign all columns).
    name_parts = [part.strip() for part in row[1].rsplit(',', 1)]
    if len(name_parts) == 1:
        name_parts.append(None)  # no state code present
    new_row_base = [row[0]] + name_parts + [row[2]]

    date_labels = header_original[3:]
    prices = row[3:]

    unpivoted_rows = []
    for i, year_month in enumerate(date_labels):
        # A short row yields None for its missing trailing values instead of raising IndexError.
        price = prices[i] if i < len(prices) else None
        unpivoted_rows.append(new_row_base + year_month.split('-') + [price])

    return unpivoted_rows

In [None]:
def _parse_csv_line(line):
    """Split one raw CSV text line into its list of fields (quoted commas are respected)."""
    return list(csv.reader([line]))[0]


# Drop the header line and the US-wide row, parse each remaining line, then expand every
# wide row into one long row per date column.
_data_lines = rdd_rent.filter(lambda line: line not in (header_raw, US_row))
unpivoted_rent = (
    _data_lines
    .map(_parse_csv_line)
    .flatMap(lambda fields: unpivot_widerow_to_longrows(fields, header_temp))
)

In [None]:
# Materialise the long-format rows on the driver, with the header row in front.
# NOTE(review): this pulls the entire unpivoted data set into driver memory.
rent_data = [new_header] + unpivoted_rent.collect()

In [None]:
# Build the long-format rent DataFrame straight from the distributed unpivoted RDD.
# The previous version did all of the following, and also broke if the header ever had
# more or fewer than 7 columns:
#   - collected every row to the driver;
#   - re-parallelized the rows;
#   - filtered the header row back out;
#   - renamed a hard-coded 7 columns.
# Passing the names as the schema names every column in one step.
col_names = list(new_header)
long_rent_rdd = unpivoted_rent  # data rows only; the header was never part of this RDD
rent_df = ss.createDataFrame(long_rent_rdd, schema=col_names)

rent_df.show(5)


### Start with Rate table from insurance, expand from there.

* s3a://msds-durian-candy/census/acs2015_census_tract_data.csv.gz
* s3a://msds-durian-candy/census/acs2015_county_data.csv.gz
* s3a://msds-durian-candy/census/acs2017_census_tract_data.csv.gz
* s3a://msds-durian-candy/census/acs2017_county_data.csv.gz
* s3a://msds-durian-candy/insurance/BenefitsCostSharing.csv.gz
* s3a://msds-durian-candy/insurance/BusinessRules.csv.gz
* s3a://msds-durian-candy/insurance/Network.csv.gz
* s3a://msds-durian-candy/insurance/PlanAttributes.csv.gz
* s3a://msds-durian-candy/insurance/Rate.csv.gz
* s3a://msds-durian-candy/insurance/ServiceArea.csv.gz
* s3a://msds-durian-candy/rent/Metro_Zri_AllHomesPlusMultifamily.csv.gz

In [None]:
def guess_schema(spark_df):
    """Guess a StructType for ``spark_df`` by peeking at a single sample row.

    NOTE(review): this duplicates the guess_schema cell earlier in the notebook; keep a single copy.

    PySpark's ``inferSchema`` option is slow because it makes an extra pass over
    the whole data set. This helper instead parses the values of one row with
    ``ast.literal_eval`` and maps the resulting Python type to a Spark type.
    The second row is preferred, in case the frame was read without
    ``header=True`` and its first row is really the header. If only one row
    exists, that row is used. If the frame is empty, every column is a string.

    Args:
        spark_df: a Spark DataFrame (e.g. one read from CSV without a schema).

    Returns:
        StructType with one nullable StructField per column, in column order:
        IntegerType for ints that fit in 32 bits, LongType for larger ints,
        DoubleType for floats, StringType for everything else (bools, text,
        nulls, values that are not Python literals).
    """
    # Fetch the sample once; calling take() inside the loop launched one Spark job per column.
    sample_rows = spark_df.take(2)
    if len(sample_rows) > 1:
        sample_row = sample_rows[1]
    elif sample_rows:
        sample_row = sample_rows[0]
    else:
        sample_row = None

    struct_field_list = []
    for col in spark_df.columns:
        current_val = None if sample_row is None else sample_row[col]
        if isinstance(current_val, str):
            try:
                current_val = ast.literal_eval(current_val)
            except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError):
                # Not a Python literal (free text, malformed input): keep the raw string.
                pass

        # `type(...) is int` deliberately excludes bool, which falls through to StringType.
        if type(current_val) is int:
            if -2**31 <= current_val <= 2**31 - 1:
                spark_type = IntegerType()
            else:
                # Large identifiers would overflow a 32-bit IntegerType.
                spark_type = LongType()
        elif type(current_val) is float:
            spark_type = DoubleType()
        else:
            spark_type = StringType()
        struct_field_list.append(StructField(col, spark_type, True))

    return StructType(struct_field_list)

## Use Rate Table as base and try to add as many columns as possible
By observation, it looks like a good composite foreign key would be: 'BusinessYear', 'IssuerId', 'IssuerId2', 'PlanId'

In [None]:
# Composite key used to join the Rate table with the other insurance tables.
rate_key = 'BusinessYear IssuerId IssuerId2 PlanId'.split()
# In BusinessRules the plan is identified by StandardComponentId (equal to PlanId).
business_rules_key = rate_key[:-1] + ['StandardComponentId']

## Check foreign key exists in other tables

In [None]:
# Does each table carry every column of the Rate composite key?
# The results observed so far are noted next to each table.
_key_columns = set(rate_key)
check1, check2, check3, check4, check5, check6 = (
    _key_columns.issubset(table.columns)
    for table in (
        rate_df,                   # redundant
        business_rules_df,         # False
        network_df,                # False
        plan_attributes_df,        # True
        benefits_cost_sharing_df,  # True
        service_area_df,           # False
    )
)

print([check1, check2, check3, check4, check5, check6])

We want to preserve as many rows of the rate DataFrame as possible, so the joined result should keep all 12,694,445 rows.

In [None]:
# Baseline row count of the Rate table, to compare against the joined frames below.
rate_df.count()

### Check for nulls in the composite foreign key

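Nulls in the composite key are a problem: a null key never matches in an equi-join, so those rows silently fail to pick up columns from the other tables.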

In [None]:
# Null check on the composite key columns (currently disabled).
# isnan() only detects NaN, not null, so the earlier count(when(isnan(c), c)) never counted null keys.
# when(cond, 1) yields 1 for matching rows and null otherwise, and count() skips nulls:
# from pyspark.sql.functions import when, count, col
# rate_df.select([count(when(col(c).isNull(), 1)).alias(c) for c in rate_key]).show()

In [None]:
# Left joins keep every Rate row and append the matching plan-attribute and benefit
# columns (null where nothing matches).
# NOTE(review): if a key matches several rows on the right-hand side, the Rate row is repeated;
# compare the counts below with rate_df.count(). Also verify PlanId uses the same format in all
# three tables, otherwise few rows will match.
rate_plan_attr = rate_df.join(plan_attributes_df, on=rate_key, how='left')
rate_plan_bcs = rate_plan_attr.join(benefits_cost_sharing_df, on=rate_key, how='left')



In [None]:
# Mark both joined frames for caching. cache() is lazy: the first action on each frame
# (the timed count() cells) computes and stores it, so later actions reuse the cached data.
rate_plan_attr.cache()
rate_plan_bcs.cache()

In [None]:
%%time
# Row count (and wall time) of the Rate + PlanAttributes join; as the first action it also fills the cache.
rate_plan_attr.count()

In [None]:
%%time
# Row count (and wall time) after also joining BenefitsCostSharing; compare with rate_df.count().
rate_plan_bcs.count()

In [None]:
# Number of columns after both joins (may include non-key column names repeated across tables).
len(rate_plan_bcs.columns)

In [None]:
# Write the first 5 rows of the joined frame to test_out.txt.
# contextlib.redirect_stdout restores sys.stdout when the block exits. The previous manual
# `sys.stdout = fh` left stdout pointing at the closed file, breaking every later print.
# DataFrame.show() prints the table itself and returns None, so it is not wrapped in print().
with open('test_out.txt', 'w') as fh, contextlib.redirect_stdout(fh):
    rate_plan_bcs.show(5)

In [None]:
# Keep a reference to the current sys.stdout so the next cell can restore it.
# NOTE(review): if an earlier cell has already reassigned sys.stdout to a file, this captures
# that (possibly closed) file rather than the notebook's output stream.
save_stdOut = sys.stdout

In [None]:
# Restore the stdout object captured in save_stdOut.
sys.stdout = save_stdOut

In [None]:
from pyspark.sql.functions import isnan, when, count, col

In [None]:
# Per-column count of missing values in the joined frame, written to null_check.txt.
# A cell counts as missing when it is null or NaN. The previous version only tested isnan(),
# which is false for null cells, so nulls were never counted. when(cond, 1) yields 1 for
# matching rows and null otherwise, and count() skips nulls, giving the number of missing cells.
# redirect_stdout restores sys.stdout afterwards, instead of leaving it on the closed file.
# NOTE(review): if the joins produced duplicate column names, col(c) is ambiguous — verify.
with open('null_check.txt', 'w') as fh, contextlib.redirect_stdout(fh):
    rate_plan_bcs.select(
        [count(when(col(c).isNull() | isnan(c), 1)).alias(c) for c in rate_plan_bcs.columns]
    ).show()

In [None]:
# Save each Row of the joined frame as its text representation under the 'rate_plan_bcs_rdd' directory.
# NOTE(review): Row reprs are hard to read back; a columnar format such as Parquet may suit reuse better.
rate_plan_bcs.rdd.saveAsTextFile('rate_plan_bcs_rdd')

In [None]:
import sys


In [None]:
# Inspect the column names of the Rate table.
rate_df.columns

In [None]:
# Inspect the column names of the BusinessRules table.
business_rules_df.columns

In [None]:
# Inspect the column names of the PlanAttributes table.
plan_attributes_df.columns

In [13]:
# Load the Parquet data stored under frames/version1/. The s3:// scheme is routed to S3A
# by the fs.s3.impl setting in the Hadoop configuration cell.
s3_data_path = 's3://msds-durian-candy/frames/version1/'
main_df = ss.read.format('parquet').load(s3_data_path)

In [14]:
# Preview the first 5 rows of the loaded frame.
main_df.show(5)

+---------+--------------+------------+----------+----------+-------------------+--------+----------+-----------------+------------------+-------------+--------------------+----+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+-------------+--------+----------------------------------------------+--------------------------------------------------------+--------------------+--------------------+------------------------+----------------+--------+--------+--------+------------------+-----------------+------------------+------+-----+--------------------+--------+-------+---------+------------+---------------+------------------+------------+------------------+------------------+------+------------------+-----------------+-----+------------------+------------------+-----+------------------+------------------+-------