# Train test split

In [17]:
data = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

In [18]:
training, test = data.randomSplit([0.6, 0.4])

In [19]:
training.collect()

[1, 2, 3, 7, 8, 9, 10]

In [20]:
test.collect()

[4, 5, 6]

# Decision Tree

In [23]:
from pyspark.mllib.regression import LabeledPoint

outlook = {"sunny": 0.0, "overcast": 1.0, "rainy": 2.0}

labeledpoints = [
    LabeledPoint(0.0, [outlook["sunny"],85,85,False]),
    LabeledPoint(0.0, [outlook["sunny"],80,90,True]),
    LabeledPoint(1.0, [outlook["overcast"],83,86,False]),
    LabeledPoint(1.0, [outlook["rainy"],70,96,False]),
    LabeledPoint(1.0, [outlook["rainy"],68,80,False]),
    LabeledPoint(0.0, [outlook["rainy"],65,70,True]),
    LabeledPoint(1.0, [outlook["overcast"],64,65,True]),
    LabeledPoint(0.0, [outlook["sunny"],72,95,False]),
    LabeledPoint(1.0, [outlook["sunny"],69,70,False]),
    LabeledPoint(1.0, [outlook["sunny"],75,80,False]),
    LabeledPoint(1.0, [outlook["sunny"],75,70,True]),
    LabeledPoint(1.0, [outlook["overcast"],72,90,True]),
    LabeledPoint(1.0, [outlook["overcast"],81,75,False]),
    LabeledPoint(0.0, [outlook["rainy"],71,91,True])]

data = sc.parallelize(labeledpoints)

In [24]:
data.collect()

[LabeledPoint(0.0, [0.0,85.0,85.0,0.0]),
 LabeledPoint(0.0, [0.0,80.0,90.0,1.0]),
 LabeledPoint(1.0, [1.0,83.0,86.0,0.0]),
 LabeledPoint(1.0, [2.0,70.0,96.0,0.0]),
 LabeledPoint(1.0, [2.0,68.0,80.0,0.0]),
 LabeledPoint(0.0, [2.0,65.0,70.0,1.0]),
 LabeledPoint(1.0, [1.0,64.0,65.0,1.0]),
 LabeledPoint(0.0, [0.0,72.0,95.0,0.0]),
 LabeledPoint(1.0, [0.0,69.0,70.0,0.0]),
 LabeledPoint(1.0, [0.0,75.0,80.0,0.0]),
 LabeledPoint(1.0, [0.0,75.0,70.0,1.0]),
 LabeledPoint(1.0, [1.0,72.0,90.0,1.0]),
 LabeledPoint(1.0, [1.0,81.0,75.0,0.0]),
 LabeledPoint(0.0, [2.0,71.0,91.0,1.0])]

In [25]:
from pyspark.mllib.tree import DecisionTree

The categoricalFeaturesInfo argument is a dict or map that specifies which features are categorical and how many categorical values each of those features can take.

In [27]:
model = DecisionTree.trainClassifier(data=data,
        numClasses=2,
        categoricalFeaturesInfo={0: 3})

In [28]:
print(model.toDebugString())

DecisionTreeModel classifier of depth 3 with 9 nodes
  If (feature 0 in {0.0,2.0})
   If (feature 2 <= 82.5)
    If (feature 1 <= 66.5)
     Predict: 0.0
    Else (feature 1 > 66.5)
     Predict: 1.0
   Else (feature 2 > 82.5)
    If (feature 1 <= 70.5)
     Predict: 1.0
    Else (feature 1 > 70.5)
     Predict: 0.0
  Else (feature 0 not in {0.0,2.0})
   Predict: 1.0



In [29]:
model.predict([1.0,85,85,True])

1.0

# Naive bayes

In [30]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel

model = NaiveBayes.train(data=data, lambda_ = 1.0)
model.predict([1.0, 85,85,True])

1.0

# Recommendation

In [31]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [39]:
data = sc.textFile('movielens.dat')
ratings = data.map(lambda x: x.split('\t'))\
    .map( lambda x: Rating(int(x[0]), int(x[1]), float(x[2]) ))

In [42]:
ratings.take(1)

[Rating(user=196, product=242, rating=3.0)]

In [41]:
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

In [73]:
testdata = ratings.map(lambda p: (p[0], p[1]))

In [74]:
testdata.take(2)

[(196, 242), (186, 302)]

In [75]:
predictions = model.predictAll(testdata).map(lambda r: ( (r[0], r[1]), r[2]) ) 

In [76]:
predictions.take(2)

[((316, 1084), 3.916170460277161), ((504, 1084), 3.768507462239329)]

In [77]:
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

In [78]:
ratesAndPreds.take(2) #label, prediction

[((186, 302), (3.0, 2.609181678937813)),
 ((115, 265), (2.0, 3.035589378218772))]

In [79]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()

In [80]:
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.4822655821612374


# Save and load model

In [None]:
#“model.save(sc, "file:///opt/spark/data/ratings_model")

In [None]:
# from pyspark.mllib.recommendation import MatrixFactorizationModel

# reloaded_model = MatrixFactorizationModel.load(sc, "file:///opt/spark/data/ratings_model")

# Kmeans

In [82]:
from pyspark.mllib.clustering import KMeans, KMeansModel #Numpy
from numpy import array
from math import sqrt

In [86]:
# Load and parse the data
data = sc.textFile("/usr/local/Cellar/apache-spark/3.0.1/libexec/data/mllib/kmeans_data.txt")

In [90]:
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode='random')

In [92]:
# Evaluate clustering by computing Within Set Sum of Squared Errors

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

In [93]:
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x,y: x+y)
print("Within set sum of squared error = "+str(WSSSE))

Within set sum of squared error = 0.6928203230275529


# Save and load kmeans

In [94]:
# clusters.save(sc, "hdfs:///.../kmeans_model")
# reloaded_model = KMeansModel.load(sc, "hdfs:///.../kmeans_model")