In [1]:
# Konfigurasi Spark
import os
import sys
from pprint import pprint
from matplotlib import pyplot

# 1. Lokasi di mana Spark diinstal
spark_path = "/Users/sani/Kuliah/Sem8/BigData/spark-2.1.0-bin-hadoop2.7"

# 2. Menentukan environment variable
os.environ['SPARK_HOME'] = spark_path


# 3. Download winutils dari https://github.com/steveloughran/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true
#   dan letakkan di dalam folder D:\spark\bin\
#   Lokasi winutils.exe
os.environ['HADOOP_HOME'] = spark_path

# 4. Lokasi Python yang dijalankan --> punya Anaconda
#    Apabila Python yang diinstall hanya Anaconda, maka tidak perlu menjalankan baris ini.
os.environ['PYSPARK_PYTHON'] = sys.executable

# 5. Konfigurasi path library PySpark
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

#############################################################

# from __future__ import print_function

# $example on$
from numpy import array
from math import sqrt
# $example off$

from pyspark import SparkContext
from pyspark import SparkConf
# $example on$
from pyspark.mllib.clustering import KMeans, KMeansModel
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")  # SparkContext

# The usual preamble
%matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Make the graphs a bit prettier, and bigger
pd.set_option('display.mpl_style', 'default')
plt.rcParams['figure.figsize'] = (15, 5)
plt.rcParams['font.family'] = 'sans-serif'

# This is necessary to show lots of columns in pandas 0.12. 
# Not necessary in pandas 0.13.
pd.set_option('display.width', 5000) 
pd.set_option('display.max_columns', 60)

mpl_style had been deprecated and will be removed in a future version.
Use `matplotlib.pyplot.style.use` instead.

  exec(code_obj, self.user_global_ns, self.user_ns)


In [2]:

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [4]:
df = pd.read_csv("file:///Users/sani/Kuliah/Sem8/BigData/Salaries.csv", index_col = 'Id')

  interactivity=interactivity, compiler=compiler, result=result)


In [5]:
'''Data Cleaning'''

#check how many NaN values there are in each column
df.isnull().sum()

EmployeeName             78
JobTitle                  0
BasePay                 605
OvertimePay               0
OtherPay                  0
Benefits              36159
TotalPay                  0
TotalPayBenefits          0
Year                      0
Notes               1000029
Agency               851375
Status               961910
dtype: int64

In [6]:
#delete unnecessary features

#delete employee name, for sake of privacy 
del df['EmployeeName']

#delete notes, 'Notes' column in empty 
del df['Notes']

#delete 'Agency' column; all jobs are in SF
del df['Agency']

#delete any observation where JobTitle is 'Not provided'
df = df[df.JobTitle != 'Not provided']

#if 'Benefits' are NaN, fill with 0
df['Benefits'].fillna(0, inplace=True)

#Replace NaN status with "Unknown", since we don't know if its FT or PT work
df['Status'].fillna("Unknown", inplace=True)

#drop all rows with NaN values
#all missing values are in "BasePay" column
#We can afford to lose 605 observations of 148,654 obs. 
df.dropna(inplace=True)

In [7]:

#no missing values
print (df.isnull().sum())

#preview data set
print (df.head(), "\n\n")
print (df.tail())

JobTitle            0
BasePay             0
OvertimePay         0
OtherPay            0
Benefits            0
TotalPay            0
TotalPayBenefits    0
Year                0
Status              0
dtype: int64
(                                          JobTitle BasePay OvertimePay OtherPay Benefits   TotalPay  TotalPayBenefits  Year   Status
Id                                                                                                                                  
1   GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY  167411           0   400184        0  567595.43         567595.43  2011  Unknown
2                  CAPTAIN III (POLICE DEPARTMENT)  155966      245132   137811        0  538909.28         538909.28  2011  Unknown
3                  CAPTAIN III (POLICE DEPARTMENT)  212739      106088  16452.6        0  335279.91         335279.91  2011  Unknown
4             WIRE ROPE CABLE MAINTENANCE MECHANIC   77916     56120.7   198307        0  332343.61         332343.61  2011

In [8]:
#K-means

In [9]:

Base = pd.to_numeric(df['BasePay'], errors='coerce').dropna(how='any')
Over = pd.to_numeric(df['OvertimePay'], errors='coerce').dropna(how='any')
Other = pd.to_numeric(df['OtherPay'], errors='coerce').dropna(how='any')

Pay=zip(Base,Over,Other)
Pay[:20]

[(167411.17999999999, 0.0, 400184.25),
 (155966.01999999999, 245131.88, 137811.38),
 (212739.13, 106088.17999999999, 16452.599999999999),
 (77916.0, 56120.709999999999, 198306.89999999999),
 (134401.60000000001, 9737.0, 182234.59),
 (118602.0, 8601.0, 189082.73999999999),
 (92492.009999999995, 89062.899999999994, 134426.14000000001),
 (256576.95999999999, 0.0, 51322.5),
 (176932.64000000001, 86362.679999999993, 40132.230000000003),
 (285262.0, 0.0, 17115.73),
 (194999.39000000001, 71344.880000000005, 33149.900000000001),
 (99722.0, 87082.619999999995, 110804.3),
 (294580.02000000002, 0.0, 0.0),
 (271329.03000000003, 0.0, 21342.59),
 (174872.64000000001, 74050.300000000003, 37424.110000000001),
 (198778.01000000001, 73478.199999999997, 13957.65),
 (268604.57000000001, 0.0, 16115.860000000001),
 (140546.87, 119397.25999999999, 18625.080000000002),
 (168692.63, 69626.119999999995, 38115.470000000001),
 (257510.59, 880.15999999999997, 16159.5)]

In [11]:
pay=sc.parallelize(Pay)

In [13]:
clusters = KMeans.train(pay,4, maxIterations=100, initializationMode="random")
    
    # Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = pay.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 14627401346.8


In [14]:
clusterdata=pay.map(lambda point: clusters.predict(point))
clusterdata.take(15)

[3, 3, 3, 2, 3, 2, 2, 3, 3, 3, 3, 2, 3, 3, 3]

In [15]:
X = pay.collect()

In [16]:
label = clusterdata.collect()

In [None]:
for point in range(len(label)):
    if (label[point]==0):
        color = "r";
    elif (label[point]==1):
        color = "y"
    elif (label[point]==2):
        color = "g"
    else:
        color = "b";
    lines = plt.plot(X[point][0], X[point][1], 'ro')
    plt.setp(lines, color=color, linewidth=2.0)
    center = clusters.clusterCenters
print center
for centroid in range(len(center)):
    lines = plt.plot(center[centroid][0],center[centroid][1],'bx')
    plt.setp(lines, color='w', markersize=20)
plt.show()