In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace, udf
from pyspark.sql.types import IntegerType, StringType, StructType
from textblob import TextBlob

# 1. Create (or reuse) the SparkSession with the Kafka connector on the classpath.
# NOTE(review): the connector coordinates must match the installed Spark/Scala
# build (here Spark 3.1.1, Scala 2.12). spark.jars.packages only takes effect when
# getOrCreate() starts a new session; if one already exists in this kernel,
# restart the kernel after changing it.
spark = (
    SparkSession.builder
    .appName("RealTimeStockSentiment")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .getOrCreate()
)


In [8]:
# 2. Subscribe to the "stock-tweets" Kafka topic as an unbounded streaming source.
# startingOffsets="latest": only records produced after the query starts are read.
df_kafka = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "stock-tweets",
        "startingOffsets": "latest",
    })
    .load()
)



In [None]:
# 3. Schema used to decode each tweet payload. Every field is a nullable string
#    (StructType.add defaults to nullable=True); `created_at` is kept as raw text,
#    and the nested `user` struct only carries the author's screen name.
tweet_schema = (
    StructType()
    .add("text", StringType())
    .add("created_at", StringType())
    .add("user", StructType().add("screen_name", StringType()))
)


In [None]:
# 4. Decode the Kafka payload: cast the binary `value` column to a string, parse it
#    as JSON with tweet_schema, then flatten the resulting struct into top-level
#    columns (text, created_at, user).
#    Payloads that do not match the schema yield nulls (from_json's default
#    PERMISSIVE mode) rather than failing the query.
df_parsed = (
    df_kafka
    .select(from_json(col("value").cast("string"), tweet_schema).alias("data"))
    .select("data.*")
)

In [None]:
# 5. Tweet cleaning: drop URLs and @mentions, then every character that is not a
#    letter, whitespace or apostrophe. The result goes into `clean_text`.
#    - `http\S+` already matches `https...`, so no separate https alternative.
#    - Apostrophes are kept on purpose. TextBlob tokenizes "don't" as "do n't" and
#      treats "n't" as a negation. Stripping the apostrophe would yield "dont", so
#      "don't like" would score as positive.
#    - Typographic apostrophes (U+2018/U+2019, common on mobile keyboards) are first
#      normalized to a plain ' for the same reason.
df_clean = (
    df_parsed
    .withColumn("clean_text", regexp_replace(col("text"), r"http\S+|www\S+", ""))
    .withColumn("clean_text", regexp_replace(col("clean_text"), r"@\w+", ""))
    .withColumn("clean_text", regexp_replace(col("clean_text"), "[\u2018\u2019]", "'"))
    .withColumn("clean_text", regexp_replace(col("clean_text"), r"[^a-zA-Z\s']", ""))
)

In [None]:
# 6. Sentiment analysis with an explicit neutral class
#    (0 = negative, 1 = neutral, 2 = positive).
def get_sentiment(text, neutral_band: float = 0.1) -> int:
    """Map a text to a 3-class sentiment label using TextBlob polarity.

    Args:
        text: Text to score. ``None`` or an empty string is treated as neutral.
        neutral_band: Half-width of the neutral zone around 0. A polarity strictly
            above ``neutral_band`` is positive; a polarity strictly below
            ``-neutral_band`` is negative; anything in between (bounds included)
            is neutral. Defaults to 0.1, the previously hard-coded threshold.

    Returns:
        int: 2 for positive, 1 for neutral, 0 for negative.
    """
    if not text:
        return 1  # nothing to score -> neutral
    polarity = TextBlob(text).sentiment.polarity
    if polarity > neutral_band:
        return 2  # positive
    if polarity < -neutral_band:
        return 0  # negative
    return 1  # neutral

# Wrap get_sentiment as a row-at-a-time Spark UDF returning an integer label,
# then score the cleaned text of every incoming tweet into a `sentiment` column.
sentiment_udf = udf(f=get_sentiment, returnType=IntegerType())
df_with_sentiment = df_clean.withColumn(
    "sentiment",
    sentiment_udf("clean_text"),
)