# JSON file as Dataframe

In [1]:
# Location of the raw news file in the Lakehouse Files area.
latest_news_path = "abfss://Portfolio@onelake.dfs.fabric.microsoft.com/Scottish_Housing_Project.Lakehouse/Files/latest_news.json"

# The file holds a single JSON document spread over several lines, so the
# reader is switched to multiline mode before loading it into a Spark DataFrame.
multiline_reader = spark.read.option("multiline", "true")
df = multiline_reader.json(latest_news_path)

display(df)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, f5cd6f32-8b0e-4577-a04b-674c7232771f)

# Selecting just the value column from the DataFrame

In [2]:
# Keep only the "value" column; every other top-level field is dropped.
df = df.select(df["value"])

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 4, Finished, Available)

In [3]:
# Render the DataFrame (now reduced to the "value" column) in the notebook.
display(df)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2425b37b-bf49-4b22-b7e5-09e594b5875b)

# Explode the JSON column

In [4]:
from pyspark.sql.functions import explode
# Turn each element of the "value" array into its own row, exposed as "json_object".
exploded_value = explode("value")
df_explode = df.select(exploded_value.alias("json_object"))

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 6, Finished, Available)

In [5]:
# Render the exploded DataFrame (one "json_object" per row) in the notebook.
display(df_explode)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, f3e1e536-5710-438f-a6ee-d9513a7672c9)

# Converting the exploded JSON DataFrame to a list of JSON strings

In [6]:
# Serialise every row to a JSON string, then pull all of them back to the
# driver as a plain Python list.
json_rows = df_explode.toJSON()
df_json = json_rows.collect()

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 8, Finished, Available)

## Testing the JSON string list

In [7]:
# Print the first serialised row to inspect its structure.
print(df_json[0])

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 9, Finished, Available)

{"json_object":{"about":[{"name":"Scottish Government","readLink":"https://api.bing.microsoft.com/api/v7/entities/955e8a7c-d981-037b-3759-28686357dc34"},{"name":"Shirley-Anne Somerville","readLink":"https://api.bing.microsoft.com/api/v7/entities/060de6ca-211e-8d6b-a972-2e769714efa1"},{"name":"Scotland","readLink":"https://api.bing.microsoft.com/api/v7/entities/a0377d96-1a18-f843-65ad-adcbc4acdc69"},{"name":"Brexit","readLink":"https://api.bing.microsoft.com/api/v7/entities/8c1fb9a7-8c19-2716-3798-948a537641f0"},{"name":"Holyrood, Kansas","readLink":"https://api.bing.microsoft.com/api/v7/entities/69bdb99a-25b5-4203-91f0-ad59434ea80b"},{"name":"Scottish National Party","readLink":"https://api.bing.microsoft.com/api/v7/entities/73330ccf-2058-8665-ac51-06760ab18194"},{"name":"Edinburgh","readLink":"https://api.bing.microsoft.com/api/v7/entities/286af946-edea-5f33-df53-4164821c69da"},{"name":"Glasgow","readLink":"https://api.bing.microsoft.com/api/v7/entities/da2548ee-1b26-f939-06b4-2fae57e

## Converting the first JSON string to a Python dictionary

In [8]:
import json

# Parse the first serialised row into a Python dictionary for inspection.
first_json_string = df_json[0]
news_json = json.loads(first_json_string)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 10, Finished, Available)

In [9]:
# Print the parsed dictionary to see which keys are available under "json_object".
print(news_json)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 11, Finished, Available)

{'json_object': {'about': [{'name': 'Scottish Government', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/955e8a7c-d981-037b-3759-28686357dc34'}, {'name': 'Shirley-Anne Somerville', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/060de6ca-211e-8d6b-a972-2e769714efa1'}, {'name': 'Scotland', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/a0377d96-1a18-f843-65ad-adcbc4acdc69'}, {'name': 'Brexit', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/8c1fb9a7-8c19-2716-3798-948a537641f0'}, {'name': 'Holyrood, Kansas', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/69bdb99a-25b5-4203-91f0-ad59434ea80b'}, {'name': 'Scottish National Party', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/73330ccf-2058-8665-ac51-06760ab18194'}, {'name': 'Edinburgh', 'readLink': 'https://api.bing.microsoft.com/api/v7/entities/286af946-edea-5f33-df53-4164821c69da'}, {'name': 'Glasgow', 'readLink': 'https://api.bing.microsoft.com/api/v7/entitie

In [10]:
#print(news_json['json_object']['name'])
#print(news_json['json_object']['description'])
#print(news_json['json_object']['category'])
#print(news_json['json_object']['url'])
#print(news_json['json_object']['image']['thumbnail']['contentUrl'])
#print(news_json['json_object']['provider'][0]['name'])
#print(news_json['json_object']['datePublished'])

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 12, Finished, Available)

In [11]:
#name
#description
#category
#image
#url
#provider
#datePublished

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 13, Finished, Available)

# Extracting the JSON properties into lists

In [12]:

# Parallel lists, one per output column. Index i in every list must refer to
# the same article, because the lists are zipped into rows later on.
title = []
description = []
category = []
url = []
image = []
provider = []
datePublished = []

# Process each JSON string in the list
for json_str in df_json:
    try:
        # Parse the JSON string and step into the exploded article object
        article = json.loads(json_str)["json_object"]

        # Skip articles without a 'category' or an 'image.thumbnail.contentUrl'
        if not (article.get("category") and article.get("image", {}).get("thumbnail", {}).get("contentUrl")):
            continue

        # Resolve every field BEFORE touching the lists. If any lookup fails
        # (missing key, empty provider list, ...) nothing has been appended
        # yet, so the lists stay the same length and remain row-aligned.
        row = (
            article["name"],
            article["description"],
            article["category"],
            article["url"],
            article["image"]["thumbnail"]["contentUrl"],
            article["provider"][0]["name"],
            article["datePublished"],
        )

    except Exception as e:
        # Best effort: report the bad record and move on to the next one.
        print(f"Error processing JSON object: {e}")
        continue

    # All fields were extracted successfully: append them as one unit.
    for column_values, field_value in zip(
        (title, description, category, url, image, provider, datePublished), row
    ):
        column_values.append(field_value)

# Print the extracted information (optional)
#print("Titles:", title)
#print("Descriptions:", description)
#print("Categories:", category)
#print("URLs:", url)
#print("Images:", image)
#print("Providers:", provider)
#print("Date Published:", datePublished)


StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 14, Finished, Available)

In [13]:
#title

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 15, Finished, Available)

#

# Converting the List to a DataFrame

In [14]:
from pyspark.sql.types import StructType, StructField, StringType


# Output column names, in the same order as the lists zipped together below.
column_names = ["title", "description", "category", "url", "image", "provider", "datePublished"]

# Every column is declared as a nullable string.
schema = StructType([StructField(column_name, StringType(), True) for column_name in column_names])

# Stitch the parallel lists into one tuple per article.
data = list(zip(title, description, category, url, image, provider, datePublished))

# Build the Spark DataFrame from the rows and print a preview of it.
df_cleaned = spark.createDataFrame(data, schema=schema)
df_cleaned.show()


StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 16, Finished, Available)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               title|         description|            category|                 url|               image|            provider|       datePublished|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Scottish governme...|Scottish Labour s...|            Politics|https://www.bdonl...|https://www.bing....|      bdonline.co.uk|2024-05-16T03:45:...|
|Scottish governme...|Edinburgh and Gla...|            Politics|https://www.bdonl...|https://www.bing....|      bdonline.co.uk|2024-05-16T07:07:...|
|Scotland’s first ...|Funded by over 5,...|            Business|https://www.scott...|https://www.bing....| scottishhousingnews|2024-05-16T09:06:...|
|Scottish govt dec...|The Scottish gove...|            Politics|https://www.mortg...|https://www.bing....|

In [15]:
# Render the cleaned DataFrame in the notebook.
display(df_cleaned)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, e3c26dd1-3633-4f5c-8e9e-6786d430f275)

# Processing the Date Column

In [16]:
from pyspark.sql.functions import to_date, date_format

# Reduce the published timestamp string to its date part, then render that
# date back to text in dd-MM-yyyy form, replacing the original column.
published_date = to_date("datePublished")
df_cleaned_final = df_cleaned.withColumn(
    "datePublished",
    date_format(published_date, "dd-MM-yyyy"),
)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 18, Finished, Available)

In [17]:
# Render the final DataFrame (with the reformatted datePublished column) in the notebook.
display(df_cleaned_final)

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8652445c-6dcb-4c20-bcc5-de3ee2308936)

# Writing the final DataFrame to the Lakehouse DB in Delta format

In [21]:
#df_cleaned_final.write.format("delta").saveAsTable("Scottish_Housing_Project.tbl_latest_housing_news")

StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 23, Finished, Available)

AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `Scottish_Housing_Project`.`tbl_latest_housing_news` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.

In [22]:
from pyspark.sql.utils import AnalysisException


# Target Delta table for the cleaned news rows.
table_name = 'Scottish_Housing_Project.tbl_latest_housing_news'

# Decide between "create" and "merge" by asking the catalog directly, rather
# than catching AnalysisException: that exception is also raised for unrelated
# problems (e.g. a schema mismatch), which must not be mistaken for
# "table already exists".
if not spark.catalog.tableExists(table_name):
    # First run: create the Delta table from the DataFrame.
    df_cleaned_final.write.format("delta").saveAsTable(table_name)

else:

    print("Table already exists")

    # Expose the DataFrame to SQL under a temporary view name
    df_cleaned_final.createOrReplaceTempView("vw_df_cleaned_final")

    # Upsert keyed on url: rows whose url is already present are updated only
    # if at least one other column differs; unseen urls are inserted.
    # The comparison uses the null-safe operator <=> because a plain <>
    # evaluates to NULL (i.e. "no match") whenever either side is NULL, which
    # would silently skip updates that change a value to or from NULL.
    spark.sql(f"""
        MERGE INTO {table_name} AS target_table
        USING vw_df_cleaned_final AS source_view
        ON source_view.url = target_table.url
        WHEN MATCHED AND NOT (
            source_view.title <=> target_table.title AND
            source_view.description <=> target_table.description AND
            source_view.category <=> target_table.category AND
            source_view.image <=> target_table.image AND
            source_view.provider <=> target_table.provider AND
            source_view.datePublished <=> target_table.datePublished
        )
        THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, 51622653-c215-435a-b10d-d9f88d98ec66, 24, Finished, Available)

Table already exists
