## Read the JSON File as DataFrame

In [None]:
# Load the news JSON file into a DataFrame. The "multiline" option lets a
# single JSON document span several lines of the file.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)

In [None]:
# Show the freshly loaded DataFrame for a visual check.
# (display is assumed to be the notebook's built-in rendering helper.)
display(df)

## Select just the `value` column

In [None]:
# Keep only the "value" column; df is rebound, so the other columns are no
# longer reachable through this name.
df = df.select("value")

## Explode the JSON Column

In [None]:
from pyspark.sql.functions import explode
# explode() emits one output row per element of the "value" collection; each
# element is exposed under the column name "json_object".
df_exploded = df.select(
    explode(df["value"]).alias("json_object"),
)

In [None]:
# Show the exploded DataFrame (one "json_object" per row) for a visual check.
display(df_exploded)

## Convert the exploded rows into a list of JSON strings

In [None]:
# Serialize every row to a JSON string and collect them into a Python list.
# Each string has "json_object" as its top-level key.
json_list = (
    df_exploded
    .toJSON()
    .collect()
)

## Testing the json string list

In [None]:
# Dump the whole list of JSON strings to the cell output for inspection.
print(json_list)

In [None]:
import json

# Parse one collected JSON string into a Python dict to inspect its structure.
# NOTE(review): index 1 selects the second entry, so this raises IndexError
# when json_list holds fewer than two strings.
news_json = json.loads(json_list[1])
# Uncomment any of the lines below to look at the parsed record:
#print(news_json)
#print(news_json["json_object"]["name"])
#print(news_json["json_object"]["category"])

## List Initialization and JSON Processing

In [None]:
# Parallel column lists: position i in every list describes the same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string and unwrap the exploded "json_object" payload.
        article = json.loads(json_str)
        payload = article["json_object"]

        # Skip articles without a category or without a thumbnail URL.
        if not payload.get("category"):
            continue
        thumbnail_url = payload.get("image", {}).get("thumbnail", {}).get("contentUrl")
        if not thumbnail_url:
            continue

        # Read every field BEFORE touching the output lists. If any lookup
        # fails, nothing has been appended yet, so the seven lists stay the
        # same length and zip() later pairs values from the same article.
        row = (
            payload["name"],
            payload["description"],
            payload["category"],
            payload["url"],
            thumbnail_url,
            payload["provider"][0]["name"],
            payload["datePublished"],
        )
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError; the others cover missing
        # keys, an empty provider list, and unexpectedly shaped values.
        print(f"Error processing JSON object: {e}")
        continue

    # All lookups succeeded: append one value to each column list.
    for column, value in zip(
        (title, description, category, url, image, provider, datePublished),
        row,
    ):
        column.append(value)

## Converting the Lists to a DataFrame

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Build one tuple per article by pairing up the parallel column lists.
data = [*zip(title, description, category, url, image, provider, datePublished)]

# Every column is a nullable string; the field order matches the tuple order
# used for `data` above.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "title",
            "description",
            "category",
            "url",
            "image",
            "provider",
            "datePublished",
        )
    ]
)

# Materialize the rows as a Spark DataFrame with the explicit schema.
df_cleaned = spark.createDataFrame(data, schema=schema)

In [None]:
# Preview at most five rows of the cleaned DataFrame.
display(df_cleaned.limit(5))

## Processing the Date column

In [None]:
from pyspark.sql.functions import to_date, date_format

# Replace "datePublished" in place: parse the string to a date, then format it
# back to text using the "dd-MMM-yyyy" pattern.
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(to_date("datePublished"), "dd-MMM-yyyy"),
)

## Display Results

In [None]:
# Preview at most five rows of the final DataFrame with the reformatted date.
display(df_cleaned_final.limit(5))

## Writing the Final DataFrame to the Lakehouse DB in Delta format

In [None]:
# Persist the final DataFrame as a Delta table named "bing_lake_db.test".
# NOTE(review): no save mode is set, so the writer's default applies; confirm
# how a re-run should behave when the table already exists.
(
    df_cleaned_final.write
    .format("delta")
    .saveAsTable("bing_lake_db.test")
)