In [12]:
# Read the JSON file into a Spark DataFrame. The "multiline" option allows a
# single JSON document to span several lines of the file.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 41298692-226c-4607-ab23-de1b66ab35d3)

In [13]:
# Keep only the 'value' column; the following cell explodes it into one row
# per element. Note that this rebinds `df`, so the full frame loaded above is
# no longer reachable under that name.
df = df.select('value')
display(df)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4dfb812b-951b-4d5a-a4a3-3ced49d40c55)

In [14]:
from pyspark.sql.functions import explode

# Turn each element of the 'value' array into its own row, exposed as a
# single column named 'json_object'.
df_exploded = df.select(
    explode(df["value"]).alias("json_object")
)

display(df_exploded)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7d56bad8-c31f-49ae-907e-207b7d70e27a)

In [15]:
# Serialize every row to a JSON string and collect the strings into a Python
# list on the driver. Each string has the shape {"json_object": {...}}.
json_list = df_exploded.toJSON().collect()

# Dumps the whole list; the next cell prints just the first element.
print(json_list)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 17, Finished, Available, Finished)

['{"json_object":{"about":[{"name":"Rosjanie","readLink":"https://api.bing.microsoft.com/api/v7/entities/bf145061-36ee-21a4-f79d-fb29ef1b77ff"},{"name":"Sumy","readLink":"https://api.bing.microsoft.com/api/v7/entities/7a5eb2e6-e493-088b-8af4-4dd09dc10bec"}],"datePublished":"2024-09-17T00:13:00.0000000Z","description":"Rosjanie rozpoczęli zmasowany atak z “Shahedami” na Sumy (atak ilustracyjny w sprawie miasta Sumy. O tej sprawie poinformowała służba prasowa OVA Sumy. „Wyjaśniane są skutki ataku, wszystkie służby pracują nad usunięciem skutków ataku” – głosi komunikat.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.EpbfqwQFf2mHfay2Vpozdi&pid=News","height":375,"width":600}},"name":"Rosjanie rozpoczęli zmasowany atak „Shahedami” na Sumy","provider":[{"_type":"Organization","name":"geopolityka.org"}],"url":"https://geopolityka.org/20240917/rosjanie-rozpoczeli-zmasowany-atak-shahedami-na-sumy/"}}', '{"json_object":{"datePublished":"2024-09-16T03:42:00.0000000Z","descr

In [25]:
# Print the first serialized article to inspect the structure of one record.
print(json_list[0])

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 27, Finished, Available, Finished)

{"json_object":{"about":[{"name":"Rosjanie","readLink":"https://api.bing.microsoft.com/api/v7/entities/bf145061-36ee-21a4-f79d-fb29ef1b77ff"},{"name":"Sumy","readLink":"https://api.bing.microsoft.com/api/v7/entities/7a5eb2e6-e493-088b-8af4-4dd09dc10bec"}],"datePublished":"2024-09-17T00:13:00.0000000Z","description":"Rosjanie rozpoczęli zmasowany atak z “Shahedami” na Sumy (atak ilustracyjny w sprawie miasta Sumy. O tej sprawie poinformowała służba prasowa OVA Sumy. „Wyjaśniane są skutki ataku, wszystkie służby pracują nad usunięciem skutków ataku” – głosi komunikat.","image":{"thumbnail":{"contentUrl":"https://www.bing.com/th?id=OVFT.EpbfqwQFf2mHfay2Vpozdi&pid=News","height":375,"width":600}},"name":"Rosjanie rozpoczęli zmasowany atak „Shahedami” na Sumy","provider":[{"_type":"Organization","name":"geopolityka.org"}],"url":"https://geopolityka.org/20240917/rosjanie-rozpoczeli-zmasowany-atak-shahedami-na-sumy/"}}


In [16]:
import json

# Parallel column lists: index i of every list describes the same article.
# They are zipped into rows by the DataFrame-building cell, so they must stay
# the same length and in the same order.
title = []
description = []
image = []
url = []
provider = []
datePublished = []

for article in json_list:
    try:
        # Each element of json_list is a JSON string wrapping the article
        # under the 'json_object' key.
        news = json.loads(article)['json_object']

        # Articles without an image are skipped entirely.
        if not news.get('image'):
            continue

        # Read every field BEFORE appending anything. If a lookup fails
        # part-way (e.g. a missing 'description' or an image without a
        # 'thumbnail'), no list is touched, so the columns cannot drift out
        # of alignment.
        # NOTE(review): 'provider' is stored as the whole value of that key
        # (a list of objects in the sample output), while the later schema
        # declares the column as a string. Confirm whether only the provider
        # name was intended.
        record = (
            news['name'],
            news['description'],
            news['image']['thumbnail']['contentUrl'],
            news['url'],
            news['provider'],
            news['datePublished'],
        )
    except Exception as e:
        # Best effort: report the malformed article and move on to the next.
        print(f"Error occurred: {e}")
        continue

    # All lookups succeeded, so append the complete record to every column.
    title.append(record[0])
    description.append(record[1])
    image.append(record[2])
    url.append(record[3])
    provider.append(record[4])
    datePublished.append(record[5])



StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 18, Finished, Available, Finished)

In [17]:
from pyspark.sql.types import StringType, StructField, StructType

# Every column is declared as a nullable string; the names are listed in the
# same order as the lists zipped into rows below.
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in (
            "title",
            "description",
            "image",
            "url",
            "provider",
            "datePublished",
        )
    ]
)

# One tuple per article, built from the parallel lists filled by the parsing cell.
data = list(zip(title, description, image, url, provider, datePublished))

df_cleaned = spark.createDataFrame(data, schema=schema)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 19, Finished, Available, Finished)

In [18]:
# Render the cleaned DataFrame to check the parsed columns.
display(df_cleaned)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1857476d-e474-4f7e-953a-dbb687f3239f)

In [19]:
from pyspark.sql.functions import to_date, date_format

# Replace 'datePublished' with the same value parsed to a date and rendered
# back as text in 'dd-MMM-yyyy' form.
df_cleaned_final = df_cleaned.withColumn(
    'datePublished',
    date_format(to_date(df_cleaned['datePublished']), 'dd-MMM-yyyy'),
)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 21, Finished, Available, Finished)

In [23]:
from pyspark.sql.utils import AnalysisException

# Target Delta table. Defined outside the try block so the name is always
# bound when the except branch runs.
table = 'bing_lake_database.latest_news' #TODO

try:
    # First run: create the table from the cleaned data. With the default
    # save mode this raises AnalysisException when the table already exists.
    df_cleaned_final.write.format("delta").saveAsTable(table)

except AnalysisException as write_error:
    # AnalysisException is also raised for unrelated problems, so confirm the
    # table really resolves before treating the failure as "already exists".
    # If it does not, surface the original error instead of running a MERGE
    # against a table that is not there.
    try:
        spark.table(table)
    except AnalysisException:
        raise write_error from None

    print('Table already exists.')

    # Expose the new batch to SQL so it can be used as the MERGE source.
    df_cleaned_final.createOrReplaceTempView('temp_latest_news')

    # Upsert keyed on url: update a matched row only when some other column
    # differs, insert rows whose url is not yet in the table.
    # The null-safe operator <=> is used because every column is nullable and
    # a plain <> evaluates to NULL (not true) when either side is NULL, which
    # would silently skip the update. The url column itself is not compared:
    # it is always equal for matched rows because of the ON clause.
    spark.sql(f"""  MERGE INTO {table} target_table
                    USING temp_latest_news source_view

                    ON source_view.url = target_table.url

                    WHEN MATCHED AND (
                    NOT (source_view.title <=> target_table.title) OR
                    NOT (source_view.description <=> target_table.description) OR
                    NOT (source_view.image <=> target_table.image) OR
                    NOT (source_view.provider <=> target_table.provider) OR
                    NOT (source_view.datePublished <=> target_table.datePublished)
                    )

                    THEN UPDATE SET *

                    WHEN NOT MATCHED THEN INSERT *

                """)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 25, Finished, Available, Finished)

Table already exists.


In [24]:
# Read back at most 1000 rows from the target table to check the result of
# the save/merge cell. Note that this rebinds `df` to the table contents.
df = spark.sql("SELECT * FROM bing_lake_database.latest_news LIMIT 1000")
display(df)

StatementMeta(, 6d33f561-1a65-4558-aeab-a8d586964bfd, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 816eb652-b810-4057-a712-cc25bd807ec4)

In [None]:
# NOTE(review): this cell repeats the first load cell of the notebook and has
# no execution count; confirm whether it is still needed.
# Read the JSON file into a Spark DataFrame. The "multiline" option allows a
# single JSON document to span several lines of the file.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/bing-latest-news.json")
)
display(df)