In [None]:
#Importing required modules
from pyspark.sql import SparkSession

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.clustering import KMeans

In [None]:
# Create (or reuse, if one is already running) the SparkSession that every
# later cell uses as its entry point to Spark SQL. The application is
# registered under the name 'Political'.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('Political')
    .getOrCreate()
)

# 1. Study the text file to understand what is contained in the given data set.

This data set includes the votes of each U.S. House of Representatives Congressman on the 16 key votes identified by the CQA (Congressional Quarterly Almanac). The CQA lists nine different types of votes: voted for, paired for, and announced for (these three simplified to yea), voted against, paired against, and announced against (these three simplified to nay), voted present, voted present to avoid conflict of interest, and did not vote or otherwise make a position known (these three simplified to an unknown disposition). In the file, each line holds the party affiliation (the class name) followed by the 16 votes, encoded as `y` (yea), `n` (nay), or `?` (unknown disposition).

In [None]:
# Load the raw vote file through Spark's text data source: one row per line,
# held in a single string column named `value` that the next cell splits.
df1 = spark.read.format("text").load("house-votes-84-actual.txt")

In [None]:
from pyspark.sql.functions import split,expr

In [None]:
# Break every comma-separated line into 17 string columns:
#   position 0      -> class_name (the party label)
#   positions 1..16 -> att_1 .. att_16 (the 16 votes)
# The SQL split/index expression is generated per position instead of being
# written out seventeen times by hand.
field_names = ["class_name"] + ["att_" + str(pos) for pos in range(1, 17)]
df2 = df1.select([
    expr("(split(value,','))[" + str(pos) + "]").cast("string").alias(name)
    for pos, name in enumerate(field_names)
])

# 2. Remove observations that contain missing values (in this case, missing values denote the vote is neither Yes nor No), what percentage of the observations contain missing values?

In [None]:
from pyspark.sql.functions import *

In [None]:
# An observation has a missing value when any of its 16 vote columns is '?'
# (vote neither yes nor no). Build that OR-condition column by column, keep
# the matching rows in df3 and store how many there are in Missing_count.
has_missing = col('att_1') == '?'
for idx in range(2, 17):
    has_missing = has_missing | (col('att_' + str(idx)) == '?')
df3 = df2.where(has_missing)
Missing_count = df3.count()

In [None]:
# Total number of observations read from the file. df2 is unfiltered, so this
# count includes the rows that contain '?' values.
Tot_count = df2.select('class_name').count()

In [None]:
# Percentage of observations that contain at least one missing ('?') vote:
# rows with missing values divided by all rows. Dividing the *complete* rows
# (Tot_count - Missing_count) would give the complementary percentage instead.
# 100.0 forces float division; an empty input yields 0.0 instead of raising
# ZeroDivisionError.
Res_Count = 100.0 * Missing_count / Tot_count if Tot_count else 0.0
print("Percentage of observations containing missing values are: " + str(Res_Count))

# 3. Using the data set without missing values (DO NOT use the Class Name attribute, i.e., the political party affiliation of the voter), perform clustering using k-means. 

In [None]:
# Keep only complete observations: each vote column att_1..att_16 must not be
# '?'. The AND-condition is accumulated column by column; class_name is not
# part of the check.
keep_row = ~(df2['att_1'] == '?')
for idx in range(2, 17):
    keep_row = keep_row & ~(df2['att_' + str(idx)] == '?')
df4 = df2.filter(keep_row)

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# Every column of df4 is a string (the party and the 'y'/'n' votes), so each
# one is mapped to a numeric index with a StringIndexer whose output column is
# the original name + '_label'.
# The indexers are handed to the Pipeline *unfitted*: Pipeline.fit fits each
# stage itself, so pre-fitting them here is unnecessary. Iterating df4.columns
# directly (instead of list(set(...))) keeps the stage order deterministic.
# The indexed result is stored in df_5.
# NOTE: StringIndexer orders labels by descending frequency by default, so
# the same vote value may receive index 0 in one column and 1 in another.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_label")
    for column in df4.columns
]

pipeline = Pipeline(stages=indexers)
df_5 = pipeline.fit(df4).transform(df4)

In [None]:
# Keep just the numeric '_label' columns created by the indexers: the party
# label first, followed by the 16 vote labels in order att_1 .. att_16.
label_columns = ['class_name_label'] + ['att_%d_label' % n for n in range(1, 17)]
df_6 = df_5.select(label_columns)

In [None]:
# Pack the 16 vote-label columns into a single vector column named 'features'.
# class_name_label is deliberately left out: the party label must not be used
# as an input to the clustering.
vote_label_cols = ['att_{}_label'.format(n) for n in range(1, 17)]
assembler = VectorAssembler(inputCols=vote_label_cols, outputCol='features')

In [None]:
# Run the assembler over df_6, which adds the 'features' vector column; the
# result is kept as assembled_data.
assembled_data = assembler.transform(dataset=df_6)

In [None]:
# Estimator that rescales the 'features' vector (Spark defaults: divide by the
# per-feature standard deviation, no mean centering). It writes its output to
# 'scaledFeatures' once fitted.
scaler = StandardScaler(outputCol='scaledFeatures', inputCol='features')

In [None]:
# Learn the scaling statistics from assembled_data, then apply them to get
# scaled_data (assembled_data plus a 'scaledFeatures' column). Print the
# resulting schema for inspection.
scaler_model = scaler.fit(dataset=assembled_data)
scaled_data = scaler_model.transform(dataset=assembled_data)
scaled_data.printSchema()

In [None]:
# Preview the leading rows of the scaled feature vectors.
scaled_data.select(['scaledFeatures']).show()

# 3)
a. Start with different K values
b. For each K and associated results, calculate the cost
d. Return clustering result—each observation should have an index indicating their cluster belonging. 

In [None]:
# Fit k-means for every K from 2 to 16 and record each model's within-set sum
# of squared errors (WSSSE) in `err` for the elbow plot below.
# - The cluster sizes are printed for each K, ordered by cluster index.
# - Every fitted model is kept in `models`, so the cluster index of each
#   observation for a chosen K is available via
#   models[K].transform(scaled_data) (column 'prediction').
# - Clustering uses the standardized 'scaledFeatures' column built in the
#   cells above, so the StandardScaler step actually takes effect.
err = []
models = {}
for i in range(2, 17):
    kmeans = KMeans(featuresCol='scaledFeatures', k=i, seed=1)
    model = kmeans.fit(scaled_data)
    # KMeansModel.computeCost was deprecated in Spark 2.4 and removed in 3.0.
    # The training summary exposes the same cost as trainingCost (Spark 2.4+).
    # Older versions fall back to computeCost.
    try:
        wssse = model.summary.trainingCost
    except AttributeError:
        wssse = model.computeCost(scaled_data)
    result = (model.transform(scaled_data)
              .groupBy('prediction').count()
              .orderBy('prediction'))
    err.append(wssse)
    models[i] = model
    print("With "+ str(i)+" Clusters : Within Set Sum of Squared Errors = " + str(wssse))
    print("Prediction result count for " + str(i)+" Clusters :\n")
    result.show()
    

# 4. Study your clustering result, is it in any way related to the voter’s political party affiliation? Can we use your model to predict is a voter is democratic or republican? 

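After studying the clustering results, we can see that the observations are split into different clusters for different values of K, so the clusters are not directly related to the voters' political party affiliation.
Because k-means is unsupervised and the party label (`class_name`) was deliberately excluded from the features, the model cannot by itself predict whether a voter is a Democrat or a Republican. One way to check how well the clusters line up with the parties is to cross-tabulate the `prediction` column against `class_name_label`.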

In [None]:
# The K values tried by the k-means loop (2 through 16 inclusive). They form
# the x-axis of the elbow plot.
cluster_list = [k for k in range(2, 17)]

# 5. (Bonus question 10 points) Find a way to visualize your cluster results in a meaningful way (very likely use some plotting tool, could from any other Python packages). Make sure you create meaningful legends and labels for major components in your plot. 

In [None]:
import matplotlib.pyplot as py

In [None]:
# Elbow plot: within-set sum of squared errors (one point per K from the
# k-means loop) against the number of clusters K.
# - Integer x-ticks mark each K that was tried.
# - A legend identifies the plotted series.
# - show() is called here so the figure renders in this cell (with the inline
#   backend a figure is closed when its cell finishes).
fig, ax = py.subplots(figsize=(12, 6))
ax.plot(cluster_list, err, marker='o', label='Within Set Sum of Squared Errors per K')
ax.set_xticks(cluster_list)
ax.set_xlabel('Range of Clusters (K)')
ax.set_ylabel('Sum Of Squared Errors')
ax.set_title('Elbow Method showing the optimal K')
ax.grid(True, alpha=0.3)
ax.legend()
py.show()

In [None]:
# Display any figures that are still open.
# NOTE(review): the elbow figure is created in the previous cell. With
# Jupyter's inline backend, figures are usually rendered and closed when their
# own cell finishes, so this call may display nothing. Confirm which backend
# is in use.
py.show()

# 3.
C) Choose a K that you consider as “a good choice” (you need to justify why it is a good choice)

Using the elbow method, we can say that K = 5 is a good choice. Beyond K = 5, the within-set sum of squared errors decreases only slightly as more clusters are added, so additional clusters add complexity without a meaningful improvement in fit.
