<a href="https://colab.research.google.com/github/dodonas/CC_Ext_2/blob/main/final/CLOUD_TECH_EXT_2_CONSUMER.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Afeka - 65215 - Ext 2 - Kafka and Spark Streaming

# Submitter: Andrey Dodon - 317858298

# Setup

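Install the required Python packages (`findspark`, `confluent_kafka`), a Java runtime, and Spark 2.4.8 with its Kafka streaming connector, then download the `AFINN-111.txt` sentiment word list.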

In [None]:
%%capture
!pip install findspark
!pip install confluent_kafka

!apt-get install openjdk-11-jdk-headless -qq > /dev/null

!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.4.8/spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar"
!wget https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!wget https://github.com/dodonas/CC_Ext_2/blob/main/AFINN-111.txt
!tar -xvf spark-2.4.8-bin-hadoop2.7.tgz

## Set the parameters

In [None]:
# Kafka topic the consumer subscribes to.
kafka_topic_name = 'RomeoAndJuliet'
# Address of the Kafka broker(s) used to bootstrap the connection.
kafka_bootstrap_servers = "localhost:9092"
# Path of the tab-separated word list mapping each term to its sentiment score.
wordsWithSentimentLevel = "AFINN-111.txt"

In [None]:
import os

# Point the kernel at the JDK and Spark distribution installed in the setup cell,
# and ask PySpark to load the Kafka streaming assembly jar when it starts.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.8-bin-hadoop2.7",
    "PYSPARK_SUBMIT_ARGS": (
        "--jars /content/spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar "
        "pyspark-shell"
    ),
})

## Import packages

Call `findspark.init()` first so that `pyspark` can be imported like a regular library.

In [None]:
import findspark
# Run this before the pyspark imports in the next cell: it makes pyspark
# importable as a regular library.
findspark.init()

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import string
from operator import add
from confluent_kafka import KafkaError, KafkaException, Consumer

In [None]:
# getOrCreate() reuses the context that is already running when this cell is
# executed again; a second plain SparkContext() in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
# Streaming context on top of `sc`; the second argument is the batch duration (5).
ssc = StreamingContext(sc, 5)

# Consumer loop

In [None]:
def _load_sentiment_scores(path):
  """Read a ``term<TAB>score`` word list and return it as ``{term: int(score)}``."""
  scores = {}
  with open(path, encoding='utf-8-sig', mode='r') as afinn_file:
    for line in afinn_file:
      entry = line.rstrip('\r\n')
      if not entry.strip():
        continue  # tolerate blank lines (e.g. a trailing newline at end of file)
      # Split on the last tab so the term itself is kept intact.
      term, _, score = entry.rpartition('\t')
      scores[term] = int(score)
  return scores


def consume_loop(consumer, topics):
  """Consume Kafka messages and compute a sentiment score for each one.

  Subscribes ``consumer`` to ``topics`` and polls with a 2 second timeout until
  a poll returns no message. The text of every message is split on spaces,
  stripped of punctuation, and each word is looked up in the word list found at
  the module-level ``wordsWithSentimentLevel`` path; the per-word scores are
  summed with the module-level SparkContext ``sc``.

  Args:
    consumer: a Kafka consumer exposing ``subscribe``, ``poll`` and ``close``.
    topics: list of topic names to subscribe to.

  Returns:
    A list with one integer sentiment score per successfully consumed message,
    in arrival order.

  Raises:
    KafkaException: if a message carries an error other than end-of-partition.

  The consumer is always closed before returning or propagating an error.
  """
  print('Started consuming...')
  records = []
  try:
    # Load the word list once. The original rebuilt it per message from an
    # already-exhausted file iterator, so every message after the first scored 0.
    sentiment_scores = _load_sentiment_scores(wordsWithSentimentLevel)
    strip_punctuation = str.maketrans('', '', string.punctuation)

    consumer.subscribe(topics)
    msg = consumer.poll(timeout=2.0)
    while msg is not None:
      error = msg.error()
      if error is None:
        payload = msg.value()
        if payload is not None:  # skip messages without a body
          # parallelize() needs a collection of strings, not the Message object.
          text = payload.decode('utf-8')
          # NOTE(review): the lookup is case-sensitive; confirm the producer's
          # text casing matches the terms in the word list.
          score = sc.parallelize([text]).flatMap(lambda line: line.split(' ')).map(
              lambda word: word.translate(strip_punctuation)).map(
                  lambda word: sentiment_scores.get(word, 0)).reduce(add)
          records.append(score)
      elif error.code() == KafkaError._PARTITION_EOF:
        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))
      else:
        raise KafkaException(error)
      msg = consumer.poll(timeout=2.0)
    # Only reached when a poll timed out, so the message is accurate here
    # (it used to be printed from `finally`, i.e. also on errors).
    print('No messages were received in the last 2 seconds, closing the consumer')
  finally:
    consumer.close()
  return records

In [None]:
# Consumer settings: broker address, consumer group id and offset-reset policy.
conf = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': "afeka-group",
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)

# Consume from the configured topic until no message arrives within the poll timeout.
consume_loop(consumer, topics=[kafka_topic_name])

Started consuming...
No messages were received in the last 2 seconds, closing the consumer
