In [3]:
import os

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel, StreamingLinearRegressionWithSGD


<b>First, we split our dataset into several chunks that will be fed to the stream.</b>

In [None]:
# Load the complete Green Taxi dataset into a pandas DataFrame
Green_Taxi_DF = pd.read_csv(filepath_or_buffer="Green_Taxi_B7.csv")

In [None]:
# Split the DataFrame row-wise into 4 consecutive, near-equal chunks.
# np.array_split is applied to the row positions instead of to the DataFrame
# itself: passing a DataFrame makes NumPy call DataFrame.swapaxes, which pandas
# has deprecated (FutureWarning on pandas >= 2.1). The chunk boundaries are the
# same ones np.array_split(Green_Taxi_DF, 4) produces.
row_positions = np.arange(len(Green_Taxi_DF))
chunk1, chunk2, chunk3, chunk4 = (
    Green_Taxi_DF.iloc[positions]
    for positions in np.array_split(row_positions, 4)
)


In [None]:
# Save the chunks into the directories watched by the file streams:
#   - stream train folder: "Stream_Folder/DataFiles/Train/"
#   - stream test folder:  "Stream_Folder/DataFiles/Test/"
# chunk1 goes to the train folder; chunk2-chunk4 go to the test folder.
#
# NOTE(review): to_csv writes a header row and the index column by default, so
# both reach the stream as ordinary text lines -- confirm the line parser
# expects that layout.
chunk_destinations = {
    r'Stream_Folder/DataFiles/Train/chunk1.csv': chunk1,
    r'Stream_Folder/DataFiles/Test/chunk2.csv': chunk2,
    r'Stream_Folder/DataFiles/Test/chunk3.csv': chunk3,
    r'Stream_Folder/DataFiles/Test/chunk4.csv': chunk4,
}

for destination, chunk in chunk_destinations.items():
    # to_csv raises if the parent directory is missing, so create it first.
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    chunk.to_csv(destination)


<b>Now it's time to use Spark Streaming.</b>

Read from the section "Streaming linear regression" onwards (Spark 1.6.2 docs) <br>
https://spark.apache.org/docs/1.6.2/mllib-linear-methods.html


Read from the section "Streaming linear regression" onwards (Spark 2.2.0 docs) <br>
https://spark.apache.org/docs/2.2.0/mllib-linear-methods.html#streaming-linear-regression


Another good example <br>
https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/streaming_linear_regression_example.py


Read from the section "MLlib Operations" onwards (Spark 2.2.0 Streaming programming guide) <br>
https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#mllib-operations


API help (pyspark.streaming module reference) <br>
https://spark.apache.org/docs/2.1.3/api/python/pyspark.streaming.html

<b>Spark Streaming provides two categories of built-in streaming sources.</b>

<b>Basic sources:</b> <br>
Sources directly available in the StreamingContext API. Examples: file systems, and socket connections. 

<b>Advanced sources:</b> <br>
Sources like Kafka, Flume, Kinesis, etc. are available through extra utility classes.
            

In [1]:
# Start (or reuse) a Spark session whose master is set to 'local[4]'
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config('spark.master', 'local[4]')
    .appName('Test')
    .getOrCreate()
)

# SparkContext belonging to the session
sc = spark.sparkContext

In [4]:
# Streaming context built on the existing SparkContext, with a batch
# duration of 30.
BATCH_DURATION_SECONDS = 30
ssc = StreamingContext(sc, batchDuration=BATCH_DURATION_SECONDS)

# Disabled alternatives, kept for reference:
#   single-file text stream
#location = 'Stream_Folder/DataFiles/'
#lines = ssc.textFileStream("Stream_Folder/DataFiles/chunk1.csv").cache()

#   context recovered from / created with a checkpoint directory
##checkpointDir = "/LOCAL_DIR/DATA/"
#ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)

#   socket text stream
#lines = ssc.socketTextStream("localhost", 9999)

In [None]:
def parse(lp):
    """Turn one text line shaped like "(label,[f1,f2,...])" into a LabeledPoint.

    The label is the text between the first "(" and the first ","; the
    features are the comma-separated values between the first "[" and the
    first "]".
    """
    label_text = lp[lp.find('(') + 1: lp.find(',')]
    features_text = lp[lp.find('[') + 1: lp.find(']')]
    return LabeledPoint(float(label_text), Vectors.dense(features_text.split(',')))

In [None]:
# Training stream: text lines from the Train folder, parsed into LabeledPoints.
rawTrainingLines = ssc.textFileStream("Stream_Folder/DataFiles/Train").cache()
trainingData = rawTrainingLines.map(parse).cache()

# Test stream: text lines from the Test folder, left unparsed here
# (the parse/cache step is disabled).
testData = ssc.textFileStream("Stream_Folder/DataFiles/Test")  #.map(parse).cache()


# The Stream Train Folder is "Stream_Folder/DataFiles/Train/"
# The Stream Test Folder is "Stream_Folder/DataFiles/Test/"

# Disabled alternative folders:
#trainingData = ssc.textFileStream("Stream_Folder/DataFiles/CheckPoints").cache().map(parse).cache()
#testData = ssc.textFileStream("Stream_Folder/DataFiles/DataFiles")#.map(parse).cache()



In [None]:
# Display the test stream object itself (its repr), not the records it carries.
testData

In [None]:
# Build a vector of 12 zeros programmatically instead of writing the 12 zero
# features out by hand
# ([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

numFeatures = 12
zeroVector = Vectors.zeros(size=numFeatures)

zeroVector

In [None]:
# Streaming linear regression model with library-default settings, initialised
# with the all-zero weight vector (equivalent to
# .setInitialWeights([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])).
model = StreamingLinearRegressionWithSGD().setInitialWeights(zeroVector)

model

In [None]:
# Disabled experiment (nothing in this cell executes): two streaming models
# with different step sizes trained on the same CSV-parsed stream.
# NOTE(review): `stream` is not defined in the cells shown here -- it would
# have to be created before this code could be re-enabled.

#model1 = StreamingLinearRegressionWithSGD(stepSize=0.01, numIterations=1)
#model1.setInitialWeights(Vectors.dense([0] * numFeatures))

#model2 = StreamingLinearRegressionWithSGD(stepSize=1, numIterations=1)
#model2.setInitialWeights(Vectors.dense([0] * numFeatures))


#def parsePoint(line):
#    values = [float(x) for x in line.split(",")]
#    return LabeledPoint(label=values[0], features=Vectors.dense(values[1:]))


#labeledStream = stream.map(lambda line: parsePoint(line))

#model1.trainOn(labeledStream)
#model2.trainOn(labeledStream)


In [None]:
# Register training: the model is updated on every batch of the training stream.
model.trainOn(trainingData)

# Register prediction on the test stream.
# testData carries raw text lines, so each line is parsed into a LabeledPoint
# before its label/features are read (accessing .label on a plain string would
# raise AttributeError once the stream runs).
labeledTestData = testData.map(parse)
predictions = model.predictOnValues(
    labeledTestData.map(lambda lp: (lp.label, lp.features))
)

# print() on a DStream only shows the object's repr. pprint() registers an
# output operation that prints the first records of every batch as
# (label, prediction) pairs once the streaming context is started.
predictions.pprint()

# Alternative: predict from the features only (no labels in the output)
#model.predictOn(labeledTestData.map(lambda lp: lp.features)).pprint()


In [None]:
# Start the streaming computation and block this cell until it terminates.
ssc.start()
try:
    ssc.awaitTermination()
finally:
    # Also runs when the kernel is interrupted or the stream fails: stop the
    # streaming context so it is not left running in the background, but keep
    # the SparkContext alive so the Spark session stays usable in the notebook.
    ssc.stop(stopSparkContext=False, stopGraceFully=False)