## **Read the JSON file into a DataFrame**

In [1]:
# Load the Bing news JSON file into a Spark DataFrame.
# multiline=true lets Spark parse JSON records that span several lines
# (the default reader expects exactly one JSON record per line).
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 578e043d-bd81-4e43-be08-bb3a60c03006)

## **SELECT just the value column from the DataFrame**

In [2]:
# Keep only the `value` column and drop every other top-level field of the file.
df = df.select(df["value"])

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 4, Finished, Available)

In [3]:
# Preview the DataFrame after it has been narrowed to the `value` column.
display(df)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3c1179c6-5b41-472e-9a60-367157303436)

## **Explode the JSON column**

In [4]:
from pyspark.sql.functions import explode

# `value` is an array of article objects: explode() emits one row per element,
# and each element lands in a column named `json_object`.
df_exploded = df.select(explode("value").alias("json_object"))

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 6, Finished, Available)

In [5]:
# Preview the exploded rows: one `json_object` per element of the original `value` array.
display(df_exploded)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1b903461-88d6-4132-bb22-de196b913d94)

## **Convert the exploded DataFrame into a list of JSON strings (one per article)**

In [6]:
# Serialise each row of df_exploded to a JSON string (toJSON) and pull all of them
# to the driver as a Python list (collect); every string looks like {"json_object": {...}}.
# NOTE: collect() materialises the whole dataset in driver memory.
json_list = df_exploded.toJSON().collect()

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 8, Finished, Available)

## **Inspect a sample entry from the JSON string list**

In [7]:
# Print one sample entry. Index 25 is an arbitrary pick and assumes json_list
# holds at least 26 articles (otherwise this raises IndexError).
print(json_list[25])

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 9, Finished, Available)

{"json_object":{"about":[{"name":"State Bank of India","readLink":"https://api.bing.microsoft.com/api/v7/entities/74aa8c7f-1efb-2199-d975-546c5bf8f65c"},{"name":"Axis Bank","readLink":"https://api.bing.microsoft.com/api/v7/entities/035bb83b-062d-e8b8-4719-7650b2c0c939"},{"name":"HDFC Bank","readLink":"https://api.bing.microsoft.com/api/v7/entities/fef801ff-bedf-ac92-9300-2f1cf04fa8dd"},{"name":"Infosys","readLink":"https://api.bing.microsoft.com/api/v7/entities/9cef64da-6db6-c7ec-b82e-b4252ffb23c5"},{"name":"Wipro","readLink":"https://api.bing.microsoft.com/api/v7/entities/685856ee-7c8b-09b7-f62c-9f03e8d49a20"},{"name":"NTPC Limited","readLink":"https://api.bing.microsoft.com/api/v7/entities/46a38888-c5a3-b9e1-0f4f-5ecb033dae50"}],"category":"Business","datePublished":"2024-04-12T11:35:00.0000000Z","description":"YES Bank(numbers of share traded: 17.11 crore), SAIL(numbers of share traded: 7.36 crore), Reliance Power(numbers of share traded: 5.72 crore), BEL(numbers of share traded: 5.

In [8]:
import json

# Parse the same sample entry into a Python dict to explore its structure.
news_json = json.loads(json_list[25])

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 10, Finished, Available)

In [9]:
# Show the parsed dict; the article fields sit under the "json_object" key.
print(news_json)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 11, Finished, Available)

{'json_object': {'about': [{'name': 'State Bank of India', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/74aa8c7f-1efb-2199-d975-546c5bf8f65c'}, {'name': 'Axis Bank', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/035bb83b-062d-e8b8-4719-7650b2c0c939'}, {'name': 'HDFC Bank', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/fef801ff-bedf-ac92-9300-2f1cf04fa8dd'}, {'name': 'Infosys', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/9cef64da-6db6-c7ec-b82e-b4252ffb23c5'}, {'name': 'Wipro', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/685856ee-7c8b-09b7-f62c-9f03e8d49a20'}, {'name': 'NTPC Limited', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/46a38888-c5a3-b9e1-0f4f-5ecb033dae50'}], 'category': 'Business', 'datePublished': '2024-04-12T11:35:00.0000000Z', 'description': 'YES Bank(numbers of share traded: 17.11 crore), SAIL(numbers of share traded: 7.36 crore), Reliance Power(numbers of share traded: 5.72 crore), 

In [10]:
# Print, one per line, the sample article's fields that the extraction step collects:
# title (name), description, category, url, thumbnail URL, first provider name, publish date.
print(
    news_json["json_object"]["name"],
    news_json["json_object"]["description"],
    news_json["json_object"]["category"],
    news_json["json_object"]["url"],
    news_json["json_object"]["image"]["thumbnail"]["contentUrl"],
    news_json["json_object"]["provider"][0]['name'],
    news_json["json_object"]["datePublished"],
    sep="\n",
)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 12, Finished, Available)

Share market update: Most active stocks in today's market in terms of volume
YES Bank(numbers of share traded: 17.11 crore), SAIL(numbers of share traded: 7.36 crore), Reliance Power(numbers of share traded: 5.72 crore), BEL(numbers of share traded: 5.72 crore), Zomato(numbers of share traded: 5.
Business
https://economictimes.indiatimes.com/markets/stocks/stock-watch/share-market-update-most-active-stocks-in-todays-market-in-terms-of-volume/articleshow/109248435.cms
https://www.bing.com/th?id=OVFT.KzP56VjOfHNYjQ_T89YbES&pid=News
Indiatimes
2024-04-12T11:35:00.0000000Z


## **Extract the article properties into lists**

In [11]:
# Parallel lists, one per output column. They are zipped into rows in the next
# step, so they must always have exactly the same length.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary; the article sits under "json_object".
        article = json.loads(json_str)
        news_item = article["json_object"]

        # Only keep articles that have a category and a thumbnail image URL.
        if not (news_item.get("category") and news_item.get("image", {}).get("thumbnail", {}).get("contentUrl")):
            continue

        # Read every field BEFORE appending anything. Appending field-by-field would
        # leave the earlier lists one element longer whenever a later lookup fails
        # (missing key, empty provider list), silently misaligning every following
        # row once the lists are zipped together.
        fields = (
            news_item["name"],
            news_item["description"],
            news_item["category"],
            news_item["url"],
            news_item["image"]["thumbnail"]["contentUrl"],
            news_item["provider"][0]["name"],
            news_item["datePublished"],
        )
    except Exception as e:
        print(f"Error processing JSON object: {e}")
        continue

    # All lookups succeeded: append the complete record to every list at once.
    for column_values, value in zip((title, description, category, url, image, provider, datePublished), fields):
        column_values.append(value)


StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 13, Finished, Available)

## **Convert the lists into a DataFrame**

In [12]:
from pyspark.sql.types import StructType, StructField, StringType

# Every column is a nullable string; the field order must match the zip() order below.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("title", "description", "category", "url", "image", "provider", "datePublished")
])

# Pair the parallel lists into row tuples and drop any row containing an empty or
# None field: filter(all, ...) keeps a tuple only when every element is truthy.
valid_rows = list(filter(all, zip(title, description, category, url, image, provider, datePublished)))

# Build the Spark DataFrame from the surviving rows using the explicit schema.
df_cleaned = spark.createDataFrame(valid_rows, schema=schema)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 14, Finished, Available)

In [13]:
# Preview the cleaned DataFrame built from the extracted lists.
display(df_cleaned)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1eeff9dc-b701-4c7c-aae8-db9120058e23)

## **Processing the date column**

In [14]:
from pyspark.sql.functions import to_date, date_format

# Turn the ISO-8601 publish timestamp (e.g. 2024-04-12T11:35:00.0000000Z) into a
# date string such as 12-Apr-2024; the time-of-day part is discarded.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date(df_cleaned["datePublished"]), "dd-MMM-yyyy"),
)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 16, Finished, Available)

In [15]:
# Preview the final DataFrame with the reformatted publish date.
display(df_cleaned_final)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3231d8a3-f357-4be7-94da-4d3cb8f5cfcc)

## **Writing the final DataFrame to the Lakehouse DB in Delta format**

In [16]:
# Upsert the cleaned news into the Lakehouse Delta table: create the table on the
# first run, otherwise MERGE on `url` so changed articles are updated and new ones inserted.
table_name = 'bing_lake_db.tbl_latest_news'

# MERGE fails when several source rows match the same target row, so keep a single
# row per merge key (url). Which duplicate survives is not specified by Spark.
df_to_write = df_cleaned_final.dropDuplicates(["url"])

# Check for the table explicitly rather than treating any AnalysisException raised by
# saveAsTable as "table exists": that masked unrelated failures (e.g. a missing
# database) and then attempted a MERGE against a table that was never created.
if not spark.catalog.tableExists(table_name):

    df_to_write.write.format("delta").saveAsTable(table_name)

else:

    print("Table Already Exists")

    df_to_write.createOrReplaceTempView("vw_df_cleaned_final")

    # `<=>` is null-safe equality: a plain `<>` evaluates to NULL (not true) when either
    # side is NULL, so such rows would silently never be updated. The OR chain is
    # parenthesised so the whole change test is clearly the WHEN MATCHED condition.
    spark.sql(f""" MERGE INTO {table_name} target_table
                   USING vw_df_cleaned_final source_view

                   ON source_view.url = target_table.url

                   WHEN MATCHED AND (
                       NOT (source_view.title <=> target_table.title) OR
                       NOT (source_view.description <=> target_table.description) OR
                       NOT (source_view.category <=> target_table.category) OR
                       NOT (source_view.image <=> target_table.image) OR
                       NOT (source_view.provider <=> target_table.provider) OR
                       NOT (source_view.datePublished <=> target_table.datePublished)
                   )
                   THEN UPDATE SET *

                   WHEN NOT MATCHED THEN INSERT *
                """)

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 18, Finished, Available)

Table Already Exists


In [17]:
%%sql

-- Sanity check: number of rows currently stored in the Lakehouse news table.
select count(*) from bing_lake_db.tbl_latest_news

StatementMeta(, ef6fc1c2-26d9-4d8f-a208-e4b81e93d793, 19, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>