**DATA TRANSFORMATION OF THE US_2024_ELECTION NEWS JSON DATA**

In [1]:
# Read the Bing news JSON file straight from the lakehouse "Files" area.
# The "multiline" read option is switched on for this file.
source_path = "Files/Bing-latest-news.json"
reader = spark.read.option("multiline", "true")
df = reader.json(source_path)

# Preview the resulting Spark DataFrame.
display(df)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 95d884f4-2229-4079-94e4-0901ec1f074d)

In [2]:
# Keep only the "value" column, which holds the news articles.
df = df.select(df["value"])

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 4, Finished, Available, Finished)

In [3]:
# Show the DataFrame now that it has been reduced to the "value" column.
display(df)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9abd1cc3-3c2a-46c2-a483-0e95dba6404f)

In [4]:

# explode() returns a new row for each element of the given array or map column.
# It is used here to turn the elements of the "value" column into separate rows.
from pyspark.sql.functions import explode

# "value" is the column being exploded; each resulting element is exposed
# under the alias "json_field".
article_rows = explode("value").alias("json_field")
df_expload = df.select(article_rows)



StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 6, Finished, Available, Finished)

In [5]:
# Show the exploded DataFrame (a single "json_field" column).
display(df_expload)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 92d5c163-cde3-4d49-932a-fcc70c62d955)

In [6]:
# Serialise every row of the exploded DataFrame to a JSON string, then
# collect those strings into a plain Python list.
json_rows = df_expload.toJSON()
json_list = json_rows.collect()


StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 8, Finished, Available, Finished)

In [7]:
# Print one sample record (index 26) from the JSON string list to inspect its structure.
# NOTE: the index is hard-coded; this raises IndexError if the list has fewer than 27 entries.
print(json_list[26])

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 9, Finished, Available, Finished)

{"json_field":{"datePublished":"2024-09-22T16:22:00.0000000Z","description":"ABC's George Stephanopoulos hosts Julie Pace, Jon Karl, and Marianna Sotomayor to discuss a sex scandal from Republican North Carolina gubernatorial candidate Mark Robinson, efforts to change Nebraska's electoral vote allocation,","name":"'This Week' Roundtable: 44 Days To 2024 Election, Will There Be Another Debate?","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.fAzojzbjfaG4aLodVwP_Mg&pid=news"}},"name":"RealClearPolitics"}],"url":"https://www.realclearpolitics.com/video/2024/09/22/this_week_roundtable_44_days_to_2024_election_will_there_be_another_debate.html"}}


In [8]:
# Parse the sample JSON string (index 26) into a Python dictionary.
# Individual fields are easier to extract from a dictionary than from a raw JSON string.
import json
news_json = json.loads(json_list[26])


StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 10, Finished, Available, Finished)

In [9]:
# Show the parsed dictionary for the sample record.
print(news_json)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 11, Finished, Available, Finished)

{'json_field': {'datePublished': '2024-09-22T16:22:00.0000000Z', 'description': "ABC's George Stephanopoulos hosts Julie Pace, Jon Karl, and Marianna Sotomayor to discuss a sex scandal from Republican North Carolina gubernatorial candidate Mark Robinson, efforts to change Nebraska's electoral vote allocation,", 'name': "'This Week' Roundtable: 44 Days To 2024 Election, Will There Be Another Debate?", 'provider': [{'_type': 'Organization', 'image': {'thumbnail': {'contentUrl': 'https://www.bing.com/th?id=ODF.fAzojzbjfaG4aLodVwP_Mg&pid=news'}}, 'name': 'RealClearPolitics'}], 'url': 'https://www.realclearpolitics.com/video/2024/09/22/this_week_roundtable_44_days_to_2024_election_will_there_be_another_debate.html'}}


In [10]:
# Pull individual properties out of the parsed sample article and print them.

# Everything of interest lives under the "json_field" key.
json_field = news_json['json_field']

# Name, description and publication date of the news item, in that order.
for key in ("name", "description", "datePublished"):
    print(json_field[key])

# Name of the first listed provider.
first_provider = json_field["provider"][0]
print(first_provider["name"])

# The thumbnail URL sits under the provider's image; fall back to a
# placeholder text when any level of that path is missing.
provider_image = first_provider.get("image", {})
provider_thumbnail = provider_image.get("thumbnail", {})
thumbnail_url = provider_thumbnail.get("contentUrl", "No thumbnail URL available")
print(thumbnail_url)

# URL of the news article itself.
print(json_field["url"])


StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 12, Finished, Available, Finished)

'This Week' Roundtable: 44 Days To 2024 Election, Will There Be Another Debate?
ABC's George Stephanopoulos hosts Julie Pace, Jon Karl, and Marianna Sotomayor to discuss a sex scandal from Republican North Carolina gubernatorial candidate Mark Robinson, efforts to change Nebraska's electoral vote allocation,
2024-09-22T16:22:00.0000000Z
RealClearPolitics
https://www.bing.com/th?id=ODF.fAzojzbjfaG4aLodVwP_Mg&pid=news
https://www.realclearpolitics.com/video/2024/09/22/this_week_roundtable_44_days_to_2024_election_will_there_be_another_debate.html


In [11]:
# Collect each article property into its own list (one entry per article).
title = []
description = []
image = []
link = []
datePublished = []
provider = []

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string into a dictionary.
        item = json.loads(json_string)
        article = item["json_field"]

        # Read every field BEFORE appending anything. If a lookup fails
        # (e.g. an article without a top-level "image"), no list is touched,
        # so all six lists keep the same length and stay index-aligned.
        record = (
            article["name"],
            article["description"],
            article["image"]["thumbnail"]["contentUrl"],
            article["url"],
            article["datePublished"],
            article["provider"][0]["name"],
        )

    except Exception as e:
        # The article is reported and skipped as a whole.
        print(f"Error processing JSON field: {e}")
        continue

    # All lookups succeeded: append one value to each list.
    for target, value in zip(
        (title, description, image, link, datePublished, provider), record
    ):
        target.append(value)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 13, Finished, Available, Finished)

Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'
Error processing JSON field: 'image'


In [12]:
# THE IF STATEMENT = skip any article that has no image, instead of letting the
# missing key raise an error.
#
# Every list consumed by the DataFrame-building cell is (re)initialised here so
# this cell does not depend on values left over from an earlier cell, and each
# list receives plain strings, matching the all-string schema used downstream.
title = []
description = []
image = []
link = []
datePublished = []
provider = []

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string into a dictionary.
        item = json.loads(json_string)
        article = item["json_field"]

        # Drop all news that has no image (thumbnail URL) attached to it.
        thumbnail_url = (article.get("image") or {}).get("thumbnail", {}).get("contentUrl")
        if not thumbnail_url:
            continue

        # Read every field before appending so a failure cannot leave the
        # lists with different lengths.
        record = (
            article["name"],
            article["description"],
            thumbnail_url,
            article["url"],
            article["datePublished"],
            article["provider"][0]["name"],
        )

    except Exception as e:
        print(f"Error processing JSON field: {e}")
        continue

    # All lookups succeeded: append one value to each list.
    for target, value in zip(
        (title, description, image, link, datePublished, provider), record
    ):
        target.append(value)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 14, Finished, Available, Finished)

In [13]:
# Turn the parallel Python lists into a Spark DataFrame.
from pyspark.sql.types import StructType, StructField, StringType

# One tuple per article, in the same order as the schema columns below.
us_2024_election = list(zip(title, description, image, link, datePublished, provider))

# Every column is declared as a nullable string.
column_names = ["title", "description", "image", "link", "datePublished", "provider"]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in column_names]
)

# Create the DataFrame with the explicit schema.
df_clean = spark.createDataFrame(us_2024_election, schema=schema)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 15, Finished, Available, Finished)

In [14]:
# Let's look at the cleaned data.
display (df_clean)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b4c5f462-71c9-49b9-97a5-c1c97f6e4d6d)

In [15]:
# Split datePublished into two formatted columns:
#   published_date -> "yyyy-MM-dd"
#   published_time -> "HH:mm:ss"

from pyspark.sql.functions import col, date_format

published_at = col("datePublished")
df_clean = (
    df_clean
    .withColumn("published_date", date_format(published_at, "yyyy-MM-dd"))
    .withColumn("published_time", date_format(published_at, "HH:mm:ss"))
)



StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 17, Finished, Available, Finished)

In [16]:
display(df_clean)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 129fb775-fd97-4262-9721-553e84fdd452)

In [17]:
# Cast the published_date column to the Date data type.

from pyspark.sql.functions import to_date

published_date_as_date = to_date("published_date", "yyyy-MM-dd")
df_clean = df_clean.withColumn("published_date", published_date_as_date)

# Show the result of the conversion.
display(df_clean)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 99a024b8-d29f-4cb4-bdda-6b4e46e6e493)

In [21]:
# Write the table to the bing_lake_house (save the transformed JSON data to the Tables section of the lakehouse)
 
# df_clean.write.format("delta").saveAsTable("bing_us_2024_election_news")

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 23, Finished, Available, Finished)

**Implementing Type 1 (SCD) incremental loading of the transformed data**

In [24]:
# Adopting TYPE 1 SCD incremental loading for our data.
#
# In a Type 1 SCD the new data overwrites the existing data without creating
# duplicates. The previous values are lost because they are not stored anywhere
# else. This is typically used when there is no need to keep a history of the data.

from pyspark.sql.utils import AnalysisException

# Bound before the try block so it is always available to the except branch.
table_name = "Bing_lake_house.us_2024_election_latest_news"

# Exception handling: the first run creates the table, later runs merge into it.
try:

    df_clean.write.format("delta").saveAsTable(table_name)

except AnalysisException:
    # NOTE(review): every AnalysisException is treated as "the table already
    # exists"; a different analysis error would also end up in this branch.

    print ("Table Already Exist")

    df_clean.createOrReplaceTempView("vw_df_clean")

    # Rows are matched on link. A matched row is updated only when at least one
    # other column differs. The comparison uses the NULL-safe equality operator
    # (<=>) because "a <> b" evaluates to NULL when either side is NULL, which
    # would skip the update when a column changes from or to NULL.
    # link itself is not compared: the ON clause already guarantees it is equal.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_clean source_view

                    ON source_view.link = target_table.link

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.image <=> target_table.image) OR
                        NOT (source_view.datePublished <=> target_table.datePublished) OR
                        NOT (source_view.provider <=> target_table.provider) OR
                        NOT (source_view.published_date <=> target_table.published_date) OR
                        NOT (source_view.published_time <=> target_table.published_time)
                    )
                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, f2fbbc58-c90f-4086-8555-9a629ed0bcd3, 26, Finished, Available, Finished)

Table Already Exist


In [2]:
%%sql
-- Show all rows that were loaded into the lakehouse table
select * from us_2024_election_latest_news

StatementMeta(, aff41153-7546-4cc3-b254-9335ebac62d8, 4, Finished, Available, Finished)

<Spark SQL result set with 21 rows and 8 fields>

In [1]:
# Count the rows loaded into the lakehouse table, using spark.sql instead of the %%sql magic.
# COUNT(*) without GROUP BY always returns exactly one row, so no LIMIT clause is needed.
sqldf = spark.sql("SELECT COUNT(*) FROM us_2024_election_latest_news")
display(sqldf)

StatementMeta(, aff41153-7546-4cc3-b254-9335ebac62d8, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e2385b45-41e6-42ef-b8cc-8268a72a36ce)