In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, sum, col, split, isnan, column, count, mean, to_date, year, desc, max, min, rank, regexp_replace, dense_rank, substring, lpad, ltrim, concat, lit, when
from pyspark.sql.types import StringType
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, LinearRegression, GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import RFormula 


#5 nodes (m4.2xlarge), each with 16 cores and 32 GB
spark = SparkSession.builder \
    .appName("final_proj") \
    .config("spark.driver.cores", 1) \
    .config("spark.executor.memory", "10G") \
    .config("spark.executor.cores", "5") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1544155017119_0006,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [None]:
#read in Urban Institute's linked HMDA and ACS data
#obs are at the tract level
data = spark.read.csv("s3://randfinalproj/tract_data.csv.gz", header = True)

#format tract ID to be mergable with other data
data = data.withColumn("tract", regexp_replace(col("tract"), "[.]", ""))
data = data.withColumn("tract_id", concat("county", "tract")) 

In [None]:
#select relevant variables
data = data.select(col("state"), col("county"), col("year"), col("tract_id"), \
                     #col("VALUEH_P50"), \
                     col("FARM_1"), col("FARM_2"), \
                     col("BUILTYR2_1"), col("BUILTYR2_2"), col("BUILTYR2_3"), \
                     col("BUILTYR2_4"), col("BUILTYR2_5"), col("BUILTYR2_6"), \
                     col("BUILTYR2_7"), 
                     #col("BUILTYR2_8"), 
                     col("BUILTYR2_9"), \
                     col("BUILTYR2_10"), col("BUILTYR2_11"), col("BUILTYR2_12"), \
                     col("BUILTYR2_13"), col("BUILTYR2_14"), col("BUILTYR2_15"), \
                     col("MORTGAGE_0"), col("MORTGAGE_1"), col("MORTGAGE_3"), col("MORTGAGE_4"), \
                     col("FOODSTMP_0"), col("FOODSTMP_1"), col("FOODSTMP_2"), \
                     #col("PROPINSR_P50"), \
                     col("OWNERSHP_1"), col("OWNERSHP_2"), \
                     col("RACE1_HH_B"), col("RACE1_HH_H"), col("RACE1_HH_O"), col("RACE1_HH_W"), \
                     col("EMPSTAT_HH_1"), col("EMPSTAT_HH_2"), col("EMPSTAT_HH_3"), col("EMPSTAT_HH_0"), \
                     col("EDUC_HH_0"), col("EDUC_HH_1"), col("EDUC_HH_2"), \
                     col("EDUC_HH_3"), col("EDUC_HH_4"), col("EDUC_HH_5"), col("EDUC_HH_6"), \
                     col("EDUC_HH_7"), col("EDUC_HH_8"), col("EDUC_HH_10"), \
                     col("INCOME_P50"), \
                     col("AMOUNT_P50"), \
                     col("PURPOSE_1"), col("PURPOSE_2"), col("PURPOSE_3"), 
                     #col("PURPOSE_4"), \
                     )   

In [None]:
#read in origin-destination data 
drive = spark.read.parquet('s3://lsdm-emr-util/lsdm-data/travel-times/drive_times.parquet')

In [None]:
#read in novel dataset (created by me on local machine with R) of tract IDs affected by disasters and years affected
disaster = spark.read.csv("s3://randfinalproj/disaster.csv", header = True)
disaster = disaster.select("tract_id", "Year_After_Disaster", "Disaster")

In [None]:
#filter origin-destination data to tracts just directly affected by disasters
direct_expression = (drive["from_tract"] == disaster["tract_id"])
direct_type = "inner"
direct_affect = drive.join(disaster, direct_expression, direct_type).where(col("Disaster") == 1) \
                       .select("tract_id", "Year_After_Disaster") \
                       .withColumn("miles", lit(0))

#drop duplicates
direct_affect = direct_affect.groupBy("tract_id", "Year_After_Disaster").agg(min("miles").alias("miles"))

#issue: right_outer = 363 and inner = 326... why?

In [None]:
#another filter of o-d data, now to "remote" tracts: within 150 miles of directly affected tracts
remote_expression = (drive["to_tract"] == direct_affect["tract_id"]) & (drive["from_tract"] != direct_affect["tract_id"])
remote_type = "inner"
remote_affect = drive.join(direct_affect.select("tract_id", "Year_After_Disaster"), remote_expression, remote_type) \
                     .select("from_tract", "Year_After_Disaster", "miles") \
                     .withColumnRenamed("from_tract", "tract_id")

#remove remote tracts counted multiple times b/c within 150 miles of 2+ direct tracts
#group by tract and year, keeping min distance away
remote_affect = remote_affect.groupBy("tract_id", "Year_After_Disaster").agg(min("miles").alias("miles"))

In [None]:
#bind the remote and directly affected datasets together (by row)
affected_tracts = direct_affect.union(remote_affect).withColumnRenamed("tract_id", "temp_tract")

In [None]:
#join affected_tract data to main data, only includes affected tracks, but for all years data available
main_join = (data["tract_id"] == affected_tracts["temp_tract"])
main_Type = "inner"
data_disaster = data.join(affected_tracts, main_join, main_Type).drop("temp_tract")

In [None]:
#create new column indicating when treatment (miles) should be turned on
#note disaster year is actually year after disaster
data_disaster = data_disaster.withColumn("Disaster", when(col("year") == col("Year_After_Disaster"), 1) \
                         .otherwise(0))

In [None]:
#read in annual home price data from Federal Finance Housing Agency
#obs are at the tract level
home_price = spark.read.csv("s3://randfinalproj/HPI_AT_BDL_tract.csv", header = True)
home_price = home_price.withColumnRenamed("year", "home_year")

In [None]:
#join home data to main acs_hmda_disaster data
home_join = (data_disaster["tract_id"] == home_price["tract"]) & (data_disaster["year"] == home_price["home_year"])
home_type = "inner"
disaster_home = data_disaster.join(home_price, home_join, home_type).drop("home_year", "tract")

In [None]:
#checkpoint
disaster_home.write.parquet("s3://randfinalproj/disaster_home.parquet")

In [2]:
disaster_home = spark.read.parquet("s3://randfinalproj/disaster_home.parquet")

In [3]:
# make some features: unemployment rate, percentage renting, percentage college educated, percentage minority
disaster_home = disaster_home.withColumn("unemp_rate", col("EMPSTAT_HH_2") / (col("EMPSTAT_HH_1") + col("EMPSTAT_HH_2"))) \
                             .withColumn("rent_rate", col("OWNERSHP_2") / (col("OWNERSHP_1") + col("OWNERSHP_2"))) \
                             .withColumn("college_rate", col("EDUC_HH_10") / (col("EDUC_HH_0") + col("EDUC_HH_1") + \
                                                                              col("EDUC_HH_2") + col("EDUC_HH_3") + \
                                                                              col("EDUC_HH_4") + col("EDUC_HH_5") + \
                                                                              col("EDUC_HH_6") + col("EDUC_HH_7") + \
                                                                              col("EDUC_HH_8") + col("EDUC_HH_10"))) \
                            .withColumn("minority_rate", (col("RACE1_HH_B") + col("RACE1_HH_H") + col("RACE1_HH_O")) / \
                                        (col("RACE1_HH_B") + col("RACE1_HH_H") + col("RACE1_HH_O") + col("RACE1_HH_W")))

In [6]:
#filter dataset, cast appropriate variables which are strings to floats
#leaving years and states as strings to serve as fixed effects
disaster_home_filt = disaster_home.select(col("annual_change").cast("float"), col("miles").cast("float"), \
                                          col("state_abbr"), col("year"), col("Disaster"), \
                                          col("unemp_rate"), col("rent_rate"), 
                                          col("INCOME_P50").cast("float"), col("minority_rate"))

disaster_home_filt = disaster_home_filt.where(col("annual_change").isNotNull()) \
                                        .where(col("unemp_rate").isNotNull()) \
                                        .where(col("INCOME_P50").isNotNull()) \
                                        .where(col("minority_rate").isNotNull())
    
#college rate has so many NULLs... why!
#annual change has a lot too (7621/185488)
#unemp: 37/185488
#income: 494/185488
#minor: 2603/185488
disaster_home_filt.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in disaster_home_filt.columns]).show()

+-------------+-----+----------+----+--------+----------+---------+----------+-------------+
|annual_change|miles|state_abbr|year|Disaster|unemp_rate|rent_rate|INCOME_P50|minority_rate|
+-------------+-----+----------+----+--------+----------+---------+----------+-------------+
|            0|    0|         0|   0|       0|         0|        0|         0|            0|
+-------------+-----+----------+----+--------+----------+---------+----------+-------------+

In [7]:
#split into test vs train
sep = disaster_home_filt.randomSplit([0.7, 0.3], 99)
train = sep[0]
test = sep[1]

#create Diff-in-Diff specification
#annual change in median home price regressed on variables
#treatment effect is year after disaster in affected tract, also interacted with miles away from disaster
supervised = RFormula(formula="annual_change ~ Disaster + miles:Disaster + unemp_rate + rent_rate + INCOME_P50 + minority_rate + state_abbr + year")

train_trans = supervised.fit(train).transform(train)
test_trans = supervised.fit(test).transform(test)

In [9]:
#estimate is a generalized linear regression
glr = GeneralizedLinearRegression(featuresCol = 'features', labelCol = "label")

#I create a hyperparamter grid with different values for the regularization term
params = ParamGridBuilder() \
            .addGrid(glr.regParam, [0.0, 0.5, 1.0]).build()

#I use root mean squared error for my evaluation metric
evaluator = RegressionEvaluator(predictionCol = "prediction",
                                labelCol = "label",
                                metricName = "rmse")

cv = CrossValidator(estimator = glr,
                    estimatorParamMaps = params,
                    evaluator = evaluator,
                    numFolds = 3)

cvModel = cv.fit(train_trans)

for i in range(len(params)):
    print("Model with config:\n\t {}".format(params[i]))
    print("Achieved avg metric of:\n\t {}".format(cvModel.avgMetrics[i]))
    print("")

Model with config:
	 {Param(parent='GeneralizedLinearRegression_423ab242c127f857f3ae', name='regParam', doc='regularization parameter (>= 0).'): 0.0}
Achieved avg metric of:
	 6.12011577548574

Model with config:
	 {Param(parent='GeneralizedLinearRegression_423ab242c127f857f3ae', name='regParam', doc='regularization parameter (>= 0).'): 0.5}
Achieved avg metric of:
	 6.1297596195314705

Model with config:
	 {Param(parent='GeneralizedLinearRegression_423ab242c127f857f3ae', name='regParam', doc='regularization parameter (>= 0).'): 1.0}
Achieved avg metric of:
	 6.150528658315142

In [15]:
#can't figure out how to get coefficients or pvalues!

summary = cvModel.bestModel.summary
print("Coefficients: " + str(summary.coefficients))
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("P Values: " + str(summary.pValues))

'GeneralizedLinearRegressionTrainingSummary' object has no attribute 'coefficients'
Traceback (most recent call last):
AttributeError: 'GeneralizedLinearRegressionTrainingSummary' object has no attribute 'coefficients'

