##### **DATA TRANSFORMATION OF THE OLYMPIC NEWS JSON DATA**

In [39]:
# Read the Olympic news JSON file from the Lakehouse "Files" area.
# The "multiline" option is enabled so the reader accepts a JSON document
# that spans several lines.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/olympic-news.json")
)

# Preview the Spark DataFrame parsed from "Files/olympic-news.json".
display(df)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6e08c078-8c4f-473f-8934-1a4e959d92c5)

In [40]:
# Keep only the "value" column, which holds the news articles.
df_news = df.select(df["value"])

# Preview the single-column DataFrame.
display(df_news)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 42, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c2fe0c09-d279-486f-8d0a-e65c97af5a22)

In [41]:
# `explode` returns a new row for each element of the given array or map
# column; it is used here to turn the "value" column into one row per article.

from pyspark.sql.functions import explode

# Take the column from `df_news` itself (not from its parent `df`) so this cell
# keeps working even if `df` is later rebound to a different DataFrame.
# Each exploded element is exposed under the column name 'json_field'.
df_expload = df_news.select(explode(df_news["value"]).alias('json_field'))

display(df_expload)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dc3829b9-8036-4390-9b4a-684bd3c6ff62)

In [42]:
# Serialise every row of the exploded DataFrame to a JSON string, then
# collect the strings into a Python list.
rows_as_json = df_expload.toJSON()
json_list = rows_as_json.collect()

# Print the first JSON string from the list.
print(json_list[0])

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 44, Finished, Available, Finished)

{"json_field":{"about":[{"name":"Jordan Chiles","readLink":"https://api.bing.microsoft.com/api/v7/entities/ccfb1b3e-5197-1843-449d-789be26df4bf"},{"name":"Dominique Dawes","readLink":"https://api.bing.microsoft.com/api/v7/entities/fc3a3f50-2c48-c688-161b-a9f96fa0dbb7"},{"name":"USA Gymnastics","readLink":"https://api.bing.microsoft.com/api/v7/entities/0b6ecc16-2f85-ff22-eac6-f8273f9fd25a"},{"name":"International Olympic Committee","readLink":"https://api.bing.microsoft.com/api/v7/entities/d9fd20e2-8f92-7e19-85f2-e211d74b45b6"}],"datePublished":"2024-08-14T14:33:05.0000000Z","description":"Olympic legend Dominique Dawes said on Tuesday that the error in the Jordan Chiles' medal controversy lies with the judges who \"missed a dance element\" in her floor ...","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.9S1Eay2zwQ11kNXh_Tb1Fi&pid=News","height":393,"width":700}},"mentions":[{"name":"Jordan"},{"name":"Olympic Games"},{"name":"Fox News Channel"}],"name":"Olympic judg

In [43]:
# Parse one sample news item (list index 4, i.e. the fifth entry) into a
# Python dictionary; nested fields are easier to extract from a dictionary
# than from a raw JSON string.
import json

sample_json_string = json_list[4]
news_json = json.loads(sample_json_string)

print(news_json)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 45, Finished, Available, Finished)

{'json_field': {'about': [{'name': 'ITV', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/a55980ba-0515-d5b3-19d8-ebe9a6098e65'}, {'name': 'Jordan Chiles', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/ccfb1b3e-5197-1843-449d-789be26df4bf'}, {'name': 'Court of Arbitration for Sport', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/e1d5ba69-02a0-d7e4-4388-a9c0b5dc4ecc'}], 'datePublished': '2024-08-14T13:29:00.0000000Z', 'description': 'The US Olympic and Paralympic Committee seems to be running out of options, but said they will work “diligently to resolve this matter swiftly and fairly.” Subscribe free to our weekly newsletter for exclusive and original coverage from ITV News.', 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=OVFT.bvuAQdKdCb2IrWQPcl8oUy&pid=News', 'height': 393, 'width': 700}}, 'name': 'Which Paris Olympic medals are still being disputed after the Games have ended?', 'provider': [{'_type': 'Organization', 'name': 

In [44]:
# Keys (headers) of the top-level dictionary.
top_level_keys = news_json.keys()
print(top_level_keys)

# Keys inside the nested 'json_field' dictionary.
article_keys = news_json['json_field'].keys()
print(article_keys)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 46, Finished, Available, Finished)

dict_keys(['json_field'])
dict_keys(['about', 'datePublished', 'description', 'image', 'name', 'provider', 'url'])


In [45]:
# Print selected fields of the sample news item: title, description,
# thumbnail URL, publication timestamp, first provider's name and article URL.
sample_article = news_json["json_field"]

print(sample_article["name"])
print(sample_article["description"])
print(sample_article["image"]["thumbnail"]["contentUrl"])
print(sample_article["datePublished"])
print(sample_article["provider"][0]["name"])
print(sample_article["url"])

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 47, Finished, Available, Finished)

Which Paris Olympic medals are still being disputed after the Games have ended?
The US Olympic and Paralympic Committee seems to be running out of options, but said they will work “diligently to resolve this matter swiftly and fairly.” Subscribe free to our weekly newsletter for exclusive and original coverage from ITV News.
https://www.bing.com/th?id=OVFT.bvuAQdKdCb2IrWQPcl8oUy&pid=News
2024-08-14T13:29:00.0000000Z
ITV
https://www.itv.com/news/2024-08-14/which-paris-olympic-medals-are-still-being-disputed-after-the-games-have-ended


In [46]:
# One Python list per output column; the lists are zipped into rows later,
# so they must always stay the same length.
title = []
description = []
image = []
link = []
datePublished = []
provider = []

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string and step into the nested article dictionary.
        article = json.loads(json_string)["json_field"]

        # Read every field BEFORE appending anything. If a field is missing
        # (e.g. an article without 'image'), nothing is appended for that
        # article, so the column lists cannot drift out of alignment.
        record = (
            article["name"],
            article["description"],
            article["image"]["thumbnail"]["contentUrl"],
            article["url"],
            article["datePublished"],
            article["provider"][0]["name"],
        )

    except Exception as e:
        # Best-effort: report the problem and move on to the next article.
        print(f"Error processing JSON field: {e}")
        continue

    # All fields were read successfully; append them column by column.
    columns = (title, description, image, link, datePublished, provider)
    for column, value in zip(columns, record):
        column.append(value)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 48, Finished, Available, Finished)

Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'


In [47]:
# Re-run the extraction, skipping any news item that has no image.
# Checking for the image up front avoids the "Error processing JSON field"
# messages printed when such items are processed without the check.

# Start from empty lists for every column that is zipped into the DataFrame,
# so the result does not depend on values left over from an earlier cell.
title = []
description = []
image = []
link = []
datePublished = []
provider = []

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string and step into the nested article dictionary.
        article = json.loads(json_string)["json_field"]

        # Skip news items that have no image attached.
        if not article.get("image"):
            continue

        # Read every field before appending anything so the column lists stay
        # aligned. The image is stored as its thumbnail URL and the provider
        # as the first provider's name, matching the string-typed columns
        # these lists are loaded into.
        record = (
            article["name"],
            article["description"],
            article["image"]["thumbnail"]["contentUrl"],
            article["url"],
            article["datePublished"],
            article["provider"][0]["name"],
        )

    except Exception as e:
        # Best-effort: report the problem and move on to the next article.
        print(f"Error processing JSON field: {e}")
        continue

    # All fields were read successfully; append them column by column.
    columns = (title, description, image, link, datePublished, provider)
    for column, value in zip(columns, record):
        column.append(value)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 49, Finished, Available, Finished)

In [48]:
# Build a Spark DataFrame from the extracted column lists.
from pyspark.sql.types import StructType, StructField, StringType

# Pair the lists element-wise so that each tuple is one news-article row.
olympic_news_data = list(zip(title, description, image, link, datePublished, provider))

# Every column is a nullable string; the field order matches the tuple order.
column_names = ["title", "description", "image", "link", "datePublished", "provider"]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

# Create the DataFrame with the explicit schema.
df_clean = spark.createDataFrame(olympic_news_data, schema=schema)


StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 50, Finished, Available, Finished)

In [49]:
# Preview the cleaned DataFrame (`df_clean`).
display(df_clean)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 51, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f3406b80-6267-47d2-9d97-9e042e0dd2ce)

In [50]:
# Derive separate date and time columns from the datePublished column.
from pyspark.sql.functions import col, date_format

published_at = col("datePublished")

df_clean = (
    df_clean
    .withColumn("published_date", date_format(published_at, "yyyy-MM-dd"))
    .withColumn("published_time", date_format(published_at, "HH:mm:ss"))
)

display(df_clean)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 52, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dff35509-19ca-4877-9b6d-0998f9b2b212)

In [51]:
# Parse the published_date strings ("yyyy-MM-dd") with to_date so the column
# holds dates instead of text.

from pyspark.sql.functions import to_date

published_date_as_date = to_date(col("published_date"), "yyyy-MM-dd")
df_clean = df_clean.withColumn("published_date", published_date_as_date)

display(df_clean)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d7204b43-9d0e-4152-98ee-dd71f0081f4f)

In [None]:
#write table to the bing_olympic_news_db

#df_clean.write.format("delta").saveAsTable("bing_olympic_news_db.olympic_news_transformed_data")

## **Implementing Type 1 Incremental Loading of the Transformed Data**

In [52]:
# Adopt Type 1 SCD incremental loading for the transformed data.
#
# In a Type 1 SCD the new data overwrites the existing data without creating
# duplicates. The previous values are lost because they are not stored
# anywhere else, so this is used when no history of the data is needed.

from pyspark.sql.utils import AnalysisException

# Defined outside the try block so it is always available to the MERGE below.
table_name = "bing_olympic_news_db.olympic_news_updated_data"

try:
    # First load: create the Delta table from the cleaned data.
    df_clean.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): this branch treats any AnalysisException as "table already
    # exists"; other analysis errors would also end up here - confirm this is
    # acceptable or check for the table explicitly.
    print("Table Already Exist")

    # Expose the new data to Spark SQL so it can be used as the MERGE source.
    df_clean.createOrReplaceTempView("vw_df_clean")

    # Rows are matched on `link`. A matched row is updated only when at least
    # one other column differs. The null-safe operator `<=>` is used instead
    # of `<>` so that a change to or from NULL also counts as a difference
    # (`<>` yields NULL, not TRUE, when either side is NULL). `link` itself is
    # not compared because the ON clause already guarantees it is equal.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_clean source_view

                    ON source_view.link = target_table.link

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.datePublished <=> target_table.datePublished) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.published_date <=> target_table.published_date) OR
                        NOT (source_view.published_time <=> target_table.published_time)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)


StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 54, Finished, Available, Finished)

In [53]:
%%sql

-- Count the number of rows loaded into the Lakehouse table.

SELECT COUNT(*) FROM bing_olympic_news_db.olympic_news_updated_data
     

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 55, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [54]:
# Another way to get the row count: run the same query through spark.sql.
# COUNT(*) always returns a single row, so no LIMIT clause is needed.
sqldf = spark.sql("SELECT COUNT(*) FROM bing_olympic_news_db.olympic_news_updated_data")
display(sqldf)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 56, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 43420012-2734-414c-a889-ed395d35aeca)

In [55]:
#Append to existing table in LakeHouse
#df_clean.write.format("delta").mode("append").insertInto("bing_olympic_news_db.olympic_news_updated_data")

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 57, Finished, Available, Finished)

In [56]:
# Read back up to 1000 rows of the loaded table to inspect the result.
# A dedicated name is used instead of `df` so the raw JSON DataFrame that the
# earlier cells read from `df` is not overwritten when this cell runs.
df_loaded_news = spark.sql("SELECT * FROM bing_olympic_news_db.olympic_news_updated_data LIMIT 1000")
display(df_loaded_news)

StatementMeta(, 44571c71-7d2a-4832-bb9c-de08fbf41949, 58, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 46a736ab-b073-443e-9b25-f2c57520e115)