# Kafka Consumer

In [1]:
# Tell pyspark which extra packages to pull in when it launches the JVM:
# the Kafka 0.8 streaming connector and the PostgreSQL JDBC driver.
import os

_spark_packages = [
    'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2',
    'org.postgresql:postgresql:42.1.1',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + ','.join(_spark_packages) + ' pyspark-shell'

### Import dependencies


In [2]:
#Stream processing
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#Data formats and db connection
import json
import psycopg2

In [3]:
#Text processing
import preprocessor as p
import re
import string

import nltk
from nltk.corpus import stopwords
from nltk import word_tokenize

### Text cleaning

In [4]:
# Start from NLTK's English stop-word list.
stop_words = set(stopwords.words("english"))
# Add a few tokens that should also be ignored.
stop_words |= {"im", "thats", "rt"}
# Keep the negation "not" in the text. A set literal is required here:
# set("not") would build {'n', 'o', 't'} and leave "not" in the stop words.
stop_words -= {"not"}

# Kinds of content that p.clean() strips from a text: URLs, mentions, emojis and smileys.
p.set_options(p.OPT.URL, p.OPT.MENTION, p.OPT.EMOJI, p.OPT.SMILEY)

def preprocess(text):
    """Normalise a raw tweet into a space-separated string of kept tokens.

    Steps, in order: clean with the `preprocessor` module (what gets removed
    is decided by the options set through `p.set_options`), lowercase, strip
    punctuation and underscores, tokenize, drop every token found in
    `stop_words`.

    Parameters
    ----------
    text : str
        Raw tweet text.

    Returns
    -------
    str
        The remaining tokens joined by single spaces.
    """
    cleaned = p.clean(text).lower()

    # Remove every run of characters that are neither whitespace nor word
    # characters; underscores are word characters, so they are listed explicitly.
    cleaned = re.sub(r'([^\s\w]|_)+', '', cleaned)

    kept = filter(lambda token: token not in stop_words, word_tokenize(cleaned))
    return ' '.join(kept)

def bigrams(string):
    """Return consecutive, non-overlapping pairs of whitespace-separated tokens.

    Runs of spaces are collapsed to a single space first. Because the regex
    matches do not overlap, "a b c d" yields ["a b", "c d"] and a trailing
    unpaired token is dropped.

    Parameters
    ----------
    string : str
        Text to split into token pairs. The parameter name shadows the
        `string` module inside this function only.

    Returns
    -------
    list of str
        Each element is two tokens joined by the whitespace character that
        separated them.
    """
    string = re.sub(' +', ' ', string)
    # The original searched an undefined name `s`; search the normalised input.
    return re.findall(r"[^\s]+\s[^\s]+", string)

### Create Spark context and streaming context

In [5]:
# Reuse the SparkContext already active in this kernel, or create one with the
# given application name. Calling the SparkContext(...) constructor directly
# raises when a context already exists, which breaks re-running this cell.
sc = SparkContext.getOrCreate(SparkConf().setAppName("StreamProcessorPyspark"))
sc.setLogLevel("WARN")

# Streaming context with a batch interval of 10 (seconds); the checkpoint
# directory is needed by the stateful updateStateByKey transformation used below.
ssc = StreamingContext(sc, 10)
ssc.checkpoint('ssc_checkpoint')

### Connect to Kafka and to the DB


In [7]:
# Connection settings can be overridden through environment variables so the
# notebook can target another broker or database without editing this cell
# (e.g. KAFKA_BROKER_IP=localhost for a local broker). The literals are the
# previous hard-coded values, kept as fallbacks for backward compatibility.
ip = os.environ.get('KAFKA_BROKER_IP', '35.228.250.247')
kafkaParams = {"metadata.broker.list": ip+':9092', "auto.offset.reset": 'largest'}
# Direct stream subscribed to the 'DIC' topic.
myStream = KafkaUtils.createDirectStream(ssc, ['DIC'], kafkaParams)

# FIXME(security): database credentials are committed in this notebook. Rotate
# them, supply DB_HOST / DB_NAME / DB_USER / DB_PASSWORD through the
# environment, and remove the fallback literals.
host = os.environ.get('DB_HOST', "ec2-54-217-206-65.eu-west-1.compute.amazonaws.com")
database = os.environ.get('DB_NAME', "d9bf9qompakvh6")
user = os.environ.get('DB_USER', "cuapumoqmbkafk")
password = os.environ.get('DB_PASSWORD', "a510ef81c98a872ea8e47b58e7e044fb5f204f56b7019a581aa3b4f9223498f8")

### Process DStream

In [9]:
def _decode_message(message):
    """Parse the JSON document held in the second element of a stream item."""
    return json.loads(message[1])


def _tokens_by_tag_and_date(tweet):
    """Pair a tweet's (tag, date) key with the tokens of its preprocessed text."""
    return ((tweet['tag'], tweet['date']), preprocess(tweet['tweet']).split())


def _as_word_count(pair):
    """Turn ((tag, date), word) into ((tag, date, word), 1)."""
    key, word = pair
    return ((key[0], key[1], word), 1)


tweets = myStream.map(_decode_message)
tweets = tweets.map(_tokens_by_tag_and_date)

# Emit one ((tag, date), word) element per token of each tweet.
pairs = tweets.flatMapValues(lambda tokens: tokens)
words = pairs.map(_as_word_count)

In [10]:
def updateFunction(newValues, runningCount):
    """Add the values seen in the current batch to the running total of a key.

    Parameters
    ----------
    newValues : iterable of numbers
        Values collected for the key in the current batch.
    runningCount : number or None
        Total accumulated so far; None when the key has no state yet.

    Returns
    -------
    number
        The updated total.
    """
    previous_total = 0 if runningCount is None else runningCount
    return sum(newValues, previous_total)

# Maintain a running total per (tag, date, word) key across batches:
# updateFunction adds each batch's values to the total kept as state.
counts = words.updateStateByKey(updateFunction)

### Store results into DB

In [11]:
def sendPartition(iter):
    """Upsert the frequent words of one RDD partition into the `word` table.

    Only records whose count is strictly greater than the threshold (10) are
    written. A single database connection is opened per partition (instead of
    one per record), all rows are committed together, and the connection is
    closed even if a statement fails. No connection is opened when nothing in
    the partition passes the threshold.

    Parameters
    ----------
    iter : iterator
        Records of the form ((tag, date, word), count).

    Raises
    ------
    psycopg2.Error
        If connecting or executing a statement fails; nothing from the
        partition is committed in that case.
    """
    threshold = 10
    rows = [record for record in iter if record[1] > threshold]
    if not rows:
        return

    # NOTE(review): `counts` comes from updateStateByKey, so record[1] looks
    # like a cumulative total; adding it to the stored frequency on every
    # batch may double count. Confirm whether the update should instead be
    # "SET frequency = EXCLUDED.frequency".
    sql = "INSERT INTO word (tag, date, word, frequency) VALUES (%s, %s, %s, %s) " \
    "ON CONFLICT ON CONSTRAINT word_pkey DO UPDATE " \
    "SET frequency = word.frequency + %s"

    conn = psycopg2.connect(host=host, database=database, user=user, password=password, port=5432)
    try:
        cur = conn.cursor()
        for record in rows:
            cur.execute(sql, (record[0][0], record[0][1], record[0][2], record[1], record[1]))
        conn.commit()
    finally:
        conn.close()

# Run sendPartition on every partition of every RDD produced by the stream.
counts.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

### Start the stream processing

In [None]:
# Launch the streaming computation, then block this cell until the context
# terminates or the timeout elapses.
# NOTE(review): the timeout is presumably expressed in seconds (10000 s is
# roughly 2h47m) — confirm against the PySpark version in use. This cell does
# not stop the context once the timeout expires.
ssc.start()
ssc.awaitTermination(timeout=10000)