# Requirements


In [1]:
!pip install findspark
!pip install confluent-kafka
# Downloaded from https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly_2.11
# !wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.0/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar



In [2]:
import os
import findspark
# Point findspark at the local Spark 2.4.0 (Hadoop 2.7 build) installation directory.
findspark.init('/usr/local/spark/spark-2.4.0-bin-hadoop2.7')
# Ask PySpark to load the Kafka 0-8 streaming assembly jar via --jars. The jar is
# given as a bare file name, so it is presumably expected in the notebook's
# working directory (TODO confirm).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'

---

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pickle
from pyspark.mllib.feature import HashingTF, IDF, StandardScaler
from pyspark.mllib.classification import LogisticRegressionModel
from pyspark.mllib.linalg import Vectors
from urllib.parse import unquote

APP_NAME = "BigData"
# Length of one streaming micro-batch, in seconds.
BATCH_INTERVAL_SECONDS = 2

# getOrCreate returns the already-active SparkContext when there is one, so this
# cell can be re-run without failing on "multiple SparkContexts"; on a fresh
# kernel it creates a new context named APP_NAME.
sc = SparkContext.getOrCreate(SparkConf().setAppName(APP_NAME))
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
sc

In [4]:
# NOTE(security): pickle.load can execute arbitrary code contained in the file.
# Only load a pickle that was produced by a trusted run of this project.
with open('./data/hashedTF.pickle', 'rb') as f:
    hashingTF = pickle.load(f)

# The stored term-frequency vectors are consumed by several passes below
# (IDF fitting, then again through idf.transform when the scaler is fitted),
# so keep the parsed vectors cached instead of re-reading and re-parsing the
# text files for every pass.
htfVectors = sc.textFile("./data/htfVectors").map(Vectors.parse).cache()
idf = IDF().fit(htfVectors)

tfidf = idf.transform(htfVectors)

scaler = StandardScaler().fit(tfidf)

# Both fitted models exist now; release the cached vectors so they do not hold
# executor memory while the stream is running.
htfVectors.unpersist()

model = LogisticRegressionModel.load(sc, "./models/Logistic_Regression")


# Kafka source: a direct stream on a single topic from the local broker.
topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})


print("ready to run")

ready to run


In [5]:
def to_ngram(payload_obj, n=2):
    """Split the string form of ``payload_obj`` into overlapping character n-grams.

    Args:
        payload_obj: Any object; it is converted with ``str()`` before slicing.
        n: Number of characters per n-gram. Defaults to 2 (bigrams), the value
            that used to be hard-coded, so existing one-argument calls behave
            exactly as before.

    Returns:
        A list of every length-``n`` substring, in order of position. The list
        is empty when the string is shorter than ``n``.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be a positive integer, got {!r}".format(n))
    payload = str(payload_obj)
    # A negative range bound (string shorter than n) simply yields no windows.
    return [payload[i:i + n] for i in range(len(payload) - n + 1)]


def get_prediction(queries):
    """Classify one micro-batch of queries and print ``(query, prediction)`` pairs.

    Each query is split into n-grams with ``to_ngram`` and pushed through the
    module-level ``hashingTF``, ``idf`` and ``scaler`` objects; the result is
    scored with the module-level ``model``.

    Args:
        queries: RDD of query strings for the current batch (this function is
            registered with ``foreachRDD``).

    Returns:
        None. Results are printed on the driver, one tuple per query.
    """
    # The batch is evaluated by more than one action (the emptiness check and
    # two collects). Cache it so the records are not pulled from the source
    # again for each action.
    queries.cache()
    try:
        # Skip the feature pipeline entirely for an empty micro-batch.
        if queries.isEmpty():
            return

        ngrams = queries.map(to_ngram)
        term_freq = hashingTF.transform(ngrams)
        weighted = idf.transform(term_freq)
        scaled = scaler.transform(weighted)
        predictions = model.predict(scaled).collect()

        # Pairing is positional: the i-th collected query goes with the i-th
        # collected prediction.
        for q in zip(queries.collect(), predictions):
            print(q)
    finally:
        queries.unpersist()
    

In [6]:
# Each Kafka record arrives as a (key, value) pair; keep the value and
# URL-decode it to obtain the raw query string.
queries = kvs.map(lambda x: unquote(x[1]))
queries.foreachRDD(get_prediction)

ssc.start()
try:
    # Blocks until the stream terminates or the cell is interrupted.
    ssc.awaitTermination()
finally:
    # Interrupting the cell only unblocks the driver; without an explicit stop
    # the streaming job would keep running in the background. Stop gracefully
    # so batches that were already received are still processed, and stop the
    # SparkContext too so the notebook can be re-run from the context cell.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

('/103886/', 0)
('/rcanimal/', 0)
('/458010b88d9ce/', 0)
('/cclogovs/', 0)
('/using-localization/', 1)
('/121006_dakotacwpressconf/', 1)
('/50393994/', 1)
('/169393/', 1)
('/166636/', 0)
('/labview_v2/', 1)
('/javascript/nets.png', 1)
('/p25-03/', 0)
('/javascript/minute.rb', 1)
('/javascript/weblogs.rss', 1)
('/javascript/util.rtf', 1)
('/180742/', 0)
('/javascript/typo3conf.do', 1)
('/000054736/', 1)
('/175034/', 0)
('/epsilon-0/', 0)
('/155470/', 0)
('/000023204/', 1)
('/radio_explorer/', 1)
('/jadedem/', 1)
('/javascript/legacy.swf', 1)
('/network-forensics/', 1)
('/free-cookies/', 0)
('/yukonvotes2006/', 0)
('/weblinks_panel/', 1)
('/131252/', 1)
('/20051025/', 0)
('/javascript/fn.mdb', 1)
('/2165498/', 0)
('/newsbordertop/', 1)
('/javascript/traceroute.vb', 1)
('/computingbusiness/', 0)
('/parabolic/', 1)
('/side2-bottomright/', 1)
('/includes/functions_kb.php?phpbb_root_path=http://cirt.net/rfiinc.txt?', 1)
('/javascript/usr.ep', 1)
('/page-13/', 1)
('/comxast/', 0)
('/dj mortis

KeyboardInterrupt: 