# 16. Time Series Analysis s Transformery

**Autor:** Praut s.r.o. - AI Integration & Business Automation

V tomto notebooku se naučíme používat Transformer modely pro analýzu časových řad - predikci, detekci anomálií a klasifikaci sekvencí.

## Obsah
1. Úvod do Time Series Transformerů
2. Příprava dat pro časové řady
3. Predikce časových řad (Forecasting)
4. Klasifikace časových řad
5. Produkční pipeline

In [None]:
# Instalace knihoven
!pip install transformers accelerate torch pandas numpy scikit-learn matplotlib gluonts pytorch-forecasting -q

In [None]:
import torch
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# Kontrola GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Používám zařízení: {device}")

## 1. Úvod do Time Series Transformerů

Transformery pro časové řady využívají attention mechanismus k zachycení dlouhodobých závislostí v datech:

| Model | Použití | Výhody |
|-------|---------|--------|
| Informer | Dlouhodobé predikce | Efektivní attention O(L log L) |
| Autoformer | Sezónní data | Auto-correlation mechanism |
| TimeSeriesTransformer | Obecné | HuggingFace integrace |
| PatchTST | Multivariate | Patch-based tokenizace |

In [None]:
# Generování syntetických dat pro demonstraci

def generate_sales_data(
    start_date: str = '2023-01-01',
    periods: int = 365,
    freq: str = 'D'
) -> pd.DataFrame:
    """
    Generuje syntetická data prodejů s trendem, sezónností a šumem.
    Simuluje reálné e-commerce prodeje.
    """
    np.random.seed(42)
    dates = pd.date_range(start=start_date, periods=periods, freq=freq)
    
    # Základní trend (růst)
    trend = np.linspace(100, 150, periods)
    
    # Týdenní sezónnost (vyšší prodeje o víkendu)
    day_of_week = np.array([d.dayofweek for d in dates])
    weekly_pattern = np.where(day_of_week >= 5, 1.3, 1.0)  # Víkend +30%
    
    # Měsíční sezónnost (vyšší prodeje před svátky)
    month = np.array([d.month for d in dates])
    monthly_pattern = np.where(np.isin(month, [11, 12]), 1.5, 1.0)  # Vánoce +50%
    monthly_pattern = np.where(np.isin(month, [1, 2]), 0.8, monthly_pattern)  # Leden/únor -20%
    
    # Náhodný šum
    noise = np.random.normal(0, 10, periods)
    
    # Kombinace všech komponent
    sales = trend * weekly_pattern * monthly_pattern + noise
    sales = np.maximum(sales, 0)  # Prodeje nemohou být záporné
    
    # Vytvoření DataFrame
    df = pd.DataFrame({
        'date': dates,
        'sales': sales.round(2),
        'day_of_week': day_of_week,
        'month': month,
        'is_weekend': (day_of_week >= 5).astype(int),
        'is_holiday_season': np.isin(month, [11, 12]).astype(int)
    })
    
    return df

# Generování dat
sales_data = generate_sales_data(periods=365*2)  # 2 roky dat
print(f"Vygenerováno {len(sales_data)} záznamů")
print(f"\nPrvních 5 řádků:")
print(sales_data.head())

# Vizualizace
plt.figure(figsize=(14, 5))
plt.plot(sales_data['date'], sales_data['sales'], alpha=0.7)
plt.title('Syntetická data prodejů')
plt.xlabel('Datum')
plt.ylabel('Prodeje')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 2. Příprava dat pro časové řady

In [None]:
@dataclass
class TimeSeriesConfig:
    """Konfigurace pro zpracování časových řad."""
    sequence_length: int = 30  # Délka vstupní sekvence (lookback)
    prediction_horizon: int = 7  # Kolik kroků predikovat dopředu
    stride: int = 1  # Krok mezi sekvencemi
    train_ratio: float = 0.7
    val_ratio: float = 0.15
    test_ratio: float = 0.15
    scale_data: bool = True


class TimeSeriesPreprocessor:
    """
    Předzpracování časových řad pro Transformer modely.
    Vytváří sliding window sekvence pro supervised learning.
    """
    
    def __init__(self, config: TimeSeriesConfig):
        self.config = config
        self.scaler = StandardScaler() if config.scale_data else None
        self.feature_columns = None
        
    def create_sequences(
        self,
        data: np.ndarray,
        target_idx: int = 0
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Vytvoří sekvence pro predikci.
        
        Args:
            data: Data tvaru (n_samples, n_features)
            target_idx: Index cílové proměnné
            
        Returns:
            X: Vstupní sekvence (n_sequences, seq_length, n_features)
            y: Cílové hodnoty (n_sequences, prediction_horizon)
        """
        seq_len = self.config.sequence_length
        pred_horizon = self.config.prediction_horizon
        stride = self.config.stride
        
        X, y = [], []
        
        for i in range(0, len(data) - seq_len - pred_horizon + 1, stride):
            # Vstupní sekvence
            X.append(data[i:i + seq_len])
            # Cílové hodnoty (pouze target proměnná)
            y.append(data[i + seq_len:i + seq_len + pred_horizon, target_idx])
        
        return np.array(X), np.array(y)
    
    def prepare_data(
        self,
        df: pd.DataFrame,
        target_column: str,
        feature_columns: Optional[List[str]] = None
    ) -> Dict[str, torch.Tensor]:
        """
        Kompletní příprava dat včetně škálování a rozdělení.
        """
        # Určení feature sloupců
        if feature_columns is None:
            feature_columns = [target_column]
        self.feature_columns = feature_columns
        
        # Extrakce dat
        data = df[feature_columns].values.astype(np.float32)
        target_idx = feature_columns.index(target_column)
        
        # Škálování
        if self.scaler:
            data = self.scaler.fit_transform(data)
        
        # Vytvoření sekvencí
        X, y = self.create_sequences(data, target_idx)
        
        # Rozdělení na train/val/test
        n_samples = len(X)
        train_end = int(n_samples * self.config.train_ratio)
        val_end = train_end + int(n_samples * self.config.val_ratio)
        
        result = {
            'X_train': torch.FloatTensor(X[:train_end]),
            'y_train': torch.FloatTensor(y[:train_end]),
            'X_val': torch.FloatTensor(X[train_end:val_end]),
            'y_val': torch.FloatTensor(y[train_end:val_end]),
            'X_test': torch.FloatTensor(X[val_end:]),
            'y_test': torch.FloatTensor(y[val_end:])
        }
        
        print(f"Připraveno:")
        print(f"  Train: {result['X_train'].shape[0]} sekvencí")
        print(f"  Val: {result['X_val'].shape[0]} sekvencí")
        print(f"  Test: {result['X_test'].shape[0]} sekvencí")
        print(f"  Vstupní tvar: {result['X_train'].shape}")
        print(f"  Výstupní tvar: {result['y_train'].shape}")
        
        return result
    
    def inverse_transform_target(
        self,
        scaled_values: np.ndarray,
        target_idx: int = 0
    ) -> np.ndarray:
        """Převede škálované hodnoty zpět na původní měřítko."""
        if self.scaler is None:
            return scaled_values
        
        # Vytvoříme dummy array pro inverse transform
        n_features = len(self.feature_columns)
        dummy = np.zeros((len(scaled_values), n_features))
        dummy[:, target_idx] = scaled_values.flatten()
        
        return self.scaler.inverse_transform(dummy)[:, target_idx]


# Použití
config = TimeSeriesConfig(
    sequence_length=30,
    prediction_horizon=7,
    stride=1
)

preprocessor = TimeSeriesPreprocessor(config)
prepared_data = preprocessor.prepare_data(
    sales_data,
    target_column='sales',
    feature_columns=['sales', 'day_of_week', 'is_weekend', 'is_holiday_season']
)

## 3. Predikce časových řad (Forecasting)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import math


class PositionalEncoding(nn.Module):
    """Pozicové kódování pro Transformer."""
    
    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # Vytvoření pozicového kódování
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: Tensor tvaru (batch, seq_len, d_model)"""
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)


class TimeSeriesTransformer(nn.Module):
    """
    Transformer model pro predikci časových řad.
    Encoder-only architektura s attention mechanismem.
    """
    
    def __init__(
        self,
        n_features: int,
        d_model: int = 64,
        n_heads: int = 4,
        n_layers: int = 2,
        d_ff: int = 256,
        dropout: float = 0.1,
        prediction_horizon: int = 7
    ):
        super().__init__()
        
        self.n_features = n_features
        self.d_model = d_model
        self.prediction_horizon = prediction_horizon
        
        # Input projection
        self.input_projection = nn.Linear(n_features, d_model)
        
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, dropout=dropout)
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            activation='gelu',
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=n_layers
        )
        
        # Output layers
        self.output_projection = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, prediction_horizon)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Input tensor (batch, seq_len, n_features)
            
        Returns:
            predictions: (batch, prediction_horizon)
        """
        # Project input to d_model dimensions
        x = self.input_projection(x)
        
        # Add positional encoding
        x = self.positional_encoding(x)
        
        # Transformer encoding
        x = self.transformer_encoder(x)
        
        # Use last position for prediction
        x = x[:, -1, :]  # (batch, d_model)
        
        # Project to prediction horizon
        predictions = self.output_projection(x)
        
        return predictions


# Vytvoření modelu
n_features = prepared_data['X_train'].shape[2]
model = TimeSeriesTransformer(
    n_features=n_features,
    d_model=64,
    n_heads=4,
    n_layers=2,
    prediction_horizon=config.prediction_horizon
).to(device)

print(f"Model vytvořen")
print(f"Počet parametrů: {sum(p.numel() for p in model.parameters()):,}")

In [None]:
from torch.utils.data import DataLoader, TensorDataset


class TimeSeriesTrainer:
    """
    Trainer pro trénování Time Series Transformer modelu.
    """
    
    def __init__(
        self,
        model: nn.Module,
        device: torch.device,
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-5
    ):
        self.model = model
        self.device = device
        self.optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=learning_rate,
            weight_decay=weight_decay
        )
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', factor=0.5, patience=5
        )
        self.criterion = nn.MSELoss()
        self.history = {'train_loss': [], 'val_loss': []}
        
    def train_epoch(self, dataloader: DataLoader) -> float:
        """Trénuje jednu epochu."""
        self.model.train()
        total_loss = 0
        
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            
            self.optimizer.zero_grad()
            predictions = self.model(X_batch)
            loss = self.criterion(predictions, y_batch)
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            total_loss += loss.item()
        
        return total_loss / len(dataloader)
    
    @torch.no_grad()
    def evaluate(self, dataloader: DataLoader) -> float:
        """Evaluuje model na validačních datech."""
        self.model.eval()
        total_loss = 0
        
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            
            predictions = self.model(X_batch)
            loss = self.criterion(predictions, y_batch)
            total_loss += loss.item()
        
        return total_loss / len(dataloader)
    
    def train(
        self,
        train_data: Dict[str, torch.Tensor],
        n_epochs: int = 50,
        batch_size: int = 32,
        early_stopping_patience: int = 10
    ) -> Dict:
        """Kompletní trénování s early stopping."""
        # Vytvoření DataLoaderů
        train_dataset = TensorDataset(train_data['X_train'], train_data['y_train'])
        val_dataset = TensorDataset(train_data['X_val'], train_data['y_val'])
        
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)
        
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        
        for epoch in range(n_epochs):
            train_loss = self.train_epoch(train_loader)
            val_loss = self.evaluate(val_loader)
            
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)
            
            self.scheduler.step(val_loss)
            
            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                best_model_state = self.model.state_dict().copy()
            else:
                patience_counter += 1
            
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{n_epochs} - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")
            
            if patience_counter >= early_stopping_patience:
                print(f"Early stopping v epoše {epoch+1}")
                break
        
        # Načtení nejlepšího modelu
        if best_model_state:
            self.model.load_state_dict(best_model_state)
        
        return self.history


# Trénování
trainer = TimeSeriesTrainer(model, device, learning_rate=1e-3)
history = trainer.train(prepared_data, n_epochs=100, batch_size=32, early_stopping_patience=15)

# Vizualizace loss
plt.figure(figsize=(10, 4))
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training History')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Evaluace na testovacích datech

@torch.no_grad()
def evaluate_model(
    model: nn.Module,
    X_test: torch.Tensor,
    y_test: torch.Tensor,
    preprocessor: TimeSeriesPreprocessor
) -> Dict:
    """Kompletní evaluace modelu s metrikami."""
    model.eval()
    
    # Predikce
    X_test_device = X_test.to(device)
    predictions = model(X_test_device).cpu().numpy()
    actuals = y_test.numpy()
    
    # Inverse transform (zpět na původní měřítko)
    predictions_original = np.array([
        preprocessor.inverse_transform_target(pred) for pred in predictions
    ])
    actuals_original = np.array([
        preprocessor.inverse_transform_target(act) for act in actuals
    ])
    
    # Metriky
    mae = mean_absolute_error(actuals_original.flatten(), predictions_original.flatten())
    mse = mean_squared_error(actuals_original.flatten(), predictions_original.flatten())
    rmse = np.sqrt(mse)
    
    # MAPE (Mean Absolute Percentage Error)
    mape = np.mean(np.abs((actuals_original - predictions_original) / (actuals_original + 1e-8))) * 100
    
    results = {
        'MAE': mae,
        'MSE': mse,
        'RMSE': rmse,
        'MAPE': mape,
        'predictions': predictions_original,
        'actuals': actuals_original
    }
    
    print(f"Výsledky evaluace:")
    print(f"  MAE: {mae:.2f}")
    print(f"  RMSE: {rmse:.2f}")
    print(f"  MAPE: {mape:.2f}%")
    
    return results


# Evaluace
results = evaluate_model(
    model,
    prepared_data['X_test'],
    prepared_data['y_test'],
    preprocessor
)

# Vizualizace predikcí
plt.figure(figsize=(14, 5))

# Zobrazení několika predikcí
n_show = 5
for i in range(n_show):
    idx = i * 20
    if idx < len(results['predictions']):
        x = range(idx, idx + config.prediction_horizon)
        plt.plot(x, results['actuals'][idx], 'b-', alpha=0.5, label='Skutečnost' if i == 0 else '')
        plt.plot(x, results['predictions'][idx], 'r--', alpha=0.5, label='Predikce' if i == 0 else '')

plt.xlabel('Čas')
plt.ylabel('Prodeje')
plt.title('Porovnání predikcí vs skutečnost')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

## 4. Klasifikace časových řad

In [None]:
def generate_activity_data(n_samples: int = 1000, seq_length: int = 100) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generuje syntetická data aktivit (chůze, běh, stání).
    Simuluje data z akcelerometru.
    """
    np.random.seed(42)
    
    X = []
    y = []
    
    for _ in range(n_samples):
        activity = np.random.choice([0, 1, 2])  # 0=stání, 1=chůze, 2=běh
        
        if activity == 0:  # Stání - nízká variance
            x = np.random.normal(0, 0.1, (seq_length, 3))
        elif activity == 1:  # Chůze - střední variance, periodická
            t = np.linspace(0, 4*np.pi, seq_length)
            x = np.column_stack([
                0.5 * np.sin(t) + np.random.normal(0, 0.2, seq_length),
                0.3 * np.cos(t) + np.random.normal(0, 0.2, seq_length),
                0.2 * np.sin(2*t) + np.random.normal(0, 0.1, seq_length)
            ])
        else:  # Běh - vysoká variance, rychlejší periodicita
            t = np.linspace(0, 8*np.pi, seq_length)
            x = np.column_stack([
                1.0 * np.sin(t) + np.random.normal(0, 0.4, seq_length),
                0.6 * np.cos(t) + np.random.normal(0, 0.3, seq_length),
                0.5 * np.sin(2*t) + np.random.normal(0, 0.2, seq_length)
            ])
        
        X.append(x)
        y.append(activity)
    
    return np.array(X, dtype=np.float32), np.array(y)


class TimeSeriesClassifier(nn.Module):
    """
    Transformer model pro klasifikaci časových řad.
    """
    
    def __init__(
        self,
        n_features: int,
        n_classes: int,
        d_model: int = 64,
        n_heads: int = 4,
        n_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.input_projection = nn.Linear(n_features, d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout=dropout)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
        
        # Global average pooling + classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, n_classes)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Input projection
        x = self.input_projection(x)
        x = self.positional_encoding(x)
        
        # Transformer
        x = self.transformer(x)
        
        # Global average pooling
        x = x.mean(dim=1)
        
        # Classification
        return self.classifier(x)


# Generování dat
X_activity, y_activity = generate_activity_data(n_samples=1000, seq_length=100)
print(f"Data aktivit: {X_activity.shape}")
print(f"Distribuce tříd: {np.bincount(y_activity)}")

# Rozdělení dat
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X_activity, y_activity, test_size=0.2, random_state=42, stratify=y_activity
)

# Konverze na tensory
X_train_t = torch.FloatTensor(X_train)
y_train_t = torch.LongTensor(y_train)
X_test_t = torch.FloatTensor(X_test)
y_test_t = torch.LongTensor(y_test)

In [None]:
# Trénování klasifikátoru

classifier = TimeSeriesClassifier(
    n_features=3,
    n_classes=3,
    d_model=64,
    n_heads=4,
    n_layers=2
).to(device)

optimizer = torch.optim.AdamW(classifier.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Training loop
n_epochs = 30
for epoch in range(n_epochs):
    classifier.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = classifier(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += (predicted == y_batch).sum().item()
        total += y_batch.size(0)
    
    if (epoch + 1) % 5 == 0:
        acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{n_epochs} - Loss: {total_loss/len(train_loader):.4f}, Acc: {acc:.1f}%")

# Evaluace
classifier.eval()
with torch.no_grad():
    X_test_device = X_test_t.to(device)
    outputs = classifier(X_test_device)
    _, predicted = outputs.max(1)
    
    accuracy = (predicted.cpu() == y_test_t).float().mean().item() * 100
    print(f"\nTest Accuracy: {accuracy:.1f}%")

# Confusion matrix
from sklearn.metrics import classification_report, confusion_matrix

y_pred = predicted.cpu().numpy()
print("\nClassification Report:")
print(classification_report(
    y_test, y_pred,
    target_names=['Stání', 'Chůze', 'Běh']
))

## 5. Produkční pipeline

In [None]:
from datetime import datetime
import json


class ProductionTimeSeriesForecaster:
    """
    Produkční pipeline pro predikci časových řad.
    Zahrnuje předzpracování, predikci a post-processing.
    """
    
    def __init__(
        self,
        model: nn.Module,
        preprocessor: TimeSeriesPreprocessor,
        device: torch.device
    ):
        self.model = model
        self.preprocessor = preprocessor
        self.device = device
        self.model.eval()
        
        # Statistiky
        self.prediction_count = 0
        self.prediction_history = []
        
    @torch.no_grad()
    def predict(
        self,
        recent_data: pd.DataFrame,
        target_column: str,
        feature_columns: Optional[List[str]] = None,
        return_confidence: bool = True
    ) -> Dict:
        """
        Vytvoří predikci na základě nejnovějších dat.
        
        Args:
            recent_data: DataFrame s nejnovějšími daty
            target_column: Název cílového sloupce
            feature_columns: Seznam feature sloupců
            return_confidence: Zda vrátit odhad spolehlivosti
        """
        if feature_columns is None:
            feature_columns = self.preprocessor.feature_columns
        
        # Kontrola délky dat
        seq_len = self.preprocessor.config.sequence_length
        if len(recent_data) < seq_len:
            raise ValueError(f"Potřeba alespoň {seq_len} záznamů, poskytnuto {len(recent_data)}")
        
        # Příprava dat
        data = recent_data[feature_columns].tail(seq_len).values.astype(np.float32)
        
        # Škálování
        if self.preprocessor.scaler:
            data = self.preprocessor.scaler.transform(data)
        
        # Predikce
        X = torch.FloatTensor(data).unsqueeze(0).to(self.device)
        predictions_scaled = self.model(X).cpu().numpy()[0]
        
        # Inverse transform
        target_idx = feature_columns.index(target_column)
        predictions = self.preprocessor.inverse_transform_target(
            predictions_scaled, target_idx
        )
        
        # Vytvoření datumů pro predikce
        last_date = recent_data['date'].iloc[-1]
        prediction_dates = pd.date_range(
            start=last_date + timedelta(days=1),
            periods=len(predictions),
            freq='D'
        )
        
        result = {
            'predictions': predictions.tolist(),
            'dates': prediction_dates.strftime('%Y-%m-%d').tolist(),
            'timestamp': datetime.now().isoformat(),
            'model_type': 'TimeSeriesTransformer'
        }
        
        # Odhad spolehlivosti na základě variance v historických datech
        if return_confidence:
            historical_std = recent_data[target_column].std()
            result['confidence_interval'] = {
                'lower': (predictions - 1.96 * historical_std).tolist(),
                'upper': (predictions + 1.96 * historical_std).tolist()
            }
        
        # Uložení do historie
        self.prediction_count += 1
        self.prediction_history.append({
            'timestamp': result['timestamp'],
            'predictions': result['predictions']
        })
        
        return result
    
    def get_statistics(self) -> Dict:
        """Vrátí statistiky použití."""
        return {
            'total_predictions': self.prediction_count,
            'prediction_horizon': self.preprocessor.config.prediction_horizon,
            'sequence_length': self.preprocessor.config.sequence_length,
            'features_used': self.preprocessor.feature_columns
        }
    
    def save_model(self, path: str):
        """Uloží model a preprocessor."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'scaler': self.preprocessor.scaler,
            'config': self.preprocessor.config,
            'feature_columns': self.preprocessor.feature_columns
        }, path)
        print(f"Model uložen do: {path}")


# Použití produkční pipeline
forecaster = ProductionTimeSeriesForecaster(model, preprocessor, device)

# Simulace real-time predikce
recent_data = sales_data.tail(60)  # Poslední 2 měsíce

prediction_result = forecaster.predict(
    recent_data,
    target_column='sales',
    feature_columns=['sales', 'day_of_week', 'is_weekend', 'is_holiday_season']
)

print("Predikce prodejů na příštích 7 dní:")
for date, pred, lower, upper in zip(
    prediction_result['dates'],
    prediction_result['predictions'],
    prediction_result['confidence_interval']['lower'],
    prediction_result['confidence_interval']['upper']
):
    print(f"  {date}: {pred:.1f} (95% CI: {lower:.1f} - {upper:.1f})")

print(f"\nStatistiky: {forecaster.get_statistics()}")

In [None]:
# Vizualizace predikcí s confidence intervals

plt.figure(figsize=(14, 6))

# Historická data
plt.plot(
    recent_data['date'].values[-30:],
    recent_data['sales'].values[-30:],
    'b-',
    label='Historická data',
    linewidth=2
)

# Predikce
pred_dates = pd.to_datetime(prediction_result['dates'])
predictions = prediction_result['predictions']
lower = prediction_result['confidence_interval']['lower']
upper = prediction_result['confidence_interval']['upper']

plt.plot(pred_dates, predictions, 'r-', label='Predikce', linewidth=2)
plt.fill_between(pred_dates, lower, upper, alpha=0.3, color='red', label='95% CI')

plt.xlabel('Datum')
plt.ylabel('Prodeje')
plt.title('Predikce prodejů s confidence intervaly')
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## Shrnutí

V tomto notebooku jsme se naučili:

1. **Příprava dat** - Vytváření sliding window sekvencí pro supervised learning
2. **Time Series Transformer** - Encoder-only architektura s attention pro predikci
3. **Klasifikace sekvencí** - Rozpoznávání vzorů v časových řadách
4. **Produkční pipeline** - Real-time predikce s confidence intervaly

### Klíčové poznatky

- Transformery excellují v zachycení dlouhodobých závislostí díky attention
- Správná normalizace dat je kritická pro stabilní trénování
- Prediction horizon volíme podle business požadavků (kratší = přesnější)
- Confidence intervaly pomáhají s rozhodováním v nejistých situacích

### Praktické tipy

- Pro kratší sekvence (<100) stačí menší modely (d_model=64, n_layers=2)
- Multivariate features výrazně zlepšují predikce
- Sezónnost a trendy jsou dobré explicitně zakódovat jako features
- Regularizace (dropout, weight decay) prevence overfittingu