## Part 1: Using scikit-learn locally

In [34]:
import numpy as np
from sklearn import datasets

In [45]:
import findspark
findspark.init()
import pyspark

In [36]:
iris = datasets.load_iris()

# Generate test and train sets
size = len(iris.target)
indices = np.random.permutation(size)

cutoff = int(size * .30)

testX = iris.data[indices[0:cutoff],:]
trainX = iris.data[indices[cutoff:],:]
testY = iris.target[indices[0:cutoff]]
trainY = iris.target[indices[cutoff:]]

In [37]:
type(iris.data)

numpy.ndarray

In [38]:
iris.data[100]

array([ 6.3,  3.3,  6. ,  2.5])

In [39]:
iris.target[100]

2

In [40]:
from sklearn.neighbors import KNeighborsClassifier

# Create a KNeighborsClassifier using the default settings
knn = KNeighborsClassifier()
knn.fit(trainX, trainY)

predictions = knn.predict(testX)

# Print out the accuracy of the classifier on the test set
print(sum(predictions == testY) / float(len(testY)))

0.955555555556


## Part 2:  Using train_test_split

In [42]:
# One can import train_test_split from model_selection method of sklearn
from sklearn.model_selection import train_test_split

In [43]:
from sklearn.cross_validation import train_test_split

def runNearestNeighbors(k):
    # Load dataset from sklearn.datasets
    irisData = datasets.load_iris()
    
    # Split into train and test using sklearn.cross_validation.train_test_split
    yTrain, yTest, XTrain, XTest = train_test_split(irisData.target, 
                                                    irisData.data)
    
    # Build the model
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(XTrain, yTrain)
    
    # Calculate predictions and accuracy
    predictions = knn.predict(XTest)
    accuracy = (predictions == yTest).sum() / float(len(yTest))
    
    return (k, accuracy)   

In [49]:
# Using pyspark library
import findspark
findspark.init()
import pyspark

In [51]:
k = sc.parallelize(range(1, 11))
results = k.map(runNearestNeighbors)
print ('\n'.join(map(str, results.collect())))
print('------------------')

(1, 0.97368421052631582)
(2, 0.97368421052631582)
(3, 0.97368421052631582)
(4, 0.94736842105263153)
(5, 0.92105263157894735)
(6, 0.97368421052631582)
(7, 0.97368421052631582)
(8, 0.97368421052631582)
(9, 0.97368421052631582)
(10, 0.97368421052631582)


In [53]:
# Checking Spark version 
sc.version

'2.2.0'

Let's transfer the data using a Broadcast instead of loading it at each excecutor. 

In [64]:
# Creating the Broadcast variable
irisBroadcast = sc.broadcast(iris)

def runNearestNeighborsBroadcast(k):
    # Using the data in the irisBroadcast variable split into train and test using 
    # sklearn.cross_validation.train_test_split
    yTrain, yTest, XTrain, XTest = train_test_split(irisBroadcast.value.target,
                                                   irisBroadcast.value.data, random_state=0)
    
    # Building the model
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(XTrain, yTrain)
    
    # Calculating predictions and accuracy
    predictions = knn.predict(XTest)
    accuracy = (predictions == yTest).sum() / float(len(yTest))
    
    return (k, accuracy)

# Rerun grid search
k = sc.parallelize(range(1, 11))
results = k.map(runNearestNeighborsBroadcast)
print('\n'.join(map(str, results.collect())))

(1, 0.97368421052631582)
(2, 0.97368421052631582)
(3, 0.97368421052631582)
(4, 0.97368421052631582)
(5, 0.97368421052631582)
(6, 0.97368421052631582)
(7, 0.97368421052631582)
(8, 0.97368421052631582)
(9, 0.97368421052631582)
(10, 0.97368421052631582)


## Part 3: Cross Validation

In [54]:
from sklearn.cross_validation import KFold

In [58]:
# Creating indices for 10-fold cross validation
kf = KFold(size, n_folds=10)
print(len(kf))
print(kf.__iter__())



10
<generator object _PartitionIterator.__iter__ at 0x7f8af0ed69e8>


In [55]:
KFold?