In [1]:
from bsf_env import init_spark, init_mariadb_engine,set_spark_verbosity
from pyspark.sql.functions import lit, current_timestamp
import pandas as pd
import numpy as np
from pyspark.sql.types import *
from tqdm import tqdm
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from IPython.display import display, HTML
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import joblib
import tempfile
import os
from delta.tables import DeltaTable
import warnings
from statsmodels.tools.sm_exceptions import ConvergenceWarning

# Silence statsmodels ConvergenceWarning for the rest of the session.
warnings.filterwarnings("ignore", category=ConvergenceWarning)

# Session handles. init_spark / init_mariadb_engine come from the project-local
# bsf_env module; NOTE(review): their exact behaviour is not visible in this notebook.
spark = init_spark("bsf_candidates_analysis", log_level="WARN", show_progress=False, enable_ui=True, process_option="wide")
engine = init_mariadb_engine()

# One timestamp taken from Spark (SELECT current_timestamp()) at notebook start.
ingest_ts = spark.sql("SELECT current_timestamp()").collect()[0][0]

pd.set_option("display.max_columns", None)  # Show all columns
pd.set_option("display.width", 200)         # Adjust width for readability
pd.set_option("display.max_rows", 20)       # Show only top 20 rows by default

# NOTE(review): F and Window are already imported at the top of this cell; the
# repeated imports below are redundant. Only `broadcast` is new here.
from pyspark.sql import functions as F
from pyspark.sql import Window


from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import broadcast

# -----------------------------
# Load tables
# -----------------------------
# df_last: bsf.history_signals_allcol_last; df_all: bsf.history_signals.
# NOTE(review): presumably df_last holds one latest row per company/timeframe and
# df_all the full history -- confirm against the table definitions.
df_last = spark.table("bsf.history_signals_allcol_last")
df_all = spark.table("bsf.history_signals")

# -----------------------------
# Drop the BatchId / IngestedAt columns from both frames
# -----------------------------
cols_to_drop = ["BatchId", "IngestedAt"]
df_last = df_last.drop(*cols_to_drop)
df_all = df_all.drop(*cols_to_drop)

:: loading settings :: url = jar:file:/home/jupyter/.venv/python3.9_bsf/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jupyter/.ivy2/cache
The jars for the packages stored in: /home/jupyter/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b3fe5cf4-6037-4036-86b5-8bbfa6409106;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0rc1 in spark-list
	found io.delta#delta-storage;3.0.0rc1 in spark-list
	found org.antlr#antlr4-runtime;4.9.3 in spark-list
:: resolution report :: resolve 654ms :: artifacts dl 14ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.0.0rc1 from spark-list in [default]
	io.delta#delta-storage;3.0.0rc1 from spark-list in [default]
	org.antlr#antlr4-runtime;4.9.3 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	

[Spark] Started 'bsf_candidates_analysis' log_level=WARN (effective=WARN), progress=False


25/09/20 18:05:14 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/09/20 18:05:14 WARN HiveConf: HiveConf of name hive.metastore.client.connect.timeout does not exist
25/09/20 18:05:14 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/09/20 18:05:15 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/09/20 18:05:15 WARN HiveConf: HiveConf of name hive.metastore.client.connect.timeout does not exist
25/09/20 18:05:15 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/09/20 18:05:18 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/09/20 18:05:18 WARN HiveConf: HiveConf of name hive.metastore.client.connect.timeout does not exist
25/09/20 18:05:18 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/09/20 18:05:21 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [2]:
from pyspark.sql import functions as F

# Boolean column that is true where TomorrowClose is NaN; shared by all checks below
tomorrow_close_is_nan = F.isnan(F.col("TomorrowClose"))

# Last-row DF: show every row whose TomorrowClose is NaN
df_last.filter(tomorrow_close_is_nan).show(truncate=False)

# Last-row DF: number of NaN rows per company
nan_counts_by_company = df_last.groupBy("CompanyId").agg(
    F.sum(tomorrow_close_is_nan.cast("int")).alias("nan_count")
)
nan_counts_by_company.show()

# Full historical DF: show every row whose TomorrowClose is NaN
df_all.filter(tomorrow_close_is_nan).show(truncate=False)


+---------+----------+--------+--------+--------+--------+-----+------+----------+--------------+------------+---------------+---------------+----------------+----------------+----------------+-------------+-------------+-----------+------------+--------------+-----------+-----------+------------------+---------------+----------+-------------+---------+----------+--------+-------+-------------+--------------+--------------+------------------+-------------------+-----+-------+-----------+---------------+------------+------------------+---------------------+---------------------+----------+------------+---------------------+--------------+----------------+--------------------+-------------+--------------+---------------------+----------+------------+----------------+------------------+-----------+---------------------+-----------------+----------------+-----------------------+--------------------+------------------+-----------------+---------------------+--------------+----------------+-

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def check_future_rows(timeframe_dict, dict_name="timeframe"):
    """
    Print, for every DataFrame in a dict, the rows whose TomorrowClose is missing.

    For each (key, DataFrame) pair the function prints a header, then shows
    CompanyId / StockDate / rn for the rows with a missing TomorrowClose, then
    shows the count of such rows per company. `rn` is the row number within
    each CompanyId ordered by StockDate descending, so rn == 1 is the most
    recent row of a company.

    Args:
        timeframe_dict: mapping of label -> PySpark DataFrame. Each DataFrame
            must have CompanyId, StockDate and TomorrowClose columns.
        dict_name: label used in the printed header.

    Returns:
        None. Output is printed via DataFrame.show().
    """
    # The window and target column do not depend on the DataFrame: build them once.
    window_spec = Window.partitionBy("CompanyId").orderBy(F.col("StockDate").desc())
    target = F.col("TomorrowClose")

    # isnan() is false for SQL NULL, so a NaN-only filter silently skips null
    # targets. Treat both as missing, as the stage-2 code does when it picks
    # the rows to predict.
    is_missing = target.isNull() | F.isnan(target)

    for tf, df in timeframe_dict.items():
        print(f"\n--- Checking {dict_name} '{tf}' ---")

        # Row number per company, newest date first
        df_with_rn = df.withColumn("rn", F.row_number().over(window_spec))

        # Rows where TomorrowClose is NULL or NaN
        df_missing = df_with_rn.filter(is_missing)

        # Show the rows
        df_missing.select("CompanyId", "StockDate", "rn").show(truncate=False)

        # Sanity check: count per company
        df_missing.groupBy("CompanyId").count().show()


In [4]:
# -----------------------------
# Filter only Buy actions from last-row DF
# -----------------------------
df_last_buys = df_last.filter(F.col("Action") == "Buy").cache()  # cache because we reuse it

# -----------------------------
# Define ranking window per timeframe
# -----------------------------
# CompanyId is the final tie-breaker: without it, rows with equal
# ActionConfidence and Return get an arbitrary row_number(), and every
# recomputation of the ranking may pick a different top-N set.
w_tf = Window.partitionBy("TimeFrame").orderBy(
    F.desc("ActionConfidence"),
    F.desc("Return"),
    F.asc("CompanyId"),
)

top_n = 60

# -----------------------------
# Rank and select top N Buy companies per timeframe
# -----------------------------
df_ranked_last_top = (
    df_last_buys
    .withColumn("BuyRank", F.row_number().over(w_tf))  # use rank() if you want ties
    .filter(F.col("BuyRank") <= top_n)
    .orderBy(F.col("ActionConfidence").desc(), F.col("BuyRank").asc())
)

# -----------------------------
# Extract top companies per timeframe
# -----------------------------
# Small (CompanyId, TimeFrame, BuyRank) key table, cached once and reused by
# both joins below so they are driven by the same selection.
top_keys = df_ranked_last_top.select("CompanyId", "TimeFrame", "BuyRank").cache()
top_companies = top_keys.select("CompanyId", "TimeFrame").distinct().cache()

# -----------------------------
# Filter original last-row DF and full historical DF to include only top Buy companies
# Cache large DataFrames once
# -----------------------------
df_ranked_last_topN = df_last.join(
    broadcast(top_keys),
    on=["CompanyId", "TimeFrame"],
    how="inner"
).cache()

df_ranked_all_topN = df_all.join(
    broadcast(top_keys),
    on=["CompanyId", "TimeFrame"],
    how="inner"
).cache()


from pyspark.sql import functions as F
from pyspark.sql import DataFrame

# -----------------------------
# List of timeframes
# -----------------------------
timeframes = ["Short", "Swing", "Long", "Daily"]

# -----------------------------
# Dictionaries to store per-timeframe DataFrames
# -----------------------------
timeframe_dfs = {}
timeframe_dfs_all = {}

# -----------------------------
# Per-timeframe splitting (lazy filters on the cached joins)
# -----------------------------
for tf in timeframes:
    is_this_timeframe = F.col("TimeFrame") == tf

    # Last-row top N for this timeframe
    timeframe_dfs[tf] = df_ranked_last_topN.filter(is_this_timeframe)

    # Full historical top N for this timeframe
    timeframe_dfs_all[tf] = df_ranked_all_topN.filter(is_this_timeframe)


print(f"✅ Stage 1 completed: Top {top_n} candidates selected per timeframe")


✅ Stage 1 completed: Top 60 candidates selected per timeframe


In [5]:
# Run the missing-target check on both dictionaries: last-row frames first,
# then the full-history frames.
for label, frames_by_timeframe in (
    ("last-row topN", timeframe_dfs),
    ("full historical topN", timeframe_dfs_all),
):
    check_future_rows(frames_by_timeframe, dict_name=label)



--- Checking last-row topN 'Short' ---
+---------+----------+---+
|CompanyId|StockDate |rn |
+---------+----------+---+
|705      |2025-09-18|1  |
|1633     |2025-09-18|1  |
|1712     |2025-09-18|1  |
|2287     |2025-09-18|1  |
|30304    |2025-09-18|1  |
|30312    |2025-09-18|1  |
|30710    |2025-09-18|1  |
|30866    |2025-09-18|1  |
|30870    |2025-09-18|1  |
|30979    |2025-09-18|1  |
|34109    |2025-09-18|1  |
|34223    |2025-09-18|1  |
|34776    |2025-09-18|1  |
|35533    |2025-09-18|1  |
|35658    |2025-09-18|1  |
|35687    |2025-08-29|1  |
|35694    |2025-09-18|1  |
|35715    |2025-09-18|1  |
|39498    |2025-09-18|1  |
|41835    |2025-09-18|1  |
+---------+----------+---+
only showing top 20 rows

+---------+-----+
|CompanyId|count|
+---------+-----+
|    35715|    1|
|    64303|    1|
|    68568|    1|
|    78988|    1|
|    34776|    1|
|    58874|    1|
|    78306|    1|
|   237696|    1|
|    58410|    1|
|    67927|    1|
|    30710|    1|
|    59490|    1|
|   100645|    1

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window



# Example usage: the same two checks as the previous cell, last-row frames
# first and then the full-history frames.
for label, frames_by_timeframe in (
    ("last-row topN", timeframe_dfs),
    ("full historical topN", timeframe_dfs_all),
):
    check_future_rows(frames_by_timeframe, dict_name=label)



--- Checking last-row topN 'Short' ---
+---------+----------+---+
|CompanyId|StockDate |rn |
+---------+----------+---+
|705      |2025-09-18|1  |
|1633     |2025-09-18|1  |
|1712     |2025-09-18|1  |
|2287     |2025-09-18|1  |
|30304    |2025-09-18|1  |
|30312    |2025-09-18|1  |
|30710    |2025-09-18|1  |
|30866    |2025-09-18|1  |
|30870    |2025-09-18|1  |
|30979    |2025-09-18|1  |
|34109    |2025-09-18|1  |
|34223    |2025-09-18|1  |
|34776    |2025-09-18|1  |
|35533    |2025-09-18|1  |
|35658    |2025-09-18|1  |
|35687    |2025-08-29|1  |
|35694    |2025-09-18|1  |
|35715    |2025-09-18|1  |
|39498    |2025-09-18|1  |
|41835    |2025-09-18|1  |
+---------+----------+---+
only showing top 20 rows

+---------+-----+
|CompanyId|count|
+---------+-----+
|      705|    1|
|    65401|    1|
|    85379|    1|
|     2287|    1|
|    42241|    1|
|    78668|    1|
|    88184|    1|
|    94510|    1|
|    89371|    1|
|    89960|    1|
|    97766|    1|
|   247699|    1|
|    35658|    1

In [7]:
# -----------------------------
# Helper: select best model
# -----------------------------
def select_best_model(metrics_dict, strategy="hybrid"):
    """
    Return the name of the best model according to `strategy`.

    Args:
        metrics_dict: mapping of model name -> dict of metrics. The keys read
            are "RMSE", "MAE" and "DirectionAcc", depending on the strategy.
        strategy: one of
            "rmse"      - lowest RMSE
            "mae"       - lowest MAE
            "direction" - highest DirectionAcc
            "hybrid"    - highest DirectionAcc, lower RMSE breaks ties

    Returns:
        The winning key of metrics_dict. On a full tie the first key in
        iteration order wins.

    Raises:
        ValueError: if `strategy` is not one of the names above.
    """
    # (strategy name, picker, ranking key) -- checked in order
    rankers = (
        ("rmse", min, lambda m: metrics_dict[m]["RMSE"]),
        ("mae", min, lambda m: metrics_dict[m]["MAE"]),
        ("direction", max, lambda m: metrics_dict[m]["DirectionAcc"]),
        # DirectionAcc first, RMSE tie-breaker
        ("hybrid", max, lambda m: (metrics_dict[m]["DirectionAcc"], -metrics_dict[m]["RMSE"])),
    )
    for label, pick, rank_key in rankers:
        if strategy == label:
            return pick(metrics_dict, key=rank_key)
    raise ValueError(f"Unknown strategy: {strategy}")


In [8]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def debug_spark_df(df: DataFrame, col_check=None, n=5, name="DataFrame"):
    """
    Print a short diagnostic summary of a PySpark DataFrame.

    Prints, in order: a header with `name`, the (rows, columns) shape, the
    null count of `col_check` (only when it is given and is a column of `df`),
    the column list, and the first `n` rows via df.show().

    Args:
        df: PySpark DataFrame to inspect.
        col_check: optional column name whose isNull() rows are counted.
        n: number of rows passed to df.show().
        name: label used in the header line.

    Returns:
        None.
    """
    # count() runs a Spark job; do it before printing anything
    row_total = df.count()
    column_names = df.columns
    column_total = len(column_names)

    print(f"--- Debug {name} ---")
    print(f"Shape: ({row_total}, {column_total})")

    wants_null_count = bool(col_check) and col_check in column_names
    if wants_null_count:
        null_count = df.filter(F.col(col_check).isNull()).count()
        print(f"Column '{col_check}' nulls: {null_count}")

    print(f"Columns: {column_names}")
    print("Sample rows:")
    df.show(n)
    print("--- End Debug ---\n")


In [None]:
# -----------------------------
# Hybrid scoring function
# -----------------------------
def hybrid_score(metrics, w_dir=1.0, w_rmse=0.5, w_mape=0.5):
    """
    Collapse a model's metrics into one score (higher is better).

    The score is w_dir * DirectionAcc - w_rmse * RMSE - w_mape * MAPE, so
    direction accuracy is rewarded and both error metrics are penalised.

    Args:
        metrics: dict with "DirectionAcc", "RMSE" and "MAPE" entries.
        w_dir: weight of DirectionAcc.
        w_rmse: weight of the RMSE penalty.
        w_mape: weight of the MAPE penalty.

    Returns:
        The weighted score.
    """
    direction_reward = w_dir * metrics["DirectionAcc"]
    rmse_penalty = w_rmse * metrics["RMSE"]
    mape_penalty = w_mape * metrics["MAPE"]
    return direction_reward - rmse_penalty - mape_penalty

# -----------------------------
# Pick best model using hybrid score
# -----------------------------
def select_best_model(metrics_dict):
    """
    Score every model with hybrid_score() and return the winner plus all scores.

    Note: this notebook also defines an earlier select_best_model(metrics_dict,
    strategy=...) that returns only a name; once this cell runs, this
    definition replaces it and callers get a (name, scores) tuple.

    Args:
        metrics_dict: mapping of model name -> metrics dict accepted by
            hybrid_score().

    Returns:
        (best_name, scores): the name with the highest hybrid score (first
        one wins on ties) and the dict of name -> score.
    """
    scores = {}
    for model_name, model_metrics in metrics_dict.items():
        scores[model_name] = hybrid_score(model_metrics)
    winner = max(scores, key=lambda model_name: scores[model_name])
    return winner, scores

In [14]:
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import pandas as pd

# -----------------------------
# Parameters
# -----------------------------
target_stage2 = "TomorrowClose"
epsilon = 1e-8
all_stage2_predictions = []

# -----------------------------
# Loop over timeframes (Pandas)
# -----------------------------
# For every timeframe and company: fit four regressors on the rows with a known
# TomorrowClose, score them on that same training data, and predict the rows
# whose TomorrowClose is missing (the "future" rows).
for tf, sdf_tf in timeframe_dfs_all.items():
    # Convert Spark DF to Pandas once
    df_tf = sdf_tf.toPandas()

    companies = df_tf["CompanyId"].unique()
    print(f"=== Phase 2 - Timeframe: {tf} ===")

    for cid in companies:
        # Filter by company
        df_c = df_tf[df_tf["CompanyId"] == cid].copy()
        if df_c.empty:
            continue

        # DirectionAcc below compares consecutive rows, so rows must be in date
        # order; toPandas() gives no ordering guarantee.
        if "StockDate" in df_c.columns:
            df_c = df_c.sort_values("StockDate")

        # 1️⃣ Log-transform OHLC columns to stabilize variance.
        # This must happen BEFORE the train/future split: the split takes
        # copies, so columns added to df_c afterwards never reach the models.
        # The log_* columns therefore also appear in the output rows.
        for col in ["Open", "High", "Low", "Close"]:
            df_c[f"log_{col}"] = np.log(df_c[col].replace(0, epsilon))

        # 2️⃣ Identify training rows and future rows
        train_df = df_c[df_c[target_stage2].notna()].copy()
        future_df = df_c[df_c[target_stage2].isna()].copy()
        if train_df.empty or future_df.empty:
            continue

        # 3️⃣ Select numeric & boolean features (excluding target)
        numeric_cols = train_df.select_dtypes(include=[np.number]).columns.difference([target_stage2]).tolist()
        bool_cols = train_df.select_dtypes(include=["bool"]).columns.tolist()
        all_features = numeric_cols + bool_cols

        # 4️⃣ Feature correlation with target; constant columns give NaN and
        # are dropped by the threshold comparison.
        corr = train_df[all_features + [target_stage2]].corr()[target_stage2].abs()
        threshold = 0.03
        good_features = corr[corr >= threshold].drop(target_stage2).index.tolist()
        if not good_features:
            continue

        # 5️⃣ Prepare train dataset
        X_train = train_df[good_features].fillna(0)
        y_train = train_df[target_stage2]
        X_future = future_df[good_features].fillna(0)

        # -----------------------------
        # 6️⃣ Train models
        # -----------------------------
        models = {
            "Linear": LinearRegression(),
            "Lasso": Lasso(alpha=0.01),
            "Ridge": Ridge(alpha=1.0, solver="svd"),
            "XGBoost": XGBRegressor(n_estimators=50, max_depth=3, learning_rate=0.1, verbosity=0)
        }
        metrics_dict = {}
        for name, model in models.items():
            model.fit(X_train, y_train)
            pred_train = model.predict(X_train)
            # sqrt(MSE) instead of squared=False, which newer scikit-learn
            # releases no longer accept.
            rmse = float(np.sqrt(mean_squared_error(y_train, pred_train)))
            mae = mean_absolute_error(y_train, pred_train)
            mape = np.mean(np.abs((y_train - pred_train) / (y_train + epsilon)))
            # Share of steps where predicted and actual move in the same direction
            direction = np.mean(np.sign(pred_train[1:] - pred_train[:-1]) == np.sign(y_train.values[1:] - y_train.values[:-1]))
            r2 = r2_score(y_train, pred_train)
            k = X_train.shape[1]
            n = len(y_train)
            adj_r2 = 1 - ((1 - r2) * (n - 1) / (n - k - 1)) if n - k - 1 != 0 else 0
            metrics_dict[name] = {"RMSE": rmse, "MAE": mae, "MAPE": mape, "DirectionAcc": direction, "R2": r2, "AdjR2": adj_r2}

        # 7️⃣ Pick best model with hybrid score.
        # Relies on the select_best_model(metrics_dict) definition that returns
        # a (name, scores) tuple.
        best_name, scores = select_best_model(metrics_dict)
        best_model = models[best_name]

        # 8️⃣ Predict future rows
        for name, model in models.items():
            future_df[f"Pred_{name}"] = model.predict(X_future)

        # Weighted ensemble (inverse RMSE). epsilon keeps the weights finite
        # when a model fits the training data exactly (RMSE == 0).
        inv_rmse = {m: 1.0 / (metrics_dict[m]["RMSE"] + epsilon) for m in metrics_dict}
        total_inv = sum(inv_rmse.values())
        weights = {m: inv / total_inv for m, inv in inv_rmse.items()}
        future_df["Pred_Sklearn"] = sum(future_df[f"Pred_{m}"] * w for m, w in weights.items())

        # Predicted return
        if "Close" in future_df.columns:
            future_df["PredictedReturn_Sklearn"] = (future_df["Pred_Sklearn"] - future_df["Close"]) / future_df["Close"]

        # Add identifiers & best model info
        future_df["TimeFrame"] = tf
        future_df["CompanyId"] = cid
        future_df["BestModel"] = best_name
        future_df["BestModel_RMSE"] = metrics_dict[best_name]["RMSE"]
        future_df["BestModel_MAPE"] = metrics_dict[best_name]["MAPE"]
        future_df["BestModel_DirAcc"] = metrics_dict[best_name]["DirectionAcc"]

        all_stage2_predictions.append(future_df)

# -----------------------------
# Combine all Stage 2 predictions
# -----------------------------
if all_stage2_predictions:
    stage2_df = pd.concat(all_stage2_predictions, ignore_index=True)
else:
    raise ValueError("No Stage 2 predictions generated!")


=== Phase 2 - Timeframe: Short ===
=== Phase 2 - Timeframe: Swing ===
=== Phase 2 - Timeframe: Long ===
=== Phase 2 - Timeframe: Daily ===


from sklearn.linear_model import LinearRegression, Lasso, Ridge
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Row

# -----------------------------
# Parameters
# -----------------------------
target_stage2 = "TomorrowClose"
epsilon = 1e-8
all_stage2_predictions = []

# -----------------------------
# Loop over timeframes (one Spark -> pandas pull per company)
# -----------------------------
# Spark is used only to fetch each company's rows; from toPandas() onwards
# every frame is a pandas DataFrame, so all later steps use the pandas API.
for tf, sdf_tf in timeframe_dfs_all.items():
    companies = [r.CompanyId for r in sdf_tf.select("CompanyId").distinct().collect()]
    print(f"=== Phase 2 - Timeframe: {tf} ===")

    # -----------------------------
    # Phase 2 loop (per company & timeframe)
    # -----------------------------
    for cid in companies:
        # Full history of this company for this timeframe, as pandas
        df_c = sdf_tf.filter(F.col("CompanyId") == cid).toPandas()
        if df_c.empty:
            continue

        # Log-transform OHLC columns to stabilize variance
        for col in ["Open", "High", "Low", "Close"]:
            df_c[f"log_{col}"] = np.log(df_c[col].replace(0, epsilon))

        # Prepare training data
        train_df = df_c[df_c[target_stage2].notna()].copy()
        if train_df.empty:
            continue

        # 1️⃣ Select numeric & boolean features (excluding target)
        numeric_cols = train_df.select_dtypes(include=[np.number]).columns.difference([target_stage2]).tolist()
        bool_cols = train_df.select_dtypes(include=["bool"]).columns.tolist()
        all_features = numeric_cols + bool_cols

        # 2️⃣ Compute correlations with target
        corr = train_df[all_features + [target_stage2]].corr()[target_stage2].abs()

        # 3️⃣ Keep features above threshold (exclude target explicitly)
        threshold = 0.03
        good_features = corr[corr >= threshold].drop(target_stage2).index.tolist()
        if not good_features:
            # Nothing to fit on; the estimators reject a zero-column matrix
            continue

        # Prepare train dataset (already Pandas)
        X_train = train_df[good_features].fillna(0)
        y_train = train_df[target_stage2]

        # -----------------------------
        # Identify future rows
        # -----------------------------
        # pandas .isna() covers both None and NaN
        future_df = df_c[df_c[target_stage2].isna()].copy()

        if future_df.empty:
            print(f"⚠️ future_df is empty for {cid}-{tf}")
            continue

        # future_df is already pandas (no .toPandas()). Align it to the training
        # columns: same order, absent columns zero-filled, NaNs filled with 0
        # exactly as for X_train.
        X_future = future_df.reindex(columns=X_train.columns, fill_value=0).fillna(0)

        # -----------------------------
        # Train models
        # -----------------------------
        models = {
            "Linear": LinearRegression(),
            "Lasso": Lasso(alpha=0.01),
            "Ridge": Ridge(alpha=1.0, solver="svd"),
            "XGBoost": XGBRegressor(n_estimators=50, max_depth=3, learning_rate=0.1, verbosity=0)
        }

        for model in models.values():
            model.fit(X_train, y_train)

        # -----------------------------
        # Compute training metrics
        # -----------------------------
        metrics_dict = {}
        for name, model in models.items():
            pred = model.predict(X_train)
            # sqrt(MSE) instead of squared=False, which newer scikit-learn
            # releases no longer accept.
            rmse = float(np.sqrt(mean_squared_error(y_train, pred)))
            mae = mean_absolute_error(y_train, pred)
            mape = np.mean(np.abs((y_train - pred) / (y_train + epsilon)))
            direction = np.mean(np.sign(pred[1:] - pred[:-1]) == np.sign(y_train.values[1:] - y_train.values[:-1]))
            r2 = r2_score(y_train, pred)
            k = X_train.shape[1]
            n = len(y_train)
            adj_r2 = 1 - ((1 - r2) * (n - 1) / (n - k - 1)) if n - k - 1 != 0 else 0
            metrics_dict[name] = {"RMSE": rmse, "MAE": mae, "MAPE": mape, "DirectionAcc": direction, "R2": r2, "AdjR2": adj_r2}

        # -----------------------------
        # Pick best model
        # -----------------------------
        # "hybrid" rule written inline (highest DirectionAcc, lower RMSE breaks
        # ties). The notebook defines select_best_model twice with different
        # signatures, and the later one does not accept strategy=..., so calling
        # it here would depend on which cell ran last.
        best_name = max(
            metrics_dict,
            key=lambda m: (metrics_dict[m]["DirectionAcc"], -metrics_dict[m]["RMSE"]),
        )
        best_model = models[best_name]

        # -----------------------------
        # Make predictions for future
        # -----------------------------
        # One predict() call per model, assigned positionally. No id-based join:
        # monotonically_increasing_id() does not give matching ids across frames.
        for name, model in models.items():
            future_df[f"Pred_{name}"] = np.asarray(model.predict(X_future), dtype=float)

        # Weighted ensemble (inverse RMSE). epsilon keeps the weights finite
        # when a model fits the training data exactly (RMSE == 0).
        inv_rmse = {m: 1.0 / (metrics_dict[m]["RMSE"] + epsilon) for m in metrics_dict}
        total_inv = sum(inv_rmse.values())
        weights = {m: inv / total_inv for m, inv in inv_rmse.items()}
        future_df["Pred_Sklearn"] = sum(future_df[f"Pred_{m}"] * w for m, w in weights.items())

        # Predicted return if Close exists
        if "Close" in future_df.columns:
            future_df["PredictedReturn_Sklearn"] = (future_df["Pred_Sklearn"] - future_df["Close"]) / future_df["Close"]

        # -----------------------------
        # Store best model info
        # -----------------------------
        future_df["BestModel"] = best_name
        future_df["BestModel_RMSE"] = metrics_dict[best_name]["RMSE"]
        future_df["BestModel_MAPE"] = metrics_dict[best_name]["MAPE"]
        future_df["BestModel_DirAcc"] = metrics_dict[best_name]["DirectionAcc"]
        future_df["TimeFrame"] = tf
        future_df["CompanyId"] = cid

        # Optional: store all metrics per model, added in one assign() call
        per_model_metrics = {
            f"{name}_{metric_name}": metric_value
            for name, m in metrics_dict.items()
            for metric_name, metric_value in m.items()
        }
        future_df = future_df.assign(**per_model_metrics)

        # Append to stage2 predictions
        all_stage2_predictions.append(future_df)

        print(f"✅ Best model for {cid}-{tf}: {best_name} | RMSE={metrics_dict[best_name]['RMSE']:.6f}, MAPE={metrics_dict[best_name]['MAPE']:.4f}, DirAcc={metrics_dict[best_name]['DirectionAcc']:.3f}")


from sklearn.linear_model import LinearRegression, Lasso, Ridge
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd

# NOTE(review): this cell looks like an unfinished draft of the stage-2 loop.
# It reads names it never defines (train_df, good_features) and uses both
# Spark-style and pandas-style calls on the same frames; see the notes inline.
# Confirm whether it is still needed before running it.

# -----------------------------
# Parameters
# -----------------------------
target_stage2 = "TomorrowClose"
epsilon = 1e-8  # NOTE(review): not used below; the MAPE line hardcodes 1e-8
all_stage2_predictions = []

# -----------------------------
# Loop over timeframes (Spark-native per company)
# -----------------------------
for tf, sdf_tf in timeframe_dfs_all.items():
    #companies = sdf_tf.select("CompanyId").distinct().rdd.flatMap(lambda x: x).collect()
    companies = [r.CompanyId for r in sdf_tf.select("CompanyId").distinct().collect()]

    print(f"=== Phase 2 - Timeframe: {tf} ===")


    # -----------------------------
    # Phase 2 loop (per company & timeframe)
    # -----------------------------
    for cid in companies:
        # NOTE(review): this inner loop rebinds `tf` from the outer loop and
        # iterates timeframe_dfs (the last-row frames) for every company.
        # Neither sdf_tf nor cid is used to select rows inside it.
        for tf, df_c in timeframe_dfs.items():

            # 1️⃣ Prepare train & future sets
            # NOTE(review): train_df and good_features are not assigned anywhere
            # in this cell; they come from whichever earlier cell set them last.
            X_train = train_df[good_features].fillna(0)
            y_train = train_df[target_stage2]
            #future_df = df_c[df_c[target_stage2].isna()].copy()
            #print(f"Company {cid}, TimeFrame {tf}: total rows={len(df_c)}, missing target_stage2={df_c[target_stage2].isna().sum()}")

            #future_df = df_c.filter(F.col(target_stage2).isNull())
            # Before filtering future_df
            #debug_spark_df(df_c, col_check=target_stage2, name=f"Company {cid} TimeFrame {tf}")

            #future_df = df_c[df_c[target_stage2].isna()].copy()
            # Rows whose target is NULL or NaN, selected with Spark column
            # expressions, so future_df is a Spark DataFrame here.
            future_df = df_c.filter(F.col(target_stage2).isNull() | F.isnan(F.col(target_stage2)))

            #debug_spark_df(future_df, col_check=target_stage2, name=f"Future rows {cid}-{tf}")

            # Option 1: using rdd.isEmpty()
            if future_df.rdd.isEmpty():
                print(f"⚠️ future_df is empty for {cid}-{tf}")
                continue

            #X_future = future_df[good_features].fillna(0)
            from pyspark.sql import functions as F

            # NOTE(review): X_future is the result of Spark select()/fillna(),
            # yet it is passed to model.predict() below -- presumably a
            # .toPandas() is missing; confirm.
            X_future = future_df.select(*good_features).fillna(0)


            # 2️⃣ Train models
            models = {
                "Linear": LinearRegression(),
                "Lasso": Lasso(alpha=0.01),
                "Ridge": Ridge(alpha=1.0, solver="svd"),
                "XGBoost": XGBRegressor(n_estimators=50, max_depth=3, learning_rate=0.1, verbosity=0)
            }
            for model in models.values():
                model.fit(X_train, y_train)

            # 3️⃣ Compute training metrics (all computed on the training data itself)
            metrics_dict = {}
            preds_train = {}
            for name, model in models.items():
                pred = model.predict(X_train)
                preds_train[name] = pred
                rmse = mean_squared_error(y_train, pred, squared=False)
                # NOTE(review): this cell's own sklearn.metrics import brings in
                # only mean_squared_error; mean_absolute_error and r2_score must
                # already be in scope from an earlier cell.
                mae = mean_absolute_error(y_train, pred)
                mape = np.mean(np.abs((y_train - pred) / (y_train + 1e-8)))
                # Share of consecutive steps where prediction and target move the same way
                direction = np.mean(np.sign(pred[1:] - pred[:-1]) == np.sign(y_train.values[1:] - y_train.values[:-1]))
                r2 = r2_score(y_train, pred)
                k = X_train.shape[1]
                n = len(y_train)
                adj_r2 = 1 - ((1 - r2) * (n - 1) / (n - k - 1)) if n - k - 1 != 0 else 0
                metrics_dict[name] = {
                    "RMSE": rmse, "MAE": mae, "MAPE": mape,
                    "DirectionAcc": direction, "R2": r2, "AdjR2": adj_r2
                }

            # 4️⃣ Pick best model based on hybrid strategy
            # NOTE(review): the notebook defines select_best_model twice; only
            # the earlier definition accepts `strategy`. This call therefore
            # depends on which definition was executed last.
            best_name = select_best_model(metrics_dict, strategy="hybrid")
            best_model = models[best_name]

            # 5️⃣ Predict future rows
            # NOTE(review): from here on future_df is used pandas-style (item
            # assignment, .empty, .shape, .head()) although it was built with
            # Spark's filter() above -- verify which type is intended.
            for name, model in models.items():
                future_df[f"Pred_{name}"] = model.predict(X_future)

            # Weighted ensemble (inverse RMSE)
            total_inv = sum(1 / metrics_dict[m]["RMSE"] for m in metrics_dict)
            weights = {m: (1 / metrics_dict[m]["RMSE"]) / total_inv for m in metrics_dict}
            future_df["Pred_Sklearn"] = sum(future_df[f"Pred_{m}"] * w for m, w in weights.items())

            # Predicted return if Close exists
            if "Close" in future_df.columns:
                future_df["PredictedReturn_Sklearn"] = (future_df["Pred_Sklearn"] - future_df["Close"]) / future_df["Close"]

            # -----------------------------
            # 6️⃣ Store best model info + debug
            # -----------------------------
            # Any exception in this section is caught, printed, and the loop
            # moves on to the next frame.
            try:
                # Store best model info
                future_df["BestModel"] = best_name
                future_df["BestModel_RMSE"] = metrics_dict[best_name]["RMSE"]
                future_df["BestModel_MAPE"] = metrics_dict[best_name]["MAPE"]
                future_df["BestModel_DirAcc"] = metrics_dict[best_name]["DirectionAcc"]

                # Optional: store all model metrics
                for name, metrics in metrics_dict.items():
                    for metric_name, metric_value in metrics.items():
                        future_df[f"{name}_{metric_name}"] = metric_value

                # Add identifiers
                future_df["TimeFrame"] = tf
                future_df["CompanyId"] = cid

                # Debug checks before append
                if future_df.empty:
                    print(f"⚠️ Warning: future_df is empty for {cid}-{tf}")
                else:
                    print(f"future_df shape: {future_df.shape}")
                    print(f"Columns: {future_df.columns.tolist()}")
                    print(f"Sample rows:\n{future_df.head()}")

                # Append to stage 2 predictions
                all_stage2_predictions.append(future_df)

                # ✅ Debug print best model summary
                print(f"✅ Best model for {cid}-{tf}: {best_name} | "
                      f"RMSE={metrics_dict[best_name]['RMSE']:.6f}, "
                      f"MAPE={metrics_dict[best_name]['MAPE']:.4f}, "
                      f"DirAcc={metrics_dict[best_name]['DirectionAcc']:.3f}")

            except Exception as e:
                print(f"❌ Error storing/appending future_df for {cid}-{tf}: {e}")





from sklearn.linear_model import LinearRegression, Lasso, Ridge
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd

# -----------------------------
# Parameters
# -----------------------------
target_stage2 = "TomorrowClose"   # rows where this is NaN are the "future" rows to predict
epsilon = 1e-8                    # guards log(0) and 1/RMSE when RMSE is 0
threshold = 0.03                  # minimum |correlation| with the target for a feature to be kept
all_stage2_predictions = []       # one pandas frame of predicted future rows per (company, timeframe)


def _stage2_training_metrics(y_true, y_pred, n_features):
    """Compute fit metrics for one model from true and predicted target values.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted values; NaNs are replaced by 0 before scoring.
    n_features : int
        Number of features the model was fitted on (used for adjusted R2).

    Returns
    -------
    dict
        Keys: MAE, MSE, RMSE, R2, AdjR2, MAPE, DirectionAcc.
        DirectionAcc is NaN when fewer than two observations are available.
    """
    y_true_arr = np.nan_to_num(np.array(y_true).flatten())
    y_pred_arr = np.nan_to_num(np.array(y_pred).flatten())
    n = len(y_true_arr)

    residuals = y_true_arr - y_pred_arr
    mae = np.mean(np.abs(residuals))
    mse = np.mean(residuals ** 2)
    rmse = np.sqrt(mse)

    ss_res = np.sum(residuals ** 2)
    ss_tot = np.sum((y_true_arr - np.mean(y_true_arr)) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot != 0 else 0
    adj_r2 = 1 - (1 - r2) * (n - 1) / (n - n_features - 1) if n - n_features - 1 > 0 else 0

    # Small constant in the denominator avoids division by zero.
    mape = np.mean(np.abs(residuals / (y_true_arr + 1e-8)))

    # Share of consecutive steps where prediction and truth move in the same direction.
    if n > 1:
        direction = np.mean(np.sign(np.diff(y_pred_arr)) == np.sign(np.diff(y_true_arr)))
    else:
        direction = np.nan

    return {
        "MAE": mae,
        "MSE": mse,
        "RMSE": rmse,
        "R2": r2,
        "AdjR2": adj_r2,
        "MAPE": mape,
        "DirectionAcc": direction,
    }


# -----------------------------
# Loop over timeframes (one Spark -> pandas conversion per company)
# -----------------------------
for tf, sdf_tf in timeframe_dfs_all.items():
    companies = [r.CompanyId for r in sdf_tf.select("CompanyId").distinct().collect()]

    print(f"=== Phase 2 - Timeframe: {tf} ===")

    for cid in companies:
        # Filter single company and convert to Pandas
        df_c = sdf_tf.filter(F.col("CompanyId") == cid).toPandas()
        if df_c.empty:
            continue

        # Log-transform OHLC columns to stabilize variance (0 -> epsilon to keep log finite)
        for price_col in ["Open", "High", "Low", "Close"]:
            df_c[f"log_{price_col}"] = np.log(df_c[price_col].replace(0, epsilon))

        # Rows with a known target are used for training
        train_df = df_c[df_c[target_stage2].notna()].copy()
        if train_df.empty:
            continue

        # 1️⃣ Candidate features: numeric + boolean columns, excluding the target
        numeric_cols = train_df.select_dtypes(include=[np.number]).columns.difference([target_stage2]).tolist()
        bool_cols = train_df.select_dtypes(include=["bool"]).columns.tolist()
        all_features = numeric_cols + bool_cols

        # 2️⃣ Absolute correlation of every candidate with the target
        corr = train_df[all_features + [target_stage2]].corr()[target_stage2].abs()

        # 3️⃣ Keep features above threshold. errors="ignore" because the target itself is
        #    filtered out when its self-correlation is NaN (constant target / single row).
        good_features = corr[corr >= threshold].drop(target_stage2, errors="ignore").index.tolist()
        if not good_features:
            print(f"⚠️ No features pass the correlation threshold for {cid}-{tf}; skipping")
            continue

        # -----------------------------
        # Prepare training data
        # -----------------------------
        X_train = train_df[good_features].fillna(0)
        y_train = train_df[target_stage2]

        # -----------------------------
        # Initialize models
        # -----------------------------
        models = {
            "Linear": LinearRegression(),
            "Lasso": Lasso(alpha=0.01),
            "Ridge": Ridge(alpha=1.0, solver="svd"),
            "XGBoost": XGBRegressor(n_estimators=50, max_depth=3, learning_rate=0.1, verbosity=0)
        }

        # -----------------------------
        # 1️⃣ Fit all models
        # -----------------------------
        for model in models.values():
            model.fit(X_train, y_train)

        # -----------------------------
        # 2️⃣ Training metrics
        # -----------------------------
        # NOTE: these are in-sample metrics (scored on the rows the models were fitted on).
        preds_train = {name: model.predict(X_train) for name, model in models.items()}
        metrics_dict = {
            name: _stage2_training_metrics(y_train, y_pred, X_train.shape[1])
            for name, y_pred in preds_train.items()
        }

        # -----------------------------
        # 3️⃣ Pick best model (lowest RMSE)
        # -----------------------------
        # This is the criterion that actually reached the output columns before; the
        # intermediate DirectionAcc-based selections were overwritten and never used.
        best_name = min(metrics_dict, key=lambda m: metrics_dict[m]["RMSE"])

        # -----------------------------
        # 4️⃣ Ensemble weights (inverse RMSE, epsilon keeps 1/RMSE finite)
        # -----------------------------
        total_inv = sum(1 / (metrics_dict[m]["RMSE"] + epsilon) for m in metrics_dict)
        weights = {m: (1 / (metrics_dict[m]["RMSE"] + epsilon)) / total_inv for m in metrics_dict}

        # -----------------------------
        # 5️⃣ Predict future rows (target still unknown)
        # -----------------------------
        future_df = df_c[df_c[target_stage2].isna()].copy()
        if future_df.empty:
            print(f"⚠️ No future rows to predict for {cid}-{tf}")
            continue

        # Align future features to the training features; absent/NaN values become 0
        X_future = future_df.reindex(columns=good_features).fillna(0)

        # Per-model predictions
        for name, model in models.items():
            future_df[f"Pred_{name}"] = model.predict(X_future)

        # Weighted ensemble of the individual predictions
        future_df["Pred_Sklearn"] = sum(future_df[f"Pred_{m}"] * w for m, w in weights.items())

        # Predicted return relative to the current close. A zero close becomes NaN so it
        # cannot turn into an infinite return that would sort to the top downstream.
        close_safe = future_df["Close"].replace(0, np.nan)
        future_df["PredictedReturn_Sklearn"] = (future_df["Pred_Sklearn"] - close_safe) / close_safe

        # Best model summary
        future_df["BestModel"] = best_name
        future_df["BestModel_RMSE"] = metrics_dict[best_name]["RMSE"]
        future_df["BestModel_MAPE"] = metrics_dict[best_name]["MAPE"]
        future_df["BestModel_DirAcc"] = metrics_dict[best_name]["DirectionAcc"]

        # All model metrics for reference
        for model_name, metrics in metrics_dict.items():
            for metric_name, metric_value in metrics.items():
                future_df[f"{model_name}_{metric_name}"] = metric_value

        # Identifiers
        future_df["TimeFrame"] = tf
        future_df["CompanyId"] = cid

        # Append to Stage 2 predictions
        all_stage2_predictions.append(future_df)



In [15]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -----------------------------
# Parameters
# -----------------------------
top_n_phase2 = 30  # number of top candidates per timeframe

# -----------------------------
# Combine all Stage 2 predictions into a single Pandas DF
# -----------------------------
if not all_stage2_predictions:
    raise ValueError("No Stage 2 predictions generated!")
stage2_df = pd.concat(all_stage2_predictions, ignore_index=True)

# Columns produced by Stage 2 that get attached to the historical rows
new_cols = ["CompanyId", "TimeFrame", "Pred_Sklearn", "PredictedReturn_Sklearn",
            "BestModel", "BestModel_RMSE", "BestModel_MAPE", "BestModel_DirAcc"]

# Rank 1 = highest predicted return within a timeframe
window_tf = Window.partitionBy("TimeFrame").orderBy(F.desc("PredictedReturn_Sklearn"))

# -----------------------------
# Top candidates per timeframe
# -----------------------------
phase2_top_dfs = {}

for tf, sdf_tf in timeframe_dfs_all.items():
    # Slice the combined frame; an absent timeframe gives an empty frame
    # (pd.concat over an empty list would raise instead).
    stage2_tf_df = stage2_df[stage2_df["TimeFrame"] == tf]
    if stage2_tf_df.empty:
        continue

    # Pick top N companies by PredictedReturn_Sklearn:
    #  - NaN returns are not rankable, so they are dropped
    #  - one row per company, so a company with several future rows can neither
    #    take several top-N slots nor multiply rows in the join below
    stage2_tf_top = (
        stage2_tf_df
        .dropna(subset=["PredictedReturn_Sklearn"])
        .sort_values("PredictedReturn_Sklearn", ascending=False)
        .drop_duplicates(subset="CompanyId", keep="first")
        .head(top_n_phase2)
    )
    if stage2_tf_top.empty:
        continue

    # Convert only the new columns to Spark and rank them
    spark_stage2_new = (
        spark.createDataFrame(stage2_tf_top[new_cols])
        .withColumn("Phase2_Rank", F.row_number().over(window_tf))
    )

    # Inner join: keep the history of the selected top-N companies only.
    # A left join would keep every company (with null predictions) and nothing
    # would actually be filtered before Phase 3.
    phase2_top_dfs[tf] = sdf_tf.join(
        F.broadcast(spark_stage2_new),
        on=["CompanyId", "TimeFrame"],
        how="inner",
    )

print(f"✅ Stage 2 completed: Top {top_n_phase2} candidates selected per timeframe")




✅ Stage 2 completed: Top 30 candidates selected per timeframe


In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX

# -----------------------------
# Phase 3 configuration
# -----------------------------
# Number of companies kept per timeframe in the final selection.
top_n_final = 10

# Default (order, seasonal_order) pair for SARIMAX.
sarimax_order, sarimax_seasonal_order = (1, 0, 0), (0, 0, 0, 0)

# Replacement value for zero closes.
epsilon = 1e-6

# Blend weights for the ML return and the SARIMAX return in the weighted score.
ml_weight, sarimax_weight = 0.6, 0.4

# Forecast horizon (number of steps) per timeframe name.
forecast_steps_map = dict(Daily=1, Short=3, Swing=5, Long=10)

import numpy as np
from statsmodels.tsa.stattools import acf

def infer_season_length(ts, max_lag=30, threshold=0.3):
    """
    Infer seasonal period `m` from the autocorrelation function.

    The period is the first lag >= 2 whose autocorrelation is a local maximum
    (higher than the previous lag, not lower than the next one) and exceeds
    `threshold`. Simply taking the first lag above the threshold would return
    lag 1 for any positively autocorrelated series, so no seasonality would
    ever be detected.

    Parameters:
    -----------
    ts : pandas.Series
        Time series values
    max_lag : int
        Maximum lag to inspect for autocorrelation
    threshold : float
        Minimum autocorrelation to consider a peak as seasonal

    Returns:
    --------
    m : int
        Estimated seasonal period; 1 when the series is too short or no
        seasonal peak is found (always >= 1).
    """
    ts = ts.dropna()
    if len(ts) < 2:
        return 1  # not enough data to infer

    # Autocorrelation is only defined up to lag len(ts) - 1
    n_lags = min(max_lag, len(ts) - 1)
    acf_vals = np.asarray(acf(ts, nlags=n_lags, fft=True), dtype=float)

    # Start at lag 2 (a period of 1 means "no seasonality") and stop one short of
    # the end so every candidate has a right-hand neighbour to compare against.
    # NaN autocorrelations (e.g. constant series) fail every comparison.
    for lag in range(2, len(acf_vals) - 1):
        value = acf_vals[lag]
        if value > threshold and value > acf_vals[lag - 1] and value >= acf_vals[lag + 1]:
            return lag

    return 1  # no strong seasonality detected


import pmdarima as pm

# -----------------------------
# Phase 3: Loop over companies per timeframe
# -----------------------------
# For every company: fit an auto-selected (S)ARIMA on Close, forecast the
# timeframe's horizon, and blend the SARIMAX return with the Phase 2 ML return.
phase3_results = []

for tf, sdf_tf in phase2_top_dfs.items():
    print(f"=== Phase 3 - Timeframe: {tf} ===")

    # Collect companies
    companies = [r.CompanyId for r in sdf_tf.select("CompanyId").distinct().collect()]
    forecast_horizon = forecast_steps_map.get(tf, 1)

    for cid in companies:
        # Filter Spark DF once, convert to Pandas (oldest row first)
        df_c = sdf_tf.filter(F.col("CompanyId") == cid).orderBy("StockDate").toPandas()
        if df_c.empty:
            continue

        # -----------------------------
        # SARIMAX Forecast
        # -----------------------------
        y = df_c["Close"].replace(0, epsilon)
        last_close = y.iloc[-1]

        # Per-company defaults. Set before any try block so a failure can neither
        # leave these names undefined nor reuse values from the previous company.
        # The configured defaults are copied into locals and never overwritten.
        order = sarimax_order
        seasonal_order = sarimax_seasonal_order
        aic = 0
        mltype = "sarimax"
        pred_price = last_close
        sarimax_return = 0.0

        # Step 1: order search. On failure fall through to the default orders.
        try:
            m = infer_season_length(y, max_lag=30, threshold=0.3)
            auto_model = pm.auto_arima(
                y,
                start_p=0, start_q=0,
                max_p=2, max_q=2,      # smaller max orders → fewer models
                d=None,                # let auto_arima decide
                start_P=0, start_Q=0,
                max_P=1, max_Q=1,      # smaller seasonal orders
                D=None,
                m=m,                   # seasonal length
                seasonal=True,
                stepwise=True,         # stepwise search
                max_order=3,
                max_d=2,               # restrict differencing search
                max_D=1,               # restrict seasonal differencing
                n_jobs=1,
                suppress_warnings=True,
                error_action="ignore",
                trace=False            # disable verbose output
            )
            if auto_model is not None:
                order = auto_model.order
                seasonal_order = auto_model.seasonal_order
                aic = auto_model.aic()
                mltype = "automl"
        except Exception as e:
            print(f"⚠️ auto_arima failed for {cid}-{tf}, using default SARIMAX order: {e}")

        # Step 2: fit and forecast. On failure keep the naive forecast (last close, 0 return).
        try:
            model = SARIMAX(y,
                            order=order,
                            seasonal_order=seasonal_order,
                            enforce_stationarity=False,
                            enforce_invertibility=False)
            sarimax_res = model.fit(disp=False)
            forecast = sarimax_res.get_forecast(steps=forecast_horizon)
            pred_price = forecast.predicted_mean.iloc[-1]
            sarimax_return = (pred_price - last_close) / last_close
        except Exception as e:
            print(f"⚠️ SARIMAX failed for {cid}-{tf}: {e}")
            pred_price = last_close
            sarimax_return = 0.0

        # -----------------------------
        # ML Prediction from existing Phase 2 columns
        # -----------------------------
        # Phase 2 writes PredictedReturn_Sklearn; MaxPredictedReturn is kept as a
        # fallback for frames that only carry that column. Missing/NaN -> 0.0 so a
        # NaN cannot propagate into WeightedScore.
        ml_return = 0.0
        for ml_col in ("PredictedReturn_Sklearn", "MaxPredictedReturn"):
            if ml_col in df_c.columns:
                ml_values = df_c[ml_col].dropna()
                if not ml_values.empty:
                    ml_return = float(ml_values.iloc[0])
                    break

        # -----------------------------
        # Weighted score
        # -----------------------------
        weighted_score = ml_weight * ml_return + sarimax_weight * sarimax_return

        # -----------------------------
        # Store enriched data
        # -----------------------------
        df_c["SARIMAX_PredictedClose"] = pred_price
        df_c["SARIMAX_PredictedReturn"] = sarimax_return
        df_c["WeightedScore"] = weighted_score
        df_c["AIC"] = aic
        df_c["MlType"] = mltype
        phase3_results.append(df_c)


=== Phase 3 - Timeframe: Short ===


In [None]:

# -----------------------------
# Combine all companies/timeframes
# -----------------------------
if not phase3_results:
    raise ValueError("No Phase 3 results generated!")
df_phase3_full = pd.concat(phase3_results, ignore_index=True)
df_phase3_spark = spark.createDataFrame(df_phase3_full)

# -----------------------------
# Enrich with company info
# -----------------------------
sdf_company = spark.table("bsf.company").select("CompanyId", "TradingSymbol", "Name")
df_phase3_enriched = df_phase3_spark.join(F.broadcast(sdf_company), on="CompanyId", how="left")

# Save the full enriched history to Delta (table is dropped first, then rewritten)
table = 'bsf.final_enriched'
spark.sql(f"DROP TABLE IF EXISTS {table}")
df_phase3_enriched.write.format("delta").mode("overwrite").saveAsTable(f"{table}")

# 1️⃣ Keep only the latest row per company + timeframe
window_last = Window.partitionBy("CompanyId", "TimeFrame").orderBy(F.desc("StockDate"))
df_latest_per_company = (
    df_phase3_enriched
    .withColumn("rn", F.row_number().over(window_last))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# 2️⃣ Rank companies by WeightedScore per timeframe.
#    Spark orders NaN above every other number, so unscored rows are removed
#    first; otherwise they would take the top ranks in a descending sort.
window_tf = Window.partitionBy("TimeFrame").orderBy(F.desc("WeightedScore"))
df_topN_companies = (
    df_latest_per_company
    .filter(F.col("WeightedScore").isNotNull() & ~F.isnan("WeightedScore"))
    .withColumn("Phase3_Rank", F.row_number().over(window_tf))
    .filter(F.col("Phase3_Rank") <= top_n_final)
    .drop("Phase3_Rank")
)

# 3️⃣ Save the selection to Delta
table = 'bsf.final_enriched_selected'
spark.sql(f"DROP TABLE IF EXISTS {table}")
df_topN_companies.write.format("delta").mode("overwrite").saveAsTable(f"{table}")

# Export the CSV from the saved table rather than re-evaluating the lazy frame,
# so the file always matches what was persisted (row_number ties may break
# differently on recomputation).
df_selected_saved = spark.table(table)
df_selected_saved.toPandas().to_csv(
    "/srv/lakehouse/files/top_candidates_output.csv",
    index=False
)

# -----------------------------
# Optional: Create dict by timeframe for plotting
# -----------------------------
phase3_top_dfs = {
    tf: df_phase3_enriched.filter(F.col("TimeFrame") == tf)
    for tf in [r.TimeFrame for r in df_phase3_enriched.select("TimeFrame").distinct().collect()]
}

# -----------------------------
# Optional: show counts
# -----------------------------
# Counts come from the saved selection, and only timeframes that actually have
# Phase 3 results are listed (no lookup of timeframes that produced nothing).
topn_counts = {
    r["TimeFrame"]: r["count"]
    for r in df_selected_saved.groupBy("TimeFrame").count().collect()
}
for tf in phase3_top_dfs:
    print(f"{tf}: Final top N = {topn_counts.get(tf, 0)}")

print(f"✅ Stage 3 completed: Latest rows per company + top {top_n_final} candidates selected per timeframe")

In [None]:
# Stage 4: load the Delta table and materialize it as a pandas frame.
# NOTE(review): the cells visible here write bsf.final_enriched and
# bsf.final_enriched_selected; confirm bsf.top_buys_enriched_latest is the
# table this check is meant to read.
df_latest_spark = spark.table("bsf.top_buys_enriched_latest")
df_latest_pdf = df_latest_spark.toPandas()

# To inspect the result, display df_latest_pdf grouped by "TimeFrame" and
# sorted by "WeightedScore" descending (avoid printing the full frame if large).
print("✅ Stage 4 completed: Confirm write to delta table!!")

In [None]:
# Shut down the Spark session created in the first cell; Spark calls after this need a new session.
spark.stop()