LIBRARIES

In [129]:
import numpy as np
import time
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers, losses
from scipy.fft import fft, ifft
from scipy.signal import windows
from scipy.io import loadmat
from sklearn.model_selection import KFold
import os
from typing import Tuple, Union, Optional, List, Dict
from collections import deque
import random
from tqdm.auto import tqdm

HYPERPARAMETERS

In [130]:
# Length of the state vector built by RLEnvironment.calculate_state:
# (learning rate, loss, gradient norm, iteration, min/max encoding,
# gradient sign alignment).
STATE_DIM = 6
# Number of discrete learning-rate actions (indices 0-4 handled in
# RLEnvironment.apply_action_and_get_reward).
N_ACTIONS = 5
# Default exploration probability of EpsilonGreedyPolicy.
EPSILON = 0.3
# Learning-rate scaling factors: an action divides or multiplies the current
# learning rate by ALPHA1 (larger change) or ALPHA2 (smaller change).
ALPHA1 = 0.99
ALPHA2 = 0.996
# Default number of lowest loss values RLEnvironment keeps for its
# min/max encoding feature.
M_MIN_MAX = 10
# Default hidden-layer widths of QNetwork.
Q_NET_ARCH = [16, 16]
# Default (low, high) output interval of Rescaler.
RESCALE_RANGE = (0.1, 0.9)

RLEnvironment & Epsilon-Greedy Policy


In [None]:
import numpy as np
import time
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers, losses
from scipy.fft import fft, ifft
from scipy.signal import windows
from scipy.io import loadmat
from sklearn.model_selection import KFold
import os
from typing import Tuple, Union, Optional, List, Dict
from collections import deque
import random
from tqdm.auto import tqdm
class RLEnvironment:
    """Environment that exposes an optimizer's learning rate to an RL agent.

    The environment owns the learning rate of ``optimizer``: it turns the
    current loss and gradients into a 6-element state vector, applies one of
    five discrete actions that scale the learning rate, and returns a reward
    derived from the loss.

    NOTE(review): ``log`` is not defined in the visible cells; presumably a
    module-level logger created elsewhere - confirm.
    """

    def __init__(self, optimizer: tf.keras.optimizers.Optimizer,
                 initial_lr: float,
                 m_min_max: int = M_MIN_MAX,
                 alpha1: float = ALPHA1,
                 alpha2: float = ALPHA2):
        """Store the configuration and push ``initial_lr`` into the optimizer.

        Args:
            optimizer: Optimizer whose learning rate is controlled.
            initial_lr: Starting learning rate (also restored by ``reset``).
            m_min_max: How many of the lowest losses are remembered.
            alpha1: Scaling factor used by actions 0 and 4.
            alpha2: Scaling factor used by actions 1 and 3.
        """
        if not isinstance(optimizer, tf.keras.optimizers.Optimizer):
            log.warning("Optimizer might not be a standard Keras optimizer.")

        # Configuration.
        self.optimizer = optimizer
        self.initial_lr = float(initial_lr)
        self.m_min_max = m_min_max
        self.alpha1 = alpha1
        self.alpha2 = alpha2

        # Mutable episode state.
        self.iteration = tf.Variable(0, dtype=tf.int64, trainable=False)
        self.lowest_losses = []
        self.prev_gradient_flat = None
        self._current_lr_float = self.initial_lr
        self._update_optimizer_lr(self._current_lr_float)

        log.info(f"RLEnvironment initialized with initial_lr={self.initial_lr}, "
                 f"alpha1={self.alpha1}, alpha2={self.alpha2}, M={self.m_min_max}")

    def _update_optimizer_lr(self, new_lr_float: float):
        """Record ``new_lr_float`` and write it to the optimizer.

        ``assign`` is tried first; if the learning-rate attribute does not
        support it, plain attribute assignment is used. A failure of the
        fallback is logged, not raised.
        """
        self._current_lr_float = new_lr_float
        try:
            self.optimizer.learning_rate.assign(new_lr_float)
            return
        except AttributeError:
            pass
        try:
            self.optimizer.learning_rate = new_lr_float
        except Exception as e:
            log.error(f"Failed to set optimizer learning rate: {e}. LR control might not work correctly.")

    def _update_lowest_losses(self, loss_value: float):
        """Keep ``lowest_losses`` as the sorted ``m_min_max`` smallest losses."""
        tracked = self.lowest_losses
        if not (len(tracked) < self.m_min_max):
            # List is full: only a value below the current worst gets in.
            if not (loss_value < tracked[-1]):
                return
            tracked.pop()
        tracked.append(loss_value)
        tracked.sort()

    def get_current_lr(self) -> float:
        """Return the learning rate most recently written to the optimizer."""
        return self._current_lr_float

    def calculate_state(self, current_loss: tf.Tensor, current_gradient_list: list) -> np.ndarray:
        """Build the state vector for the current training step.

        Args:
            current_loss: Scalar loss tensor of the step.
            current_gradient_list: Gradients of the step; ``None`` entries
                are ignored.

        Returns:
            ``float32`` array ``[lr, loss, grad_norm, iteration,
            min_max_encoding, alignment]``. ``min_max_encoding`` is 1 when the
            loss is at or below the lowest tracked loss, -1 when it is above
            the largest tracked loss, else 0. ``alignment`` is the mean sign
            agreement between this step's and the previous step's flattened
            gradients.

        Side effects: updates the tracked losses, the stored previous
        gradient and the iteration counter.
        """
        lr_state = self._current_lr_float
        loss_state = float(current_loss.numpy())

        # Flatten every available gradient into one vector.
        usable_grads = []
        if current_gradient_list:
            usable_grads = [g for g in current_gradient_list if g is not None]
        if usable_grads:
            flat_gradients = tf.concat(
                [tf.reshape(g, [-1]) for g in usable_grads], axis=0)
            grad_norm_state = float(tf.norm(flat_gradients).numpy())
            current_flat = flat_gradients.numpy()
        else:
            log.warning("No valid gradients found for state calculation.")
            grad_norm_state = 0.0
            current_flat = None

        iter_state = int(self.iteration.numpy())

        # Encode the loss relative to the tracked extremes *before* the
        # tracked list is updated with it.
        min_max_enc_state = 0
        if self.lowest_losses:
            if loss_state <= self.lowest_losses[0]:
                min_max_enc_state = 1
            elif loss_state > self.lowest_losses[-1]:
                min_max_enc_state = -1
        self._update_lowest_losses(loss_state)

        # Sign agreement with the previous step's gradient.
        align_state = 0.0
        previous_flat = self.prev_gradient_flat
        if previous_flat is None:
            if iter_state > 0:
                log.warning("Previous gradient missing for alignment calculation.")
            elif current_flat is None:
                log.warning("Current gradient missing for alignment calculation.")
        elif current_flat is None:
            log.warning("Current gradient missing for alignment calculation.")
        elif current_flat.shape == previous_flat.shape:
            agreement = np.sign(current_flat * previous_flat)
            agreement = np.nan_to_num(agreement, nan=0.0)
            align_state = float(np.mean(agreement))
        else:
            log.error("Gradient shape mismatch for alignment calculation. This should not happen.")

        if current_flat is not None:
            self.prev_gradient_flat = current_flat
        self.iteration.assign_add(1)

        features = [
            lr_state, loss_state, grad_norm_state,
            iter_state, min_max_enc_state, align_state,
        ]
        return np.array(features, dtype=np.float32)

    def apply_action_and_get_reward(self, action: int, current_loss: tf.Tensor) -> tuple[float, float]:
        """Scale the learning rate according to ``action`` and compute the reward.

        Actions: 0 -> lr / alpha1, 1 -> lr / alpha2, 2 -> unchanged,
        3 -> lr * alpha2, 4 -> lr * alpha1. Any other value is logged and
        leaves the learning rate unchanged.

        Returns:
            ``(new_learning_rate, reward)`` with ``reward = 1 / (loss + 1e-9)``.
        """
        lr_now = self._current_lr_float
        if action == 0:
            lr_next = lr_now / self.alpha1
        elif action == 1:
            lr_next = lr_now / self.alpha2
        elif action == 2:
            lr_next = lr_now
        elif action == 3:
            lr_next = lr_now * self.alpha2
        elif action == 4:
            lr_next = lr_now * self.alpha1
        else:
            log.error(f"Invalid action received: {action}. Keeping LR unchanged.")
            lr_next = lr_now
        self._update_optimizer_lr(lr_next)

        loss_value = float(current_loss.numpy())
        # The small constant keeps the reciprocal finite for a zero loss.
        reward = 1.0 / (loss_value + 1e-9)
        return self.get_current_lr(), reward

    def reset(self):
        """Restore the initial learning rate and clear all per-episode state."""
        log.info("Resetting RLEnvironment state.")
        self.iteration.assign(0)
        self.lowest_losses = []
        self.prev_gradient_flat = None
        self._update_optimizer_lr(self.initial_lr)

class EpsilonGreedyPolicy:
    """Epsilon-greedy action selection over a vector of Q-values.

    NOTE(review): ``log`` is not defined in the visible cells; presumably a
    module-level logger created elsewhere - confirm.
    """

    def __init__(self, n_actions: int = N_ACTIONS, epsilon: float = EPSILON):
        """Store the number of actions and the exploration probability."""
        self.n_actions = n_actions
        self.epsilon = epsilon
        log.info(f"EpsilonGreedyPolicy initialized with n_actions={n_actions}, epsilon={epsilon}")

    def select_action(self, q_values: tf.Tensor) -> int:
        """Pick a uniformly random action with probability ``epsilon``,
        otherwise the action with the largest Q-value."""
        scores = tf.squeeze(q_values).numpy()
        exploit = not (np.random.rand() < self.epsilon)
        if exploit:
            return int(np.argmax(scores))
        return np.random.randint(0, self.n_actions)

    def select_action_eq8(self, q_values: tf.Tensor) -> int:
        """Sample an action from an explicit epsilon-greedy distribution.

        Every action receives probability ``epsilon / n_actions``; the
        greedy action additionally receives ``1 - epsilon``. The vector is
        renormalised if it does not sum to one.
        """
        scores = tf.squeeze(q_values).numpy()
        greedy = np.argmax(scores)

        uniform_share = self.epsilon / self.n_actions
        probabilities = np.full(self.n_actions, uniform_share)
        probabilities[greedy] = probabilities[greedy] + (1.0 - self.epsilon)

        total = np.sum(probabilities)
        if not np.isclose(total, 1.0):
            probabilities = probabilities / total
        choice = np.random.choice(self.n_actions, p=probabilities)
        return int(choice)


Q-Network & Rescaler & DDQNAgent

In [None]:
import numpy as np
import time
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers, losses
from scipy.fft import fft, ifft
from scipy.signal import windows
from scipy.io import loadmat
from sklearn.model_selection import KFold
import os
from typing import Tuple, Union, Optional, List, Dict
from collections import deque
import random
from tqdm.auto import tqdm
class QNetwork(Model):
    """Fully connected Q-network: ReLU hidden layers and a sigmoid output layer
    with one unit per action."""

    def __init__(self, state_dim: int = STATE_DIM, action_dim: int = N_ACTIONS, hidden_units: list = Q_NET_ARCH):
        """Create the dense stack and build it for inputs of width ``state_dim``."""
        super().__init__(name='QNetwork')
        stack = []
        for width in hidden_units:
            stack.append(layers.Dense(width, activation='relu'))
        # Sigmoid keeps every Q-value output inside (0, 1).
        stack.append(layers.Dense(action_dim, activation='sigmoid'))
        self.layers_list = stack
        self.build(input_shape=(None, state_dim))

    def call(self, x: tf.Tensor, training: bool = False) -> tf.Tensor:
        """Run ``x`` through every layer in order and return the result."""
        activations = x
        for dense in self.layers_list:
            try:
                activations = dense(activations, training=training)
            except TypeError:
                # Fallback for a layer whose call does not take ``training``.
                activations = dense(activations)
        return activations

class Rescaler:
    """Map values into ``[low, high]`` using running min/max statistics.

    ``fit`` updates exponentially smoothed estimates of the minimum and
    maximum of the data seen so far; ``scale`` maps values linearly into
    ``[low, high]`` with those estimates and ``inv_scale`` applies the
    inverse mapping. Until ``fit`` has seen finite data, ``scale`` returns
    the interval midpoint and ``inv_scale`` returns its input unchanged.

    All three methods accept NumPy arrays, objects exposing ``.numpy()``
    (e.g. eager tensors), and plain sequences or scalars.
    """

    def __init__(self, low: float = RESCALE_RANGE[0], high: float = RESCALE_RANGE[1], epsilon: float = 1e-6):
        """
        Args:
            low: Lower bound of the output interval.
            high: Upper bound of the output interval.
            epsilon: Smallest running range considered non-degenerate.
        """
        self.low = low
        self.high = high
        self.range = high - low
        self.epsilon = epsilon
        self.running_min = None
        self.running_max = None
        # Smoothing factor of the running min/max update.
        self.alpha = 0.01

    @staticmethod
    def _to_numpy(y) -> np.ndarray:
        """Convert tensor-like, sequence or scalar input to an ndarray."""
        if hasattr(y, 'numpy'):
            y = y.numpy()
        return np.asarray(y)

    def fit(self, y: np.ndarray):
        """Update the running min/max from the finite entries of ``y``.

        ``None``, empty input and input without finite values leave the
        statistics untouched.
        """
        if y is None:
            return
        # Convert before touching ``.size``: tensor-like inputs are not
        # guaranteed to expose that attribute.
        y = self._to_numpy(y)
        if y.size == 0:
            return
        vals = y[np.isfinite(y)]
        if vals.size == 0:
            return
        mn, mx = np.min(vals), np.max(vals)
        if self.running_min is None or not np.isfinite(self.running_min):
            self.running_min, self.running_max = mn, mx
        else:
            self.running_min = self.alpha * mn + (1 - self.alpha) * self.running_min
            self.running_max = self.alpha * mx + (1 - self.alpha) * self.running_max
        # Smoothing can leave the bounds crossed; restart from this batch.
        if self.running_min > self.running_max:
            self.running_min, self.running_max = mn, mx

    def scale(self, y: np.ndarray) -> np.ndarray:
        """Map ``y`` into ``[low, high]`` and return ``float32``.

        Non-finite entries, unfitted statistics and a running range below
        ``epsilon`` all yield the interval midpoint. Results are clipped to
        ``[low, high]``.
        """
        y = self._to_numpy(y)
        midpoint = (self.low + self.high) / 2.0
        if self.running_min is None or self.running_max is None:
            return np.full_like(y, midpoint, dtype=np.float32)
        rng = self.running_max - self.running_min
        if rng < self.epsilon:
            return np.full_like(y, midpoint, dtype=np.float32)
        mask = np.isfinite(y)
        out = np.full_like(y, midpoint, dtype=np.float32)
        out[mask] = self.low + self.range * (y[mask] - self.running_min) / rng
        return np.clip(out, self.low, self.high).astype(np.float32)

    def inv_scale(self, y: np.ndarray) -> np.ndarray:
        """Invert ``scale`` and return ``float32``.

        With unfitted statistics the input is returned as ``float32``.
        Non-finite entries and a running range below ``epsilon`` yield the
        running minimum.
        """
        y = self._to_numpy(y)
        if self.running_min is None or self.running_max is None:
            return y.astype(np.float32)
        rng = self.running_max - self.running_min
        if rng < self.epsilon:
            return np.full_like(y, self.running_min, dtype=np.float32)
        mask = np.isfinite(y)
        out = np.full_like(y, self.running_min, dtype=np.float32)
        out[mask] = self.running_min + (y[mask] - self.low) * rng / self.range
        return out.astype(np.float32)

class DDQNAgent:
    """Q-learning agent with two Q-networks, a replay buffer and a target rescaler.

    Roles of the two networks as implemented here:

    * ``target_net`` is the network whose weights are optimised in
      ``_train_target_step``.
    * ``main_net`` is used for action selection (``get_q_values``) and for
      choosing the arg-max next action; ``update`` overwrites it with
      ``target_net``'s weights every ``target_update_freq`` training updates.

    NOTE(review): these names look swapped relative to the usual
    online/target terminology - confirm this is intentional before renaming.

    The defaults of ``buffer_size``, ``batch_size``, ``gamma``,
    ``q_learning_rate`` and ``target_update_freq`` are all zero, i.e.
    placeholders; with them no transition is stored and ``update`` never
    trains.
    """

    def __init__(
        self, state_dim: int = STATE_DIM,
        action_dim: int = N_ACTIONS,
        buffer_size: int = 0,
        batch_size: int = 0,
        gamma: float = 0.0,
        q_learning_rate: float = 0.0,
        target_update_freq: int = 0
    ):
        """
        Args:
            state_dim: Width of the state vector.
            action_dim: Number of discrete actions.
            buffer_size: Maximum number of stored transitions.
            batch_size: Number of transitions sampled per update.
            gamma: Discount factor.
            q_learning_rate: Learning rate of the Adam optimizer.
            target_update_freq: Number of updates between weight copies into
                ``main_net``; values <= 0 disable the copy.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.gamma = gamma
        self.target_update_freq = target_update_freq
        self.main_net = QNetwork(state_dim, action_dim)
        self.target_net = QNetwork(state_dim, action_dim)
        self.target_net.build(input_shape=(None, state_dim))
        self.main_net.build(input_shape=(None, state_dim))
        # Both networks start from identical weights.
        self.target_net.set_weights(self.main_net.get_weights())
        self.optimizer = optimizers.Adam(learning_rate=q_learning_rate)
        self.replay_buffer = deque(maxlen=buffer_size)
        self.rescaler = Rescaler()
        self.loss_fn = losses.MeanSquaredError()
        self._update_counter = tf.Variable(0, dtype=tf.int64, trainable=False)

    def store_transition(self, state, action, reward, next_state, done):
        """Append ``(state, action, reward, next_state, done)`` to the replay buffer."""
        transition = (
            np.array(state, dtype=np.float32),
            int(action),
            float(reward),
            np.array(next_state, dtype=np.float32),
            bool(done),
        )
        self.replay_buffer.append(transition)

    def sample_batch(self):
        """Sample ``batch_size`` transitions uniformly without replacement.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` as arrays, or
            ``None`` when ``batch_size`` is not positive or the buffer holds
            fewer than ``batch_size`` transitions.
        """
        # A non-positive batch size would yield an empty sample that cannot
        # be unpacked into the five columns below.
        if self.batch_size <= 0 or len(self.replay_buffer) < self.batch_size:
            return None
        batch = random.sample(self.replay_buffer, self.batch_size)
        s, a, r, ns, d = map(np.array, zip(*batch))
        return s.astype(np.float32), a.astype(np.int32), r.astype(np.float32), ns.astype(np.float32), d.astype(np.bool_)

    @tf.function
    def _train_target_step(self, states, actions, rewards, next_states, dones):
        """Run one gradient step on ``target_net``.

        Returns:
            ``(loss, y)`` where ``y`` holds the unscaled bootstrap targets.
        """
        # Next action is chosen with main_net and evaluated with target_net.
        q_main_next = self.main_net(next_states, training=False)
        best_next = tf.argmax(q_main_next, axis=1, output_type=tf.int32)
        q_target_next = tf.gather_nd(self.target_net(next_states, training=False), tf.stack([tf.range(tf.shape(actions)[0]), best_next], axis=1))
        # Network outputs live in the rescaled interval; map them back before
        # combining them with the raw rewards.
        inv_q = tf.numpy_function(self.rescaler.inv_scale, [q_target_next], tf.float32)
        inv_q = tf.reshape(inv_q, [-1])
        rewards = tf.reshape(rewards, [-1])
        dones_f = tf.cast(tf.reshape(dones, [-1]), tf.float32)
        y = rewards + self.gamma * inv_q * (1 - dones_f)
        # The regression target is expressed in the rescaled interval again.
        y_scaled = tf.numpy_function(self.rescaler.scale, [y], tf.float32)
        y_scaled = tf.reshape(y_scaled, [-1])
        with tf.GradientTape() as tape:
            q_curr = tf.gather_nd(self.target_net(states, training=True), tf.stack([tf.range(tf.shape(actions)[0]), actions], axis=1))
            loss = self.loss_fn(y_scaled, q_curr)
        grads = tape.gradient(loss, self.target_net.trainable_variables)
        self.optimizer.apply_gradients([(g,v) for g,v in zip(grads, self.target_net.trainable_variables) if g is not None])
        return loss, y

    def update(self):
        """Train on one sampled batch and periodically refresh ``main_net``.

        Returns:
            The batch loss as ``float``, or ``None`` when no batch could be
            sampled.
        """
        batch = self.sample_batch()
        if batch is None:
            return None
        s, a, r, ns, d = batch
        loss, y = self._train_target_step(s, a, r, ns, d)
        # The rescaler statistics are fitted on the targets just used.
        self.rescaler.fit(y.numpy())
        self._update_counter.assign_add(1)
        # A frequency of zero (the constructor default) would be a modulo by
        # zero; treat non-positive values as "never copy".
        if self.target_update_freq > 0:
            updates_done = int(self._update_counter.numpy())
            if updates_done % self.target_update_freq == 0:
                self.main_net.set_weights(self.target_net.get_weights())
        return float(loss.numpy())

    @tf.function
    def get_q_values(self, state_tensor):
        """Return ``main_net``'s Q-values for ``state_tensor``."""
        return self.main_net(state_tensor, training=False)

    def get_action(self, state: np.ndarray, policy) -> int:
        """Evaluate the Q-values of a single state and let ``policy`` choose."""
        q = self.get_q_values(tf.convert_to_tensor(state.reshape(1, -1), tf.float32))
        return policy.select_action(q)


CNN Train

In [None]:
import numpy as np
import time
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers, losses
from scipy.fft import fft, ifft
from scipy.signal import windows
from scipy.io import loadmat
from sklearn.model_selection import KFold
import os
from typing import Tuple, Union, Optional, List, Dict
from collections import deque
import random
from tqdm.auto import tqdm

@tf.function
def cnn_train_step(cnn_model, optimizer, loss_fn, x_batch, y_batch):
    """Run one optimisation step of ``cnn_model`` on a batch.

    The loss is ``loss_fn(y_batch, predictions)`` plus the sum of the model's
    own ``losses`` when there are any. Variables without a gradient are
    skipped.

    Returns:
        ``(loss, gradients)`` where ``gradients`` lists only the gradients
        that were actually applied.
    """
    with tf.GradientTape() as tape:
        predictions = cnn_model(x_batch, training=True)
        total_loss = loss_fn(y_batch, predictions)
        if cnn_model.losses:
            total_loss += tf.add_n(cnn_model.losses)

    weights = cnn_model.trainable_variables
    gradients = tape.gradient(total_loss, weights)
    applicable = [
        (grad, weight)
        for grad, weight in zip(gradients, weights)
        if grad is not None
    ]
    if applicable:
        optimizer.apply_gradients(applicable)
    applied_gradients = [grad for grad, _ in applicable]
    return total_loss, applied_gradients

def run_rl_controlled_cnn_step(cnn_model, optimizer, loss_fn, rl_env, ddqn_agent, policy, current_state, x_batch, y_batch, store_transition=True):
    """Perform one CNN training step under RL learning-rate control.

    The agent picks an action for ``current_state``, the CNN is trained on
    the batch, the action is then applied to the learning rate and the next
    state is computed from the step's loss and gradients.

    NOTE(review): the learning rate is changed *after* the training step, so
    the step itself runs with the previous learning rate - confirm this
    ordering is intended.

    Returns:
        ``(next_state, reward, loss, lr_used)`` where ``lr_used`` is the
        learning rate in effect before the action was applied.

    Raises:
        ValueError: If ``current_state`` is not a NumPy array.
    """
    if not isinstance(current_state, np.ndarray):
        raise ValueError("Invalid state")

    chosen_action = ddqn_agent.get_action(current_state, policy)
    lr_before_action = rl_env.get_current_lr()

    step_loss, step_grads = cnn_train_step(cnn_model, optimizer, loss_fn, x_batch, y_batch)
    _, step_reward = rl_env.apply_action_and_get_reward(chosen_action, step_loss)
    following_state = rl_env.calculate_state(step_loss, step_grads)

    if store_transition:
        ddqn_agent.store_transition(current_state, chosen_action, step_reward, following_state, False)
    return following_state, step_reward, step_loss, lr_before_action

def train_cnn_with_rl(main_cnn, game_cnn, optimizer, loss_fn, ddqn_agent, rl_env, data_loader, config):
    """Train ``main_cnn`` while a DDQN agent controls the learning rate.

    For every batch:

    1. ``game_cnn`` receives a copy of ``main_cnn``'s weights.
    2. ``step4game`` "game" steps are run on ``game_cnn``; each stores a
       transition in the agent's replay buffer.
    3. The agent performs one update.
    4. ``step4game`` RL-controlled steps are run on ``main_cnn``.

    On the very first batch an extra plain training step on ``main_cnn``
    produces the initial RL state and the environment is reset.

    Args:
        config: Reads ``'epochs'`` (default 10) and ``'step4game'``
            (default 5).

    Returns:
        ``(main_cnn, ddqn_agent, history)``; ``history`` holds one entry per
        epoch for each of its keys. An epoch whose ``data_loader`` yields no
        batches records 0 for its averaged metrics.
    """
    epochs = config.get('epochs', 10)
    step4game = config.get('step4game', 5)
    policy = EpsilonGreedyPolicy()
    history = {'epoch': [], 'step': [], 'main_cnn_loss': [], 'game_cnn_loss': [], 'q_loss': [], 'reward': [], 'lr': []}
    training_steps = 0
    # RL state carried across batches and epochs; created on the first batch.
    state = None
    for epoch in range(epochs):
        sum_main = sum_game = sum_q = sum_reward = 0
        updates = steps = 0
        pbar = tqdm(data_loader, desc=f"Epoch {epoch+1}")
        for x_batch, y_batch in pbar:
            x_batch = tf.convert_to_tensor(x_batch, tf.float32)
            y_batch = tf.convert_to_tensor(y_batch)
            game_cnn.set_weights(main_cnn.get_weights())
            if state is None:
                loss0, grads0 = cnn_train_step(main_cnn, optimizer, loss_fn, x_batch, y_batch)
                rl_env.reset()
                state = rl_env.calculate_state(loss0, grads0)

            # Game phase: explore on the copy and fill the replay buffer.
            gs = state.copy()
            gl = 0
            for _ in range(step4game):
                action = ddqn_agent.get_action(gs, policy)
                lg, gg = cnn_train_step(game_cnn, optimizer, loss_fn, x_batch, y_batch)
                gl += float(lg.numpy())
                _, rg = rl_env.apply_action_and_get_reward(action, lg)
                ns = rl_env.calculate_state(lg, gg)
                ddqn_agent.store_transition(gs, action, rg, ns, False)
                gs = ns
            avg_game = gl/step4game
            # ``update`` returns None while the buffer is too small.
            ql = ddqn_agent.update() or 0

            # Main phase: train the real model under the agent's control.
            ml = mr = 0
            ms = state.copy()
            lr0 = rl_env.get_current_lr()
            for _ in range(step4game):
                ms, rm, lm, lr0 = run_rl_controlled_cnn_step(main_cnn, optimizer, loss_fn, rl_env, ddqn_agent, policy, ms, x_batch, y_batch)
                ml += float(lm.numpy())
                mr += rm
            avg_main = ml/step4game
            avg_reward = mr/step4game
            state = ms
            sum_main += avg_main
            sum_game += avg_game
            sum_q += ql
            sum_reward += avg_reward
            updates += 1 if ql else 0
            steps += 1
            training_steps += 1
            pbar.set_postfix({'main_loss':f"{avg_main:.3f}", 'q_loss':f"{ql:.3f}", 'reward':f"{avg_reward:.3f}", 'lr':f"{lr0:.6f}"})
        history['epoch'].append(epoch+1)
        history['step'].append(training_steps)
        # An empty loader leaves ``steps`` at zero; record 0 instead of
        # dividing by zero, matching how ``q_loss`` treats zero updates.
        history['main_cnn_loss'].append(sum_main/steps if steps else 0)
        history['game_cnn_loss'].append(sum_game/steps if steps else 0)
        history['q_loss'].append(sum_q/updates if updates else 0)
        history['reward'].append(sum_reward/steps if steps else 0)
        history['lr'].append(rl_env.get_current_lr())
    return main_cnn, ddqn_agent, history


S-Transform

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# 1) Per-step learning rates recorded during training (list or array).
#    ``step_lr`` must already exist in the notebook namespace; it is not
#    defined in this cell.
# To also plot the RL agent's own curve, add it to ``curves`` below as (g).

n_steps = len(step_lr)
t = np.arange(n_steps)

# 2) Baseline schedule functions. Each one is evaluated on the global step
#    axis ``t`` and returns one learning rate per step.
def exp_decay(lr0, lr_end, T):
    """Exponential decay from ``lr0`` at t=0 to ``lr_end`` at t=T."""
    return lr0 * (lr_end / lr0) ** (t / T)

def quad_decay(lr0, lr_end, T):
    """Quadratic decay from ``lr0`` at t=0 to ``lr_end`` at t=T."""
    # Ending at ``lr_end`` instead of 0 keeps log10 of the curve finite.
    return lr_end + (lr0 - lr_end) * (1 - (t / T)**2)

def lin_decay(lr0, lr_end, T):
    """Linear decay from ``lr0`` at t=0 to ``lr_end`` at t=T."""
    # Ending at ``lr_end`` instead of 0 keeps log10 of the curve finite.
    return lr_end + (lr0 - lr_end) * (1 - t / T)

def hyper_decay(lr0, k):
    """Hyperbolic decay ``lr0 / (1 + k * t)``; ``k`` is the decay coefficient
    (e.g. k=1e-3) and should be tuned."""
    return lr0 / (1 + k * t)

def cyclic_decay(lr0, lr_max, T):
    """Raised-cosine cycle: ``lr0`` at t=0, ``lr_max`` at t=T/2, ``lr0`` at t=T."""
    return lr0 + (lr_max - lr0) * 0.5 * (1 - np.cos(2 * np.pi * t / T))

def increase_to_max(lr0, lr_max, T):
    """Linear increase from ``lr0`` at t=0 to ``lr_max`` at t=T."""
    return lr0 + (lr_max - lr0) * (t / T)

# 3) Parameters
lr0      = step_lr[0]                    # initial learning rate
lr_end   = lr0 * 0.01                    # example final value
lr_max   = lr0 * 10                      # example maximum
T        = float(max(n_steps - 1, 1))    # horizon; at least 1 so t / T is defined
k_hyper  = 1e-3                          # hyperbolic constant, experiment with it

# Evaluate every schedule.
curves = {
    "(a) Exponential": exp_decay(lr0, lr_end, T),
    "(b) Quadratic":   quad_decay(lr0, lr_end, T),
    "(c) Linear":      lin_decay(lr0, lr_end, T),
    "(d) Hyperbolic":  hyper_decay(lr0, k_hyper),
    "(e) Cyclic":      cyclic_decay(lr0, lr_max, T),
    "(f) Increase":    increase_to_max(lr0, lr_max, T),
    # Optionally add the agent's own curve as (g):
    # "(g) RL Agent": step_lr
}

# 4) Plot
fig, axs = plt.subplots(2, 3, figsize=(12, 8), sharex=True, sharey=True)
for ax, (title, lr_curve) in zip(axs.flatten(), curves.items()):
    ax.plot(t, np.log10(lr_curve))
    ax.set_title(title)
    ax.set_xlabel("Training Steps")
    ax.set_ylabel("Learning Rate (Log₁₀)")
    ax.grid(True)

plt.tight_layout()
plt.show()

NameError: name 'step_lr' is not defined

In [None]:


def s_transform(signal: np.ndarray, fs: float, f_min: float = 5, f_max: float = None, n_freqs: int = 100) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute a frequency-domain, Gaussian-windowed time-frequency matrix.

    For each analysis frequency ``f`` the signal spectrum is multiplied by
    two Gaussians centred at ``+f`` and ``-f`` (standard deviation
    ``f / (2*pi)``) and transformed back to the time domain.

    NOTE(review): because the window is symmetric in +/-f, the rows appear to
    be real-valued for real input - confirm this is the intended variant.

    Args:
        signal: 1-D input signal.
        fs: Sampling frequency.
        f_min: Lowest analysis frequency.
        f_max: Highest analysis frequency; defaults to ``fs / 2``.
        n_freqs: Number of analysis frequencies.

    Returns:
        ``(s_matrix, time_vector, freq_vector)`` with ``s_matrix`` of shape
        ``(n_freqs, len(signal))``. An empty signal gives an empty matrix and
        empty vectors; ``f_min >= f_max`` gives an all-zero matrix. Rows for
        frequencies below 1e-9 are left at zero.
    """
    n_samples = len(signal)
    if n_samples == 0:
        empty_matrix = np.empty((n_freqs, 0), dtype=complex)
        return empty_matrix, np.array([]), np.array([])

    time_vector = np.arange(n_samples) / fs
    upper = fs / 2 if f_max is None else f_max

    if f_min >= upper:
        # Degenerate band: report the requested grid with an all-zero result.
        if n_freqs > 0:
            freq_vector = np.linspace(f_min, upper, n_freqs)
        else:
            freq_vector = np.array([])
        return np.zeros((n_freqs, n_samples), dtype=complex), time_vector, freq_vector

    freq_vector = np.linspace(f_min, upper, n_freqs)
    spectrum = fft(signal)
    bin_freqs = np.fft.fftfreq(n_samples, 1.0/fs)
    s_matrix = np.zeros((n_freqs, n_samples), dtype=complex)

    for row, centre in enumerate(freq_vector):
        if not (centre < 1e-9):
            width = centre / (2.0 * np.pi)
            positive_lobe = np.exp(-0.5 * ((bin_freqs - centre) / width)**2)
            negative_lobe = np.exp(-0.5 * ((bin_freqs + centre) / width)**2)
            s_matrix[row, :] = ifft(spectrum * (positive_lobe + negative_lobe))

    return s_matrix, time_vector, freq_vector


def load_cwru_signal(file_path: str, signal_key: str) -> Optional[np.ndarray]:
    """Load one signal from a MATLAB ``.mat`` file as a flat ``float32`` array.

    The first variable whose name contains ``signal_key`` is used. If no name
    matches, the numeric array with the most elements is used instead.

    Args:
        file_path: Path of the ``.mat`` file.
        signal_key: Substring identifying the wanted variable.

    Returns:
        The flattened signal, or ``None`` when the file does not exist,
        cannot be read, or holds no numeric array.
    """
    if not os.path.exists(file_path):
        return None
    try:
        data = loadmat(file_path)
        matching = [k for k in data if signal_key in k]
        if matching:
            chosen = matching[0]
        else:
            numeric_keys = [
                k for k, v in data.items()
                if isinstance(v, np.ndarray) and v.ndim >= 1 and np.issubdtype(v.dtype, np.number)
            ]
            if not numeric_keys:
                return None
            # Rank by element count, not len(): len() only sees the first
            # dimension, so a 1 x N row vector would lose to a short column.
            chosen = max(numeric_keys, key=lambda k: data[k].size)
        return data[chosen].flatten().astype(np.float32)
    except Exception:
        # Best-effort loader: any read/parse failure means "no signal".
        return None


def segment_signal(signal: np.ndarray, segment_length: int, overlap: int = 0) -> List[np.ndarray]:
    """Cut ``signal`` into windows of ``segment_length`` samples.

    Consecutive windows start ``segment_length - overlap`` samples apart; a
    non-positive stride is replaced by 1. A trailing remainder shorter than
    ``segment_length`` is dropped.

    Returns:
        The list of windows; empty when ``signal`` is ``None`` or shorter
        than ``segment_length``.
    """
    if signal is None:
        return []
    total = len(signal)
    if total < segment_length:
        return []

    hop = segment_length - overlap
    if hop <= 0:
        hop = 1

    segments = []
    for start in range(0, total - segment_length + 1, hop):
        segments.append(signal[start : start + segment_length])
    return segments


def load_and_preprocess_data(config: dict) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[tuple], Optional[int]]:
    """Build a labelled set of time-frequency images from a folder of ``.mat`` files.

    The dataset root is ``<dataset_path>/<dataset_name>``. Every sub-folder
    is one class (labels follow the sorted folder names). Each ``.mat`` file
    is loaded, segmented, transformed with ``s_transform`` and converted to
    magnitudes; samples are resized bilinearly when the configured input
    size differs from ``(n_freqs, segment_length)``.

    Args:
        config: Reads ``dataset_path``, ``dataset_name``, ``segment_length``,
            ``overlap``, ``fs``, ``signal_key``, ``f_min``, ``f_max``,
            ``n_freqs``, ``input_height`` and ``input_width``; each has a
            default.

    Returns:
        ``(X, y, input_shape, num_classes)`` with ``X`` of shape
        ``(samples, height, width, 1)``. All four are ``None`` when the
        dataset root is missing, no sample could be produced, or any error
        occurs while processing.
    """
    failure = (None, None, None, None)

    dataset_path = config.get('dataset_path', '/Users/kasimesen/Desktop/Kodlar/Python/12K_Drive_End_Bearing_Fault')
    dataset_name = config.get('dataset_name', 'CWRU')
    segment_length = config.get('segment_length', 1024)
    overlap = config.get('overlap', 0)
    fs = config.get('fs', 12000)
    signal_key = config.get('signal_key', 'DE_time')
    f_min = config.get('f_min', 5)
    f_max = config.get('f_max', fs / 3)
    n_freqs = config.get('n_freqs', 64)
    target_height = config.get('input_height', n_freqs)
    target_width = config.get('input_width', segment_length)

    base_path = os.path.join(dataset_path, dataset_name)
    if not os.path.isdir(base_path):
        return failure

    samples = []
    labels = []
    try:
        condition_folders = sorted(
            entry for entry in os.listdir(base_path)
            if os.path.isdir(os.path.join(base_path, entry))
        )
        # One class per folder, numbered in sorted order.
        class_map = {folder: index for index, folder in enumerate(condition_folders)}

        for folder in condition_folders:
            label = class_map[folder]
            folder_path = os.path.join(base_path, folder)
            mat_files = sorted(
                name for name in os.listdir(folder_path)
                if name.lower().endswith(".mat")
            )
            for mat_name in mat_files:
                signal = load_cwru_signal(os.path.join(folder_path, mat_name), signal_key)
                if signal is None:
                    continue
                for segment in segment_signal(signal, segment_length, overlap):
                    s_matrix, _, _ = s_transform(segment, fs=fs, f_min=f_min, f_max=f_max, n_freqs=n_freqs)
                    image = np.abs(s_matrix)
                    if target_height != n_freqs or target_width != segment_length:
                        resized = tf.image.resize(
                            image[..., tf.newaxis],
                            [target_height, target_width],
                            method=tf.image.ResizeMethod.BILINEAR,
                        )
                        image = tf.squeeze(resized, axis=-1).numpy()
                    samples.append(image)
                    labels.append(label)

        if not samples:
            return failure

        # Trailing axis is the single image channel.
        X = np.array(samples, dtype=np.float32)[..., np.newaxis]
        y = np.array(labels, dtype=np.int64)
        return X, y, X.shape[1:], len(class_map)
    except Exception:
        return failure


def get_dataloader(X: np.ndarray, y: np.ndarray, fold_index: int, k_folds: int, batch_size: int, seed: int = 42) -> Tuple[Optional[tf.data.Dataset], Optional[tf.data.Dataset]]:
    """Create the train/validation datasets of one K-fold split.

    The data are split with a shuffled, seeded ``KFold``; fold number
    ``fold_index`` is selected. The training dataset is shuffled over its
    full length, and both datasets are batched and prefetched.

    Returns:
        ``(train_loader, val_loader)``, or ``(None, None)`` when ``X`` or
        ``y`` is ``None`` or their lengths differ.
    """
    if X is None or y is None:
        return None, None
    if len(X) != len(y):
        return None, None

    splitter = KFold(n_splits=k_folds, shuffle=True, random_state=seed)
    folds = list(splitter.split(X, y))
    train_indices, val_indices = folds[fold_index]

    X_train, y_train = X[train_indices], y[train_indices]
    train_loader = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_loader = train_loader.shuffle(len(X_train))
    train_loader = train_loader.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    X_val, y_val = X[val_indices], y[val_indices]
    val_loader = tf.data.Dataset.from_tensor_slices((X_val, y_val))
    val_loader = val_loader.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_loader, val_loader
