In [1]:
df = spark.read.option("multiline", "true").json("Files/bing-latest-news.json")

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 3, Finished, Available, Finished)

In [2]:
display(df)

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 17645e84-745f-4b47-a726-b88905179abb)

In [3]:
metadata_df = df.select("search_metadata")
display(metadata_df)

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 5, Finished, Available, Finished)

DataFrame[search_metadata: struct<created_at:string,html_url:string,id:string,json_url:string,parsing_time_taken:double,request_time_taken:double,request_url:string,status:string,total_time_taken:double>]


In [4]:
from pyspark.sql.functions import explode
df_exploded = df.select(explode("organic_results").alias("news_results"))

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 6, Finished, Available, Finished)

In [5]:
display(df_exploded)

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3f32fca2-baa7-46b9-8f8d-c735adf37696)

In [12]:
news_results = df_exploded.toJSON().collect()

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 14, Finished, Available, Finished)

In [13]:
print(news_results[0])

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 15, Finished, Available, Finished)

{"news_results":{"date":"19m","link":"https://www.msn.com/en-us/sports/nba/nba-media-day-live-updates-latest-injury-news-roster-moves-for-2025-26-season/ar-AA1Nwk6e?ocid=BingNewsVerp","position":1,"snippet":"It's media day for 25 NBA teams on Monday ahead of the 2025-26 season. USA TODAY is tracking the latest news and developments around the league.","source":"USA TODAY on MSN","title":"NBA media day live updates: Latest injury news, roster moves for 2025-26 season"}}


In [31]:
from datetime import datetime

title = []
link = []
source = []
datePublished =[]
description = []
thumbnail = []

# Process each JSON object in the list
for json_str in news_results:
    try:
        # Parse the JSON string into a dictionary
        article = json.loads(json_str)
        article = article['news_results']

        if "thumbnail" in article.keys():
        
            #Extract information from the dictionary
            title.append(article["title"])
            link.append(article["link"])
            source.append(article["source"])
            datePublished.append(datetime.today().strftime('%Y-%m-%d'))
            description.append(article["snippet"])
            thumbnail.append(article["thumbnail"])

    except Exception as e:
        print(f"Error processing JSON object: {e}")

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 33, Finished, Available, Finished)

In [32]:
from pyspark.sql.types import StructType, StructField, StringType


# Combine the lists
data = list(zip(title, link, source, datePublished, description, thumbnail))

# Define schema
schema = StructType([
    StructField("title", StringType(), True),
    StructField("link", StringType(), True),
    StructField("source", StringType(), True),
    StructField("datePublished", StringType(), True),
    StructField("description", StringType(), True),
    StructField("thumbnail", StringType(), True)
])

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 34, Finished, Available, Finished)

In [33]:
display(df_cleaned.limit(5))

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 35, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b04f53d5-57ec-4951-abf3-96fc73ed635e)

In [34]:
from pyspark.sql.functions import to_date, date_format

df_cleaned_final = df_cleaned.withColumn("datePublished", date_format(to_date("datePublished"), "dd-MMM-yyyy"))

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 36, Finished, Available, Finished)

In [35]:
display(df_cleaned_final.limit(5))

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 37, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4cf9d7b0-9524-4eb7-88eb-d62bfdbd5597)

In [39]:
from pyspark.sql.utils import AnalysisException

try:

    table_name = 'tfdeproject_fabric_lakehouse.tbl_latest_news'

    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already Exists")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_cleaned_final source_view

                    ON source_view.url = target_table.url
                    
                    WHEN MATCHED AND 
                    source_view.title <> target_table.title OR
                    source_view.link <> target_table.link OR
                    source_view.source <> target_table.source OR
                    source_view.datePublished <> target_table.datePublished OR
                    source_view.description <> target_table.description OR
                    source_view.thumbnail <> target_table.thumbnail   

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 41, Finished, Available, Finished)

In [41]:
%%sql

select count(*) from tfdeproject_fabric_lakehouse.tbl_latest_news

StatementMeta(, eb1aef2d-c60d-4c00-9fc2-c0166d276c9e, 43, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>