In [21]:
# Load the raw Bing news dump. The "multiline" option lets Spark parse a JSON
# document that spans several lines instead of expecting one record per line.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-worldwide-news.json")
)
display(df)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7dd747f1-6c7b-4833-ad16-ec0b9cdb6ed2)

In [22]:
# Keep only the `value` column; the remaining top-level columns are not used
# by any later cell.
df = df.select("value")

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 24, Finished, Available, Finished)

In [23]:
from pyspark.sql.functions import explode

# Give every element of the `value` column its own row, exposed under the
# single column name `json_objects`.
df_exploded = df.select(
    explode(df["value"]).alias("json_objects")
)
display(df_exploded)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5ba702cf-8958-4a13-8ee1-6b6df272dadb)

In [24]:
# Serialize every exploded row to a JSON string and bring them all back to the
# driver as a plain Python list (one string per row).
json_list = df_exploded.toJSON().collect()

# Peek at one sample record. The index is guarded so that a feed with fewer
# than three rows does not abort the notebook with an IndexError.
if len(json_list) > 2:
    print(json_list[2])
else:
    print(f"Only {len(json_list)} record(s) collected; no sample at index 2.")

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 26, Finished, Available, Finished)

{"json_objects":{"about":[{"name":"Executive order","readLink":"https://api.bing.microsoft.com/api/v7/entities/065279a9-966a-c603-415c-1a5b52cb7cd7?setLang=en"},{"name":"Asthma","readLink":"https://api.bing.microsoft.com/api/v7/entities/e7fb1b6a-86ae-b5c8-02fe-89cb093afd53?setLang=en"}],"datePublished":"2025-01-07T13:11:00.0000000Z","description":"President-Elect Donald Trump is weighing an executive order that seeks to protect gas-powered appliances including stoves and heaters from federal and local regulators who want to phase them out of homes and businesses,","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.hO2Vs2pLge3ZHogm1-DwVi&pid=News","height":366,"width":700}},"name":"Trump weighing executive order protecting gas stoves","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.jFXbg3L7Ce_1pS4_IOR8CA&pid=news"}},"name":"Reuters"}],"url":"https://www.reuters.com/world/us/trump-weighing-executive-order-protecting-

In [25]:
import json

def _extract_article(raw_json):
    """Parse one serialized row into a flat article record.

    Args:
        raw_json: JSON text shaped like ``{"json_objects": {...}}``.

    Returns:
        A 7-tuple ``(title, description, category, url, image, provider,
        datePublished)``, or ``None`` when the article has no truthy
        ``category`` or no thumbnail ``contentUrl`` (such articles are
        intentionally filtered out).

    Raises:
        ValueError: ``raw_json`` is not valid JSON.
        KeyError, IndexError, TypeError, AttributeError: a required field is
            missing or does not have the expected shape.
    """
    article = json.loads(raw_json)["json_objects"]

    # Same filter order as before: category first, then the thumbnail URL.
    if not article.get("category"):
        return None
    thumbnail_url = article.get("image", {}).get("thumbnail", {}).get("contentUrl")
    if not thumbnail_url:
        return None

    # Every field is resolved BEFORE anything is appended to the column lists,
    # so a missing field can never leave the lists with different lengths.
    return (
        article["name"],
        article["description"],
        article["category"],
        article["url"],
        thumbnail_url,
        article["provider"][0]["name"],  # only the first provider is kept
        article["datePublished"],
    )


# Parallel column lists; index i across all seven lists describes one article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

for raw_json in json_list:
    try:
        record = _extract_article(raw_json)
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # Best effort: report the malformed record and move on.
        print(f"Skipping malformed article: {e!r}")
        continue

    if record is None:
        continue

    # Append the complete record atomically to keep all lists aligned.
    columns = (title, description, category, url, image, provider, datePublished)
    for column, field_value in zip(columns, record):
        column.append(field_value)



StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 27, Finished, Available, Finished)

In [26]:
from pyspark.sql.types import StructType, StructField, StringType

# Transpose the seven parallel column lists into one tuple per article.
data = [*zip(title, description, category, url, image, provider, datePublished)]

# Every column is a nullable string; the field order must match the order of
# the lists passed to zip() above.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "title",
        "description",
        "category",
        "url",
        "image",
        "provider",
        "datePublished",
    )
])

# Materialize the rows as a Spark DataFrame and preview the result.
df_cleaned = spark.createDataFrame(data, schema=schema)
display(df_cleaned)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 28, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0dc05543-a397-4921-9eb4-3061cf6e7de5)

In [27]:
from pyspark.sql.functions import to_date, date_format

# Re-render datePublished as a "dd-MMM-yyyy" string: the value is first parsed
# with to_date() and then formatted back to text, so the column stays a string.
df_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), "dd-MMM-yyyy"),
)



StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 29, Finished, Available, Finished)

In [28]:
# Preview the frame with the reformatted datePublished column before it is
# written to the table.
display(df_final)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f0b8b2b6-a59e-489d-838d-539fbee108a4)

### Incremental load

In [29]:
from pyspark.sql.utils import AnalysisException

# Incremental load into the Delta table, keyed on the article URL:
#   * first run  -> create the table from the current batch
#   * later runs -> MERGE the batch in (update changed rows, insert new ones)
table_name = "bing_lakehouse.world_latest_news"

# Delta's MERGE fails when several source rows match the same target row, so
# the batch is reduced to one row per merge key before it is used.
df_source = df_final.dropDuplicates(["url"])

# Check for the table explicitly instead of treating every AnalysisException
# from saveAsTable as "table already exists"; unrelated failures (bad schema,
# missing lakehouse, ...) now surface instead of being misreported.
if not spark.catalog.tableExists(table_name):
    df_source.write.format("delta").saveAsTable(table_name)
else:
    print("Table Already Exists")

    df_source.createOrReplaceTempView("vw_df_final")

    # `<=>` is the null-safe equality operator: with plain `<>`, a comparison
    # involving NULL evaluates to NULL and the changed row would never be
    # updated.
    spark.sql(f"""
        MERGE INTO {table_name} target_table
        USING vw_df_final source_view
        ON source_view.url = target_table.url

        WHEN MATCHED AND NOT (
            source_view.title <=> target_table.title AND
            source_view.description <=> target_table.description AND
            source_view.category <=> target_table.category AND
            source_view.image <=> target_table.image AND
            source_view.provider <=> target_table.provider AND
            source_view.datePublished <=> target_table.datePublished
        ) THEN UPDATE SET *

        WHEN NOT MATCHED THEN INSERT *
    """)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 31, Finished, Available, Finished)

In [30]:
# Print the column names, types and nullability of df_final.
df_final.printSchema()

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 32, Finished, Available, Finished)

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- provider: string (nullable = true)
 |-- datePublished: string (nullable = true)



In [31]:
# Read the persisted table back to confirm what the load step wrote.
table_name = "bing_lakehouse.world_latest_news"
query = f"SELECT * FROM {table_name}"
df_display = spark.sql(query)
display(df_display)

StatementMeta(, 3c297d04-2f3f-4d80-a569-3f0ef3f5c33b, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 38b33d16-28be-40e3-84b8-13c36080bcf2)