In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [3]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# SparkSession with the Delta Lake SQL extension and Delta catalog registered.
# configure_spark_with_delta_pip wires in the Delta jars for the pip-installed delta-spark.
builder = (
    SparkSession.builder
    .appName("json-3")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
import os

# Raw JSON message export to explore. The default is a machine-specific path; set the
# RAW_JSON_PATH environment variable to point at the file on another machine.
path = os.environ.get("RAW_JSON_PATH", 'C:/Users/acer/Downloads/2024-7-20-15-26-1.json')
df = spark.read.format("json").load(path)
df.show()

+--------------------+--------------------+--------------------+--------------------+
|                Body|     EnqueuedTimeUtc|          Properties|    SystemProperties|
+--------------------+--------------------+--------------------+--------------------+
|{mn=TVgwMDAx:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAx:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAx:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAx:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAy:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAy:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAy:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAy:sn=M...|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{{"scope":"device...|
|{mn=TVgwMDAz:sn=M...|2024-08-20T09:56:...|{rp.mib/dev

In [5]:
from pyspark.sql.types import *

# Explicit read schema: only these fields are parsed out of each JSON message.
# Every SystemProperties field is a nullable string, so that struct is generated from a name list.
schema = T.StructType([
    T.StructField("EnqueuedTimeUtc", T.StringType(), True),
    T.StructField("Properties", T.StructType([
        T.StructField("appTopic", T.StringType(), True),
    ]), True),
    T.StructField("SystemProperties", T.StructType([
        T.StructField(field_name, T.StringType(), True)
        for field_name in (
            "connectionDeviceId",
            "connectionAuthMethod",
            "connectionDeviceGenerationId",
            "contentType",
            "contentEncoding",
            "enqueuedTime",
        )
    ]), True),
    T.StructField("Body", T.StructType([
        T.StructField("type", T.StringType(), True),
        T.StructField("mnsn", T.StringType(), True),  # device key used by the merges below
        T.StructField("timestamp", T.LongType(), True),
        T.StructField("scheduleName", T.StringType(), True),
    ]), True),
])

In [6]:
# Re-read the same file, this time enforcing the explicit schema declared above.
schema_df2 = spark.read.json(path, schema=schema)

In [7]:
# Preview the schema-enforced frame (first 20 rows, long cells truncated).
schema_df2.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{a0ba43aa-f3c1-42...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{5b98aa47-2d23-4b...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{a0ba43aa-f3c1-42...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{5b98aa47-2d23-4b...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e

In [8]:
# Confirm the explicit schema was applied as declared (nested structs included).
schema_df2.printSchema()

root
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- Properties: struct (nullable = true)
 |    |-- appTopic: string (nullable = true)
 |-- SystemProperties: struct (nullable = true)
 |    |-- connectionDeviceId: string (nullable = true)
 |    |-- connectionAuthMethod: string (nullable = true)
 |    |-- connectionDeviceGenerationId: string (nullable = true)
 |    |-- contentType: string (nullable = true)
 |    |-- contentEncoding: string (nullable = true)
 |    |-- enqueuedTime: string (nullable = true)
 |-- Body: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- mnsn: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- scheduleName: string (nullable = true)



In [9]:
# Peek at the nested schedule type of each message.
schema_df2.select(F.col("Body.scheduleName")).show()

+--------------+
|  scheduleName|
+--------------+
|   basicUpdate|
| counterUpdate|
|  deviceStatus|
|suppliesUpdate|
|   basicUpdate|
| counterUpdate|
|  deviceStatus|
|suppliesUpdate|
|   basicUpdate|
| counterUpdate|
|  deviceStatus|
|suppliesUpdate|
|   basicUpdate|
| counterUpdate|
|  deviceStatus|
|suppliesUpdate|
|   basicUpdate|
| counterUpdate|
|  deviceStatus|
|suppliesUpdate|
+--------------+
only showing top 20 rows



In [10]:
# Distinct schedule types present in the file. distinct() returns rows in no guaranteed
# order, so sort in Spark to keep the displayed list stable across re-runs.
schedule_names = [
    row["scheduleName"]
    for row in (
        schema_df2
        .select("Body.scheduleName")
        .distinct()
        .orderBy("scheduleName")
        .collect()
    )
]
schedule_names

['deviceStatus', 'basicUpdate', 'counterUpdate', 'suppliesUpdate']

In [11]:
from delta.tables import DeltaTable
from pyspark.sql.functions import *

In [12]:
# Restrict to the 'basicUpdate' schedule; the rest of the notebook works on this subset.
schedule_df = schema_df2.where(F.col("Body.scheduleName") == 'basicUpdate')
schedule_df.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e

In [13]:
# The single most recent basicUpdate message (largest Body.timestamp).
latest_data = (
    schedule_df
    .orderBy(F.col("Body.timestamp").desc())
    .limit(1)
)
latest_data.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
+--------------------+--------------------+--------------------+--------------------+



In [14]:
# Everything except the row that seeds the Delta table. The cut-off timestamp is taken from
# `latest_data` instead of being hard-coded, so the cell stays correct for any input file.
latest_row = latest_data.first()
latest_ts = latest_row["Body"]["timestamp"] if latest_row is not None else None

if latest_ts is None:
    # Nothing to exclude by timestamp (empty subset or null timestamp on the latest row).
    remaining_data = schedule_df
else:
    # eqNullSafe keeps rows whose timestamp is null; with a plain `==`, the comparison
    # evaluates to null for them and the filter would silently drop them.
    remaining_data = schedule_df.filter(~F.col("Body.timestamp").eqNullSafe(latest_ts))

remaining_data.show()
remaining_data.count()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e

499

In [15]:
# Location of the Delta table holding the latest basicUpdate message per device.
# (Plain string: the original f-prefix had no placeholders.)
table_path = "./message/basicUpdate4"

In [16]:
# Seed (or reset) the Delta table with the single latest message.
latest_data.write.save(table_path, format="delta", mode="overwrite")

In [17]:
from delta.tables import *

In [18]:
# Handle to the Delta table (used as the MERGE target below) plus a view of its contents.
dt = DeltaTable.forPath(sparkSession=spark, path=table_path)
check = dt.toDF()
check.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
+--------------------+--------------------+--------------------+--------------------+



In [20]:
from pyspark.sql.window import Window


def upsert_latest_by_mnsn(target, source):
    """Upsert ``source`` into the Delta table ``target``, keyed on ``Body.mnsn``.

    The source is first collapsed to its newest row per ``Body.mnsn``. A Delta MERGE can
    fail when several source rows match the same target row, and
    ``whenNotMatchedInsert`` would insert every unmatched duplicate. Rows with a null
    ``mnsn`` fall into one window partition, so at most one of them is inserted.

    Args:
        target: ``DeltaTable`` to merge into.
        source: DataFrame with the same columns as the table.

    Existing rows are overwritten only when the incoming ``Body.timestamp`` is strictly
    newer; unseen keys are inserted.
    """
    newest_first = Window.partitionBy(F.col("Body.mnsn")).orderBy(F.col("Body.timestamp").desc())
    deduped = (
        source
        .withColumn("_rn", F.row_number().over(newest_first))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )
    # Same column mapping for update and insert: copy every column from the source row.
    columns = {
        "EnqueuedTimeUtc": "s.EnqueuedTimeUtc",
        "Properties": "s.Properties",
        "SystemProperties": "s.SystemProperties",
        "Body": "s.Body",
    }
    (
        target.alias("t")
        .merge(deduped.alias("s"), F.col("t.Body.mnsn") == F.col("s.Body.mnsn"))
        .whenMatchedUpdate(
            condition=F.col("s.Body.timestamp") > F.col("t.Body.timestamp"),
            set=columns,
        )
        .whenNotMatchedInsert(values=columns)
        .execute()
    )


upsert_latest_by_mnsn(dt, remaining_data)

In [21]:
# Read the merged table back from the same location the notebook wrote to
# (reuses `table_path` instead of repeating the literal).
df1 = spark.read.format("delta").load(table_path)
df1.show()

+--------------------+--------------------+--------------------+--------------------+
|     EnqueuedTimeUtc|          Properties|    SystemProperties|                Body|
+--------------------+--------------------+--------------------+--------------------+
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e0b7-4e...|{smsProps, mn=TVg...|
|2024-08-20T09:56:...|{rp.mib/dev/96d1f...|{96d1f469-e

In [23]:
# Number of distinct devices in the table (should equal its row count after the upsert).
df1.select(F.col("Body.mnsn")).dropDuplicates().count()

500

In [24]:
# Snapshot of two sample devices before the incremental merge, for comparison afterwards.
watched_devices = ["mn=TVgwMDAx:sn=MDUxMTMxMzAwMDE=", "mn=TVgwMDA0:sn=MDUxMTMxMzAwMDQ="]
df1.where(F.col("Body.mnsn").isin(watched_devices)).select("Body").show(truncate=False)

+-----------------------------------------------------------------------+
|Body                                                                   |
+-----------------------------------------------------------------------+
|{smsProps, mn=TVgwMDAx:sn=MDUxMTMxMzAwMDE=, 1724147812588, basicUpdate}|
|{smsProps, mn=TVgwMDA0:sn=MDUxMTMxMzAwMDQ=, 1724147812612, basicUpdate}|
+-----------------------------------------------------------------------+




# New data: merge an incremental batch into the Delta table

In [31]:
import os

# Incremental batch: according to the author, one device not yet in the table plus two
# existing devices with different (newer) timestamps. The default is a machine-specific
# path; set NEW_JSON_PATH to point elsewhere.
path3 = os.environ.get("NEW_JSON_PATH", 'C:/Users/acer/projects/json-2.json')
new_data = spark.read.schema(schema).json(path3)
new_data.show(truncate=False)

+------------------------+--------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|EnqueuedTimeUtc         |Properties                                        |SystemProperties                                                                                                                                                    |Body                                                                   |
+------------------------+--------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|2024-08-20T09:56:52.534Z|{rp.mib/dev/96d1f469-e0b7-4ee

In [32]:
from pyspark.sql.window import Window

# Collapse the batch to its newest row per device first. A Delta MERGE can fail when several
# source rows match the same target row, and whenNotMatchedInsert would insert every
# unmatched duplicate. Null mnsn values share one partition, so at most one such row is kept.
newest_first = Window.partitionBy(F.col("Body.mnsn")).orderBy(F.col("Body.timestamp").desc())
new_latest = (
    new_data
    .withColumn("_rn", F.row_number().over(newest_first))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)

cond = (F.col("t.Body.mnsn") == F.col("s.Body.mnsn"))
dt.alias("t").merge(new_latest.alias("s"), cond).whenMatchedUpdate(
    # Overwrite a stored device row only when the incoming message is strictly newer.
    condition=F.col("s.Body.timestamp") > F.col("t.Body.timestamp"),
    set={
        "Body": "s.Body",
        "EnqueuedTimeUtc": "s.EnqueuedTimeUtc",
        "Properties": "s.Properties",
        "SystemProperties": "s.SystemProperties",
    },
).whenNotMatchedInsert(
    values={
        "EnqueuedTimeUtc": "s.EnqueuedTimeUtc",
        "Properties": "s.Properties",
        "SystemProperties": "s.SystemProperties",
        "Body": "s.Body",
    }
).execute()

In [33]:
# Re-read the table so this check explicitly reflects the merge above, instead of reusing
# `df1`, which was created before that merge.
merged_df = spark.read.format("delta").load(table_path)
watched = ["mn=TVgwMDAx:sn=MDUxMTMxMzAwMDE=", "mn=TVgwMDA0:sn=MDUxMTMxMzAwMDQ="]
merged_df.filter(F.col("Body.mnsn").isin(watched)).select("Body").show(truncate=False)

+-----------------------------------------------------------------------+
|Body                                                                   |
+-----------------------------------------------------------------------+
|{smsProps, mn=TVgwMDA0:sn=MDUxMTMxMzAwMDQ=, 1732337812592, basicUpdate}|
|{smsProps, mn=TVgwMDAx:sn=MDUxMTMxMzAwMDE=, 1732337812590, basicUpdate}|
+-----------------------------------------------------------------------+



In [34]:
# Successful insertion check: distinct devices after the incremental merge, expected to be
# one more than before because the batch contained one previously unseen device. The table
# is re-read rather than reusing `df1`, which predates the merge.
spark.read.format("delta").load(table_path).select("Body.mnsn").distinct().count()

501