In [None]:
# Imports and environment
from pathlib import Path
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pysr import PySRRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error

# Optional: set Julia path if needed (did this due to the shenanigans my PC did to me today. you can change it if youd like, but i would reccomend just shoving your julia path here)
julia_bindir = r""
if Path(julia_bindir).exists():
    os.environ["JULIA_BINDIR"] = julia_bindir
print("Julia path:", os.environ.get("JULIA_BINDIR"))


In [None]:
# Load dataset (resolve path robustly)
candidate_paths = [
    Path('reactor_readings_cleaned_filtered.csv'),
    Path.cwd().parent / 'reactor_readings_cleaned_filtered.csv',
]
csv_path = next((p for p in candidate_paths if p.exists()), None)
if csv_path is None:
    raise FileNotFoundError('reactor_readings_cleaned_filtered.csv not found in notebook dir or project root')
df = pd.read_csv(csv_path)
print('Columns loaded:', df.columns.tolist())
display(df.head())

# Ensure timestamps are sorted
df = df.sort_values('timestamp').reset_index(drop=True)


In [None]:
# Feature engineering: robust derivative, smoothing, lags and rates

# Compute dT/dt using central gradient
df['dTdt_calc'] = np.gradient(df['temperature'].to_numpy(), df['timestamp'].to_numpy())

# Light smoothing to reduce noise
df['dTdt_smooth'] = pd.Series(df['dTdt_calc']).rolling(5, min_periods=1, center=True).mean()

# Minimal temporal features focused on actuators that actually change
for col in ['fuel', 'rod_insertion']:
    df[f'{col}_lag1'] = df[col].shift(1)
    df[f'{col}_rate'] = df[col].diff() / df['timestamp'].diff()

# Replace infs from zero/duplicate dt; drop rows missing engineered features
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# Define features and target
features = ['fuel', 'rod_insertion', 'temperature', 'fuel_lag1', 'rod_insertion_lag1', 'fuel_rate', 'rod_insertion_rate']
var_names = ['fuel_pct', 'rod_insertion', 'temp', 'fuel_pct_lag1', 'rod_insertion_lag1', 'fuel_pct_rate', 'rod_insertion_rate']
target = 'dTdt_smooth'

# Build a clean, finite dataset
combined = df[features + [target]].replace([np.inf, -np.inf], np.nan).dropna()
X = combined[features].astype(np.float64)
y = combined[target].astype(np.float64)

# Transient-focused weights aligned to combined
weights = (1 + 5 * (combined['fuel_rate'].abs() + combined['rod_insertion_rate'].abs()))
weights = (weights / weights.mean()).to_numpy()

# Sanity checks
for name, arr in [('X', X.to_numpy()), ('y', y.to_numpy()), ('weights', weights)]:
    bad = ~np.isfinite(arr)
    if bad.any():
        print(f'Warning: non-finite values in {name}:', bad.sum())

# Time-ordered split
X_train, X_test, y_train, y_test, w_train, w_test = train_test_split(
    X, y, weights, test_size=0.2, shuffle=False
)

len(X), len(X_train), len(X_test)

In [None]:
# Configure and train PySR
model = PySRRegressor(
    niterations=120,
    binary_operators=['+', '-', '*'],
    unary_operators=[],
    model_selection='best',
    maxsize=30,
    elementwise_loss='(p, y, w) -> w * abs(p - y)',
    deterministic=True,
    parallelism='serial',
    random_state=42,
    progress=False,
    variable_names=var_names,
)

# Convert to contiguous float64 arrays for Julia backend
Xtr = np.ascontiguousarray(X_train.to_numpy(dtype=np.float64))
ytr = y_train.to_numpy(dtype=np.float64)
wtr = w_train.astype(np.float64)

# Sanity checks
for name, arr in [('X_train', Xtr), ('y_train', ytr), ('w_train', wtr)]:
    if not np.isfinite(arr).all():
        raise ValueError(f'Non-finite values found in {name}')

print('Training symbolic model for dT/dt...', Xtr.shape, ytr.shape)

model.fit(Xtr, ytr, weights=wtr)

model

In [None]:
# Evaluate and visualize
import re
y_pred = model.predict(np.ascontiguousarray(X_test.to_numpy(dtype=np.float64)))
r2 = r2_score(y_test.to_numpy(dtype=np.float64), y_pred)
mse = mean_squared_error(y_test.to_numpy(dtype=np.float64), y_pred)
print(f'R^2: {r2:.4f}')
print(f'MSE: {mse:.6f}')

print('\nCandidate Formulas:')
print(model)

print('\nBest Formula (raw):')
best_formula = model.get_best()['equation']
print(best_formula)

# Human-readable mapping with spaces and percent sign
mapping = {
    'fuel_pct': 'fuel %',
    'rod_insertion': 'rod insertion',
    'temp': 'temp',
    'fuel_pct_lag1': 'fuel % (lag1)',
    'rod_insertion_lag1': 'rod insertion (lag1)',
    'fuel_pct_rate': 'fuel % rate',
    'rod_insertion_rate': 'rod insertion rate',
}
pretty = best_formula
for k, v in mapping.items():
    pretty = re.sub(rf'(?<![A-Za-z0-9_]){re.escape(k)}(?![A-Za-z0-9_])', v, pretty)
print('\nBest Formula (readable):')
print(pretty)

plt.figure(figsize=(7,5))
plt.scatter(y_test, y_pred, s=6, alpha=0.5)
plt.xlabel('Actual dT/dt (\\u00B0C/s)')
plt.ylabel('Predicted dT/dt (\\u00B0C/s)')
plt.title('Predicted vs Actual Temperature Change Rate')
plt.grid(True)
plt.show()