### 

### Extract

#### Importing functions and types 

In [1]:

from pyspark.sql.types import *
from pyspark.sql.functions import *

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 3, Finished, Available, Finished)

In [2]:
df= spark.read.json('Files/latest_news/bing-news-in-india.json')

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 4, Finished, Available, Finished)

### Transform

#### Exploding the Json column

In [3]:
explodeddf= df.select(explode(df["value"]).alias('jsonobject'))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 5, Finished, Available, Finished)

In [4]:
explodeddf.printSchema()

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 6, Finished, Available, Finished)

root
 |-- jsonobject: struct (nullable = true)
 |    |-- about: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- readLink: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- datePublished: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- image: struct (nullable = true)
 |    |    |-- thumbnail: struct (nullable = true)
 |    |    |    |-- contentUrl: string (nullable = true)
 |    |    |    |-- height: long (nullable = true)
 |    |    |    |-- width: long (nullable = true)
 |    |-- mentions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- provider: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _type: string (nullable = true)
 |    |    |    |-- i

#### Flattening Data Frame

In [5]:
flattened_df= explodeddf.select(col('jsonobject.name').alias('title'), \
                                col('jsonobject.description').alias('description'),  \
                                col('jsonobject.category').alias('category'),  \
                                col('jsonobject.image.thumbnail.contentUrl').alias('image'),   
                                col('jsonobject.url').alias('url'),  \
                                explode_outer(col('jsonobject.provider')).alias('provider'), \
                                col('jsonobject.datePublished').alias('datePublished'))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 7, Finished, Available, Finished)

In [6]:
display(flattened_df.limit(5))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2550836f-e135-4265-a563-0891356c7aa4)

In [7]:
flattened_df.describe()

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 9, Finished, Available, Finished)

DataFrame[summary: string, title: string, description: string, category: string, image: string, url: string, datePublished: string]

In [8]:
flattened_df.printSchema()

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 10, Finished, Available, Finished)

root
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- category: string (nullable = true)
 |-- image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- provider: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- image: struct (nullable = true)
 |    |    |-- thumbnail: struct (nullable = true)
 |    |    |    |-- contentUrl: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- datePublished: string (nullable = true)



In [9]:
df= flattened_df.select('*', \
               col('provider.name').alias('providername') ) \
            .drop(col('provider'))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 11, Finished, Available, Finished)

In [10]:
df = df.withColumn(
    'date',
    date_format(
        to_timestamp(col("datePublished"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"),
        "yyyy-MM-dd"
    )
).drop(col('datePublished'))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 12, Finished, Available, Finished)

In [11]:
df= df.withColumn('date',to_date(col('date'),'yyyy-MM-dd'))

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 13, Finished, Available, Finished)

In [14]:
df= df.na.drop()

StatementMeta(, d89dd7d6-fdb4-4208-bbec-9bdefde257dc, 16, Finished, Available, Finished)

### Load

In [None]:
# Incremental Load

from pyspark.sql.utils import AnalysisException

try:

    table_name = 'bing_data_lakehouse.Today_news_in_india'

    df.write.format("delta").saveAsTable(table_name)

except AnalysisException:

    print("Table Already Exists")

    df.createOrReplaceTempView("vw_df_final")
    spark.sql(f""" Merge into {table_name} t
                using vw_df_final s
                on t.url= s.url
                when matched and 
                    s.title <> t.title or
                    s.description <> t.description OR
                    s.category <> t.category
                then update set *
                when not matched then
                INSERT *""")

StatementMeta(, 148b9aba-616d-4140-966e-0207095f242a, -1, Cancelled, , Cancelled)