#### Read the JSON file as a Dataframe

In [5]:
# Load the news file into a Spark DataFrame. The "multiline" option is set
# because the JSON document is not stored as one record per line.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 7, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 7a07337a-25e4-4256-a943-0bfb5ea872b4)

#### Select just the news data column from the dataframe

In [6]:
# Narrow the DataFrame to the single column that holds the news articles.
# Note: this rebinds `df`, so the full frame read above is no longer reachable.
df = df.select(df["organic_results"])
display(df)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 8, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 9d9c3326-6945-42b1-a995-79232ddb85ca)

#### Explode the JSON column

In [7]:
from pyspark.sql.functions import explode
# Turn each element of the organic_results array into its own row,
# exposed under the column name "json_object".
df_exploded = df.select(
    explode("organic_results").alias("json_object")
)
display(df_exploded)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 9, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 907f23f4-47ac-4da3-94af-6f65a4cd7668)

#### Converting the Exploded JSON DataFrame to a list of JSON strings

In [8]:
# Serialise every exploded row to a JSON string and bring them to the driver
# as a plain Python list (one string per article).
json_list = df_exploded.toJSON().collect()  # collect as a list

# Show the size and a short preview only: printing the whole list floods the
# cell output and bloats the saved notebook.
print(f"Collected {len(json_list)} JSON records")
print(json_list[:3])

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 10, Finished, Available, Finished, False)

['{"json_object":{"link":"https://www.msn.com/en-us/sports/nfl/top-6-trade-suitors-for-cowboys-wr-george-pickens-revealed/ar-AA1WmcBS?ocid=BingNewsBrowse","snippet":"The Dallas Cowboys have a big decision on their hands with George Pickens.While they would love to keep him around, they have to figure out the best path. Using the franchise tag would allow","source":"Dallas Cowboys On SI","thumbnail":"https://www.bing.com/th?id=ORMS.5c07e2b9fb153925a417474afe5294f7&pid=Wdp&w=300&h=200&c=14&rs=2&qlt=90","title":"Top 6 trade suitors for Cowboys WR George Pickens revealed"}}', '{"json_object":{"link":"https://www.msn.com/en-us/movies/news/mike-flanagans-scariest-film-the-thrilling-hush-unveiled/ar-AA1WaJE3?ocid=BingNewsBrowse","snippet":"Back in 2018, The Haunting of Hill House hit Netflix and quickly became one of the streaming service\'s biggest hits. It helped put director and creator Mike Flanagan on the map, letting the world","source":"ScreenRant","thumbnail":"https://www.bing.com/th?

In [9]:
# Inspect the first collected record; at this point it is still a raw JSON string.
print(json_list[0])

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 11, Finished, Available, Finished, False)

{"json_object":{"link":"https://www.msn.com/en-us/sports/nfl/top-6-trade-suitors-for-cowboys-wr-george-pickens-revealed/ar-AA1WmcBS?ocid=BingNewsBrowse","snippet":"The Dallas Cowboys have a big decision on their hands with George Pickens.While they would love to keep him around, they have to figure out the best path. Using the franchise tag would allow","source":"Dallas Cowboys On SI","thumbnail":"https://www.bing.com/th?id=ORMS.5c07e2b9fb153925a417474afe5294f7&pid=Wdp&w=300&h=200&c=14&rs=2&qlt=90","title":"Top 6 trade suitors for Cowboys WR George Pickens revealed"}}


In [10]:
import json
# Parse the first JSON string into a Python dictionary so that its fields
# can be accessed by key.
news_json = json.loads(json_list[0])
print(news_json)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 12, Finished, Available, Finished, False)

{'json_object': {'link': 'https://www.msn.com/en-us/sports/nfl/top-6-trade-suitors-for-cowboys-wr-george-pickens-revealed/ar-AA1WmcBS?ocid=BingNewsBrowse', 'snippet': 'The Dallas Cowboys have a big decision on their hands with George Pickens.While they would love to keep him around, they have to figure out the best path. Using the franchise tag would allow', 'source': 'Dallas Cowboys On SI', 'thumbnail': 'https://www.bing.com/th?id=ORMS.5c07e2b9fb153925a417474afe5294f7&pid=Wdp&w=300&h=200&c=14&rs=2&qlt=90', 'title': 'Top 6 trade suitors for Cowboys WR George Pickens revealed'}}


In [11]:
# Print each article attribute of the parsed record, one per line,
# in the order: title, link, snippet, source, thumbnail.
for field in ("title", "link", "snippet", "source", "thumbnail"):
    print(news_json['json_object'][field])

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 13, Finished, Available, Finished, False)

Top 6 trade suitors for Cowboys WR George Pickens revealed
https://www.msn.com/en-us/sports/nfl/top-6-trade-suitors-for-cowboys-wr-george-pickens-revealed/ar-AA1WmcBS?ocid=BingNewsBrowse
The Dallas Cowboys have a big decision on their hands with George Pickens.While they would love to keep him around, they have to figure out the best path. Using the franchise tag would allow
Dallas Cowboys On SI
https://www.bing.com/th?id=ORMS.5c07e2b9fb153925a417474afe5294f7&pid=Wdp&w=300&h=200&c=14&rs=2&qlt=90


#### Extracting the JSON properties into lists

In [12]:
# Parallel lists, one entry per accepted article. They are zipped into rows
# later, so they MUST always have the same length and the same ordering.
title = []
link = []
snippet = []
source = []
thumbnail = []

# Process each JSON object in the list
for json_str in json_list:
    try:
        # Parse the JSON string into a dictionary
        article = json.loads(json_str)
        news = article["json_object"]

        # Articles without a (non-empty) snippet are skipped on purpose.
        if not news.get("snippet"):
            continue

        # Read every field BEFORE appending anything. If a key is missing
        # (e.g. a null field that was omitted when the row was serialised to
        # JSON), the KeyError is raised here and none of the lists is touched.
        # Appending field by field would leave the lists with different
        # lengths and silently misalign all following rows.
        record = (
            news["title"],
            news["link"],
            news["snippet"],
            news["source"],
            news["thumbnail"],
        )

    except Exception as e:
        # Best effort: report the bad record and carry on with the next one.
        print(f"Error processing JSON object: {e}")
        continue

    # All fields were read successfully: extend every list together.
    title.append(record[0])
    link.append(record[1])
    snippet.append(record[2])
    source.append(record[3])
    thumbnail.append(record[4])




StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 14, Finished, Available, Finished, False)

#### Converting the List to a DataFrame

In [13]:
from pyspark.sql.types import StructType, StructField, StringType

# Zip the parallel lists into row tuples. The argument order here must mirror
# the column order of the schema below.
data = list(zip(title, snippet, link, source, thumbnail))

# All five columns are nullable strings, so the schema is generated from
# the column names.
column_names = ["title", "snippet", "link", "source", "thumbnail"]
schema = StructType(
    [StructField(name, StringType(), True) for name in column_names]
)

# Build the cleaned Spark DataFrame from the row tuples
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 15, Finished, Available, Finished, False)

In [14]:
# Preview only the first five rows of the cleaned DataFrame.
display(df_cleaned.limit(5))

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 16, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 227f96b8-0781-43ee-ac19-e3fdc81c042e)

In [20]:
from pyspark.sql.functions import current_date

# Add a last_loaded_date column populated with Spark's current_date().
df_cleaned_final = df_cleaned.withColumn("last_loaded_date", current_date())

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 22, Finished, Available, Finished, False)

#### Writing the Final DataFrame to the Lakehouse DB in Delta format

In [23]:
from pyspark.sql.utils import AnalysisException

# Target Delta table. Defined outside the try block so that the except
# branch never depends on how far the try body got.
table_name = 'bing_lake_db.raw.tbl_latest_news'

# `link` is the merge key. A Delta MERGE fails when several source rows match
# the same target row, so keep a single row per link. Which duplicate is kept
# is arbitrary.
df_news_unique = df_cleaned_final.dropDuplicates(["link"])

try:

    # First load: create the table. No write mode is set, so this raises
    # when the table already exists.
    df_news_unique.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    # NOTE(review): every AnalysisException is treated as "table already
    # exists"; an unrelated analysis error would also end up here and only
    # resurface (if at all) from the MERGE below.
    print("Table Already Exists")

    df_news_unique.createOrReplaceTempView("vw_df_cleaned")

    # Upsert by link. The change detection uses the null-safe operator <=>:
    # a plain <> evaluates to NULL when either side is NULL, so a value
    # changing to or from NULL would never trigger the update.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned source_view

                    ON source_view.link = target_table.link

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.snippet <=> target_table.snippet) OR
                        NOT (source_view.source <=> target_table.source) OR
                        NOT (source_view.thumbnail <=> target_table.thumbnail)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 25, Finished, Available, Finished, False)

Table Already Exists


In [17]:
#df_cleaned.write.format("delta").mode('overwrite').saveAsTable("raw.tbl_latest_news")

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 19, Finished, Available, Finished, False)

In [18]:
%%sql

-- Row count of the target table, as a quick check after the load.
select count(*) from bing_lake_db.raw.tbl_latest_news

StatementMeta(, 22720cf7-5d77-46ec-8927-7074ff3edd9e, 20, Finished, Available, Finished, False)

<Spark SQL result set with 1 rows and 1 fields>