In [1]:
import pandas as pd
import os
import torch
import torch.nn as nn
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
import random
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import torch.nn.functional as F
from datetime import datetime

# Task 1: Variability prediction

### Data preprocessing

In [None]:
def clean_variability_label(label):
    """Map a raw variability description onto one of the canonical class names.

    Matching is case-insensitive and substring-based.

    Args:
        label: Raw label text. It must provide ``.lower()``; any other object
            raises ``AttributeError``, which the callers in this notebook catch
            in order to fall back to their default label.

    Returns:
        ``"aperiodic"``, ``"periodic"`` or ``"not variable"``. ``None`` when the
        text matches none of them (the unrecognised text is printed so it can
        be inspected).
    """
    label = label.lower()
    # "aperiodic" contains the substring "periodic", so the longer word has to
    # be tested first; otherwise every aperiodic source is reported as periodic.
    if "aperiodic" in label:
        return "aperiodic"
    if "periodic" in label:
        return "periodic"
    if "not" in label:
        return "not variable"
    # Unknown label: show it and signal "no match" explicitly.
    print(label)
    return None

class LightCurveDataset(Dataset):
    """Dataset that turns each light-curve CSV file into one fixed-length sample.

    The measurement table is read after skipping the first 9 rows of the file
    and must provide ``Date``, ``Time`` and ``StdMag`` columns. The class label
    is taken from row 3, column 1 of the first five (header-less) rows of the
    same file.

    Relies on two notebook-level names: ``clean_variability_label`` and an
    already fitted ``label_encoder``.

    NOTE(review): a later cell of this notebook defines another
    ``LightCurveDataset`` that shadows this one and returns
    ``(stdmag, time_diff, label)`` instead -- keep only one definition.
    """

    def __init__(self, files, sequence_length=1000):
        """Store the file list and the target sequence length.

        Args:
            files: Sequence of CSV file paths, one light curve per file.
            sequence_length: Number of samples in every returned sequence.
        """
        self.files = files
        self.sequence_length = sequence_length

    def __len__(self):
        """Return the number of light-curve files."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load file ``idx`` and return ``(time_diff, stdmag, label)``.

        Returns:
            time_diff: float32 tensor of length ``sequence_length`` holding the
                seconds elapsed since the previous (time-sorted) row.
            stdmag: float32 tensor of length ``sequence_length`` with the
                ``StdMag`` values.
            label: scalar long tensor produced by ``label_encoder``.

        Curves longer than ``sequence_length`` are cropped at a random offset;
        shorter ones are right-padded with zeros.
        """
        # Load CSV file (skip metadata rows)
        df = pd.read_csv(self.files[idx], skiprows=9)

        # Build timestamps; unparseable ones become NaT and are forward-filled
        df['Timestamp'] = pd.to_datetime(df['Date'] + ' ' + df['Time'], errors='coerce')
        df['Timestamp'] = df['Timestamp'].ffill()
        # df = df.dropna(subset=['Timestamp'])  # use instead of ffill() to drop such rows
        df = df.sort_values('Timestamp')
        df['TimeDiff'] = df['Timestamp'].diff().dt.total_seconds().fillna(0)

        # Convert to tensors
        stdmag = torch.tensor(df['StdMag'].values, dtype=torch.float32)
        time_diff = torch.tensor(df['TimeDiff'].values, dtype=torch.float32)

        # Pad or truncate sequences to fixed length
        seq_len = len(stdmag)
        if seq_len > self.sequence_length:
            start_idx = random.randint(0, seq_len - self.sequence_length)
            stdmag = stdmag[start_idx:start_idx + self.sequence_length]
            time_diff = time_diff[start_idx:start_idx + self.sequence_length]
        else:
            padding = self.sequence_length - seq_len
            stdmag = torch.cat([stdmag, torch.zeros(padding)])
            time_diff = torch.cat([time_diff, torch.zeros(padding)])

        # Variability type label
        try:
            metadata = pd.read_csv(self.files[idx], nrows=5, header=None)
            variability_type = clean_variability_label(str(metadata.iloc[3, 1]))
        except (IndexError, AttributeError, KeyError):
            variability_type = None
        # clean_variability_label returns None for text it does not recognise;
        # handing None to the encoder would raise, so use the same default as
        # for missing metadata.
        if variability_type is None:
            variability_type = 'not variable'
        label = torch.tensor(label_encoder.transform([variability_type])[0], dtype=torch.long)

        return time_diff, stdmag, label

In [3]:
class LightCurveDataset(Dataset):
    """Dataset that turns each light-curve CSV file into one fixed-length sample.

    The measurement table is read after skipping the first 9 rows of the file
    and must provide ``Date``, ``Time`` and ``StdMag`` columns. The class label
    comes from the second whitespace-separated word of row 3, column 1 of the
    first five header-less rows -- the same cell the label-encoder fitting code
    in this notebook reads.

    Relies on two notebook-level names: ``clean_variability_label`` and an
    already fitted ``label_encoder``.
    """

    def __init__(self, files, sequence_length=1000):
        """Store the file list and the target sequence length.

        Args:
            files: Sequence of CSV file paths, one light curve per file.
            sequence_length: Number of samples in every returned sequence.
        """
        self.files = files
        self.sequence_length = sequence_length

    def __len__(self):
        """Return the number of light-curve files."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load file ``idx`` and return ``(stdmag, time_diff, label)``.

        Returns:
            stdmag: float32 tensor of length ``sequence_length`` with the raw
                (un-normalised) ``StdMag`` values.
            time_diff: float32 tensor of length ``sequence_length`` holding the
                seconds elapsed since the previous (time-sorted) row.
            label: scalar long tensor produced by ``label_encoder``.

        Curves longer than ``sequence_length`` keep their first
        ``sequence_length`` samples; shorter ones are right-padded with zeros.
        """
        # Load CSV file (skip the first 9 rows so parsing starts at the data header)
        df = pd.read_csv(self.files[idx], skiprows=9)

        # Build timestamps; inconsistent formats become NaT and are forward-filled
        df['Timestamp'] = pd.to_datetime(df['Date'] + ' ' + df['Time'], errors='coerce')
        df['Timestamp'] = df['Timestamp'].ffill()

        df = df.sort_values('Timestamp')
        df['TimeDiff'] = df['Timestamp'].diff().dt.total_seconds().fillna(0)

        # No normalization of StdMag to preserve its range and variability information
        stdmag = torch.tensor(df['StdMag'].values, dtype=torch.float32)
        time_diff = torch.tensor(df['TimeDiff'].values, dtype=torch.float32)

        # Pad or truncate sequences to fixed length
        if len(stdmag) > self.sequence_length:
            stdmag = stdmag[:self.sequence_length]
            time_diff = time_diff[:self.sequence_length]
        else:
            padding = self.sequence_length - len(stdmag)
            stdmag = torch.cat([stdmag, torch.zeros(padding)])
            time_diff = torch.cat([time_diff, torch.zeros(padding)])

        # Variability type label. header=None keeps the first file line as
        # data, so iloc[3, 1] addresses the same cell as the encoder-fitting
        # loop; without it the lookup is shifted down by one line and can
        # produce labels the encoder never saw.
        try:
            metadata = pd.read_csv(self.files[idx], nrows=5, header=None)
            variability_type = clean_variability_label(metadata.iloc[3, 1].split()[1])
        except (IndexError, AttributeError):
            # Missing or malformed variability information
            variability_type = None
        # clean_variability_label returns None for text it does not recognise;
        # handing None to the encoder would raise, so use the default label.
        if variability_type is None:
            variability_type = 'not variable'
        label = torch.tensor(label_encoder.transform([variability_type])[0], dtype=torch.long)

        return stdmag, time_diff, label

# Preprocess labels (run once, before any LightCurveDataset item is accessed).
# `files` must already hold the light-curve CSV paths; the recorded run of this
# cell failed with a NameError because no earlier cell had defined it.
if 'files' not in globals():
    raise NameError(
        "name 'files' is not defined: build the list of light-curve CSV paths "
        "in an earlier cell before running this one"
    )

# Label used whenever the metadata is missing, malformed or unrecognised.
DEFAULT_VARIABILITY = 'not variable'
# Every class clean_variability_label can return. Fitting on all of them keeps
# the label -> index mapping stable and guarantees the default label is known
# to the encoder even if no file happens to carry it.
KNOWN_VARIABILITY_TYPES = ['aperiodic', 'not variable', 'periodic']

# Gather the variability type of every file
variability_types = []
for file in files:
    try:
        metadata = pd.read_csv(file, nrows=5, header=None)  # first 5 rows hold the metadata
        variability = metadata.iloc[3, 1].split()[1]  # second word of row 3, column 1
        variability = clean_variability_label(variability)
    except (IndexError, AttributeError):
        # Missing or inconsistent variability information
        variability = None
    # clean_variability_label returns None for text it does not recognise;
    # None must not reach the encoder.
    if variability is None:
        variability = DEFAULT_VARIABILITY
    variability_types.append(variability)

# The encoder is created here so the cell does not depend on leftover kernel state.
label_encoder = LabelEncoder()
label_encoder.fit(variability_types + KNOWN_VARIABILITY_TYPES)

# Load data
dataset = LightCurveDataset(files)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Transformer encoder classifier (encoder layers are built with batch_first=True)
class TransformerModel(nn.Module):
    """Sequence classifier: per-step linear embedding + transformer encoder.

    Each magnitude value is projected to ``hidden_dim``; the matching time
    difference is projected by a second linear layer and added as a learned
    positional signal. The encoder output is averaged over dimension 1 and fed
    to a linear classification head.

    Args:
        input_dim: Input size of the magnitude embedding. ``forward`` appends a
            single trailing axis to ``x``, so inputs shaped (batch, seq) need
            the default of 1 -- presumably the intended use; confirm.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        hidden_dim: Model width (``d_model``).
        output_dim: Number of output classes.
    """

    def __init__(self, input_dim=1, nhead=4, num_layers=2, hidden_dim=128, output_dim=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.pos_encoder = nn.Linear(1, hidden_dim)
        layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=nhead, batch_first=True)
        self.transformer = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, time_diff):
        """Return class logits for magnitudes ``x`` and their ``time_diff``."""
        # Value embedding plus time-difference based positional term
        tokens = self.embedding(x.unsqueeze(-1)) + self.pos_encoder(time_diff.unsqueeze(-1))

        # Contextualise the sequence, then mean-pool over dimension 1
        encoded = self.transformer(tokens)
        pooled = encoded.mean(dim=1)

        return self.fc(pooled)

# Build the classifier with its default hyper-parameters
model = TransformerModel()

# Cross-entropy objective, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# One pass over the dataloader (a single epoch), reporting the loss of every batch
for data, time_diff, labels in dataloader:
    # Clear gradients left over from the previous batch
    optimizer.zero_grad()

    # Forward pass and loss
    output = model(data, time_diff)
    loss = criterion(output, labels)

    # Backward pass and parameter update
    loss.backward()
    optimizer.step()

    print(f"Loss: {loss.item()}")


NameError: name 'files' is not defined