In [10]:
# Read the raw AI-news JSON file; no schema is supplied, so Spark infers it.
df_raw = spark.read.format("json").load("Files/AI_news.json")

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 12, Finished, Available, Finished)

In [11]:
# Print the inferred schema tree to see which nested fields are available.
df_raw.printSchema()

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 13, Finished, Available, Finished)

root
 |-- ai_overview: struct (nullable = true)
 |    |-- references: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- index: long (nullable = true)
 |    |    |    |-- link: string (nullable = true)
 |    |    |    |-- snippet: string (nullable = true)
 |    |    |    |-- source: string (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |    |-- text_blocks: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- list: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- reference_indexes: array (nullable = true)
 |    |    |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |    |    |-- snippet: string (nullable = true)
 |    |    |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- reference_indexes: array (nullable = true)
 |    |    |    |    |-- element: long (contain

In [12]:
from pyspark.sql.functions import explode, col, current_date

# Flatten: one output row per element of the top-level `organic_results` array.
df_flat = df_raw.select(explode("organic_results").alias("result"))

# Keep only the fields used downstream. `link` is the key for the later
# anti-join / merge steps, so rows without a link are dropped and repeated
# links within this load are collapsed to a single row (otherwise duplicates
# in one batch would all be appended, since the anti-join only filters against
# what is already stored).
df_clean = (
    df_flat.select(
        col("result.position"),
        col("result.title"),
        col("result.link"),
        col("result.snippet"),
        col("result.displayed_link"),
        col("result.source"),
    )
    .dropna(subset=["link"])
    .dropDuplicates(["link"])
)

# Stamp every row with the date of this load (a date, not a timestamp).
df_clean = df_clean.withColumn("load_date", current_date())


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 14, Finished, Available, Finished)

In [13]:
import nltk
nltk.download('vader_lexicon')

from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Single analyzer instance, captured by the UDF defined below.
sia = SentimentIntensityAnalyzer()


@pandas_udf("double")
def compute_sentiment(texts: pd.Series) -> pd.Series:
    """Return the VADER compound score for each text.

    Null entries are replaced with the empty string before scoring.
    """
    def _compound(text):
        return sia.polarity_scores(text)['compound']

    non_null_texts = texts.fillna("")
    return non_null_texts.apply(_compound)


# Score each snippet and store the result in a new `sentiment` column.
df_scored = df_clean.withColumn("sentiment", compute_sentiment(col("snippet")))

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 15, Finished, Available, Finished)

[nltk_data] Downloading package vader_lexicon to /home/trusted-
[nltk_data]     service-user/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [14]:
# Keep only the scored articles whose `link` is not already stored in
# `serpapi_ai_articles`.
from pyspark.sql.utils import AnalysisException

try:
    df_existing = spark.table("serpapi_ai_articles").select("link")
    df_new = df_scored.join(df_existing, on="link", how="left_anti")
except AnalysisException:
    # The table could not be resolved (first run): everything is new.
    # Only AnalysisException is caught so that unrelated failures are not
    # silently treated as "table doesn't exist yet".
    df_new = df_scored

# Materialise the result now. `df_new` is otherwise a lazy plan, and it is
# written more than once further down: after the first append adds these links
# to `serpapi_ai_articles`, re-running the anti-join for a later write would
# filter the very same rows out again.
# NOTE(review): local checkpoints live on the executors and are not
# fault-tolerant; switch to a reliable checkpoint if that matters here.
df_new = df_new.localCheckpoint(eager=True)


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 16, Finished, Available, Finished)

In [15]:
# Add the not-yet-stored articles to the `serpapi_ai_articles` table.
(
    df_new.write
    .mode("append")
    .saveAsTable("serpapi_ai_articles")
)

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 17, Finished, Available, Finished)

In [16]:
from pyspark.sql.functions import when

# Bucket the compound score: >= 0.05 is Positive, <= -0.05 is Negative,
# anything else (including a null score) falls through to Neutral.
df_labeled = df_new.withColumn(
    "sentiment_label",
    when(col("sentiment") >= 0.05, "Positive")
    .when(col("sentiment") <= -0.05, "Negative")
    .otherwise("Neutral"),
)

# Append the labelled rows to their own table.
(
    df_labeled.write
    .mode("append")
    .saveAsTable("serpapi_ai_articles_labeled")
)

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 18, Finished, Available, Finished)

In [17]:
# Preview up to 1000 rows of the stored articles.
df = spark.sql(
    "SELECT * FROM bing_news_db.serpapi_ai_articles LIMIT 1000"
)
display(df)

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3d8c8de1-22a5-460a-8610-9b09c6cd75b1)

In [23]:
from pyspark.sql.utils import AnalysisException

# Pick the articles that still need scoring: everything in
# `serpapi_ai_articles` whose link is absent from `serpapi_sentiment_scored`.
# If resolving either table raises AnalysisException, fall back to the full
# articles table.
try:
    df = spark.table("serpapi_sentiment_scored").select("link")
    df_to_score = (
        spark.table("serpapi_ai_articles")
        .join(df, "link", "left_anti")
    )
except AnalysisException:
    df_to_score = spark.table("serpapi_ai_articles")


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 25, Finished, Available, Finished)

In [24]:
# Project the scoring input: `title` becomes `article_title`, `snippet`
# becomes `description` (the text column that gets scored), and rows without
# a description are discarded.
df_input = (
    df_to_score
    .select(
        "link",
        col("title").alias("article_title"),
        col("snippet").alias("description"),
        "displayed_link",
        "source",
        "load_date",
    )
    .dropna(subset=["description"])
)


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 26, Finished, Available, Finished)

In [25]:
# Analyzer instance shared by the two UDFs below.
sia = SentimentIntensityAnalyzer()


@pandas_udf("double")
def get_compound_score(texts: pd.Series) -> pd.Series:
    """Return the VADER compound score for each text (nulls scored as "")."""
    non_null_texts = texts.fillna("")
    return non_null_texts.apply(lambda text: sia.polarity_scores(text)['compound'])


@pandas_udf("string")
def get_sentiment_4class(texts: pd.Series) -> pd.Series:
    """Label each text as "Mixed", "Positive", "Negative" or "Neutral".

    "Mixed" is checked first and requires both the positive and negative
    proportions to exceed 0.4; otherwise the compound score decides, with
    +/-0.05 as the cut-offs. Nulls are scored as the empty string.
    """
    def classify(text):
        scores = sia.polarity_scores(text)
        if scores["pos"] > 0.4 and scores["neg"] > 0.4:
            return "Mixed"
        if scores["compound"] >= 0.05:
            return "Positive"
        if scores["compound"] <= -0.05:
            return "Negative"
        return "Neutral"

    non_null_texts = texts.fillna("")
    return non_null_texts.apply(classify)


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 27, Finished, Available, Finished)

In [26]:
# Attach both sentiment outputs, each computed from the `description` column.
df_scored = (
    df_input
    .withColumn("compound_score", get_compound_score(col("description")))
    .withColumn("sentiment_label", get_sentiment_4class(col("description")))
)

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 28, Finished, Available, Finished)

In [27]:
# Fix the column set and order of the scored output.
df_final = df_scored.select([
    "link",
    "article_title",
    "description",
    "displayed_link",
    "source",
    "load_date",
    "compound_score",
    "sentiment_label",
])

# Session-level Delta setting; judging by its key it enables automatic schema
# merging, complementing the per-write `mergeSchema` option below.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Append the scored rows to the results table.
(
    df_final.write
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("serpapi_sentiment_scored")
)

# Show a small sample of what was just computed.
display(df_final.limit(10))

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 20708363-810d-40cf-bfeb-d7bb47dc8f9c)

In [29]:
# Preview up to 1000 rows of the scored-sentiment table.
df = spark.sql(
    "SELECT * FROM bing_news_db.serpapi_sentiment_scored LIMIT 1000"
)
display(df)

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 861a28ff-bce1-41dc-8596-c06bcce6c2d7)

In [30]:
# Source frame for the merge step: the scored rows in a fixed column order.
sentiment_df_final = df_scored.select([
    "link",
    "article_title",
    "description",
    "displayed_link",
    "source",
    "load_date",
    "compound_score",
    "sentiment_label",
])

StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 32, Finished, Available, Finished)

In [31]:
from pyspark.sql.utils import AnalysisException

table_name = "serpapi_sentiment_scored"

# `link` is the merge key. A Delta MERGE can fail when several source rows
# match the same target row, so keep a single source row per link.
merge_source = sentiment_df_final.dropDuplicates(["link"])

try:
    # First run: create the table. No save mode is set, so the write raises
    # when the table already exists and control moves to the merge below.
    # NOTE(review): any other AnalysisException also lands in the merge
    # branch; an explicit table-existence check would be stricter.
    merge_source.write.format("delta").saveAsTable(table_name)
    print(f"✅ Table `{table_name}` created.")
except AnalysisException:
    print("ℹ️ Table exists. Performing Type 1 SCD merge...")

    # Expose the new data to SQL under a temporary view name.
    merge_source.createOrReplaceTempView("vw_sentiment")

    # Type 1 SCD merge: overwrite a matched row only when some column differs,
    # insert rows whose link is not in the target yet. The comparison uses the
    # null-safe `<=>` operator: with plain `<>`, a value changing to or from
    # NULL evaluates to NULL and the update would be skipped.
    spark.sql(f"""
        MERGE INTO {table_name} AS target
        USING vw_sentiment AS source
        ON source.link = target.link

        WHEN MATCHED AND NOT (
            source.article_title <=> target.article_title AND
            source.description <=> target.description AND
            source.displayed_link <=> target.displayed_link AND
            source.source <=> target.source AND
            source.load_date <=> target.load_date AND
            source.compound_score <=> target.compound_score AND
            source.sentiment_label <=> target.sentiment_label
        )
        THEN UPDATE SET *

        WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, 7fe95c27-917e-4862-903e-f20a039aca3c, 33, Finished, Available, Finished)

ℹ️ Table exists. Performing Type 1 SCD merge...
