In [1]:
# Read the multi-line JSON news export from the Lakehouse "Files" area.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/EnglishTechNews.json")
)
# df is now a Spark DataFrame holding the contents of "Files/EnglishTechNews.json".
display(df)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f7bb47e0-a238-4110-a712-85f3b9f9fb56)

In [2]:
# Keep only the "results" column, which holds the news articles.
df_news = df.select(df["results"])

display(df_news)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7a2fd376-a4cb-4841-b511-16a523fedb73)

In [3]:
# Explode the "results" array so that each article becomes its own row.
from pyspark.sql.functions import explode

# "results" is resolved by name against df_news (the frame actually being
# queried) instead of through the parent DataFrame `df`.
# Each exploded element is exposed under the alias 'json_field'.
df_expload = df_news.select(explode("results").alias("json_field"))

display(df_expload)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 04d6b95e-8a90-491f-bb86-bc33335bb63a)

In [4]:
# Serialise every row of the exploded DataFrame to a JSON string and collect them into a Python list
json_list = df_expload.toJSON().collect()

# Print the second JSON string in the list (index 1)
print(json_list[1])

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 6, Finished, Available, Finished)

{"json_field":{"ai_org":"ONLY AVAILABLE IN CORPORATE PLANS","ai_region":"ONLY AVAILABLE IN CORPORATE PLANS","ai_tag":"ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS","article_id":"82f4e43d99956463a6a2ba7614341f5e","category":["business"],"content":"ONLY AVAILABLE IN PAID PLANS","country":["australia"],"creator":["James Pearson"],"description":"Radiopharm Theranostics has kicked off the first phase of clinical trials testing its “RAD 204” treatment aimed at beating the most common type of lung cancer after dosing its first patient in New South Wales.","image_url":"https://images.thewest.com.au/publication/C-15308144/6415295ecaab2098e3d6b0193a6c2e0ebe5b3b4c-16x9-x0y92w1440h810.jpg","language":"english","link":"https://thewest.com.au/business/bulls-n-bears/radiopharm-unveils-landmark-human-lung-cancer-trial-c-15308144","pubDate":"2024-07-10 07:01:25","sentiment":"ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS","sentiment_stats":"ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS",

In [5]:
# Parse the first JSON string in the list (index 0) into a dictionary for easy extraction
import json
news_json = json.loads(json_list[0])

print(news_json)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 7, Finished, Available, Finished)

{'json_field': {'ai_org': 'ONLY AVAILABLE IN CORPORATE PLANS', 'ai_region': 'ONLY AVAILABLE IN CORPORATE PLANS', 'ai_tag': 'ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS', 'article_id': '722f4feef92598fc4becb7a68369b995', 'category': ['business'], 'content': 'ONLY AVAILABLE IN PAID PLANS', 'country': ['united kingdom'], 'description': "TURKU, Finland, July 10, 2024 /PRNewswire/ -- As a part of Teleste's strategy execution, we have entered to an EMS-Partnership with Kyrel Oy from Finland. Through this new collaboration, Teleste aims to ensure continuous high quality, agility, and delivery reliability for our Public...", 'language': 'english', 'link': 'https://www.prnewswire.co.uk/news-releases/teleste-enters-to-an-ems-partnership-with-kyrel-oy-302193131.html', 'pubDate': '2024-07-10 07:02:00', 'sentiment': 'ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS', 'sentiment_stats': 'ONLY AVAILABLE IN PROFESSIONAL AND CORPORATE PLANS', 'source_icon': 'https://i.bytvi.com/domain_icons/pr

In [6]:
# Print selected fields of the parsed article: its country list, then its description.
for field in ("country", "description"):
    print(news_json["json_field"][field])

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 8, Finished, Available, Finished)

['united kingdom']
TURKU, Finland, July 10, 2024 /PRNewswire/ -- As a part of Teleste's strategy execution, we have entered to an EMS-Partnership with Kyrel Oy from Finland. Through this new collaboration, Teleste aims to ensure continuous high quality, agility, and delivery reliability for our Public...


In [7]:
# Extract the article properties into parallel lists (one list per property).
title = []
category = []
country = []
creator = []
description = []
keywords = []
language = []
link = []
pubDate = []

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string into a dictionary.
        item = json.loads(json_string)
        fields = item["json_field"]

        # Read every property BEFORE appending anything. If a key is missing,
        # nothing has been appended yet, so all lists keep the same length and
        # index i always refers to the same article in every list.
        row = (
            fields["title"],
            fields["category"],
            fields["country"],
            fields["creator"],
            fields["description"],
            fields["keywords"],
            fields["language"],
            fields["link"],
            fields["pubDate"],
        )

    except (KeyError, TypeError, ValueError) as e:
        # KeyError: property absent; TypeError: unexpected structure;
        # ValueError: malformed JSON (json.JSONDecodeError is a ValueError).
        print(f"Error processing JSON field: {e}")
        continue

    # All lookups succeeded: append the article to every list at once.
    for target, value in zip(
        (title, category, country, creator, description,
         keywords, language, link, pubDate),
        row,
    ):
        target.append(value)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 9, Finished, Available, Finished)

Error processing JSON field: 'creator'
Error processing JSON field: 'keywords'
Error processing JSON field: 'description'


In [8]:
# Guarded extraction: an article is appended only when every property listed in
# `required` is present and non-empty, so no lookup below can raise for a
# missing key (this avoids the errors printed by the unguarded loop).

title, category, country, creator, description, keywords, language, link, pubDate = (
    [] for _ in range(9)
)

# Properties are checked in this order; the first missing/empty one rejects the article.
required = (
    "category", "title", "country", "creator", "description",
    "keywords", "language", "link", "pubDate",
)

# Process each JSON string in the list.
for json_string in json_list:
    try:
        # Parse the JSON string into a dictionary.
        item = json.loads(json_string)
        fields = item["json_field"]

        # Skip articles with an absent or empty required property.
        if not all(fields.get(name) for name in required):
            continue

        # Extract the properties into their lists.
        title.append(fields["title"])
        category.append(fields["category"])
        country.append(fields["country"])
        creator.append(fields["creator"])
        description.append(fields["description"])
        keywords.append(fields["keywords"])
        language.append(fields["language"])
        link.append(fields["link"])
        pubDate.append(fields["pubDate"])

    except Exception as e:
        print(f"Error processing JSON field: {e}")

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 10, Finished, Available, Finished)

In [9]:
# Build a Spark DataFrame from the extracted lists.
from pyspark.sql.types import StructType, StructField, StringType

# One tuple per article. Note that `creator` is not part of the tuple.
News_Data = list(zip(title, category, country, description, keywords, language, link, pubDate))

# Every column is declared as a nullable string.
schema = StructType([
    StructField(name, StringType(), True)
    for name in (
        "title", "category", "country", "description",
        "keywords", "language", "link", "pubDate",
    )
])

# Create the DataFrame with the explicit schema.
df_clean = spark.createDataFrame(News_Data, schema=schema)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 11, Finished, Available, Finished)

In [10]:
# Preview the DataFrame built from the extracted lists.
display(df_clean)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 17bf5ed3-dd7f-49e8-b7e4-d1268eaccdd3)

In [11]:
from pyspark.sql.functions import initcap, regexp_replace


# Clean the category, country and keywords columns: strip the square brackets
# and capitalise the first letter of each word.
# Each column is rebuilt from ITSELF; previously `keywords` was derived from
# `country`, which overwrote every keyword with the country value.
for column_name in ("category", "country", "keywords"):
    df_clean = df_clean.withColumn(
        column_name,
        initcap(regexp_replace(column_name, r"\[|\]", "")),
    )

# language has no brackets to strip; only capitalise it.
df_clean = df_clean.withColumn("language", initcap("language"))

display(df_clean)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2e1c174f-5486-454c-ad87-a1282239f3cc)

In [12]:
# Derive separate Date ("yyyy-MM-dd") and Time ("HH:mm:ss") string columns from pubDate.
from pyspark.sql.functions import col, date_format

df_clean = (
    df_clean
    .withColumn("Date", date_format(col("pubDate"), "yyyy-MM-dd"))
    .withColumn("Time", date_format(col("pubDate"), "HH:mm:ss"))
)

display(df_clean)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b74b0da0-6106-4774-a1c6-06e35181d48f)

In [13]:
# Cast the Date column from a "yyyy-MM-dd" string to a date type.
from pyspark.sql.functions import to_date

df_clean = df_clean.withColumn("Date", to_date("Date", "yyyy-MM-dd"))

display(df_clean)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5f3a11da-0fb1-40d9-90a2-900b030afc23)

In [14]:
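# Write the table to the Lakehouse (plain initial write, kept for reference; the merge cell below handles both the first write and later updates)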
#df_clean.write.format("delta").saveAsTable("News_lake_db.news_updated_data")

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 16, Finished, Available, Finished)

In [15]:
# Type 1 upsert into the Lakehouse Delta table.
#
# Type 1 (as in "Slowly Changing Dimension, Type 1") means an existing row is
# overwritten in place when its attributes change and no history is kept;
# rows that are not in the table yet are inserted. Articles are matched on
# their link, so re-running the notebook does not create duplicates.
from pyspark.sql.utils import AnalysisException

table_name = "News_lake_db.news_updated_data"

# Keep one row per link. MERGE can fail when several source rows match the same
# target row, and duplicate unmatched rows would all be inserted.
df_source = df_clean.dropDuplicates(["link"])

try:
    # First run: the table does not exist yet, so create it. The default save
    # mode raises if the table is already there.
    df_source.write.format("delta").saveAsTable(table_name)

except AnalysisException as e:
    # Only "table already exists" is an expected failure. Anything else
    # (schema mismatch, missing database, ...) must surface instead of being
    # silently routed into the MERGE.
    if "already exists" not in str(e).lower():
        raise

    print ("Table Already Exist")

    df_source.createOrReplaceTempView("vw_df_clean")

    # Update a matched row only when at least one attribute differs.
    # `NOT (a <=> b)` is the null-safe form of `a <> b`: a change to or from
    # NULL also counts as a difference. `link` is not compared here because
    # the ON clause already guarantees it is equal for every matched row.
    spark.sql(f"""  MERGE INTO {table_name} target_table
                    USING vw_df_clean source_view

                    ON source_view.link = target_table.link

                    WHEN MATCHED AND (
                        NOT (source_view.title <=> target_table.title) OR
                        NOT (source_view.category <=> target_table.category) OR
                        NOT (source_view.country <=> target_table.country) OR
                        NOT (source_view.description <=> target_table.description) OR
                        NOT (source_view.keywords <=> target_table.keywords) OR
                        NOT (source_view.language <=> target_table.language) OR
                        NOT (source_view.pubDate <=> target_table.pubDate) OR
                        NOT (source_view.Date <=> target_table.Date) OR
                        NOT (source_view.Time <=> target_table.Time)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 17, Finished, Available, Finished)

Table Already Exist


In [16]:
%%sql

-- Count the rows currently stored in News_lake_db.news_updated_data.
SELECT COUNT(*) FROM News_lake_db.news_updated_data

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 18, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [17]:
# Same row count as the %%sql cell, obtained through spark.sql() so the result
# is a DataFrame. COUNT(*) without GROUP BY always returns exactly one row, so
# no LIMIT clause is needed.
sqldf = spark.sql("SELECT COUNT(*) FROM News_lake_db.news_updated_data")
display(sqldf)

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9b54d191-00ae-4e6e-b931-f8344d6707c9)

In [18]:
#Append to existing table in LakeHouse
#df_clean.write.format("delta").mode("append").insertInto("News_lake_db.news_latest_data")

StatementMeta(, ea725a34-411e-4b16-b2b6-235488526209, 20, Finished, Available, Finished)