In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from tensorflow import keras
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
import shap

In [2]:
df = pd.read_csv('final_engineered.csv')

df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values(['Ticker', 'Date'])

# Define feature and target columns
target_col = 'Net Income'
# Select all numeric columns as features, excluding the target and identifiers
feature_cols = df.select_dtypes(include=np.number).columns.drop(target_col)

In [8]:
X, y = [], []
n_past = 4 # Use the past 4 rows (quarters) to predict the 5th

# Group by each company's ticker to create sequences
for ticker, group in df.groupby('Ticker'):
    features = group[feature_cols].values
    target = group[target_col].values
    
    # Ensure there's enough data for at least one window
    if len(group) > n_past:
        for i in range(n_past, len(group)):
            # The past 4 rows of features
            X.append(features[i-n_past:i, :])
            # The 'Net Income' of the current row (the one we want to predict)
            y.append(target[i])

X = np.array(X)
y = np.array(y)

print(f"\nOriginal X shape (Samples, Timesteps, Features): {X.shape}")
print(f"Original y shape (Samples,): {y.shape}")


Original X shape (Samples, Timesteps, Features): (238, 4, 25)
Original y shape (Samples,): (238,)


In [10]:
# Split data before scaling
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# --- 4. Scale the Data ---
# Scaler for features
scaler_X = MinMaxScaler()
n_samples, timesteps, n_features = X.shape
# n_test_samples = X_test.shape[0]

# Reshape for scaling: (samples * timesteps, features)
X_reshaped = X.reshape(-1, n_features)
# X_test_reshaped = X_test.reshape(-1, n_features)

# Fit on training data and transform both train and test
X_scaled_reshaped = scaler_X.fit_transform(X_reshaped)
# X_test_scaled_reshaped = scaler_X.transform(X_test_reshaped)

# Reshape back to (samples, timesteps, features)
X_scaled = X_scaled_reshaped.reshape(n_samples, timesteps, n_features)
# X_test_scaled = X_test_scaled_reshaped.reshape(n_test_samples, timesteps, n_features)

# Scaler for the target variable (y)
scaler_y = MinMaxScaler()
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1))
# y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1))

In [12]:
# This is the key step for the FCN approach
X_flat = X_scaled.reshape(n_samples, timesteps * n_features)
# X_test_flat = X_test_scaled.reshape(n_test_samples, timesteps * n_features)

print(f"\nFlattened X_train shape for FCN input: {X_flat.shape}")
# print(f"Flattened X_test shape for FCN input: {X_test_flat.shape}")


Flattened X_train shape for FCN input: (238, 100)


In [14]:
model = keras.models.load_model('my_model.keras')
predictions = model.predict(X_flat)

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step


In [15]:
predictions_actual = scaler_y.inverse_transform(predictions)
# print(predictions_actual)

In [16]:
# mae = mean_absolute_error(y, predictions_actual)
# mse = mean_squared_error(y, predictions_actual)
# rmse = np.sqrt(mse)

# print(f"Mean Absolute Error (MAE): {mae:e}")
# print(f"Mean Squared Error (MSE): {mse:e}")
# print(f"Root Mean Squared Error (RMSE): {rmse:e}")


# # Plotting
# plt.style.use('seaborn-v0_8-whitegrid')
# fig, ax = plt.subplots(figsize=(8, 8))
# ax.scatter(y, predictions_actual, alpha=0.7, edgecolors='k')
# ax.plot([y.min(), y.max()], [y.min(), y.max()], 'r--', lw=2)
# ax.set_xlabel('Actual Values', fontsize=12)
# ax.set_ylabel('Predicted Values', fontsize=12)
# ax.set_title('Actual vs. Predicted Values', fontsize=14, fontweight='bold')
# plt.show()

In [20]:
# df.describe()

In [22]:
def make_flattened_feature_names(feature_names, timesteps):
    """
    Expands feature names into flattened names for SHAP on FCN input.
    
    Example: ["Revenue", "Expenses"], timesteps=4
    -> ["Q1_Revenue", "Q1_Expenses", "Q2_Revenue", "Q2_Expenses", ...]
    """
    flat_names = []
    for t in range(1, timesteps+1):
        for f in feature_names:
            flat_names.append(f"Q{t}_{f}")
    return flat_names

In [24]:
feature_names = feature_cols.tolist()  # or list(df[feature_cols].columns)
timesteps = X.shape[1]

flat_feature_names = make_flattened_feature_names(feature_names, timesteps)
# print(feature_names[:10])  # sanity check

In [26]:
# Explainer works on the model + training data
explainer = shap.Explainer(model, X_flat)

# Get SHAP values for test set
shap_values = explainer(X_flat)


PermutationExplainer explainer: 239it [00:58,  3.96it/s]                                                               


In [27]:
sample_idx = 22
sample_shap = shap_values.values[sample_idx]  # shape: (timesteps*n_features,)

In [28]:
factors = list(zip(flat_feature_names, sample_shap))
# Sort by absolute importance
top_factors = sorted(factors, key=lambda x: abs(x[1]), reverse=True)[:5]

In [29]:
# Suppose n_features = 10, timesteps = 4
n_features = X.shape[2]
timesteps = X.shape[1]

# shap_values.values has shape (samples, timesteps*n_features)
shap_reshaped = shap_values.values.reshape(
    shap_values.values.shape[0], timesteps, n_features
)

In [30]:
formatted_factors = []
for name, val in top_factors:
    direction = "increased" if val > 0 else "decreased"
    formatted_factors.append(f"{name}: {val:+.2f} (this {direction} the predicted net income)")

In [31]:
# shap.summary_plot(shap_values, X_flat, feature_names=flat_feature_names)
formatted_factors

['Q2_Normalized EBITDA: -0.00 (this decreased the predicted net income)',
 'Q3_EBITDA: -0.00 (this decreased the predicted net income)',
 'Q1_EBITDA: -0.00 (this decreased the predicted net income)',
 'Q4_EBITDA: -0.00 (this decreased the predicted net income)',
 'Q1_Interest Expense: +0.00 (this increased the predicted net income)']

In [32]:
def shapSheet():
    rows = []
    for i in range(237):
        sample_idx = i  # update this dynamically if needed
        sample_shap = shap_values.values[sample_idx]
        factors = list(zip(flat_feature_names, sample_shap))
        # Sort and get top 5 features by importance
        top_factors = sorted(factors, key=lambda x: abs(x[1]), reverse=True)[:5]
        ticker = df.iloc[5*i - 1]['Ticker']
        date_1 = df.iloc[5*i - 5]['Date']
        date_2 = df.iloc[5*i - 1]['Date']
        row_data = {
            'Ticker': ticker,
            'Date_1': date_1,
            'Date_2': date_2
        }
        # Use formatted factor strings for important features
        for j, (name, val) in enumerate(top_factors, 1):
            direction = "increased" if val > 0 else "decreased"
            row_data[f'Feature_{j}'] = f"{name}: {val:+.2f} (this {direction} the net income)"
        rows.append(row_data)
    output_df = pd.DataFrame(rows)
    output_df.to_csv('shap_top_features.csv', index=False)
    # print(output_df)

shapSheet()
