**In this data ingestion pipeline, I used the Bing API as the source of real-time data. The API provides structured information such as news articles, search results, and metadata, and returns it in JSON format.**

**To orchestrate and automate ingestion, I used Azure Data Factory (ADF). ADF let me set up a scalable pipeline that periodically calls the Bing API, extracts the data, and writes it to a designated storage location in Azure.**

#### Load the raw data from the storage account

In [23]:
%%pyspark
# multiline lets Spark parse a JSON document that spans several lines
df = spark.read.option('multiline','true').load('abfss://bing-data@bingdatastorage.dfs.core.windows.net/raw-data/bing-raw.json', format='json')
# display(df.limit(10))

StatementMeta(bingtest, 1, 24, Finished, Available, Finished)

#### Select the value column, where all the data resides

In [24]:
df=df.select('value')
# display(df)

StatementMeta(bingtest, 1, 25, Finished, Available, Finished)

#### Explode the value column

In [25]:
from pyspark.sql.functions import explode

df_exploded=df.select(explode(df['value']).alias('row1'))
# display(df_exploded)

StatementMeta(bingtest, 1, 26, Finished, Available, Finished)

#### Load all the data into a list

In [26]:
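import json
# collect() brings every row to the driver as a JSON string, which is only suitable for small results
json_list=df_exploded.toJSON().collect()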

StatementMeta(bingtest, 1, 27, Finished, Available, Finished)
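
To make the shape of `json_list` concrete, here is a plain-Python sketch of what selecting `value`, exploding it, and calling `toJSON().collect()` produce. The payload is a made-up, trimmed sample that assumes the structure the later cells read (a top-level `value` array of article objects); it is not data from the actual pipeline.

```python
import json

# Hypothetical, trimmed payload; field names mirror those read later in the notebook.
sample_payload = {
    "value": [
        {"name": "Article A", "url": "https://example.com/a"},
        {"name": "Article B", "url": "https://example.com/b"},
    ]
}

# Rough equivalent of df.select('value'): keep only the array of articles.
value = sample_payload["value"]

# Rough equivalent of explode(...).alias('row1') followed by toJSON().collect():
# one JSON string per array element, each wrapped under the 'row1' key.
json_list = [json.dumps({"row1": article}) for article in value]

for js in json_list:
    print(js)
```

Each string parses back into a dictionary whose single `row1` key holds one article, which is why the extraction step starts from `row1`.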

#### Create a list for each required column

In [27]:
title=[]
description=[]
datePublished=[]
category=[]
url=[]
image=[]
provider=[]

for js in json_list:

    try:
        new_json=json.loads(js)
        row = new_json.get('row1', {})

        # For nested structures, fall back to an empty container at each level
        thumbnail = (row.get('image') or {}).get('thumbnail') or {}
        providers = row.get('provider') or [{}]  # Handle a missing or empty provider array

        # Extract every field before appending, so a failure cannot leave the lists with unequal lengths
        values = (row.get('name'), row.get('description'), row.get('category'), row.get('url'),
                  thumbnail.get('contentUrl'), providers[0].get('name'), row.get('datePublished'))

    except Exception as e:
        print(f"Error processing JSON object: {e}")
        continue

    for column, value in zip((title, description, category, url, image, provider, datePublished), values):
        column.append(value)

StatementMeta(bingtest, 1, 28, Finished, Available, Finished)
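
The extraction step can also be expressed as a helper that returns one tuple per article, so a malformed record cannot leave the columns with different lengths. The following is a minimal sketch, not the notebook's own code: `extract_article` and both sample records are hypothetical, and the field names are taken from the loop above.

```python
def extract_article(row):
    """Return the seven fields of one article dict; missing values become None."""
    image = row.get("image") or {}
    thumbnail = image.get("thumbnail") or {}
    providers = row.get("provider") or [{}]  # covers a missing or empty provider list
    return (
        row.get("name"),
        row.get("description"),
        row.get("category"),
        row.get("url"),
        thumbnail.get("contentUrl"),
        providers[0].get("name"),
        row.get("datePublished"),
    )

# Hypothetical records: one complete, one with most fields absent.
full = {
    "name": "Article A",
    "description": "Short summary",
    "category": "World",
    "url": "https://example.com/a",
    "image": {"thumbnail": {"contentUrl": "https://example.com/a.png"}},
    "provider": [{"name": "Example News"}],
    "datePublished": "2024-05-01T12:30:00.0000000Z",
}
sparse = {"name": "Only a title", "provider": []}

records = [extract_article(r) for r in (full, sparse)]
for record in records:
    print(record)
```

A list of such tuples has the same shape as the result of zipping seven per-column lists, so it could be passed to `spark.createDataFrame` together with a seven-field schema in the same field order.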

#### Create a DataFrame from the data and schema

In [28]:
from pyspark.sql.types import StringType,StructType,StructField
data=list(zip(title,description,category,url,image,provider,datePublished))

schema=StructType([
    StructField("title",StringType(),True),
    StructField("description",StringType(),True),
    StructField("category",StringType(),True),
    StructField("url",StringType(),True),
    StructField("image",StringType(),True),
    StructField("provider",StringType(),True),
    StructField("datePublished",StringType(),True)
])

df_cleaned=spark.createDataFrame(data,schema)
display(df_cleaned)

StatementMeta(bingtest, 1, 29, Finished, Available, Finished)


#### Format the publication date

In [29]:
from pyspark.sql.functions import date_format,to_date

df_cleaned_final=df_cleaned.withColumn("datePublished",date_format(to_date('datePublished'),"dd-MM-yyyy"))
display(df_cleaned_final)

StatementMeta(bingtest, 1, 30, Finished, Available, Finished)
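
The cell above rewrites `datePublished` as a `dd-MM-yyyy` string. The sketch below mirrors that reformatting for a single value in plain Python. It is an illustration only: `to_day_month_year` is a hypothetical helper, the sample timestamp is an assumed ISO 8601 style value, and Spark's `to_date` applies its own parsing rules.

```python
from datetime import datetime

def to_day_month_year(date_published):
    """Reformat an ISO 8601 style timestamp string as dd-MM-yyyy, or return None."""
    if not date_published:
        return None
    try:
        # Only the leading yyyy-MM-dd part is needed for the date.
        parsed = datetime.strptime(date_published[:10], "%Y-%m-%d")
    except ValueError:
        return None
    return parsed.strftime("%d-%m-%Y")

print(to_day_month_year("2024-05-01T12:30:00.0000000Z"))  # 01-05-2024
print(to_day_month_year("not a date"))                    # None
print(to_day_month_year(None))                            # None
```

Because the result is a string that starts with the day, sorting on it is not chronological; keeping a real date column alongside it may be worth considering if ordering matters.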


#### Incrementally load the data into a table

In [35]:
from pyspark.sql.utils import AnalysisException

table_name='default.bing_cleaned'

try:
    # First run: create the Delta table (saveAsTable raises AnalysisException if it already exists)
    df_cleaned_final.write.format('delta').saveAsTable(table_name)
except AnalysisException:
    print("table is already created")

    df_cleaned_final.createOrReplaceTempView('temp')

    # Upsert keyed on url. <=> is null-safe equality, so a change to or from NULL also counts as a difference
    spark.sql(f"""merge into {table_name} target_table
                    using temp source_view
                    on  source_view.url=target_table.url

                    when matched and not (
                    source_view.title<=>target_table.title AND
                    source_view.description<=>target_table.description AND
                    source_view.provider<=>target_table.provider AND
                    source_view.image<=>target_table.image AND
                    source_view.category<=>target_table.category AND
                    source_view.datePublished<=>target_table.datePublished
                    )
                    then update set *

                    when not matched then insert *
    """)

StatementMeta(bingtest, 1, 36, Finished, Available, Finished)

table is already created
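
The merge has three possible outcomes per incoming row: insert a new `url`, update a matching `url` whose other columns differ, or leave an identical row alone. The following plain-Python model of that logic is a sketch for reasoning about the behavior, not Delta Lake code; `merge_by_url` and the sample rows are hypothetical.

```python
def merge_by_url(target, source):
    """Upsert source rows into target (both dicts keyed by url) and count the outcomes."""
    counts = {"inserted": 0, "updated": 0, "unchanged": 0}
    for url, row in source.items():
        if url not in target:
            target[url] = row          # when not matched then insert
            counts["inserted"] += 1
        elif target[url] != row:
            target[url] = row          # when matched and a column differs then update
            counts["updated"] += 1
        else:
            counts["unchanged"] += 1   # matched and identical: nothing to do
    return counts

# Hypothetical existing table contents and incoming batch.
target = {
    "https://example.com/a": {"title": "Article A", "category": None},
    "https://example.com/b": {"title": "Article B", "category": "World"},
}
source = {
    "https://example.com/a": {"title": "Article A", "category": None},
    "https://example.com/b": {"title": "Article B (updated)", "category": "World"},
    "https://example.com/c": {"title": "Article C", "category": "Sports"},
}

counts = merge_by_url(target, source)
print(counts)  # {'inserted': 1, 'updated': 1, 'unchanged': 1}
```

One difference to keep in mind: Python's `!=` treats two `None` values as equal, whereas a SQL `<>` comparison involving NULL evaluates to NULL rather than true, so NULL handling in the SQL condition needs separate attention.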


In [36]:
%%sql
select count(*) from default.bing_cleaned

StatementMeta(bingtest, 1, 37, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>