In [20]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

In [21]:
# Streaming DataFrame backed by a plain text socket on localhost:9999.
# The received text is exposed in the `value` column, which is parsed as JSON later.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

25/06/27 09:58:40 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [22]:
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

# Schema of the incoming JSON records.
# event_time is declared as a string here; it is converted to a timestamp
# in a later step with to_timestamp().
schema = (
    StructType()
    .add("name", StringType())
    .add("age", IntegerType())
    .add("event_time", StringType())
)


In [23]:
# Parse each incoming `value` as JSON using `schema`, flatten the resulting
# struct into top-level columns, then replace the event_time string with a
# real timestamp so it can be used for watermarking.
parsed_df = (
    df
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn(
        "event_time",
        to_timestamp("event_time", "yyyy-MM-dd HH:mm:ss"),
    )
)

In [13]:
# Stateless query: declare a 5-minute watermark on event_time, then simply
# project two columns. There is no aggregation or other stateful step here.
result = (
    parsed_df
    .withWatermark("event_time", "5 minutes")
    .select("name", "event_time")
)


In [14]:
# Sample JSON records matching `schema` (name, age, event_time).
# NOTE(review): presumably these are the lines sent to the socket source on
# localhost:9999; the third record is deliberately older than the other two.
{"name":"Aaditya", "age":25, "event_time":"2025-06-26 10:00:00"}
{"name":"Bikash", "age":22, "event_time":"2025-06-26 10:03:00"}
{"name":"Ram", "age":25, "event_time":"2025-06-26 09:50:00"}


{'name': 'Ram', 'age': 25, 'event_time': '2025-06-26 09:50:00'}

In [15]:
# Print every micro-batch of the stateless query to the console.
query = (
    result.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# awaitTermination() blocks until the query ends; in a notebook that normally
# means the kernel is interrupted. An interrupt only unblocks this call — the
# streaming query itself would keep running in the background — so always
# stop it explicitly on the way out.
try:
    query.awaitTermination()
finally:
    query.stop()

25/06/26 17:48:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-989de78c-42f3-4144-b0bf-b778dac9299b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/26 17:48:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
|name|event_time|
+----+----------+
+----+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+
|   name|         event_time|
+-------+-------------------+
|Aaditya|2025-06-26 10:00:00|
+-------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-------------------+
|  name|         event_time|
+------+-------------------+
|Bikash|2025-06-26 10:03:00|
+------+-------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-------------------+
|name|         event_time|
+----+-------------------+
| Ram|2025-06-26 09:50:00|
+----+-------------------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+-------------------+
|   name|         event_time|
+-------+-------------------+
|Aaditya|2025-06-26 10:00:00|
| Bikash|2025-06-26 10:03:00|
+-------+-------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-------------------+
|name|         event_time|
+----+-------------------+
| Ram|2025-06-26 09:50:00|
+----+-------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------+-------------------+
|   name|         event_time|
+-------+-------------------+
|Aaditya|2025-06-26 10:00:00|
| Bikash|2025-06-26 10:03:00|
|    Ram|2025-06-26 09:50:00|
+-------+-------------------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.9/dist-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
# Stop only the streaming queries that are still running (interrupting
# awaitTermination() does not stop the query itself). The SparkSession is kept
# alive on purpose: the cells further down build on parsed_df, which needs it.
# Call spark.stop() once at the very end of the notebook instead.
for active_query in spark.streams.active:
    active_query.stop()

# As the last row (Ram, 09:50) was still being displayed, it proves the point that watermarking only takes effect with stateful operations

Batch: 6
-------------------------------------------
+-------+-------------------+
|   name|         event_time|
+-------+-------------------+
|Aaditya|2025-06-26 10:00:00|
| Bikash|2025-06-26 10:03:00|
|    Ram|2025-06-26 09:50:00|
+-------+-------------------+

# With stateful operations, visualize the watermark as doing 2 things

1. Dropping incoming records that are too old, i.e. older than the watermark.

2. Dropping entries from the state store whose max event time is less than the watermark.

Watch these 3 videos:


    1. https://www.youtube.com/watch?v=a9jNGTe4cyg

    2. https://www.youtube.com/watch?v=xZHacxf5uZ8

    3. https://www.youtube.com/watch?v=4DT528dAjtU

In [25]:
# Note: the first argument of withWatermark is the event-time column.
# Spark does not run a clock of its own for this.
# It looks at the maximum event time that has arrived so far
# and treats that as "this much time has passed".
# It then computes: max event time - watermark delay.
# That is the current watermark value.
# Data older than the watermark is dropped,
# and state older than the watermark is dropped as well.

In [26]:
# Let's do an example

In [32]:
from pyspark.sql.functions import window,count

In [33]:
# Stateful query: same 5-minute watermark, but now the rows are grouped into
# 10-minute event-time windows and counted per window.
new_result = (
    parsed_df
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"))
    .agg(count("*"))
)

# Let's understand it this way
The agg at the end is the stateful operation.

So first we are building groups window by window,

of 10 minutes each, as specified.

Window groups of the form x:x:x-x:x:x get created,

and each such window behaves like one group.

We are simply aggregating the data that falls into it, just like normal SQL.

Think of the window as the basis, or key, of the grouping.


If we do not group by window, we could of course group by some other column instead.

But then we need some mechanism to expire that state, or to stop it from growing indefinitely,

because the watermark works on a time basis.

So we use arbitrary stateful operations in that case,

because the watermark says: evict the state that is from before this time,

but if we grouped by some (non-time) key,

how would that even work?

In [None]:
# Print the windowed counts to the console.
# "update" (not "complete") is required for the watermark to have any effect:
# in complete mode Spark must preserve every aggregate so it can re-emit the
# full result table, which means the watermark can never evict old windows or
# drop late records. truncate=false keeps the window struct column readable.
query = (
    new_result.writeStream
    .outputMode("update")
    .option("truncate", "false")
    .format("console")
    .start()
)

# An interrupt only unblocks awaitTermination(); stop the query explicitly so
# it does not keep running in the background.
try:
    query.awaitTermination()
finally:
    query.stop()