In [None]:
%pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, LongType

# Obtain the Spark session for this notebook. getOrCreate() returns the
# session that is already active in this process if there is one; otherwise
# it builds a new Hive-enabled session named "sentiment_analysis_read".
# NOTE(review): if the notebook interpreter pre-creates a session, static
# settings such as the app name may not take effect — confirm.
spark = (
    SparkSession.builder
    .appName("sentiment_analysis_read")
    .enableHiveSupport()
    .getOrCreate()
)

# Kafka connection settings used by the streaming reader.
kafka_server = "kafka-broker:29092"  # bootstrap server, host:port
topic = "sentiment_analysis"         # topic the reader subscribes to

# Expected layout of each JSON message value, as (column name, Spark type)
# pairs. StructField is left at its default nullable=True for every column.
# NOTE(review): likes is Double while retweets is Long — confirm this matches
# the producer's payload.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("text", StringType()),
        ("date", StringType()),
        ("likes", DoubleType()),
        ("is_retweet", BooleanType()),
        ("retweets", LongType()),
        ("country", StringType()),
    )
])

# Source: subscribe to the Kafka topic as an unbounded streaming DataFrame.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic)
    .load()
)

# The Kafka `value` column is cast to a string so it can be parsed as JSON.
json_text = kafka_stream.selectExpr("CAST(value AS STRING)")

# Parse each payload with `schema` into a struct named "data", then flatten
# that struct so its fields become the top-level columns of `df`.
# NOTE(review): payloads that do not parse presumably surface as null fields
# rather than errors — confirm whether they should be filtered out.
df = (
    json_text
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Sink locations for the streaming query.
# NOTE(review): files are written under the Hive warehouse directory, but no
# metastore table is created here — confirm whether one is expected.
# NOTE(review): losing the checkpoint directory resets the query's progress
# (including its Kafka offsets); confirm "/tmp2/check2" is durable storage.
OUTPUT_PATH = "/user/hive/warehouse/datatest2"
CHECKPOINT_PATH = "/tmp2/check2"

# Continuously append the parsed records to OUTPUT_PATH as Parquet files.
query = (
    df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", OUTPUT_PATH)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .start()
)

# Block this cell while the stream runs. The query executes in the background,
# independent of this call. If the cell is interrupted or awaitTermination()
# raises, stop the query explicitly so it does not keep consuming Kafka and
# writing files after the cell has finished. stop() is a no-op on a query that
# has already terminated.
try:
    query.awaitTermination()
finally:
    query.stop()

# Reached only when the query terminates normally. An exception from
# awaitTermination() still propagates past this line, as before.
# NOTE(review): in a shared notebook interpreter, stopping the session also
# stops it for other cells — confirm this is intended.
spark.stop()


In [1]:

%pyspark
