In [1]:
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence, then apply dropout.

    Even feature indices hold ``sin(pos * w_k)`` and odd feature indices hold
    ``cos(pos * w_k)`` with ``w_k = 10000 ** (-2k / d_model)``. The table is
    stored in the non-trainable buffer ``pe`` of shape ``(max_len, 1, d_model)``.

    Args:
        d_model: Size of the feature dimension (odd values are supported).
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions pre-computed in the table.
        batch_first: If ``False`` (default) inputs are ``(seq, batch, d_model)``;
            if ``True`` inputs are ``(batch, seq, d_model)``.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000,
                 batch_first: bool = False):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.batch_first = batch_first

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine slot than sine slots, so
        # the last frequency is dropped for the cosine half to keep shapes equal.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``dropout(x + pe)`` with the encodings aligned to the sequence axis.

        Args:
            x: ``(seq, batch, d_model)`` tensor, or ``(batch, seq, d_model)``
                when the module was built with ``batch_first=True``.
        """
        if self.batch_first:
            # (seq, 1, d_model) -> (1, seq, d_model) so it broadcasts over the batch.
            x = x + self.pe[:x.size(1)].transpose(0, 1)
        else:
            x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class DynamicThresholdModel(nn.Module):
    """Regress one positive threshold per input series with a CNN + Transformer encoder.

    Pipeline: two Conv1d/BatchNorm/ReLU stages (the second has stride 2, roughly
    halving the length) -> linear projection to ``d_model`` -> sinusoidal
    positional encoding -> Transformer encoder -> mean pooling over the sequence
    -> MLP head ending in Softplus, so the output is always positive.

    Args:
        input_feat_dim: Input channels of the first convolution. ``forward``
            inserts a single channel axis, so it is used with the default of 1.
        cnn_out_channels: Channels produced by the convolutional extractor.
        d_model: Transformer embedding size.
        nhead: Number of attention heads.
        d_hid: Hidden size of the Transformer feed-forward sub-layer.
        nlayers: Number of Transformer encoder layers.
        dropout: Dropout probability for positional encoding and encoder layers.
    """

    def __init__(self, input_feat_dim=1, cnn_out_channels=64, d_model=128, nhead=8, d_hid=256, nlayers=3, dropout=0.2):
        super().__init__()

        self.conv_extractor = nn.Sequential(
            nn.Conv1d(input_feat_dim, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Conv1d(32, cnn_out_channels, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(cnn_out_channels),
            nn.ReLU(),
        )

        self.d_model = d_model
        self.embedding = nn.Linear(cnn_out_channels, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead, d_hid, dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, nlayers)

        self.regressor_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, 1),
            nn.Softplus()
        )

    def forward(self, src: torch.Tensor) -> torch.Tensor:
        """Predict a threshold for every series in the batch.

        Args:
            src: ``(batch, length)`` tensor of residual values.

        Returns:
            ``(batch,)`` tensor of positive thresholds.
        """
        src = src.unsqueeze(1)             # (batch, 1, length) for Conv1d
        src = self.conv_extractor(src)     # (batch, cnn_out_channels, seq)
        src = src.permute(0, 2, 1)         # (batch, seq, cnn_out_channels)

        src = self.embedding(src) * math.sqrt(self.d_model)
        # The positional encoder indexes positions along dim 0 (sequence-first),
        # while everything else here is batch-first. Feeding it the batch-first
        # tensor directly would offset each sample by its *batch index* instead
        # of encoding time steps, so swap the axes around the call.
        src = self.pos_encoder(src.transpose(0, 1)).transpose(0, 1)

        output = self.transformer_encoder(src)   # (batch, seq, d_model)

        pooled_output = output.mean(dim=1)       # average over time steps

        threshold = self.regressor_head(pooled_output)   # (batch, 1)

        return threshold.squeeze(1)

In [2]:
import torch
import torch.optim as optim

def train_model(model, data, loss_function, epochs=30, batch_size=64, lr=5e-4):
    """Train ``model`` with Adam on targets derived from each input series.

    For every mini-batch the per-series maximum (``r_a``) and median (``r_n``)
    are computed from the inputs and passed, together with the model output, to
    ``loss_function(predicted_threshold, r_n, r_a)``. Gradients are clipped to a
    norm of 1.0 and the learning rate is halved every 5 epochs.

    Every sample is visited once per epoch: a final batch smaller than
    ``batch_size`` is processed rather than dropped, so datasets smaller than
    one batch also work.

    Args:
        model: Module mapping a ``(batch, length)`` tensor to ``(batch,)`` outputs.
        data: 2-D tensor of series, one per row.
        loss_function: Callable ``(prediction, r_n, r_a) -> scalar tensor``.
        epochs: Number of passes over ``data``.
        batch_size: Maximum number of series per optimisation step.
        lr: Initial Adam learning rate.

    Returns:
        list[float]: Mean batch loss of each epoch, in order.

    Raises:
        ValueError: If ``data`` is empty or ``batch_size`` is not positive.
    """
    num_samples = data.size(0)
    if num_samples == 0:
        raise ValueError("data must contain at least one sample")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    epoch_losses = []
    model.train()
    for epoch in range(epochs):
        total_epoch_loss = 0.0
        num_batches = 0

        # Shuffle by index and gather per batch instead of copying the whole
        # dataset every epoch.
        indices = torch.randperm(num_samples)

        for start_idx in range(0, num_samples, batch_size):
            batch_indices = indices[start_idx:start_idx + batch_size]
            residual_series = data[batch_indices].to(device)

            # Targets come straight from the inputs: the largest residual and
            # the median residual of each series.
            r_a = torch.max(residual_series, dim=1).values
            r_n = torch.quantile(residual_series, 0.5, dim=1)

            optimizer.zero_grad()

            predicted_threshold = model(residual_series)

            loss = loss_function(predicted_threshold, r_n, r_a)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_epoch_loss += loss.item()
            num_batches += 1

        scheduler.step()
        avg_loss = total_epoch_loss / num_batches
        epoch_losses.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs}, Average Loss: {avg_loss:.6f}, LR: {optimizer.param_groups[0]['lr']:.6f}")

    return epoch_losses

In [3]:
import torch

if __name__ == '__main__':
    def custom_margin_ranking_loss(threshold, r_n, r_a, margin_n=0.1, margin_a=0.1):
        """Hinge loss that keeps the threshold between the normal and anomalous levels.

        The loss is zero whenever ``r_n + margin_n <= threshold <= r_a - margin_a``
        and grows linearly with the violation otherwise.

        Args:
            threshold: Predicted thresholds; negative values are clamped to 0.
            r_n: Per-series "normal" level (the caller passes the median).
            r_a: Per-series "anomalous" level (the caller passes the maximum).
            margin_n: Required gap above ``r_n``.
            margin_a: Required gap below ``r_a``.

        Returns:
            Scalar tensor: mean of the two hinge terms over the batch.
        """
        # NOTE(review): the captured run reports an average loss of 0.000000 from
        # the first epoch. With the synthetic data below (median near 0, maximum
        # >= 2) the zero-loss band is roughly [0.1, 1.9], which the untrained
        # Softplus output very likely already falls into, so there is no gradient
        # and nothing is learned. Presumably the objective needs a tighter target
        # (e.g. larger or data-dependent margins) -- confirm the intended design.
        threshold_clamped = torch.clamp(threshold, min=0.0)
        loss_n = torch.relu(r_n - threshold_clamped + margin_n)
        loss_a = torch.relu(threshold_clamped - r_a + margin_a)
        total_loss = (loss_n + loss_a).mean()
        return total_loss

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Fix the RNG so the synthetic data, shuffling and weight init are repeatable.
    torch.manual_seed(42)

    num_samples = 10000
    sequence_length = 592

    print("--- Generating Sample Data ---")

    # Baseline: zero-mean Gaussian noise with standard deviation 0.2.
    sample_data = torch.randn(num_samples, sequence_length) * 0.2
    for i in range(num_samples):
        # Overwrite 1-5 random positions with values in [2, 10); duplicate
        # positions collapse in unique(), so a series may get fewer than drawn.
        num_anomalies = torch.randint(1, 6, (1,)).item()
        anomaly_indices = torch.randint(0, sequence_length, (num_anomalies,)).unique()
        anomaly_values = torch.rand(len(anomaly_indices)) * 8 + 2
        sample_data[i, anomaly_indices] = anomaly_values

    print(f"Sample data created with shape: {sample_data.shape}")

    model = DynamicThresholdModel()

    print("\n--- Starting Training ---")
    train_model(
        model=model,
        data=sample_data,
        loss_function=custom_margin_ranking_loss,
        epochs=30,
        batch_size=64
    )
    print("--- Training Finished ---")

--- Generating Sample Data ---
Sample data created with shape: torch.Size([10000, 592])

--- Starting Training ---
Epoch 1/30, Average Loss: 0.000000, LR: 0.000500
Epoch 2/30, Average Loss: 0.000000, LR: 0.000500
Epoch 3/30, Average Loss: 0.000000, LR: 0.000500
Epoch 4/30, Average Loss: 0.000000, LR: 0.000500


KeyboardInterrupt: 