# Zadanie: prognoza vs wartości prawdziwe

Pozostaw zbiór testowy jak obecnie. Zmień funkcję do backtestów oraz stworzenia zbiorów treningowego oraz testowego, tak aby nie ucinało pierwszych wartości ze zbioru testowego. Ze zbioru treningowego wyciągnij zbiór walidacyjny. Stwórz coś na wzór ‘siatki hiperparametrów’, tak jak dzieje się w GridSearch (Podpowiedź: Wystarczą pętle). Jako hiperparametry traktuj wartość dla zmiennej look_back oraz liczbę komórek w warstwie (units). Sprawdź zatem wszystkie architektury dla look_back z zakresu od 1 do 12 oraz units z zakresu 1 do 12. Wybierz model, który ma najniższą metrykę RMSE na zbiorze walidacyjnym i zwizualizuj prognozę vs wartości prawdziwe.

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Input, LSTM, Dense
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import time

# Wczytanie danych
dataset = pd.read_csv('airline-passengers.csv')
dataset['Month'] = pd.to_datetime(dataset['Month'])
dataset.set_index(['Month'], inplace=True)

# Podział na zbiór treningowy i testowy
train_size = int(len(dataset) * 0.70)
test_size = len(dataset) - train_size
train, test = dataset.iloc[0:train_size], dataset.iloc[train_size:]

# Normalizacja danych
scaler = MinMaxScaler(feature_range=(0, 1))
train_scaled = scaler.fit_transform(train)
test_scaled = scaler.transform(test)

# Przygotowanie zbioru walidacyjnego (20% zbioru treningowego)
val_size = int(train_size * 0.2)
val_scaled = train_scaled[:val_size]
train_scaled = train_scaled[val_size:]

# Funkcja do tworzenia zbiorów danych
def create_datasets(data, look_back=1):
    X, y = [], []
    for i in range(len(data)-look_back):
        X.append(data[i:(i+look_back), 0])
        y.append(data[i + look_back, 0])
    return np.array(X), np.array(y)

# Funkcja do budowy modelu
def build_model(look_back, units):
    model = Sequential([
        Input(shape=(look_back, 1), name='input_layer'),
        LSTM(units, activation='tanh'),
        Dense(1, activation='linear')
    ])
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model

# Przebudowana funkcja backtestu
def backtest(model, train_data, test_data, look_back, scaler):
    # Prognoza dla danych treningowych
    train_predict = model.predict(train_data)
    train_predict = scaler.inverse_transform(train_predict)
    train_actual = scaler.inverse_transform([train_data[:, -1, 0]])
    
    # Prognoza dla danych testowych
    test_predict = model.predict(test_data)
    test_predict = scaler.inverse_transform(test_predict)
    test_actual = scaler.inverse_transform([test_data[:, -1, 0]])
    
    # Obliczenie RMSE
    train_rmse = np.sqrt(mean_squared_error(train_actual[0], train_predict[:,0]))
    test_rmse = np.sqrt(mean_squared_error(test_actual[0], test_predict[:,0]))
    
    # Przygotowanie danych do wizualizacji
    train_predict_plot = np.empty_like(dataset.values)
    train_predict_plot[:, :] = np.nan
    train_predict_plot[val_size+look_back:len(train_predict)+val_size+look_back, :] = train_predict
    
    test_predict_plot = np.empty_like(dataset.values)
    test_predict_plot[:, :] = np.nan
    test_predict_plot[len(train_predict)+(look_back*2)+val_size+1:len(dataset)-1, :] = test_predict
    
    # Wizualizacja
    plt.figure(figsize=(16,9))
    plt.plot(dataset['Passengers'], label='True Values')
    plt.plot(pd.DataFrame(train_predict_plot, index=dataset.index), label='Training Predictions')
    plt.plot(pd.DataFrame(test_predict_plot, index=dataset.index), label='Test Predictions')
    plt.legend()
    plt.title(f'Model with look_back={look_back} - Train RMSE: {train_rmse:.2f}, Test RMSE: {test_rmse:.2f}')
    plt.show()
    
    return train_rmse, test_rmse

# Siatka hiperparametrów
best_rmse = float('inf')
best_params = {}
results = []
start_time = time.time()

print("Rozpoczęcie przeszukiwania siatki hiperparametrów...")
for look_back in range(1, 13):
    for units in range(1, 13):
        print(f"\nTesting look_back={look_back}, units={units}...")
        
        # Przygotowanie danych
        X_train, y_train = create_datasets(train_scaled, look_back)
        X_val, y_val = create_datasets(val_scaled, look_back)
        X_test, y_test = create_datasets(test_scaled, look_back)
        
        X_train = np.reshape(X_train, (X_train.shape[0], look_back, 1))
        X_val = np.reshape(X_val, (X_val.shape[0], look_back, 1))
        X_test = np.reshape(X_test, (X_test.shape[0], look_back, 1))
        
        # Budowa i trening modelu
        model = build_model(look_back, units)
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=1,
            validation_data=(X_val, y_val),
            verbose=0
        )
        
        # Ocena na zbiorze walidacyjnym
        val_predict = model.predict(X_val)
        val_rmse = np.sqrt(mean_squared_error(y_val, val_predict))
        
        # Zapamiętanie najlepszego modelu
        if val_rmse < best_rmse:
            best_rmse = val_rmse
            best_params = {
                'look_back': look_back,
                'units': units,
                'model': model,
                'history': history
            }
        
        results.append({
            'look_back': look_back,
            'units': units,
            'val_rmse': val_rmse,
            'train_rmse': np.sqrt(history.history['loss'][-1]),
            'val_loss': history.history['val_loss'][-1]
        })

# Konwersja wyników do DataFrame
results_df = pd.DataFrame(results)
print("\nPrzeszukiwanie zakończone w %.2f sekund." % (time.time() - start_time))
print("\nNajlepsze parametry:", {k: v for k, v in best_params.items() if k != 'model' and k != 'history'})

# Ocena najlepszego modelu na zbiorze testowym
best_look_back = best_params['look_back']
best_units = best_params['units']
best_model = best_params['model']

X_test_best, y_test_best = create_datasets(test_scaled, best_look_back)
X_test_best = np.reshape(X_test_best, (X_test_best.shape[0], best_look_back, 1))

test_predict = best_model.predict(X_test_best)
test_rmse = np.sqrt(mean_squared_error(y_test_best, test_predict))
print(f"\nRMSE na zbiorze testowym dla najlepszego modelu: {test_rmse:.4f}")

# Wizualizacja krzywej uczenia dla najlepszego modelu
plt.figure(figsize=(12, 6))
plt.plot(best_params['history'].history['loss'], label='Train Loss')
plt.plot(best_params['history'].history['val_loss'], label='Validation Loss')
plt.title(f'Training History (look_back={best_look_back}, units={best_units})')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.show()

# Wizualizacja najlepszego modelu
backtest(best_model,
         np.reshape(create_datasets(train_scaled, best_look_back)[0], 
         X_test_best,
         best_look_back,
         scaler)

# Zapis wyników do pliku
results_df.to_csv('grid_search_results.csv', index=False)
print("\nWyniki przeszukiwania zapisane w pliku 'grid_search_results.csv'")

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Input, LSTM, Dense
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import time
import warnings

# Wyłącz ostrzeżenia Keras
warnings.filterwarnings('ignore', category=UserWarning)

# 1. Przygotowanie danych
def prepare_data():
    dataset = pd.read_csv('airline-passengers.csv')
    dataset['Month'] = pd.to_datetime(dataset['Month'])
    dataset.set_index(['Month'], inplace=True)
    
    train_size = int(len(dataset) * 0.70)
    train, test = dataset.iloc[0:train_size], dataset.iloc[train_size:]
    
    scaler = MinMaxScaler(feature_range=(0, 1))
    train_scaled = scaler.fit_transform(train)
    test_scaled = scaler.transform(test)
    
    val_size = int(train_size * 0.2)
    val_scaled = train_scaled[:val_size]
    train_scaled = train_scaled[val_size:]
    
    return train_scaled, val_scaled, test_scaled, scaler, dataset

# 2. Funkcje pomocnicze
def create_datasets(data, look_back=1):
    X, y = [], []
    for i in range(len(data)-look_back):
        X.append(data[i:(i+look_back), 0])
        y.append(data[i + look_back, 0])
    return np.array(X), np.array(y)

def build_model(look_back, units):
    model = Sequential([
        Input(shape=(look_back, 1)),
        LSTM(units, activation='tanh'),
        Dense(1)
    ])
    model.compile(loss='mse', optimizer='adam')
    return model

# 3. Optymalizacja przeszukiwania siatki
def optimize_parameters():
    train_scaled, val_scaled, _, _, _ = prepare_data()
    
    results = []
    best_rmse = float('inf')
    best_params = {}
    
    # Ograniczony zakres dla demonstracji
    for look_back in [3, 6, 9, 12]:  # Zamiast range(1,13)
        for units in [4, 8, 12]:      # Zamiast range(1,13)
            start_time = time.time()
            
            X_train, y_train = create_datasets(train_scaled, look_back)
            X_val, y_val = create_datasets(val_scaled, look_back)
            
            X_train = X_train.reshape(-1, look_back, 1)
            X_val = X_val.reshape(-1, look_back, 1)
            
            model = build_model(look_back, units)
            history = model.fit(
                X_train, y_train,
                epochs=50,  # Zmniejszona liczba epok
                batch_size=8,  # Zwiększony batch size
                validation_data=(X_val, y_val),
                verbose=0
            )
            
            val_rmse = np.sqrt(history.history['val_loss'][-1])
            results.append({
                'look_back': look_back,
                'units': units,
                'val_rmse': val_rmse,
                'time': time.time() - start_time
            })
            
            if val_rmse < best_rmse:
                best_rmse = val_rmse
                best_params = {
                    'look_back': look_back,
                    'units': units,
                    'model': model
                }
            
            print(f"look_back={look_back}, units={units}, val_rmse={val_rmse:.4f}, time={results[-1]['time']:.1f}s")
    
    return pd.DataFrame(results), best_params

# 4. Główna część skryptu
if __name__ == "__main__":
    print("Rozpoczęcie optymalizacji...")
    results_df, best_params = optimize_parameters()
    
    print("\nNajlepsze parametry:")
    print(f"look_back: {best_params['look_back']}")
    print(f"units: {best_params['units']}")
    print(f"val_rmse: {results_df['val_rmse'].min():.4f}")
    
    # Ocena na zbiorze testowym
    _, _, test_scaled, scaler, dataset = prepare_data()
    look_back = best_params['look_back']
    
    X_test, y_test = create_datasets(test_scaled, look_back)
    X_test = X_test.reshape(-1, look_back, 1)
    
    test_predict = best_params['model'].predict(X_test)
    test_predict = scaler.inverse_transform(test_predict)
    y_test = scaler.inverse_transform([y_test])[0]
    
    test_rmse = np.sqrt(mean_squared_error(y_test, test_predict))
    print(f"\nTest RMSE: {test_rmse:.4f}")
    
    # Wizualizacja
    plt.figure(figsize=(12, 6))
    plt.plot(dataset.index[-len(y_test):], y_test, label='True Values')
    plt.plot(dataset.index[-len(test_predict):], test_predict, label='Predictions')
    plt.title(f"Best Model (look_back={look_back}, units={best_params['units']})\nTest RMSE: {test_rmse:.4f}")
    plt.legend()
    plt.show()
    
    # Zapis wyników
    results_df.to_csv('optimization_results.csv', index=False)
    print("\nWyniki zapisane w optimization_results.csv")