In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
from tensorflow.keras import Input

# === Callback that tracks a layer's weights across epochs ===
class WeightLogger(Callback):
    """Keras callback that snapshots one layer's first weight array per epoch.

    The tracked layer is the first entry of ``model.layers`` whose name
    contains ``layer_name`` (case-insensitive substring match).
    """

    def __init__(self, layer_name="lstm"):
        """Create the logger.

        Args:
            layer_name: Substring to look for in layer names; matching is
                case-insensitive.
        """
        super().__init__()
        self.layer_name = layer_name
        # One array per completed epoch, appended in epoch order.
        self.weights_per_epoch = []

    def on_epoch_end(self, epoch, logs=None):
        """Append a copy of the tracked layer's first weight array.

        Nothing is recorded when no layer name contains ``layer_name``.
        """
        # Lower-case both sides: previously only the layer name was lowered,
        # so a mixed-case ``layer_name`` such as "LSTM" could never match.
        needle = self.layer_name.lower()
        tracked = next(
            (layer for layer in self.model.layers
             if needle in layer.name.lower()),
            None,
        )
        if tracked is None:
            return
        # Index 0 is the first array returned by get_weights(); for a Keras
        # LSTM layer this is presumably the input kernel -- TODO confirm.
        self.weights_per_epoch.append(tracked.get_weights()[0].copy())

# === 1. Data loading ===
def load_data(csv_path, ws):
    """Load a CSV, min-max scale every column and build sliding-window samples.

    Each sample is ``ws`` consecutive rows of the feature columns (every
    column except ``production``); its target is the scaled ``production``
    value of the row immediately following the window.

    Args:
        csv_path: Path of the CSV file. It must contain a ``production``
            column; all columns are passed to the scaler, so they are
            expected to be numeric.
        ws: Window length in rows; a positive integer smaller than the
            number of rows in the file.

    Returns:
        Tuple ``(X_seq, y_seq, scaler, scaled_df)`` where ``X_seq`` has shape
        ``(n_rows - ws, ws, n_features)``, ``y_seq`` has shape
        ``(n_rows - ws,)``, ``scaler`` is the fitted ``MinMaxScaler`` and
        ``scaled_df`` is the scaled data as a DataFrame.

    Raises:
        ValueError: If ``ws`` is not positive, or the file has too few rows
            to build at least one window.
        KeyError: If the file has no ``production`` column.
    """
    if ws < 1:
        raise ValueError(f"window size must be a positive integer, got {ws!r}")

    df = pd.read_csv(csv_path)
    n_windows = len(df) - ws
    if n_windows < 1:
        # Fail here with a clear message instead of returning empty arrays
        # that break later during the split / model construction.
        raise ValueError(
            f"need more than {ws} rows to build a window of size {ws}, "
            f"got {len(df)}"
        )

    # NOTE(review): the scaler is fitted on the whole file, i.e. before any
    # train/test split is made by the caller -- confirm this is intended.
    scaler = MinMaxScaler()
    scaled_df = pd.DataFrame(scaler.fit_transform(df), columns=df.columns)

    # Convert once to NumPy so each window is a cheap array slice rather
    # than a DataFrame slice.
    target = scaled_df['production'].to_numpy()
    features = scaled_df.drop(columns=['production']).to_numpy()

    X_seq = np.stack([features[start:start + ws] for start in range(n_windows)])
    # Target of window i is row i + ws, i.e. everything from row ws onward.
    y_seq = target[ws:].copy()
    return X_seq, y_seq, scaler, scaled_df

# === 2. Conv1D + LSTM model ===
def model_convlstm1d(X_seq, y_seq, epoch, batch_sizes):
    """Train a Conv1D + LSTM regressor and predict on a held-out split.

    The samples are split 80/20 with a fixed random seed; the model is fitted
    on the 80% part (Keras additionally sets 10% of it aside for validation)
    with MAE loss, and predictions are produced for the 20% part.

    Args:
        X_seq: Input windows; indexed as a 3-D array (samples, steps, features).
        y_seq: One target value per window.
        epoch: Number of training epochs.
        batch_sizes: Mini-batch size passed to ``fit``.

    Returns:
        Tuple ``(model, history, y_pred, y_test, weight_logger)``: the trained
        model, the Keras fit history, the flattened test predictions, the
        test targets and the callback holding the logged LSTM weights.
    """
    # NOTE(review): train_test_split shuffles by default; with overlapping
    # time windows this may leak information between train and test --
    # confirm a shuffled split is intended.
    split = train_test_split(X_seq, y_seq, test_size=0.2, random_state=42)
    X_train, X_test, y_train, y_test = split

    n_steps, n_features = X_train.shape[1], X_train.shape[2]
    layer_stack = [
        Input(shape=(n_steps, n_features)),
        Conv1D(filters=128, kernel_size=5, activation='relu', padding='same'),
        LSTM(64, return_sequences=False),
        Dense(32, activation='relu'),
        Dense(1),
    ]
    model = Sequential(layer_stack)
    model.compile(optimizer=Adam(0.001), loss='mae')

    # Track the LSTM layer's weights after every epoch.
    weight_logger = WeightLogger(layer_name="lstm")
    fit_options = dict(
        epochs=epoch,
        batch_size=batch_sizes,
        validation_split=0.1,
        verbose=1,
        callbacks=[weight_logger],
    )
    history = model.fit(X_train, y_train, **fit_options)

    # predict() output is flattened to a 1-D vector of predictions.
    predictions = model.predict(X_test)
    y_pred = predictions.flatten()

    return model, history, y_pred, y_test, weight_logger

# === 3. Model evaluation and plots ===
def plot_function(scaled_data, y_pred, y_test, scaler, weight_logger, history):
    """Un-scale predictions, compute error metrics and draw diagnostic plots.

    Four figures are shown, in this order: the evolution of weight ``[0, 0]``
    of the logged layer, the train/validation loss curves, true vs. predicted
    values as lines, and a true-vs-predicted scatter with the identity line.

    Args:
        scaled_data: DataFrame with a ``production`` column; its width and
            the position of that column drive the inverse scaling.
        y_pred: Scaled predictions.
        y_test: Scaled true targets.
        scaler: Object exposing ``inverse_transform`` for arrays as wide as
            ``scaled_data``.
        weight_logger: Object with a ``weights_per_epoch`` list of 2-D arrays.
        history: Object whose ``history`` dict has ``loss`` and ``val_loss``.

    Returns:
        Tuple ``(mae, mse, r2, y_test_real, y_pred_real)`` with the metrics
        and both series expressed in original units.
    """
    target_idx = scaled_data.columns.get_loc('production')
    n_features = scaled_data.shape[1]

    def to_original_units(scaled_values):
        # inverse_transform expects a full-width array, so place the target
        # in its column of a zero matrix and read that column back.
        padded = np.zeros((len(scaled_values), n_features))
        padded[:, target_idx] = scaled_values
        return scaler.inverse_transform(padded)[:, target_idx]

    y_pred_real = to_original_units(y_pred)
    y_test_real = to_original_units(y_test)

    residuals = y_test_real - y_pred_real
    mae = mean_absolute_error(y_test_real, y_pred_real)
    mse = np.mean(residuals ** 2)
    r2 = r2_score(y_test_real, y_pred_real)

    # Figure 1: trajectory of a single weight across epochs.
    weight_trace = [snapshot[0, 0] for snapshot in weight_logger.weights_per_epoch]
    plt.figure(figsize=(8, 4))
    plt.plot(weight_trace)
    plt.xlabel("Époque")
    plt.ylabel("Valeur du poids [0,0]")
    plt.title("Évolution du poids (entrée 0 → cellule 0) dans la couche LSTM")
    plt.grid(True)
    plt.show()

    # Figure 2: training and validation loss per epoch.
    losses = history.history
    plt.figure(figsize=(10, 4))
    plt.plot(losses['loss'], label='Train Loss')
    plt.plot(losses['val_loss'], label='Validation Loss')
    plt.title('Courbe de Loss (MAE) par époque')
    plt.xlabel('Époque')
    plt.ylabel('MAE')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Figure 3: true and predicted series overlaid.
    plt.figure(figsize=(10, 6))
    plt.plot(y_test_real, label="Vrai")
    plt.plot(y_pred_real, label="Prévu")
    plt.legend()
    plt.title(f"Conv1D + LSTM - MAE: {mae:.4f} | R²: {r2:.4f} | MSE: {mse:.4f}")
    plt.show()

    # Figure 4: scatter of predictions against truth with the y = x diagonal.
    lowest, highest = min(y_test_real), max(y_test_real)
    plt.figure(figsize=(6, 6))
    plt.scatter(y_test_real, y_pred_real, alpha=0.7, color='orange')
    plt.plot([lowest, highest], [lowest, highest], 'r--', label="Idéal (y = ŷ)")
    plt.xlabel("Valeurs réelles (y)")
    plt.ylabel("Prédictions (ŷ)")
    plt.title("Prédictions vs Réel")
    plt.legend()
    plt.grid(True)
    plt.show()

    return mae, mse, r2, y_test_real, y_pred_real

# === 4. Main ===
def main(csv_path="scaled_dataset.csv", window_size=21, epochs=700, batch_size=32):
    """Run the full pipeline: load data, train, evaluate and summarise errors.

    The defaults reproduce the values that were previously hard-coded, so
    calling ``main()`` with no arguments behaves as before.

    Args:
        csv_path: CSV file handed to ``load_data``.
        window_size: Number of rows per input window.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used for training.
    """
    X_seq, y_seq, scaler, scaled_data = load_data(csv_path, window_size)
    _model, history, y_pred, y_test, weight_logger = model_convlstm1d(
        X_seq, y_seq, epoch=epochs, batch_sizes=batch_size
    )
    mae, mse, r2, y_test_real, y_pred_real = plot_function(
        scaled_data, y_pred, y_test, scaler, weight_logger, history
    )
    print(f"MAE: {mae:.4f}, MSE: {mse:.4f}, R²: {r2:.4f}")

    error = y_test_real - y_pred_real
    # Percentage error relative to |y|. Rows whose true value is 0 get NaN
    # instead of triggering a division by zero (inf plus a RuntimeWarning).
    error_percent = np.divide(
        np.abs(error),
        np.abs(y_test_real),
        out=np.full_like(y_test_real, np.nan),
        where=y_test_real != 0,
    ) * 100

    stats_df = pd.DataFrame({
        "Y_test": y_test_real,
        "Y_pred": y_pred_real,
        "Error": error,
        "Error_Percent": error_percent,
    })

    print(stats_df.describe())
    stats_df['Error'].hist(bins=100)
    plt.title("Distribution de l'erreur")
    # The histogram shows the signed error (true - predicted), not its
    # absolute value, so the axis label says so.
    plt.xlabel("Erreur (réel - prévu)")
    plt.ylabel("Fréquence")
    plt.grid(True)
    plt.show()

# Entry point: run the pipeline only when this code is executed directly
# (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
from tensorflow.keras import Input

# === Callback that tracks a layer's weights across epochs ===
class WeightLogger(Callback):
    """Keras callback that snapshots one layer's first weight array per epoch.

    The tracked layer is the first entry of ``model.layers`` whose name
    contains ``layer_name`` (case-insensitive substring match).
    """

    def __init__(self, layer_name="lstm"):
        """Create the logger.

        Args:
            layer_name: Substring to look for in layer names; matching is
                case-insensitive.
        """
        super().__init__()
        self.layer_name = layer_name
        # One array per completed epoch, appended in epoch order.
        self.weights_per_epoch = []

    def on_epoch_end(self, epoch, logs=None):
        """Append a copy of the tracked layer's first weight array.

        Nothing is recorded when no layer name contains ``layer_name``.
        """
        # Lower-case both sides: previously only the layer name was lowered,
        # so a mixed-case ``layer_name`` such as "LSTM" could never match.
        needle = self.layer_name.lower()
        tracked = next(
            (layer for layer in self.model.layers
             if needle in layer.name.lower()),
            None,
        )
        if tracked is None:
            return
        # Index 0 is the first array returned by get_weights(); for a Keras
        # LSTM layer this is presumably the input kernel -- TODO confirm.
        self.weights_per_epoch.append(tracked.get_weights()[0].copy())

# === 1. Data loading and calendar feature engineering ===
def load_data(csv_path, ws):
    """Load a ``;``-separated CSV, add cyclic date features, scale and window.

    The ``date`` column is parsed day-first and turned into sine/cosine
    encodings of the month (period 12) and the day of week (period 7). Only
    numeric columns are kept, every one of them is min-max scaled, and
    sliding windows of ``ws`` rows are built from all columns except
    ``production``. The target of a window is the scaled ``production``
    value of the row immediately following it.

    Args:
        csv_path: Path of the CSV file; it must contain ``date`` and a
            numeric ``production`` column.
        ws: Window length in rows; a positive integer smaller than the
            number of rows in the file.

    Returns:
        Tuple ``(X_seq, y_seq, scaler, scaled_df)`` where ``X_seq`` has shape
        ``(n_rows - ws, ws, n_features)``, ``y_seq`` has shape
        ``(n_rows - ws,)``, ``scaler`` is the fitted ``MinMaxScaler`` and
        ``scaled_df`` is the scaled numeric data as a DataFrame.

    Raises:
        ValueError: If ``ws`` is not positive, or the file has too few rows
            to build at least one window.
        KeyError: If ``date`` or ``production`` is missing.
    """
    if ws < 1:
        raise ValueError(f"window size must be a positive integer, got {ws!r}")

    df = pd.read_csv(csv_path, sep=';')
    n_windows = len(df) - ws
    if n_windows < 1:
        # Fail here with a clear message instead of returning empty arrays
        # that break later during the split / model construction.
        raise ValueError(
            f"need more than {ws} rows to build a window of size {ws}, "
            f"got {len(df)}"
        )

    dates = pd.to_datetime(df['date'], dayfirst=True)
    month_angle = 2 * np.pi * dates.dt.month / 12
    weekday_angle = 2 * np.pi * dates.dt.dayofweek / 7

    # These names were used for temporary columns and never reached the
    # model; dropping any same-named input column keeps that outcome without
    # mutating the frame in place.
    df = df.drop(columns=['date1', 'month', 'dayofweek'], errors='ignore')
    df['month_sin'] = np.sin(month_angle)
    df['month_cos'] = np.cos(month_angle)
    df['dayofweek_sin'] = np.sin(weekday_angle)
    df['dayofweek_cos'] = np.cos(weekday_angle)

    # Keep numeric columns only (this removes the raw ``date`` column).
    df = df.select_dtypes(include=[np.number])

    # NOTE(review): the scaler is fitted on the whole file, i.e. before any
    # train/test split is made by the caller -- confirm this is intended.
    scaler = MinMaxScaler()
    scaled_df = pd.DataFrame(scaler.fit_transform(df), columns=df.columns)

    # Convert once to NumPy so each window is a cheap array slice rather
    # than a DataFrame slice.
    target = scaled_df['production'].to_numpy()
    features = scaled_df.drop(columns=['production']).to_numpy()

    X_seq = np.stack([features[start:start + ws] for start in range(n_windows)])
    # Target of window i is row i + ws, i.e. everything from row ws onward.
    y_seq = target[ws:].copy()
    return X_seq, y_seq, scaler, scaled_df

# === 2. Conv1D + LSTM model ===
def model_convlstm1d(X_seq, y_seq, epoch, batch_sizes):
    """Train a Conv1D + LSTM regressor and predict on a held-out split.

    The samples are split 80/20 with a fixed random seed; the model is fitted
    on the 80% part (Keras additionally sets 10% of it aside for validation)
    with MAE loss, and predictions are produced for the 20% part.

    Args:
        X_seq: Input windows; indexed as a 3-D array (samples, steps, features).
        y_seq: One target value per window.
        epoch: Number of training epochs.
        batch_sizes: Mini-batch size passed to ``fit``.

    Returns:
        Tuple ``(model, history, y_pred, y_test, weight_logger)``: the trained
        model, the Keras fit history, the flattened test predictions, the
        test targets and the callback holding the logged LSTM weights.
    """
    # NOTE(review): train_test_split shuffles by default; with overlapping
    # time windows this may leak information between train and test --
    # confirm a shuffled split is intended.
    split = train_test_split(X_seq, y_seq, test_size=0.2, random_state=42)
    X_train, X_test, y_train, y_test = split

    n_steps, n_features = X_train.shape[1], X_train.shape[2]
    layer_stack = [
        Input(shape=(n_steps, n_features)),
        Conv1D(filters=128, kernel_size=5, activation='relu', padding='same'),
        LSTM(64, return_sequences=False),
        Dense(32, activation='relu'),
        Dense(1),
    ]
    model = Sequential(layer_stack)
    model.compile(optimizer=Adam(0.001), loss='mae')

    # Track the LSTM layer's weights after every epoch.
    weight_logger = WeightLogger(layer_name="lstm")
    fit_options = dict(
        epochs=epoch,
        batch_size=batch_sizes,
        validation_split=0.1,
        verbose=1,
        callbacks=[weight_logger],
    )
    history = model.fit(X_train, y_train, **fit_options)

    # predict() output is flattened to a 1-D vector of predictions.
    predictions = model.predict(X_test)
    y_pred = predictions.flatten()

    return model, history, y_pred, y_test, weight_logger

# === 3. Model evaluation and plots ===
def plot_function(scaled_data, y_pred, y_test, scaler, weight_logger, history):
    """Un-scale predictions, compute error metrics and draw diagnostic plots.

    Four figures are shown, in this order: the evolution of weight ``[0, 0]``
    of the logged layer, the train/validation loss curves, true vs. predicted
    values as lines, and a true-vs-predicted scatter with the identity line.

    Args:
        scaled_data: DataFrame with a ``production`` column; its width and
            the position of that column drive the inverse scaling.
        y_pred: Scaled predictions.
        y_test: Scaled true targets.
        scaler: Object exposing ``inverse_transform`` for arrays as wide as
            ``scaled_data``.
        weight_logger: Object with a ``weights_per_epoch`` list of 2-D arrays.
        history: Object whose ``history`` dict has ``loss`` and ``val_loss``.

    Returns:
        Tuple ``(mae, mse, r2, y_test_real, y_pred_real)`` with the metrics
        and both series expressed in original units.
    """
    target_idx = scaled_data.columns.get_loc('production')
    n_features = scaled_data.shape[1]

    def to_original_units(scaled_values):
        # inverse_transform expects a full-width array, so place the target
        # in its column of a zero matrix and read that column back.
        padded = np.zeros((len(scaled_values), n_features))
        padded[:, target_idx] = scaled_values
        return scaler.inverse_transform(padded)[:, target_idx]

    y_pred_real = to_original_units(y_pred)
    y_test_real = to_original_units(y_test)

    residuals = y_test_real - y_pred_real
    mae = mean_absolute_error(y_test_real, y_pred_real)
    mse = np.mean(residuals ** 2)
    r2 = r2_score(y_test_real, y_pred_real)

    # Figure 1: trajectory of a single weight across epochs.
    weight_trace = [snapshot[0, 0] for snapshot in weight_logger.weights_per_epoch]
    plt.figure(figsize=(8, 4))
    plt.plot(weight_trace)
    plt.xlabel("Époque")
    plt.ylabel("Valeur du poids [0,0]")
    plt.title("Évolution du poids (entrée 0 → cellule 0) dans la couche LSTM")
    plt.grid(True)
    plt.show()

    # Figure 2: training and validation loss per epoch.
    losses = history.history
    plt.figure(figsize=(10, 4))
    plt.plot(losses['loss'], label='Train Loss')
    plt.plot(losses['val_loss'], label='Validation Loss')
    plt.title('Courbe de Loss (MAE) par époque')
    plt.xlabel('Époque')
    plt.ylabel('MAE')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Figure 3: true and predicted series overlaid.
    plt.figure(figsize=(10, 6))
    plt.plot(y_test_real, label="Vrai")
    plt.plot(y_pred_real, label="Prévu")
    plt.legend()
    plt.title(f"Conv1D + LSTM - MAE: {mae:.4f} | R²: {r2:.4f} | MSE: {mse:.4f}")
    plt.show()

    # Figure 4: scatter of predictions against truth with the y = x diagonal.
    lowest, highest = min(y_test_real), max(y_test_real)
    plt.figure(figsize=(6, 6))
    plt.scatter(y_test_real, y_pred_real, alpha=0.7, color='orange')
    plt.plot([lowest, highest], [lowest, highest], 'r--', label="Idéal (y = ŷ)")
    plt.xlabel("Valeurs réelles (y)")
    plt.ylabel("Prédictions (ŷ)")
    plt.title("Prédictions vs Réel")
    plt.legend()
    plt.grid(True)
    plt.show()

    return mae, mse, r2, y_test_real, y_pred_real

# === 4. Main ===
def main(csv_path="full_dataset.csv", window_size=21, epochs=700, batch_size=32):
    """Run the full pipeline: load data, train, evaluate and summarise errors.

    The defaults reproduce the values that were previously hard-coded, so
    calling ``main()`` with no arguments behaves as before.

    Args:
        csv_path: CSV file handed to ``load_data``.
        window_size: Number of rows per input window.
        epochs: Number of training epochs.
        batch_size: Mini-batch size used for training.
    """
    X_seq, y_seq, scaler, scaled_data = load_data(csv_path, window_size)
    _model, history, y_pred, y_test, weight_logger = model_convlstm1d(
        X_seq, y_seq, epoch=epochs, batch_sizes=batch_size
    )
    mae, mse, r2, y_test_real, y_pred_real = plot_function(
        scaled_data, y_pred, y_test, scaler, weight_logger, history
    )
    print(f"MAE: {mae:.4f}, MSE: {mse:.4f}, R²: {r2:.4f}")

    error = y_test_real - y_pred_real
    # Percentage error relative to |y| so it stays non-negative even for a
    # negative true value. Rows whose true value is 0 get NaN rather than a
    # division by zero.
    error_percent = np.divide(
        np.abs(error),
        np.abs(y_test_real),
        out=np.full_like(y_test_real, np.nan),
        where=y_test_real != 0,
    ) * 100

    stats_df = pd.DataFrame({
        "Y_test": y_test_real,
        "Y_pred": y_pred_real,
        "Error": error,
        "Error_Percent": error_percent,
    })

    print(stats_df.describe())
    stats_df['Error'].hist(bins=100)
    plt.title("Distribution de l'erreur")
    # The histogram shows the signed error (true - predicted), not its
    # absolute value, so the axis label says so.
    plt.xlabel("Erreur (réel - prévu)")
    plt.ylabel("Fréquence")
    plt.grid(True)
    plt.show()

# Entry point: run the pipeline only when this code is executed directly
# (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()