In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import GBTRegressionModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.types import *
# ✅ 1. Initialize Spark session
# The builder chain is wrapped in parentheses instead of using backslash
# continuations: a backslash followed by a trailing comment is a SyntaxError
# ("unexpected character after line continuation character").
spark = (
    SparkSession.builder
    .appName("OptimizedPrediction")
    .master("local[8]")
    .config("spark.sql.shuffle.partitions", "8")  # Match number of cores
    .config("spark.default.parallelism", "8")     # Controls RDD operations
    .getOrCreate()
)


# 2. Load the saved GBT regression model from disk
model_path = "/content/drive/MyDrive/School/Scalable/Project/best_model_GBT"
best_model = GBTRegressionModel.load(model_path)

In [None]:
# Columns handed to the VectorAssembler, in this order.
features = [
    # --- numerical columns ---
    "QUANTITYORDERED",
    "PRICEEACH",
    "ORDERLINENUMBER",
    "SALES",
    "QTR_ID",
    "MONTH_ID",
    "YEAR_ID",
    "MSRP",
    # --- categorical columns (StringIndexed) ---
    "STATUS_index",
    "PRODUCTLINE_index",
    "DEALSIZE_index",
]


In [None]:
# Write three sample rows to the exact path the loading cell reads
# ("/content/test_sales.csv"). A bare relative file name only ends up there
# when the working directory happens to be /content.
with open("/content/test_sales.csv", "w") as f:
    f.write("""QUANTITYORDERED,PRICEEACH,ORDERLINENUMBER,SALES,QTR_ID,MONTH_ID,YEAR_ID,MSRP,STATUS_index,PRODUCTLINE_index,DEALSIZE_index
30,95.70,2,2871.00,1,2,2003,95,0.0,1.0,2.0
49,100.00,3,4900.00,2,3,2004,100,1.0,0.0,1.0
21,97.50,1,2047.50,4,7,2005,97,2.0,3.0,0.0""")


In [None]:
# 3. Read the sample CSV (first line is the header, column types are inferred)
df = spark.read.csv("/content/test_sales.csv", header=True, inferSchema=True)

# 4. Pack the model input columns into a single "features" vector column
assembler = VectorAssembler(outputCol="features", inputCols=features)
df_assembled = assembler.transform(df)

# 5. Score every row with the loaded model
predictions = best_model.transform(df_assembled)

# 6. Display the result without the assembled 'features' vector column
predictions.select(
    [name for name in predictions.columns if name != 'features']
).show(truncate=False)


+---------------+---------+---------------+------+------+--------+-------+----+------------+-----------------+--------------+------------------+
|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|SALES |QTR_ID|MONTH_ID|YEAR_ID|MSRP|STATUS_index|PRODUCTLINE_index|DEALSIZE_index|prediction        |
+---------------+---------+---------------+------+------+--------+-------+----+------------+-----------------+--------------+------------------+
|30             |95.7     |2              |2871.0|1     |2       |2003   |95  |0.0         |1.0              |2.0           |244.19891227521222|
|49             |100.0    |3              |4900.0|2     |3       |2004   |100 |1.0         |0.0              |1.0           |244.07206673509808|
|21             |97.5     |1              |2047.5|4     |7       |2005   |97  |2.0         |3.0              |0.0           |247.13550520093258|
+---------------+---------+---------------+------+------+--------+-------+----+------------+-----------------+--------------+-----

In [None]:
import os

# Directory that is polled for incoming CSV files; created on first use
stream_dir = '/content/Streaming_input'
os.makedirs(stream_dir, exist_ok=True)

# 🧹 Remove every regular file directly inside it (sub-directories are left alone)
for entry_name in os.listdir(stream_dir):
    entry_path = os.path.join(stream_dir, entry_name)
    if not os.path.isfile(entry_path):
        continue
    os.remove(entry_path)

print("✅ All files in 'Streaming_input' have been deleted.")


✅ All files in 'Streaming_input' have been deleted.


In [None]:
import os
import time
import random
import threading

def generate_csv_files(output_dir="/content/Streaming_input", count=0, num_rows=2, columns=None):
    """Write one CSV file of random integer rows named ``data_<count>.csv``.

    Args:
        output_dir: Directory that receives the file; created if missing.
        count: Number embedded in the file name.
        num_rows: Number of data rows written after the header. Defaults to 2,
            which is what the previously hard-coded loop produced.
        columns: Column names for the header; every row gets one value per
            column. Defaults to the notebook-level ``features`` list.

    Returns:
        None. The only effect is the file written to ``output_dir``.
    """
    if columns is None:
        # Looked up at call time so the notebook's current list is used.
        columns = features

    os.makedirs(output_dir, exist_ok=True)
    filename = os.path.join(output_dir, f"data_{count}.csv")
    with open(filename, "w") as f:
        # Header line
        f.write(",".join(columns) + "\n")

        # num_rows lines of random integers in [0, 10], one value per column
        for _ in range(num_rows):
            values = [str(random.randint(0, 10)) for _ in columns]
            f.write(",".join(values) + "\n")




In [None]:
# Write the first file, data_0.csv, into the default stream directory
generate_csv_files(count=0)

In [None]:
def thread_generatefile(interval_Time = 15,num_generate = 20):
  """Write ``num_generate`` CSV files numbered from 1, sleeping ``interval_Time`` seconds after each one."""
  file_number = 1
  while file_number <= num_generate:
    generate_csv_files(count = file_number)
    time.sleep(interval_Time)
    file_number += 1


In [None]:
# Five files (data_1 .. data_5), with a 5 second pause after each
thread_generatefile(interval_Time=5, num_generate=5)

In [None]:
# 3. Read the first generated file (first line is the header, column types are inferred)
df = spark.read.csv("/content/Streaming_input/data_0.csv", header=True, inferSchema=True)

# 4. Pack the model input columns into a single "features" vector column
assembler = VectorAssembler(outputCol="features", inputCols=features)
df_assembled = assembler.transform(df)

# 5. Score every row with the loaded model
predictions = best_model.transform(df_assembled)

# 6. Display the result without the assembled 'features' vector column
predictions.select(
    [name for name in predictions.columns if name != 'features']
).show(truncate=False)


+---------------+---------+---------------+-----+------+--------+-------+----+------------+-----------------+--------------+------------------+
|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|SALES|QTR_ID|MONTH_ID|YEAR_ID|MSRP|STATUS_index|PRODUCTLINE_index|DEALSIZE_index|prediction        |
+---------------+---------+---------------+-----+------+--------+-------+----+------------+-----------------+--------------+------------------+
|6              |3        |5              |6    |3     |10      |10     |4   |0           |9                |5             |245.71859141330708|
|10             |5        |8              |9    |5     |8       |0      |1   |0           |6                |4             |4.8551450623246994|
+---------------+---------+---------------+-----+------+--------+-------+----+------------+-----------------+--------------+------------------+



In [None]:
from pyspark.sql.functions import monotonically_increasing_id, row_number, lit, concat_ws
from pyspark.sql.window import Window
import os

def model_working(path):
    """Score one CSV file and return its predictions with a per-row key.

    Args:
        path: Location of a CSV file whose header row contains the columns
            listed in ``features``.

    Returns:
        A DataFrame with two columns: ``key`` (file stem, the word "row" and
        the row number joined by "_", e.g. "data_1_row_1") and ``prediction``.
    """
    # File name without directory or extension, e.g. "data_1"
    stem, _extension = os.path.splitext(os.path.basename(path))

    # Read -> assemble the feature vector -> score
    raw_rows = spark.read.csv(path, header=True, inferSchema=True)
    vectorised = VectorAssembler(inputCols=features, outputCol="features").transform(raw_rows)
    scored = best_model.transform(vectorised)

    # Number the rows of this file; the window is ordered by a generated increasing id
    numbered = scored.withColumn(
        "row_num",
        row_number().over(Window.orderBy(monotonically_increasing_id())),
    )

    # Build the key from the stem, the literal "row" and the row number
    keyed = numbered.withColumn(
        "key",
        concat_ws("_", lit(stem), lit("row"), numbered.row_num),
    )

    return keyed.select("key", "prediction")


In [None]:
# Smoke test: score a single generated file and print its keyed predictions
model_working("/content/Streaming_input/data_1.csv").show()

+------------+------------------+
|         key|        prediction|
+------------+------------------+
|data_1_row_1|219.12353380797333|
|data_1_row_2|244.15234632559674|
+------------+------------------+



In [None]:
def dirproccessing(files,dirpath = '/content/Streaming_input'):
  """Run ``model_working`` on each name in ``files`` (located under ``dirpath``) and return the results as a list, in input order."""
  return [model_working(f'{dirpath}/{name}') for name in files]

In [None]:
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.window import Window

def stack_and_index_predictions(predictions_list, include_sample=False):
    """Stack per-file prediction DataFrames into a single DataFrame.

    Args:
        predictions_list: Non-empty list of DataFrames, each with ``key`` and
            ``prediction`` columns. They are combined with ``union``, so the
            column order is assumed to be the same in every DataFrame.
        include_sample: When True, the result also carries a ``sample``
            column holding a row number over the combined rows. The default
            (False) returns only ``key`` and ``prediction``, which is what
            this function has always returned.

    Returns:
        The combined DataFrame with columns ``key``, ``prediction``
        (preceded by ``sample`` when ``include_sample`` is True).

    Raises:
        ValueError: If ``predictions_list`` is empty.
    """
    if not predictions_list:
        raise ValueError("predictions_list must contain at least one DataFrame")

    # Step 1: Combine all DataFrames into one using union
    combined_df = predictions_list[0]
    for df in predictions_list[1:]:
        combined_df = combined_df.union(df)

    if not include_sample:
        # The row-number column used to be computed here and then dropped by
        # the final select; skipping it avoids building a window that never
        # reaches the caller.
        return combined_df.select("key", "prediction")

    # Step 2 (opt-in): add the sample index
    window_spec = Window.orderBy(monotonically_increasing_id())
    combined_df = combined_df.withColumn("sample", row_number().over(window_spec))
    return combined_df.select("sample", "key", "prediction")

In [None]:
def model_fullywork(path,files):
  """Score every file in ``files`` under ``path``, print the stacked predictions and return them."""
  per_file_predictions = dirproccessing(files, path)
  stacked = stack_and_index_predictions(per_file_predictions)
  stacked.show()
  return stacked

In [None]:
import os

def sort_files_by(files, base_path="", key_func=None, reverse=True):
    """
    Sort a list of files using a custom criterion.

    Args:
        files (list): List of filenames (not full paths).
        base_path (str): Base path to prefix filenames, if needed.
        key_func (callable): Function that takes a full file path and returns a sort key.
            Defaults to the file's modification time.
        reverse (bool): Sort descending (True) or ascending (False).

    Returns:
        List of filenames sorted by the key.
    """
    if key_func is None:
        # Default: sort by modification time. The key receives the already
        # joined full path, so it must not prefix base_path again: doing so
        # resolved a relative base_path to "base/base/file" and raised
        # FileNotFoundError.
        key_func = os.path.getmtime

    return sorted(files, key=lambda f: key_func(os.path.join(base_path, f)), reverse=reverse)


In [None]:
def stream_dir_work(path,interval = 5):
    """Poll ``path`` and run the model on files that have not been seen yet.

    Each pass lists the directory, keeps the regular files whose names were
    not processed in an earlier pass, orders them with ``sort_files_by`` and
    hands them to ``model_fullywork``. The loop runs until it is interrupted
    with Ctrl-C / a kernel interrupt.

    Args:
        path: Directory to watch.
        interval: Seconds to sleep between passes.

    Returns:
        None.
    """
    # A set keeps the "already processed?" check O(1) per file; with a list
    # every pass cost (files in directory) x (files already processed).
    processed_files = set()
    while True:
        try:
            work_files = [
                name for name in os.listdir(path)
                if name not in processed_files
                # Only regular files are inputs; sub-directories are skipped.
                and os.path.isfile(os.path.join(path, name))
            ]
            if not work_files:
                print('No new files')
            else:
                work_files = sort_files_by(work_files,path)
                print(f'Detect new {len(work_files)} files')
                model_fullywork(path,work_files)
                processed_files.update(work_files)
            # Sleep stays inside the try so an interrupt during the wait is
            # reported the same way as one during processing.
            time.sleep(interval)
        except KeyboardInterrupt:
            print("\n🛑 Stream processing stopped by user.")
            break

In [None]:
# Poll the stream directory every 5 seconds until interrupted
stream_dir_work(stream_dir, interval=5)

Detect new 6 files
+------------+------------------+
|         key|        prediction|
+------------+------------------+
|data_5_row_1|246.86676312211532|
|data_5_row_2|141.21289400660652|
|data_4_row_1|240.56296836089862|
|data_4_row_2|245.25800142415534|
|data_3_row_1|146.42802432916142|
|data_3_row_2| 247.1233991822614|
|data_2_row_1| 244.0915536023666|
|data_2_row_2| 237.4830727078165|
|data_1_row_1|219.12353380797333|
|data_1_row_2|244.15234632559674|
|data_0_row_1|245.71859141330708|
|data_0_row_2|4.8551450623246994|
+------------+------------------+

No new files
No new files

🛑 Stream processing stopped by user.


In [None]:
import os

def clear_directory(dir_path):
    """Delete every regular file directly inside ``dir_path``.

    Sub-directories and their contents are left untouched. A file that cannot
    be removed is reported and skipped, and the final status line states
    whether all removals succeeded.

    Args:
        dir_path: Directory to empty.

    Returns:
        None. Progress is reported with ``print``.
    """
    # isdir also rejects a path that exists but is not a directory, which
    # would otherwise make os.listdir raise.
    if not os.path.isdir(dir_path):
        print(f"❗Directory '{dir_path}' does not exist.")
        return

    failed = 0
    for file_name in os.listdir(dir_path):
        file_path = os.path.join(dir_path, file_name)
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except OSError as e:
            failed += 1
            print(f"⚠️ Error deleting file {file_path}: {e}")

    # Only claim success when nothing failed.
    if failed:
        print(f"⚠️ {failed} file(s) could not be removed from '{dir_path}'")
    else:
        print(f"✅ All files removed from '{dir_path}'")


In [None]:
import threading
# Empty the stream directory before the run
clear_directory('/content/Streaming_input')

# Background thread: writes 600 files, pausing 2 seconds after each one
generate_thread = threading.Thread(
    target=thread_generatefile,
    kwargs=dict(interval_Time=2, num_generate=600),
)
generate_thread.start()

# Foreground: poll the directory and score new files until interrupted
stream_dir_work(stream_dir)


✅ All files removed from '/content/Streaming_input'
No new files
Detect new 3 files
+------------+------------------+
|         key|        prediction|
+------------+------------------+
|data_3_row_1|246.51573428500762|
|data_3_row_2| 6.056638686568553|
|data_2_row_1|  243.036542162863|
|data_2_row_2| 246.7376346525271|
|data_1_row_1| 244.8894286546158|
|data_1_row_2| 248.2822730571827|
+------------+------------------+

Detect new 4 files
+------------+------------------+
|         key|        prediction|
+------------+------------------+
|data_7_row_1|215.70543541300106|
|data_7_row_2|248.36772823645993|
|data_6_row_1|2.9875695089102074|
|data_6_row_2| 243.9294568371029|
|data_5_row_1| 5.539119959874314|
|data_5_row_2|247.47353834708267|
|data_4_row_1|247.20898590451935|
|data_4_row_2|244.99844367278433|
+------------+------------------+

Detect new 6 files
+-------------+------------------+
|          key|        prediction|
+-------------+------------------+
|data_13_row_1| 9.80349