### **Read JSON File as Dataframe**

In [2]:
# Read the news JSON file into a Spark DataFrame. The "multiline" option lets a
# single JSON record span several lines instead of requiring one record per line.
news_reader = spark.read.option("multiline", "true")
df = news_reader.json("Files/bing-latest-news.json")

# Render the raw DataFrame in the notebook's rich table widget.
display(df)

StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 512d4d27-91b5-4839-aa7a-8f33a99a1c05)

In [3]:
# Print the nested structure of the `value` column only.
df.select(df["value"]).printSchema()

StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 5, Finished, Available, Finished)

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- datePublished: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- image: struct (nullable = true)
 |    |    |    |-- isLicensed: boolean (nullable = true)
 |    |    |    |-- thumbnail: struct (nullable = true)
 |    |    |    |    |-- contentUrl: string (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- provider: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _type: string (nullable = true)
 |    |    |    |    |-- image: struct (nullable = true)
 |    |    |    |    |    |-- thumbnail: struct (nullable = true)
 |    |    |    |    |    |    |-- contentUrl: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    | 

### **Flattened and Formatted News Data**

In [4]:
from pyspark.sql.functions import explode, col, expr

# `value` is an array of structs: explode it so each news item becomes one row,
# exposed as a struct column named `news`.
news_items = explode(col("value")).alias("news")
df_flattened = df.select(news_items)

# Output column name -> source expression inside the `news` struct.
# Dict insertion order determines the column order of the result.
output_columns = {
    "title": col("news.name"),
    "description": col("news.description"),
    "url": col("news.url"),
    "image": col("news.image.thumbnail.contentUrl"),
    # `provider` is an array of structs; keep only the first provider's name.
    "provider": expr("news.provider.name[0]"),
    "datePublished": col("news.datePublished"),
}

df_formatted = df_flattened.select(
    *[source.alias(name) for name, source in output_columns.items()]
)

# Show the flattened result.
display(df_formatted)


StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 10a64064-43f9-4354-8326-efc0a4224805)

### **Formatting Published Date**

In [14]:
from pyspark.sql.functions import col, date_format, to_date

# Parse `datePublished` as a date using the "yyyy-MM-dd" pattern, then render it
# back to a string in "dd-MMM-yyyy" form.
# NOTE(review): the result replaces the same column on the same DataFrame, so
# re-running this cell would try to parse already-formatted "dd-MMM-yyyy" values
# with the "yyyy-MM-dd" pattern. Re-run the flattening cell first. TODO confirm
# whether that yields nulls or an error on this runtime.
published_on = to_date(col("datePublished"), "yyyy-MM-dd")

df_formatted = df_formatted.withColumn(
    "datePublished", date_format(published_on, "dd-MMM-yyyy")
)

# Show the DataFrame with the reformatted `datePublished` column.
display(df_formatted)

StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1cc5bdeb-0a34-497a-a3eb-56a558bda67f)

In [15]:
# Confirm the final column set and types before persisting.
# The earlier `drop("formattedDate")` was removed: no cell in this notebook
# creates that column (the date cell overwrites `datePublished` in place), and
# Spark's `drop` silently ignores unknown column names, so it was a no-op.
df_formatted.printSchema()

StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 17, Finished, Available, Finished)

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- provider: string (nullable = true)
 |-- datePublished: string (nullable = true)



### **Saving News Data To Lakehouse DB In Delta Format**

In data warehousing, Type 1 and Type 2 Slowly Changing Dimensions (SCD) are two methods used to handle changes in dimension data. Here’s a brief overview of each:

### Type 1: Overwrite
- **Description**: In Type 1 SCD, the old data is overwritten with new data. Historical data is not preserved.
- **Use Case**: This method is used when you do not need to track historical changes and only care about the current value of the dimension attributes.
- **Example**: If a customer’s address changes, the old address is replaced with the new one, and only the latest address is stored.

### Type 2: Add New Row
- **Description**: In Type 2 SCD, a new row is added to the dimension table each time a change occurs, and the historical data is preserved. This often involves adding a start date, end date, and a current flag to manage the versioning of records.
- **Use Case**: This method is used when you need to track historical changes and maintain a complete history of dimension attributes.
- **Example**: If a customer’s address changes, a new row is created with the new address, and the old row is marked as historical with an end date.

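### Summary:
- **Type 1 SCD**: Overwrites old data; does not preserve history.
- **Type 2 SCD**: Adds new rows; preserves historical data by tracking changes with additional attributes.
- **This notebook**: Uses Type 1. The `MERGE` in the next cell matches articles on `url`, overwrites rows whose content has changed, and inserts new articles; no history is kept.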

In [16]:
from pyspark.sql.utils import AnalysisException

# Target Delta table in the lakehouse database.
table_name = 'bing_lake_db.tbl_latest_news'

try:
    # First run: create the table. With the default save mode, saveAsTable
    # raises an AnalysisException when the table already exists.
    df_formatted.write.format('delta').saveAsTable(table_name)

except AnalysisException as exc:
    # Only an "already exists" failure means we should upsert. Any other
    # analysis error (missing database, schema mismatch, ...) must surface
    # instead of being reported as 'Table Already Exists'.
    if 'already exists' not in str(exc).lower():
        raise

    print('Table Already Exists')

    # Expose the new batch to Spark SQL so it can be used as the MERGE source.
    df_formatted.createOrReplaceTempView('view_df_formatted')

    # Type 1 upsert keyed on `url`: overwrite rows whose content changed and
    # insert rows with unseen URLs. Changes are detected with the null-safe
    # operator `<=>` because `a <> b` evaluates to NULL (not TRUE) when either
    # side is NULL, which would skip updates where a nullable column such as
    # `image` changes to or from NULL.
    spark.sql(f"""
        MERGE INTO {table_name} target_table
        USING view_df_formatted source_view

        ON source_view.url = target_table.url

        WHEN MATCHED AND (
            NOT (source_view.title <=> target_table.title) OR
            NOT (source_view.description <=> target_table.description) OR
            NOT (source_view.image <=> target_table.image) OR
            NOT (source_view.provider <=> target_table.provider) OR
            NOT (source_view.datePublished <=> target_table.datePublished)
        )
        THEN UPDATE SET *

        WHEN NOT MATCHED THEN INSERT *
    """)


StatementMeta(, cc574661-4362-4d7c-800a-51a2f9668e0a, 18, Finished, Available, Finished)