# Process Bing News


### Read the JSON file as a DataFrame

In [None]:
# Configure the JSON reader with the "multiline" option enabled, then load the
# Bing News response file into a Spark DataFrame.
json_reader = spark.read.option("multiline", "true")
df = json_reader.json("Files/bing-latest-news.json")

# Render the loaded DataFrame in the notebook output.
display(df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 07b0d24d-f68d-462a-950c-d0227998ccf6)

### Selecting only the `value` column from the DataFrame

In [None]:
# Narrow the DataFrame down to its "value" column; every other column is dropped.
df = df.select(df["value"])
display(df)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, c2a7e702-7620-4bad-8239-d272fb0777fb)

### Explode the JSON column

In [None]:
from pyspark.sql.functions import explode

# explode() produces one output row per element of the "value" column; each
# element is exposed under the column name "json_object".
exploded_column = explode(df["value"]).alias("json_object")
df_exploded = df.select(exploded_column)

StatementMeta(, , , Waiting, , Waiting)

In [None]:
# Render the exploded DataFrame (single "json_object" column) in the notebook output.
display(df_exploded)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 2c3858ab-a743-4a99-928f-332f4f092cfa)

### Converting the exploded DataFrame to a list of JSON strings

In [None]:
# Serialise every row of the exploded DataFrame to a JSON string, then gather
# all of the strings into a plain Python list.
json_rdd = df_exploded.toJSON()
json_list = json_rdd.collect()

StatementMeta(, , , Waiting, , Waiting)

### Testing the JSON string list

In [None]:
# Spot-check the collected data by printing the entry at index 1
# (i.e. the second JSON string in the list).
sample_json_str = json_list[1]
print(sample_json_str)

StatementMeta(, , , Waiting, , Waiting)

{"json_object":{"about":[{"name":"Beyoncé","readLink":"https://api.bing.microsoft.com/api/v7/entities/6e476fa2-b3a4-d70a-4b7c-8b26868eb992"},{"name":"Kamala Harris","readLink":"https://api.bing.microsoft.com/api/v7/entities/ef5cf66f-32b7-7271-286a-8e8313eda5c5"},{"name":"Lemonade","readLink":"https://api.bing.microsoft.com/api/v7/entities/f4ca7e3d-6323-5b0c-a883-0c7a5a240e28"},{"name":"Democratic National Convention","readLink":"https://api.bing.microsoft.com/api/v7/entities/eb5f527b-1e2d-86c2-e55c-1fc837f43499"}],"datePublished":"2024-08-22T19:48:00.0000000Z","description":"The song is a driving force behind Vice President Kamala Harris’s latest campaign ad. But the song’s origins offer deeper significance for a candidate hoping to make history.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.lxFPEECrgFguiTOlJJIm2C&pid=News","height":366,"width":700}},"name":"The Meaning Behind Beyoncé’s ‘Freedom,’ the Harris Campaign Anthem","provider":[{"_type":"Organization","i

In [None]:
import json

# Parse the JSON string at index 1 into a Python dictionary for ad-hoc inspection.
# The article itself sits under the "json_object" key, e.g.
# news_json["json_object"]["name"] or news_json["json_object"]["category"].
sample_json_str = json_list[1]
news_json = json.loads(sample_json_str)

StatementMeta(, , , Waiting, , Waiting)

### Extracting the JSON properties into lists

In [None]:
# One list per output column. Entry i of every list describes the same article,
# so the lists must always stay the same length.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []


# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary and step into the article payload
        article = json.loads(json_str)["json_object"]

        article_category = article.get("category")
        # `or {}` also covers an "image"/"thumbnail" key that is present but null
        thumbnail = (article.get("image") or {}).get("thumbnail") or {}
        image_url = thumbnail.get("contentUrl")

        # Only articles that carry both a category and a thumbnail URL are kept
        if not (article_category and image_url):
            continue

        # Resolve every remaining field BEFORE touching the lists: if any lookup
        # fails, nothing has been appended yet and the columns stay aligned.
        article_title = article["name"]
        article_description = article["description"]
        article_url = article["url"]
        provider_name = article["provider"][0]["name"]
        published_at = article["datePublished"]

    except Exception as e:
        # Best effort: report the malformed article and move on to the next one
        print(f"Error processing JSON object: {e}")
        continue

    # All fields are available, so the row is appended as a whole
    title.append(article_title)
    description.append(article_description)
    category.append(article_category)
    url.append(article_url)
    image.append(image_url)
    provider.append(provider_name)
    datePublished.append(published_at)

StatementMeta(, , , Waiting, , Waiting)

### Converting the List to a Dataframe

In [None]:
from pyspark.sql.types import StructType, StructField, StringType


# Column names in the order in which the per-column lists are zipped below
column_names = ["title", "description", "category", "url", "image", "provider", "datePublished"]

# Turn the per-column lists into a list of row tuples
data = list(zip(title, description, category, url, image, provider, datePublished))

# Every column is a nullable string
schema = StructType([StructField(name, StringType(), True) for name in column_names])

# Build the Spark DataFrame from the row tuples
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, , , Waiting, , Waiting)

In [None]:
# Preview at most five rows of the cleaned DataFrame.
df_cleaned_preview = df_cleaned.limit(5)
display(df_cleaned_preview)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 1fa70a57-9240-4f97-ad00-ed121b2ce459)

### Processing the Date column

In [None]:
from pyspark.sql.functions import to_date, date_format

# Convert the "datePublished" string to a date, then render it back to a
# string using the "dd-MMM-yyyy" pattern; the column is replaced in the result.
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_date, "dd-MMM-yyyy"),
)

StatementMeta(, , , Waiting, , Waiting)

In [None]:
# Preview at most five rows of the DataFrame with the reformatted date column.
df_cleaned_final_preview = df_cleaned_final.limit(5)
display(df_cleaned_final_preview)

StatementMeta(, , , Waiting, , Waiting)

SynapseWidget(Synapse.DataFrame, 65d1f8e9-d001-4490-b41c-e38c33c2b4b0)

### Writing the final DataFrame to the Lakehouse DB in Delta format

In [None]:
from pyspark.sql.utils import AnalysisException

# Target table; bound before the try block so it is always available to the
# MERGE statement in the except branch.
table_name = 'bing_lake_db.tbl_latest_news'

# The MERGE below matches on url, and Delta rejects a MERGE in which several
# source rows match the same target row, so keep a single row per url.
# Which of the duplicate rows survives is not specified.
df_to_write = df_cleaned_final.dropDuplicates(["url"])

try:

    # No save mode is set, so this raises when the table already exists
    df_to_write.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already Exists")

    # Expose the new batch to Spark SQL so it can be used as the MERGE source
    df_to_write.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert keyed on url. `<=>` is the null-safe equality operator: unlike `<>`,
    # `NOT (a <=> b)` is also true when exactly one side is NULL, so a change
    # from or to NULL is detected as a difference and the row is updated.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.datePublished <=> target_table.datePublished)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, , , Waiting, , Waiting)

In [None]:
%%sql

-- Number of rows currently stored in the latest-news table
SELECT COUNT(*) FROM bing_lake_db.tbl_latest_news

StatementMeta(, , , Waiting, , Waiting)

<Spark SQL result set with 1 rows and 1 fields>