In [36]:
# Read the raw Bing news dump as multi-line JSON into a Spark DataFrame
# (the file is parsed as whole JSON documents rather than one record per line).
df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .load("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 38, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 403f81a0-5882-4862-9c5e-33f11ba98494)

In [37]:
# Keep only the "value" column of the loaded DataFrame; the remaining
# top-level columns are dropped. `df` is rebound to the narrowed frame.
df = df.select("value")
display(df)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 39, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f5731677-1fec-4486-8215-57d5ea247311)

In [38]:
# Turn the "value" array into one row per element; each element lands in a
# single column named "json_object".
from pyspark.sql.functions import explode

df_exploded = df.select(
    explode("value").alias("json_object")
)

display(df_exploded)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 40, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8b23577d-e5d5-49ab-a5c1-0b3d36d8af5a)

In [39]:
# Serialize every row of df_exploded to a JSON string and collect the strings
# into a Python list on the driver.

json_list = df_exploded.toJSON().collect()
# Prints the whole list, so the cell output grows with the number of rows.
print(json_list)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 41, Finished, Available, Finished)

['{"json_object":{"about":[{"name":"Milind Soman","readLink":"https://api.bing.microsoft.com/api/v7/entities/055f369c-9dc7-a62e-81be-75a4d207414a"},{"name":"Lip","readLink":"https://api.bing.microsoft.com/api/v7/entities/b8d9fa0d-683d-405b-f2fd-6ad902c7a453"},{"name":"Renuka","readLink":"https://api.bing.microsoft.com/api/v7/entities/0e6961df-1425-9d3c-f822-9caff3124878"},{"name":"The Times of India","readLink":"https://api.bing.microsoft.com/api/v7/entities/7804fb2e-ea6c-f934-6dc5-c5bc671e2ba2"},{"name":"Tom Hiddleston","readLink":"https://api.bing.microsoft.com/api/v7/entities/cc81a252-2712-c3c6-201d-5b317353e793"}],"datePublished":"2024-08-19T06:30:00.0000000Z","description":"India needs a sporting culture, says the actor and fitness enthusiast, who ran 240 kms in a span of five days from Pune to Vasai Fort recently, to cel","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.ViqrfrSZwphfhxrbFtAEty&pid=News","height":379,"width":700}},"mentions":[{"name":"Milind Soma

In [40]:
# Show the first collected JSON string to inspect the shape of one record.
print(json_list[0])

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 42, Finished, Available, Finished)

{"json_object":{"about":[{"name":"Milind Soman","readLink":"https://api.bing.microsoft.com/api/v7/entities/055f369c-9dc7-a62e-81be-75a4d207414a"},{"name":"Lip","readLink":"https://api.bing.microsoft.com/api/v7/entities/b8d9fa0d-683d-405b-f2fd-6ad902c7a453"},{"name":"Renuka","readLink":"https://api.bing.microsoft.com/api/v7/entities/0e6961df-1425-9d3c-f822-9caff3124878"},{"name":"The Times of India","readLink":"https://api.bing.microsoft.com/api/v7/entities/7804fb2e-ea6c-f934-6dc5-c5bc671e2ba2"},{"name":"Tom Hiddleston","readLink":"https://api.bing.microsoft.com/api/v7/entities/cc81a252-2712-c3c6-201d-5b317353e793"}],"datePublished":"2024-08-19T06:30:00.0000000Z","description":"India needs a sporting culture, says the actor and fitness enthusiast, who ran 240 kms in a span of five days from Pune to Vasai Fort recently, to cel","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.ViqrfrSZwphfhxrbFtAEty&pid=News","height":379,"width":700}},"mentions":[{"name":"Milind Soman"

In [41]:
# Parse the first JSON string into a Python dictionary so individual fields
# can be looked up by key.

import json
news_json = json.loads(json_list[0])

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 43, Finished, Available, Finished)

In [42]:
# Example lookups of individual fields in the parsed article dictionary (disabled; kept for reference)
# print(news_json['json_object']['name'])
# print(news_json['json_object']['description'])
# print(news_json['json_object']['category'])
# print(news_json['json_object']['url'])
# print(news_json['json_object']['image']['thumbnail']['contentUrl'])
# print(news_json['json_object']['provider'][0]['name'])
# print(news_json['json_object']['datePublished'])

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 44, Finished, Available, Finished)

In [43]:
# Column-wise accumulators. A later cell zips these lists into rows, so they
# must always have the same length and stay index-aligned.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON string collected from the exploded DataFrame.
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary.
        article = json.loads(json_str)
        news = article['json_object']

        # Skip articles that lack a category or a thumbnail URL.
        if not (news.get("category") and news.get('image', {}).get('thumbnail', {}).get('contentUrl', {})):
            continue

        # Resolve every field BEFORE touching the accumulators. If any lookup
        # fails (missing key, empty provider list, ...) the whole article is
        # skipped, instead of leaving some lists one element longer than the
        # others and silently misaligning all following rows.
        row = (
            news['name'],
            news['description'],
            news['category'],
            news['url'],
            news['image']['thumbnail']['contentUrl'],
            news['provider'][0]['name'],
            news['datePublished'],
        )
    except Exception as e:
        # Best-effort: report the bad record and carry on with the next one.
        print(f"Error processing json object: {e}")
        continue

    # All fields resolved: append them together so the lists stay aligned.
    columns = (title, description, category, url, image, provider, datePublished)
    for column, value in zip(columns, row):
        column.append(value)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 45, Finished, Available, Finished)

In [44]:
from  pyspark.sql.types import StructType, StructField, StringType

# Combine the per-column lists into one tuple per article.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Define the schema: every column is a nullable string, in the same order as
# the tuples built above.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'title',
        'description',
        'category',
        'url',
        'image',
        'provider',
        'datePublished',
    )
])

# Create the DataFrame from the rows and the explicit schema.
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 46, Finished, Available, Finished)

In [45]:
# Render the cleaned DataFrame to check the extracted columns.
display(df_cleaned)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 47, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c7c928c8-0bb3-4804-929a-ce336f09ac3f)

In [46]:
from pyspark.sql.functions import to_date,date_format

# Reformat datePublished: parse the value to a date, then render it back as
# text in "dd-MMM-yyyy" form. The column is replaced under the same name and
# holds the formatted text, not a date type.
df_cleaned_Final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), "dd-MMM-yyyy"),
)


StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 48, Finished, Available, Finished)

In [47]:
# Render the final DataFrame to check the reformatted datePublished column.
display(df_cleaned_Final)

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 49, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c3a19938-a28e-4d12-ab02-ba7e713f6270)

In [48]:
# df_cleaned_Final.write.format("delta").saveAsTable("bing_lakehouse.tbl_latest_news")

StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 50, Finished, Available, Finished)

In [49]:
from pyspark.sql.utils import AnalysisException

# Create the Delta table on the first run; on later runs fall back to an
# upsert (MERGE) keyed on the article url.
try:
    table_name = 'Bing_Lakehouse.tbl_latest_news'

    # No save mode is set, so this write raises when the table already exists.
    df_cleaned_Final.write.format('delta').saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): every AnalysisException is treated as "table exists";
    # other analysis failures would also land here and surface in the MERGE.
    print("Table already exists")

    # MERGE fails when several source rows match the same target row, so keep
    # a single row per url (the merge key) in the source view. Which of the
    # duplicate rows is kept is not specified.
    df_cleaned_Final.dropDuplicates(["url"]).createOrReplaceTempView("vw_df_cleaned_final")

    # Update a matched row only when at least one column differs. The
    # comparisons use the null-safe operator (<=>) because a plain <> yields
    # NULL when either side is NULL and such changes would never be applied.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.datePublished <=> target_table.datePublished)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *
                """)


StatementMeta(, 240052b2-ab24-4069-81eb-ca7c7db819b2, 51, Finished, Available, Finished)

Table already exists
