# Consume Kafka Topic, Process with Spark Stream and write to Cassandra

### Imports

In [1]:
try: 
    import os, sys,re
    from textblob import TextBlob
    from pyspark.ml.feature import Tokenizer, RegexTokenizer
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructType, StructField, FloatType
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, udf
    from pyspark.sql.functions import col,avg,sum,min,max,row_number,explode, split
    from pyspark.sql.window import Window

except: 
    print("Import Error")

### Depending on installation

If different python versions are installed, to set env variables might be needed!

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

### User-defined functions for streamprocessing

##### preprocessing():
clean tweets for for unnecessary characters

##### getPolartiy():
compute sentiment score with library textblob

#### writeToCassandra(): 
function required for writting to cassandra (table, keyspace)

In [3]:
def preprocessing(tweet: str) -> str:
    tweet = re.sub(r'http\S+', '', str(tweet))
    tweet = re.sub(r'bit.ly/\S+', '', str(tweet))
    tweet = tweet.strip('[link]')

    # remove users
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))

    # remove puntuation
    my_punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â'
    tweet = re.sub('[' + my_punctuation + ']+', ' ', str(tweet))

    # remove number
    tweet = re.sub('([0-9]+)', '', str(tweet))

    # remove hashtag
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))

    return tweet


def getPolarity(text: str):
    return TextBlob(text).sentiment.polarity

def writeToCassandra(writeDF, _):
    writeDF.write \
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sencity", keyspace="sacity")\
    .save()

#Convert to udf functions for spark 
preproccesing_udf = udf( lambda z: preprocessing(z) , StringType())
sentiment_udf = udf( lambda z: getPolarity(z), FloatType()) 

### Init Spark Streaming

Init sparksession and apply transformations. 

In [4]:
spark = SparkSession \
    .builder \
    .appName("TwitterSentimentAnalysis") \
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0") \
    .getOrCreate()


df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "saCity") \
    .option("startingOffsets","earliest") \
    .load()


df= df.withColumn('key_str', df['key'].cast('string').alias('key_str'))\
    .drop('key').withColumn('value_str', df['value']\
    .cast('string')\
    .alias('key_str'))\
    .drop('value')


#Transform stream, Preprocess and 
clean_df = df.withColumn("preprocessed_str", preproccesing_udf(col("value_str")))
sentiment_df = clean_df.withColumn("sentiment", sentiment_udf(col("preprocessed_str")))
final_df = sentiment_df.groupBy(F.window(col("TimeStamp"), "5 seconds"), col("key_str"))\
    .agg(avg("sentiment"))\
    .select(col("window.start")\
    .alias("windowstart"), col("window.end")\
    .alias("windowend"), "key_str",col("avg(sentiment)")\
    .alias("sentiment"))





###QUERY 1: RUN THIS FOR TESTING: COMMENT QUERY NUMBER 2

"""
query = final_df.writeStream \
    .trigger(processingTime="10 seconds") \
    .outputMode("complete")\
    .format("console")\
    .start()
"""



###QUERY 2: RUN THIS PRODUCTION: COMMENT QUERY NUMBER 1 (DEFUALT)
query = final_df.writeStream \
    .trigger(processingTime="30 seconds") \
    .option("spark.cassandra.connection.host","localhost:9042")\
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .start()\


query.awaitTermination()


22/10/30 15:31:21 WARN Utils: Your hostname, osboxes resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/10/30 15:31:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/osboxes/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osboxes/.ivy2/cache
The jars for the packages stored in: /home/osboxes/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0f80c7bb-b90d-4b3d-80e3-fedb21898ef5;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-loggin

22/10/30 15:31:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/30 15:31:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-aae835c6-c233-4279-a6ee-544f65667976. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/10/30 15:31:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


[Stage 0:>                                                          (0 + 1) / 1]

22/10/30 15:31:35 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:36 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:31:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


[Stage 6:>                                                          (0 + 1) / 1]

22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:32:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:33:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


[Stage 14:>                                                         (0 + 1) / 1]

22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:34:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:35:30 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

                                                                                

22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 15:36:00 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/home/osboxes/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=60>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/osboxes/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/osboxes/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/osboxes/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
  

Py4JError: An error occurred while calling o106.awaitTermination