#### Read the JSON File as a DataFrame

In [2]:
# Load the Bing latest-news JSON file. The "multiline" option lets Spark parse
# a JSON document that spans several lines rather than one record per line.
news_reader = spark.read.option("multiline", "true")
df = news_reader.json("Files/bing-latest-news.json")
display(df)

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 31445b3d-3cac-475d-b917-10c25cc67c46)

#### Selecting just the value column from the DataFrame

In [3]:
# Keep only the "value" column; it is the one the next cell explodes.
df = df.select(df["value"])
display(df)

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2226a9f8-ab7a-40d5-8062-2be59a5c8ca4)

#### Explode the JSON column

In [4]:
from pyspark.sql.functions import explode

# explode() emits one output row per element of "value", so each article ends
# up in its own row under the "json_object" column.
article_rows = explode("value").alias("json_object")
df_exploded = df.select(article_rows)
display(df_exploded)

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a9cbba3d-b0d7-4399-b766-d2bab50327bc)

#### Converting the Exploded DataFrame to a List of JSON Strings

In [5]:
# Serialize every row to a JSON string, then collect the strings to the driver.
json_rows = df_exploded.toJSON()
json_list = json_rows.collect()

# Show the first record as a sample of the structure.
print(json_list[0])

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 7, Finished, Available, Finished)

{"json_object":{"about":[{"name":"India","readLink":"https://api.bing.microsoft.com/api/v7/entities/85fa63d3-9596-adb9-b4eb-502273d84f56"}],"datePublished":"2024-07-15T10:26:00.0000000Z","description":"India TV News provides you with all the breaking news, latest news, breaking story videos, and Live TV on a single platform to ensure you don't miss the biggest happenings in India and the world.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.BpqX5CYgdSYp49xHk5IB9i&pid=News","height":393,"width":700}},"name":"Breaking News, July 15 | Live Updates","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.IaXDAOnJaO-iU5IbMMsMHw&pid=news"}},"name":"India TV"}],"url":"https://www.indiatvnews.com/news/india/breaking-news-july-15-live-updates-pm-modi-rahul-gandhi-union-budget-2024-nirmala-sitharaman-trump-rally-firing-monsoon-weather-supreme-court-2024-07-15-941824"}}


#### Converting a JSON String to a Dictionary with json.loads()

In [6]:
import json

# Decode the first JSON string into nested dicts/lists for key-based access.
news_json = json.loads(json_list[0])
print(news_json)

# Each record is wrapped in the "json_object" key.
first_article = news_json['json_object']
print(first_article['description'])

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 8, Finished, Available, Finished)

{'json_object': {'about': [{'name': 'India', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/85fa63d3-9596-adb9-b4eb-502273d84f56'}], 'datePublished': '2024-07-15T10:26:00.0000000Z', 'description': "India TV News provides you with all the breaking news, latest news, breaking story videos, and Live TV on a single platform to ensure you don't miss the biggest happenings in India and the world.", 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=OVFT.BpqX5CYgdSYp49xHk5IB9i&pid=News', 'height': 393, 'width': 700}}, 'name': 'Breaking News, July 15 | Live Updates', 'provider': [{'_type': 'Organization', 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=ODF.IaXDAOnJaO-iU5IbMMsMHw&pid=news'}}, 'name': 'India TV'}], 'url': 'https://www.indiatvnews.com/news/india/breaking-news-july-15-live-updates-pm-modi-rahul-gandhi-union-budget-2024-nirmala-sitharaman-trump-rally-firing-monsoon-weather-supreme-court-2024-07-15-941824'}}
India TV News provides you wi

In [7]:
# Print each field of the sample article that the final table will carry.
sample_article = news_json['json_object']
print(sample_article['name'])
print(sample_article['description'])
# Left disabled: the sample record printed earlier has no 'category' key.
#print(sample_article['category'])
print(sample_article['url'])
print(sample_article['image']['thumbnail']['contentUrl'])
print(sample_article['provider'][0]['name'])
print(sample_article['datePublished'])

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 9, Finished, Available, Finished)

Breaking News, July 15 | Live Updates
India TV News provides you with all the breaking news, latest news, breaking story videos, and Live TV on a single platform to ensure you don't miss the biggest happenings in India and the world.
https://www.indiatvnews.com/news/india/breaking-news-july-15-live-updates-pm-modi-rahul-gandhi-union-budget-2024-nirmala-sitharaman-trump-rally-firing-monsoon-weather-supreme-court-2024-07-15-941824
https://www.bing.com/th?id=OVFT.BpqX5CYgdSYp49xHk5IB9i&pid=News
India TV
2024-07-15T10:26:00.0000000Z


#### Extracting the JSON Properties into Lists

In [8]:
# One list per output column; element i of every list belongs to the same article.
title = []
description = []
category = []
image = []
url = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary and unwrap the record
        article = json.loads(json_str)['json_object']

        # Only keep articles that have both a category and a thumbnail URL
        thumbnail_url = article.get('image', {}).get('thumbnail', {}).get('contentUrl')
        if not (article.get('category') and thumbnail_url):
            continue

        # Read every field BEFORE touching the lists. If a key is missing the
        # whole article is skipped; appending field by field would leave the
        # lists with different lengths and shift later rows out of alignment.
        record = (
            article['name'],
            article['description'],
            article['category'],
            article['url'],
            thumbnail_url,
            article['provider'][0]['name'],
            article['datePublished'],
        )

    except Exception as e:
        # Best effort: report the bad record and carry on with the rest
        print(f'Error processing JSON object: {e}')
        continue

    # The record is complete, so all seven lists grow together
    columns = (title, description, category, url, image, provider, datePublished)
    for column, value in zip(columns, record):
        column.append(value)


StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 10, Finished, Available, Finished)

#### Converting the List to a DataFrame

In [9]:
from pyspark.sql.types import StructType,StructField,StringType

# Column names, in the same order the lists are zipped together below.
column_names = ['title', 'description', 'category', 'url', 'image', 'provider', 'datePublished']

# One tuple per article
data = list(zip(title, description, category, url, image, provider, datePublished))

# Every column is a nullable string
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in column_names
])

# Build the DataFrame from the tuples using the explicit schema
df_cleaned = spark.createDataFrame(data, schema=schema)
display(df_cleaned)

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cfdc7fca-2085-43d9-afb3-e2401c0d9bdb)

#### Processing the Date column

In [10]:
from pyspark.sql.functions import to_date, date_format

# Convert the published timestamp string to a date, then render it as
# dd-MMM-yyyy, replacing the original column.
published_date = to_date('datePublished')
formatted_date = date_format(published_date, 'dd-MMM-yyyy')
df_cleaned_final = df_cleaned.withColumn('datePublished', formatted_date)
display(df_cleaned_final)

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fe7a9a0a-9b32-41d0-a911-df534b15b0e9)

#### Writing the Final DataFrame to the Lakehouse DB in Delta Format

In [15]:
# The write below is wrapped in a triple-quoted string, so running this cell
# only evaluates a string literal and writes nothing to the Lakehouse.
'''

df_cleaned_final.write.format('delta').saveAsTable('bing_lake_db.tbl_latest_news')

'''


StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 17, Finished, Available, Finished)

"\ndf_cleaned_final.write.format('delta').saveAsTable('bing_lake_db.tbl_latest_news')\n"

#### SQL Merge into the Data Warehouse Table: Type 1

In [14]:
from pyspark.sql.utils import AnalysisException  # not used in this cell any more; kept so the name stays defined

table_name = 'bing_lake_db.tbl_latest_news'

# Ask the catalog whether the table exists instead of attempting the write and
# catching AnalysisException: that exception is raised for many unrelated
# problems too, which would be misreported as "Table Already Exists" and
# followed by a MERGE.
# NOTE(review): passing a database-qualified name to tableExists assumes
# Spark 3.4 or newer - confirm against the runtime version in use.
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the current batch.
    df_cleaned_final.write.format('delta').saveAsTable(table_name)

else:
    print('Table Already Exists')

    # Expose the batch to SQL so it can be used as the MERGE source.
    df_cleaned_final.createOrReplaceTempView('vw_df_cleaned_final')

    # Rows are matched on url. A matched row is overwritten only when at least
    # one other column differs; unmatched rows are inserted.
    # The null-safe operator <=> is used because a plain <> evaluates to NULL
    # when either side is NULL, so such changes would never trigger an update.
    spark.sql(f''' MERGE INTO {table_name} target_table
                   USING vw_df_cleaned_final source_view
                   ON source_view.url = target_table.url

                   WHEN MATCHED AND (
                          NOT (source_view.title <=> target_table.title)
                       OR NOT (source_view.description <=> target_table.description)
                       OR NOT (source_view.category <=> target_table.category)
                       OR NOT (source_view.image <=> target_table.image)
                       OR NOT (source_view.provider <=> target_table.provider)
                       OR NOT (source_view.datePublished <=> target_table.datePublished)
                   )
                   THEN UPDATE SET *

                   WHEN NOT MATCHED THEN INSERT *

                 ''')

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 16, Finished, Available, Finished)

Table Already Exists


In [16]:
%%sql
-- Number of rows currently stored in the news table.
SELECT COUNT(*)
FROM bing_lake_db.tbl_latest_news

StatementMeta(, 651b0312-8e81-4a22-9d7e-41a55b05dc18, 18, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>