# Read the JSON file as a DataFrame

In [1]:
# Load the news file into a DataFrame; the "multiline" option lets Spark
# parse JSON records that span more than one line.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)

StatementMeta(, , , SessionError, )

InvalidHttpRequestToLivy: [TooManyRequestsForCapacity] This spark job can't be run because you have hit a spark compute or API rate limit. To run this spark job, cancel an active Spark job through the Monitoring hub, choose a larger capacity SKU, or try again later. HTTP status code: 430 {Learn more} HTTP status code: 430.

# Select and explode only the `value` column from the JSON

In [None]:
from pyspark.sql.functions import explode

# explode() emits one output row per element of the "value" column; each
# element is exposed under the alias "json_object".
exploded_value = explode(df["value"]).alias("json_object")
def_exploded = df.select(exploded_value)

StatementMeta(, , , Cancelled, )

# Convert each exploded row to a JSON string

In [None]:
# Serialise every row to a JSON string, then collect() the strings into a
# plain Python list on the driver.
json_rows = def_exploded.toJSON()
json_list = json_rows.collect()


StatementMeta(, , , Cancelled, )

name


# Parse each item into a JSON dictionary and load its fields into lists

In [None]:
import json

# One list per output column. The lists are later zipped together, so they
# must stay exactly the same length: index i of every list describes the
# same article.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

for record_str in json_list:
    try:
        article = json.loads(record_str)['json_object']
        # no processing if the json from api does not contain category or image
        if article.get("category") is None or article.get("image") is None:
            continue
        # Resolve every field BEFORE touching the column lists. If any lookup
        # fails (e.g. a missing or empty provider), nothing has been appended
        # yet, so the lists cannot drift out of alignment.
        row = (
            article['name'],
            article['description'],
            article['category'],
            article['url'],
            article['image']['thumbnail']['contentUrl'],
            article['provider'][0]['name'],
            article['datePublished'],
        )
    except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
        # Malformed JSON (ValueError) or an unexpected article shape: report
        # the problem and skip this article entirely.
        print(f"errors processing json: {e}")
        continue

    # All fields resolved: append them to their columns in one go.
    for column, field_value in zip(
        (title, description, category, url, image, provider, datePublished),
        row,
    ):
        column.append(field_value)


StatementMeta(, , , Cancelled, )

# Combine the different lists into a DataFrame

In [None]:
from pyspark.sql.types import StructType,StructField, StringType

# Column order here must match the order of the lists passed to zip() below.
column_names = [
    "title",
    "description",
    "category",
    "url",
    "image",
    "provider",
    "datePublished",
]

# Every column is declared as a nullable string.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

# One tuple per article, built by pairing up the i-th entry of every list.
data = list(zip(title, description, category, url, image, provider, datePublished))

df_cleaned = spark.createDataFrame(data, schema=schema)


StatementMeta(, , , Cancelled, )

# Convert the published date to dd-MM-yyyy format

In [None]:
from pyspark.sql.functions import to_date, date_format

# Parse the published-date string into a date, then render it back as a
# "dd-MM-yyyy" string in the same column.
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_date, "dd-MM-yyyy"),
)

StatementMeta(, , , Cancelled, )

# Write the final DataFrame to the Lakehouse DB in Delta format using an incremental load (Type 1)

In [None]:
from pyspark.sql.utils import AnalysisException

table_name = "bing_lake_db.tbl_latest_news"

try:
    # First run: create the Delta table from the cleaned DataFrame.
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): AnalysisException is treated here as "table already
    # exists"; it can be raised for other analysis failures as well. In that
    # case the MERGE below is still attempted and will surface its own error.
    print ("Table already exists!")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")
    # Match source and target rows on url. For a matched row, update the whole
    # row only if at least one column value differs; insert unmatched rows.
    #
    # The comparison uses the null-safe equality operator (<=>) instead of <>:
    # "a <> b" evaluates to NULL when either side is NULL, so a value changing
    # to or from NULL would never be detected and the row would not be updated.
    #
    # NOTE(review): MERGE fails when several source rows match the same target
    # row, so this assumes url is unique within the incoming batch - confirm.
    spark.sql(f"""MERGE INTO {table_name} target_table
                     USING vw_df_cleaned_final source_view
                     ON source_view.url = target_table.url

                     WHEN MATCHED AND NOT
                     (source_view.title <=> target_table.title AND
                     source_view.description <=> target_table.description AND
                     source_view.category <=> target_table.category AND
                     source_view.image <=> target_table.image AND
                     source_view.provider <=> target_table.provider AND
                     source_view.datePublished <=> target_table.datePublished)

                     THEN UPDATE SET *
                     WHEN NOT MATCHED THEN INSERT *

                """)


StatementMeta(, , , Cancelled, )