# Combining Features and Building Predictive Models

*Lost 144 candidates to not matching*

In [1]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.types as typ
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from functools import reduce
from pyspark.sql.functions import col, asc
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, DoubleType, DateType
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import pyspark.mllib.regression as reg
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [2]:
spark = SparkSession \
    .builder \
    .getOrCreate()

sc = spark.sparkContext

## Import Candidates

In [3]:
df_house = spark.read.csv('df_house.csv', inferSchema=True, header=True)

In [4]:
#clean up some titles
df_house = df_house.withColumnRenamed('candidatevotes', 'CAND_VOTES')
df_house = df_house.withColumnRenamed('totalvotes', 'TOTAL_VOTES')
df_house = df_house.withColumnRenamed('VOTE_percent', 'PERCENT_VOTES')

#drop index that is brought in
df_house = df_house.drop(col('_c0'))

In [5]:
df_house.printSchema()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- CAND_ELECTION_YR: integer (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: double (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- CAND_STATUS: string (nullable = true)
 |-- CAND_PCC: string (nullable = true)
 |-- CAND_CITY: string (nullable = true)
 |-- CAND_ST: string (nullable = true)
 |-- CAND_ZIP: double (nullable = true)
 |-- CAND_VOTES: integer (nullable = true)
 |-- TOTAL_VOTES: integer (nullable = true)
 |-- PERCENT_VOTES: double (nullable = true)
 |-- WINNER: integer (nullable = true)



Split into 2016, 2018 to add in features as they are year dependent

In [6]:
df_house16 = df_house.filter(df_house.CAND_ELECTION_YR == 2016)
df_house18 = df_house.filter(df_house.CAND_ELECTION_YR == 2018)

## Import Features

**2016:**

In [7]:
#reading in features 2016
avgsum_donation_16 = spark.read.csv('./features/avgsum_donation-16.csv', inferSchema=True, header=True)
num_big_donations_16 = spark.read.csv('./features/num_big_donations-16.csv', inferSchema=True, header=True)
num_out_of_state_donations_16 = spark.read.csv('./features/num_out_of_state_donations-16.csv', inferSchema=True, header=True)
numdonations16 = spark.read.csv('./features/numdonations16.csv', inferSchema=True, header=True)

**2018:**

In [8]:
#reading in features 2018
avgsum_donation_18 = spark.read.csv('./features/avgsum_donation-18.csv', inferSchema=True, header=True)
num_big_donations_18 = spark.read.csv('./features/num_big_donations-18.csv', inferSchema=True, header=True)
num_out_of_state_donations_18 = spark.read.csv('./features/num_out_of_state_donations-18.csv', inferSchema=True, header=True)
numdonations18 = spark.read.csv('./features/numdonations18.csv', inferSchema=True, header=True)

## Join Candidates to Features

**2016:**

In [9]:
df_house16 = df_house16.join(avgsum_donation_16, on='CAND_ID', how='left')
df_house16 = df_house16.withColumnRenamed('avgdonation','AVERAGE_DONATION')
df_house16 = df_house16.withColumnRenamed('sumdonation','TOTAL_DONATIONS')
df_house16 = df_house16.drop(col('_c0'))

df_house16 = df_house16.join(num_big_donations_16, on='CAND_ID', how='left')
df_house16 = df_house16.withColumnRenamed('numdonat','NUMBER_BIG_DONATIONS')
df_house16 = df_house16.drop(col('_c0'))

df_house16 = df_house16.join(num_out_of_state_donations_16, on='CAND_ID', how='left')
df_house16 = df_house16.withColumnRenamed('numdonat','NUMBER_OUT_OF_STATE_DONATIONS')
df_house16 = df_house16.drop(col('_c0'))

df_house16 = df_house16.join(numdonations16, on='CAND_ID', how='left')
df_house16 = df_house16.withColumnRenamed('numdonat','NUMBER_OF_DONATIONS')
df_house16 = df_house16.drop(col('_c0'))

#identification based on existence, so filling na values with 0 where none found
df_house16 = df_house16.fillna({'NUMBER_BIG_DONATIONS':0, 'NUMBER_OUT_OF_STATE_DONATIONS':0})

#not not all candidates were able to join - filter out those without contribution info
df_house16 = df_house16.filter(col('TOTAL_DONATIONS').isNotNull())

#simpler filters
df_house16 = df_house16.withColumn('CONCAT', F.concat(col('CAND_ELECTION_YR'),F.lit('_'),col('CAND_OFFICE_ST'),F.lit('_'),col('CAND_OFFICE_DISTRICT')))

# NEED TO FILTER OUT THE RACES WITH ONLY 1 CANDIDATE
# THEN ADJUST THE CALCULATIONS TO BE RELATIVE - going to be very messy

In [10]:
df_house16.select('CONCAT').distinct().count()

384

In [11]:
df_house16.select('CONCAT').count()

646

# CANNOT GET this filter to work correctly....no idea why

In [24]:
df_house168 = df_house16.filter(col('CONCAT').isin(c) == True)

In [25]:
df_house168.count()

143

In [220]:
df_house16.count()

646

In [None]:
df_house16.select('CONCAT')

In [232]:
df_house16.groupBy('CAND_ELECTION_YR','CAND_OFFICE_ST','CAND_OFFICE_DISTRICT').count().show()

+----------------+--------------+--------------------+-----+
|CAND_ELECTION_YR|CAND_OFFICE_ST|CAND_OFFICE_DISTRICT|count|
+----------------+--------------+--------------------+-----+
|            2016|            WA|                 6.0|    2|
|            2016|            CA|                16.0|    2|
|            2016|            IL|                18.0|    2|
|            2016|            NC|                 7.0|    2|
|            2016|            CA|                10.0|    2|
|            2016|            OR|                 4.0|    2|
|            2016|            CO|                 4.0|    2|
|            2016|            NY|                13.0|    2|
|            2016|            MO|                 2.0|    2|
|            2016|            MN|                 3.0|    2|
|            2016|            NC|                 6.0|    2|
|            2016|            CA|                29.0|    2|
|            2016|            VA|                10.0|    2|
|            2016|      

646 originally - 144 that only have 1 candidate = 502

In [13]:
a=df_house16.groupBy('CAND_ELECTION_YR','CAND_OFFICE_ST','CAND_OFFICE_DISTRICT')

In [14]:
a.count().filter("count == 1").count()

144

In [15]:
b = a.count().filter('count == 1').select('CAND_ELECTION_YR','CAND_OFFICE_ST','CAND_OFFICE_DISTRICT')

In [16]:
b= b.withColumn('Concat', F.concat(col('CAND_ELECTION_YR'),F.lit('_'),col('CAND_OFFICE_ST'),F.lit('_'),col('CAND_OFFICE_DISTRICT'))).select('Concat')

In [17]:
c= b.select('Concat').rdd.flatMap(lambda x: x).collect()

In [18]:
c

['2016_TX_13.0',
 '2016_TX_14.0',
 '2016_OH_2.0',
 '2016_CA_19.0',
 '2016_NY_12.0',
 '2016_AZ_6.0',
 '2016_GA_13.0',
 '2016_MI_13.0',
 '2016_PA_2.0',
 '2016_VA_11.0',
 '2016_KY_2.0',
 '2016_OH_3.0',
 '2016_MS_1.0',
 '2016_NJ_8.0',
 '2016_TX_19.0',
 '2016_TN_8.0',
 '2016_IL_1.0',
 '2016_OR_3.0',
 '2016_TX_4.0',
 '2016_OK_3.0',
 '2016_VA_1.0',
 '2016_TX_11.0',
 '2016_PA_13.0',
 '2016_HI_1.0',
 '2016_CA_5.0',
 '2016_MO_3.0',
 '2016_LA_5.0',
 '2016_MO_6.0',
 '2016_KY_5.0',
 '2016_TX_3.0',
 '2016_GA_8.0',
 '2016_TX_5.0',
 '2016_IN_3.0',
 '2016_TX_26.0',
 '2016_IL_17.0',
 '2016_SC_6.0',
 '2016_OK_1.0',
 '2016_SC_3.0',
 '2016_MN_4.0',
 '2016_CA_35.0',
 '2016_OH_5.0',
 '2016_MD_4.0',
 '2016_GA_6.0',
 '2016_LA_6.0',
 '2016_MO_8.0',
 '2016_NJ_10.0',
 '2016_GA_14.0',
 '2016_CA_28.0',
 '2016_MD_1.0',
 '2016_MI_2.0',
 '2016_TN_1.0',
 '2016_NY_15.0',
 '2016_GA_7.0',
 '2016_PA_6.0',
 '2016_CA_15.0',
 '2016_TX_12.0',
 '2016_OK_4.0',
 '2016_IN_1.0',
 '2016_KS_2.0',
 '2016_MD_7.0',
 '2016_AR_3.0',
 '201

**2018:**

In [76]:
df_house18 = df_house18.join(avgsum_donation_18, on='CAND_ID', how='left')
df_house18 = df_house18.withColumnRenamed('avgdonation','AVERAGE_DONATION')
df_house18 = df_house18.withColumnRenamed('sumdonation','TOTAL_DONATIONS')
df_house18 = df_house18.drop(col('_c0'))

df_house18 = df_house18.join(num_big_donations_18, on='CAND_ID', how='left')
df_house18 = df_house18.withColumnRenamed('numdonat','NUMBER_BIG_DONATIONS')
df_house18 = df_house18.drop(col('_c0'))

df_house18 = df_house18.join(num_out_of_state_donations_18, on='CAND_ID', how='left')
df_house18 = df_house18.withColumnRenamed('numdonat','NUMBER_OUT_OF_STATE_DONATIONS')
df_house18 = df_house18.drop(col('_c0'))

df_house18 = df_house18.join(numdonations18, on='CAND_ID', how='left')
df_house18 = df_house18.withColumnRenamed('numdonat','NUMBER_OF_DONATIONS')
df_house18 = df_house18.drop(col('_c0'))

#identification based on existence, so filling na values with 0 where none found
df_house18 = df_house18.fillna({'NUMBER_BIG_DONATIONS':0, 'NUMBER_OUT_OF_STATE_DONATIONS':0})

#not not all candidates were able to join - filter out those without contribution info
df_house18 = df_house18.filter(col('TOTAL_DONATIONS').isNotNull())

**Combine 2016 and 2018:**

In [77]:
df_house = reduce(DataFrame.unionAll, [df_house16,df_house18])

In [78]:
df_house.printSchema()

root
 |-- CAND_ID: string (nullable = true)
 |-- CAND_NAME: string (nullable = true)
 |-- CAND_PTY_AFFILIATION: string (nullable = true)
 |-- CAND_ELECTION_YR: integer (nullable = true)
 |-- CAND_OFFICE_ST: string (nullable = true)
 |-- CAND_OFFICE: string (nullable = true)
 |-- CAND_OFFICE_DISTRICT: double (nullable = true)
 |-- CAND_ICI: string (nullable = true)
 |-- CAND_STATUS: string (nullable = true)
 |-- CAND_PCC: string (nullable = true)
 |-- CAND_CITY: string (nullable = true)
 |-- CAND_ST: string (nullable = true)
 |-- CAND_ZIP: double (nullable = true)
 |-- CAND_VOTES: integer (nullable = true)
 |-- TOTAL_VOTES: integer (nullable = true)
 |-- PERCENT_VOTES: double (nullable = true)
 |-- WINNER: integer (nullable = true)
 |-- AVERAGE_DONATION: double (nullable = true)
 |-- TOTAL_DONATIONS: double (nullable = true)
 |-- NUMBER_BIG_DONATIONS: integer (nullable = false)
 |-- NUMBER_OUT_OF_STATE_DONATIONS: integer (nullable = false)
 |-- NUMBER_OF_DONATIONS: integer (nullable = t

In [79]:
df_house.count()

1406

**NEED TO WORK ON THE RELATIVE CALCULATION**

## Model Preparation

In [87]:
SEED = 1
training_fraction = [0.8, 0.2]
ITERS = 10
target = 'WINNER'
vars_to_keep = ['TOTAL_DONATIONS','AVERAGE_DONATION','NUMBER_OF_DONATIONS']

In [84]:
df_model = df_house.select([target]+vars_to_keep)

In [85]:
df_model.groupBy(target).count().show()

+------+-----+
|WINNER|count|
+------+-----+
|     1|  785|
|     0|  621|
+------+-----+



In [101]:
df_model.show(10)

+------+---------------+------------------+-------------------+
|WINNER|TOTAL_DONATIONS|  AVERAGE_DONATION|NUMBER_OF_DONATIONS|
+------+---------------+------------------+-------------------+
|     0|          600.0|             300.0|                  2|
|     0|         6300.0|             630.0|                 10|
|     1|       606972.0| 688.1768707482993|                882|
|     1|       204101.0|1607.0944881889764|                127|
|     1|       142575.0|1071.9924812030076|                133|
|     0|        26719.0|460.67241379310343|                 58|
|     0|        20450.0| 538.1578947368421|                 38|
|     1|       104530.0| 901.1206896551724|                116|
|     0|         4200.0|             600.0|                  7|
|     1|       154000.0|  649.789029535865|                237|
+------+---------------+------------------+-------------------+
only showing top 10 rows



By Individual Feature, building model and ranking AUC

In [97]:
## FROM ASSIGNMENT 7 ##

def compute_univariate_aucs(df, target, training_fraction, iters, seed):

    # split the data into train/test using seed
    data_train, data_test = df.randomSplit(training_fraction,seed=seed)
    
    # list of predictor variables
    vars = df.columns[1:]
    
    # results storage
    df_auc = pd.DataFrame(index=vars, columns=['weight','auroc'])    

    for v in vars:    
        print('=== analysis of variable: {}'.format(v))

        # create train and test dataframes with columns: target, v
        datai_tr = data_train.select(target,v)
        datai_te = data_test.select(target,v)

        # cast to LabeledPoint
        # train
        datai_tr_lp = datai_tr \
                     .rdd \
                     .map(lambda row: reg.LabeledPoint(row[0], row[1:]))
        
        # test
        datai_te_lp = datai_te \
                     .rdd \
                     .map(lambda row: reg.LabeledPoint(row[0], row[1:]))

        # train logistic regression, setting iterations, including intercept
        LR_Model = LogisticRegressionWithLBFGS.train(datai_tr_lp, iterations=iters, intercept=False)

        # from test set, zip labels with predicted labels and cast to float
        #p.label    # gives label
        #p.features # gives features
        
        act_pred_test_set = datai_te_lp.map(lambda p: (p.label, LR_Model.predict(p.features))) \
                                            .map(lambda row: (row[0], row[1] * 1.0))
        
        metrics = BinaryClassificationMetrics(act_pred_test_set)
        # metrics.areaUnderROC
        
        df_auc['weight'].loc[v] = LR_Model.weights  # store the weights
        df_auc['auroc'].loc[v] = metrics.areaUnderROC # extract AUROC
        print('=== completed analysis of variable: {}'.format(v))
        
    df_auc.sort_values(by='auroc', ascending=False, inplace=True)
    
    return df_auc

In [98]:
df_output = compute_univariate_aucs(df_model, target, training_fraction, ITERS, SEED)

=== analysis of variable: TOTAL_DONATIONS
=== completed analysis of variable: TOTAL_DONATIONS
=== analysis of variable: AVERAGE_DONATION
=== completed analysis of variable: AVERAGE_DONATION
=== analysis of variable: NUMBER_OF_DONATIONS
=== completed analysis of variable: NUMBER_OF_DONATIONS


In [99]:
df_output

Unnamed: 0,weight,auroc
TOTAL_DONATIONS,[5.095562920049984e-07],1
AVERAGE_DONATION,[5.146906493825712e-05],1
NUMBER_OF_DONATIONS,[8.923299131147139e-05],1
