In [1]:
import time
import re, ast
import numpy as np
import os
import pandas as pd
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Spark locally on all cores. PYSPARK_SUBMIT_ARGS is set before
# getOrCreate(), presumably so that the JVM it launches picks the value up.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[*] pyspark-shell'

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("KafkaReceive")
    .getOrCreate()
)

In [3]:
from sklearn.cluster import KMeans

# Sensor positions: columns SensorId, X, Y; space-separated, no header row.
locs = pd.read_csv("../../data/mote_locs.txt", header=None, sep=" ")
locs.columns = ["SensorId", "X", "Y"]
ids = locs.values[:, 0]
pos = locs.values[:, 1:]

# Group the sensors spatially into 5 clusters. n_init is pinned to 10, the
# historical scikit-learn default: since 1.4 the default is n_init='auto'
# (a single k-means++ run), which can produce a different clustering and
# therefore different ClusterIds than the ones recorded in this notebook.
kmeans = KMeans(n_clusters=5, n_init=10, random_state=0).fit(pos)
labels = np.vstack((ids, kmeans.labels_)).T.astype(int)
clusters = pd.DataFrame({'SensorId': labels[:, 0], 'ClusterId': labels[:, 1]})

# Cluster assigned to sensor 1.
clusters.loc[clusters.SensorId == 1].ClusterId

0    2
Name: ClusterId, dtype: int64

In [4]:
# Sensor whose temperature we want to predict, and the spatial cluster it belongs to.
target_sensor_id = 1
# .item() requires exactly one matching row (ValueError otherwise); calling
# int() directly on a one-element Series is deprecated in recent pandas.
target_cluster_id = int(clusters.loc[clusters.SensorId == target_sensor_id, 'ClusterId'].item())
# All sensors sharing the target's cluster.
clusters.loc[clusters.ClusterId == target_cluster_id]

Unnamed: 0,ClusterId,SensorId
0,2,1
1,2,2
2,2,3
32,2,33
33,2,34
34,2,35
35,2,36
36,2,37
37,2,38
38,2,39


In [5]:
def getKafkaDStream(spark, topic='persistence', batch_interval=10,
                    zk_quorum="zoo1:2181,zoo2:2181,zoo3:2181",
                    group_id="spark-streaming-consumer",
                    checkpoint_dir="checkpoint"):
    """Create a receiver-based Kafka DStream for a single topic.

    Parameters
    ----------
    spark : SparkSession
        Session whose SparkContext backs the streaming context.
    topic : str
        Kafka topic to consume.
    batch_interval : int
        Streaming batch interval, in seconds.
    zk_quorum : str
        Comma-separated ZooKeeper ``host:port`` list used by the Kafka
        receiver. The default is the quorum this notebook was written for;
        pass e.g. ``"127.0.0.1:2181"`` for a single local ZooKeeper.
    group_id : str
        Kafka consumer group id.
    checkpoint_dir : str
        Checkpoint directory; stateful transforms such as ``updateStateByKey``
        need checkpointing enabled.

    Returns
    -------
    list
        ``[sc, ssc, dstream]``: the SparkContext, the new StreamingContext and
        the DStream returned by ``KafkaUtils.createStream``.
    """
    sc = spark.sparkContext

    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)

    # {topic: 1}: consume `topic` with one receiver thread.
    dstream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})

    return [sc, ssc, dstream]

In [6]:
#Save state in global Python variable
def saveState(rdd):
    """Collect ``rdd`` to the driver and keep the result in ``state_global``.

    A ``None`` argument is ignored, leaving any previously saved value intact.
    """
    global state_global
    if rdd is None:
        return
    state_global = rdd.collect()
        
def printInfoRDD(rdd):
    """Clear the cell output, then print the partition and element counts of ``rdd``.

    Used with ``DStream.foreachRDD`` so that each batch's report replaces the
    previous one. Prints "No info to provide" when ``rdd`` is None.
    """
    # Import clear_output here so this function does not depend on a
    # notebook-level import; outside IPython there is no output to clear.
    try:
        from IPython.display import clear_output
    except ImportError:
        clear_output = None
    if clear_output is not None:
        clear_output(wait=True)

    if rdd is None:
        print("No info to provide")
        return
    print("The RDD has " + str(rdd.getNumPartitions()) + " partitions")
    print("The RDD has " + str(rdd.count()) + " elements")

In [7]:
def updateFunction(new_values, state):
    """Recursive-least-squares (RLS) state update for ``updateStateByKey``.

    Parameters
    ----------
    new_values : list
        Values received for this key in the current batch. Each is
        ``(key, yx)`` with ``yx = [i, y, x_1, ..., x_n]``: an index (unused),
        the target ``y`` and the ``n`` regressors. Only the first value is
        used; the remaining ones are ignored.
    state : tuple
        ``(key, beta, V, mu, mse, N)``: coefficients (``n`` values), the
        ``n x n`` RLS matrix, forgetting factor ``mu``, running mean squared
        error and the number of samples processed so far.

    Returns
    -------
    tuple
        Updated ``(key, beta, V, mu, mse, N)`` with ``beta`` as an ``n x 1``
        column, or ``state`` itself when no new value arrived.
    """
    if len(new_values) == 0:
        return state

    key = new_values[0][0]
    yx = new_values[0][1]
    y = yx[1]
    x = np.asarray(yx[2:], dtype=float).reshape(1, -1)  # 1 x n row vector
    n = x.shape[1]

    # reshape() returns a new view, so the caller's state arrays are not
    # modified (assigning to .shape would change them in place).
    beta = np.asarray(state[1], dtype=float).reshape(n, 1)
    V = state[2]
    mu = state[3]
    mse = state[4]
    N = state[5]

    err = y - x.dot(beta)  # 1 x 1 prediction error
    # state[4] holds the running *mean* squared error (it is returned below as
    # sse / (N + 1)), so convert it back to a sum before adding the new error.
    sse = mse * N + pow(err, 2.0)
    # NOTE(review): the textbook exponentially-weighted RLS uses mu rather than
    # 1.0 in this denominator; both agree for mu == 1. Confirm intended variant.
    V = 1.0 / mu * (V - V.dot(x.T).dot(x).dot(V) / (1.0 + x.dot(V).dot(x.T).item()))
    gamma = V.dot(x.T)
    beta = beta + gamma * err

    return (key, beta, V, mu, sse / (N + 1.0), N + 1)

In [8]:
def updateEMA(new_value, state):
    """Exponential-moving-average (EMA) state update for ``updateStateByKey``.

    Parameters
    ----------
    new_value : list
        Values received for this key in the current batch; each is
        ``(x, y_true, bin)`` where ``x`` is the average temperature in the
        cluster, ``y_true`` the value to predict (may be None) and ``bin``
        the time bin. Only the first value is used.
    state : tuple
        ``(alpha, EMA, MSE, N, squared_error, y_true, bin)``.

    Returns
    -------
    tuple
        Updated state with the same layout. ``MSE`` is the running mean of the
        squared errors over the ``N`` samples that had a ground truth. When
        ``y_true`` is None the EMA still advances, ``MSE`` and ``N`` are kept
        and ``squared_error`` is None. With no new value, ``state`` is
        returned unchanged.
    """
    if len(new_value) == 0:
        return state

    x, y_true, bin_id = new_value[0][0], new_value[0][1], new_value[0][2]
    alpha, ema, mse, n = state[0], state[1], state[2], state[3]

    ema = alpha * x + (1 - alpha) * ema
    sse = mse * n  # the state stores the mean; recover the sum
    sq_err = None
    if y_true is not None:
        sq_err = pow(y_true - ema, 2)
        sse += sq_err
        n += 1
    # n is still 0 if no ground truth has been seen yet: avoid dividing by zero.
    mse = sse / n if n else sse
    return (alpha, ema, mse, n, sq_err, y_true, bin_id)

In [26]:
# OpenTSDB endpoint as "host:port". Override it through the OPENTSDB_URL
# environment variable; the default is the local instance.
OPENTSDB_URL = os.environ.get('OPENTSDB_URL', "localhost:4242")
print(OPENTSDB_URL)

def to_json(state):
    """Turn a ``(key, ema_state)`` pair into three OpenTSDB data points.

    ``ema_state`` is assumed to follow the layout returned by ``updateEMA``:
    ``(alpha, EMA, MSE, N, squared_error, y_true, bin)``. Its last element is
    used as the timestamp of every point.

    Returns
    -------
    list of dict
        ``[truth, prediction, error]`` points for the metrics
        ``temperature.truth``, ``temperature.prediction`` and
        ``temperature.error``, each tagged ``{'space': 1}``.
    """
    ema_state = state[1]
    timestamp = ema_state[-1]

    def _point(metric, value):
        # The 'tags' dict is created here; the previous version assigned
        # into a missing 'tags' key and raised KeyError.
        return {'metric': metric, 'timestamp': timestamp, 'value': value, 'tags': {'space': 1}}

    return [
        _point('temperature.truth', ema_state[5]),
        _point('temperature.prediction', ema_state[1]),
        _point('temperature.error', ema_state[4]),
    ]


def sendPartition(iter):
    """POST OpenTSDB data points to ``OPENTSDB_URL + '/api/put'``.

    Intended for ``rdd.foreachPartition``: ``iter`` may be any iterable (a
    partition iterator, a list, ...) of JSON-serialisable data-point dicts.
    It is materialised first, because an iterator is always truthy and
    ``json.dumps`` cannot serialise it directly.

    Returns the HTTP status code (also printed), or 400 without sending
    anything when there are no points.
    """
    import json
    import urllib.error
    import urllib.request

    points = list(iter)
    if not points:
        status = 400
        print(status)
        return status

    # OPENTSDB_URL may be a bare "host:port"; an HTTP client needs a scheme.
    base_url = OPENTSDB_URL if '://' in OPENTSDB_URL else 'http://' + OPENTSDB_URL
    request = urllib.request.Request(
        base_url + '/api/put',
        data=json.dumps(points).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    try:
        with urllib.request.urlopen(request) as response:
            status = response.status
    except urllib.error.HTTPError as exc:
        # urllib raises on 4xx/5xx; report the code like any other response.
        status = exc.code
    print(status)
    return status
    

localhost:4242


In [33]:
import re, ast

time_resolution = 10  # seconds aggregated per prediction step in live mode
wait_time = 1         # batch interval (seconds) when replaying a simulation
simulation = 1        # truthy: replay mode, no windowing; falsy: live, windowed

alpha1 = 0.25
# Initial EMA state: (alpha, EMA, MSE, N, squared error, y_true, bin)
state = (alpha1, 20, 0, 0, 0, 20, 0)

# SensorId -> ClusterId as a plain dict of Python ints: cheap to ship to the
# executors, unlike the pandas DataFrame a per-record .loc lookup would capture.
sensor_to_cluster = dict(zip(clusters.SensorId.tolist(), clusters.ClusterId.tolist()))

# dstream value format: (bin, y_true, Value, seconds, SensorId, Type):
batch_interval = wait_time if simulation else time_resolution
[sc, ssc, dstream] = getKafkaDStream(spark=spark, topic='ClusterRLSTrain', batch_interval=batch_interval)
# NOTE(review): ast.literal_eval on Kafka payloads cannot run arbitrary code,
# but malformed messages will raise inside the job.
dstream = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
if not simulation:
    dstream = dstream.window(time_resolution, time_resolution)

# dstream value format: (ClusterId, (bin, y_true, Value, seconds, SensorId, Type)).
# Sensors absent from the location file map to None and are dropped by the filter.
dstream = dstream.map(lambda x: (sensor_to_cluster.get(int(x[4])), x))
# keep only data belonging to target_cluster_id:
dstream = dstream.filter(lambda x: x[0] == target_cluster_id)
dstream = dstream.mapValues(lambda x: (x[2], 1, x[1], x[0]))  # (x, 1, y_true, bin)
# Average x over the cluster; y_true and bin are kept from one of the records.
dstream = dstream.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2], a[3]))
dstream = dstream.mapValues(lambda x: (x[0] / x[1], x[2], x[3]))

initialStateRDD = sc.parallelize([(target_cluster_id, state)])
dstream = dstream.updateStateByKey(updateEMA, initialRDD=initialStateRDD)


def _state_to_points(record):
    """Expand a ``(key, ema_state)`` record into prediction/truth/error data points.

    The timestamp is computed once per record, so the three metrics of one
    record share the same timestamp.
    NOTE(review): OpenTSDB's /api/put expects integer epoch seconds or
    milliseconds; these timestamps are floats. Confirm before posting them.
    """
    ema_state = record[1]
    timestamp = time.time() + ema_state[-1]
    return [
        {'metric': 'temperature.prediction', 'timestamp': timestamp,
         'value': ema_state[1], 'tags': {'space': 1}},
        {'metric': 'temperature.truth', 'timestamp': timestamp,
         'value': ema_state[5], 'tags': {'space': 1}},
        {'metric': 'temperature.error', 'timestamp': timestamp,
         'value': ema_state[4], 'tags': {'space': 1}},
    ]


stateStream = dstream.flatMap(_state_to_points)
stateStream.foreachRDD(printInfoRDD)

In [34]:
# Start the streaming computation defined above. It keeps processing batches
# until ssc.stop() is called in a later cell.
ssc.start()

-------------------------------------------
Time: 2019-05-29 15:51:01
-------------------------------------------
{'metric': 'temperature.prediction', 'timestamp': 1559145061.8363674, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.truth', 'timestamp': 1559145061.8363698, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.error', 'timestamp': 1559145061.8363702, 'value': 0, 'tags': {'space': 1}}

-------------------------------------------
Time: 2019-05-29 15:51:02
-------------------------------------------
{'metric': 'temperature.prediction', 'timestamp': 1559145062.7725465, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.truth', 'timestamp': 1559145062.772551, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.error', 'timestamp': 1559145062.7725523, 'value': 0, 'tags': {'space': 1}}

-------------------------------------------
Time: 2019-05-29 15:51:03
-------------------------------------------
{'metric': 'temperature.prediction', 'timestamp':

In [35]:
# Stop the streaming context. stopSparkContext=False keeps the underlying
# SparkContext (and `spark`) alive for reuse. stopGraceFully=False does not
# wait for received data to be fully processed.
ssc.stop(stopSparkContext=False,stopGraceFully=False)

-------------------------------------------
Time: 2019-05-29 15:51:05
-------------------------------------------
{'metric': 'temperature.prediction', 'timestamp': 1559145065.732522, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.truth', 'timestamp': 1559145065.732526, 'value': 20, 'tags': {'space': 1}}
{'metric': 'temperature.error', 'timestamp': 1559145065.7325268, 'value': 0, 'tags': {'space': 1}}

