## Velocity-Oriented Analytics (Deviation from True Streaming)

This notebook demonstrates velocity-oriented data processing with a controlled
micro-batch simulation rather than true real-time streaming. Apache Spark
Structured Streaming was considered first, but platform-specific limitations led to
a bounded micro-batch approach instead.

The dataset is processed in fixed-size batches, and the same aggregation is recomputed
for each batch as it "arrives". This preserves the core ideas of data velocity
(incremental computation, temporal ordering, and results that change as data arrives)
while keeping execution reproducible and stable.
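
The batching scheme described above can be sketched in plain Python, independent of Spark. The helper name `batch_windows` and the numbers in the usage line are illustrative assumptions, not part of the notebook.

```python
def batch_windows(total_rows, batch_size, max_batches):
    """Yield (batch_number, start, end) half-open offset windows.

    Stops when the data is exhausted or max_batches is reached,
    whichever comes first.
    """
    start = 0
    batch_number = 1
    while start < total_rows and batch_number <= max_batches:
        # Clamp the final window so it never runs past the data.
        end = min(start + batch_size, total_rows)
        yield batch_number, start, end
        start = end
        batch_number += 1


# Illustrative values only: 250 rows, batches of 100, at most 5 batches.
windows = list(batch_windows(total_rows=250, batch_size=100, max_batches=5))
print(windows)
```

Because each window is half-open (`start` inclusive, `end` exclusive), consecutive batches never overlap and no row is counted twice.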


In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ProstateCancerVelocitySimulation")
    .getOrCreate()
)

spark


In [2]:
import os

data_path = os.path.join("..", "data", "prostate_cancer_prediction.csv")

df = spark.read.csv(data_path, header=True, inferSchema=True)
df.printSchema()


root
 |-- Patient_ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Family_History: string (nullable = true)
 |-- Race_African_Ancestry: string (nullable = true)
 |-- PSA_Level: double (nullable = true)
 |-- DRE_Result: string (nullable = true)
 |-- Biopsy_Result: string (nullable = true)
 |-- Difficulty_Urinating: string (nullable = true)
 |-- Weak_Urine_Flow: string (nullable = true)
 |-- Blood_in_Urine: string (nullable = true)
 |-- Pelvic_Pain: string (nullable = true)
 |-- Back_Pain: string (nullable = true)
 |-- Erectile_Dysfunction: string (nullable = true)
 |-- Cancer_Stage: string (nullable = true)
 |-- Treatment_Recommended: string (nullable = true)
 |-- Survival_5_Years: string (nullable = true)
 |-- Exercise_Regularly: string (nullable = true)
 |-- Healthy_Diet: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoking_History: string (nullable = true)
 |-- Alcohol_Consumption: string (nullable = true)
 |-- Hypertension: string (nullable 

#### Add a Synthetic Time Dimension


In [3]:
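from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# monotonically_increasing_id() is unique and increasing but not consecutive
# across partitions, so derive a zero-based consecutive index from it.
# The unpartitioned window pulls all rows into one partition, which is
# acceptable for a dataset of this size.
arrival_order = Window.orderBy(monotonically_increasing_id())

df_with_time = df.withColumn(
    "arrival_id",
    row_number().over(arrival_order) - 1
)

df_with_time.show(5)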


+----------+---+--------------+---------------------+---------+----------+-------------+--------------------+---------------+--------------+-----------+---------+--------------------+------------+---------------------+----------------+------------------+------------+----+---------------+-------------------+------------+--------+-----------------+-------------+------------------+---------------+--------------------+-----------------------+---------------+----------+
|Patient_ID|Age|Family_History|Race_African_Ancestry|PSA_Level|DRE_Result|Biopsy_Result|Difficulty_Urinating|Weak_Urine_Flow|Blood_in_Urine|Pelvic_Pain|Back_Pain|Erectile_Dysfunction|Cancer_Stage|Treatment_Recommended|Survival_5_Years|Exercise_Regularly|Healthy_Diet| BMI|Smoking_History|Alcohol_Consumption|Hypertension|Diabetes|Cholesterol_Level|Screening_Age|Follow_Up_Required|Prostate_Volume|Genetic_Risk_Factors|Previous_Cancer_History|Early_Detection|arrival_id|
+----------+---+--------------+---------------------+-------
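
A caveat on `monotonically_increasing_id()`: Spark documents the generated ids as unique and monotonically increasing, but not consecutive. In the current implementation, the partition index sits in the upper bits and the row position within the partition in the lower 33 bits. The plain-Python sketch below imitates that layout (the `simulated_ids` helper and the partition sizes are hypothetical) to show why a range filter on raw ids can silently skip rows once the data spans more than one partition.

```python
def simulated_ids(rows_per_partition):
    """Imitate the documented id layout: partition index in the upper bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_index, n_rows in enumerate(rows_per_partition):
        base = partition_index << 33
        ids.extend(base + position for position in range(n_rows))
    return ids


# Hypothetical layout: 3 rows in partition 0, 2 rows in partition 1.
ids = simulated_ids([3, 2])
print(ids)

# A range filter such as 0 <= id < 100 only sees the first partition's rows.
in_first_batch = [i for i in ids if 0 <= i < 100]
print(len(in_first_batch), "of", len(ids), "rows fall in the first window")
```

With a single partition the ids happen to run 0, 1, 2, and so on, which is why range filters over a small file can appear to work.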

#### Define Micro-Batch Parameters

In [4]:
BATCH_SIZE = 100
MAX_BATCHES = 5   # limits runtime intentionally

TOTAL_ROWS = df_with_time.count()

print(f"Total records: {TOTAL_ROWS}")
print(f"Micro-batch size: {BATCH_SIZE}")
print(f"Maximum batches to process: {MAX_BATCHES}")



Total records: 27945
Micro-batch size: 100
Maximum batches to process: 5


#### Micro-Batch Processing Loop

In [5]:
from pyspark.sql.functions import avg
import time

current_offset = 0
batch_number = 1

while current_offset < TOTAL_ROWS and batch_number <= MAX_BATCHES:
    print(f"\n=== Processing Micro-Batch {batch_number} ===")
    print(f"Time: {time.strftime('%H:%M:%S')}")
    
    micro_batch = (
        df_with_time
        .filter(
            (df_with_time.arrival_id >= current_offset) &
            (df_with_time.arrival_id < current_offset + BATCH_SIZE)
        )
    )
    
    result = micro_batch.groupBy("Cancer_Stage").agg(
        avg("PSA_Level").alias("avg_psa")
    )
    
    result.show()
    
    current_offset += BATCH_SIZE
    batch_number += 1
    
    # Simulate delay between arrivals
    time.sleep(2)

print("\nVelocity simulation completed successfully.")



=== Processing Micro-Batch 1 ===
Time: 17:46:59
+------------+-----------------+
|Cancer_Stage|          avg_psa|
+------------+-----------------+
|   Localized| 8.15164383561644|
|    Advanced|6.461052631578947|
|  Metastatic|         10.31625|
+------------+-----------------+


=== Processing Micro-Batch 2 ===
Time: 17:47:01
+------------+-----------------+
|Cancer_Stage|          avg_psa|
+------------+-----------------+
|   Localized|7.918311688311684|
|    Advanced|6.845000000000001|
|  Metastatic|7.788571428571428|
+------------+-----------------+


=== Processing Micro-Batch 3 ===
Time: 17:47:03
+------------+-----------------+
|Cancer_Stage|          avg_psa|
+------------+-----------------+
|   Localized| 8.91051724137931|
|    Advanced|6.239142857142856|
|  Metastatic|7.195714285714287|
+------------+-----------------+


=== Processing Micro-Batch 4 ===
Time: 17:47:05
+------------+------------------+
|Cancer_Stage|           avg_psa|
+------------+------------------+
|   Lo

## Summary

The velocity simulation shows how analytical results change as new data arrives
incrementally. Each micro-batch is aggregated on its own, so the average PSA level per
cancer stage shifts from batch to batch: the Metastatic average, for example, falls
from 10.32 in batch 1 to 7.79 in batch 2. The notebook thus illustrates the effect of
data velocity on analytics without requiring continuous streaming infrastructure.
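
To carry results forward across batches instead of recomputing them per batch, one option is to keep a running sum and count per group and derive the mean on demand. The sketch below is plain Python under that assumption; the `RunningGroupMean` class is hypothetical and the values are made up for illustration, not taken from the dataset.

```python
from collections import defaultdict


class RunningGroupMean:
    """Maintain per-group means incrementally from (group, value) pairs."""

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def update(self, batch):
        # Fold one micro-batch into the running totals.
        for group, value in batch:
            self._sum[group] += value
            self._count[group] += 1

    def means(self):
        # Mean = running sum / running count, computed only when requested.
        return {g: self._sum[g] / self._count[g] for g in self._sum}


tracker = RunningGroupMean()
tracker.update([("Localized", 8.0), ("Advanced", 6.0)])  # illustrative batch 1
tracker.update([("Localized", 10.0)])                    # illustrative batch 2
running = tracker.means()
print(running)
```

Storing sums and counts rather than means keeps the update exact: averaging per-batch averages would weight small and large batches equally.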

This approach complements the batch analytics by showing how population-level metrics
change as records arrive over time.
