In [None]:
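# Old model (disabled: the code below is wrapped in a triple-quoted string, so this cell does nothing when run)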

'''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    def __init__(self, kmer_score_file):
        # Load scoring file
        self.data = pd.read_csv(kmer_score_file)

        # Extract features and labels
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        self.labels = self.data['status'].values.astype(int)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model with an embedding layer
class LSTMModel(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super(LSTMModel, self).__init__()
        # Embedding layer
        self.embedding = nn.Linear(input_dim, embedding_dim)
        
        # LSTM layers
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True, dropout=0.5)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True, dropout=0.5)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        # Pass through embedding layer
        x = self.embedding(x)
        x = x.unsqueeze(1)  # Add sequence dimension for LSTM
        
        # Pass through LSTM layers
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        
        # Extract the last output of the LSTM
        x = x[:, -1, :]
        
        # Fully connected layers
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)  # Output probabilities

# Hyperparameters
embedding_dim = 128  # New hyperparameter for embedding layer
hidden_dim1 = 64
hidden_dim2 = 32
dense_units = 64
num_classes = 3
learning_rate = 0.005
num_epochs = 180
batch_size = 32

# File paths
kmer_score_file = '/home/user/torch_shrimp/until-tools/mod/k-mer/test2_new.csv'

# Dataset and DataLoader
dataset = KmerFrequencyDataset(kmer_score_file)
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = int(0.1 * dataset_size)
test_size = dataset_size - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Initialize model, criterion, and optimizer
input_dim = dataset.features.shape[1]  # Automatically adjust to input feature size
model = LSTMModel(input_dim, embedding_dim, hidden_dim1, hidden_dim2, dense_units, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    
    # Validation loop
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    print(f"Epoch [{epoch + 1}/{num_epochs}] - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# Evaluation on test set
model.eval()
test_correct = 0
test_total = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        output = model(X_batch)
        _, predicted = torch.max(output.data, 1)
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()
test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Save model
torch.save(model.state_dict(), 'new_1.pth')
print("Model training completed and saved.")
'''

In [None]:
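# New model, no logging (disabled: the code below is wrapped in a triple-quoted string, so this cell does nothing when run)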
'''
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    def __init__(self, kmer_score_file):
        # Load scoring file
        self.data = pd.read_csv(kmer_score_file)

        # Extract features and labels
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        self.labels = self.data['status'].values.astype(int)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model with embedding layer
class LSTMModel(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super(LSTMModel, self).__init__()
        # Embedding layer
        self.embedding = nn.Linear(input_dim, embedding_dim)
        
        # LSTM layers
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True, dropout=0.5)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True, dropout=0.5)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        # Embedding layer
        x = self.embedding(x)  # (batch_size, input_dim) -> (batch_size, embedding_dim)
        x = x.unsqueeze(1)  # Add sequence dimension for LSTM

        # LSTM layers
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)

        # Extract the last output of the LSTM
        x = x[:, -1, :]  # Use the last hidden state

        # Fully connected layers
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)  # Output probabilities

# Hyperparameters
embedding_dim = 128
hidden_dim1 = 64
hidden_dim2 = 32
dense_units = 64
num_classes = 3
learning_rate = 0.005
num_epochs = 180
batch_size = 32

# File paths
kmer_score_file = '/home/user/torch_shrimp/until-tools/mod/k-mer/test2_new.csv'

# Dataset and DataLoader
dataset = KmerFrequencyDataset(kmer_score_file)
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = int(0.1 * dataset_size)
test_size = dataset_size - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Initialize model, criterion, and optimizer
input_dim = dataset.features.shape[1]  # Automatically adjust to input feature size
model = LSTMModel(input_dim, embedding_dim, hidden_dim1, hidden_dim2, dense_units, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training and validation loop
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    
    # Validation
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    print(f"Epoch [{epoch + 1}/{num_epochs}] - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# Evaluation on test set
model.eval()
test_correct = 0
test_total = 0
all_preds = []
all_labels = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = model(X_batch)
        _, predicted = torch.max(output.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Generate classification report
print("Classification Report:")
print(classification_report(all_labels, all_preds))

# Save the model
torch.save(model.state_dict(), 'updated_model_1.pth')
print("Model training completed and saved.")
'''


In [None]:
#Training model version 1
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime

# Setup logging
# Start from a clean root logger. In a notebook this cell is re-run inside the
# same process; with handlers already attached, basicConfig() silently does
# nothing (no new log file) and each addHandler() call duplicates console lines.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()  # release the previous run's log file

log_file = f"training_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
# Mirror every record to the console as well as the file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Record the run configuration in the log.

    Emits a "Hyperparameters:" header through the root logger, followed by
    ``hyperparams`` serialised as JSON with a 4-space indent.

    Args:
        hyperparams: JSON-serialisable mapping of setting names to values.
    """
    logging.info("Hyperparameters:")
    pretty_json = json.dumps(hyperparams, indent=4)
    logging.info(pretty_json)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset of k-mer feature rows read from a CSV file.

    Every column except ``status`` is used as a float32 feature; the ``status``
    column supplies the integer class label.

    Attributes:
        data: The ``pandas.DataFrame`` loaded from the CSV.
        features: float32 NumPy array, one row per sample.
        labels: int64 NumPy array of class ids, one per sample.
    """

    def __init__(self, kmer_score_file):
        """Load the whole CSV into memory.

        Args:
            kmer_score_file: Path or file-like object accepted by
                ``pandas.read_csv``.

        Raises:
            KeyError: If the CSV has no ``status`` column.
        """
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Explicit 64-bit labels: plain ``int`` maps to int32 on some platforms,
        # and nn.CrossEntropyLoss only accepts int64 class indices.
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` tensors for the sample at ``idx``."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier that maps one k-mer feature vector per sample to class scores.

    Pipeline: linear projection -> two single-layer LSTMs applied to a length-1
    sequence -> ReLU dense layer with dropout -> linear output layer.

    Args:
        input_dim: Number of input features per sample.
        embedding_dim: Output width of the linear projection.
        hidden_dim1: Hidden size of the first LSTM.
        hidden_dim2: Hidden size of the second LSTM.
        dense_units: Width of the dense layer.
        num_classes: Number of classes (size of the output layer).
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # No `dropout=` argument: nn.LSTM only applies it between stacked layers,
        # so on these single-layer LSTMs it was a no-op that raised a UserWarning.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return unnormalised class scores (logits), shape (batch, num_classes).

        Logits are returned rather than probabilities because
        nn.CrossEntropyLoss applies log-softmax internally; passing it softmax
        output normalises twice, which flattens the loss and its gradients.
        The arg-max class is the same for logits and probabilities.

        Args:
            x: Float tensor of shape (batch, input_dim).
        """
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, 1, embedding_dim): a one-step sequence
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        return self.fc2(x)

    def predict_proba(self, x):
        """Return class probabilities: softmax over the logits of ``forward``."""
        return torch.softmax(self(x), dim=1)

# Hyperparameters
# One dict holds every setting of the run; it is logged verbatim below.
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.0001,
    num_epochs=600,
    batch_size=32,
    file_path='/home/user/torch_shrimp/until-tools/mod/k-mer/train400.csv',
)
log_hyperparameters(hyperparams)

# Dataset and DataLoader
# 80 % train, 10 % validation; the test split takes the remainder so the three
# lengths always add up to the dataset size.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size = int(0.8 * dataset_size)
val_size = int(0.1 * dataset_size)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)
# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader = DataLoader(val_dataset, batch_size=hyperparams["batch_size"])
test_loader = DataLoader(test_dataset, batch_size=hyperparams["batch_size"])

# Initialize model, criterion, and optimizer
input_dim = dataset.features.shape[1]
# Forward only the architecture settings: hyperparams also holds training and
# data keys (learning_rate, file_path, ...) that LSTMModel.__init__ does not
# accept, so the dict cannot be unpacked wholesale.
model = LSTMModel(
    input_dim=input_dim,
    **{
        key: hyperparams[key]
        for key in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])

# Training and validation loop
for epoch in range(hyperparams["num_epochs"]):
    # Training pass: one optimiser step per mini-batch.
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss = train_loss / len(train_loader)  # mean of the per-batch losses

    # Validation pass: no gradients, dropout disabled by eval().
    model.eval()
    val_loss, val_correct, val_total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()
            predicted = torch.max(output.data, 1).indices  # highest-scoring class
            val_total += len(y_batch)
            val_correct += int((predicted == y_batch).sum())
    val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    epoch_summary = f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%"
    logging.info(epoch_summary)

# Evaluation on test set
model.eval()
test_correct, test_total = 0, 0
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = model(X_batch)
        predicted = torch.max(output.data, 1).indices
        # Keep per-sample predictions and labels for the report below.
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += len(y_batch)
        test_correct += int((predicted == y_batch).sum())

test_accuracy = 100 * test_correct / test_total
logging.info(f"Test Accuracy: {test_accuracy:.2f}%")
logging.info("Classification Report:")
report_text = classification_report(all_labels, all_preds)
logging.info(report_text)

# Save the model
# Only the parameters (state_dict) are written, not the module object.
model_save_path = "file_tune_5.pth"
torch.save(model.state_dict(), model_save_path)
logging.info(f"Model training completed and saved to {model_save_path}")


In [None]:
#Version 2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime

# Setup logging
# Start from a clean root logger. In a notebook this cell is re-run inside the
# same process; with handlers already attached, basicConfig() silently does
# nothing (no new log file) and each addHandler() call duplicates console lines.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()  # release the previous run's log file

log_file = f"training_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
# Mirror every record to the console as well as the file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Record the run configuration in the log.

    Emits a "Hyperparameters:" header through the root logger, followed by
    ``hyperparams`` serialised as JSON with a 4-space indent.

    Args:
        hyperparams: JSON-serialisable mapping of setting names to values.
    """
    logging.info("Hyperparameters:")
    pretty_json = json.dumps(hyperparams, indent=4)
    logging.info(pretty_json)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset of k-mer feature rows read from a CSV file.

    Every column except ``status`` is used as a float32 feature; the ``status``
    column supplies the integer class label.

    Attributes:
        data: The ``pandas.DataFrame`` loaded from the CSV.
        features: float32 NumPy array, one row per sample.
        labels: int64 NumPy array of class ids, one per sample.
    """

    def __init__(self, kmer_score_file):
        """Load the whole CSV into memory.

        Args:
            kmer_score_file: Path or file-like object accepted by
                ``pandas.read_csv``.

        Raises:
            KeyError: If the CSV has no ``status`` column.
        """
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Explicit 64-bit labels: plain ``int`` maps to int32 on some platforms,
        # and nn.CrossEntropyLoss only accepts int64 class indices.
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` tensors for the sample at ``idx``."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier that maps one k-mer feature vector per sample to class scores.

    Pipeline: linear projection -> two single-layer LSTMs applied to a length-1
    sequence -> ReLU dense layer with dropout -> linear output layer.

    Args:
        input_dim: Number of input features per sample.
        embedding_dim: Output width of the linear projection.
        hidden_dim1: Hidden size of the first LSTM.
        hidden_dim2: Hidden size of the second LSTM.
        dense_units: Width of the dense layer.
        num_classes: Number of classes (size of the output layer).
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # No `dropout=` argument: nn.LSTM only applies it between stacked layers,
        # so on these single-layer LSTMs it was a no-op that raised a UserWarning.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return unnormalised class scores (logits), shape (batch, num_classes).

        Logits are returned rather than probabilities because
        nn.CrossEntropyLoss applies log-softmax internally; passing it softmax
        output normalises twice, which flattens the loss and its gradients.
        The arg-max class is the same for logits and probabilities.

        Args:
            x: Float tensor of shape (batch, input_dim).
        """
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, 1, embedding_dim): a one-step sequence
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        return self.fc2(x)

    def predict_proba(self, x):
        """Return class probabilities: softmax over the logits of ``forward``."""
        return torch.softmax(self(x), dim=1)

# Hyperparameters
# One dict holds every setting of the run; it is logged verbatim below.
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.0001,
    num_epochs=500,
    batch_size=32,
    train_size=0.8,
    val_size=0.1,
    file_path='/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train400.csv',
)
log_hyperparameters(hyperparams)

# Dataset and DataLoader
# train_size / val_size are fractions of the dataset; the test split takes the
# remainder so the three lengths always add up to the dataset size.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size = int(hyperparams["train_size"] * dataset_size)
val_size = int(hyperparams["val_size"] * dataset_size)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)
# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader = DataLoader(val_dataset, batch_size=hyperparams["batch_size"])
test_loader = DataLoader(test_dataset, batch_size=hyperparams["batch_size"])

# Initialize model, criterion, and optimizer
input_dim = dataset.features.shape[1]
# Forward only the architecture settings: hyperparams also holds training and
# data keys (learning_rate, file_path, ...) that LSTMModel.__init__ does not
# accept, so the dict cannot be unpacked wholesale.
model = LSTMModel(
    input_dim=input_dim,
    **{
        key: hyperparams[key]
        for key in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])

# Training and validation loop
for epoch in range(hyperparams["num_epochs"]):
    # Training phase: one optimiser step per mini-batch, accuracy tracked on
    # the same forward pass that produced the loss.
    model.train()
    train_loss, train_correct, train_total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = torch.max(output.data, 1).indices  # highest-scoring class
        train_total += len(y_batch)
        train_correct += int((predicted == y_batch).sum())
    train_loss = train_loss / len(train_loader)  # mean of the per-batch losses
    train_accuracy = 100 * train_correct / train_total

    # Validation phase: no gradients, dropout disabled by eval().
    model.eval()
    val_loss, val_correct, val_total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()
            predicted = torch.max(output.data, 1).indices
            val_total += len(y_batch)
            val_correct += int((predicted == y_batch).sum())
    val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    # Test phase, run every epoch for monitoring; the model is still in eval
    # mode from the validation phase above.
    test_loss, test_correct, test_total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(torch.float32)
            output = model(X_batch)
            loss = criterion(output, y_batch)
            test_loss += loss.item()
            predicted = torch.max(output.data, 1).indices
            test_total += len(y_batch)
            test_correct += int((predicted == y_batch).sum())
    test_loss = test_loss / len(test_loader)
    test_accuracy = 100 * test_correct / test_total

    # Log all results in one place
    epoch_summary = (
        f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
        f"Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.2f}%, "
        f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%"
    )
    logging.info(epoch_summary)

# Evaluation on test set
model.eval()
test_correct, test_total = 0, 0
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = model(X_batch)
        predicted = torch.max(output.data, 1).indices
        # Keep per-sample predictions and labels for the report below.
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += len(y_batch)
        test_correct += int((predicted == y_batch).sum())

test_accuracy = 100 * test_correct / test_total
logging.info(f"Test Accuracy: {test_accuracy:.2f}%")
logging.info("Classification Report:")
report_text = classification_report(all_labels, all_preds)
logging.info(report_text)

# Save the models
# Only the parameters (state_dict) are written, not the module object.
model_save_path = "model_9_v2.pth"
torch.save(model.state_dict(), model_save_path)
logging.info(f"Model training completed and saved to {model_save_path}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime
import os

# Set log directory
log_dir = "/home/user/torch_shrimp/until-tools/mod/k-mer/es/batch1"
os.makedirs(log_dir, exist_ok=True)

# Ensure the log filename is the same as the directory name
log_file = os.path.join(log_dir, f"{os.path.basename(log_dir)}.txt")

# Remove previous logging handlers to prevent duplicate logs.
# Each one is also closed: removeHandler() alone leaves an earlier run's log
# file open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Setup logging
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
# Mirror every record to the console as well as the file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Record the run configuration in the log.

    Emits a "Hyperparameters:" header through the root logger, followed by
    ``hyperparams`` serialised as JSON with a 4-space indent.

    Args:
        hyperparams: JSON-serialisable mapping of setting names to values.
    """
    logging.info("Hyperparameters:")
    pretty_json = json.dumps(hyperparams, indent=4)
    logging.info(pretty_json)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset of k-mer feature rows read from a CSV file.

    Every column except ``status`` is used as a float32 feature; the ``status``
    column supplies the integer class label.

    Attributes:
        data: The ``pandas.DataFrame`` loaded from the CSV.
        features: float32 NumPy array, one row per sample.
        labels: int64 NumPy array of class ids, one per sample.
    """

    def __init__(self, kmer_score_file):
        """Load the whole CSV into memory.

        Args:
            kmer_score_file: Path or file-like object accepted by
                ``pandas.read_csv``.

        Raises:
            KeyError: If the CSV has no ``status`` column.
        """
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Explicit 64-bit labels: plain ``int`` maps to int32 on some platforms,
        # and nn.CrossEntropyLoss only accepts int64 class indices.
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` tensors for the sample at ``idx``."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier that maps one k-mer feature vector per sample to class scores.

    Pipeline: linear projection -> two single-layer LSTMs applied to a length-1
    sequence -> ReLU dense layer with dropout -> linear output layer.

    Args:
        input_dim: Number of input features per sample.
        embedding_dim: Output width of the linear projection.
        hidden_dim1: Hidden size of the first LSTM.
        hidden_dim2: Hidden size of the second LSTM.
        dense_units: Width of the dense layer.
        num_classes: Number of classes (size of the output layer).
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # No `dropout=` argument: nn.LSTM only applies it between stacked layers,
        # so on these single-layer LSTMs it was a no-op that raised a UserWarning.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return unnormalised class scores (logits), shape (batch, num_classes).

        Logits are returned rather than probabilities because
        nn.CrossEntropyLoss applies log-softmax internally; passing it softmax
        output normalises twice, which flattens the loss and its gradients.
        The arg-max class is the same for logits and probabilities.

        Args:
            x: Float tensor of shape (batch, input_dim).
        """
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, 1, embedding_dim): a one-step sequence
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        return self.fc2(x)

    def predict_proba(self, x):
        """Return class probabilities: softmax over the logits of ``forward``."""
        return torch.softmax(self(x), dim=1)

# Hyperparameters
# One dict holds every setting of the run; it is logged verbatim below.
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.001,
    num_epochs=500,
    batch_size=32,
    train_size=0.8,
    val_size=0.1,
    file_path='/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train400.csv',
    patience=50,
)
log_hyperparameters(hyperparams)

# Dataset and DataLoader
# train_size / val_size are fractions of the dataset; the test split takes the
# remainder so the three lengths always add up to the dataset size.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size = int(hyperparams["train_size"] * dataset_size)
val_size = int(hyperparams["val_size"] * dataset_size)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)
# Only the training loader shuffles; validation/test keep a fixed order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader = DataLoader(val_dataset, batch_size=hyperparams["batch_size"])
test_loader = DataLoader(test_dataset, batch_size=hyperparams["batch_size"])

# Initialize model, criterion, and optimizer
input_dim = dataset.features.shape[1]
# Forward only the architecture settings: hyperparams also holds training and
# data keys (learning_rate, file_path, patience, ...) that LSTMModel.__init__
# does not accept.
model = LSTMModel(
    input_dim=input_dim,
    **{
        key: hyperparams[key]
        for key in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])

# Training and validation loop
# Early-stopping state: lowest validation loss seen so far and the number of
# consecutive epochs that failed to beat it.
best_val_loss = float('inf')
patience = hyperparams["patience"]
patience_counter = 0
best_model_path = os.path.join(log_dir, "best_model.pth")
final_model_path = os.path.join(log_dir, "final_model.pth")

for epoch in range(hyperparams["num_epochs"]):
    # Training phase: one optimiser step per mini-batch, accuracy tracked on
    # the same forward pass that produced the loss.
    model.train()
    train_loss, train_correct, train_total = 0, 0, 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = torch.max(output.data, 1).indices  # highest-scoring class
        train_total += len(y_batch)
        train_correct += int((predicted == y_batch).sum())
    train_loss = train_loss / len(train_loader)  # mean of the per-batch losses
    train_accuracy = 100 * train_correct / train_total

    # Validation phase: no gradients, dropout disabled by eval().
    model.eval()
    val_loss, val_correct, val_total = 0, 0, 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()
            predicted = torch.max(output.data, 1).indices
            val_total += len(y_batch)
            val_correct += int((predicted == y_batch).sum())
    val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    epoch_summary = (
        f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
        f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%"
    )
    logging.info(epoch_summary)

    # Early stopping check
    if val_loss < best_val_loss:
        # New best: checkpoint it and reset the patience window.
        best_val_loss, patience_counter = val_loss, 0
        torch.save(model.state_dict(), best_model_path)
        continue
    patience_counter += 1
    if patience_counter >= patience:
        logging.info(f"Early stopping triggered at epoch {epoch + 1}")
        break

# Save final model
torch.save(model.state_dict(), final_model_path)
logging.info(f"Training completed. Best model saved at {best_model_path}, Final model saved at {final_model_path}")

# Evaluation on test set
# This cell builds test_loader and imports classification_report but originally
# never used them, so no held-out result was reported. Score the best
# checkpoint (the weights early stopping selected), then put the final weights
# back so `model` is left exactly as training ended.
final_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}
if best_val_loss < float('inf'):  # a checkpoint was written during this run
    model.load_state_dict(torch.load(best_model_path))
model.eval()
test_correct = 0
test_total = 0
all_preds = []
all_labels = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        output = model(X_batch)
        _, predicted = torch.max(output, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()
model.load_state_dict(final_state)

if test_total:
    test_accuracy = 100 * test_correct / test_total
    logging.info(f"Test Accuracy: {test_accuracy:.2f}%")
    logging.info("Classification Report:")
    logging.info(classification_report(all_labels, all_preds))
else:
    # Tiny datasets can leave the remainder split empty; avoid dividing by zero.
    logging.info("Test split is empty; skipping test evaluation.")


In [None]:
#version 4 (missing test)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime
import os

# Set log directory
log_dir = "/home/user/torch_shrimp/until-tools/mod/k-mer/es/batch0"
os.makedirs(log_dir, exist_ok=True)

# Ensure the log filename is the same as the directory name
log_file = os.path.join(log_dir, f"{os.path.basename(log_dir)}.txt")

# Remove previous logging handlers to prevent duplicate logs.
# Each one is also closed: removeHandler() alone leaves an earlier run's log
# file open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Setup logging
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
# Mirror every record to the console as well as the file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Record the run configuration in the log.

    Emits a "Hyperparameters:" header through the root logger, followed by
    ``hyperparams`` serialised as JSON with a 4-space indent.

    Args:
        hyperparams: JSON-serialisable mapping of setting names to values.
    """
    logging.info("Hyperparameters:")
    pretty_json = json.dumps(hyperparams, indent=4)
    logging.info(pretty_json)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset of k-mer feature rows read from a CSV file.

    Every column except ``status`` is used as a float32 feature; the ``status``
    column supplies the integer class label.

    Attributes:
        data: The ``pandas.DataFrame`` loaded from the CSV.
        features: float32 NumPy array, one row per sample.
        labels: int64 NumPy array of class ids, one per sample.
    """

    def __init__(self, kmer_score_file):
        """Load the whole CSV into memory.

        Args:
            kmer_score_file: Path or file-like object accepted by
                ``pandas.read_csv``.

        Raises:
            KeyError: If the CSV has no ``status`` column.
        """
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Explicit 64-bit labels: plain ``int`` maps to int32 on some platforms,
        # and nn.CrossEntropyLoss only accepts int64 class indices.
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` tensors for the sample at ``idx``."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier: linear projection -> two single-layer LSTMs -> two dense layers.

    The input is a 2-D batch ``(batch, input_dim)``. ``forward`` inserts a
    sequence axis of length 1 before the LSTMs and returns softmax
    probabilities of shape ``(batch, num_classes)``.

    NOTE: the output is already normalised. ``nn.CrossEntropyLoss`` expects
    unnormalised logits, so callers using that loss should pass
    ``torch.log`` of this output rather than the output itself.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # nn.LSTM's ``dropout`` argument only acts between stacked layers. With
        # the default num_layers=1 it is ignored and merely raises a
        # UserWarning, so it is not passed. Parameter names are unchanged and
        # existing checkpoints still load.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a ``(batch, input_dim)`` input."""
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, embedding_dim) -> (batch, 1, embedding_dim)
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)

# Hyperparameters
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.001,
    num_epochs=500,
    batch_size=32,
    train_size=0.7,
    val_size=0.1,
    file_path="/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train200.csv",
    patience=50,
)
log_hyperparameters(hyperparams)

# Load the CSV and split it three ways. The train and validation shares are
# truncated to whole rows; the test subset receives every remaining row.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size, val_size = (
    int(hyperparams[share] * dataset_size) for share in ("train_size", "val_size")
)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader, test_loader = (
    DataLoader(subset, batch_size=hyperparams["batch_size"])
    for subset in (val_dataset, test_dataset)
)

# The model input width is the number of feature columns in the dataset.
input_dim = dataset.features.shape[1]
model = LSTMModel(
    input_dim=input_dim,
    **{
        name: hyperparams[name]
        for name in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])
criterion = nn.CrossEntropyLoss()

# Checkpoint files live next to the log file.
best_model_path, latest_model_path = (
    os.path.join(log_dir, filename) for filename in ("best_model.pth", "latest_model.pth")
)

# Early-stopping bookkeeping
patience = hyperparams["patience"]
patience_counter = 0
best_val_loss = float("inf")

# One pass per epoch: train, validate, checkpoint, then early-stop once the
# validation loss has not improved for `patience` consecutive epochs.
for epoch in range(hyperparams["num_epochs"]):
    # Training phase
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        # The model returns softmax probabilities, while nn.CrossEntropyLoss
        # applies log-softmax itself. Passing log-probabilities makes that
        # second normalisation a no-op, so the loss is the true cross-entropy
        # instead of a flattened "softmax of a softmax". The clamp keeps
        # log() finite when a probability underflows to 0.
        loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = output.argmax(dim=1)
        train_total += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    train_loss /= len(train_loader)  # mean of the per-batch losses
    train_accuracy = 100 * train_correct / train_total

    # Validation phase
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            # Same log-probability input as in training so both losses are comparable.
            loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
            val_loss += loss.item()
            predicted = output.argmax(dim=1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    logging.info(f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - "
                 f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
                 f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

    # Save latest model
    torch.save(model.state_dict(), latest_model_path)

    # Early stopping check: keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        patience_counter += 1
        if patience_counter >= patience:
            logging.info(f"Early stopping triggered at epoch {epoch + 1}")
            break

logging.info(f"Training completed. Best model saved at {best_model_path}")
logging.info(f"Latest model saved at {latest_model_path}")

# Load the best model for evaluation
# Rebuild the architecture and restore the checkpoint with the lowest
# validation loss.
best_model = LSTMModel(
    input_dim=input_dim,
    embedding_dim=hyperparams["embedding_dim"],
    hidden_dim1=hyperparams["hidden_dim1"],
    hidden_dim2=hyperparams["hidden_dim2"],
    dense_units=hyperparams["dense_units"],
    num_classes=hyperparams["num_classes"]
)
# map_location makes loading independent of the device the checkpoint was
# saved from; weights_only restricts unpickling to tensor data, which is all
# a state_dict contains, and avoids torch.load's FutureWarning.
best_model.load_state_dict(torch.load(best_model_path, map_location="cpu", weights_only=True))
best_model.eval()

# Evaluation on test set with F1-score
test_correct = 0
test_total = 0
all_preds = []
all_labels = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = best_model(X_batch)
        predicted = output.argmax(dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100 * test_correct / test_total
f1_report = classification_report(all_labels, all_preds, digits=4)

logging.info(f"Final Test Accuracy (Best Model): {test_accuracy:.2f}%")
logging.info("Final Classification Report:")
logging.info(f1_report)


In [None]:
#version 5 (f-1 everyepoch)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime
import os

# Set log directory
log_dir = "/home/user/torch_shrimp/until-tools/mod/k-mer/es/batchsig"
os.makedirs(log_dir, exist_ok=True)

# Ensure the log filename is the same as the directory name
log_file = os.path.join(log_dir, f"{os.path.basename(log_dir)}.txt")

# Remove previous logging handlers to prevent duplicate logs.
# Each handler is also closed: removeHandler() alone leaves the FileHandler of
# an earlier run holding its log file open every time this cell is re-run.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Setup logging: timestamped records go to the file, a second handler echoes
# the bare messages to the console.
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Write the hyperparameter mapping to the log as indented JSON.

    Emits two INFO records on the root logger: a fixed header line followed
    by ``hyperparams`` serialised with ``json.dumps(..., indent=4)``.
    """
    logging.info("Hyperparameters:")
    rendered = json.dumps(hyperparams, indent=4)
    logging.info(rendered)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset backed by a k-mer frequency CSV file.

    Every column except ``status`` is used as a float32 feature; ``status``
    is the integer class label. ``features`` and ``labels`` are kept as NumPy
    arrays and converted to tensors one sample at a time.
    """

    def __init__(self, kmer_score_file):
        """Read ``kmer_score_file`` with ``pd.read_csv`` and split features from labels."""
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Pin labels to int64: plain ``int`` is platform dependent (it can be
        # int32), whereas class-index targets for the PyTorch loss must be
        # int64 (torch.long).
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as a pair of tensors."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier: linear projection -> two single-layer LSTMs -> two dense layers.

    The input is a 2-D batch ``(batch, input_dim)``. ``forward`` inserts a
    sequence axis of length 1 before the LSTMs and returns softmax
    probabilities of shape ``(batch, num_classes)``.

    NOTE: the output is already normalised. ``nn.CrossEntropyLoss`` expects
    unnormalised logits, so callers using that loss should pass
    ``torch.log`` of this output rather than the output itself.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # nn.LSTM's ``dropout`` argument only acts between stacked layers. With
        # the default num_layers=1 it is ignored and merely raises a
        # UserWarning, so it is not passed. Parameter names are unchanged and
        # existing checkpoints still load.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a ``(batch, input_dim)`` input."""
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, embedding_dim) -> (batch, 1, embedding_dim)
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)

# Hyperparameters
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.001,
    num_epochs=500,
    batch_size=32,
    train_size=0.8,
    val_size=0.1,
    file_path="/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train200.csv",
    patience=75,
)
log_hyperparameters(hyperparams)

# Load the CSV and split it three ways. The train and validation shares are
# truncated to whole rows; the test subset receives every remaining row.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size, val_size = (
    int(hyperparams[share] * dataset_size) for share in ("train_size", "val_size")
)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader, test_loader = (
    DataLoader(subset, batch_size=hyperparams["batch_size"])
    for subset in (val_dataset, test_dataset)
)

# The model input width is the number of feature columns in the dataset.
input_dim = dataset.features.shape[1]
model = LSTMModel(
    input_dim=input_dim,
    **{
        name: hyperparams[name]
        for name in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])
criterion = nn.CrossEntropyLoss()

# Checkpoint files live next to the log file.
best_model_path, latest_model_path = (
    os.path.join(log_dir, filename) for filename in ("best_model.pth", "latest_model.pth")
)

# Early-stopping bookkeeping
patience = hyperparams["patience"]
patience_counter = 0
best_val_loss = float("inf")

# One pass per epoch: train, validate (with a per-class report), checkpoint
# the best model, then early-stop once the validation loss has not improved
# for `patience` consecutive epochs.
for epoch in range(hyperparams["num_epochs"]):
    # Training phase
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        # The model returns softmax probabilities, while nn.CrossEntropyLoss
        # applies log-softmax itself. Passing log-probabilities makes that
        # second normalisation a no-op, so the loss is the true cross-entropy
        # instead of a flattened "softmax of a softmax". The clamp keeps
        # log() finite when a probability underflows to 0.
        loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = output.argmax(dim=1)
        train_total += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    train_loss /= len(train_loader)  # mean of the per-batch losses
    train_accuracy = 100 * train_correct / train_total

    # Validation phase
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            # Same log-probability input as in training so both losses are comparable.
            loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
            val_loss += loss.item()
            predicted = output.argmax(dim=1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(y_batch.cpu().numpy())
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total
    # zero_division=0 reports 0 for classes that were never predicted, the same
    # value as the default, without emitting a warning on every epoch.
    f1_report = classification_report(all_labels, all_preds, digits=4, zero_division=0)

    logging.info(f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - "
                 f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
                 f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")
    logging.info("F1 Score:\n" + f1_report)

    # Early stopping check: keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        patience_counter += 1
        if patience_counter >= patience:
            logging.info(f"Early stopping triggered at epoch {epoch + 1}")
            break


In [4]:
#version 7
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import logging
import json
from datetime import datetime
import os

# Set log directory
log_dir = "/home/user/torch_shrimp/until-tools/mod/k-mer/es400/batch400-3"
os.makedirs(log_dir, exist_ok=True)

# Ensure the log filename is the same as the directory name
log_file = os.path.join(log_dir, f"{os.path.basename(log_dir)}.txt")

# Remove previous logging handlers to prevent duplicate logs.
# Each handler is also closed: removeHandler() alone leaves the FileHandler of
# an earlier run holding its log file open every time this cell is re-run.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Setup logging: timestamped records go to the file, a second handler echoes
# the bare messages to the console.
logging.basicConfig(filename=log_file, level=logging.INFO, format="%(asctime)s - %(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)

# Function to log hyperparameters
def log_hyperparameters(hyperparams):
    """Write the hyperparameter mapping to the log as indented JSON.

    Emits two INFO records on the root logger: a fixed header line followed
    by ``hyperparams`` serialised with ``json.dumps(..., indent=4)``.
    """
    logging.info("Hyperparameters:")
    rendered = json.dumps(hyperparams, indent=4)
    logging.info(rendered)

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset backed by a k-mer frequency CSV file.

    Every column except ``status`` is used as a float32 feature; ``status``
    is the integer class label. ``features`` and ``labels`` are kept as NumPy
    arrays and converted to tensors one sample at a time.
    """

    def __init__(self, kmer_score_file):
        """Read ``kmer_score_file`` with ``pd.read_csv`` and split features from labels."""
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Pin labels to int64: plain ``int`` is platform dependent (it can be
        # int32), whereas class-index targets for the PyTorch loss must be
        # int64 (torch.long).
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as a pair of tensors."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier: linear projection -> two single-layer LSTMs -> two dense layers.

    The input is a 2-D batch ``(batch, input_dim)``. ``forward`` inserts a
    sequence axis of length 1 before the LSTMs and returns softmax
    probabilities of shape ``(batch, num_classes)``.

    NOTE: the output is already normalised. ``nn.CrossEntropyLoss`` expects
    unnormalised logits, so callers using that loss should pass
    ``torch.log`` of this output rather than the output itself.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # nn.LSTM's ``dropout`` argument only acts between stacked layers. With
        # the default num_layers=1 it is ignored and merely raises a
        # UserWarning, so it is not passed. Parameter names are unchanged and
        # existing checkpoints still load.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a ``(batch, input_dim)`` input."""
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, embedding_dim) -> (batch, 1, embedding_dim)
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)

# Hyperparameters
hyperparams = dict(
    embedding_dim=128,
    hidden_dim1=64,
    hidden_dim2=32,
    dense_units=64,
    num_classes=3,
    learning_rate=0.0001,
    num_epochs=1500,
    batch_size=32,
    train_size=0.8,
    val_size=0.1,
    file_path="/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train400.csv",
    patience=100,
)
log_hyperparameters(hyperparams)

# Load the CSV and split it three ways. The train and validation shares are
# truncated to whole rows; the test subset receives every remaining row.
dataset = KmerFrequencyDataset(hyperparams["file_path"])
dataset_size = len(dataset)
train_size, val_size = (
    int(hyperparams[share] * dataset_size) for share in ("train_size", "val_size")
)
test_size = dataset_size - (train_size + val_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size]
)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=hyperparams["batch_size"])
val_loader, test_loader = (
    DataLoader(subset, batch_size=hyperparams["batch_size"])
    for subset in (val_dataset, test_dataset)
)

# The model input width is the number of feature columns in the dataset.
input_dim = dataset.features.shape[1]
model = LSTMModel(
    input_dim=input_dim,
    **{
        name: hyperparams[name]
        for name in ("embedding_dim", "hidden_dim1", "hidden_dim2", "dense_units", "num_classes")
    },
)

optimizer = optim.Adam(model.parameters(), lr=hyperparams["learning_rate"])
criterion = nn.CrossEntropyLoss()

# Checkpoint files live next to the log file.
best_model_path, latest_model_path = (
    os.path.join(log_dir, filename) for filename in ("best_model.pth", "latest_model.pth")
)

# Early-stopping bookkeeping
patience = hyperparams["patience"]
patience_counter = 0
best_val_loss = float("inf")

# One pass per epoch: train, validate, measure the test split (logged only,
# never used for model selection), checkpoint, then early-stop once the
# validation loss has not improved for `patience` consecutive epochs.
for epoch in range(hyperparams["num_epochs"]):
    # Training phase
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        # The model returns softmax probabilities, while nn.CrossEntropyLoss
        # applies log-softmax itself. Passing log-probabilities makes that
        # second normalisation a no-op, so the loss is the true cross-entropy
        # instead of a flattened "softmax of a softmax". The clamp keeps
        # log() finite when a probability underflows to 0.
        loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = output.argmax(dim=1)
        train_total += y_batch.size(0)
        train_correct += (predicted == y_batch).sum().item()
    train_loss /= len(train_loader)  # mean of the per-batch losses
    train_accuracy = 100 * train_correct / train_total

    # Validation phase
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            # Same log-probability input as in training so the losses are comparable.
            loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
            val_loss += loss.item()
            predicted = output.argmax(dim=1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
    val_loss /= len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    # Testing phase
    model.eval()
    test_loss = 0
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            output = model(X_batch)
            loss = criterion(torch.log(output.clamp(min=1e-12)), y_batch)
            test_loss += loss.item()
            predicted = output.argmax(dim=1)
            test_total += y_batch.size(0)
            test_correct += (predicted == y_batch).sum().item()
    test_loss /= len(test_loader)
    test_accuracy = 100 * test_correct / test_total

    logging.info(f"Epoch [{epoch + 1}/{hyperparams['num_epochs']}] - "
                 f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
                 f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%, "
                 f"Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.2f}%")

    # Save latest model
    torch.save(model.state_dict(), latest_model_path)

    # Early stopping check: keep the checkpoint with the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        patience_counter += 1
        if patience_counter >= patience:
            logging.info(f"Early stopping triggered at epoch {epoch + 1}")
            break

logging.info(f"Training completed. Best model saved at {best_model_path}")
logging.info(f"Latest model saved at {latest_model_path}")

# Load the best model for evaluation
# Rebuild the architecture and restore the checkpoint with the lowest
# validation loss.
best_model = LSTMModel(
    input_dim=input_dim,
    embedding_dim=hyperparams["embedding_dim"],
    hidden_dim1=hyperparams["hidden_dim1"],
    hidden_dim2=hyperparams["hidden_dim2"],
    dense_units=hyperparams["dense_units"],
    num_classes=hyperparams["num_classes"]
)
# map_location makes loading independent of the device the checkpoint was
# saved from; weights_only restricts unpickling to tensor data, which is all
# a state_dict contains, and avoids torch.load's FutureWarning.
best_model.load_state_dict(torch.load(best_model_path, map_location="cpu", weights_only=True))
best_model.eval()

# Evaluation on test set with F1-score
test_correct = 0
test_total = 0
all_preds = []
all_labels = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = best_model(X_batch)
        predicted = output.argmax(dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100 * test_correct / test_total
f1_report = classification_report(all_labels, all_preds, digits=4)

# NOTE(review): this summary mixes two models. The train/val/test losses and
# the train/val accuracies are left over from the last training epoch, while
# test_accuracy was just recomputed with the best checkpoint.
logging.info(f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
                 f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%, "
                 f"Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.2f}%")
logging.info("Final Classification Report:")
logging.info(f1_report)


Hyperparameters:
{
    "embedding_dim": 128,
    "hidden_dim1": 64,
    "hidden_dim2": 32,
    "dense_units": 64,
    "num_classes": 3,
    "learning_rate": 0.0001,
    "num_epochs": 1500,
    "batch_size": 32,
    "train_size": 0.8,
    "val_size": 0.1,
    "file_path": "/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/train400.csv",
    "patience": 100
}
Epoch [1/1500] - Train Loss: 1.0980, Train Acc: 34.90%, Val Loss: 1.0992, Val Acc: 31.67%, Test Loss: 1.0983, Test Acc: 37.50%
Epoch [2/1500] - Train Loss: 1.0992, Train Acc: 32.81%, Val Loss: 1.0992, Val Acc: 31.67%, Test Loss: 1.0983, Test Acc: 37.50%
Epoch [3/1500] - Train Loss: 1.0989, Train Acc: 32.60%, Val Loss: 1.0993, Val Acc: 31.67%, Test Loss: 1.0983, Test Acc: 37.50%
Epoch [4/1500] - Train Loss: 1.0983, Train Acc: 35.42%, Val Loss: 1.0993, Val Acc: 31.67%, Test Loss: 1.0983, Test Acc: 37.50%
Epoch [5/1500] - Train Loss: 1.0980, Train Acc: 36.77%, Val Loss: 1.0993, Val Acc: 31.67%, Test Loss: 1.0982, Test Acc: 37.50%
E

In [None]:
#EVALUATION
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset backed by a k-mer frequency CSV file.

    Every column except ``status`` is used as a float32 feature; ``status``
    is the integer class label. ``features`` and ``labels`` are kept as NumPy
    arrays and converted to tensors one sample at a time.
    """

    def __init__(self, kmer_score_file):
        """Read ``kmer_score_file`` with ``pd.read_csv`` and split features from labels."""
        # Load scoring file
        self.data = pd.read_csv(kmer_score_file)

        # Extract features and labels
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Pin labels to int64: plain ``int`` is platform dependent (it can be
        # int32), whereas class-index targets for the PyTorch loss must be
        # int64 (torch.long).
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as a pair of tensors."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model with embedding layer
class LSTMModel(nn.Module):
    """Classifier: linear projection -> two single-layer LSTMs -> two dense layers.

    The input is a 2-D batch ``(batch, input_dim)``. ``forward`` inserts a
    sequence axis of length 1 before the LSTMs and returns softmax
    probabilities of shape ``(batch, num_classes)``.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        # Embedding layer (a dense projection of the frequency vector)
        self.embedding = nn.Linear(input_dim, embedding_dim)

        # LSTM layers. nn.LSTM's ``dropout`` argument only acts between stacked
        # layers; with the default num_layers=1 it is ignored and merely raises
        # a UserWarning, so it is not passed. Parameter names are unchanged and
        # existing checkpoints still load.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)

        # Fully connected layers
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)

        # Dropout layer
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a ``(batch, input_dim)`` input."""
        # Embedding layer
        x = self.embedding(x)  # (batch_size, input_dim) -> (batch_size, embedding_dim)
        x = x.unsqueeze(1)  # Add sequence dimension for LSTM

        # LSTM layers
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)

        # Extract the last output of the LSTM
        x = x[:, -1, :]  # output of the last (here: only) time step

        # Fully connected layers
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)  # Output probabilities

# Hyperparameters and file paths
# Architecture settings; they must match the ones the checkpoint was trained with.
embedding_dim = 128
hidden_dim1 = 64
hidden_dim2 = 32
dense_units = 64
num_classes = 3
batch_size = 32

kmer_score_file = '/home/user/torch_shrimp/until-tools/mod/k-mer/test3.csv'
model_path = '/home/user/torch_shrimp/until-tools/mod/k-mer/updated_model_1.pth'  # Path to the saved model file

# Dataset and DataLoader
dataset = KmerFrequencyDataset(kmer_score_file)
dataset_size = len(dataset)
# NOTE(review): only the third part (roughly 10%) of an unseeded random split
# is evaluated, so the evaluated rows differ on every run. Confirm this is
# intended rather than evaluating the whole file.
_, _, test_dataset = torch.utils.data.random_split(
    dataset, 
    [int(0.8 * dataset_size), int(0.1 * dataset_size), dataset_size - int(0.8 * dataset_size) - int(0.1 * dataset_size)]
)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Initialize model
input_dim = dataset.features.shape[1]  # Automatically adjust to input feature size
model = LSTMModel(input_dim, embedding_dim, hidden_dim1, hidden_dim2, dense_units, num_classes)

# Load model weights. map_location makes loading independent of the device the
# checkpoint was saved from; weights_only restricts unpickling to tensor data,
# which is all a state_dict contains, and avoids torch.load's FutureWarning.
model.load_state_dict(torch.load(model_path, map_location="cpu", weights_only=True))
model.eval()

# Evaluation on test set
test_correct = 0
test_total = 0
all_preds = []
all_labels = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(torch.float32)
        output = model(X_batch)
        predicted = output.argmax(dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())
        test_total += y_batch.size(0)
        test_correct += (predicted == y_batch).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Generate classification report
print("Classification Report:")
print(classification_report(all_labels, all_preds))


In [9]:
#Update testing
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset backed by a k-mer frequency CSV file.

    Every column except ``status`` is used as a float32 feature; ``status``
    is the integer class label. ``features`` and ``labels`` are kept as NumPy
    arrays and converted to tensors one sample at a time.
    """

    def __init__(self, kmer_score_file):
        """Read ``kmer_score_file`` with ``pd.read_csv`` and split features from labels."""
        self.data = pd.read_csv(kmer_score_file)
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Pin labels to int64: plain ``int`` is platform dependent (it can be
        # int32), whereas class-index targets for the PyTorch loss must be
        # int64 (torch.long).
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as a pair of tensors."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])

# Define LSTM model
class LSTMModel(nn.Module):
    """Classifier: linear projection -> two single-layer LSTMs -> two dense layers.

    The input is a 2-D batch ``(batch, input_dim)``. ``forward`` inserts a
    sequence axis of length 1 before the LSTMs and returns softmax
    probabilities of shape ``(batch, num_classes)``.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
        # nn.LSTM's ``dropout`` argument only acts between stacked layers. With
        # the default num_layers=1 it is ignored and merely raises a
        # UserWarning, so it is not passed. Parameter names are unchanged and
        # existing checkpoints still load.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a ``(batch, input_dim)`` input."""
        x = self.embedding(x)
        x = x.unsqueeze(1)  # (batch, embedding_dim) -> (batch, 1, embedding_dim)
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # output of the last (here: only) time step
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)

def load_data(kmer_score_file):
    """Read a k-mer CSV and return ``(features, true_labels)`` as NumPy arrays.

    ``features`` holds every column except ``status`` cast to float32;
    ``true_labels`` holds the ``status`` column cast to ``int``.
    """
    frame = pd.read_csv(kmer_score_file)
    feature_matrix = frame.drop(columns=['status']).to_numpy().astype(np.float32)
    label_vector = frame['status'].to_numpy().astype(int)
    return feature_matrix, label_vector

def load_trained_model(model_path, input_dim):
    """Build an ``LSTMModel`` and restore its weights from ``model_path``.

    The architecture sizes are fixed here and must match the ones the
    checkpoint was trained with. The model is returned in evaluation mode.
    """
    model = LSTMModel(input_dim=input_dim, embedding_dim=128, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3)
    # map_location makes loading independent of the device the checkpoint was
    # saved from; weights_only restricts unpickling to tensor data, which is
    # all a state_dict contains, and avoids torch.load's FutureWarning.
    state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

def test_model(model, test_loader, label_mapping, device='cpu'):
    """Run ``model`` over ``test_loader`` and collect per-sample results.

    ``label_mapping`` is accepted but not used inside this function.

    Returns four lists, in loader order:
    predicted class indices, true labels, the per-sample probability rows
    produced by the model, and the winning probability of every sample that
    was classified correctly.
    """
    model.to(device)
    model.eval()

    all_predictions, all_true_labels = [], []
    all_probabilities, true_prediction_probs = [], []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            inputs = X_batch.to(device).float()
            targets = y_batch.to(device)

            probabilities = model(inputs).cpu().numpy()
            predictions = np.argmax(probabilities, axis=1)

            # Keep the winning probability of each correctly classified sample
            true_prediction_probs.extend(
                row[guess]
                for row, guess, target in zip(probabilities, predictions, targets)
                if guess == target.item()
            )

            all_probabilities.extend(probabilities)
            all_predictions.extend(predictions)
            all_true_labels.extend(targets.cpu().numpy())

    return all_predictions, all_true_labels, all_probabilities, true_prediction_probs

# Load test dataset
# Input CSV and the checkpoint to evaluate
kmer_score_file = '/home/user/torch_shrimp/until-tools/mod/k-mer/dataset/test5101.csv'
lstm_model = '/home/user/torch_shrimp/until-tools/mod/k-mer/es/batch1/best_model.pth'
features, true_labels = load_data(kmer_score_file)

# Wrap the raw arrays in an unshuffled loader so results keep CSV row order
batch_size = 32
test_samples, test_labels = torch.tensor(features), torch.tensor(true_labels)
test_dataset = torch.utils.data.TensorDataset(test_samples, test_labels)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Class index -> human-readable name
label_mapping = {0: 'healthy', 1: 'WSSV', 2: 'AHPND'}

# Restore the model and run it over the whole test file
model = load_trained_model(lstm_model, input_dim=features.shape[1])
predictions, true_labels, probabilities, true_prediction_probs = test_model(
    model, test_loader, label_mapping, device='cpu'
)

# Summary statistics
num_true_predictions = sum(1 for guess, truth in zip(predictions, true_labels) if guess == truth)
num_false_predictions = len(predictions) - num_true_predictions
if true_prediction_probs:
    avg_true_prediction_prob = np.mean(true_prediction_probs)
else:
    avg_true_prediction_prob = 0.0

print(f"Average True Prediction Probability: {avg_true_prediction_prob:.4f}")
print(f"Number of True Predictions: {num_true_predictions} ({(num_true_predictions / len(predictions)) * 100:.2f}%)")
print(f"Number of False Predictions: {num_false_predictions} ({(num_false_predictions / len(predictions)) * 100:.2f}%)")
print("\nPredictions for test samples:")

# One three-line entry per sample, numbered from 1
for sample_no, (pred, true_label, sample_probs) in enumerate(
    zip(predictions, true_labels, probabilities), start=1
):
    predicted_label, true_label_name = label_mapping[pred], label_mapping[true_label]

    print(f"Sample {sample_no}:")
    print(f"  - True Label: {true_label_name} ({true_label})")
    print(f"  - Predicted Class: {predicted_label} ({pred})")
    print(f"  - Probabilities: {sample_probs.tolist()}")


  model.load_state_dict(torch.load(model_path))


Average True Prediction Probability: 0.9942
Number of True Predictions: 284 (94.67%)
Number of False Predictions: 16 (5.33%)

Predictions for test samples:
Sample 1:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabilities: [5.973054430796765e-05, 2.279859018017305e-06, 0.9999380111694336]
Sample 2:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabilities: [7.436559826601297e-06, 3.3737815101630986e-05, 0.999958872795105]
Sample 3:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabilities: [1.1116862879134715e-06, 3.990975585566048e-07, 0.9999984502792358]
Sample 4:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabilities: [2.353409399802331e-06, 7.926922194201325e-07, 0.9999969005584717]
Sample 5:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabilities: [3.107519432887784e-06, 1.016458440972201e-06, 0.9999958276748657]
Sample 6:
  - True Label: AHPND (2)
  - Predicted Class: AHPND (2)
  - Probabi

In [None]:
# New testing with row indication

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, Dataset

# Custom Dataset for k-mer frequency data
class KmerFrequencyDataset(Dataset):
    """Map-style dataset backed by a k-mer frequency CSV file.

    Every column except ``status`` is used as a float32 feature; ``status``
    is the integer class label. ``features`` and ``labels`` are kept as NumPy
    arrays and converted to tensors one sample at a time.
    """

    def __init__(self, kmer_score_file):
        """Read ``kmer_score_file`` with ``pd.read_csv`` and split features from labels."""
        # Load scoring file
        self.data = pd.read_csv(kmer_score_file)

        # Extract features and labels
        self.features = self.data.drop(['status'], axis=1).values.astype(np.float32)
        # Pin labels to int64: plain ``int`` is platform dependent (it can be
        # int32), whereas class-index targets for the PyTorch loss must be
        # int64 (torch.long).
        self.labels = self.data['status'].values.astype(np.int64)

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx`` as a pair of tensors."""
        return torch.tensor(self.features[idx]), torch.tensor(self.labels[idx])


# Define LSTM model with embedding layer
class LSTMModel(nn.Module):
    """Classifier: linear "embedding" -> two single-layer LSTMs -> MLP head.

    Args:
        input_dim: Number of input features per sample.
        embedding_dim: Output size of the linear embedding layer.
        hidden_dim1: Hidden size of the first LSTM.
        hidden_dim2: Hidden size of the second LSTM.
        dense_units: Width of the hidden fully connected layer.
        num_classes: Number of output classes.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3):
        super().__init__()
        # "Embedding" here is a dense projection of the feature vector,
        # not an nn.Embedding lookup table.
        self.embedding = nn.Linear(input_dim, embedding_dim)

        # Each nn.LSTM has the default num_layers=1. nn.LSTM's ``dropout``
        # argument only acts between stacked layers, so passing dropout=0.5
        # here had no effect and only raised a UserWarning on construction.
        # Omitting it leaves parameter names (state_dict keys) unchanged.
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)

        # Fully connected classification head
        self.fc1 = nn.Linear(hidden_dim2, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)

        # Dropout applied after the first fully connected layer
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class probabilities for a batch.

        Args:
            x: Tensor of shape (batch_size, input_dim).

        Returns:
            Tensor of shape (batch_size, num_classes); softmax is applied
            over dim 1.
        """
        x = self.embedding(x)  # (batch_size, input_dim) -> (batch_size, embedding_dim)
        x = x.unsqueeze(1)  # Insert a length-1 sequence axis for the LSTMs

        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)

        # Keep the output at the last time step (the only one, since the
        # sequence axis has length 1).
        x = x[:, -1, :]

        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return torch.softmax(x, dim=1)  # Output probabilities

def load_data(kmer_score_file):
    """Read the k-mer scoring CSV and split it into features and labels.

    Returns:
        A 3-tuple ``(data, features, true_labels)``: the full DataFrame,
        a float32 array of every column except ``status``, and the
        ``status`` column as an integer array.
    """
    frame = pd.read_csv(kmer_score_file)
    status_column = frame['status']
    feature_matrix = frame.drop(columns=['status']).values.astype(np.float32)
    return frame, feature_matrix, status_column.values.astype(int)

def load_trained_model(model_path, input_dim):
    """Build an LSTMModel and load saved weights into it.

    Args:
        model_path: Path to a file holding the model's ``state_dict``.
        input_dim: Number of input features the model was trained with.

    Returns:
        The model with weights loaded, set to evaluation mode.
    """
    # Define the model architecture; these sizes must match the checkpoint.
    model = LSTMModel(input_dim=input_dim, embedding_dim=128, hidden_dim1=64, hidden_dim2=32, dense_units=64, num_classes=3)

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which avoids executing arbitrary pickled code and silences the
    # torch.load FutureWarning. map_location='cpu' lets a checkpoint saved
    # on GPU load on a CPU-only machine; callers move the model afterwards.
    state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()  # Set the model to evaluation mode
    return model

def test_model(model, test_loader, label_mapping, device='cpu'):
    model.to(device)
    model.eval()

    all_predictions = []
    all_true_labels = []
    all_probabilities = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device).float()
            y_batch = y_batch.to(device)

            # Get model predictions
            outputs = model(X_batch)
            probabilities = outputs.cpu().numpy()
            predictions = np.argmax(probabilities, axis=1)

            all_probabilities.extend(probabilities)
            all_predictions.extend(predictions)
            all_true_labels.extend(y_batch.cpu().numpy())

    return all_predictions, all_true_labels, all_probabilities

# Load test dataset
kmer_score_file = '/home/user/torch_shrimp/until-tools/mod/k-mer/test5101.csv'  # Update this with your file path
lstm_model = '/home/user/torch_shrimp/until-tools/mod/k-mer/file_tune1.pth'
raw_data, features, true_labels = load_data(kmer_score_file)

# Create DataLoader for test set.
# shuffle=False keeps batches in CSV row order, so result index i below
# corresponds to raw_data.iloc[i].
test_samples = torch.tensor(features)
test_labels = torch.tensor(true_labels)
batch_size = 32
test_dataset = torch.utils.data.TensorDataset(test_samples, test_labels)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define label mapping (ensure this matches your labels).
# Class 2 was misspelled 'ANPHD'; the earlier evaluation output in this
# notebook labels it 'AHPND'.
label_mapping = {0: 'healthy', 1: 'WSSV', 2: 'AHPND'}

# Load the trained model; the input width is taken from the feature matrix.
model = load_trained_model(lstm_model, input_dim=features.shape[1])

# Test the model and get predictions (true_labels is rebound to the list
# collected from the loader).
predictions, true_labels, probabilities = test_model(model, test_loader, label_mapping, device='cpu')

# Print predictions for each test sample alongside its source row
for i, (pred, true_label, probs) in enumerate(zip(predictions, true_labels, probabilities)):
    predicted_label = label_mapping[pred]  # Map predicted class to label
    true_label_name = label_mapping[true_label]  # Map true class to label
    row_data = raw_data.iloc[i].to_dict()  # Extract the corresponding row from the dataset

    print(f"Sample {i + 1} (Row {i}):")
    print(f"  - Row Data: {row_data}")
    print(f"  - True Label: {true_label_name} ({true_label})")
    print(f"  - Predicted Class: {predicted_label} ({pred})")
    print(f"  - Probabilities: {probs.tolist()}\n")
