In [8]:
# Load the raw Bing news JSON files; multiLine lets one JSON document span several lines
df = spark.read.json("Files/bing-latest-news", multiLine=True)

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 10, Finished, Available, Finished)

In [9]:
# Keep only the 'value' column: it holds the array of article objects in a single row
df = df.select(df["value"])

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 11, Finished, Available, Finished)

In [10]:
from pyspark.sql.functions import explode
import json

# explode() unpacks the 'value' array so that every article becomes its own row,
# held in a struct column named 'json_object'
df_exploded = df.select(explode("value").alias("json_object"))

# Serialise each row to a JSON string ({"json_object": {...}}) and pull them to the driver
json_list = df_exploded.toJSON().collect()


StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 12, Finished, Available, Finished)

In [11]:
# Parse each serialised article into seven parallel column lists.
# The lists are consumed positionally by zip() in the next cell, so they MUST
# always have the same length: a row is appended to all of them or to none.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Same order as the `record` tuple built below
output_columns = (title, description, category, url, image, provider, datePublished)

for json_str in json_list:
    try:
        # Each string is one exploded row: {"json_object": {...article...}}
        news_item = json.loads(json_str)["json_object"]

        # Keep only articles that have a category and a thumbnail URL.
        # `or {}` also covers keys that are present but null.
        thumbnail_url = ((news_item.get("image") or {}).get("thumbnail") or {}).get("contentUrl")
        if not (news_item.get("category") and thumbnail_url):
            continue

        # Gather every field BEFORE appending anything: if a key is missing
        # (toJSON omits null fields) or provider is empty, nothing is appended,
        # so the lists stay aligned instead of silently shifting columns.
        record = (
            news_item["name"],
            news_item["description"],
            news_item["category"],
            news_item["url"],
            thumbnail_url,
            news_item["provider"][0]["name"],
            news_item["datePublished"],
        )
    except Exception as e:
        # Best-effort: report and skip a malformed article rather than stop the load
        print(f"Error processing JSON Object: {e}")
        continue

    for column_values, field_value in zip(output_columns, record):
        column_values.append(field_value)

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 13, Finished, Available, Finished)

In [12]:
from pyspark.sql.types import StructType, StructField, StringType


# Column order here must match the order of the lists passed to zip() below
news_column_names = ["title", "description", "category", "url", "image", "provider", "datePublished"]

# Every column is a nullable string
schema = StructType([StructField(column_name, StringType(), True) for column_name in news_column_names])

# One tuple per article: the i-th element of each parallel list forms the i-th row
data = list(zip(title, description, category, url, image, provider, datePublished))

# Build the cleaned DataFrame from the rows and the explicit schema
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 14, Finished, Available, Finished)

In [13]:
from pyspark.sql.functions import to_date, date_format

# Re-render 'datePublished' as a 'dd-MMM-yyyy' string (e.g. 05-Mar-2024):
# parse it to a date first, then format that date
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn("datePublished", date_format(published_date, "dd-MMM-yyyy"))

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 15, Finished, Available, Finished)

In [14]:
from pyspark.sql.utils import AnalysisException

# Create the Delta table on the first run; on later runs upsert the batch into it.
table_name = 'bing_lake_db.tbl_latest_news'

# `url` is the MERGE key. Delta fails a MERGE when several source rows match
# the same target row, so keep a single row per url from this batch.
df_news_batch = df_cleaned_final.dropDuplicates(["url"])

# Check explicitly instead of catching AnalysisException from saveAsTable:
# that exception is also raised for unrelated problems (e.g. a missing database),
# which would otherwise be misreported as "Table already exists".
if not spark.catalog.tableExists(table_name):
    df_news_batch.write.format("delta").saveAsTable(table_name)

else:

    print("Table already exists")

    df_news_batch.createOrReplaceTempView("vw_df_cleaned_final")

    # - The change test uses null-safe `NOT (a <=> b)`: plain `a <> b` evaluates to
    #   NULL (treated as false) when either side is NULL, so such changes were skipped.
    # - WHEN NOT MATCHED inserts articles whose url is not yet in the table;
    #   without it, new articles from later runs were never loaded.
    spark.sql(f""" MERGE INTO {table_name} target_table
                   USING vw_df_cleaned_final source_view

                   ON source_view.url = target_table.url

                   WHEN MATCHED AND (
                       NOT (source_view.title <=> target_table.title) OR
                       NOT (source_view.description <=> target_table.description) OR
                       NOT (source_view.category <=> target_table.category) OR
                       NOT (source_view.image <=> target_table.image) OR
                       NOT (source_view.provider <=> target_table.provider) OR
                       NOT (source_view.datePublished <=> target_table.datePublished)
                   )
                   THEN UPDATE SET *

                   WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, 2db2290a-4e08-4b02-9351-d84722a1287e, 16, Finished, Available, Finished)

Table already exists
