# ****Read API data as dataframe 'df'****

In [None]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.functions import *
df = spark.read.option("multiLine", True).json("abfss://Bing_Senti_Ana_Dev@onelake.dfs.fabric.microsoft.com/bing_LH.Lakehouse/Files/bing-latest-news.json")
df = df.select(explode("value").alias("value") )
display(df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, f84e6bc5-0d96-4cf1-8472-335657a873a2)

# ****Flatten the Json Structure****

In [None]:
from pyspark.sql import functions as F

# Flatten the required struct fields
df_flat = df \
    .withColumn("provider", F.explode_outer("value.provider")) \
    .select(
        F.col("value.name").alias("title"),                       # title (name)
        F.col("value.description").alias("description"),          # description
        F.col("value.category").alias("category"),                # category
        F.col("value.url").alias("url"),                          # url
        F.col("value.image.thumbnail.contentUrl").alias("image"), # image (contentUrl)
        F.col("provider._type").alias("provider_type"),           # provider type
        F.col("provider.image.thumbnail.contentUrl").alias("provider_image"),  # provider image
        F.col("value.datePublished").alias("datePublished")       # datePublished
    )
display(df_flat)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, d80e5f8a-a32f-4160-849e-e97a61a482ad)

# ****TypeCast the Date Types****

In [None]:
from pyspark.sql import functions as F

# Assuming your date strings are like "19-Oct-2024"
finalised_df = df_flat.withColumn(
    "datePublished",
    F.to_date(F.trim(F.col("datePublished"))) 
)

display(finalised_df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 3c460e1f-ed55-41fb-b466-fdd3dc4ea219)

## ****Incremental loading logic using MERGE SQL statement****

In [None]:
from pyspark.sql.utils import AnalysisException  # Correct import statement

try:
    table_name = 'Bing_Senti_Ana_Dev.bing_LH.dbo.bing_latest_news'
    finalised_df.write.format("delta").saveAsTable(table_name)  # Fixed formatting

except AnalysisException:
    print("Table Already Exists")

    # Create or replace the temporary view
    finalised_df.createOrReplaceTempView("VW_finalised_df")

    # Use proper indentation and formatting in the SQL MERGE statement
    spark.sql(f"""
        MERGE INTO {table_name} AS target_table
        USING VW_finalised_df AS source_view
        ON source_view.url = target_table.url
        WHEN MATCHED AND 
        (source_view.title <> target_table.title OR
         source_view.description <> target_table.description OR
         source_view.category <> target_table.category OR
         source_view.image <> target_table.image OR
         source_view.provider_type <> target_table.provider_type OR
         source_view.provider_image <> target_table.provider_image OR
         source_view.datePublished <> target_table.datePublished)
        THEN UPDATE SET 
            target_table.title = source_view.title,
            target_table.description = source_view.description,
            target_table.category = source_view.category,
            target_table.image = source_view.image,
            target_table.provider_type = source_view.provider_type,
            target_table.provider_image = source_view.provider_image,
            target_table.datePublished = source_view.datePublished
    """)

StatementMeta(, , , Waiting, , Waiting)

Table Already Exists


# ****Validation Step****

In [None]:
%%sql
SELECT COUNT(*) AS COUNT_AS FROM Bing_Senti_Ana_Dev.bing_LH.dbo.bing_latest_news

StatementMeta(, , , Waiting, , Waiting)

<Spark SQL result set with 1 rows and 1 fields>