In [27]:
# Load the cleaned latest-news table from the lakehouse into a Spark DataFrame.
# spark.table(...) returns every column of the table, same as `SELECT *`.
df = spark.table("bing_lake_db.tbl_latest_news")

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 29, Finished, Available, Finished)

In [28]:
# Importing the AnalyzeText model from SynapseML
import synapse.ml.core
from synapse.ml.services import AnalyzeText

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 30, Finished, Available, Finished)

In [29]:
# Configure the SynapseML text-analytics transformer for sentiment analysis.
#   input : article text read from the "description" column
#   output: the service response goes to "response"; per-row failures go to "error"
model = (
    AnalyzeText()
    .setKind("SentimentAnalysis")
    .setTextCol("description")
    .setOutputCol("response")
    .setErrorCol("error")
)

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 31, Finished, Available, Finished)

In [30]:
# Run the sentiment transformer over the news DataFrame; this adds the
# "response" and "error" columns configured on the model.
# NOTE(review): Spark transforms are lazy, so the service calls presumably only
# happen when a later action (e.g. the table write) executes — confirm.
result = model.transform(dataset=df)

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 32, Finished, Available, Finished)

In [31]:
from pyspark.sql.functions import col

# Lift the sentiment label out of the nested service response into its own
# top-level column.
sentiment_path = col("response.documents.sentiment")
sentiment_df = result.withColumn("sentiment", sentiment_path)

# The raw response struct and the error column are not kept downstream.
final_sentiment_df = sentiment_df.drop(
    "response",
    "error",
)

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 33, Finished, Available, Finished)

In [32]:
# Show the resulting column names and types as a readable tree. printSchema()
# only inspects the DataFrame's schema, so it does not run a Spark job (and
# therefore does not call the sentiment service); print() on a DataFrame only
# emits a single-line repr.
final_sentiment_df.printSchema()

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 34, Finished, Available, Finished)

DataFrame[title: string, description: string, category: string, url: string, image: string, provider: string, datePublished: string, sentiment: string]


In [33]:
from pyspark.sql.functions import col, to_date

# Turn the "dd-MMM-yyyy" publication-date string into a proper DATE column.
# NOTE(review): values that do not match this pattern may become NULL (or raise
# under ANSI mode) — confirm against the source data.
parsed_date = to_date(col("datePublished"), "dd-MMM-yyyy")
final_sentiment_df = final_sentiment_df.withColumn("datePublished", parsed_date)

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 35, Finished, Available, Finished)

In [34]:
# TYPE 1 Incremental Loading to load the news data along with its corresponding sentiment into a delta table


from pyspark.sql.utils import AnalysisException

table_name = 'bing_lake_db.sentiment_analysis'

# MERGE fails when several source rows match the same target row, so keep a
# single row per url (the merge key). NOTE(review): which duplicate survives is
# arbitrary.
source_df = final_sentiment_df.dropDuplicates(["url"])

# Check for the table explicitly instead of catching AnalysisException from
# saveAsTable: that exception is also raised for unrelated problems (missing
# database, invalid schema, ...), which would be misreported as "Table already
# exists" and then fail again inside the MERGE.
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the whole batch.
    source_df.write.format("delta").saveAsTable(table_name)
else:
    print("Table already exists")

    source_df.createOrReplaceTempView("vw_sentiment_df_final")

    # Type 1 upsert keyed on url: changed rows are overwritten, new urls inserted.
    # `<=>` is null-safe equality. A plain `<>` evaluates to NULL when either side
    # is NULL, so a change to or from NULL (e.g. a missing image) was never applied.
    # sentiment is compared too, so a re-scored article is refreshed.
    spark.sql(f"""
        MERGE INTO {table_name} AS target_table
        USING vw_sentiment_df_final AS source_view
        ON source_view.url = target_table.url

        WHEN MATCHED AND (
               NOT (source_view.title         <=> target_table.title)
            OR NOT (source_view.description   <=> target_table.description)
            OR NOT (source_view.category      <=> target_table.category)
            OR NOT (source_view.image         <=> target_table.image)
            OR NOT (source_view.provider      <=> target_table.provider)
            OR NOT (source_view.datePublished <=> target_table.datePublished)
            OR NOT (source_view.sentiment     <=> target_table.sentiment)
        )
        THEN UPDATE SET *

        WHEN NOT MATCHED THEN INSERT *
    """)

StatementMeta(, 444c6e4f-ab23-4fb3-a796-555ee8eee6ca, 36, Finished, Available, Finished)