In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json
import pyspark.sql.functions as F

# Explicit schema for the input CSV files: one nullable field per CSV column,
# in file order. The streaming reader below is given this schema via .schema().
# NOTE(review): "Time Published" is parsed as a timestamp using the CSV reader's
# default timestamp format -- confirm the files use a compatible format; rows
# whose value ends up null are dropped by the filter in sdf_processed.
schema = (
    StructType()
    .add("Title", StringType(), True)
    .add("URL", StringType(), True)
    .add("Time Published", TimestampType(), True)
    .add("Authors", StringType(), True)
    .add("Summary", StringType(), True)
    .add("Banner Image", StringType(), True)
    .add("Source", StringType(), True)
    .add("Category Within Source", StringType(), True)
    .add("Source Domain", StringType(), True)
    .add("Topics", StringType(), True)
    .add("Overall Sentiment Score", FloatType(), True)
    .add("Overall Sentiment Label", StringType(), True)
    .add("Ticker Sentiment", StringType(), True)
)

# Streaming source: CSV files found under the /FileStore/tables/ directory
# (point this at the directory that holds your CSV files). Each file's first
# line is a header, the schema defined above is applied, and at most one new
# file is picked up per micro-batch trigger.
sdf = (
    spark.readStream
    .format("csv")
    .options(header="true", maxFilesPerTrigger=1)
    .schema(schema)
    .load("/FileStore/tables/")
)

# Streaming transformation: discard rows without a publication timestamp, then
# count rows per (Source, Overall Sentiment Label) pair. The resulting columns
# are Source, Overall Sentiment Label and count.
sdf_processed = (
    sdf.where(F.col("Time Published").isNotNull())
    .groupBy(F.col("Source"), F.col("Overall Sentiment Label"))
    .count()
)

# Write the streaming aggregation into an in-memory table that later cells
# query via spark.sql under the name "source_sentiment_counts". "complete"
# output mode rewrites the whole aggregate on every trigger.
#
# A streaming query name must be unique among the currently active queries,
# so stop any query left over from a previous run of this cell; otherwise
# re-running the cell fails at .start().
for active_query in spark.streams.active:
    if active_query.name == "source_sentiment_counts":
        active_query.stop()

query = (
    sdf_processed
    .writeStream
    .format("memory")
    .queryName("source_sentiment_counts")
    .outputMode("complete")
    .start()
)

# .start() returns immediately. Block until all currently available input
# files have been processed, so this table (and the queries in the following
# cells) is not read while it is still empty.
query.processAllAvailable()

spark.sql("SELECT * FROM source_sentiment_counts").show()


In [0]:
# Dobijanje Top 5 Izvora po Ukupnom Broju Zapisa:

In [0]:
# Top 5 sources by total record count, summed over all sentiment labels.
top_sources = (
    spark.table("source_sentiment_counts")
    .groupBy("Source")
    .agg(F.sum("count").alias("TotalCount"))
    .orderBy(F.col("TotalCount").desc())
    .limit(5)
)
top_sources.show()

In [0]:
# Record count per sentiment label for a single source (Benzinga), with the
# most frequent label first.
benzinga_sentiment_count = (
    spark.table("source_sentiment_counts")
    .where(F.col("Source") == "Benzinga")
    .select("Overall Sentiment Label", "count")
    .orderBy(F.desc("count"))
)
benzinga_sentiment_count.show()


In [0]:
# Source with the most records labelled 'Positive'. source_sentiment_counts
# holds one row per (Source, Overall Sentiment Label) pair, so filtering to a
# single label leaves one row per source; the row with the largest count wins.
# NOTE(review): confirm that 'Positive' actually occurs as a value of
# `Overall Sentiment Label` in the input data -- otherwise this returns no rows.
highest_positive_sentiment_source = spark.sql("""
    SELECT Source, `Overall Sentiment Label`, count
    FROM source_sentiment_counts
    WHERE `Overall Sentiment Label` = 'Positive'
    ORDER BY count DESC
    LIMIT 1
""")
highest_positive_sentiment_source.show()


In [0]:
# Sentiment trends over time. This needs a `window` column, which
# source_sentiment_counts only has if the streaming aggregation groups by a
# time window (e.g. F.window(col("Time Published"), ...)). The pipeline above
# groups only by Source and Overall Sentiment Label, and querying the missing
# column would fail, so check for it first.
if "window" in spark.table("source_sentiment_counts").columns:
    sentiment_trends = spark.sql("""
        SELECT window, `Overall Sentiment Label`, SUM(count) as TotalCount
        FROM source_sentiment_counts
        GROUP BY window, `Overall Sentiment Label`
        ORDER BY window
    """)
    sentiment_trends.show()
else:
    print(
        "source_sentiment_counts has no 'window' column; group the streaming "
        "aggregation by F.window(...) to compute sentiment trends over time."
    )
