### TODO Recording:

- Set up the input/ folder, make sure it is empty

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder.appName("FileStreamForeachPrintDemo").getOrCreate()
print('Spark session created successfully.')

25/02/12 20:23:13 WARN Utils: Your hostname, Jananis-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.68.52 instead (on interface en0)
25/02/12 20:23:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/12 20:23:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/12 20:23:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark session created successfully.


In [2]:
schema = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Manufacturer", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Segment", StringType(), True),
    StructField("Total_Cores", IntegerType(), True),
    StructField("Processor_Speed", IntegerType(), True),
    StructField("CoProcessor_Cores", IntegerType(), True),
    StructField("Rmax", DoubleType(), True),
    StructField("Rpeak", DoubleType(), True),
    StructField("Power", DoubleType(), True),
    StructField("Power_Efficiency", DoubleType(), True),
    StructField("Architecture", StringType(), True),
    StructField("Processor_Technology", StringType(), True),
    StructField("Operating_System", StringType(), True),
    StructField("OS_Family", StringType(), True)
])

print('Schema defined successfully.')

Schema defined successfully.


In [3]:
streaming_df = spark \
    .readStream \
    .format("csv") \
    .schema(schema) \
    .load("input/")

transformed_df = streaming_df \
    .select("Rank", "Name", "Country", "Processor_Speed", "Rmax") \
    .filter("Country = 'Japan'")

In [4]:
def process_row(row):
    name = row.Name if row.Name is not None else "Unknown"
    country = row.Country if row.Country is not None else "Unknown"
    speed = row.Processor_Speed if row.Processor_Speed is not None else "Unknown"
    rmax = row.Rmax if row.Rmax is not None else "Unknown"

    print(f"Processing row: Name={name}, Country={country}, Speed={speed}, Rmax={rmax}")

In [5]:
query = transformed_df.writeStream \
            .foreach(process_row) \
            .outputMode("append") \
            .start()

25/02/12 20:23:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/f4/wmvdntf96vjfrq3sb2p4ncy00000gn/T/temporary-cbca3c40-4829-462d-98f4-4ea4cc909155. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/02/12 20:23:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Processing row: Name=Supercomputer Fugaku, Country=Japan, Speed=2200, Rmax=442010.0
Processing row: Name=ABCI 2.0, Country=Japan, Speed=2400, Rmax=22208.72
Processing row: Name=Wisteria/BDEC-01 (Odyssey), Country=Japan, Speed=2200, Rmax=22121.0
Processing row: Name=TOKI-SORA, Country=Japan, Speed=2200, Rmax=16592.0
Processing row: Name=Oakforest-PACS, Country=Japan, Speed=1400, Rmax=13554.6
Processing row: Name=Eart

### TODO Recording:

- Add 1 file to the input/ folder
- Show the result
- Add another file to the input folder
- Show the result

In [6]:
query.stop()

### TODO Recording:

- Clean out the input/ folder

We create a custom class `PrintRowWriter` to process each row of the streaming data. This class implements three methods:

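- **open(partitionId, epochId)**: Called once per partition per epoch, before any rows are processed. Returning `True` tells Spark to go ahead and process the rows of that partition.
- **process(row)**: Called for each row. Here, we print the row details.
- **close(error)**: Called when processing for the partition ends. It receives the error raised during processing, or `None` if the partition completed without one.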

This custom writer is used in the `foreach` sink.
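
The call order described above can be tried out without a Spark session. The following is a minimal sketch that imitates the lifecycle for a single partition in plain Python; it is not Spark's own code. `FakeRow`, `RecordingWriter`, and `run_partition` are hypothetical names introduced only for this illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Row, with only the fields used here.
FakeRow = namedtuple("FakeRow", ["Name", "Country", "Processor_Speed", "Rmax"])


class RecordingWriter:
    """Toy writer that records lifecycle events instead of printing them."""

    def __init__(self):
        self.events = []

    def open(self, partition_id, epoch_id):
        self.events.append(("open", partition_id, epoch_id))
        return True  # process the rows of this partition

    def process(self, row):
        self.events.append(("process", row.Name))

    def close(self, error):
        self.events.append(("close", error))


def run_partition(writer, partition_id, epoch_id, rows):
    """Imitate the open -> process -> close order for one partition (assumed)."""
    error = None
    should_process = writer.open(partition_id, epoch_id)
    try:
        if should_process:
            for row in rows:
                writer.process(row)
    except Exception as exc:
        error = exc
    finally:
        # close() sees the error, if any, so the writer can clean up.
        writer.close(error)
    if error is not None:
        raise error


rows = [
    FakeRow("Supercomputer Fugaku", "Japan", 2200, 442010.0),
    FakeRow("ABCI 2.0", "Japan", 2400, 22208.72),
]
writer = RecordingWriter()
run_partition(writer, partition_id=0, epoch_id=0, rows=rows)
for event in writer.events:
    print(event)
```

A harness like this is handy for checking a writer's `process()` logic against a few hand-made rows before attaching it to a streaming query.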

### What are partitionId and epochId?

When using a custom sink (like `PrintRowWriter`), Spark Structured Streaming calls the `open()` method for each partition in every micro-batch.

- **partitionId**
  - Identifies the partition within the current micro-batch.
  - If a micro-batch has multiple partitions, `open()` is called separately for each one.
  - The number of partitions depends on how the input files are split and on the parallelism settings.
- **epochId**
  - Identifies the micro-batch in Spark Structured Streaming.
  - If Spark retries a micro-batch, it reuses the same `epochId`, so a writer can tell that a batch is being delivered again.
  - The `foreach` sink on its own provides at-least-once delivery. Spark's documentation cautions that the contents of a given `(partitionId, epochId)` pair are not guaranteed to be identical across retries, and points to `foreachBatch` when output must be deduplicated.
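
To make the numbering concrete, here is a small plain-Python sketch of how the two ids could look across three micro-batches. It does not use Spark; `IdLogger` and `micro_batches` are hypothetical, and the assumption that two files arriving together end up in two partitions is only illustrative, since the real partition count depends on how Spark splits the input.

```python
class IdLogger:
    """Toy writer that only records the ids passed to open()."""

    def __init__(self):
        self.seen = []

    def open(self, partition_id, epoch_id):
        self.seen.append((epoch_id, partition_id))
        return True

    def process(self, row):
        pass  # rows are ignored in this illustration

    def close(self, error):
        pass


# Hypothetical micro-batches: each inner list holds one partition's rows.
micro_batches = [
    [["file1-row"]],                 # epoch 0: one file, one partition
    [["file2-row"]],                 # epoch 1: one file, one partition
    [["file3-row"], ["file4-row"]],  # epoch 2: two files, assumed two partitions
]

logger = IdLogger()
for epoch_id, partitions in enumerate(micro_batches):
    for partition_id, rows in enumerate(partitions):
        if logger.open(partition_id, epoch_id):
            for row in rows:
                logger.process(row)
        logger.close(None)

for epoch_id, partition_id in logger.seen:
    print(f"epoch {epoch_id}, partition {partition_id}")
```

Partition ids restart at 0 in every micro-batch, while the epoch id keeps counting up.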

In [9]:
streaming_df = spark \
    .readStream \
    .format("csv") \
    .schema(schema) \
    .load("input/")

transformed_df = streaming_df \
    .select("Rank", "Name", "Country", "Processor_Speed", "Rmax") \
    .filter("Country = 'Japan'")

In [10]:
class PrintRowWriter:
    def open(self, partitionId, epochId):
        print(f"Opened partition {partitionId} for epoch {epochId}")
        return True  # Return True to indicate that processing should continue

    def process(self, row):
        try:
            if row is None:
                print("Skipping null row")
                return

            name = row.Name if row.Name is not None else "Unknown"
            country = row.Country \
                if row.Country is not None else "Unknown"
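            speed = row.Processor_Speed
            rmax = row.Rmax

            # Compute the score only when both inputs are present; multiplying
            # an "Unknown" placeholder string would raise a TypeError.
            if speed is not None and rmax is not None:
                performance_score = (speed * rmax) / 10**6
            else:
                performance_score = "Unknown"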

            print(f"Processing row: Name={name}, Country={country}, "
                  f"Performance Score={performance_score}")

        except Exception as e:
            print(f"❌ Error processing row: {e}")

    def close(self, error):
        if error:
            print(f"❌ Encountered error in partition: {error}")
        else:
            print("✔ Partition processing completed successfully.")

In [11]:
query = transformed_df.writeStream \
            .foreach(PrintRowWriter()) \
            .outputMode("append") \
            .start()

25/02/12 20:24:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/f4/wmvdntf96vjfrq3sb2p4ncy00000gn/T/temporary-fa0694cc-a7db-4b41-a8a5-c9c2455644ca. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/02/12 20:24:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Opened partition 0 for epoch 0
Processing row: Name=Supercomputer Fugaku, Country=Japan, Performance Score=972.422
Processing row: Name=ABCI 2.0, Country=Japan, Performance Score=53.300928
Processing row: Name=Wisteria/BDEC-01 (Odyssey), Country=Japan, Performance Score=48.6662
Processing row: Name=TOKI-SORA, Country=Japan, Performance Score=36.5024
Processing row: Name=Oakforest-PACS, Country=Japan, Performance Sco

### TODO Recording:

- After running the previous cell, place 1 file in the input/ folder
- Show the output printed to screen (will be partition 0 and epoch 0)
- Add another file (will be partition 0 and epoch 1)
- Add 2 files in one go (will be partition 0/1 and epoch 2)

In [12]:
query.stop()