## Load data

In [1]:
# Read the raw news payload. "multiline" lets Spark parse JSON records that span
# several lines (e.g. a pretty-printed document) instead of expecting JSON Lines.
news_json_path = "Files/news/us-latest-news.json"
df = (
    spark.read
    .option("multiline", "true")
    .json(news_json_path)
    .select("articles")  # keep only the articles array; it is exploded in the next cell
)
display(df)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e6de4310-b3f3-4ea6-b31b-3a52067cc737)

## Explode dataframe

In [2]:
from pyspark.sql.functions import explode, col, to_date, date_format
from pyspark.sql.types import StructField, StructType, StringType

# One output row per element of the `articles` array.
exploded_df = df.select(explode(col("articles")).alias('article'))

# Output column name -> nested field inside each exploded `article` struct.
# Dict insertion order fixes the column order of the flattened frame.
article_fields = {
    "title": "article.title",
    "author": "article.author",
    "description": "article.description",
    "datePublished": "article.publishedAt",
    "url": "article.url",
    "source": "article.source.name",
}
clean_df = exploded_df.select(
    *[col(field_path).alias(out_name) for out_name, field_path in article_fields.items()]
)

# Optional: truncate the publish timestamp to a day and render it as a
# "dd-MMM-yyyy" string (e.g. "05-Mar-2024").
clean_df = clean_df.withColumn(
    "datePublished",
    date_format(to_date(col("datePublished")), "dd-MMM-yyyy"),
)

display(clean_df)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3447c3a5-f921-4753-85f0-b3051472b778)

## Save data in lakehouse

In [3]:
from pyspark.sql.utils import AnalysisException

table_name = "news_lakehouse_1.tbl_latest_us_news"

# The MERGE below is keyed on `url`, so shape the source accordingly:
# - rows with a NULL url can never match (NULL = NULL is not true) and would be
#   re-inserted on every run;
# - duplicate urls make Delta's MERGE fail because several source rows would
#   match the same target row.
upsert_df = clean_df.where("url IS NOT NULL").dropDuplicates(["url"])

# Check for the table explicitly instead of treating *any* write failure as
# "table already exists", which would hide real errors (permissions, storage, schema).
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the cleaned articles.
    upsert_df.write.format('delta').saveAsTable(table_name)
else:
    print("Table already exists")
    upsert_df.createOrReplaceTempView("vw_clean_df")
    # `<=>` is null-safe equality. A plain `<>` yields NULL when either side is NULL
    # (e.g. a missing author), so changes to or from NULL were silently skipped.
    # NOT (a <=> b) is true exactly when the two values differ.
    spark.sql(f"""
            MERGE INTO {table_name} target_table
            USING vw_clean_df source_view
            ON source_view.url = target_table.url
            WHEN MATCHED AND (
                NOT (source_view.title <=> target_table.title) OR
                NOT (source_view.author <=> target_table.author) OR
                NOT (source_view.description <=> target_table.description) OR
                NOT (source_view.source <=> target_table.source) OR
                NOT (source_view.datePublished <=> target_table.datePublished)
            )
            THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 5, Finished, Available, Finished)

Table already exists


## Sentiment analysis

In [4]:
# Load back up to 1000 rows of the curated news table as input for sentiment scoring.
# NOTE(review): a LIMIT without ORDER BY returns an arbitrary subset of rows.
df = spark.table("news_lakehouse_1.tbl_latest_us_news").limit(1000)
display(df)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6066acc3-2ebb-4b4f-aeaf-04e857f9d1bc)

In [5]:
import synapse.ml.core
from synapse.ml.services import AnalyzeText
from pyspark.sql.functions import col

# Sentiment analysis over each article's `description` using SynapseML's AnalyzeText.
# The service payload is written to `response`; per-row failures go to `error`.
model = (
    AnalyzeText()
    .setTextCol("description")
    .setKind('SentimentAnalysis')
    .setOutputCol("response")
    .setErrorCol("error")
)

result = model.transform(df)

# Lift the sentiment value out of the nested response into a top-level column.
sentiment_field = col('response.documents.sentiment')
result_df = result.withColumn("sentiment", sentiment_field)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 7, Finished, Available, Finished)

In [6]:
# Drop the raw service payload and the error column so that only the article fields
# plus `sentiment` are persisted.
# NOTE(review): rows whose analysis call failed presumably end up with a NULL
# sentiment, and their error detail is discarded here.
result_df = result_df.drop('error', 'response')
display(result_df)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0806e89e-6c36-4fef-a571-e1ad8d7f05d4)

### Save sentiment results in lakehouse

In [7]:
from pyspark.sql.utils import AnalysisException

table_name = "news_lakehouse_1.tbl_latest_us_news_analysis"

# The MERGE below is keyed on `url`: drop NULL urls (they never match and would be
# re-inserted each run) and keep one row per url (duplicate source matches make
# Delta's MERGE fail).
upsert_result_df = result_df.where("url IS NOT NULL").dropDuplicates(["url"])

# Check for the table explicitly instead of treating *any* write failure as
# "table already exists", which would hide real errors.
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the scored articles.
    upsert_result_df.write.format('delta').saveAsTable(table_name)
else:
    print("Table already exists")
    upsert_result_df.createOrReplaceTempView("result_df")
    # Null-safe comparisons (NOT (a <=> b)) so that changes to or from NULL are detected.
    # `sentiment` is compared too; previously a re-scored article never got updated.
    spark.sql(f"""
            MERGE INTO {table_name} target_table
            USING result_df source_view
            ON source_view.url = target_table.url
            WHEN MATCHED AND (
                NOT (source_view.title <=> target_table.title) OR
                NOT (source_view.author <=> target_table.author) OR
                NOT (source_view.description <=> target_table.description) OR
                NOT (source_view.source <=> target_table.source) OR
                NOT (source_view.datePublished <=> target_table.datePublished) OR
                NOT (source_view.sentiment <=> target_table.sentiment)
            )
            THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 9, Finished, Available, Finished)

Table already exists


In [9]:
# Inspect up to 1000 rows of the persisted sentiment results.
analysis_table = "news_lakehouse_1.tbl_latest_us_news_analysis"
df = spark.table(analysis_table).limit(1000)
display(df)

StatementMeta(, c1d1cd48-8a58-4be3-9860-6e496013dbbe, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c09462c4-ba90-467f-8780-9eef672287a4)

StatementMeta(, 27df11f5-f06f-49c6-856b-901cdd8a65dc, 28, Finished, Available, Finished)