# Context
## Independent Expenditures
### What is an Independent Expenditure?
According to the <a href='https://www.fec.gov/help-candidates-and-committees/making-disbursements-pac/independent-expenditures-nonconnected-pac/'> Federal Election Commission documentation</a>, an Independent Expenditure is an expenditure for a communication that 'expressly advocates the election or defeat of a clearly identified federal candidate; and is not coordinated with a candidate, candidate’s committee, party committee or their agents.'

Political action committees make independent expenditures to support or oppose candidates. Independent expenditures are not contributions to a candidate and are therefore not subject to contribution limits.

In this analysis we measure the similarity between candidates based on the committees that spend money to support or oppose them.

# Analysis
## Calculating the Similarity Coefficient between candidates based on their shared contributors
### Overview
In this notebook we calculate the <a href='https://en.wikipedia.org/wiki/Jaccard_index'> 'Jaccard Index', or Similarity Coefficient </a>, of candidates based on the identities of the committees that have spent money (independent expenditures) to support or oppose them.

Scores range from 0 to 100. Two candidates with no committee spenders in common have a Jaccard Index of 0; two candidates with exactly the same set of committee spenders have a Jaccard Index of 100.
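
As a small worked example of the score, the sketch below compares two candidates using plain Python sets. The committee IDs are made up for illustration and do not come from the FEC data.

```python
# Hypothetical committee IDs, chosen only for illustration
spenders_a = {"C001", "C002", "C003"}
spenders_b = {"C002", "C003", "C004"}

shared = spenders_a & spenders_b      # committees that spent on both candidates
combined = spenders_a | spenders_b    # committees that spent on either candidate

# Jaccard Index as a percentage: shared spenders over all spenders
jaccard_index = len(shared) / len(combined) * 100
print(jaccard_index)
```

Here two of the four distinct committees are shared, so the candidates are half similar on this measure.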

This is useful in understanding patterns of giving and opposition, and how political action committees might be exercising influence across multiple candidates and races.

### Method
1. Obtain an RDD of the form (CandidateID, [list of contributor IDs])
2. Cross join the RDD generated in step 1 with itself
3. Calculate the Jaccard Index of each pair produced by the cross join
    - get the size of the intersection of the two candidates' sets of contributors
    - get the size of the union of the two candidates' sets of contributors
    - divide the size of the intersection by the size of the union
    - multiply by 100
4. Return an RDD of the form (Jaccard Index, (Candidate1ID, Candidate2ID))
5. Print the return to the console
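
The steps above can be sketched without Spark, using `itertools.product` in place of the cross join. This is only an illustration under stated assumptions: the candidate and committee IDs are invented, `jaccard_percent` is a hypothetical helper, and the score is computed as intersection size over union size, as in the notebook's `calculate_similarity_coefficient`.

```python
from itertools import product


def jaccard_percent(set_one, set_two):
    """Jaccard Index of two sets as a percentage (0 when both are empty)."""
    union = set_one | set_two
    if not union:
        return 0.0
    return len(set_one & set_two) / len(union) * 100


# Step 1: hypothetical (candidate ID, set of spender IDs) records
candidate_spenders = [
    ("CAND_A", {"C001", "C002", "C003"}),
    ("CAND_B", {"C002", "C003", "C004"}),
    ("CAND_C", {"C009"}),
]

# Steps 2-4: cross join the records with themselves, drop self-pairs,
# and score each remaining pair as (Jaccard Index, (id1, id2))
scored_pairs = [
    (jaccard_percent(spenders_1, spenders_2), (cand_1, cand_2))
    for (cand_1, spenders_1), (cand_2, spenders_2)
    in product(candidate_spenders, repeat=2)
    if cand_1 != cand_2
]

# Step 5: print the pairs, highest score first
for score, pair in sorted(scored_pairs, reverse=True):
    print(score, pair)
```

Note that the cross join scores every pair twice, once in each order, so three candidates yield six scored pairs.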

#### Author
Dan Budris <d.c.budris@gmail.com>

In [22]:
# Import dependencies
# Basic modules
import re
import datetime
from operator import add

# Data analysis modules
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [23]:
# Instantiate variables for processing data moving forward
spark = SparkSession.builder.appName('ElectionAnalyzer').getOrCreate()
datapath = '/Users/Dan/data/'

# Set the election year; modify this value to look at a different year
election_year = "2016"

# Set the file paths for the data file, based on the data path and election year
independent_expenditure_file = '{0}independent_expenditure_{1}.csv'.format(datapath, election_year)

In [24]:
# Read the CSV as a spark dataframe
ind_exp = spark.read.csv(independent_expenditure_file, header=True)

# Convert the dataframe to an RDD, and 
# filter out the empty lines
total_expenditures_rdd = (
    ind_exp
    .rdd
    .filter(lambda x: len(x) != 0)
)

In [25]:
# Define functions for prepping and analyzing the data

def prep_data(input_rdd, val_key, val_value):
    ''' Group an RDD by a given key and collect the distinct values for each key.
        Given one row attribute to use as the key and one to use as the value,
        maps each row to (key, [value]), concatenates the lists by key, and
        converts each list to a set, returning (key, {values}) pairs
    '''
    _prepped_data = (
        input_rdd
        # drop rows with a missing key: a null value or the literal string 'None'
        .filter(lambda x: getattr(x, val_key) not in (None, 'None'))
        .map(lambda x: (getattr(x, val_key), [getattr(x, val_value)]))
        .reduceByKey(lambda x, y: x + y)
        .map(lambda x: (x[0], set(x[1])))
    )
    return _prepped_data
    
def calculate_similarity_coefficient(set_one, set_two, min_similar=None):
    ''' Function to calculate the Jaccard Index, given two sets
        Returns the Jaccard Index as a numeric percentage between 0 and 100
        Given a 'min_similar' value, will only return positive values for those 
        sets whose intersection is greater than or equal to the minimum similarity.
    '''
    _set_one = set(set_one)
    _set_two = set(set_two)
    _set_union_len = len(_set_one.union(_set_two))
    _set_intersect_len = len(_set_one.intersection(_set_two))
    if min_similar:
        if _set_intersect_len < min_similar:
            return 0
    _sim_coeff = _set_intersect_len/_set_union_len * 100
    return _sim_coeff
    
def rdd_similarity_coefficient_map(input_rdd):
    ''' Given an RDD of the form [(Identifier, set of characteristics)],
        produce an RDD of the form [(Jaccard Index, (Identifier1, Identifier2))]
        based on the cartesian product of the RDD with itself
    '''
    _coeff_sim = (
        input_rdd
        # get the cartesian product of the input rdd with itself
        .cartesian(input_rdd)
        # filter out the self * self products
        .filter(lambda x: x[0] != x[1])
        # calculate the Jaccard Index for each result of the cross-product
        # returning a nested tuple of (Jaccard Index, (cand1 id, cand2 id))
        .map(lambda x: (calculate_similarity_coefficient(x[0][1], x[1][1], min_similar=5), (x[0][0], x[1][0])))
    )
    return _coeff_sim

In [26]:
# Calculate the Jaccard similarity for each pair of candidates, based on the committees that have spent on them
candidate_pairs_with_jaccard_index = (
    rdd_similarity_coefficient_map(
        prep_data(
            total_expenditures_rdd, 'cand_id', 'spe_id'
        )
    )
    #.map(lambda x: (x[0], frozenset([x[1][0], x[1][1]])))
    #.distinct()
)
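
Because the cartesian product contains both (A, B) and (B, A), every score appears twice in the result. The commented-out lines in the cell above point at one way to collapse the mirrored pairs: store each pair as a `frozenset`, which ignores order, and keep only distinct entries. A plain-Python sketch of that idea, using invented scores and candidate IDs:

```python
# Hypothetical scored pairs, each present in both orders
scored_pairs = [
    (50.0, ("CAND_A", "CAND_B")),
    (50.0, ("CAND_B", "CAND_A")),
    (0.0, ("CAND_A", "CAND_C")),
    (0.0, ("CAND_C", "CAND_A")),
]

# A frozenset treats (A, B) and (B, A) as the same pair, and a set of
# (score, frozenset) tuples keeps only one copy of each
unique_pairs = {(score, frozenset(pair)) for score, pair in scored_pairs}

for score, pair in sorted(unique_pairs, key=lambda item: item[0], reverse=True):
    print(score, sorted(pair))
```

The set comprehension plays the role that `.distinct()` would play on the RDD.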

In [27]:
# Collect the (similarity, candidate pair) RDD, printing to the console
candidate_pairs_with_jaccard_index.sortByKey(False).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 270, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'cand_id' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 370, in func
    return f(iterator)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 1876, in combineLocally
    merger.mergeValues(iterator)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 237, in mergeValues
    for k, v in iterator:
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-25-ad98b0cd20ac>", line 11, in <lambda>
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: cand_id

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'cand_id' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 2457, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 370, in func
    return f(iterator)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/rdd.py", line 1876, in combineLocally
    merger.mergeValues(iterator)
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 237, in mergeValues
    for k, v in iterator:
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-25-ad98b0cd20ac>", line 11, in <lambda>
  File "/Users/Dan/Desktop/stagingArea/FEC_data_notebooks/FEC_data_notebook/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: cand_id

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
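
The traceback ends in `AttributeError: cand_id`, which indicates that the rows read from the CSV have no field named `cand_id`. One way to catch this before the job runs is to compare the column names the pipeline needs against the names actually present in the file. The sketch below uses a hypothetical helper, `find_missing_columns`, and a made-up header; in the notebook, the available names could be taken from `ind_exp.columns`.

```python
def find_missing_columns(available_columns, required_columns):
    """Return the required column names that are absent from the header."""
    available = set(available_columns)
    return [name for name in required_columns if name not in available]


# Made-up header for illustration; the real file's column names are not
# shown in this notebook
example_header = ["candidate_id", "spender_id", "amount"]

missing = find_missing_columns(example_header, ["cand_id", "spe_id"])
if missing:
    print("Columns not found in the data file:", missing)
```

Running a check like this right after reading the CSV reports the mismatch by name instead of surfacing it inside a Spark task.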
