# Cleaning

In [None]:
# Importing necessary modules
import seaborn as sns
import pyspark.sql.functions as F
import pandas as pd
import matplotlib.pyplot as plt
from operator import add
from functools import reduce
import numpy as np
import re
import os
from pyspark.sql.types import StructField, StructType, StringType, LongType, FloatType
from pyspark.sql.functions import *
import random
from pyspark.ml.feature import StandardScaler, VectorAssembler, Imputer, StringIndexer
from pyspark.ml.functions import vector_to_array
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula
import time

# Setting up visualization
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline

In [None]:
cols_to_keep = [
    "Voters_Gender", # cat
#     "Voters_Age", # num
    "Voters_BirthDate", # ignore
    "Residence_Families_HHCount", # num
    "Residence_HHGender_Description", # cat
    "Mailing_Families_HHCount", # num
    "Mailing_HHGender_Description", # cat

#   !! voter party affiliation
    "Parties_Description", 
    
    # cat
    "CommercialData_PropertyType",
    "AddressDistricts_Change_Changed_CD",
    "AddressDistricts_Change_Changed_SD",
    "AddressDistricts_Change_Changed_HD",
    "AddressDistricts_Change_Changed_County",
    
    "Residence_Addresses_Density", # num
    
    # cat
    "CommercialData_EstimatedHHIncome",
    "CommercialData_ISPSA",
    # num
    "CommercialData_AreaMedianEducationYears",
    "CommercialData_AreaMedianHousingValue",
#    "CommercialData_MosaicZ4Global",
    # cat
     "CommercialData_AreaPcntHHMarriedCoupleNoChild",  
     "CommercialData_AreaPcntHHMarriedCoupleWithChild",
     "CommercialData_AreaPcntHHSpanishSpeaking",
     "CommercialData_AreaPcntHHWithChildren",
     "CommercialData_StateIncomeDecile",
#    "Ethnic_Description",
    "EthnicGroups_EthnicGroup1Desc",
    "CommercialData_DwellingType",
    "CommercialData_PresenceOfChildrenCode",
#    "CommercialData_PresenceOfPremCredCrdInHome", ## too many missing
    "CommercialData_DonatesToCharityInHome",
    "CommercialData_DwellingUnitSize",
    "CommercialData_ComputerOwnerInHome",
    "CommercialData_DonatesEnvironmentCauseInHome",
    "CommercialData_Education",
    
#   Don't include because of lookahead bias  
#     "Voters_VotingPerformanceEvenYearGeneral",
#     "Voters_VotingPerformanceEvenYearPrimary",
#     "Voters_VotingPerformanceEvenYearGeneralAndPrimary",
#     "Voters_VotingPerformanceMinorElection",
    
#   Other control variables that expect to be highly associated with outcome:
#     "ElectionReturns_P08CountyTurnoutAllRegisteredVoters",
#     "ElectionReturns_P08CountyTurnoutDemocrats",
#     "ElectionReturns_P08CountyTurnoutRepublicans",
    "General_2000",
    "General_2004",
    "PresidentialPrimary_2000",
    "PresidentialPrimary_2004",
        
#   Outcome variable (indiana law happens in 2005, approved by SCOTUS before presidential election in 2008)
    "General_2008"
]

In [None]:
# These are the states that do not have strict voter ID laws:
#  'VM2Uniform--CA--2021-05-02',	VM2Uniform--CA--2021-05-02	CA	x	California
#  'VM2Uniform--IL--2021-03-05',	VM2Uniform--IL--2021-03-05	IL	x	Illinois
#  'VM2Uniform--MA--2021-01-19',	VM2Uniform--MA--2021-01-19	MA	x	Massachusetts
#  'VM2Uniform--MD--2021-02-15',	VM2Uniform--MD--2021-02-15	MD	x	Maryland
#  'VM2Uniform--ME--2021-05-28',	VM2Uniform--ME--2021-05-28	ME	x	Maine
#  'VM2Uniform--MN--2021-02-14',	VM2Uniform--MN--2021-02-14	MN	x	Minnesota
#  'VM2Uniform--NC--2021-05-18',	VM2Uniform--NC--2021-05-18	NC	x	North Carolina
#  'VM2Uniform--NE--2021-01-20',	VM2Uniform--NE--2021-01-20	NE	x	Nebraska
#  'VM2Uniform--NJ--2021-03-11',	VM2Uniform--NJ--2021-03-11	NJ	x	New Jersey
#  'VM2Uniform--NM--2021-02-25',	VM2Uniform--NM--2021-02-25	NM	x	New Mexico
#  'VM2Uniform--NV--2021-06-13',	VM2Uniform--NV--2021-06-13	NV	x	Nevada
#  'VM2Uniform--NY--2021-03-15',	VM2Uniform--NY--2021-03-15	NY	x	New York
#  'VM2Uniform--OR--2021-02-05',	VM2Uniform--OR--2021-02-05	OR	x	Oregon
#  'VM2Uniform--PA--2021-05-20',	VM2Uniform--PA--2021-05-20	PA	x	Pennsylvania
#  'VM2Uniform--VT--2021-05-28',	VM2Uniform--VT--2021-05-28	VT	x	Vermont

# For each of these states, I want to pull enough samples to get a total sample of 1/2 M; can increase later

# grab files
states =  [
# For now, just exclude New York and Califonria, because the parquet files take too long to read
'VM2Uniform--VT--2021-05-28', 
'VM2Uniform--IL--2021-03-05',
'VM2Uniform--MA--2021-01-19',
'VM2Uniform--MD--2021-02-15',
'VM2Uniform--ME--2021-05-28',
'VM2Uniform--MN--2021-02-14',
'VM2Uniform--NC--2021-05-18',
'VM2Uniform--NE--2021-01-20',
'VM2Uniform--NJ--2021-03-11',
'VM2Uniform--NM--2021-02-25',
'VM2Uniform--NV--2021-06-13',
'VM2Uniform--OR--2021-02-05',
'VM2Uniform--PA--2021-05-20',
'VM2Uniform--CA--2021-05-02',
'VM2Uniform--NY--2021-03-15',
]

# bucket file path for all state parquet files
gcs_path = 'gs://pstat135-voter-file/VM2Uniform'

# create list of state abbreviations
pattern = re.compile(r"(?<=--)[A-Z]{2}")
state_abvs = re.findall(pattern, ''.join(states))

# do first iteration
print('VM2Uniform--VT--2021-05-28')

# num_per_state = 500

df_ref = spark.read.parquet("/".join([gcs_path, 'VM2Uniform--VT--2021-05-28']))
df_ref = df_ref.select(cols_to_keep)

numrows = {'VM2Uniform--VT--2021-05-28': df_ref.count()}

print("%d" % (numrows['VM2Uniform--VT--2021-05-28']))
    
# percentage_sample = num_per_state / numrows['VM2Uniform--VT--2021-05-28']
    
# df_ref = df_ref.sample(True, percentage_sample, seed = 19480384)
df_ref = df_ref.withColumn('STATE', F.lit(state_abvs[0]))
 
next_states = states[1:]

# do the rest of the iterations
for i, one_state in enumerate(next_states):

    print("%s: " % (one_state), end="")
    
    # read dataframe for one_state
    tmp_ref = spark.read.parquet("/".join([gcs_path, one_state]))
    tmp_ref = tmp_ref.select(cols_to_keep)
    numrows[one_state] = tmp_ref.count()
    print("%d" % (numrows[one_state]))
    
#     percentage_sample = num_per_state / numrows[one_state]
    
#     tmp_ref = tmp_ref.sample(True, percentage_sample, seed = 19480384)
    tmp_ref = tmp_ref.withColumn('STATE', F.lit(state_abvs[i+1]))
    
    df_ref = df_ref.union(tmp_ref)      

df_ref.printSchema()
df_ref.count()

### FUNCTIONS TO CLEAN DATASET

In [None]:
def clean_numeric_categorical(input_df: DataFrame) -> DataFrame:
    
    # remove special symbols ($, %) from relevant columns
    input_df = input_df.withColumn(
        "CommercialData_AreaMedianHousingValue",
        F.expr("substring(CommercialData_AreaMedianHousingValue, 2, length(CommercialData_AreaMedianHousingValue))"))

    pct = ["CommercialData_AreaPcntHHMarriedCoupleNoChild",  
           "CommercialData_AreaPcntHHMarriedCoupleWithChild",
           "CommercialData_AreaPcntHHSpanishSpeaking",
           "CommercialData_AreaPcntHHWithChildren"]

    for c in pct:
        input_df = input_df.withColumn(
            c,
            F.expr(f"substring({c}, 1, length({c})-1)")
        )
    input_df.select(["CommercialData_AreaMedianHousingValue"]+pct).show()

    numeric_cols = [
        'Residence_Families_HHCount',
        'Mailing_Families_HHCount',
        'Residence_Addresses_Density',
        "CommercialData_AreaMedianEducationYears",
        "CommercialData_AreaMedianHousingValue"
    ] + pct

    trinary_cols = [
        'CommercialData_DonatesToCharityInHome',
        'CommercialData_ComputerOwnerInHome',
        'CommercialData_DonatesEnvironmentCauseInHome'
    ]

    binary_cols = []

    dont_touch_cols = [
        "General_2008", 
        "Voters_BirthDate", 
        "General_2000",
        "General_2004",
        "PresidentialPrimary_2000",
        "PresidentialPrimary_2004"
    ]
    
    other_cols = [c for c in input_df.columns if c not in dont_touch_cols]
    other_cols = [c for c in other_cols if c not in (numeric_cols + trinary_cols + binary_cols)]

    categorical_cols = other_cols + binary_cols + trinary_cols

    for c in numeric_cols:
        input_df = input_df.withColumn(c, F.col(c).cast("float").alias(c))
    input_df = input_df.fillna("U", subset= trinary_cols)
    input_df = input_df.fillna("Missing", subset = other_cols)
#   input_df = input_df.fillna("N", subset = binary_cols)
    
    return input_df

In [None]:
df_ref = clean_numeric_categorical(df_ref)
df_ref.printSchema()

In [None]:
def impute_values_function(input_df: DataFrame) -> DataFrame:

    # Create copy of working df
    input_df = input_df.alias('input_df')

    # Impute the missing values in the numerical columns with the mean -- minimize change to z-scores of given data
    imputer = Imputer(
        inputCols=numeric_cols, 
        outputCols=["{}_imp".format(c) for c in numeric_cols]
    )

    input_df = imputer.fit(input_df).transform(input_df)

    # Impute categorical columns -- maybe it's better to drop these records
    # input_df = input_df.fillna("missing", subset = categorical_cols)

    indexed_cols = [f"{c}_ind" for c in categorical_cols]

    # Ecode categorical variables
    indexer = StringIndexer(inputCols = categorical_cols, outputCols = indexed_cols)
    input_df = indexer.fit(input_df).transform(input_df)
    
    return input_df

In [None]:
df_ref = impute_values_function(df_ref)
df_ref.printSchema()

In [None]:
df_ref.select("Voters_BirthDate").show(10)

In [None]:
def clean_voter_participation(input_df: DataFrame) -> DataFrame:

    yrs_add = 18
    months_add = 18*12

    # date of national 
    target_month_day_presidential = "11-03"

    # date of presidential primary (ideally we should do this state by state, but this is the date for Indiana's)
    target_month_day_primary = "05-03" 

    input_df = input_df.withColumn("DATE_18", add_months(to_date(col("Voters_BirthDate"),"MM/dd/yyyy"), months_add))
    input_df.select(["Voters_BirthDate", "DATE_18"]).show(10)
    input_df = input_df.dropna(subset = "Voters_BirthDate")
    input_df = input_df.withColumn("YEAR_18", year("DATE_18"))
    input_df = input_df.withColumn("comparator_date_presidential", to_date(concat(col("YEAR_18"), lit("-"), lit(target_month_day_presidential))))
    input_df = input_df.withColumn("comparator_date_primary", to_date(concat(col("YEAR_18"), lit("-"), lit(target_month_day_primary))))

    for election in ["PRESIDENTIAL", "PRIMARY"]:
        input_df = input_df.withColumn(f"YEAR_ELIGIBLE_TO_VOTE_{election}", \
                                    when(col("DATE_18")<=col(f"comparator_date_{election.lower()}"), col("YEAR_18")) \
                                   .otherwise(col("YEAR_18") + 1) \
                                  )

    # check no missing vals:
    input_df.where(col("YEAR_18").isNull()).select("YEAR_18").show(10)

    # get rid of rows where the voter was not old enough to vote in the 2008 general election
    input_df = input_df.filter(col("YEAR_ELIGIBLE_TO_VOTE_PRESIDENTIAL")<=2008).fillna("N", subset = ["General_2008"])

    # for the 2000 and 2004 general elections, replace with "N" IF the person was old enough to vote at the time

    for election in ["2000", "2004"]:
        input_df = input_df.withColumn(f"General_{election}", \
                                   when((col("YEAR_ELIGIBLE_TO_VOTE_PRESIDENTIAL")<= int(election)) & \
                                        (col(f"General_{election}").isNull()), "N") \
                                   .otherwise(col(f"General_{election}")) \
                                  )

        input_df = input_df.withColumn(f"PresidentialPrimary_{election}", \
                                   when((col("YEAR_ELIGIBLE_TO_VOTE_PRIMARY")<= int(election)) & \
                                        (col(f"PresidentialPrimary_{election}").isNull()), "N") \
                                   .otherwise(col(f"PresidentialPrimary_{election}")) \
                                  )

    # make the general voting for 2008 a numeric variable; since we've deleted
    # everyone who was not eligible to vote, this can be directly calculated with a 1-0.
    input_df = input_df.withColumn("Voted_General_2008", when(input_df.General_2008 == "Y",1).otherwise(0))
    input_df = input_df.drop("General_2008")
    
    return input_df

In [None]:
df_ref = clean_voter_participation(df_ref)
df_ref.printSchema()

In [None]:
indi.select("Voters_BirthDate").show(10)

In [None]:
# do the same process for Indiana:
indi = spark.read.parquet("gs://voter-project-235-25/VM2Uniform--IN--2021-01-15_parq")
indi = indi.sample(True, 0.1, seed = 19480384)
indi = indi.select(cols_to_keep)
indi = indi.withColumn("STATE", lit("IN"))

indi = clean_numeric_categorical(indi)
indi = impute_values_function(indi)
indi = clean_voter_participation(indi)
indi.printSchema()

In [None]:
indi.columns

# Estimator with logistic regression index model

In [None]:
new_df.groupBy('General_2004').count().show()

indi_full = spark.read.parquet("gs://voter-project-235-25/VM2Uniform--IN--2021-01-15_parq")
indi = indi_full.select('General_2004')

indi.groupBy("General_2004").count().show()

new_df = new_df.fillna("N", subset = "General_2008")

indexer = StringIndexer(inputCol = "General_2008", outputCol = "label")
new_df = indexer.fit(new_df).transform(new_df)

new_df.select(
    ["{}_imp".format(c) for c in numeric_cols] + indexed_cols + ["label"]    
).printSchema()

new_df = new_df.select(
    ["{}_imp".format(c) for c in numeric_cols] + indexed_cols + ["label"]    
)

from pyspark.ml.feature import RFormula
supervised = RFormula(formula="label ~ .")

fittedRF = supervised.fit(new_df)

preparedDF = fittedRF.transform(new_df)

preparedDF.select("features").show(n=10, truncate=False)

train, test = new_df.randomSplit([0.7, 0.3], seed = 42069)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()

lrModel = lr.fit(train)

featureCols = pd.DataFrame(preparedDF.schema["features"].metadata["ml_attr"]["attrs"]["nominal"]+
  preparedDF.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]).sort_values("idx")

featureCols = featureCols.set_index('idx')
featureCols.head()

plt.rcParams["figure.figsize"] = (8,6)

beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')

coefsArray = np.array(lrModel.coefficients)  # convert to np.array
coefsDF = pd.DataFrame(coefsArray, columns=['coefs'])  # to pandas

coefsDF = coefsDF.merge(featureCols, left_index=True, right_index=True)  # join it with featureCols we created above
coefsDF.sort_values('coefs', inplace=True)  # Sort them
coefsDF.head()

plt.rcParams["figure.figsize"] = (20,3)

plt.xticks(rotation=90)
plt.bar(coefsDF.name, coefsDF.coefs)
plt.title('Ranked coefficients from the logistic regression model')
plt.show()

df_ref.write.format("parquet").save("total_reference_sample")
df_ref = spark.read.parquet("total_reference_sample")

# Propensity score estimator

In [None]:
num_voters_indiana = indi.count()
num_voters_not_indiana = df_ref.count()
pct_sample = num_voters_indiana / num_voters_not_indiana

# empty list to store the estimated average treatment effects:
ATEs = []

# empty dictionary to store output
stored_DF = {}

# start_time = time.time()

for i in range(1):
    i = i + 1 # from 1 - 100 rather than 0 to 100

    # take random sample of the total parquet file (equivalent to the size of indiana)
    df_ref = df_ref.sample(True, pct_sample, seed = i)

    # create dummy DATA:
#     indi = indi.withColumn('Voters_Age', rand())
#     df_ref = df_ref.withColumn('Voters_Age', rand())
    
#     indi = indi.select(["Voters_Age"])
#     df_ref = df_ref.select(["Voters_Age"])

#     indi = indi.withColumn('Voters_Age', col('Voters_Age').cast('double'))
#     df_ref = df_ref.withColumn('Voters_Age', col('Voters_Age').cast('double'))

#     indi = indi.withColumn('General_2008_RANDOM', when(rand() > 0.5, 1).otherwise(0))
#     df_ref = df_ref.withColumn('General_2008_RANDOM', when(rand() > 0.5, 1).otherwise(0))

    # create a column with "LAW == 0" for non-Indiana states
    df_ref = df_ref.withColumn("LAW", lit(0))

    # create a column with "LAW == 1" for Indiana
    indi = indi.withColumn("LAW", lit(1))

    # union the two together
    df = df_ref.union(indi)

    cols_excluded_from_regression = [
        'Voters_BirthDate', # removed this, but KEPT the YEAR that the voter turned 18.
        'STATE',
        'STATE_ind',
        'DATE_18',
        'comparator_date_presidential',
        'comparator_date_primary',
        'YEAR_ELIGIBLE_TO_VOTE_PRESIDENTIAL',
        'YEAR_ELIGIBLE_TO_VOTE_PRIMARY',
        'Voted_General_2008'
    ]
    
    df_input_logistic = df.drop(*cols_excluded_from_regression)

    # fit logistic model on the intervention (variable Law)
    nrow_df_input_logistic_start_check = df_input_logistic.count()
#     df_input_logistic = df_input_logistic.fillna(0)
    df_input_logistic.columns
    supervised = RFormula(formula="LAW ~ .")
    fittedRF = supervised.fit(df_input_logistic) # inspect column types
    prepareddf_input_logistic = fittedRF.transform(df_input_logistic) # create feature and label columns
    prepareddf_input_logistic.show(5, truncate = False)
    lr = LogisticRegression(labelCol="label",featuresCol="features")
    print(lr.explainParams())
    lrModel = lr.fit(prepareddf_input_logistic) # train model
    lrModel.transform(prepareddf_input_logistic).select("label", "prediction") # fitted values

    # get a propensity score from the probability as a new column:
    fitted = lrModel.transform(prepareddf_input_logistic)
    fitted = fitted.withColumn('probability', vector_to_array('probability'))
    array_mean = udf(lambda x: float(np.mean(x)), FloatType())
    fitted = fitted.withColumn("propensity_score", array_mean("probability"))

    fitted.count() == nrow_df_input_logistic_start_check

    # new column that called weight that is T - PS / (PS * 1 - PS)
    fitted = fitted.withColumn("weight", (col("label") - col("propensity_score")) / (col("propensity_score") * (1-col("propensity_score"))))

    # merge back in the 2008 general election OUTCOME data
    fitted = fitted.withColumn("row_id", monotonically_increasing_id())
    df = df.withColumn("row_id", monotonically_increasing_id())
    num_row_df_prior = df.count()
    df = df.join(fitted, ["row_id", "Voters_Age", "LAW"]).drop("row_id")
    num_row_df_prior == df.count()

    # calculate the weighted average
    df = df.withColumn("weighted_outcome", col("General_2008_RANDOM") * col("weight"))

    # store weighted average into list
    ATE_this_round = df.agg(avg(col("weighted_outcome"))).collect()[0][0]
    ATEs.append(ATE_this_round)

    # store the DataFrame into a dictionary
    stored_DF[f"{i}"] = df
    
# end_time = time.time()
# print("Execution time: {:.2f} seconds".format(end_time - start_time))

In [None]:
df.show(10)

In [None]:
ATEs

In [None]:
range(1)

In [None]:
start_time = time.time()
end_time = time.time()
print("Execution time: {:.2f} seconds".format(end_time - start_time))