###PART A — File-based streaming

Goal: simulate streaming from your Person.json by splitting it into smaller files and releasing them one by one into an input folder. We then print the incoming records to the console and write the stream to Delta.

####A0 — Create helper file chunks 

- This cell reads your uploaded /FileStore/tables/Person.json and writes each JSON object into its own file in a staging folder.

- You will release files from staging into the live input folder later (one at a time), which simulates new incoming files.

In [0]:
import json, os, glob

# Paths (driver/local view)
uploaded = "/dbfs/FileStore/tables/Person.json"    # your uploaded file
parts_dir = "/dbfs/FileStore/streaming_parts/"     # staging area (all parts go here)
input_dir = "/dbfs/FileStore/streaming_input/person/"  # where the stream will read
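
The paths above use the driver-local `/dbfs/...` view, while the Spark reader in A2 uses the same folder without that prefix. A minimal sketch of the mapping between the two views follows; `to_spark_path` and `to_local_path` are hypothetical helper names introduced here for illustration, not Databricks APIs.

```python
# Hypothetical helpers: translate between the two views of one DBFS location.
LOCAL_PREFIX = "/dbfs"

def to_spark_path(local_path: str) -> str:
    """'/dbfs/FileStore/x' -> '/FileStore/x' (the form Spark readers and writers expect)."""
    if local_path == LOCAL_PREFIX or local_path.startswith(LOCAL_PREFIX + "/"):
        return local_path[len(LOCAL_PREFIX):] or "/"
    return local_path  # not a /dbfs path: leave unchanged

def to_local_path(spark_path: str) -> str:
    """'/FileStore/x' or 'dbfs:/FileStore/x' -> '/dbfs/FileStore/x' (for open(), os, shutil)."""
    if spark_path.startswith("dbfs:"):
        spark_path = spark_path[len("dbfs:"):]
    return LOCAL_PREFIX + spark_path

print(to_spark_path("/dbfs/FileStore/streaming_input/person/"))
print(to_local_path("/FileStore/streaming_input/person/"))
```

Use the local form with `open`, `os`, `glob`, and `shutil`, and the Spark form with `spark.readStream` and `writeStream`.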

In [0]:
# Start clean: delete both folders and everything in them, then recreate them empty
import shutil
shutil.rmtree(parts_dir, ignore_errors=True)
shutil.rmtree(input_dir, ignore_errors=True)
os.makedirs(parts_dir, exist_ok=True)
os.makedirs(input_dir, exist_ok=True)

# Read uploaded file robustly: handle newline-delimited JSON or an array of objects
with open(uploaded, "r", encoding="utf-8") as f:
    raw = f.read().strip()
items = []
try:
    # try a json array
    parsed = json.loads(raw)
    if isinstance(parsed, list):
        items = parsed
    else:
        items = [parsed]
except json.JSONDecodeError:
    # fallback: the file is not a single JSON document, so parse it line by line (ndjson)
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            items.append(json.loads(line))
        except json.JSONDecodeError:
            # skip badly formed lines; they will not appear in any part file
            pass

# Write each record as a separate file in the parts_dir
for i, obj in enumerate(items, start=1):
    fname = os.path.join(parts_dir, f"person_part_{i:03d}.json")
    with open(fname, "w", encoding="utf-8") as f:
        json.dump(obj, f)
print(f"Wrote {len(items)} part files to {parts_dir}")
print("To simulate streaming, we will move these files one-by-one into the input folder during the exercise.")

Wrote 5 part files to /dbfs/FileStore/streaming_parts/
To simulate streaming, we will move these files one-by-one into the input folder during the exercise.
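
Spark's JSON source expects one JSON object per line by default, so each part file should hold a single compact line. The sketch below is an optional sanity check for the staging folder. `check_parts` is a hypothetical helper, and the demo records are made up so the block runs on a throwaway directory.

```python
import glob
import json
import os
import tempfile

def check_parts(parts_dir):
    """Return (file name, problem) pairs for part files a line-based JSON reader could misread."""
    problems = []
    for path in sorted(glob.glob(os.path.join(parts_dir, "person_part_*.json"))):
        with open(path, "r", encoding="utf-8") as f:
            text = f.read().strip()
        if "\n" in text:
            problems.append((os.path.basename(path), "spans multiple lines"))
            continue
        try:
            json.loads(text)
        except json.JSONDecodeError as exc:
            problems.append((os.path.basename(path), f"invalid JSON: {exc.msg}"))
    return problems

# Demo with made-up records in a temporary directory
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "person_part_001.json"), "w", encoding="utf-8") as f:
    json.dump({"id": 1, "firstname": "Ann"}, f)            # compact, one line
with open(os.path.join(demo_dir, "person_part_002.json"), "w", encoding="utf-8") as f:
    json.dump({"id": 2, "firstname": "Bob"}, f, indent=2)  # pretty-printed, multi-line

problems = check_parts(demo_dir)
print(problems)
```

On Databricks you would call `check_parts(parts_dir)` instead. An empty list means every staged file is a single-line JSON document.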


####A1 — Function to release the next file (manual simulation)
- Each time you run this cell it will move one part file from streaming_parts into the streaming_input/person folder. Run it while the stream is running to simulate incoming data.

In [0]:
# Python cell: run this each time you want to push the next file into the stream
import os, glob, shutil
parts_dir = "/dbfs/FileStore/streaming_parts/"
input_dir = "/dbfs/FileStore/streaming_input/person/"

parts = sorted(glob.glob(parts_dir + "person_part_*.json"))
if not parts:
    print("No more part files to release.")
else:
    src = parts[0]
    dest = os.path.join(input_dir, os.path.basename(src))
    shutil.move(src, dest)
    print("Released:", os.path.basename(src), "→", dest)

No more part files to release.
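
The release step can also be wrapped in a function, so it can be called from a loop instead of re-running the cell by hand. This is a minimal sketch with the same logic as the cell above. `release_next` is a hypothetical name, and the demo uses temporary folders with placeholder files.

```python
import glob
import os
import shutil
import tempfile

def release_next(parts_dir, input_dir):
    """Move the lowest-numbered part file into input_dir; return its new path, or None if none remain."""
    parts = sorted(glob.glob(os.path.join(parts_dir, "person_part_*.json")))
    if not parts:
        return None
    dest = os.path.join(input_dir, os.path.basename(parts[0]))
    shutil.move(parts[0], dest)
    return dest

# Demo: stage three placeholder files, then release them in order
staging = tempfile.mkdtemp()
live = tempfile.mkdtemp()
for i in range(1, 4):
    with open(os.path.join(staging, f"person_part_{i:03d}.json"), "w", encoding="utf-8") as f:
        f.write("{}")

released = []
while True:
    path = release_next(staging, live)
    if path is None:
        break
    released.append(os.path.basename(path))
print(released)
```

In the notebook you could call `release_next(parts_dir, input_dir)` with a `time.sleep(...)` between calls to drip files into the running stream.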


####A2 — Create the streaming DataFrame (file stream)
This creates a streaming DataFrame that watches /FileStore/streaming_input/person/ and expects the Person schema.

In [0]:
from pyspark.sql.types import StructType, IntegerType, StringType

person_schema = (StructType()
    .add("id", IntegerType())
    .add("firstname", StringType())
    .add("middlename", StringType())
    .add("lastname", StringType())
    .add("dob_year", IntegerType())
    .add("dob_month", IntegerType())
    .add("gender", StringType())
    .add("salary", IntegerType())
)

input_path = "/FileStore/streaming_input/person/"   # Spark will read this path
person_stream = (spark.readStream
                 .schema(person_schema)   # required for file streams
                 .json(input_path)
                )

person_stream.printSchema()

root
 |-- id: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob_year: integer (nullable = true)
 |-- dob_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



####A3 — Start a console writer to watch incoming records
Start this cell and keep it running while you release parts. Each run of the A1 "release next file" cell should make new records appear:

In [0]:
query_console = (person_stream.writeStream
                 .format("console")
                 .outputMode("append")
                 .option("truncate", False)
                 .start()
                )
print("Console stream started. Release files (run release cell) to see output.")

Console stream started. Release files (run release cell) to see output.
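
Depending on the environment, console sink output may land in the driver logs rather than under the cell, so it can help to inspect the query object directly. The sketch below reads the `name`, `isActive`, `status`, and `lastProgress` attributes of a PySpark `StreamingQuery`. `summarize` is a hypothetical helper, and the stand-in object with made-up values is only there so the block runs without a cluster.

```python
from types import SimpleNamespace

def summarize(query):
    """Collect a few health indicators from a StreamingQuery-like object."""
    progress = query.lastProgress  # None until the first micro-batch completes
    return {
        "name": query.name,
        "active": query.isActive,
        "status": query.status["message"],
        "last_batch_rows": None if progress is None else progress["numInputRows"],
    }

# Stand-in for a real query; all values below are made up for illustration
fake_query = SimpleNamespace(
    name=None,
    isActive=True,
    status={"message": "Waiting for data to arrive",
            "isDataAvailable": False, "isTriggerActive": False},
    lastProgress={"batchId": 0, "numInputRows": 1},
)
summary = summarize(fake_query)
print(summary)
```

In the notebook, `summarize(query_console)` after releasing a file should report a non-zero row count for the latest batch if the file was picked up.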


####A4 — Persist stream to Delta (safe pattern with checkpoint)
When you’re happy with testing, write the stream to Delta so you can query results.

In [0]:
# Python cell (paste & run)
# Spark interprets these as DBFS paths, like input_path in A2 (no /dbfs prefix)
target_path = "/tmp/ss_tutorial/target_delta/"    # where Delta files are stored
checkpoint = "/tmp/ss_tutorial/checkpoint_delta/"  # unique checkpoint for this query

# Remove old test folders if needed (be careful!)
# shutil runs on the driver, so it needs the /dbfs-prefixed view of the same folders
import shutil
shutil.rmtree("/dbfs" + target_path, ignore_errors=True)
shutil.rmtree("/dbfs" + checkpoint, ignore_errors=True)

delta_query = (person_stream.writeStream
               .format("delta")
               .option("path", target_path)
               .option("checkpointLocation", checkpoint)
               .outputMode("append")
               .start()
              )
print("Delta write started. New files will land at:", "/dbfs" + target_path)

Delta write started. New files will land at: /dbfs/tmp/ss_tutorial/target_delta/


####A5 — Stop streams (VERY IMPORTANT)
When you are done with the interactive console or writing streams, stop them:

In [0]:
for q in spark.streams.active:
    print("Stopping:", q.id, q.name)
    q.stop()
print("All streams stopped.")

Stopping: f98892ff-e57a-43df-8201-0e05ad8b47bd None
All streams stopped.
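
If one query fails to stop, the loop above would exit before reaching the others. A slightly more defensive sketch is shown below. `stop_all` is a hypothetical helper, and `FakeQuery` is a stand-in class used only so the block runs without a cluster.

```python
def stop_all(queries):
    """Stop every query; return (stopped ids, failures) so one error does not leave the rest running."""
    stopped, failed = [], []
    for q in list(queries):  # copy first: the active list can change as queries stop
        try:
            q.stop()
            stopped.append(str(q.id))
        except Exception as exc:  # broad on purpose: report the failure and keep going
            failed.append((str(q.id), repr(exc)))
    return stopped, failed

class FakeQuery:
    """Stand-in for a StreamingQuery with just the pieces stop_all touches."""
    def __init__(self, id, fail=False):
        self.id = id
        self.fail = fail
        self.stopped = False

    def stop(self):
        if self.fail:
            raise RuntimeError("simulated failure")
        self.stopped = True

demo = [FakeQuery("q1"), FakeQuery("q2", fail=True), FakeQuery("q3")]
stopped, failed = stop_all(demo)
print("Stopped:", stopped)
print("Failed:", failed)
```

In the notebook the call would be `stop_all(spark.streams.active)`. An empty failure list means nothing is left running.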


______