In [1]:
import sys
from pyspark import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row
import operator
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Local Spark driver with 2 worker threads; the streaming context slices the
# input into 1-second micro-batches.
sc = SparkContext(master="local[2]", appName="keamns streaming")
ssc = StreamingContext(sparkContext=sc, batchDuration=1)

In [2]:
# Training rows hold feature values only: every comma-separated field becomes
# one component of a dense vector.
trainingData = (
    sc.textFile("kmeans_train.csv")
    .map(lambda row: Vectors.dense(list(map(float, row.split(',')))))
)

In [3]:
# The first field is the label and the remaining fields are the features.
# Each label/feature pair is packaged into a LabeledPoint.
def sepLabelVector(line):
    """Parse one comma-separated line into a LabeledPoint.

    :param line: text row whose fields all parse as floats; field 0 is the
        label, fields 1..n are the feature values.
    :return: LabeledPoint built from the label and a dense feature vector.
    :raises ValueError: if any field is not a valid float.
    """
    numbers = list(map(float, line.split(',')))
    head, tail = numbers[0], numbers[1:]
    return LabeledPoint(head, Vectors.dense(tail))

# Parse each test row into a LabeledPoint (label + feature vector).
testingData = sc.textFile("kmeans_test.csv").map(sepLabelVector)

In [4]:
# Wrap each single-RDD list in a queue-backed DStream so the batch RDDs can be
# fed through the streaming API.
trainingQueue, testingQueue = [trainingData], [testingData]

trainingStream = ssc.queueStream(rdds=trainingQueue)
testingStream = ssc.queueStream(rdds=testingQueue)

In [5]:
# Build a streaming k-means model that looks for k=2 clusters, starting from
# random centers. It trains on the `trainingStream` created in the cell above.
# That stream must not be re-created here: a second queueStream call only
# leaves an unused duplicate DStream behind.
# setRandomCenters: `dim` must equal the length of the training vectors
# (presumably 3 columns in kmeans_train.csv -- TODO confirm), each center gets
# weight 1.0, and the fixed seed makes the initial centers reproducible.
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(dim=3, weight=1.0, seed=410)

# Register the training stream; each batch it emits updates the centers.
model.trainOn(trainingStream)

In [6]:
# For every arriving test point, key the prediction by its known label so each
# printed record pairs the label with the predicted cluster index.
result = model.predictOnValues(
    testingStream.map(lambda point: (point.label, point.features))
)
result.pprint()

In [7]:
# start() returns immediately. Block for a few 1-second batch intervals so the
# queued training/test RDDs are actually processed, then stop gracefully.
# stopSparkContext=True also stops `sc`, so a fresh SparkContext can be
# created in the next section.
ssc.start()
ssc.awaitTerminationOrTimeout(3)  # seconds; returning False (timed out) is expected here
ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2018-01-27 19:07:39
-------------------------------------------
(1.0, 0)
(2.0, 0)
(1.0, 0)
(2.0, 0)

-------------------------------------------
Time: 2018-01-27 19:07:40
-------------------------------------------



In [8]:
# Second run: a new local Spark driver (2 worker threads) and 1-second
# streaming batches, this time predicting on test data without labels.
sc = SparkContext(master="local[2]", appName="keamns streaming no label")
ssc = StreamingContext(sparkContext=sc, batchDuration=1)

In [9]:
# Training rows hold feature values only: every comma-separated field becomes
# one component of a dense vector.
trainingData = (
    sc.textFile("kmeans_train.csv")
    .map(lambda row: Vectors.dense(list(map(float, row.split(',')))))
)

In [10]:
# The first field is the label and the remaining fields are the features;
# only the features are kept.
def getVector(line):
    """Parse one comma-separated line into a dense feature vector.

    Every field is converted to float (so a non-numeric label still raises
    ValueError), then the leading label field is dropped.

    :param line: text row; field 0 is the label, fields 1..n are features.
    :return: dense vector of the feature values only.
    """
    numbers = list(map(float, line.split(',')))
    return Vectors.dense(numbers[1:])

# Parse each test row into its feature vector, discarding the label.
testingData = sc.textFile("kmeans_test.csv").map(getVector)

In [11]:
# Wrap each single-RDD list in a queue-backed DStream. Each stream is created
# exactly once; creating it a second time would only leave an unused
# duplicate DStream behind.
trainingQueue = [trainingData]
testingQueue = [testingData]

trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)

# Streaming k-means with k=2 clusters and random initial centers.
# setRandomCenters: `dim` must equal the training vector length (presumably 3
# columns in kmeans_train.csv -- TODO confirm), each center gets weight 1.0,
# and the fixed seed makes the initial centers reproducible.
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(dim=3, weight=1.0, seed=410)

# Register the training stream; each batch it emits updates the centers.
model.trainOn(trainingStream)

In [12]:
# Emit the predicted cluster index for every arriving (unlabelled) test vector.
result = model.predictOn(dstream=testingStream)
result.pprint()

In [13]:
# start() returns immediately. Block for a few 1-second batch intervals so the
# queued training/test RDDs are actually processed, then stop gracefully
# (stopSparkContext=True also shuts down `sc`).
ssc.start()
ssc.awaitTerminationOrTimeout(3)  # seconds; returning False (timed out) is expected here
ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2018-01-27 19:14:47
-------------------------------------------
0
0
0
0

-------------------------------------------
Time: 2018-01-27 19:14:48
-------------------------------------------

