In [3]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.cross_validation import train_test_split
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.regression import LabeledPoint

In [4]:
# a) Loading breast cancer wisconsin dataset and sort axis so that categorical value will be last
data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data')
data = data.reindex_axis(sorted(data.columns), axis=1)
data.head()

Unnamed: 0,0.006193,0.006399,0.01587,0.03003,0.04904,0.05373,0.07871,0.1184,0.1189,0.1471,...,122.8,153.4,17.33,17.99,184.6,2019,25.38,8.589,842302,M
0,0.003532,0.005225,0.0134,0.01389,0.01308,0.0186,0.05667,0.08474,0.08902,0.07017,...,132.9,74.08,23.41,20.57,158.8,1956.0,24.99,3.398,842517,M
1,0.004571,0.00615,0.02058,0.0225,0.04006,0.03832,0.05999,0.1096,0.08758,0.1279,...,130.0,94.03,25.53,19.69,152.5,1709.0,23.57,4.585,84300903,M
2,0.009208,0.00911,0.01867,0.05963,0.07458,0.05661,0.09744,0.1425,0.173,0.1052,...,77.58,27.23,26.5,11.42,98.87,567.7,14.91,3.445,84348301,M
3,0.005115,0.01149,0.01885,0.01756,0.02461,0.05688,0.05883,0.1003,0.07678,0.1043,...,135.1,94.44,16.67,20.29,152.2,1575.0,22.54,5.438,84358402,M
4,0.005082,0.00751,0.01137,0.02165,0.03345,0.03672,0.07613,0.1278,0.1244,0.08089,...,82.57,27.19,23.75,12.45,103.4,741.6,15.47,2.217,843786,M


In [34]:
# b) Using LabelEncoder to incode the binary M column into numerical values on original dataframe
le = LabelEncoder()
data['M'] = le.fit_transform(data['M']) * 1.0

In [35]:
data.columns = range(data.shape[1])
data.rename(columns={31:'M'}, inplace=True)
data.tail()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,22,23,24,25,26,27,28,29,30,M
563,0.004239,0.0103,0.02454,0.01114,0.02891,0.05198,0.05623,0.111,0.07115,0.1389,...,142.0,158.7,26.4,21.56,166.1,2027.0,25.45,7.673,926424,1.0
564,0.002498,0.005769,0.01678,0.01898,0.02423,0.0395,0.05533,0.0978,0.06637,0.09791,...,131.2,99.04,38.25,20.13,155.0,1731.0,23.69,5.203,926682,1.0
565,0.003892,0.005903,0.01557,0.01318,0.03731,0.0473,0.05648,0.08455,0.0782,0.05302,...,108.3,48.55,34.12,16.6,126.7,1124.0,18.98,3.425,926954,1.0
566,0.006185,0.006522,0.01664,0.02324,0.06158,0.07117,0.07016,0.1178,0.124,0.152,...,140.1,86.22,39.42,20.6,184.6,1821.0,25.74,5.772,927241,1.0
567,0.002783,0.007189,0.0,0.02676,0.00466,0.0,0.05884,0.05263,0.07039,0.0,...,47.92,19.15,30.37,7.76,59.16,268.6,9.456,2.548,92751,0.0


In [36]:
# transform pandas data frame to spark data fram
spark_data = sqlContext.createDataFrame(data)

# assemble feature vector
ignore = ['M']
assembler = VectorAssembler(
    inputCols=[x for x in spark_data.columns if x not in ignore],
    outputCol='features'
)
spark= assembler.transform(spark_data)


In [37]:
spark.printSchema()

root
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- 21: double (nullable = true)
 |-- 22: double (nullable = true)
 |-- 23: double (nullable = true)
 |-- 24: double (nullable = true)
 |-- 25: double (nullable = true)
 |-- 26: double (nullable = true)
 |-- 27: double (nullable = true)
 |-- 28: double (nullable = true)
 |-- 29: double (nu

In [145]:
# Create labled result set
#def label_data(data):
#    return data.map(lambda row: LabeledPoint(row[-1], row[:-1]))

In [38]:
# c) splitting the data into 80% training data and 20% test data 
train, test = spark.randomSplit([0.8,0.2], seed=7)

In [39]:
train.collect()

[Row(0=0.003532, 1=0.005225, 2=0.0134, 3=0.013890000000000001, 4=0.013080000000000001, 5=0.0186, 6=0.056670000000000005, 7=0.08474, 8=0.08902, 9=0.07017000000000001, 10=0.1238, 11=0.1812, 12=0.18600000000000003, 13=0.07864, 14=0.0869, 15=0.275, 16=0.1866, 17=0.2416, 18=0.7339, 19=0.5435, 20=17.77, 21=1326.0, 22=132.9, 23=74.08, 24=23.41, 25=20.57, 26=158.8, 27=1956.0, 28=24.99, 29=3.398, 30=842517, M=1.0, features=DenseVector([0.0035, 0.0052, 0.0134, 0.0139, 0.0131, 0.0186, 0.0567, 0.0847, 0.089, 0.0702, 0.1238, 0.1812, 0.186, 0.0786, 0.0869, 0.275, 0.1866, 0.2416, 0.7339, 0.5435, 17.77, 1326.0, 132.9, 74.08, 23.41, 20.57, 158.8, 1956.0, 24.99, 3.398, 842517.0])),
 Row(0=0.004571, 1=0.00615, 2=0.02058, 3=0.0225, 4=0.040060000000000005, 5=0.03832, 6=0.059989999999999995, 7=0.1096, 8=0.08757999999999999, 9=0.1279, 10=0.1444, 11=0.2069, 12=0.243, 13=0.1599, 14=0.1974, 15=0.3613, 16=0.4245, 17=0.4504, 18=0.7869, 19=0.7456, 20=21.25, 21=1203.0, 22=130.0, 23=94.03, 24=25.53, 25=19.69, 26=152

In [47]:
sc = StandardScaler(inputCol="features")
lr = LogisticRegression(labelCol='M', featuresCol="features")
#lr.fit(train)
pipeline = Pipeline(stages=[sc,lr])
%timeit model = pipeline.fit(train)

1 loop, best of 3: 2.54 s per loop


In [46]:
model.stages

[StandardScaler_471cb194220cecfaffa9, LogisticRegression_4665a47f688ccc9f03f3]