In [183]:
import pyspark
import pandas as pd
from pyspark.conf import SparkConf
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

### Entry point to SparkSQL and Hive

In [20]:
spark = SparkSession.builder \
     .master("local") \
     .appName("Primary Results") \
     .getOrCreate()

In [28]:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

### Loading the data into pandas DF and then convert it to Spark DF

In [9]:
data_df = pd.read_csv('primary_results.csv')

In [10]:
data_df.head()

Unnamed: 0,state,state_abbreviation,county,fips,party,candidate,votes,fraction_votes
0,Alabama,AL,Autauga,1001.0,Democrat,Bernie Sanders,544,0.182
1,Alabama,AL,Autauga,1001.0,Democrat,Hillary Clinton,2387,0.8
2,Alabama,AL,Baldwin,1003.0,Democrat,Bernie Sanders,2694,0.329
3,Alabama,AL,Baldwin,1003.0,Democrat,Hillary Clinton,5290,0.647
4,Alabama,AL,Barbour,1005.0,Democrat,Bernie Sanders,222,0.078


In [29]:
data_spark = spark.createDataFrame(data_df)

### Some explorations

In [32]:
data_spark.take(1)

[Row(state='Alabama', state_abbreviation='AL', county='Autauga', fips=1001.0, party='Democrat', candidate='Bernie Sanders', votes=544, fraction_votes=0.182)]

In [33]:
data_spark.count()

24611

In [41]:
data_grouped = data_spark.groupby(['state', 'party'])

### Group by state

In [49]:
data_spark_rdd = data_spark.rdd

In [74]:
data_key_value_state = (data_spark_rdd.map(lambda x: (x.state, 
                               (x.candidate,
                                x.party,
                                x.votes,
                                x.fraction_votes,
                                x.fips)))
)

In [75]:
data_grouped = data_key_value_state.groupByKey()

In [158]:
party_mapping = {'Democrat':0,
                'Republican':1}

In [190]:
def model(x):
    column_names = ['candidate', 'party', 'votes', 'fraction_votes', 'fips']
    df = pd.DataFrame(x, columns=column_names)
    dummpy_vars = pd.get_dummies(df.candidate)
    df2 = pd.concat([dummpy_vars, df], axis=1)
    y = df2.party.apply(lambda x: party_mapping[x]).values 
    X = df2.drop(['party', 'candidate'], axis=1).values
    clf = LogisticRegression(random_state=1)
    clf.fit(X, y)
    y_pred = clf.predict(X)
    acc = accuracy_score(y, y_pred)
    return acc

In [197]:
data_model = (data_grouped.map(lambda x: (x[0], model(list(x[1]))
                            )
                 )
 )

In [199]:
data_model.take(1)[0]

('Arizona', 1.0)

In [195]:
data_grouped.take(1)

[('Arizona', <pyspark.resultiterable.ResultIterable at 0x111884668>)]