In [3]:
import findspark
findspark.init()
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark import SparkContext

# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

sc =  SparkContext.getOrCreate()  # SparkContext
ssc = StreamingContext(sc, 1)

trainingData = sc.textFile("test.txt").map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("streaming.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]

trainingDifferential = sc.textFile("testdiff.txt").map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
differentialQueue = [trainingDifferential]

trainingStream = ssc.queueStream(trainingQueue).cache()
testingStream = ssc.queueStream(testingQueue)
differentialStream = ssc.queueStream(differentialQueue)


# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0).setHalfLife(0.5, timeUnit = "batches")

# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()

combinedStream = trainingStream.join(differentialStream)
newModel = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3,1.0,0).setHalfLife(0.5, timeUnit = "batches")
newModel.trainOn(combinedStream)

result = newModel.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()

ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)

print("Final centers (Original): " + str(model.latestModel().centers) + "\n")
print("Final centers (Differential): " + str(newModel.latestModel().centers))

-------------------------------------------
Time: 2020-10-15 16:41:46
-------------------------------------------
(1.0, 0)
(2.0, 1)

-------------------------------------------
Time: 2020-10-15 16:41:46
-------------------------------------------
(1.0, 0)
(2.0, 1)

-------------------------------------------
Time: 2020-10-15 16:41:47
-------------------------------------------

-------------------------------------------
Time: 2020-10-15 16:41:47
-------------------------------------------

Final centers (Original): [[ 6.25577984  6.25403349  6.25477431]
 [ 2.2408932   1.86755799 -0.97727788]]

Final centers (Differential): [[ 1.76405235  0.40015721  0.97873798]
 [ 2.2408932   1.86755799 -0.97727788]]
