# Consume Kafka Topic, Process with Spark Stream and write to Cassandra

### Imports

In [1]:
# All third-party dependencies of the notebook are imported in one place so a
# missing package is reported immediately instead of surfacing later as a
# confusing NameError in another cell.
try:
    import os
    import sys
    import re
    from textblob import TextBlob
    from pyspark.ml.feature import Tokenizer, RegexTokenizer
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructType, StructField, FloatType
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, udf
    # NOTE: sum/min/max shadow the Python builtins of the same name from here on.
    from pyspark.sql.functions import col,avg,sum,min,max,row_number,explode, split
    from pyspark.sql.window import Window
except ImportError:
    # Report the failure, then re-raise: nothing below can work without these
    # modules, so continuing would only hide the real cause.
    print("Import Error")
    raise

### Depending on installation

If several Python versions are installed, setting these environment variables may be necessary.

In [2]:

# Make PySpark use the interpreter that is running this notebook, which avoids
# version mismatches when several Python installations are present.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

### User-defined functions for stream processing

##### preprocessing():
clean tweets of unnecessary characters

##### getPolarity():
compute sentiment score with library textblob

#### writeToCassandra(): 
function required for writing to Cassandra (table, keyspace)

In [9]:
def preprocessing(tweet: str) -> str:
    """Remove links, user mentions, punctuation, digits and hashtags from a tweet.

    The steps run in a fixed order: links, mentions, punctuation (replaced by a
    single space), digits, then hashtags. Whitespace is not normalised, so the
    result may contain leading, trailing or repeated spaces.

    Args:
        tweet: Raw tweet text. Non-string values are converted with ``str``;
            ``None`` (e.g. a Kafka record without a value) yields ``''``.

    Returns:
        The cleaned tweet text.
    """
    if tweet is None:
        # Avoid turning a missing value into the literal text "None".
        return ''
    tweet = str(tweet)

    # remove links
    tweet = re.sub(r'http\S+', '', tweet)
    tweet = re.sub(r'bit\.ly/\S+', '', tweet)
    # str.strip('[link]') would strip the *characters* [, l, i, n, k, ] from both
    # ends of the tweet; the intent is to drop the literal "[link]" placeholder.
    tweet = tweet.replace('[link]', '')

    # remove users
    # The hyphen is placed last in the class so it is a literal '-' rather than
    # forming an accidental '9-_' range.
    tweet = re.sub(r'RT\s@[A-Za-z]+[A-Za-z0-9_-]+', '', tweet)
    tweet = re.sub(r'@[A-Za-z]+[A-Za-z0-9_-]+', '', tweet)

    # remove punctuation ('#' is deliberately absent so hashtags survive until
    # the hashtag step below)
    my_punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â'
    tweet = re.sub('[' + my_punctuation + ']+', ' ', tweet)

    # remove numbers
    tweet = re.sub(r'[0-9]+', '', tweet)

    # remove hashtags
    tweet = re.sub(r'#[A-Za-z]+[A-Za-z0-9_-]+', '', tweet)

    return tweet


def getPolarity(text: str):
    """Return the sentiment polarity that TextBlob computes for ``text``."""
    blob = TextBlob(text)
    return blob.sentiment.polarity

def writeToCassandra(writeDF, _):
    """Append ``writeDF`` to the Cassandra table ``sencity`` in keyspace ``sacity``.

    Used as the ``foreachBatch`` callback of the streaming query; the second
    positional argument supplied by the caller is ignored.
    """
    cassandra_writer = writeDF.write.format("org.apache.spark.sql.cassandra")
    cassandra_writer = cassandra_writer.mode('append')
    cassandra_writer.options(table="sencity", keyspace="sacity").save()

# Wrap the plain Python helpers as Spark UDFs so they can be applied to columns
preproccesing_udf = udf(preprocessing, StringType())
sentiment_udf = udf(getPolarity, FloatType())

### Init Spark Streaming

Init sparksession and apply transformations. 

In [None]:
# Create (or reuse) the session; the Kafka source and Cassandra connector are
# requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("TwitterSentimentAnalysis")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,"
        "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0",
    )
    .getOrCreate()
)


# Subscribe to the "saCity" topic on the local broker, reading from the
# earliest available offsets.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "saCity",
    "startingOffsets": "earliest",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()


# Replace the key/value columns with string-typed copies named
# key_str / value_str.
df = (
    df
    .withColumn("key_str", col("key").cast("string"))
    .withColumn("value_str", col("value").cast("string"))
    .drop("key", "value")
)


# Transform stream: clean the tweet text, score its sentiment, then average the
# score per 5-second window and message key.
clean_df = df.withColumn("preprocessed_str", preproccesing_udf(col("value_str")))
sentiment_df = clean_df.withColumn("sentiment", sentiment_udf(col("preprocessed_str")))

# The Kafka source exposes the record time as "timestamp"; using the exact name
# keeps the query working even when spark.sql.caseSensitive is enabled.
# NOTE(review): no watermark is defined, so window state is presumably kept for
# the lifetime of the query -- consider withWatermark() if late data is bounded.
final_df = (
    sentiment_df
    .groupBy(F.window(col("timestamp"), "5 seconds"), col("key_str"))
    .agg(avg("sentiment").alias("sentiment"))
    .select(
        col("window.start").alias("windowstart"),
        col("window.end").alias("windowend"),
        "key_str",
        "sentiment",
    )
)





### QUERY 1: RUN THIS FOR TESTING (COMMENT OUT QUERY NUMBER 2)
#
# query = (
#     final_df.writeStream
#     .trigger(processingTime="10 seconds")
#     .outputMode("complete")
#     .format("console")
#     .start()
# )


### QUERY 2: RUN THIS FOR PRODUCTION (COMMENT OUT QUERY NUMBER 1) -- DEFAULT
# The Cassandra contact point is set on the session configuration. Options
# passed to writeStream are not forwarded to the DataFrame writer used inside
# writeToCassandra, so setting it there had no effect.
spark.conf.set("spark.cassandra.connection.host", "localhost:9042")

# NOTE(review): no checkpointLocation is configured, so Spark falls back to a
# temporary checkpoint directory -- confirm whether restarts should resume from
# the last processed offsets.
query = (
    final_df.writeStream
    .trigger(processingTime="10 seconds")
    .foreachBatch(writeToCassandra)
    .outputMode("update")
    .start()
)

# Block until the streaming query stops or fails.
query.awaitTermination()


22/10/30 13:18:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-53c6eaab-bca3-4e17-afcb-a14c1579bae7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/10/30 13:18:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/30 13:18:39 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 13:18:40 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
22/10/30 13:18:40 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It

                                                                                

22/10/30 13:18:54 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 15370 milliseconds
22/10/30 13:43:03 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-e8f37422-6dd8-4eea-a9db-876b57bfb254-982882963-driver-0-7, groupId=spark-kafka-source-e8f37422-6dd8-4eea-a9db-876b57bfb254-982882963-driver-0] Connection to node 0 (osboxes/127.0.1.1:9092) could not be established. Broker may not be available.
22/10/30 13:43:03 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b71bac08-a273-4e20-9ae2-d179d2fb07eb-1481272023-driver-0-1, groupId=spark-kafka-source-b71bac08-a273-4e20-9ae2-d179d2fb07eb-1481272023-driver-0] Connection to node 0 (osboxes/127.0.1.1:9092) could not be established. Broker may not be available.
22/10/30 13:43:03 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-9191e223-9e5c-4327-9436-2167c8e7253a--883587546-driver-0-3, groupId=spark-kafka-source-9191e223-9e5c-4

22/10/30 13:43:09 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-e8f37422-6dd8-4eea-a9db-876b57bfb254-982882963-driver-0-7, groupId=spark-kafka-source-e8f37422-6dd8-4eea-a9db-876b57bfb254-982882963-driver-0] Connection to node 0 (osboxes/127.0.1.1:9092) could not be established. Broker may not be available.
22/10/30 13:43:10 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-b71bac08-a273-4e20-9ae2-d179d2fb07eb-1481272023-driver-0-1, groupId=spark-kafka-source-b71bac08-a273-4e20-9ae2-d179d2fb07eb-1481272023-driver-0] Connection to node 0 (osboxes/127.0.1.1:9092) could not be established. Broker may not be available.
22/10/30 13:43:10 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-9191e223-9e5c-4327-9436-2167c8e7253a--883587546-driver-0-3, groupId=spark-kafka-source-9191e223-9e5c-4327-9436-2167c8e7253a--883587546-driver-0] Connection to node 0 (osboxes/127.0.1.1:9092) could not be established. Broker may not be available.
22/10/30 