In [1]:
from pyspark.sql.types import StructType, StringType, StructField, DateType, ArrayType, MapType
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window
from pyspark.sql.functions import *
from bson.json_util import *

import matplotlib.pyplot as plt

In [2]:
kafka_topic_name = "trump"
kafka_bootstrap_servers = "localhost:9092"
nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
jsonOptions = { "timestampFormat": nestTimestampFormat }

In [3]:
sparkSession = SparkSession \
        .builder \
        .appName("TwitterStreamingAssignment") \
        .master("local") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
        .getOrCreate()
        
sparkSession.sparkContext.setLogLevel("ERROR")

22/01/13 22:18:57 WARN Utils: Your hostname, EMPID21092 resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlp3s0)
22/01/13 22:18:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rita/.ivy2/cache
The jars for the packages stored in: /home/rita/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5a55c2d7-6160-4fba-8663-b4e06d624bba;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.0 in cent

In [4]:
schema = StructType() \
    .add("Created_date", DateType()) \
        .add("Username", StringType()) \
            .add("User_location", StringType())\
                .add("Hashtags", StringType()) \
                    .add("Text", StringType())    

In [5]:
posts_df = sparkSession.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "trump") \
    .option("startingOffsets", "earliest") \
    .load()\
    .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"), col("timestamp").cast("string").alias("time_stamp"))

In [6]:
posts_df.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- Created_date: date (nullable = true)
 |    |-- Username: string (nullable = true)
 |    |-- User_location: string (nullable = true)
 |    |-- Hashtags: string (nullable = true)
 |    |-- Text: string (nullable = true)
 |-- time_stamp: string (nullable = true)



In [7]:
col_stream = posts_df.writeStream.trigger(processingTime='5 seconds')\
.outputMode('update')\
    .option("truncate", "false")\
        .option("checkpointLocation", ".checkpoint/col_stream_checkpoint")\
            .format("console")\
                .start()

col_stream.awaitTermination(1)

False

In [8]:
hashtags_location = posts_df.select('parsed_value.Hashtags', 'parsed_value.User_location','time_stamp') \
    .filter(col("Hashtags") != "[]").filter(col("Hashtags").contains('Omicron'))

In [11]:
hashtags_count_per_location = hashtags_location
.withWaterMark("time_stamp","10 minutes") \
.groupBy(col("User_location"),"time_stamp").count().select('*')
    
hashtags_count_location = hashtags_count_per_location.writeStream.trigger(processingTime='60 seconds')\
    .outputMode('append')\
    .option("truncate", "false")\
    .option("checkpointLocation", ".checkpoint/hashtags_checkpoint")\
    .format("csv")\
    .option("path", "./")\
    .start()
                    
hashtags_count_location.awaitTermination()

AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
Project [User_location#28, time_stamp#22, count#145L]
+- Aggregate [User_location#28, time_stamp#22], [User_location#28, time_stamp#22, count(1) AS count#145L]
   +- Filter Contains(Hashtags#27, Omicron)
      +- Filter NOT (Hashtags#27 = [])
         +- Project [parsed_value#21.Hashtags AS Hashtags#27, parsed_value#21.User_location AS User_location#28, time_stamp#22]
            +- Project [from_json(StructField(Created_date,DateType,true), StructField(Username,StringType,true), StructField(User_location,StringType,true), StructField(Hashtags,StringType,true), StructField(Text,StringType,true), (timestampFormat,yyyy-MM-dd'T'HH:mm:ss.sss'Z'), cast(value#8 as string), Some(Asia/Kolkata)) AS parsed_value#21, cast(timestamp#12 as string) AS time_stamp#22]
               +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1f53fdd0, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@32df3095, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=trump], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@51a46beb,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> trump, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]


-------------------------------------------
Batch: 122
-------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                                  |time_stamp             |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, AnaVerdeja, mexico, ['Covid19', 'Ómicron'], RT @DipPRIedomex: Ante el aumento de contagios de #Covid19 por la variante #Ómicron es importante no}                                |2022-01-13 22:19:49.365|
|{2022-01-13, Beta_Mode, Santiago

-------------------------------------------
Batch: 124
-------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                                  |time_stamp             |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, AnaVerdeja, mexico, ['Covid19', 'Ómicron'], RT @DipPRIedomex: Ante el aumento de contagios de #Covid19 por la variante #Ómicron es importante no}                                |2022-01-13 22:20:02.611|
|{2022-01-13, Beta_Mode, Santiago

-------------------------------------------
Batch: 126
-------------------------------------------
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                                                                       |time_stamp             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, c2_co2, ✨✨✨🌍✨✨✨☀️✨✨✨🌒✨✨✨🎇, ['Omicron'], RT @FCOUILBAULT1: Le vaccin affaiblit le système immunitaire si on en croit le nombre

-------------------------------------------
Batch: 128
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                              |time_stamp             |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, call_the_kernel, , ['Omicron'], Comunque, grazie a un puro colpo di fortuna che ci ha portato #Omicron che sarebbe molto meno letale}                                        |2022-01-13 22:20:31.194|
|{2022-01-13, njoyflyfishing, Los Angeles, CA, []

-------------------------------------------
Batch: 130
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                              |time_stamp             |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, call_the_kernel, , ['Omicron'], Comunque, grazie a un puro colpo di fortuna che ci ha portato #Omicron che sarebbe molto meno letale}                                        |2022-01-13 22:20:44.573|
|{2022-01-13, njoyflyfishing, Los Angeles, CA, []

-------------------------------------------
Batch: 132
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|parsed_value                                                                                                                                                                              |time_stamp             |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|{2022-01-13, Monesty_, Maldives, ['COVID19', 'COVID19France', 'Omicron'], RT @Mediavenir: 🇫🇷 ALERTE INFO - 361.719 cas de #COVID19 ont été détectés en 24h en France. (SPF) #C}         |2022-01-13 22:20:57.966|
|{2022-01-13, KishoreeeeS, , ['JUSTIN', 'SunNews', 

                                                                                