In [1]:
import csv
import random
from datetime import datetime, timedelta
import os
from google.colab import drive
import time

# Make Google Drive available to this Colab runtime under /content/gdrive
mount_point = '/content/gdrive'
drive.mount(mount_point)

Mounted at /content/gdrive


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=2025bc0b9be08afdbad605933fbaf56c9c7816aaf6b50b2945aa4d3cfee7d7fa
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql import functions as F
import time

# Length of the averaging window and of one loop cycle, in seconds (5 minutes).
WINDOW_SECONDS = 300

# Schema applied to every input CSV: a string timestamp plus open/close prices.
schema = StructType([
    StructField("Timestamp", StringType(), True),
    StructField("Stock_Open", FloatType(), True),
    StructField("Stock_Close", FloatType(), True)
])

# Create the Spark session, or reuse one that already exists in this kernel.
spark = SparkSession.builder \
    .appName("StockBatchProcessing") \
    .getOrCreate()

# Folder in Google Drive that holds the input CSV files
csv_path = "/content/gdrive/My Drive/stock_data"

# Folder in Google Drive that receives the CSV output with the average values
output_path = "/content/gdrive/My Drive/stock_average"

# Batch loop: once per WINDOW_SECONDS, average the rows whose timestamp falls
# in the trailing window and write the result. Runs until the cell is
# interrupted.
while True:
    start_time = time.time()
    current_time = int(start_time)

    # Lower bound of the trailing window. Subtracting the full window length
    # (instead of flooring to the current 5-minute clock bucket) means every
    # pass looks back a whole WINDOW_SECONDS no matter when the loop started;
    # flooring only covered the part of the bucket that had elapsed so far and
    # skipped the rows written between one pass and the next bucket boundary.
    five_minutes_ago = current_time - WINDOW_SECONDS

    # Re-read the whole input folder on every pass so new files are picked up.
    all_rows_df = spark.read.option("header", "true").schema(schema).csv(csv_path)

    # Keep only rows newer than the window start.
    # NOTE(review): unix_timestamp is called without a format, so it relies on
    # Spark's default pattern (yyyy-MM-dd HH:mm:ss) and the session time zone;
    # presumably the producer writes Timestamp that way - TODO confirm. Rows
    # whose timestamp evaluates to null do not pass the comparison.
    streaming_df = all_rows_df.filter(
        F.unix_timestamp("Timestamp") > five_minutes_ago
    )

    # Average open/close over the whole window (no grouping keys).
    average_stock_df = streaming_df.agg(
        F.avg("Stock_Open").alias("Average_Stock_Open"),
        F.avg("Stock_Close").alias("Average_Stock_Close")
    )

    # Replace the previous result; "overwrite" keeps only the latest window.
    average_stock_df.write.mode("overwrite").csv(output_path)

    # Sleep for whatever remains of the cycle after the work done above.
    elapsed_time = time.time() - start_time
    wait_time = max(0, WINDOW_SECONDS - elapsed_time)
    time.sleep(wait_time)
