In [1]:
# Read the Bing news export into a Spark DataFrame. multiline=true lets Spark
# parse a JSON document that spans several lines, instead of expecting one JSON
# record per line.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing_latest_news.json")
)
display(df)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a19c7848-bfea-41d4-ba95-18e4af73f7f3)

In [2]:
from pyspark.sql.functions import explode

# Produce one output row per element of the top-level 'organic_results' array,
# held in a single column named 'result'.
results_column = explode("organic_results").alias("result")
df_exploded = df.select(results_column)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 4, Finished, Available, Finished)

In [3]:
# Inspect the exploded frame: one row per element of 'organic_results'.
display(df_exploded)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e0cde27a-3bf1-447e-a232-e22272cf26b4)

In [4]:
# Serialize every exploded row to a JSON string, then bring them all to the
# driver as a Python list.
rows_as_json = df_exploded.toJSON()
json_list = rows_as_json.collect()

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 6, Finished, Available, Finished)

In [5]:
# Show how many rows were collected plus a short preview, rather than dumping
# every collected JSON string into the cell output.
print(len(json_list), "results collected; first 5:")
print(json_list[:5])

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 7, Finished, Available, Finished)



In [6]:
import json

# Decode the first collected row to see the field layout of a single result.
first_row = json_list[0]
news_json = json.loads(first_row)
print(news_json)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 8, Finished, Available, Finished)

{'result': {'date': '2m', 'link': 'https://www.vulture.com/article/south-park-jimmy-kimmel-monoculture-censorship.html', 'position': 1, 'snippet': 'For a fleeting stretch, it felt as if we were all locked into the same monocultural conversation around two programs that, on ...', 'source': 'Vulture', 'thumbnail': 'https://www.bing.com/th?id=OVFT.ltes9OtmehHSeqLMY98-Ci&pid=News&w=234&h=132&c=14&rs=2&qlt=90', 'title': 'Crisis Is the Only Monoculture Now'}}


In [7]:
# Print each field of the first result, one per line, in a fixed order.
for field in ("source", "title", "link", "snippet", "date"):
    print(news_json['result'][field])

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 9, Finished, Available, Finished)

Vulture
Crisis Is the Only Monoculture Now
https://www.vulture.com/article/south-park-jimmy-kimmel-monoculture-censorship.html
For a fleeting stretch, it felt as if we were all locked into the same monocultural conversation around two programs that, on ...
2m


In [8]:
import json

# Flatten each exploded result (a JSON string in json_list) into five parallel
# column lists. A result is appended to all five lists or to none of them, so the
# lists stay aligned for the zip() in the next cell.
source = []
title = []
link = []
snippet = []
date = []

for result_str in json_list:
    try:
        result = json.loads(result_str)['result']
        # Read every field before appending anything: a missing key must not leave
        # some lists one element longer than others, or zip() would later pair
        # fields from different articles.
        fields = (
            result['source'],
            result['title'],
            result['snippet'],
            result['link'],
            result['date'],
        )
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; KeyError a missing field;
        # TypeError a 'result' value that is not an object.
        print(f"Error in processing JSON object: {e}")
        continue
    for column_values, value in zip((source, title, snippet, link, date), fields):
        column_values.append(value)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 10, Finished, Available, Finished)

In [9]:
from pyspark.sql.types import StructType, StructField, StringType

# All five columns are nullable strings. This order must match the zip() order below.
column_names = ["source", "title", "snippet", "link", "date"]
schema = StructType([StructField(name, StringType(), True) for name in column_names])

# One tuple per article, built from the parallel lists of the previous cell.
# zip() stops at the shortest list, so the lists are expected to be equal length.
data = list(zip(source, title, snippet, link, date))

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 11, Finished, Available, Finished)

In [10]:
display(df_cleaned)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ea0c0c84-9b16-4eb5-a3d2-e5a356605cb1)

In [11]:
from pyspark.sql.functions import udf, to_timestamp, year, month, dayofmonth, hour, date_format
from pyspark.sql.types import StringType
import datetime

# UDF to convert relative time to formatted datetime string 'dd-MM-yyyy HH:mm:ss'
def simple_convert_date(rel_time):
    """Convert a relative age such as '2m', '3h' or '1d' into an absolute time string.

    The offset is subtracted from ``datetime.datetime.now()`` at call time, and the
    result is rendered with ``'%d-%m-%Y %H:%M:%S'``. This matches the Spark pattern
    'dd-MM-yyyy HH:mm:ss' used to parse it back into a timestamp.

    Args:
        rel_time: an integer followed by a unit suffix: 'm' (minutes),
            'h' (hours) or 'd' (days).

    Returns:
        The formatted string, or None when ``rel_time`` is None, has any other
        suffix, has a non-integer numeric part, or is too large for datetime
        arithmetic.
    """
    # NOTE(review): now() is naive local time of whichever process runs the UDF
    # (a Spark executor when used via convert_date_udf). Confirm that timezone
    # is the intended one.
    now = datetime.datetime.now()
    try:
        if rel_time.endswith('m'):
            delta = datetime.timedelta(minutes=int(rel_time[:-1]))
        elif rel_time.endswith('h'):
            delta = datetime.timedelta(hours=int(rel_time[:-1]))
        elif rel_time.endswith('d'):
            delta = datetime.timedelta(days=int(rel_time[:-1]))
        else:
            return None
        return (now - delta).strftime('%d-%m-%Y %H:%M:%S')
    except (AttributeError, TypeError, ValueError, OverflowError):
        # AttributeError/TypeError: rel_time is None (null column value) or not a str.
        # ValueError: the part before the suffix is not an integer (e.g. 'abcm').
        # OverflowError: the offset is out of range for timedelta/datetime.
        # A narrow tuple (not a bare except) so KeyboardInterrupt etc. still propagate.
        return None

# Wrap the converter as a Spark UDF that returns StringType.
convert_date_udf = udf(simple_convert_date, StringType())

# Replace the relative 'date' column with its absolute 'dd-MM-yyyy HH:mm:ss'
# rendering (kept as the last column, still named 'date'). Then append 'date_ts',
# the same value parsed to TimestampType, for the calendar-part extraction.
df_with_date = (
    df_cleaned
    .withColumn("formatted_date", convert_date_udf("date"))
    .drop("date")
    .withColumnRenamed("formatted_date", "date")
    .withColumn("date_ts", to_timestamp("date", "dd-MM-yyyy HH:mm:ss"))
)
display(df_with_date)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f77d6b07-bddd-4684-8838-9bce09de39f0)

In [12]:
from pyspark.sql.functions import minute, second

# Split the parsed timestamp into calendar parts. 'weekday' uses pattern "EEEE",
# the full day-of-week name. The helper 'date_ts' column is dropped afterwards.
df_final = (
    df_with_date
    .withColumn("day", dayofmonth("date_ts"))
    .withColumn("month", month("date_ts"))
    .withColumn("year", year("date_ts"))
    .withColumn("hour", hour("date_ts"))
    .withColumn("weekday", date_format("date_ts", "EEEE"))
    .drop("date_ts")
)

display(df_final)


StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e2052cc5-8bb1-447e-8a22-c3c441194206)

In [13]:
from pyspark.sql.utils import AnalysisException

table_name = 'bing_lake_db.latest_new_tbl'

# Columns whose change should refresh an existing row. 'link' is the merge key,
# so it is always equal on matched rows and is not compared.
compare_columns = ["source", "title", "snippet", "date", "day", "month", "year", "hour", "weekday"]

try:
    # First run: create the Delta table. With the default save mode, saveAsTable
    # raises AnalysisException if the table already exists.
    df_final.write.format("delta").saveAsTable(table_name)
except AnalysisException:
    print("Table Already Exists!")
    df_final.createOrReplaceTempView("vw_df_final")

    # Null-safe inequality. A plain `<>` evaluates to NULL (treated as false) when
    # either side is NULL, which would silently skip changes to/from NULL, e.g.
    # a 'date' the conversion UDF could not parse.
    # NOTE(review): 'date' is recomputed from a relative offset on every run, so
    # it will usually differ and matched rows will usually be updated.
    changed_condition = " OR\n        ".join(
        f"NOT (source_view.{column} <=> target_table.{column})" for column in compare_columns
    )

    spark.sql(f"""
    MERGE INTO {table_name} AS target_table
    USING vw_df_final AS source_view
    ON source_view.link = target_table.link
    WHEN MATCHED AND (
        {changed_condition}
    )
    THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 15, Finished, Available, Finished)

Table Already Exists!


In [14]:
%%sql

-- Total number of rows currently stored in the news table.
SELECT count(*)
FROM bing_lake_db.latest_new_tbl

StatementMeta(, 440d4b53-b6e9-4d1d-8bf0-28060cbb17e0, 16, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>