### **Read the JSON file as a Dataframe**

In [None]:
# Load the JSON document at "Files/News.json" into a Spark DataFrame.
# The "multiline" option is set because the reader is asked to accept records
# that span several lines.
news_reader = spark.read.option("multiline", "true")
df = news_reader.json("Files/News.json")

# Render the raw DataFrame for a first look at its structure.
display(df)

### **Selecting just the `articles` column from the DataFrame**

In [None]:
# Keep only the "articles" column; `df` is rebound to the narrowed DataFrame.
df = df.select("articles")

### **Explode the JSON column**

In [89]:
from pyspark.sql.functions import explode

# Produce one output row per element of "articles", exposing each element
# under the column name "json_object".
articles_col = df["articles"]
df_exploded = df.select(explode(articles_col).alias("json_object"))

StatementMeta(, 95cfd7fc-76d1-43a0-b28a-0c85414011eb, 91, Finished, Available, Finished)

### **Output after JSON Exploded**

In [None]:
# Render the exploded DataFrame (its single column is "json_object") for inspection.
display(df_exploded)

### **Converting the exploded DataFrame to a list of JSON strings**

In [None]:
# Serialise every exploded row to a JSON string, then bring all of the strings
# back to the driver as a plain Python list.
json_rows = df_exploded.toJSON()
json_list = json_rows.collect()

### **Testing the JSON string list**

In [None]:
# Spot-check: print the second serialised row (index 1).
print(json_list[1])

### **Converting one JSON string into a Python dictionary holding a single news article**

In [None]:
import json

# Parse one sample row into a dictionary for inspection.
# Index 25 is used when the feed is long enough; shorter feeds fall back to
# their last row instead of raising IndexError. An empty `json_list` still
# raises IndexError, because there is nothing to sample.
sample_index = min(25, len(json_list) - 1)
news_json = json.loads(json_list[sample_index])

### **Testing the JSON Dictionary**

In [None]:
# Show the whole parsed sample first ...
print(news_json)
# ... then drill into one nested field. This lookup raises KeyError when the
# sample has no "author" key.
print(news_json['json_object']['author'])

### **Column names to be extracted**

In [None]:
# author
# content
# description
# publishedAt
# source_name (since source is a nested object, use source.name as source_name)
# title
# url
# urlToImage

### **Collecting the JSON properties into lists**

In [None]:
# Column-wise accumulators. They must stay index-aligned: position i of every
# list belongs to the same article, because a later cell zips them into rows.
author = []
content = []
description = []
publishedAt = []
source = []
title = []
url = []
urlToImage = []

# Same order as the values of `row` built inside the loop.
columns = (author, content, description, publishedAt, source, title, url, urlToImage)

# Process each JSON string in the list.
for json_str in json_list:
    try:
        # Parse the JSON string and step into the exploded article object.
        item = json.loads(json_str)["json_object"]

        # Articles without an author or an image URL are skipped.
        if not (item.get("author") and item.get("urlToImage")):
            continue

        # Build the complete row BEFORE touching any list. Appending field by
        # field would leave the lists with different lengths if a later field
        # raised, silently shifting every following article's columns.
        # `.get` yields None for keys absent from the serialised row
        # (NOTE(review): Spark's toJSON appears to omit null fields by
        # default - confirm); "source" is nested, so only its "name" is kept.
        row = (
            item["author"],
            item.get("content"),
            item.get("description"),
            item.get("publishedAt"),
            (item.get("source") or {}).get("name"),
            item.get("title"),
            item.get("url"),
            item["urlToImage"],
        )
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers malformed JSON (json.JSONDecodeError); the others
        # cover rows whose structure is not the expected nested object.
        print(f"Error processing JSON object: {e}")
        continue

    # All fields resolved: extend every column by exactly one value.
    for column, value in zip(columns, row):
        column.append(value)


### **Converting the list to a Dataframe**

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Output column names, in the same order as the lists are zipped below.
column_names = [
    "author",
    "content",
    "description",
    "publishedAt",
    "source",
    "title",
    "url",
    "urlToImage",
]

# Turn the eight parallel lists into a list of row tuples.
data = list(zip(author, content, description, publishedAt, source, title, url, urlToImage))

# Every column is declared as a nullable string.
schema = StructType([StructField(name, StringType(), True) for name in column_names])

# Materialise the rows as a Spark DataFrame with the explicit schema.
df_cleaned = spark.createDataFrame(data, schema=schema)


### **Testing the Dataframe**

In [None]:
# Render the flattened DataFrame built from the extracted lists.
display(df_cleaned)

### **Processing the Date Column from the Dataframe**

In [None]:
from pyspark.sql.functions import to_date, date_format

# Convert the published timestamp string to a date, then render it back as
# text in "dd-MMM-yyyy" form.
# NOTE(review): the column is addressed as "PublishedAt" while the schema
# declares "publishedAt"; this relies on case-insensitive column resolution -
# confirm the session setting before changing either spelling.
published_date = to_date("PublishedAt")
formatted_date = date_format(published_date, "dd-MMM-yyyy")
df_cleaned_final = df_cleaned.withColumn("PublishedAt", formatted_date)


### **Testing the new Dataframe**

In [None]:
# Preview only the first five rows of the date-formatted DataFrame.
display(df_cleaned_final.limit(5))

### Converting the new DataFrame to Delta format and storing it as a table in the lakehouse.
### Note: this uses a Type 1 incremental load (matched rows are overwritten; no history is kept).

In [None]:
from pyspark.sql.utils import AnalysisException

# Target Delta table in the lakehouse.
table_name = 'OpenWealth_Lakehouse.table_latest_news'

try:
    # First load: create the table. With no save mode specified, this write
    # is expected to fail when the table already exists.
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException as exc:
    # AnalysisException covers many unrelated problems (bad schema, missing
    # database, ...). Only the "already exists" case should fall through to the
    # merge; anything else is re-raised so it is not mislabelled and hidden.
    if "already exists" not in str(exc).lower():
        raise

    print("Table Already Exists")

    # Expose the new batch to SQL, then upsert it into the existing table:
    # rows matching on `url` are overwritten in place, new urls are inserted.
    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")
    spark.sql(f"""
    MERGE INTO {table_name} target_table
    USING vw_df_cleaned_final source_view
    ON source_view.url = target_table.url
    WHEN MATCHED THEN
    UPDATE SET
        author = source_view.author,
        content = source_view.content,
        description = source_view.description,
        publishedAt = source_view.publishedAt,
        source = source_view.source,
        title = source_view.title,
        urlToImage = source_view.urlToImage
    WHEN NOT MATCHED THEN
    INSERT *
    """)


In [None]:
%%sql
-- Row count of the news table after the load/merge step.
SELECT COUNT(*)
FROM OpenWealth_Lakehouse.table_latest_news