In [1]:
"""
docker run -it --rm -p 8888:8888 -v "$(pwd)":/home/jovyan/work \
  --user root \
  -e NB_GID=100 \
  -e GRANT_SUDO=yes jupyter/pyspark-notebook

docker exec -it {container_id} /bin/bash
---
apt update
apt-get install -y netcat
nc -lk 9999
"""


from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Create (or reuse) the notebook's SparkSession: local master with 3 threads,
# shuffle partition count set to 3, registered under the word-count app name.
spark = (
    SparkSession.builder
    .master("local[3]")
    .config("spark.sql.shuffle.partitions", 3)
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)


In [2]:
# Streaming DataFrame backed by the socket source on localhost:9999
# (the port the `nc -lk 9999` listener from the setup notes serves).
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [3]:
# Split every incoming line on a single space and emit one row per token,
# exposed as a column named "word".
words = lines.select(explode(split(lines["value"], " ")).alias("word"))

In [4]:
# Count the rows for each distinct word.
words_counts = (
    words
    .groupBy("word")
    .count()
)

In [None]:
# Start the streaming query: the word counts are written to the console sink
# in "complete" output mode.
query = (
    words_counts
    .writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Block this cell until the query terminates. The `finally` clause stops the
# query when the wait is interrupted (e.g. kernel interrupt) or the query
# fails, so the stream is not left running in the background and re-running
# this cell does not start a second query alongside the first.
# Any exception still propagates after the cleanup.
try:
    query.awaitTermination()
finally:
    query.stop()

In [8]:
# Output modes (what is written to the sink after each trigger)
# 1. Append   : Only the new rows added to the result table since the last trigger are written
# 2. Update   : Only the rows that are new or whose values were updated since the last trigger are written
# 3. Complete : The entire result table, including all rows, is written after every trigger