![title](http://ocausal.imbv.net/wp-content/uploads/2017/02/banner-autocop-3.jpg)
[AutoCop](http://ocausal.imbv.net/proyecto-autocop-es/), a proof of concept of the Observatorio de Contenidos Audiovisuales ([OCA](http://ocausal.imbv.net/proyecto-autocop-es/)), funded by the University of Salamanca Foundation [Plan TCUE 2015-2017 Fase 2].
Principal Investigator: Carlos Arcila Calderón. Researchers: Félix Ortega, Javier Amores, Sofía Trullenque, Miguel Vicente, Mateo Álvarez, Javier Ramírez

# AutoCop on Spark (English)

# Spark application
This application starts a streaming context that reads from a Kafka topic through a Kafka consumer and predicts a score for each incoming tweet with MLlib models.

### Import necessary libraries and start Spark Context

In [1]:
import findspark
findspark.init()
import pyspark
import os
import datetime
from pyspark.sql.functions import lit
from pyspark import SparkConf

# Both packages must target the same Scala version (2.11 here)
SUBMIT_ARGS = "--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.1.0,\
org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = (SparkConf()
        .setAppName("streaming_app")
        .set("spark.mongodb.input.uri", "mongodb://localhost:27017/twitter.tests")
        .set("spark.mongodb.output.uri", "mongodb://localhost:27017/twitter.tests"))

# Reuse the active SparkContext if one exists, otherwise create it
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = pyspark.sql.SparkSession(sc)

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)  # 10-second batch interval

## Kafka configuration

Set the broker address and the topic to read from.

In [2]:
kafka_configuration_params = {
    "topic": ["BigData"],
    "connectionstring": "localhost:9092"
}

In [3]:
from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, kafka_configuration_params["topic"],
    {"metadata.broker.list": kafka_configuration_params["connectionstring"]})

## Import algorithms

Load the stored models. They must have been trained beforehand with the provided training script.

In [4]:
from pyspark.mllib.classification import SVMModel, LogisticRegressionModel, NaiveBayesModel

In [5]:
classif_LR_model = LogisticRegressionModel.load(sc, "LR_model")
classif_SVM_model = SVMModel.load(sc, "SVM_model")
classif_NB_model = NaiveBayesModel.load(sc, "NB_model")

In [6]:
LR_model = classif_LR_model  # .clearThreshold() only if saved without threshold
SVM_model = classif_SVM_model  # .clearThreshold() only if saved without threshold
NB_model = classif_NB_model  # NB already generates probabilities only if saved without threshold
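
The effect of the threshold can be illustrated without Spark. The following is a minimal sketch, not MLlib's implementation: `sigmoid` and `predict_with_threshold` are hypothetical helpers that mirror how a binary linear model returns a 0/1 label while a threshold is set and the raw score once the threshold is cleared (represented here by `None`).

```python
import math

def sigmoid(margin):
    """Map a raw margin to a probability, as logistic regression does."""
    return 1.0 / (1.0 + math.exp(-margin))

def predict_with_threshold(score, threshold=None):
    """Return a 0/1 label if a threshold is set, else the raw score."""
    if threshold is None:
        return score  # behaviour after clearing the threshold
    return 1 if score > threshold else 0

lr_score = sigmoid(0.8)   # hypothetical logistic regression margin
svm_margin = -0.3         # hypothetical SVM margin

print(predict_with_threshold(lr_score, threshold=0.5))    # label
print(predict_with_threshold(svm_margin, threshold=0.0))  # label
print(predict_with_threshold(lr_score))                   # raw probability
```

The voting step later in the notebook adds the three predictions together, so it relies on the models keeping their thresholds and returning labels rather than raw scores.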

## Transform streaming rdds to match algorithms input

To classify the tweets, each one must pass through the same preprocessing as the training instances, which were converted to Spark `LabeledPoint` instances: the tweet is reduced to its adjectives and encoded as a feature vector.

In [7]:
import nltk
import random
from nltk.tokenize import word_tokenize
allowed_word_types = ["JJ"]  # Penn Treebank tag for adjectives

## Getting the word list generated during training

In [8]:
rdd_all_words = sc.textFile("all_words/part-00000")
rdd_broadcast_all_words = sc.broadcast(rdd_all_words.collect())

In [9]:
def convert_tweet_to_instance(tweets):
    # Keep only the words whose POS tag is allowed (adjectives)
    rdd_tweets = tweets.map(
        lambda tweet: [word for word, tag in nltk.pos_tag(word_tokenize(tweet))
                       if tag in allowed_word_types])

    # Encode each tweet as a binary vector over the training word list
    rdd_instances = rdd_tweets.map(lambda instance: find_features(instance))

    return rdd_instances

def find_features(instance):
    # 1 if the training word occurs in the tweet, 0 otherwise
    features = []
    for word in rdd_broadcast_all_words.value:
        if word in instance:
            features.append(1)
        else:
            features.append(0)
    return features
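
The encoding performed by `find_features` can be tried on its own. This is a minimal sketch under stated assumptions: `vocabulary` is a made-up four-word list standing in for the broadcast training word list, and `find_features_sketch` is a hypothetical stand-alone variant that takes the vocabulary as an argument.

```python
def find_features_sketch(adjectives, vocabulary):
    """Binary bag-of-words: one 0/1 entry per vocabulary word."""
    present = set(adjectives)  # set lookup avoids rescanning the list
    return [1 if word in present else 0 for word in vocabulary]

# Hypothetical vocabulary; the real list comes from the training run
vocabulary = ["good", "bad", "great", "awful"]
adjectives = ["great", "good"]  # adjectives extracted from one tweet

features = find_features_sketch(adjectives, vocabulary)
print(features)  # [1, 0, 1, 0]
```

The vector always has as many entries as the vocabulary, whatever the length of the tweet, which is what lets the stored models score it.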

# Command to start streaming

Get the tweets from the Kafka stream and keep only the message text. Each record is a (key, value) pair, and the result is a new DStream.

In [10]:
rdd_input = directKafkaStream.map(lambda output: output[1])
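
As a rough illustration of what `output[1]` selects, the sketch below uses a plain Python list in place of the stream. The records are invented for the example, and the key may well be `None` if the producer sends messages without one.

```python
# Hypothetical (key, value) records as delivered by the direct stream
records = [
    (None, "I love this great phone"),
    ("user42", "awful service"),
]

# Index 1 is the message value, i.e. the tweet text
texts = [record[1] for record in records]
print(texts)
```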

Convert the tweets to instances with the same transformation used in the training process and use them to predict the label with the three models.

In [11]:
classification = convert_tweet_to_instance(rdd_input).map(lambda instance: \
1 if (LR_model.predict(instance) + int(NB_model.predict(instance)) + SVM_model.predict(instance))>=1 else -1)

classification_each = convert_tweet_to_instance(rdd_input).map(lambda instance: \
[LR_model.predict(instance), NB_model.predict(instance), SVM_model.predict(instance)])
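
The combination rule in the cell above can be checked in isolation. A minimal sketch, assuming each model returns a 0/1 label (Naive Bayes returns it as a float): `combine_votes` reproduces the notebook's rule, under which a single positive prediction is enough, while `majority_vote` is a hypothetical stricter alternative that is not part of the notebook.

```python
def combine_votes(lr_label, nb_label, svm_label):
    """Notebook rule: 1 if at least one model predicts 1, else -1."""
    total = lr_label + int(nb_label) + svm_label
    return 1 if total >= 1 else -1

def majority_vote(lr_label, nb_label, svm_label):
    """Hypothetical alternative: 1 only if two or more models agree."""
    total = lr_label + int(nb_label) + svm_label
    return 1 if total >= 2 else -1

print(combine_votes(0, 0.0, 0))  # -1
print(combine_votes(1, 0.0, 0))  # 1
print(majority_vote(1, 0.0, 0))  # -1
print(majority_vote(1, 1.0, 0))  # 1
```

The two rules differ only when exactly one model predicts the positive class.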

Spark streaming actions: print each prediction to standard output and write the predictions and tweets to files (Spark creates a new folder with the results for each batch).

In [12]:
#classification_each.pprint()
#classification.saveAsTextFiles("file:///Users/carlosarcila/Development/autocop_dis/notebooks/result", "txt")
#rdd_input.saveAsTextFiles("file:///Users/carlosarcila/Development/autocop_dis/notebooks/tweet", "txt")

## Use database to store results

In [13]:
def save_to_db(rdd, collection):
    # Skip empty batches: toDF() cannot infer a schema from an empty RDD
    if rdd.isEmpty():
        return
    df = (rdd.zipWithUniqueId().toDF()
          .withColumn('timestamp', lit(datetime.datetime.utcnow()))
          .toDF('label', 'in_batch_id', 'timestamp'))

    (df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append")
       .option("database", "twitter").option("collection", collection).save())

In [14]:
classification.foreachRDD(lambda rdd: save_to_db(rdd,"labels"))
rdd_input.foreachRDD(lambda rdd: save_to_db(rdd, "tweets"))
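
The `in_batch_id` column written by `save_to_db` comes from `zipWithUniqueId`, which gives the items of partition k the ids k, n+k, 2n+k, and so on, where n is the number of partitions. The sketch below imitates that numbering in plain Python. `zip_with_unique_id_sketch` and the partition contents are hypothetical, and the ids are unique only within one batch.

```python
def zip_with_unique_id_sketch(partitions):
    """Pair each item with an id following the k, n+k, 2n+k, ... scheme."""
    n = len(partitions)
    pairs = []
    for k, partition in enumerate(partitions):
        for i, item in enumerate(partition):
            pairs.append((item, i * n + k))
    return pairs

# Hypothetical batch of labels spread over three partitions
partitions = [[1, -1], [1], [-1, -1, 1]]
pairs = zip_with_unique_id_sketch(partitions)
ids = [pair_id for _, pair_id in pairs]
print(ids)  # [0, 3, 1, 2, 5, 8]
```

The ids are unique but not consecutive. The `labels` and `tweets` collections are also numbered by two separate calls, so this sketch gives no guarantee that matching ids refer to the same tweet.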

## Start process

In [15]:
ssc.start()

In [16]:
ssc.stop(stopSparkContext=False)

In [17]:
len(NB_model.theta[0])

6016