In [1]:
!apt-get install openjdk-11-jdk -y
!wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz

!pip install -q findspark pyspark==3.5.1

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk-headless openjdk-11-jre openjdk-11-jre-headless
  x11-utils
Suggested packages:
  default-jre pcscd libxt-doc openjdk-11-demo openjdk-11-source visualvm
  libnss-mdns fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  | fonts-wqy-zenhei fonts-indic mesa-utils
The following NEW packages will be installed:
  ca-certificates-java fonts-dejavu-core fonts-dejavu-extra java-common
  libatk-wrapper-java libatk-wrapper-java-jni libpcsclite1 libxt-dev libxtst6
  libxxf86dga1 openjdk-11-jdk openjdk-11-jdk-headless openjdk-11-jre
  openjdk-11-jre-headless x11-utils
0 upgraded, 15 newly installed, 0 to remove and 41 not upgraded.
Need to get 122 MB of archives.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Create Spark session
spark = SparkSession.builder.appName("Real Time Stock Data Pipeline").getOrCreate()

# Define stock data schema
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("stock_symbol", StringType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", IntegerType(), True)
])

In [3]:
import threading, time, random, csv, os
from datetime import datetime

# This global flag will control the loop
keep_running = True

def generate_stream():
    global keep_running
    stocks = ["AAPL", "MSFT", "GOOG", "AMZN"]
    os.makedirs("/content/streaming_data", exist_ok=True)
    i = 0
    while keep_running:  # the loop runs while this is True
        filename = f"/content/streaming_data/data_{i}.csv"
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["timestamp", "stock_symbol", "open", "high", "low", "close", "volume"])
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for s in stocks:
                base = random.uniform(100, 300)
                writer.writerow([
                    now, s,
                    round(base, 2),
                    round(base + random.uniform(0, 5), 2),
                    round(base - random.uniform(0, 5), 2),
                    round(base + random.uniform(-2, 2), 2),
                    random.randint(1000, 5000)
                ])
        i += 1
        print(f"Wrote file {filename}")
        time.sleep(3)

print("Generator function ready.")

Generator function ready.


# **Start the Thread To Generate Streaming Stock Data**

This begins generating data files in the background every 3 seconds.

In [5]:
keep_running = True
thread = threading.Thread(target=generate_stream, daemon=True)
thread.start()
print("Streaming started...")

Streaming started...
Wrote file /content/streaming_data/data_0.csv


# **Stop/Restart the Thread**

if we want to stop it the thread:

In [30]:
keep_running = False
print("Stopping stream...")

Stopping stream...


if we want to start again, just re-run:

In [16]:
keep_running = True
thread = threading.Thread(target=generate_stream, daemon=True)
thread.start()
print("Streaming restarted...")

Streaming restarted...
Wrote file /content/streaming_data/data_0.csv


# **Reading Streaming Data**

In [6]:
df_stream = spark.readStream.format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/content/streaming_data")

# Define Watermark to handle late data (delay tolerance)
df_wk = df_stream.withWatermark("timestamp", "10 minutes")

Wrote file /content/streaming_data/data_4.csv


In [7]:
print("isStreaming:", df_stream.isStreaming)

isStreaming: True


In [8]:
df_stream.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- stock_symbol: string (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



# **Processing data in micro-batches**

In [None]:
from pyspark.sql.functions import col, to_timestamp, window, avg, abs, when
from pyspark.sql import SparkSession

# Drop rows with null essential values
df_clean = df_wk.na.drop(subset=["open", "high", "low", "close", "volume"])

# Ensure timestamp is proper
df_clean = df_clean.withColumn("timestamp", to_timestamp(col("timestamp")))

# Compute price change
df_clean = df_clean.withColumn("price_change", col("close") - col("open"))

# Detect anomalies (flag them)
df_clean = df_clean.withColumn("anomaly_flag", when(abs(col("price_change")) > 5, 1).otherwise(0))

# Aggregate 5-min averages
df_avg = df_clean.groupBy(
    window(col("timestamp"), "5 minutes"),
    col("stock_symbol")
).agg(
    avg("close").alias("avg_close"),
    avg("volume").alias("avg_volume"),
    avg("anomaly_flag").alias("avg_anomaly_flag")
)

# Static reference data (company names)
company_data = [
    ("AAPL", "Apple"),
    ("GOOG", "Google"),
    ("TSLA", "Tesla"),
    ("AMZN", "Amazon")
]
company_ref = spark.createDataFrame(company_data, ["stock_symbol", "company_name"])

# Join streaming aggregate with static reference
df_joined = df_avg.join(company_ref, on="stock_symbol", how="left")

# Write joined results to CSV files in micro-batches
query = df_joined.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("header", True) \
    .option("checkpointLocation", "/content/checkpoints/joined_stream/") \
    .option("path", "/content/processed_output/") \
    .start()

# Wait for streaming to run
query.awaitTermination()