In [0]:
from pyspark.sql.types import *

# Explicit read schema passed to spark.read in the load cell, so Spark does not infer one.
# Fields present in the JSON but absent here are not loaded; fields listed here but
# missing from a record come back as NULL. Every field is nullable (StructField's
# `nullable` defaults to True where the third argument is omitted).
schema = StructType([
    # Kept as a string; not parsed into a timestamp here.
    StructField("EnqueuedTimeUtc", StringType(), True),
    StructField("Properties", StructType([
        StructField("appTopic", StringType(), True),
        StructField("relatedGroupId", StringType(), True)
    ])),
    StructField("SystemProperties", StructType([
        StructField("connectionDeviceId", StringType(), True),
        StructField("connectionAuthMethod", StringType(), True),
        StructField("connectionDeviceGenerationId", StringType(), True),
        StructField("contentType", StringType(), True),
        StructField("contentEncoding", StringType(), True),
        StructField("enqueuedTime", StringType(), True)
    ])),
    # Message payload. Body.mnsn is the dedup/MERGE key and Body.timestamp the
    # recency column in later cells; Body.scheduleName selects 'basicUpdate' rows.
    StructField("Body", StructType([
        StructField("type", StringType(), True),
        StructField("mnsn", StringType(), True),
        # NOTE(review): integer time value; unit (seconds vs milliseconds) is not visible here — confirm.
        StructField("timestamp", LongType(), True),
        StructField("scheduleName", StringType(), True),
        # NOTE(review): the df.show() output shows Body.type values such as 'smsProps';
        # confirm the payload key really is 'smsProperties' — a name mismatch would leave
        # this whole struct NULL.
        StructField("smsProperties", StructType([
            StructField("service", StructType([
                StructField("print", StructType([
                    StructField("mediaPathList", ArrayType(StructType([
                        StructField("id", StringType(), True),
                        StructField("type", StringType(), True)
                    ])), True),
                    StructField("channelList", ArrayType(StructType([
                        StructField("id", StringType(), True),
                        StructField("descriptionLanguageDefault", StringType(), True),
                        StructField("mediaPathDefault", StringType(), True)
                    ])), True),
                    StructField("descriptionLanguageList", ArrayType(StructType([
                        StructField("id", StringType(), True),
                        StructField("family", StringType(), True),
                        StructField("version", StringType(), True),
                        StructField("level", StringType(), True),
                        StructField("description", StringType(), True),
                        StructField("orientationDefault", StringType(), True),
                        StructField("langVersion", StringType(), True)
                    ])), True)
                ]))
            ]))
        ])),
        # NOTE(review): per the bracket structure, `interface` and `device` are fields of
        # Body itself (siblings of smsProperties), not nested inside smsProperties —
        # confirm this matches the JSON layout.
        StructField("interface", StructType([
            StructField("ethernetList", ArrayType(StructType([
                StructField("id", StringType(), True),
                StructField("type", StringType(), True),
                StructField("address", StringType(), True)
            ])), True),
            StructField("ipList", ArrayType(StructType([
                StructField("address", StringType(), True),
                StructField("defaultRoute", StringType(), True),
                StructField("subnetMask", StringType(), True),
                StructField("ethernetId", StringType(), True)
            ])), True)
        ])),
        StructField("device", StructType([
            StructField("memory", StructType([
                StructField("size", StringType(), True)
            ])),
            StructField("deviceEntryList", ArrayType(StructType([
                StructField("type", StringType(), True),
                StructField("description", StringType(), True),
                StructField("status", StringType(), True),
                StructField("errors", StringType(), True),
                StructField("deviceID", StringType(), True),
                StructField("deviceIndex", StringType(), True)
            ])), True),
            StructField("configuration", StructType([
                StructField("duplexModule", StringType(), True),
                StructField("installedOptions", StructType([
                    StructField("value", StringType(), True),
                    StructField("dsk", StringType(), True)
                ]))
            ]))
        ]))
    ]))
])


In [0]:
# DBFS location of the source JSON file that the next cell loads with `schema`.
json_file_path: str = "dbfs:/FileStore/tables/New6.json"

In [0]:
# Parse the JSON input using the explicit schema (no schema-inference pass over the file).
df = spark.read.schema(schema).json(json_file_path)

In [0]:
# Preview the first 20 parsed rows (long cell values are truncated for display).
df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{type, mn=TVgwMDA...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{a0ba43aa-f3c1-42...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{5b98aa47-2d23-4b...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{a0ba43aa-f3c1-42...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{5b98aa47-2d23-4b...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e

In [0]:
# List every distinct Body.scheduleName value present in the input.
(
    df.select('Body.scheduleName')
    .dropDuplicates()
    .show()
)

+--------------+
|  scheduleName|
+--------------+
|  deviceStatus|
|   basicUpdate|
| counterUpdate|
|suppliesUpdate|
+--------------+



In [0]:
# Restrict the input to 'basicUpdate' messages; later cells dedupe and upsert these.
df1 = df.where(df['Body.scheduleName'] == 'basicUpdate')
df1.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{type, mn=TVgwMDA...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
+--------------------+--------------------+--------------------+--------------------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# One partition per Body.mnsn value, ordered NEWEST first, so that row_number() == 1
# keeps the latest record per key. This matches the MERGE cell, which only replaces a
# stored row when the incoming Body.timestamp is strictly greater. An ascending order
# would keep the oldest record and silently drop newer ones from the same batch.
# Column.desc() sorts NULLs last, so a record with a real timestamp beats one without.
# EnqueuedTimeUtc breaks timestamp ties so the chosen row is deterministic.
# NOTE(review): EnqueuedTimeUtc is a string; its order equals time order only if every
# value uses the same ISO-8601 format — confirm.
window = Window.partitionBy('Body.mnsn').orderBy(
    F.col('Body.timestamp').desc(),
    F.col('EnqueuedTimeUtc').desc(),
)


In [0]:
from pyspark.sql import functions as F

# Number the rows inside each Body.mnsn partition following `window`'s ordering
# (1 = first row in that order); the next cell keeps only rank 1.
row_rank = F.row_number().over(window)
df1 = df1.withColumn('row_number', row_rank)
df1.show()

+--------------------+--------------------+--------------------+--------------------+----------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|row_number|
+--------------------+--------------------+--------------------+--------------------+----------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{type, mn=TVgwMDA...|         1|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|         1|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|         1|
+--------------------+--------------------+--------------------+--------------------+----------+



In [0]:
# Keep only the top-ranked row of each Body.mnsn partition.
df_final = df1.where('`row_number` = 1')

In [0]:
# Discard the helper ranking column now that the rank filter has been applied.
helper_columns = ['row_number']
df_final = df_final.drop(*helper_columns)
df_final.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{type, mn=TVgwMDA...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
+--------------------+--------------------+--------------------+--------------------+



In [0]:
# Location of the Delta table that the upsert cell writes/merges df_final into.
delta_table_path: str = 'dbfs:/FileStore/tables/basicupdate'

In [0]:
from delta.tables import DeltaTable

# Upsert df_final into the Delta table at delta_table_path, keyed on Body.mnsn.
#  - No Delta table at the path yet: write df_final as the initial table.
#  - Otherwise MERGE: a matched row is replaced only when the incoming record has a
#    strictly newer Body.timestamp; unmatched keys are inserted.
# Delta MERGE fails if several source rows match the same target row, so df_final must
# hold at most one row per Body.mnsn (the row_number == 1 filter is meant to ensure this).
# Rows whose Body.mnsn is NULL never match (NULL = NULL is not true) and are inserted.
# If either side's Body.timestamp is NULL the update condition is not true, so the
# stored row is kept.
if not DeltaTable.isDeltaTable(spark, delta_table_path):
    df_final.write.format('delta').mode('overwrite').save(delta_table_path)
else:
    deltatable = DeltaTable.forPath(spark, delta_table_path)

    (
        deltatable.alias('old')
        .merge(df_final.alias('new'), 'old.Body.mnsn = new.Body.mnsn')
        # Copy every source column (Body, EnqueuedTimeUtc, Properties, SystemProperties)
        # rather than hand-listing alias-qualified target keys. This mirrors
        # whenNotMatchedInsertAll(), which already requires source and target columns to
        # match, and stays correct if df_final's columns change.
        .whenMatchedUpdateAll(condition='new.Body.timestamp > old.Body.timestamp')
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Draft (not executed): variant of the merge that skips the update whenever any incoming
# column is NULL. Re-enabling it requires `from pyspark.sql.functions import col`.
# deltatable.alias('old').merge(
#         df_final.alias('new'),
#         'old.Body.mnsn = new.Body.mnsn'
#     ).whenMatchedUpdate(
#         condition=(
#             "new.Body.timestamp > old.Body.timestamp AND new.Body IS NOT NULL AND new.EnqueuedTimeUtc IS NOT NULL AND new.Properties IS NOT NULL AND new.SystemProperties IS NOT NULL"
#         ),
#         set={
#             "old.Body": col("new.Body"),
#             "old.EnqueuedTimeUtc": col("new.EnqueuedTimeUtc"),
#             "old.Properties": col("new.Properties"),
#             "old.SystemProperties": col("new.SystemProperties")
#         }
#     ).whenNotMatchedInsertAll().execute()

In [0]:
# PySpark's coalesce() returns the first non-null value among the given columns; the draft below uses it to keep the stored value when the incoming one is NULL.

In [0]:
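# Draft (not executed): per-column update that falls back to the stored value when the
# incoming one is NULL. Re-enabling it requires
# `from pyspark.sql.functions import coalesce, col`. Unlike the active merge cell it has no
# whenNotMatchedInsertAll(), so rows with new Body.mnsn keys would not be inserted.
# deltatable.alias('old').merge(
#     df_final.alias('new'),
#     'old.Body.mnsn = new.Body.mnsn'
# ).whenMatchedUpdate(
#     condition="new.Body.timestamp > old.Body.timestamp",
#     set={
#         "old.Body": coalesce(col("new.Body"), col("old.Body")), 
#         "old.EnqueuedTimeUtc": coalesce(col("new.EnqueuedTimeUtc"), col("old.EnqueuedTimeUtc")),
#         "old.Properties": coalesce(col("new.Properties"), col("old.Properties")),
#         "old.SystemProperties": coalesce(col("new.SystemProperties"), col("old.SystemProperties"))
#     }
# ).execute()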