In [2]:
# Import the data set
df = spark.read.option("multiline", "true").json("Files/bing-latest-news.json")
# df now is a Spark DataFrame containing JSON data from "Files/bing-latest-news.json".
display(df)

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 358172c1-3884-4305-b96c-84938126e1d6)

In [3]:
# We are only interested in the field value
df = df.select("value")
display(df)

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 17cb83e5-e734-4eea-bf9f-c479b0caec4c)

In [4]:
# Convert DF to JSON string
df_json = df.toJSON().collect()
print(df_json[0])

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 6, Finished, Available)

{"value":[{"about":[{"name":"Feijoada","readLink":"https://api.bing.microsoft.com/api/v7/entities/fae08105-9a1a-55f7-0888-48772b45127b"},{"name":"Pompeia","readLink":"https://api.bing.microsoft.com/api/v7/entities/d630420e-09a1-ec1d-2350-76a4e404b71d"}],"category":"Brazil","datePublished":"2024-06-02T17:57:00.0000000Z","description":"Quer se manter informado, ter acesso a mais de 60 colunistas e reportagens exclusivas?Assine o Estadão aqui! O inverno chegou, trazendo consigo os pratos típicos que aquecem o corpo e trazem uma sensa","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.ehMVmglb0Le2-jYcZ5Tc2y&pid=News","height":420,"width":700}},"mentions":[{"name":"São Paulo"},{"name":"O Estado de S. Paulo"},{"name":"Pará"}],"name":"3 padarias em São Paulo com os melhores caldos e sopas para o inverno","provider":[{"_type":"Organization","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=ODF.jvs1O4LPpypSJDRgmKOUTw&pid=news"}},"name":"Estadão on MSN.com"}],"url"

In [5]:
# First test on how parse the JSON
import json

# The JSON string
json_string = df_json[0]

# Parse the JSON string into a Python dictionary
data = json.loads(json_string)

# Looping through each item in the list associated with the 'value' key
for item in data['value']:
    # Checking if 'category' exists in the dictionary and printing it
    if 'category' not in item:
        print('null')
    else:
        # If 'category' is present, print its value
        print(item['category'])

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 7, Finished, Available)

Brazil


In [6]:
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

import json

# The JSON string
json_string = df_json[0]

# Parse the JSON string into a Python dictionary
data = json.loads(json_string)

# Looping through each item in the list associated with the 'value' key
for item in data['value']:
    try:
        # extract information from the dictionary
        title.append(item['name'])
        description.append(item['description'])
        url.append(item['url'])
        image.append(item['provider'][0]['image']['thumbnail']['contentUrl'])
        provider.append(item['provider'][0]['name'])
        datePublished.append(item['datePublished'])

                # Checking if 'category' exists in the dictionary and printing it
        if 'category' not in item:
            category.append('null')
        else:
            # If 'category' is present, print its value
           category.append(item['category'])

    except Exception as e:
        print(f"Error processing JSON object: {e}")


    

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 8, Finished, Available)

In [7]:
from pyspark.sql.types import StructType, StructField, StringType


# Combine the lists
data = list(zip(title,description,category,url,image,provider,datePublished))

# Define schema
schema = StructType([
    StructField('title', StringType(), True),
    StructField('description', StringType(), True),
    StructField('category', StringType(), True),
    StructField('url', StringType(), True),
    StructField('image', StringType(), True),
    StructField('provider', StringType(), True),
    StructField('datePublished', StringType(), True)
])

# Create DataFrame
df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 9, Finished, Available)

In [8]:
display(df_cleaned)

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, 768d234c-3cdb-4fbb-b144-cba90b87334c)

In [9]:
from pyspark.sql.functions import to_timestamp, to_date

# First, parse the full timestamp including microseconds and timezone
df_cleaned_final = df_cleaned.withColumn("datePublished", to_timestamp("datePublished", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"))

# Now, convert the timestamp to just a date
df_cleaned_final = df_cleaned_final.withColumn("datePublished", to_date("datePublished"))

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 11, Finished, Available)

In [10]:
display(df_cleaned_final)

StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, c9a34ef8-8458-4e3b-8811-fcbbf450694b)

In [11]:
# Incremental load to final table
from pyspark.sql.utils import AnalysisException

try:

    table_name = 'bing_lake_db.tbl_latest_news'

    df_cleaned_final.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already Existis")

    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    spark.sql(f""" Merge INTO {table_name} target_table
                   USING vw_df_cleaned_final source_view

                   ON source_view.url = target_table.url

                   WHEN MATCHED AND
                   source_view.title <> target_table.title OR
                   source_view.description <> target_table.description OR
                   source_view.category <> target_table.category OR
                   source_view.image <> target_table.image OR
                   source_view.provider <> target_table.provider OR
                   source_view.datePublished <> target_table.datePublished
                   
                   THEN UPDATE SET *

                   WHEN NOT MATCHED THEN INSERT * 
                   
                    """)



StatementMeta(, b8e63d6e-6189-47e6-8b34-bb3976ba8493, 13, Finished, Available)

Table Already Existis
