# KERAS MODEL IN STREAM CONTEXT

## Initial setup

In [21]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, SparkSession

In [22]:
import numpy as np
from keras.models import model_from_json

from keras.models import Sequential
from keras.layers import Conv2D, Dropout, Merge, Dense
from keras.layers.embeddings import Embedding
from keras.layers.pooling import GlobalMaxPooling2D
from keras.callbacks import TensorBoard
from keras.utils.np_utils import to_categorical
from keras import optimizers
from keras.layers import advanced_activations
from keras import initializers

import time

In [23]:
# Local Spark session running on 8 local cores, plus a StreamingContext that
# cuts the incoming stream into micro-batches of 3 seconds.
spark = SparkSession.builder \
    .appName("KerasStream") \
    .master("local[8]") \
    .config("spark.driver.cores", 8) \
    .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, batchDuration=3)

In [24]:
# Direct Kafka stream subscribed to the "test" topic on the local broker.
kafkaParams = {
    "metadata.broker.list": "localhost:9092",
}
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, topics=["test"], kafkaParams=kafkaParams)

In [25]:
# Read the model architecture (JSON) once on the driver and broadcast it, so
# get_pred can rebuild the model on the workers without re-reading the file.
# The `with` block guarantees the file is closed even if read() fails.
with open('./models/CNN_model.json', 'r') as json_file:
    loaded_model_json = json_file.read()
data_model = sc.broadcast(loaded_model_json)

## Keras Stream

In [26]:
def getSparkSessionInstance(sparkConf):
    """Return the process-wide SparkSession, building it on the first call.

    The session is memoised in this module's globals under the key
    "sparkSessionSingletonInstance", so later calls (one per micro-batch)
    get the same object back; ``sparkConf`` only matters on the first call.
    """
    singleton_key = "sparkSessionSingletonInstance"
    namespace = globals()
    if singleton_key in namespace:
        return namespace[singleton_key]
    namespace[singleton_key] = (SparkSession.builder
                                .config(conf=sparkConf)
                                .getOrCreate())
    return namespace[singleton_key]

In [28]:
def _get_cached_model():
    """Return the CNN model, rebuilding it only when it is not cached yet.

    Rebuilding the model from the broadcast JSON and reading the HDF5
    weights costs far more than a single prediction. The loaded model is
    therefore memoised in this module's globals, using the same pattern as
    getSparkSessionInstance. It is built once per Python worker (or per
    task, depending on how Spark ships the closure) instead of once per
    record.
    """
    cache_key = "kerasModelSingletonInstance"
    namespace = globals()
    if cache_key not in namespace:
        loaded_model = model_from_json(data_model.value)
        # load weights into new model
        loaded_model.load_weights("./models/CNN_model.h5")
        namespace[cache_key] = loaded_model
    return namespace[cache_key]


def get_pred(data):
    """Classify one message as 'POSITIVE' or 'NEGATIVE' with the CNN model.

    ``data`` is a whitespace-separated string of numbers. They are copied in
    order into a zero-padded input tensor of shape (1, 15, 300): the first
    300 numbers fill row 0, the next 300 fill row 1, and so on. These are
    presumably 15 word vectors of 300 dimensions each; confirm against the
    producer. Numbers beyond the 15 * 300 capacity are ignored rather than
    raising IndexError.

    Raises ValueError if a token is not numeric.
    """
    max_words = 15  # rows of the input tensor (originally "tam_fijo", fixed size)
    embedding_vector_length = 300  # values per row

    loaded_model = _get_cached_model()

    # Parse the tokens and lay them out row-major (row = index // 300,
    # column = index % 300), zero-padding any unused tail.
    capacity = max_words * embedding_vector_length
    values = np.asarray(data.split()[:capacity], dtype=np.float64)
    flat = np.zeros(capacity)
    flat[:values.size] = values
    X_test = flat.reshape((1, max_words, embedding_vector_length))

    prediction = loaded_model.predict(X_test)
    sent = np.argmax(prediction)

    # Class index 1 is the positive class; any other index is negative.
    return 'POSITIVE' if sent == 1 else 'NEGATIVE'

In [29]:
# Direct Kafka stream records are (key, value) pairs; keep only the value.
instances = directKafkaStream.map(lambda x: x[1])


def process(time, rdd):
    """Classify every message of one micro-batch and print the results.

    Called by ``foreachRDD`` once per batch interval with the batch time and
    that batch's RDD of message values. It prints a one-column table
    (``sen``) holding one prediction per message.
    """
    print("========= %s =========" % str(time))
    # A batch with no incoming messages yields an empty RDD. createDataFrame
    # cannot infer a schema from it ("RDD is empty"), so skip such batches
    # instead of failing the streaming job.
    if rdd.isEmpty():
        return

    # Get the singleton instance of SparkSession
    spark = getSparkSessionInstance(rdd.context.getConf())

    prediction_rows = rdd.map(lambda message: Row(sen=get_pred(message)))
    predictions = spark.createDataFrame(prediction_rows)
    predictions.show()


instances.foreachRDD(process)

In a separate terminal, start the Kafka producer for the `test` topic:

`python kafka_user_producer.py test`

## Start Streaming

In [30]:
# Start the StreamingContext. From now on a micro-batch is processed every
# 3 seconds (the batch interval given to StreamingContext), and each batch's
# predictions are printed below this cell.
ssc.start()

+--------+
|     sen|
+--------+
|POSITIVE|
+--------+

+--------+
|     sen|
+--------+
|NEGATIVE|
+--------+



In [31]:
# Once you are done, stop the StreamingContext but keep the SparkContext
# alive (stopSparkContext=False). A stopped StreamingContext cannot be
# restarted. Before the next run, re-execute the cells from the one that
# creates `ssc` (StreamingContext(sc, 3)) onward.
ssc.stop(stopSparkContext=False)