## Reading the JSON data using Spark
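Both reads pass `option("multiline", "true")`. By default Spark's JSON reader expects JSON Lines, meaning one complete JSON value per line. A pretty-printed API response (presumably what these files are) is a single document spread over many lines, so it has to be parsed as a whole. The pure-Python sketch below mimics the two modes on a small hypothetical payload shaped like the news files (a top-level object with an `articles` array). The helper names are illustrative, and nothing here calls Spark.

```python
import json

# Hypothetical pretty-printed payload shaped like the news files. Not real data.
document = """{
  "status": "ok",
  "articles": [
    {"title": "A", "url": "https://example.com/a"},
    {"title": "B", "url": "https://example.com/b"}
  ]
}"""


def parse_as_json_lines(text):
    """Mimic the default (JSON Lines) mode: every line must be a complete JSON value."""
    parsed, corrupt = [], []
    for line in text.splitlines():
        try:
            parsed.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)
    return parsed, corrupt


def parse_as_multiline(text):
    """Mimic multiline mode: the whole file is one JSON document."""
    return [json.loads(text)]


lines_ok, lines_bad = parse_as_json_lines(document)
print(len(lines_ok), len(lines_bad))  # 1 6: only one stray line happens to be valid JSON

records = parse_as_multiline(document)
print(len(records[0]["articles"]))  # 2
```

In the default mode Spark keeps unparseable lines in a `_corrupt_record` column instead of recovering the `articles` array, which is why the multiline option is needed here.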

In [1]:
df = spark.read.option("multiline", "true").json("Files/latest-world-news.json")
# df is now a Spark DataFrame containing the JSON data from "Files/latest-world-news.json".
display(df)

In [2]:
df_canada = spark.read.option("multiline", "true").json("Files/latest-canada-news.json")
# df_canada is now a Spark DataFrame containing the JSON data from "Files/latest-canada-news.json".
display(df_canada)

In [3]:
df = df.select("articles")
df_canada = df_canada.select("articles")

In [4]:
display(df)
display(df_canada)

## Merging the latest world news and Canada news

In [5]:
df_merged = df.unionByName(df_canada)
display(df_merged)

## Explode
`explode()` returns a new row for each element in the given array or map. It uses the default column name `col` for array elements, and `key` and `value` for map entries, unless specified otherwise.

Because all the articles sit inside a single array column, we explode the `articles` column so that each article becomes its own row.

Both DataFrames (`df` and `df_canada`) have a single column named `articles` that holds an array of article objects (you can see `[{ "author": ... }]` in the output). `unionByName` only stacks the rows of the two DataFrames by column name; it does not expand that nested array.

To fix this, you need to flatten the articles column so that each article is a separate row with proper columns (author, title, description, etc.).
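To make the explode-then-select step of cell 6 concrete, here is a plain-Python model of it on hypothetical rows. It models the semantics of `explode` (one output row per array element, and no output row at all for a null or empty array) and of the `select` that pulls nested fields such as `source.name` into flat columns; it is not Spark's implementation, and the helper names are illustrative.

```python
# Each merged row holds one "articles" array, as after unionByName (cell 5).
merged_rows = [
    {"articles": [
        {"author": "A1", "title": "T1", "source": {"name": "S1"}},
        {"author": "A2", "title": "T2", "source": {"name": "S2"}},
    ]},
    {"articles": [
        {"author": None, "title": "T3", "source": {"name": "S3"}},
    ]},
    {"articles": []},    # empty array: explode emits no rows
    {"articles": None},  # null array: explode emits no rows
]


def explode_articles(rows):
    """Yield one output row per element of each row's 'articles' array."""
    for row in rows:
        for article in row["articles"] or []:
            yield article


def flatten(article):
    """Pick nested fields into flat columns, like the select() in cell 6."""
    return {
        "author": article.get("author"),
        "title": article.get("title"),
        "sourceName": (article.get("source") or {}).get("name"),
    }


flat = [flatten(a) for a in explode_articles(merged_rows)]
print(len(flat))  # 3
print(flat[2])    # {'author': None, 'title': 'T3', 'sourceName': 'S3'}
```

If rows with a null or empty `articles` array had to be kept, PySpark's `explode_outer` emits a single null row for them instead of dropping them.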

In [6]:
from pyspark.sql.functions import explode, col

# Explode the articles array into separate rows
df_flat = df_merged.withColumn("article", explode(col("articles")))

# Select the fields inside each article
df_exploded = df_flat.select(
    col("article.author").alias("author"),
    col("article.title").alias("title"),
    col("article.description").alias("description"),
    col("article.content").alias("content"),
    col("article.url").alias("url"),
    col("article.urlToImage").alias("urlToImage"),
    col("article.source.name").alias("sourceName"),
    col("article.publishedAt").alias("publishedAt")
)

display(df_exploded.limit(5))

## Filter out missing values
Keep only the rows where `author`, `description` and `url` are all non-null, and drop the rest.
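The three `isNotNull()` checks combined with `&` keep a row only when every one of them is true. The plain-Python sketch below applies the same rule to hypothetical rows; `REQUIRED` and `keep` are illustrative names. Note that an empty string is not null, so it passes this filter.

```python
REQUIRED = ("author", "description", "url")

# Hypothetical rows standing in for df_exploded.
rows = [
    {"author": "A1", "description": "D1", "url": "https://example.com/1"},
    {"author": None, "description": "D2", "url": "https://example.com/2"},
    {"author": "", "description": "D3", "url": "https://example.com/3"},
    {"author": "A4", "description": None, "url": "https://example.com/4"},
    {"author": "A5", "description": "D5", "url": None},
]


def keep(row):
    """Same rule as isNotNull() & isNotNull() & isNotNull(): every field must be present."""
    return all(row.get(field) is not None for field in REQUIRED)


filtered = [r for r in rows if keep(r)]
print(len(filtered))                   # 2
print([r["url"] for r in filtered])    # ['https://example.com/1', 'https://example.com/3']
```

In PySpark, `df_exploded.dropna(subset=["author", "description", "url"])` expresses the same "drop a row if any of these is null" rule.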

In [7]:

df_filtered = df_exploded.filter(
    col("author").isNotNull() &
    col("description").isNotNull() &
    col("url").isNotNull()
)

display(df_filtered.limit(5))
df_filtered.count()

84

## Converting the filtered DataFrame to a list of JSON strings
- `toJSON()` turns each row into a JSON string (an RDD of strings, one per row), and `collect()` is required to bring those strings back to the driver as a Python list.
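The sketch below models what `toJSON().collect()` in cell 8 hands back: a plain Python list with one compact JSON string per row. It assumes Spark 3's default behaviour of omitting null fields from the generated JSON (the `spark.sql.jsonGenerator.ignoreNullFields` setting), which is one reason the later cells read fields with `.get()`. The rows and the `row_to_json` helper are hypothetical.

```python
import json

# Hypothetical rows standing in for df_filtered; the second has no content.
rows = [
    {"title": "T1", "content": "C1", "url": "https://example.com/1"},
    {"title": "T2", "content": None, "url": "https://example.com/2"},
]


def row_to_json(row):
    """Serialize one row compactly, dropping null fields (assumed Spark 3 default)."""
    return json.dumps(
        {k: v for k, v in row.items() if v is not None},
        ensure_ascii=False,
        separators=(",", ":"),
    )


# collect(): everything ends up in driver memory as a list of str.
json_strings = [row_to_json(r) for r in rows]
print(json_strings[1])  # {"title":"T2","url":"https://example.com/2"}
```

Because `collect()` pulls every row into the driver, this pattern suits small result sets like the 84 articles here, not large tables.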

In [8]:
json_list = df_filtered.toJSON().collect()

## Inspecting the list of JSON strings

In [9]:
#print(json_list)
print(json_list[0])

{"author":"Olivia Craighead","title":"MSNBC Is Changing Its Name to MS NOW","description":"Later this year, MSNBC will change its name to MS NOW, which is not a drag name but actually something that stands for “My Source for News, Opinion, and the World.”","content":"Say good-bye to MSNBC. As part of the channels split from NBC Universal, it will take on a new moniker by the end of 2025. Given that this is one of the big three news networks (No. 1 among moms who … [+2689 chars]","url":"https://www.thecut.com/article/msnbc-changing-name-ms-now.html","urlToImage":"https://pyxis.nymag.com/v1/imgs/568/709/c4f52a3fd228f161b9b1ed372531fba97e-msnbc-rebrand.1x.rsocial.w1200.jpg","sourceName":"New York Magazine","publishedAt":"2025-08-18T16:29:30Z"}


#### Convert each JSON string to a Python dictionary
What is `json.loads()` in Python?
- The `json.loads()` function parses a valid JSON document held in a `str`, `bytes` or `bytearray` and returns the corresponding Python object; a JSON object becomes a Python `dict`. It is mainly used to deserialize JSON text into native Python data.
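A minimal sketch of `json.loads()` on a hypothetical record that lacks a `content` key. It shows the returned type, and why the extraction loop later in this notebook uses `obj.get(...)`: `.get()` returns `None` for a missing key, whereas indexing raises `KeyError`.

```python
import json

record = '{"title": "T2", "url": "https://example.com/2"}'  # hypothetical, no "content" key
obj = json.loads(record)

print(type(obj).__name__)   # dict
print(obj["title"])         # T2
print(obj.get("content"))   # None: .get() tolerates a missing key

try:
    obj["content"]
except KeyError as err:
    print("KeyError:", err)  # KeyError: 'content'
```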

In [10]:
import json

news_json = json.loads(json_list[0])

In [11]:
print(news_json)

{'author': 'Olivia Craighead', 'title': 'MSNBC Is Changing Its Name to MS NOW', 'description': 'Later this year, MSNBC will change its name to MS NOW, which is not a drag name but actually something that stands for “My Source for News, Opinion, and the World.”', 'content': 'Say good-bye to MSNBC. As part of the channels split from NBC Universal, it will take on a new moniker by the end of 2025. Given that this is one of the big three news networks (No. 1 among moms who … [+2689 chars]', 'url': 'https://www.thecut.com/article/msnbc-changing-name-ms-now.html', 'urlToImage': 'https://pyxis.nymag.com/v1/imgs/568/709/c4f52a3fd228f161b9b1ed372531fba97e-msnbc-rebrand.1x.rsocial.w1200.jpg', 'sourceName': 'New York Magazine', 'publishedAt': '2025-08-18T16:29:30Z'}


Let's now list the information we need from each article:
- title
- description
- content
- author
- urlToImage
- url
- source
- publishedAt

## Extracting each JSON property into a list

In [12]:
title = []
description = []
content = []
author = []
urlToImage = []
url = []
source = []
publishedAt = []

for json_str in json_list:
    try:
        obj = json.loads(json_str)

        title.append(obj.get("title"))
        description.append(obj.get("description"))
        content.append(obj.get("content"))  # May be None
        author.append(obj.get("author"))
        urlToImage.append(obj.get("urlToImage"))
        url.append(obj.get("url"))
        source.append(obj.get("sourceName"))
        publishedAt.append(obj.get("publishedAt"))

    except Exception as e:
        print(f"Error processing JSON object: {e}")


## Converting the lists to a DataFrame
Now that all the data is in Python lists, we can create a DataFrame.
- `zip()` combines the lists into row tuples, and that list of tuples is passed to `spark.createDataFrame()` together with an explicitly defined `StructType` schema. This is a standard way to build a Spark DataFrame from native Python data.
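One property of `zip()` is worth knowing before relying on it: it stops at the shortest input, so if the column lists ever got out of step, rows would be dropped without any error. In cell 12 all lists are appended together, so they should stay aligned; the hypothetical `zip_columns` helper below is only a defensive sketch that fails loudly instead of truncating.

```python
title = ["T1", "T2", "T3"]
url = ["https://example.com/1", "https://example.com/2"]  # one value short (hypothetical)

rows = list(zip(title, url))
print(rows)  # [('T1', 'https://example.com/1'), ('T2', 'https://example.com/2')]
# 'T3' was silently dropped because zip() stops at the shortest list.


def zip_columns(*columns):
    """Zip column lists into row tuples, refusing to silently drop data."""
    lengths = {len(c) for c in columns}
    if len(lengths) > 1:
        raise ValueError(f"column lists have different lengths: {sorted(lengths)}")
    return list(zip(*columns))


try:
    zip_columns(title, url)
except ValueError as err:
    print(err)  # column lists have different lengths: [2, 3]
```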


In [13]:
from pyspark.sql.types import StructType, StructField, StringType

# Combine the column lists into row tuples using zip
data = list(zip(title, description, content, author, urlToImage, url, source, publishedAt))

# Define the schema; the field order must match the tuple order above
schema = StructType([
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("content", StringType(), True),
    StructField("author", StringType(), True),
    StructField("urlToImage", StringType(), True),
    StructField("url", StringType(), True),
    StructField("sourceName", StringType(), True),
    StructField("publishedAt", StringType(), True)
])

# Create the DataFrame
df_cleaned = spark.createDataFrame(data=data, schema=schema)

In [14]:
display(df_cleaned.limit(5))
df_cleaned.count()

84

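## Transforming the publishedAt column to a date
The publishedAt column holds an ISO 8601 timestamp string such as `2025-08-18T16:29:30Z`. We first convert it to a date with `to_date()`, which drops the time part, and then use `date_format()` to render it as a `dd-MM-yyyy` string. Note that the resulting column is a string, not a `DateType`.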
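The same transformation in plain Python, using the `publishedAt` value of the sample article printed earlier. Spark's `dd-MM-yyyy` pattern corresponds to `strftime("%d-%m-%Y")` here. The second part illustrates a side effect of storing dates as `dd-MM-yyyy` strings: they sort by day first, not chronologically (the list of dates is hypothetical).

```python
from datetime import datetime

published_at = "2025-08-18T16:29:30Z"  # publishedAt of the sample article

# Like to_date(): keep only the calendar date of the ISO 8601 timestamp
as_date = datetime.strptime(published_at, "%Y-%m-%dT%H:%M:%SZ").date()

# Like date_format(..., "dd-MM-yyyy"): render the date as a string
formatted = as_date.strftime("%d-%m-%Y")
print(formatted)  # 18-08-2025

# The result is a string, so ordering follows the characters, not the calendar.
dates = ["18-08-2025", "01-09-2025", "31-07-2025"]
print(sorted(dates))  # ['01-09-2025', '18-08-2025', '31-07-2025']
```

If chronological sorting or date filtering matters downstream, keeping the `to_date(...)` result as a date column and formatting only for display would avoid this.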

In [15]:
from pyspark.sql.functions import date_format, to_date

df_cleaned_final = df_cleaned.withColumn("publishedAt", date_format(to_date("publishedAt"), "dd-MM-yyyy"))

In [16]:
display(df_cleaned_final.limit(5))

## Writing the final DataFrame to the Lakehouse DB in Delta format
Now that the raw JSON file has been transformed into a proper table structure, we can save it as a Delta table.

In [17]:
#df_cleaned_final.write.format("delta").saveAsTable("bing_lake_db.tbl_latest_news")

## Incremental Load (Type 1 Merge)
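We match source and target rows on `url`, because every news article has a unique URL. A Type 1 merge overwrites a matched row in place when any of its columns has changed (no history is kept) and inserts rows whose URL is not yet in the table.
- `<>` is the not-equal operator in Spark SQL. It returns NULL, not true, when either side is NULL; `<=>` is the null-safe equality operator, so `NOT (a <=> b)` is true whenever the two values differ, including when only one side is NULL.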
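The plain-Python sketch below models the Type 1 merge on a dictionary keyed by URL, with hypothetical rows and helper names. It shows why a plain `<>` comparison misses a column that changes from NULL to a value (SQL three-valued logic yields NULL, which does not satisfy the `WHEN MATCHED AND` condition), while a null-safe comparison catches it. One difference from Delta: this loop lets a later duplicate URL overwrite an earlier one, whereas Delta's MERGE fails when several source rows match the same target row, so deduplicating the source on `url` first (for example with `dropDuplicates(["url"])`) is a common safeguard if the same article can appear in both feeds.

```python
def sql_not_equal(a, b):
    """SQL a <> b under three-valued logic: None (NULL) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def null_safe_differs(a, b):
    """SQL NOT (a <=> b): True when the values differ, treating NULL as a value."""
    return a != b  # in Python, None == None is True and None == "x" is False


def merge_type1(target, source, fields):
    """Upsert source rows into target (a dict keyed by url), overwriting changed rows.

    Returns (inserted, updated, unchanged) counts.
    """
    inserted = updated = unchanged = 0
    for row in source:
        current = target.get(row["url"])
        if current is None:
            target[row["url"]] = dict(row)  # WHEN NOT MATCHED THEN INSERT *
            inserted += 1
        elif any(null_safe_differs(current.get(f), row.get(f)) for f in fields):
            target[row["url"]] = dict(row)  # WHEN MATCHED AND <changed> THEN UPDATE SET *
            updated += 1
        else:
            unchanged += 1  # matched and identical: left as is
    return inserted, updated, unchanged


# Hypothetical data; only two of the table's columns are compared to keep it short.
fields = ["title", "content"]
target = {
    "https://example.com/1": {"url": "https://example.com/1", "title": "Old", "content": None},
}
source = [
    {"url": "https://example.com/1", "title": "Old", "content": "Now filled"},  # NULL -> value
    {"url": "https://example.com/2", "title": "New", "content": None},          # new URL
]

print(sql_not_equal(None, "Now filled"))  # None: a plain <> check would not fire
result = merge_type1(target, source, fields)
print(result)  # (1, 1, 0)
```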

In [19]:
from pyspark.sql.utils import AnalysisException

table_name = "bing_lake_db.tbl_latest_news"

try:
    # First run: create the Delta table. saveAsTable's default mode
    # raises AnalysisException when the table already exists.
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already exists")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    # Type 1 upsert keyed on url. NOT (a <=> b) is a null-safe "differs" test,
    # so a column changing to or from NULL (e.g. content) still triggers an update.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.content <=> target_table.content) OR
                        NOT (source_view.author <=> target_table.author) OR
                        NOT (source_view.urlToImage <=> target_table.urlToImage) OR
                        NOT (source_view.sourceName <=> target_table.sourceName) OR
                        NOT (source_view.publishedAt <=> target_table.publishedAt)
                    )
                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *
    """)

Table Already exists


In [20]:
%%sql

SELECT count(*) from tbl_latest_news
