In [34]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [35]:
# Create (or reuse) the local Spark session and request the Kafka SQL connector package.
spark = (
    SparkSession.builder
    .appName("RedditSentimentAnalysis")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
    .getOrCreate()
)

In [36]:
# Streaming source: subscribe to the "reddit" topic on the local broker,
# starting from the earliest available offsets.
kafkaDataFrame = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "reddit")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

In [37]:
# Layout of each JSON message: three string fields (title, selftext, url).
schema = StructType(
    [StructField(field_name, StringType()) for field_name in ("title", "selftext", "url")]
)

In [38]:
# Keep only the Kafka message payload, cast from binary to a string column.
stringDF = kafkaDataFrame.selectExpr(
    "CAST(value AS STRING)",
)


In [39]:
# Parse each payload against `schema` into a struct named "data",
# then flatten that struct into top-level columns.
jsonDataDF = (
    stringDF
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)


In [40]:
# Function to perform sentiment analysis using VADER
def analyze_sentiment(text):
    """Label ``text`` as positive, negative or neutral from its VADER compound score.

    Args:
        text: The text to score. May be ``None`` or blank, which happens when
            the source column is null or empty.

    Returns:
        ``'positive'`` when the compound score is >= 0.05, ``'negative'`` when
        it is <= -0.05, and ``'neutral'`` otherwise. Missing or blank text is
        reported as ``'neutral'`` without invoking the analyzer.
    """
    # Null column values reach a UDF as None, and polarity_scores() expects a
    # string, so short-circuit before touching the analyzer.
    if not text or not text.strip():
        return 'neutral'

    # Build the analyzer on first use and keep it on the function object so
    # later calls reuse it instead of constructing a new one for every row.
    analyzer = getattr(analyze_sentiment, '_analyzer', None)
    if analyzer is None:
        analyzer = SentimentIntensityAnalyzer()
        analyze_sentiment._analyzer = analyzer

    compound = analyzer.polarity_scores(text)['compound']
    if compound >= 0.05:
        return 'positive'
    if compound <= -0.05:
        return 'negative'
    return 'neutral'

In [41]:
# Wrap the classifier as a Spark UDF whose result is a string label.
sentiment_udf = udf(analyze_sentiment, returnType=StringType())

# Derive a "sentiment" column from each post's selftext.
sentiment_analyzed_df = jsonDataDF.withColumn(
    "sentiment",
    sentiment_udf(col("selftext")),
)


In [22]:
# Register the labelled stream as a temporary view so it can be referenced by name in Spark SQL.
sentiment_analyzed_df.createOrReplaceTempView(name="temp_analyzed_reddit_posts")

In [10]:
# Sink 1: append the labelled posts as Parquet files on HDFS, with a
# checkpoint directory configured alongside the output path.
query = (
    sentiment_analyzed_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "hdfs://localhost:9000/user/hive/warehouse/new_analyzed_reddit_posts")
    .option("checkpointLocation", "hdfs://localhost:9000/user/priyasuresh/checkpoints")
    .start()
)

# Blocks this cell until the query terminates (or the kernel is interrupted).
query.awaitTermination()

24/02/29 20:06:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/02/29 20:06:04 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/02/29 20:06:04 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/02/29 20:06:04 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/02/29 20:06:04 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/02/29 20:06:04 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
24/02/29 20:06:07 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/02/29 20:06:08 WARN KafkaDataConsumer: KafkaDataConsumer is not running in

KeyboardInterrupt: 

In [32]:
# Stop the streaming query currently bound to `query`.
query.stop()

In [None]:
# NOTE(review): this cell has no execution count. If it runs at this point in
# the notebook, the later cells that start another stream from
# sentiment_analyzed_df would presumably fail because the session is already
# stopped — confirm whether this early stop should be kept.
spark.stop()

In [13]:
# InfluxDB sink: the cells below write each micro-batch of sentiment results to InfluxDB.

In [42]:
import requests

def _escape_tag_value(value):
    """Escape a string for use as an InfluxDB line-protocol tag value."""
    # Line protocol is newline-delimited, so flatten embedded line breaks first.
    flattened = value.replace('\r', ' ').replace('\n', ' ')
    # Commas, equals signs and spaces are the delimiters that must be escaped in tags.
    return flattened.replace(',', '\\,').replace('=', '\\=').replace(' ', '\\ ')


def _escape_field_string(value):
    """Escape a string for use inside a double-quoted line-protocol field value."""
    return value.replace('\\', '\\\\').replace('"', '\\"')


def write_df_to_influxdb(batch_df, epoch_id):
    """Write one micro-batch of sentiment results to InfluxDB 2.x.

    Intended as a ``foreachBatch`` callback. Each row with a usable title
    becomes one ``reddit_sentiment`` point, tagged with the title and carrying
    the sentiment label as a string field. The whole batch is sent in a single
    HTTP request; a non-204 response is reported with ``print`` rather than
    raised.

    Args:
        batch_df: Batch DataFrame exposing ``toPandas()`` with ``title`` and
            ``sentiment`` columns.
        epoch_id: Batch identifier supplied by ``foreachBatch``; unused.

    Environment:
        INFLUXDB_TOKEN: API token used for the ``Authorization`` header.
    """
    influxdb_url = 'http://localhost:8086'
    org_name = 'sjsu'  # Replace with your actual organization name
    bucket_name = 'realtime'  # Your bucket name
    # Read the token from the environment so that no credential lives in the
    # notebook. The token that used to be hard-coded in the header should be
    # treated as exposed and revoked.
    token = os.environ.get('INFLUXDB_TOKEN', 'YOUR_INFLUXDB_TOKEN')

    # The write endpoint for InfluxDB 2.x
    write_endpoint = f"{influxdb_url}/api/v2/write?org={org_name}&bucket={bucket_name}"

    headers = {
        'Authorization': f'Token {token}',
        'Content-Type': 'text/plain; charset=utf-8'
    }

    pandas_df = batch_df.toPandas()
    lines = []
    for title, sentiment in zip(pandas_df['title'], pandas_df['sentiment']):
        # Rows without a usable title (null or blank) cannot form a valid tag
        # value, so they are skipped instead of rejecting the whole batch.
        if not isinstance(title, str) or not title.strip():
            continue
        tag_value = _escape_tag_value(title)
        field_value = _escape_field_string(str(sentiment))
        lines.append(f"reddit_sentiment,title={tag_value} sentiment=\"{field_value}\"")

    if not lines:
        return

    data_str = '\n'.join(lines).encode('utf-8')
    # A timeout keeps an unresponsive InfluxDB from blocking the stream indefinitely.
    response = requests.post(write_endpoint, headers=headers, data=data_str, timeout=10)
    if response.status_code != 204:
        print(f"Failed to write to InfluxDB: {response.text}")


In [43]:
# Sink 2: hand every micro-batch of labelled posts to write_df_to_influxdb.
query = (
    sentiment_analyzed_df.writeStream
    .outputMode("append")
    .foreachBatch(write_df_to_influxdb)
    .start()
)

# Blocks this cell until the query terminates (or the kernel is interrupted).
query.awaitTermination()


24/02/29 20:47:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/9j/0m5g2l6d03z4pd5y6pt7xh1h0000gn/T/temporary-6d5450f4-aad6-4c48-96ae-ab3b1c82edd0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/02/29 20:47:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/02/29 20:47:21 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/02/29 20:47:21 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/02/29 20:47:21 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/02/29 20:47:21 WARN AdminClientConfig: The configuration '

KeyboardInterrupt: 

In [44]:
# Stop the streaming query currently bound to `query`.
query.stop()

In [45]:
# Shut down the Spark session; no cells after this one use it.
spark.stop()