# Requirements 

## Pip Install - findspark + confluent-kafka

In [1]:
!pip install findspark
!pip install confluent-kafka

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2
Collecting confluent-kafka
[?25l  Downloading https://files.pythonhosted.org/packages/48/19/3bbbed00188ccf89b181398b9fcb6612722f9b1e4eeb32528dcc7c5ba0f4/confluent_kafka-1.6.1-cp37-cp37m-manylinux2010_x86_64.whl (2.7MB)
[K    100% |████████████████████████████████| 2.7MB 3.4MB/s ta 0:00:01
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-1.6.1


## Download - Spark Project External Kafka Assembly

In [2]:
!wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.0/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar

--2021-04-03 20:52:42--  https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.0/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.112.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.112.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13385346 (13M) [application/java-archive]
Saving to: ‘spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar’


2021-04-03 20:52:44 (5.72 MB/s) - ‘spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar’ saved [13385346/13385346]



## imports

In [3]:
import os
import findspark
findspark.init('/usr/local/spark/spark-2.4.0-bin-hadoop2.7')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'

In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
from pyspark.sql.functions import udf
from pyspark.ml.feature import CountVectorizerModel, IDFModel, StandardScalerModel, Tokenizer
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.streaming.kafka import KafkaUtils
from urllib.parse import unquote

#Set spark configuration
APP_NAME = "BigData"
conf = pyspark.SparkConf().setAll([ ('spark.app.name', APP_NAME),
                                    ('spark.executor.memory', '8g'),
                                    ('spark.cores.max', '2'),
                                    ('spark.driver.memory','8g'),
                                    ('spark.master', 'local[2]')])
sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)
sc


## payload to ngram function

In [5]:
def to_ngram(payload_obj):
    n=2
    payload = str(payload_obj)
    ngrams = ''
    for i in range(0,len(payload)-n + 1):
        ngrams += payload[i:i+n]+ ' '
    return ngrams[:-1]

ngrams = udf(to_ngram, StringType())
tokenizer = Tokenizer.load('../models/Tokenizer')
vectorizer = CountVectorizerModel.load('../models/Vectorizer')
idf_model = IDFModel.load('../models/idf')
scalerModel = StandardScalerModel.load('../models/scalerModel')
model = LogisticRegressionModel.load('../models/Logistic_Regression_Model')


topic = "test"
brokers = "localhost:9092"
ssc = StreamingContext(sc, batchDuration= 3)
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})


print("all setup - ready to run")

all setup - ready to run


## queries get predictions  

In [6]:
def get_prediction(queries):

    try:
        queries = queries.map(lambda w: Row(payload=w))
        queries = sqlc.createDataFrame(queries)
        queries = queries.withColumn('ngrams', ngrams(queries['payload']))
        queries = tokenizer.transform(queries)
        queries = vectorizer.transform(queries)
        queries = idf_model.transform(queries)
        queries = scalerModel.transform(queries)
        preds = model.transform(queries)
        preds.select('payload','prediction').show(truncate=False)
    except : 
        print('No data')

## start streaming

In [7]:
queries = kvs.map(lambda x: unquote(x[1]))
# ngrams = queries.map(lambda x: to_ngram(x))
queries.foreachRDD(get_prediction)


ssc.start()
ssc.awaitTermination()  

No data
No data
No data
No data
No data
+--------------------------------------+----------+
|payload                               |prediction|
+--------------------------------------+----------+
|/tagarela 1/                          |0.0       |
|/realplayer10/                        |0.0       |
|/6404573/                             |0.0       |
|/000023953/                           |0.0       |
|/_mocks/                              |0.0       |
|/javascript/oaerrordetailpage.svn-base|0.0       |
|/041440/                              |0.0       |
|/mgnews/                              |0.0       |
|/123004/                              |0.0       |
|/nettoys/                             |0.0       |
+--------------------------------------+----------+

+--------------------------------+----------+
|payload                         |prediction|
+--------------------------------+----------+
|/flag_japan/                    |0.0       |
|/666865/                        |0.0       |
|

+-------------------------------------------------------------------------------+----------+
|payload                                                                        |prediction|
+-------------------------------------------------------------------------------+----------+
|/?<script>document.cookie="testqppc=8767;"</script>                            |1.0       |
|/gpb/include/gpb.inc.php?root_path=http://192.168.202.118:8080/tzhfyzkbomspvm??|0.0       |
|/en-us/jnv890lt.cfm?<script>cross_site_scripting.nasl</script>                 |1.0       |
|/w32rbotaqs/                                                                   |0.0       |
|/javascript/7z.xml                                                             |0.0       |
|/107845/                                                                       |0.0       |
|/design6/                                                                      |0.0       |
|/051318/                                                             

KeyboardInterrupt: 