### Read the News Table from the Lakehouse DB

In [1]:
# Perform sentiment analysis using SynapseML

df = spark.sql("SELECT * FROM bing_lake_db.tbl_latest_news")
display(df)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, f348844a-cb1f-442d-9f89-d3d84d841730)

### Import Synapse ML

In [2]:
# Store analyzed sentiment results into Delta table

import synapse.ml.core
from synapse.ml.services import AnalyzeText

StatementMeta(, , , Waiting, )

### Configure Synapse ML

In [3]:
#Import the model and configure the input and output columns
model = (AnalyzeText()
        .setTextCol("description")
        .setKind("SentimentAnalysis")
        .setOutputCol("response")
        .setErrorCol("error"))

StatementMeta(, , , Waiting, )

### Apply the model

In [4]:
#Apply the model to our dataframe
result = model.transform(df)

StatementMeta(, , , Waiting, )

In [5]:
display(result)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 7218bca0-a1af-40ed-b854-075c2857eaff)

### Grab sentiment from the response

In [6]:
#Create Sentiment Column
from pyspark.sql.functions import col

sentiment_df = result.withColumn("sentiment", col("response.documents.sentiment"))

StatementMeta(, , , Waiting, )

In [7]:
display(sentiment_df)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 869c5a13-9a6a-489a-89d7-2cc814af1f21)

### Remove unwanted columns

In [13]:
sentiment_df_final = sentiment_df.drop("error","response")

StatementMeta(, , , Waiting, )

In [14]:
display(sentiment_df_final)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 661894f8-094e-47ae-beb5-3813798a49a0)

In [None]:
from pyspark.sql.functions import col, to_date

sentiment_df_final = sentiment_df_final.withColumn("datePublished", to_date(col("datePublished"), "dd-MMM-yyyy"))

StatementMeta(, , , Waiting, )

## Type 1 Merge

In [10]:
from pyspark.sql.utils import AnalysisException

try:

    table_name = 'bing_lake_db.tbl_sentiment_analysis'

    sentiment_df_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already Exists")

    sentiment_df_final.createOrReplaceTempView("vw_sentiment_df_final")

    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_sentiment_df_final source_view

                    ON source_view.url = target_table.url
                    
                    WHEN MATCHED AND 
                    source_view.title <> target_table.title OR
                    source_view.description <> target_table.description OR
                    source_view.category <> target_table.category OR
                    source_view.image <> target_table.image OR
                    source_view.provider <> target_table.provider OR
                    source_view.datePublished <> target_table.datePublished   

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, , , Waiting, )

Table Already Exists


In [17]:
display(sentiment_df_final)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 2a4beba9-e8e6-4e6f-be21-6764a6e55690)

In [None]:
%%sql

SELECT count(*) from bing_lake_db.tbl_sentiment_analysis

StatementMeta(, , , Waiting, )

<Spark SQL result set with 1 rows and 1 fields>