In [1]:
HOST = "localhost"
PORT = 9999

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("StructuredNetworkCount") \
    .config("spark.driver.host","127.0.0.1") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()
)

24/09/30 19:56:07 WARN Utils: Your hostname, daniel-Yoga-Creator-7-15IMH05 resolves to a loopback address: 127.0.1.1; using 192.168.63.125 instead (on interface wlp0s20f3)
24/09/30 19:56:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/30 19:56:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = (
    spark.readStream.format("socket")
    .option("host", HOST)
    .option("port", PORT)
    # .option("includeTimestamp", "true")
    .load()
)

24/09/30 19:56:10 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [4]:
data = data.selectExpr("CAST(value AS STRING) as json") \
    .selectExpr("from_json(json, 'timestamp STRING, value INT') as data") \
    .select("data.*")

In [5]:
from pyspark.sql.functions import lit

data = data.withColumn("dummy_key", lit(1))

In [6]:
import pandas as pd
from pyspark.sql.streaming.state import GroupStateTimeout
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType


outputSchema = StructType([StructField("mean", DoubleType(), True), StructField("std", DoubleType(), True)])
stateSchema = StructType([StructField("S", IntegerType(), True), StructField("n", IntegerType(), True), StructField("Q", DoubleType(), True)])

def update_count(key, df_iter, state):
    if not state.exists:
        state.update((0, 0, 0))

    S, n, Q = state.get

    for df in df_iter:
        for row in df.itertuples(index=False):
            value = int(row.value)
            n += 1
            S += value
            Q += Q + (value - S/n) ** 2
    
    state.update((S, n, Q))

    mean = S/n
    std = (1/n * Q) ** 1/2

    yield pd.DataFrame([(mean, std)], columns=outputSchema.fieldNames())
result = data.groupBy("dummy_key").applyInPandasWithState(
    update_count,
    outputStructType=outputSchema,
    stateStructType=stateSchema,
    outputMode="append",
    timeoutConf=GroupStateTimeout.NoTimeout
)


In [7]:
query = (
    result.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="2 seconds")
    .start()
)

24/09/30 19:56:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8362e930-2449-437f-ba4b-360381fb9f60. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/30 19:56:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [8]:
query.awaitTermination()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------+
|mean|variance|
+----+--------+
+----+--------+



24/09/30 19:56:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 8873 milliseconds
24/09/30 19:56:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 5238 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|51.84705882352941|6.395589245296199E25|
+-----------------+--------------------+



24/09/30 19:56:31 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 5552 milliseconds


-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|49.62043795620438|1.787058200046796E41|
+-----------------+--------------------+



24/09/30 19:56:44 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 13438 milliseconds


-------------------------------------------
Batch: 3
-------------------------------------------
+------------------+-------------------+
|              mean|           variance|
+------------------+-------------------+
|49.557291666666664|4.59417777584733E57|
+------------------+-------------------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------+--------------------+
|              mean|            variance|
+------------------+--------------------+
|50.246153846153845|2.955391960843945...|
+------------------+--------------------+



24/09/30 19:56:58 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 13713 milliseconds
24/09/30 19:57:07 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 8623 milliseconds


-------------------------------------------
Batch: 5
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.44347826086957|9.094734630258217...|
+-----------------+--------------------+



24/09/30 19:57:11 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 4108 milliseconds


-------------------------------------------
Batch: 6
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.25504587155963|2.969620772314853...|
+-----------------+--------------------+



24/09/30 19:57:15 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3883 milliseconds


-------------------------------------------
Batch: 7
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.43344709897611|6.073369454739601...|
+-----------------+--------------------+



24/09/30 19:57:18 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3794 milliseconds


-------------------------------------------
Batch: 8
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.39903846153846|1.567770767814801...|
+-----------------+--------------------+



24/09/30 19:57:22 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 4012 milliseconds


-------------------------------------------
Batch: 9
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.37613293051359|4.062084916440338...|
+-----------------+--------------------+



24/09/30 19:57:26 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3848 milliseconds


-------------------------------------------
Batch: 10
-------------------------------------------
+------------------+--------------------+
|              mean|            variance|
+------------------+--------------------+
|50.205128205128204|4.211819023218590...|
+------------------+--------------------+



24/09/30 19:57:30 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3836 milliseconds


-------------------------------------------
Batch: 11
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.00810810810811|1.098284689548016...|
+-----------------+--------------------+



24/09/30 19:57:34 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3872 milliseconds


-------------------------------------------
Batch: 12
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|49.75064267352185|2.871487217889251...|
+-----------------+--------------------+



24/09/30 19:57:38 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3746 milliseconds


-------------------------------------------
Batch: 13
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.04651162790697|1.503260544181006...|
+-----------------+--------------------+



24/09/30 19:57:42 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3846 milliseconds


-------------------------------------------
Batch: 14
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|49.81967213114754|1.976552180821840...|
+-----------------+--------------------+



24/09/30 19:57:45 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3849 milliseconds


-------------------------------------------
Batch: 15
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|50.05829596412556|5.201650107299774...|
+-----------------+--------------------+



24/09/30 19:57:49 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3903 milliseconds


-------------------------------------------
Batch: 16
-------------------------------------------
+-----------------+--------------------+
|             mean|            variance|
+-----------------+--------------------+
|49.93333333333333|1.371395994818751...|
+-----------------+--------------------+



24/09/30 19:57:53 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 2000 milliseconds, but spent 3852 milliseconds


-------------------------------------------
Batch: 17
-------------------------------------------
+----------------+--------------------+
|            mean|            variance|
+----------------+--------------------+
|50.0092879256966|7.235888718304675...|
+----------------+--------------------+



ERROR:root:KeyboardInterrupt while sending command.            (108 + 12) / 200]
Traceback (most recent call last):
  File "/home/daniel/miniconda3/envs/sparky/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/daniel/miniconda3/envs/sparky/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/daniel/miniconda3/envs/sparky/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

KeyboardInterrupt: 

-------------------------------------------
Batch: 18
-------------------------------------------


                                                                                

In [None]:
spark.stop()