In [12]:
# Read every row of the latest-news lakehouse table and preview it.
latest_news_query = "SELECT * FROM bing_lake_db.tbl_latest_news"
df = spark.sql(latest_news_query)
display(df)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c353fcb7-838f-4cce-8bf2-9989229e4274)

In [13]:
import synapse.ml.core
from synapse.ml.services import AnalyzeText

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 15, Finished, Available, Finished)

In [14]:
# Configure the AnalyzeText transformer for sentiment analysis:
#   - input text column:  "description"
#   - output column:      "response"
#   - error column:       "error"
analyzer = AnalyzeText()
analyzer = analyzer.setTextCol("description").setKind("SentimentAnalysis")
model = analyzer.setOutputCol("response").setErrorCol("error")

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 16, Finished, Available, Finished)

In [15]:
# Run the configured AnalyzeText model over the news DataFrame; the
# transformed DataFrame is kept in `result` for the following cells.
result = model.transform(df)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 17, Finished, Available, Finished)

In [16]:
# Render the transformed DataFrame to inspect the model output.
display(result)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 889fa79d-acf0-4a2d-86c4-00e16d99b4ea)

In [17]:
# Create the sentiment column by pulling the nested
# response.documents.sentiment field out of the model output.
from pyspark.sql.functions import col

sentiment_value = col("response.documents.sentiment")
sentiment_df = result.withColumn("sentiment", sentiment_value)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 19, Finished, Available, Finished)

In [18]:
# Render the DataFrame with the newly added "sentiment" column.
display(sentiment_df)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5f24fe1c-56ec-4379-8533-98f49b2f25a6)

In [19]:
# The raw model output columns are no longer needed once "sentiment"
# has been extracted, so remove them and preview the result.
model_output_columns = ['error', 'response']
sentiment_final = sentiment_df.drop(*model_output_columns)
display(sentiment_final)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fac24d8b-363c-429d-bb5f-1f4da224efc7)

In [20]:
from pyspark.sql.functions import col, to_date

# Replace datePublished with a date parsed using the dd-MMM-yyyy pattern.
published_date = to_date(col("datePublished"), "dd-MMM-yyyy")
sentiment_final = sentiment_final.withColumn("datePublished", published_date)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 22, Finished, Available, Finished)

In [21]:
# Render the final DataFrame after the datePublished conversion.
display(sentiment_final)

StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 67430205-bea9-491b-a2e3-42bd385266e3)

In [24]:
# Saving our result to our lakehouse db as a Delta table.
#
# First run: saveAsTable creates the table from sentiment_final.
# Later runs: saveAsTable raises an AnalysisException because the table
# already exists, and the DataFrame is upserted with a MERGE keyed on `url`.
from pyspark.sql.utils import AnalysisException

# Defined outside the try block so the handler never depends on a name
# bound inside the guarded code.
table_name = 'bing_lake_db.tbl_sentiment_analysis'

try:
    # Attempt to create the table if it doesn't already exist
    sentiment_final.write.format("delta").saveAsTable(table_name)
except AnalysisException as e:
    if "TABLE_OR_VIEW_ALREADY_EXISTS" not in str(e):
        # Re-raise unexpected exceptions; a bare raise keeps the original traceback
        raise

    print("Table Already Exists, performing MERGE operation")
    # Create a temporary view from the DataFrame so it can be referenced from SQL
    sentiment_final.createOrReplaceTempView("vw_sentiment_final")

    # Perform a MERGE (Upsert) operation to update existing rows or insert new ones.
    #
    # - Change detection uses the null-safe equality operator (<=>): with a plain
    #   `<>`, a comparison against NULL evaluates to NULL, so a row whose only
    #   change is NULL -> value (or value -> NULL) would never be updated.
    # - `sentiment` is derived from `description`, so it is compared and updated
    #   together with the other columns; otherwise a changed article would keep
    #   its stale sentiment.
    # NOTE(review): assumes the target table has a `sentiment` column, i.e. it was
    # created by the saveAsTable call above - confirm for pre-existing tables.
    spark.sql(f"""
    MERGE INTO {table_name} target_table
    USING vw_sentiment_final source_view
    ON source_view.url = target_table.url
    WHEN MATCHED AND (
        NOT (source_view.title <=> target_table.title) OR
        NOT (source_view.description <=> target_table.description) OR
        NOT (source_view.category <=> target_table.category) OR
        NOT (source_view.image <=> target_table.image) OR
        NOT (source_view.provider <=> target_table.provider) OR
        NOT (source_view.datePublished <=> target_table.datePublished) OR
        NOT (source_view.sentiment <=> target_table.sentiment)
    ) THEN UPDATE SET
        target_table.title = source_view.title,
        target_table.description = source_view.description,
        target_table.category = source_view.category,
        target_table.image = source_view.image,
        target_table.provider = source_view.provider,
        target_table.datePublished = source_view.datePublished,
        target_table.sentiment = source_view.sentiment
    WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, e327f84b-5393-4fe6-b5b5-0b2fc75fcae4, 26, Finished, Available, Finished)

Table Already Exists, performing MERGE operation
