In [None]:
# --- Cell 1: Tải các thư viện và mô hình cần thiết ---
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import joblib
import catboost
import xgboost as xgb

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from sklearn.metrics import mean_squared_error, mean_absolute_error

# Cấu hình hiển thị
pd.set_option('display.max_columns', None)

# Tắt thông báo không cần thiết của TensorFlow
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


In [None]:
# --- Cell 2: Tải dữ liệu test và scaler ---
file_name = 'kpi_processed.csv'
try:
    df = pd.read_csv(file_name, index_col=0, parse_dates=True)
    print(f"Tải file '{file_name}' thành công.")
except Exception as e:
    print(f"LỖI: Không thể tải file '{file_name}'. {e}")

# Tải scaler
try:
    scaler = joblib.load('scaler.gz')
    print("Load scaler thành công từ 'scaler.gz'.")
except FileNotFoundError:
    print("Lỗi: File 'scaler.gz' không tìm thấy. Hãy chạy notebook catboost hoặc xgboost trước.")
    scaler = None
except Exception as e:
    print(f"Lỗi khi load scaler: {e}")
    scaler = None


In [None]:
# --- Cell 3: Tái tạo tập test từ dữ liệu gốc ---
if 'df' in locals() and scaler is not None:
    FEATURE_COLS = ['ps_traffic_mb', 'avg_rrc_connected_user', 'prb_dl_used', 'prb_dl_available_total', 'prb_utilization']
    N_FEATURES = len(FEATURE_COLS)
    TIMESTEPS_PER_HOUR = 4
    TIMESTEPS_PER_DAY = 24 * TIMESTEPS_PER_HOUR
    INPUT_DAYS = 2
    INPUT_STEPS = INPUT_DAYS * TIMESTEPS_PER_DAY
    OUTPUT_DAYS = 1
    OUTPUT_STEPS = OUTPUT_DAYS * TIMESTEPS_PER_DAY

    # Xử lý dữ liệu đầy đủ
    all_cells = df['cell_name'].unique()
    full_time_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq='15min')
    multi_index = pd.MultiIndex.from_product([all_cells, full_time_index], names=['cell_name', 'timestamp'])

    df_data_only = df.reset_index()[FEATURE_COLS + ['cell_name', 'timestamp']]
    df_grouped_unique = df_data_only.groupby(['cell_name', 'timestamp']).mean()
    df_full = df_grouped_unique.reindex(multi_index, fill_value=0).reset_index().set_index('timestamp')
    df_full = df_full.replace([np.inf, -np.inf], 0).fillna(0)

    # Chia tập test (dùng 10% cuối làm test)
    total_duration = df_full.index.max() - df_full.index.min()
    val_end_time = df_full.index.min() + total_duration * 0.9
    test_df = df_full[df_full.index >= val_end_time]

    # Scale tập test
    test_cells = test_df['cell_name']
    try:
        test_scaled_data = scaler.transform(test_df[FEATURE_COLS])
    except Exception as e:
        print(f"Lỗi khi scale test_df: {e}")
        test_scaled_data = np.zeros((len(test_df), len(FEATURE_COLS)))

    scaled_test_df = pd.DataFrame(test_scaled_data, columns=FEATURE_COLS, index=test_df.index)
    scaled_test_df['cell_name'] = test_cells.values

    # Tạo cửa sổ cho tập test
    def create_windows(data_df, input_steps, output_steps, feature_cols):
        X, y = [], []
        grouped = data_df.groupby('cell_name')
        for _, cell_data in grouped:
            cell_features = cell_data[feature_cols].values
            total_samples = len(cell_features)
            for i in range(total_samples - (input_steps + output_steps) + 1):
                X.append(cell_features[i : i + input_steps, :])
                y.append(cell_features[i + input_steps : i + input_steps + output_steps, :])
        return np.array(X), np.array(y)

    print("Đang tạo mẫu Test (X_test, y_test)...")
    X_test, y_test = create_windows(scaled_test_df, INPUT_STEPS, OUTPUT_STEPS, FEATURE_COLS)

    # Làm phẳng X_test cho CatBoost và XGBoost
    if X_test.size > 0:
        X_test_flat = X_test.reshape(X_test.shape[0], -1)
    else:
        X_test_flat = np.empty((0, INPUT_STEPS * N_FEATURES))

    print("Kích thước dữ liệu Test:")
    print(f"X_test shape: {X_test.shape}")
    print(f"X_test_flat shape: {X_test_flat.shape}")
    print(f"y_test shape: {y_test.shape}")
else:
    print("Lỗi: Không thể tạo tập test. 'df' hoặc 'scaler' không tồn tại.")


In [None]:
# --- Cell 4: Load các mô hình đã huấn luyện ---
models = {}

# Load Transformer
try:
    print("Loading Transformer model...")
    # Since we saved the entire model, we can load it directly.
    # Keras will handle the custom layers if they were saved correctly.
    transformer_model = keras.models.load_model('transformer_model.h5')
    models['Transformer'] = transformer_model
    print("Transformer model loaded.")
except Exception as e:
    print(f"Lỗi khi load Transformer model: {e}")

# Load LSTM
try:
    print("Loading LSTM model...")
    lstm_model = keras.models.load_model('lstm_model.h5')
    models['LSTM'] = lstm_model
    print("LSTM model loaded.")
except Exception as e:
    print(f"Lỗi khi load LSTM model: {e}")

# Load CatBoost
try:
    print("Loading CatBoost model...")
    catboost_model = joblib.load('catboost_model.joblib')
    models['CatBoost'] = catboost_model
    print("CatBoost model loaded.")
except Exception as e:
    print(f"Lỗi khi load CatBoost model: {e}")

# Load XGBoost
try:
    print("Loading XGBoost model...")
    xgboost_model = joblib.load('xgboost_model.joblib')
    models['XGBoost'] = xgboost_model
    print("XGBoost model loaded.")
except Exception as e:
    print(f"Lỗi khi load XGBoost model: {e}")

In [None]:
# --- Cell 5: Thực hiện dự đoán trên tập Test ---
predictions = {}

if 'models' in locals() and 'X_test' in locals():
    for name, model in models.items():
        print(f"--- Đang dự đoán với mô hình {name} ---")
        try:
            if name in ['Transformer', 'LSTM']:
                # Các mô hình DL cần input 3D
                y_pred_scaled = model.predict(X_test)
            else:
                # Các mô hình cây cần input 2D
                y_pred_flat = model.predict(X_test_flat)
                # Reshape lại kết quả về dạng 3D
                y_pred_scaled = y_pred_flat.reshape(X_test.shape[0], OUTPUT_STEPS, N_FEATURES)
            predictions[name] = y_pred_scaled
            print(f"Dự đoán cho {name} hoàn tất. Shape: {y_pred_scaled.shape}")
        except Exception as e:
            print(f"Lỗi khi dự đoán với {name}: {e}")
else:
    print("Lỗi: 'models' hoặc 'X_test' không tồn tại.")


In [None]:
# --- Cell 6: Đảo ngược Scaling và Tính toán Metrics ---
results = {}

if 'predictions' in locals() and 'y_test' in locals() and scaler is not None and y_test.size > 0:
    # Reshape y_test về dạng 2D để inverse transform và tính toán
    y_true_flat = y_test.reshape(-1, N_FEATURES)
    y_true_original_flat = scaler.inverse_transform(y_true_flat)

    for name, y_pred_scaled in predictions.items():
        print(f"--- Đang xử lý kết quả cho {name} ---")
        try:
            # Reshape dự đoán về 2D
            y_pred_flat = y_pred_scaled.reshape(-1, N_FEATURES)
            # Inverse transform
            y_pred_original_flat = scaler.inverse_transform(y_pred_flat)

            # Tính toán metrics trên dữ liệu đã inverse-scaled
            mse = mean_squared_error(y_true_original_flat, y_pred_original_flat)
            mae = mean_absolute_error(y_true_original_flat, y_pred_original_flat)

            results[name] = {'MSE': mse, 'MAE': mae}
            print(f"Kết quả cho {name}: MSE = {mse:.4f}, MAE = {mae:.4f}")
        except Exception as e:
            print(f"Lỗi khi xử lý kết quả cho {name}: {e}")

    # Tạo DataFrame từ results và hiển thị
    if results:
        results_df = pd.DataFrame(results).T
        print("\n--- Bảng so sánh kết quả ---")
        print(results_df)
        # Lưu results_df vào file CSV
        results_df.to_csv('comparison_metrics.csv')
        print("\nĐã lưu bảng so sánh vào 'comparison_metrics.csv'")
    else:
        print("Không có kết quả để hiển thị.")
else:
    print("Lỗi: Không thể tính toán metrics. 'predictions', 'y_test' hoặc 'scaler' không có sẵn hoặc y_test rỗng.")


In [None]:
# --- Cell 7: Trực quan hóa so sánh ---
if 'predictions' in locals() and 'y_test' in locals() and scaler is not None and y_test.size > 0:
    print("--- Trực quan hóa so sánh dự đoán trên một vài mẫu Test ---")

    num_samples_to_plot = min(3, X_test.shape[0])
    if num_samples_to_plot > 0:
        # Lấy y_test thực tế và inverse scale
        y_test_subset = y_test[:num_samples_to_plot]
        y_true_reshaped = y_test_subset.reshape(-1, N_FEATURES)
        y_true_original = scaler.inverse_transform(y_true_reshaped)
        y_true_original = y_true_original.reshape(num_samples_to_plot, OUTPUT_STEPS, N_FEATURES)

        # Inverse scale tất cả các dự đoán
        predictions_original = {}
        for name, y_pred_scaled in predictions.items():
            y_pred_subset = y_pred_scaled[:num_samples_to_plot]
            y_pred_reshaped = y_pred_subset.reshape(-1, N_FEATURES)
            y_pred_original = scaler.inverse_transform(y_pred_reshaped)
            predictions_original[name] = y_pred_original.reshape(num_samples_to_plot, OUTPUT_STEPS, N_FEATURES)

        # Vẽ biểu đồ
        for i in range(num_samples_to_plot):
            print(f"\n--- So sánh Mẫu #{i+1} ---")
            fig, axes = plt.subplots(N_FEATURES, 1, figsize=(18, 6 * N_FEATURES), sharex=True)
            if N_FEATURES == 1:
                axes = [axes]

            for j, feature_name in enumerate(FEATURE_COLS):
                ax = axes[j]
                # Vẽ giá trị thực tế
                ax.plot(y_true_original[i, :, j], label='Thực tế', linewidth=2, marker='.')

                # Vẽ dự đoán của các mô hình
                for name, y_pred_original in predictions_original.items():
                    ax.plot(y_pred_original[i, :, j], label=f'Dự đoán ({name})', linestyle='--', marker='x')

                ax.set_title(f'{feature_name} - Mẫu #{i+1}')
                ax.set_xlabel('Bước thời gian (15 phút)')
                ax.set_ylabel('Giá trị')
                ax.legend()
                ax.grid(True)

            plt.tight_layout()
            plt.show()
    else:
        print("Không có mẫu nào trong tập test để trực quan hóa.")
else:
    print("Lỗi: Không thể trực quan hóa do thiếu dữ liệu hoặc mô hình.")
