# Setup

Let's import and initialize Spark, along with all the other libraries we're using (this might take a while).

In [1]:
# Uncomment these if do this homework locally
# Do not run this if you are on hive
# from local_install import setup_environment
# setup_environment()

import folium
import pyspark
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
import os
%pylab inline
%pylab notebook

Populating the interactive namespace from numpy and matplotlib
Populating the interactive namespace from numpy and matplotlib


In [2]:
sc = pyspark.SparkContext()
sql = pyspark.sql.SQLContext(sc)

# 1. DataFrames

## 1a. File Format Wrangling

Implement the loading of data files into DataFrame objects.

### Begin Student Code Here

In [3]:
root_path = "/home/ff/cs186/sp16/fec_2016_3_25"

# file_struct objects
com_file = {'header': "cm_header_file.csv", 'data': "cm.txt" }
can_file = {'header': "cn_header_file.csv", 'data': "cn.txt" }
com_can_link_file = {'header': "ccl_header_file.csv", 'data': "ccl.txt" }
indv_file = {'header': "indiv_header_file.csv", 'data': "itcont.txt" }
pas_file = {'header': "pas2_header_file.csv", 'data': "itpas2.txt" }
com_links_file = {'header': "oth_header_file.csv", 'data': "itoth.txt" }

def load_header(filename):
    """
    Given a header .csv file, return a list containing the names of all columns in the table.
    
    Input:
    filename: a string specifying the header .csv file to load
    
    Output:
    A list containing column names of the table
    """
    with open(filename, "r") as f:
        return [r.replace('\r\n','') for r in f.readline().split(",")]
    
def load_dataframe(file_struct):
    """
    Given a dictionary representing the locations of FEC raw files corresponding to a table,
    load the tables into a Spark DataFrame.
    
    Input:
    file_struct: a dictionary containing the keys 'header' and 'data', where
     'header' contains the name of the `.csv` file specifying the table header file, and
     'data' contains the name of the `.txt` file specifying the table data file
     
    Output:
    A DataFrame which contains the loaded data.
    """    
    header_name = file_struct['header']
    data_name = file_struct['data']
        
    schemaString = load_header(root_path + '/' + header_name)

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
    
    schema = StructType(fields)
    

    lines = sc.textFile(root_path + '/' + data_name)
    parts = lines.map(lambda l: l.split("|"))
    input_data = parts.map(lambda p: p)
    
    df = sql.createDataFrame(input_data, schema)

    
    return df.cache()

### End Student Code Here

We'll load these files into Spark now, and register them as temporary SQL tables.

In [4]:
com = load_dataframe(com_file)
can = load_dataframe(can_file)
links = load_dataframe(com_can_link_file)
indv = load_dataframe(indv_file)
pas = load_dataframe(pas_file)
com_links = load_dataframe(com_links_file)

In [5]:
com.registerTempTable("com")
can.registerTempTable("can")
links.registerTempTable("links")
indv.registerTempTable("indv")
pas.registerTempTable("pas")
com_links.registerTempTable("com_links")

In [6]:
com.show()

+---------+--------------------+--------------------+--------------------+---------------+-------------+-------+---------+---------+-------+--------------------+----------------+------+--------------------+-------+
|  CMTE_ID|             CMTE_NM|             TRES_NM|            CMTE_ST1|       CMTE_ST2|    CMTE_CITY|CMTE_ST| CMTE_ZIP|CMTE_DSGN|CMTE_TP|CMTE_PTY_AFFILIATION|CMTE_FILING_FREQ|ORG_TP|    CONNECTED_ORG_NM|CAND_ID|
+---------+--------------------+--------------------+--------------------+---------------+-------------+-------+---------+---------+-------+--------------------+----------------+------+--------------------+-------+
|C00000059|  HALLMARK CARDS PAC|    BROWER, ERIN MS.|          2501 MCGEE|         MD#288|  KANSAS CITY|     MO|    64108|        U|      Q|                 UNK|               M|     C|HALLMARK CARDS, INC.|       |
|C00000422|AMERICAN MEDICAL ...|       WALKER, KEVIN|25 MASSACHUSETTS ...|      SUITE 600|   WASHINGTON|     DC|    20001|        B|      Q|

## 1b. Basic Analytics Queries

As a demonstration, let's query for information corresponding to the strings "Clinton", "Sanders", "Trump", and "Cruz". Try these queries. How does this output look?

In [7]:
front_runners = can.where(can.CAND_NAME.like("%CLINTON%") | can.CAND_NAME.like("%SANDERS%") 
          | can.CAND_NAME.like("%TRUMP%") | can.CAND_NAME.like("%CRUZ%") )\
    .select("CAND_ID", "CAND_NAME", "CAND_STATUS", "CAND_OFFICE")
front_runners.show(truncate=False)

+---------+-----------------------------------+-----------+-----------+
|CAND_ID  |CAND_NAME                          |CAND_STATUS|CAND_OFFICE|
+---------+-----------------------------------+-----------+-----------+
|H2CA24153|THAYNE, DAVID CRUZ                 |P          |H          |
|H2MI08105|HETRICK, BRIAN CLINTON             |N          |H          |
|H2TX33123|SANDERS, KENNETH                   |P          |H          |
|H6NY05051|SANDERS, JAMES JR                  |C          |H          |
|H6NY09137|SANDERS, JAIME                     |N          |H          |
|H8OH02041|SANDERS, CHARLES W                 |P          |H          |
|P00003392|CLINTON, HILLARY RODHAM            |C          |P          |
|P60006111|CRUZ, RAFAEL EDWARD "TED"          |C          |P          |
|P60007168|SANDERS, BERNARD                   |C          |P          |
|P60012184|BALLSACK, DONALD TRUMP'S HAIRY KING|N          |P          |
|P60012333|CLINTON, BILL                      |N          |P    

In [8]:
front_runners = sql.sql("""
SELECT CAND_ID, CAND_NAME, CAND_STATUS, CAND_OFFICE
FROM can
WHERE CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%SANDERS%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%CRUZ%"
""")
front_runners.registerTempTable("fr")
front_runners.show(truncate=False)

+---------+-----------------------------------+-----------+-----------+
|CAND_ID  |CAND_NAME                          |CAND_STATUS|CAND_OFFICE|
+---------+-----------------------------------+-----------+-----------+
|H2CA24153|THAYNE, DAVID CRUZ                 |P          |H          |
|H2MI08105|HETRICK, BRIAN CLINTON             |N          |H          |
|H2TX33123|SANDERS, KENNETH                   |P          |H          |
|H6NY05051|SANDERS, JAMES JR                  |C          |H          |
|H6NY09137|SANDERS, JAIME                     |N          |H          |
|H8OH02041|SANDERS, CHARLES W                 |P          |H          |
|P00003392|CLINTON, HILLARY RODHAM            |C          |P          |
|P60006111|CRUZ, RAFAEL EDWARD "TED"          |C          |P          |
|P60007168|SANDERS, BERNARD                   |C          |P          |
|P60012184|BALLSACK, DONALD TRUMP'S HAIRY KING|N          |P          |
|P60012333|CLINTON, BILL                      |N          |P    

That didn't work out so well. But notice how both queries got the same results! For the rest of the questions, you are allowed to write the queries in either form. Use the DataFrames that you created in part 1a. They are listed again here for reference:

* **com** contains committee information [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryCommitteeMaster.shtml)
* **can** contains candidate information [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryCandidateMaster.shtml)
* **links** contains linkage between committees and candidates [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryCandCmteLinkage.shtml)
* **indv** contains individual contributions to committees [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryContributionsbyIndividuals.shtml)
* **pas** contains contributions between committees [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryContributionstoCandidates.shtml)
* **com_links** links between committees [Details](http://www.fec.gov/finance/disclosure/metadata/DataDictionaryCommitteetoCommittee.shtml)

### Begin Student Code Here

Q1.

In [9]:
front_runners = sql.sql("""
SELECT CAND_ID, CAND_NAME, CAND_PCC
FROM can
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
""")

# AND (CAND_OFFICE_ST LIKE "%US%")
# AND (CAND_ELECTION_YR LIKE "%2016%")

q1 = front_runners.select("CAND_ID", "CAND_NAME", "CAND_PCC")
q1.show(truncate=False)

+---------+-------------------------+---------+
|CAND_ID  |CAND_NAME                |CAND_PCC |
+---------+-------------------------+---------+
|P00003392|CLINTON, HILLARY RODHAM  |C00575795|
|P60006111|CRUZ, RAFAEL EDWARD "TED"|C00574624|
|P60007168|SANDERS, BERNARD         |C00577130|
|P80001571|TRUMP, DONALD J          |C00580100|
+---------+-------------------------+---------+



**Q1 Expected Output:**
```
+---------+-------------------------+---------+
|CAND_ID  |CAND_NAME                |CAND_PCC |
+---------+-------------------------+---------+
|P00003392|CLINTON, HILLARY RODHAM  |C00575795|
|P60006111|CRUZ, RAFAEL EDWARD "TED"|C00574624|
|P60007168|SANDERS, BERNARD         |C00577130|
|P80001571|TRUMP, DONALD J          |C00580100|
+---------+-------------------------+---------+
```

Q2.

In [10]:
num_indv_contributions = sql.sql("""
SELECT CAND_ID, CAND_NAME, count(*) as count
FROM indv JOIN can 
ON indv.CMTE_ID = can.CAND_PCC
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
GROUP BY CAND_ID, CAND_NAME
""")

# WHERE CAND_ID LIKE "P%"
# AND (CAND_NAME LIKE "%CLINTON, HILLARY RODHAM%"
# OR CAND_NAME LIKE '%CRUZ, RAFAEL EDWARD "TED"%'
# OR CAND_NAME LIKE "%TRUMP, DONALD J%"
# OR CAND_NAME LIKE "%SANDERS, BERNARD%")

q2 = num_indv_contributions.select("CAND_ID", "CAND_NAME", 'count')
q2.show(truncate=False)

+---------+-------------------------+-----+
|CAND_ID  |CAND_NAME                |count|
+---------+-------------------------+-----+
|P80001571|TRUMP, DONALD J          |3006 |
|P60006111|CRUZ, RAFAEL EDWARD "TED"|34699|
|P60007168|SANDERS, BERNARD         |34524|
|P00003392|CLINTON, HILLARY RODHAM  |71249|
+---------+-------------------------+-----+



**Q2 Expected Output:**
```
+---------+-------------------------+-----+
|CAND_ID  |CAND_NAME                |count|
+---------+-------------------------+-----+
|P80001571|TRUMP, DONALD J          |3006 |
|P60006111|CRUZ, RAFAEL EDWARD "TED"|34699|
|P60007168|SANDERS, BERNARD         |34524|
|P00003392|CLINTON, HILLARY RODHAM  |71249|
+---------+-------------------------+-----+
```

Q3.

In [11]:
indv_contributions_amt = sql.sql("""
SELECT CAND_ID, CAND_NAME, sum(TRANSACTION_AMT) as SUM
FROM indv JOIN can 
ON indv.CMTE_ID = can.CAND_PCC
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
GROUP BY CAND_ID, CAND_NAME
""")

# WHERE CAND_ID LIKE "P%"
# AND (CAND_NAME LIKE "%CLINTON, HILLARY RODHAM%"
# OR CAND_NAME LIKE '%CRUZ, RAFAEL EDWARD "TED"%'
# OR CAND_NAME LIKE "%TRUMP, DONALD J%"
# OR CAND_NAME LIKE "%SANDERS, BERNARD%")

# 
# We will also accept a solution in either format.
# Be sure to comment out the one you don't use: 
q3 = indv_contributions_amt.select("CAND_ID", "CAND_NAME", "SUM")
# q3 = indv_contributions_amt.select("CAND_ID", "CAND_NAME", "sum(TRANSACTION_AMT)")
q3.show(truncate=False)

+---------+-------------------------+-----------+
|CAND_ID  |CAND_NAME                |SUM        |
+---------+-------------------------+-----------+
|P80001571|TRUMP, DONALD J          |1994976.0  |
|P60006111|CRUZ, RAFAEL EDWARD "TED"|2.4785175E7|
|P60007168|SANDERS, BERNARD         |4.9001141E7|
|P00003392|CLINTON, HILLARY RODHAM  |9.5996062E7|
+---------+-------------------------+-----------+



**Q3 Expected Output:**
```
+---------+-------------------------+--------------------+
|CAND_ID  |CAND_NAME                |sum(TRANSACTION_AMT)|
+---------+-------------------------+--------------------+
|P80001571|TRUMP, DONALD J          |1994976.0           |
|P60006111|CRUZ, RAFAEL EDWARD "TED"|2.4785175E7         |
|P60007168|SANDERS, BERNARD         |4.9001141E7         |
|P00003392|CLINTON, HILLARY RODHAM  |9.5996062E7         |
+---------+-------------------------+--------------------+
```

Q4.

In [12]:
linked_committees = sql.sql("""
SELECT can.CAND_ID, can.CAND_NAME, com.CMTE_ID, com.CMTE_NM
FROM com JOIN can
ON com.CAND_ID = can.CAND_ID
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
""")

# WHERE can.CAND_ID LIKE "P%"
# AND (CAND_NAME LIKE "%CLINTON, HILLARY RODHAM%"
# OR CAND_NAME LIKE '%CRUZ, RAFAEL EDWARD "TED"%'
# OR CAND_NAME LIKE "%TRUMP, DONALD J%"
# OR CAND_NAME LIKE "%SANDERS, BERNARD%")

q4 = linked_committees.select("CAND_NAME", "CAND_ID", "CMTE_ID", "CMTE_NM")
q4.show(truncate=False)

+-------------------------+---------+---------+-------------------------------------------------------------------------------+
|CAND_NAME                |CAND_ID  |CMTE_ID  |CMTE_NM                                                                        |
+-------------------------+---------+---------+-------------------------------------------------------------------------------+
|CLINTON, HILLARY RODHAM  |P00003392|C00570978|FUTURE OF AMERICAN LIVES MATTER                                                |
|CRUZ, RAFAEL EDWARD "TED"|P60006111|C00574624|CRUZ FOR PRESIDENT                                                             |
|CLINTON, HILLARY RODHAM  |P00003392|C00575795|HILLARY FOR AMERICA                                                            |
|SANDERS, BERNARD         |P60007168|C00577130|BERNIE 2016                                                                    |
|CLINTON, HILLARY RODHAM  |P00003392|C00577395|PEOPLE IN COMMAND/PIC                                    

**Q4 Expected Output:**
```
+-------------------------+---------+---------+-------------------------------------------------------------------------------+
|CAND_NAME                |CAND_ID  |CMTE_ID  |CMTE_NM                                                                        |
+-------------------------+---------+---------+-------------------------------------------------------------------------------+
|CLINTON, HILLARY RODHAM  |P00003392|C00577395|PEOPLE IN COMMAND/PIC                                                          |
|SANDERS, BERNARD         |P60007168|C00590646|NEW YORK CAPITAL REGION FOR BRINGING ECONOMIC REVOLUTION NOW INSPIRING EVERYONE|
|SANDERS, BERNARD         |P60007168|C00612549|LORAIN COUNTY FORWARD                                                          |
|CLINTON, HILLARY RODHAM  |P00003392|C00575795|HILLARY FOR AMERICA                                                            |
|SANDERS, BERNARD         |P60007168|C00587766|SILVER CITY NM FOR BERNIE SANDERS                                              |
|SANDERS, BERNARD         |P60007168|C00588707|ITHACA AND TOMPKINS COUNTY FOR BERNIE SANDERS                                  |
|SANDERS, BERNARD         |P60007168|C00589937|PROGRESSGJ                                                                     |
|SANDERS, BERNARD         |P60007168|C00588434|ROCHESTER FOR PROGRESS                                                         |
|SANDERS, BERNARD         |P60007168|C00577130|BERNIE 2016                                                                    |
|CRUZ, RAFAEL EDWARD "TED"|P60006111|C00574624|CRUZ FOR PRESIDENT                                                             |
|SANDERS, BERNARD         |P60007168|C00590828|UPPER VALLEY FOR BERNIE SANDERS                                                |
|CRUZ, RAFAEL EDWARD "TED"|P60006111|C00612119|CRUZ INFO PRESIDENT                                                            |
|SANDERS, BERNARD         |P60007168|C00583708|PROGRESS WV                                                                    |
|SANDERS, BERNARD         |P60007168|C00582395|UPSTATE NEW YORK FOR BERNIE SANDERS                                            |
|SANDERS, BERNARD         |P60007168|C00590620|LAS CRUCES FOR BERNIE                                                          |
|CLINTON, HILLARY RODHAM  |P00003392|C00570978|FUTURE OF AMERICAN LIVES MATTER                                                |
|TRUMP, DONALD J          |P80001571|C00580100|DONALD J. TRUMP FOR PRESIDENT, INC.                                            |
|SANDERS, BERNARD         |P60007168|C00590240|NORTHEAST MUSICIANS FOR SOCIAL DEMOCRACY                                       |
+-------------------------+---------+---------+-------------------------------------------------------------------------------+
```

Q5.

In [13]:
num_com_contributions = sql.sql("""
SELECT can.CAND_NAME, count(*) as count
FROM pas JOIN can
ON pas.CAND_ID = can.CAND_ID
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
GROUP BY CAND_NAME
""")

# WHERE can.CAND_ID LIKE "P%"
# AND (CAND_NAME LIKE "%CLINTON, HILLARY RODHAM%"
# OR CAND_NAME LIKE '%CRUZ, RAFAEL EDWARD "TED"%'
# OR CAND_NAME LIKE "%TRUMP, DONALD J%"
# OR CAND_NAME LIKE "%SANDERS, BERNARD%")

q5 = num_com_contributions.select("CAND_NAME", "count")

q5.show(truncate=False)

+-------------------------+-----+
|CAND_NAME                |count|
+-------------------------+-----+
|CLINTON, HILLARY RODHAM  |2561 |
|SANDERS, BERNARD         |1577 |
|TRUMP, DONALD J          |333  |
|CRUZ, RAFAEL EDWARD "TED"|643  |
+-------------------------+-----+



**Q5 Expected Output:**

```
+-------------------------+-----+
|CAND_NAME                |count|
+-------------------------+-----+
|CLINTON, HILLARY RODHAM  |2561 |
|SANDERS, BERNARD         |1577 |
|TRUMP, DONALD J          |333  |
|CRUZ, RAFAEL EDWARD "TED"|643  |
+-------------------------+-----+
```

Q6.

In [14]:
com_contributions_amt = sql.sql("""
SELECT can.CAND_NAME, sum(pas.TRANSACTION_AMT) as SUM
FROM pas JOIN can
ON pas.CAND_ID = can.CAND_ID
WHERE (CAND_NAME LIKE "%CLINTON%"
OR CAND_NAME LIKE "%CRUZ%"
OR CAND_NAME LIKE "%TRUMP%"
OR CAND_NAME LIKE "%SANDERS%")
AND (CAND_OFFICE = "P")
AND (CAND_STATUS = "C")
GROUP BY CAND_NAME
""")

# WHERE can.CAND_ID LIKE "P%"
# AND (CAND_NAME LIKE "%CLINTON, HILLARY RODHAM%"
# OR CAND_NAME LIKE '%CRUZ, RAFAEL EDWARD "TED"%'
# OR CAND_NAME LIKE "%TRUMP, DONALD J%"
# OR CAND_NAME LIKE "%SANDERS, BERNARD%")

# We will also accept a solution in either format.
# Be sure to comment out the one you don't use: 
q6 = com_contributions_amt.select("CAND_NAME", "SUM")
# q6 = com_contributions_amt.select("CAND_NAME", "sum(TRANSACTION_AMT)")
q6.show(truncate=False)

+-------------------------+-----------+
|CAND_NAME                |SUM        |
+-------------------------+-----------+
|CLINTON, HILLARY RODHAM  |6064499.0  |
|SANDERS, BERNARD         |1202034.0  |
|TRUMP, DONALD J          |6099777.0  |
|CRUZ, RAFAEL EDWARD "TED"|1.2145601E7|
+-------------------------+-----------+



**Q6 Expected Output:**

```
+-------------------------+--------------------+
|CAND_NAME                |sum(TRANSACTION_AMT)|
+-------------------------+--------------------+
|CLINTON, HILLARY RODHAM  |6064499.0           |
|SANDERS, BERNARD         |1202034.0           |
|TRUMP, DONALD J          |6099777.0           |
|CRUZ, RAFAEL EDWARD "TED"|1.2145601E7         |
+-------------------------+--------------------+
```

### End Student Code Here

# 2. K-means Clustering

## Use Dataframe API to load some Toy Data

Load a parquet file as a DataFrame.

In [15]:
df = sql.read.parquet("toy_data")

Let's look at a few records:

In [16]:
df.show()

+-----------+----------+
|          x|         y|
+-----------+----------+
| -4.5707927| -5.282721|
|  -5.762503| -4.832158|
|   7.907721|  6.793022|
|  7.4408655| -6.601918|
| -4.2428184| -4.162871|
| -5.7853255| -6.807544|
| -1.0236621| -8.026325|
|  7.2840943| 3.8668985|
|   8.975831| 3.7539392|
|  -7.442753|-5.6001735|
|-0.49966902| 5.3444986|
| -6.0518875|-5.4773846|
|  5.3814564| -4.115461|
| -5.9022512| 5.2586646|
| -3.1684203|-3.8134186|
| -6.7108235| 4.2055697|
|   2.811219| 3.8392153|
| -4.1118946| -6.137164|
|  5.8211355|-3.9361012|
|   8.003084|-3.3388462|
+-----------+----------+
only showing top 20 rows



## Plotting the Toy Data

We'll first plot a sample of some toy data. Let's collect a sample of records,

In [17]:
sample = df.sample(False, 0.2)

project out the `x` and the `y` columns,

In [18]:
x_sample = sample.select('x').collect()
y_sample = sample.select('y').collect()

In [19]:
sample.select('x').take(1)[0]['x']

-4.5707926750183105

and plot the data.

In [20]:
plt.figure()
plt.axis([-10, 10, -10, 10])
plot(x_sample, y_sample, 'bo')

<IPython.core.display.Javascript object>

[<matplotlib.lines.Line2D at 0x7f7295258510>]

## Implementation

Implement a distributed version of K-Means clustering using PySpark.

## 2a. K-Means++ Initialization

Initialize the centers using the K-Means++ Algorithm, using Distributed Reservoir Sampling. We've provided you the signatures of a few functions which may be helpful.

### Begin Student Code Here

In [21]:
def initialize_centers_plus(points, k):
    """
    Find k initial cluster centers using distributed weighted reservoir sampling.
    
    Inputs:
    points: a collection of d-dimensional points (x_1, x_2, ..., x_d).
    k: the number of cluster centers wanted
    
    Output:
    A list of k points which become the initial centers in K-means clustering
    """
    centers = []
    
    type_check_first = type(points) is pyspark.sql.dataframe.DataFrame
    if type_check_first == False:
        nothing_nothing = True
    else:
        points = points.rdd
        
    one_sample = points.takeSample(withReplacement=False, num=1, seed=None)[0]
    
    centers.append(one_sample)
    
    
    center_num = 1
    
    while center_num < k:
        
        pick_c = points.mapPartitions(lambda cur_point: helper(cur_point, centers))
        
        eliminate_c = pick_c.mapPartitions(choose_partition_center)
        
        output_c = eliminate_c.reduce(pick_between_centers)[0]
        
        centers.append(output_c)
        
        center_num += 1
    
    return centers


In [22]:
def choose_partition_center(points_partition_iterator):
    """
    Choose a single center from a SINGLE PARTITION, using weighted random sampling.
    
    Inputs:
    points_partition_iterator: an iterator through a partition of D-dimensional points
    
    Output:
    (As an iterator) A 2-tuple, containing the randomly-chosen center and its weighted random-sampling key
    """
    
    center = None
    big_key = float('-inf')
    
    for cur_center, cur_distance in points_partition_iterator:
        
        random_num = np.random.random()
        dis_sqaure = np.power(cur_distance, 2)
        exp_factor = np.divide(1.0, dis_sqaure)
        cur_key = np.power(random_num, exp_factor)

        if cur_key <= big_key:
            nothing = True
        else:
            center = cur_center
            big_key = cur_key    
    
    yield (center, big_key)


def helper(points, centers):
    
    for cur_p in points:
        res_tuple = nearest_center(centers, cur_p)
        
        point_distance = res_tuple[1]
        
        yield cur_p, point_distance

In [23]:
def pick_between_centers(candidate_center_1, candidate_center_2):
    """
    Between two centers from different partitions, pick the one with a larger key.
    
    Inputs:
    candidate_center_1: a 2-tuple, containing the randomly-chosen center and its weighted random sampling key 
                        from the first partition
    candidate_center_2: a 2-tuple, containing the randomly-chosen center and its weighted random sampling key 
                        from the first partition
    
    Output:
    A 2-tuple, containing the "better" center and its weighted random-sampling key
    """
    key_one = candidate_center_1[1]
    key_two = candidate_center_2[1]
    
    if key_one >= key_two:
        better_center = candidate_center_1
    else:
        better_center = candidate_center_2
        
    
    return better_center

In [24]:
def nearest_center(centers, point):
    """
    Given a point and a list of centers (also points), determine the center closest to that point,
    and compute the distance to that point.
    
    Inputs:
    centers: a list of points which represent the current centers
    point: the point to examine
    
    Outputs:
    A 2-tuple, containing the index of the closest center (point) and its distance from point
    """
    (index, shortest_distance) = (-1, float('inf'))
    
    
    cur_index = 0
    while cur_index < len(centers):
        
        cur_center = centers[cur_index]

        cur_distance = distance_calculator(cur_center, point)
        
        if cur_distance > shortest_distance:
            nothing = True
        else:
            shortest_distance = cur_distance
            index = cur_index
        
        cur_index += 1
            
    
    return (index, shortest_distance)

def distance_calculator(cur_center, point):
    
    arr1 = np.asarray(cur_center)
    arr2 = np.asarray(point)
    
    result = np.linalg.norm(arr1 - arr2, ord=2)
    
    return result
    

### End Student Code Here

## Plotting the centers

Let's see how our initialization algorithm performs. How do your centers look? Are they too close to each other?

In [26]:
num_centers = 4
centers = initialize_centers_plus(df, num_centers)
print centers 

plt.figure()
plt.axis([-10, 10, -10, 10])
plot(x_sample, y_sample, 'bo')
centers = np.asarray(centers)
plt.plot(centers[:,0], centers[:,1], 'mo', markersize = 10)

[Row(x=-5.450834274291992, y=-4.54454231262207), Row(x=7.503213405609131, y=6.1800055503845215), Row(x=-3.4497463703155518, y=5.604068279266357), Row(x=5.492626190185547, y=-6.148521900177002)]


<IPython.core.display.Javascript object>

[<matplotlib.lines.Line2D at 0x7f728c2aa490>]

## 2b. Main Loop

Now, implement the main loop in K-means clustering. Again, we've provided you the signatures of some functions which may be useful.

Note: The statistics of a new center "mean" is given by:
$$
\bar{x}_k = \frac{1}{n_k} \sum_{x \in \text{Cluster}[k]}^{n_i} x
$$
So the only statistics we require are $n_k$ the number of elements in cluster $k$ and $\sum_{x \in \text{Cluster}[k]}^{n_i} x$ the sum of the elements in cluster $k$.

### Begin Student Code Here

In [27]:
def k_means(points,
            k = num_centers,
            initial_centers = None,
            max_iterations = 100,
            initializer = initialize_centers_plus,
            epsilon = 0.001):
    """
    Executes the K-means algorithm on a collection of points.
    
    Inputs:
    points: a collection of d-dimensional points (x_1, x_2, ..., x_d).
    k: the number of cluster centers wanted
    initial_centers: if supplied, skips the initialization phase and uses points from this value
    max_iterations: the maximum number of main-loop iterations to run
    initializer: a function which selects initial centers (if none supplied)
    epsilon: the threshhold at which convergence is reached and the algorithm halted
    
    Output:
    A list of k candidate cluster centers
    """
    
    points.cache()
    
    old_centers = None
    new_centers = None
    
    if initial_centers:
        new_centers = initial_centers
    else: 
        new_centers = initializer(points, k)

        
    iteration = 0
    
    while not has_converged(old_centers, new_centers, epsilon) and iteration < max_iterations:

        old_centers = new_centers

        mid_step = points.mapPartitions(lambda cur_point: compute_new_center_statistics(cur_point, old_centers))
        
        counts, sums = mid_step.reduce(add_statistics)

        
        cur_index = 0
        while cur_index < k:
            new_centers[cur_index] = np.divide(sums[cur_index], counts[cur_index])
            cur_index += 1
        
        iteration = iteration + 1
        
    return new_centers

In [28]:
def compute_new_center_statistics(iterator, old_centers):
    """
    Given an iterator over points and a list of old centers, compute the statistics for the new center.
    
    Input:
    iterator: an iterator over points
    old_centers: a list of centers (points) from the previous iteration
    
    Output:
    A 2-tuple (counts, sums) consisting of:
     - counts: an array of length k containing the count of points in each new center
     - sums: a k by d array consisting of sum of the points assigned to each center
     
    Note that from the tuple you could compute the ith new center:
      sums[i] / counts[i]
    """
    
    shape_arr = np.asarray(old_centers).shape
    shape1 = shape_arr[0]
    shape2 = shape_arr[1]
    
        
    counts = np.zeros(shape1)
    sums = np.zeros((shape1, shape2))
    
    for cur_point in iterator:

        near_res_tuple = nearest_center(old_centers, cur_point)
        cur_index, dist = near_res_tuple

        cur_count_val = counts[cur_index]
        count_to_put_in = cur_count_val + 1
        counts[cur_index] = count_to_put_in

        point_arr = np.asarray(cur_point)
        cur_sums_val = sums[cur_index, :]
        sums_to_put_in = cur_sums_val + point_arr
        sums[cur_index, :] = sums_to_put_in
        
        
    yield (counts, sums)

In [29]:
def add_statistics(stats_1, stats_2):
    """
    Given statistics from two partitions, add those statistics.
    """

    res_one = np.add(stats_1[0], stats_2[0])
    res_two = np.add(stats_1[1], stats_2[1])

    
    return (res_one, res_two)

### End Student Code Here

In [30]:
def has_converged(old_centers, new_centers, epsilon):
    """
    Test if the distance between the centers is less than epsilon.
    """
    return (old_centers is not None) and (new_centers is not None) and \
        np.linalg.norm(np.asarray(old_centers) - np.asarray(new_centers), ord=2) < epsilon

## Plotting Clusters

How does your code perform? Are these good clusters?

In [31]:
colors = ['ro', 'bo', 'go', 'yo'] # for now just use 4 colors

def plot_clusters(centers, plt):
    for i in range(num_centers):
        cluster = df.rdd.filter(lambda x: nearest_center(centers, x)[0] == i)
        cluster_sample = np.asarray(cluster.sample(False, 0.2).collect())
        plt.plot(cluster_sample[:,0], cluster_sample[:,1], colors[i])
    

In [33]:
plt.figure()
plt.axis([-10, 10, -10, 10])

centers = k_means(df, num_centers)
plot_clusters(centers, plt)

centers = np.asarray(centers)
plt.plot(centers[:,0], centers[:,1], 'mo', markersize = 10)


<IPython.core.display.Javascript object>

[<matplotlib.lines.Line2D at 0x7f7283c63dd0>]

# 3. Geographical Contribution Clustering

Let's put everything together! First we'll load the required data files.

In [34]:
zip_codes = sql.read.parquet("zip_codes")

Now we'll grab the relevant data we need. We'll join the table of front runners we collected earlier with their contributors' zip codes to get latitudinal and longitudinal data...

In [35]:
contrib_zips = \
    front_runners.join(indv, front_runners.CAND_PCC == indv.CMTE_ID) \
    .select("CAND_NAME", "CAND_ID", "ZIP_CODE")
contrib_zips = contrib_zips.withColumn("SHORT_ZIP", contrib_zips.ZIP_CODE.substr(0,5))

In [36]:
contrib_locs = \
    contrib_zips.join(zip_codes, contrib_zips.SHORT_ZIP == zip_codes.zip) \
    .select("CAND_NAME", "latitude", "longitude").cache()

In [37]:
candidate_names = [x[0] for x in contrib_locs.select("CAND_NAME").distinct().collect()]
candidate_names

[u'CLINTON, HILLARY RODHAM',
 u'SANDERS, BERNARD',
 u'TRUMP, DONALD J',
 u'CRUZ, RAFAEL EDWARD "TED"']

In [38]:
X = {}
for candidate in candidate_names:
    X[candidate] = contrib_locs.where(contrib_locs.CAND_NAME == candidate) \
        .select("latitude", "longitude") \
        .map(lambda r: np.array([x for x in r])) \
        .coalesce(sc.defaultParallelism).cache()

...which will allow us to plot these locations.

In [39]:
maps = {}
for candidate in X:
    maps[candidate] = folium.Map(location=[40, -100],
           tiles='Stamen Toner',
           zoom_start=4)
    locs = X[candidate].sample(False, 0.01).collect()
    for l in locs:
        folium.Marker(l).add_to(maps[candidate])

In [40]:
# maps[u'CRUZ, RAFAEL EDWARD "TED"']

We can run k-means for each candidate. What's a good value for $k$? Run the K-means algorithm you developed earlier with different values of $k$ to determine the best one.

### Begin Student Code Here

In [43]:
def compute_errors(candidate_locations, k_range):
    errors = {}
    for c in candidate_locations:
        errors[c] = []
        
        candidate_dots = candidate_locations[c]
        
        for k in k_range:
            
            k_centers = k_means(candidate_dots, k)
            
            get_center_dis_tuple = candidate_dots.map(lambda p : nearest_center(k_centers, p)[1]).mean()
                        
            errors[c].append(get_center_dis_tuple)
                
                
            
    return errors

In [44]:
plt.figure()
plt.axis([0, 30, 0, 20])
k_range = range(2, 32, 2)
errors = compute_errors(X, k_range)
for c in errors:
    plot(k_range, errors[c])

<IPython.core.display.Javascript object>

Now that you've found a suitable value for k, run K-means to compute the desired clusters.

In [101]:
centers = {}
k = 10 # TODO enter the value you found here
for c in X:
    centers[c] = k_means(X[c], k)

In [102]:
centers

{u'CLINTON, HILLARY RODHAM': [array([  35.69212511, -119.65400241]),
  array([ 40.49164038, -74.6981326 ]),
  array([  38.81473536, -103.32411838]),
  array([ 34.70142785, -82.53288861]),
  array([  60.9326087 , -147.54086957]),
  array([  46.60891849, -122.13947256]),
  array([ 31.16665526, -95.75015304]),
  array([ 41.63390598, -87.67811964]),
  array([ 26.47377773, -79.76608053]),
  array([  21.18320755, -157.55867925])],
 u'CRUZ, RAFAEL EDWARD "TED"': [array([ 31.37976125, -96.58396914]),
  array([  47.28834443, -122.50910982]),
  array([ 41.08707345, -74.1726177 ]),
  array([  37.46944473, -108.37683446]),
  array([ 33.27068705, -80.65225892]),
  array([  21.21048077, -157.56221154]),
  array([ 39.91449102, -87.83910607]),
  array([  13.47,  144.74]),
  array([ 0.,  0.]),
  array([  35.21829606, -118.9630518 ])],
 u'SANDERS, BERNARD': [array([ 34.54725177, -84.08447734]),
  array([  39.86818108, -122.83332519]),
  array([ 41.23158887, -73.89864769]),
  array([  36.57946726, -107.6

### End Student Code Here

At last, we have the clusters we want. Here's a nifty visualization.

In [103]:
colors = dict(zip(centers.keys(), ['blue', 'green','orange', 'red']))
centers_map = folium.Map(location=[40, -100],
                         zoom_start=4)
for candidate in centers: 
    locs = centers[candidate]
    for l in locs:
        folium.Marker(l,
                      popup=str(l),
                      icon=folium.Icon(color=colors[candidate])).add_to(centers_map)
centers_map

Congratulations! You've successfully implemented a scalable machine learning algorithm on dataset of campaign finance contributions!