In [25]:
import numpy as np
import pandas as pd
import torch
import h5py
import re
import torch.nn as nn 
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data.dataset import random_split
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
from sklearn.model_selection import train_test_split

In [26]:
# Define the function to map source positions to classes
def map_to_class(source_position):
    return min(int((source_position - 10) / 10) + 1, 9)

# Initialize a tensor to store class, density, and source position data
data_tensor = torch.empty(0, 3, dtype=torch.float32)

# Define the smoke detectors positions
smoke_detectors = np.array([[60.0, 60.0], [30.0, 80.0]])

# Open the HDF5 file in read mode
with h5py.File("Smoke Flow Data.h5", "r") as hf:
    # Loop through each smoke detector
    for detector_idx, smoke_detector_pos in enumerate(smoke_detectors):
        x, y = smoke_detector_pos
        
        # Loop through each source position
        for source_group_name in hf:
            # Extract the source position value from the group name
            source_position_str = source_group_name.split('-')[-1]
            
            # Convert the source position string to a numerical data type
            source_position = float(source_position_str)
            
            # Map the source position to a class
            class_label = map_to_class(source_position)
            
            # Access the 'density' dataset within the group
            density_dataset = hf[source_group_name]['density']
            
            # Retrieve the density values at the specified position (x, y) for all time steps
            density_values = torch.tensor(density_dataset[:, int(x), int(y), 0], dtype=torch.float32)
            
            # Create a tensor containing class, density, and source position information
            class_tensor = torch.full((len(density_values), 1), class_label, dtype=torch.float32)
            density_tensor = density_values.view(-1, 1)
            position_tensor = torch.full((len(density_values), 1), source_position, dtype=torch.float32)
            class_density_position_tensor = torch.cat((density_tensor, position_tensor,class_tensor), dim=1)
            # print("class_density_position_tensor:", class_density_position_tensor.shape)
            
            # Append the tensor to the data tensor
            data_tensor = torch.cat((data_tensor, class_density_position_tensor), dim=0)

# Print the shape of the data tensor
print("Data tensor shape:", data_tensor.shape)
print(data_tensor[0])

### First column : density values
### Second column : source positin
### third column : class name

# for two different source 9018/ 2 
# for 9 4509 / 9 = 501

Data tensor shape: torch.Size([9018, 3])
tensor([ 0., 10.,  1.])


In [27]:
features = data_tensor[:, :2]  # Density values and source positions
labels = (data_tensor[:, 2] - 1).long() # Class labels (subtract 1 to make them 0-based)

In [28]:
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
# Create DataLoader for training and testing
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size= 128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size= 128, shuffle=False)


In [29]:
class EnhancedClassificationModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(EnhancedClassificationModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.5)
        
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)
        self.bn2 = nn.BatchNorm1d(hidden_size // 2)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.5)
        
        self.fc3 = nn.Linear(hidden_size // 2, num_classes)

    def forward(self, x):
        out = self.fc1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        # out = self.dropout1(out)
        
        out = self.fc2(out)
        out = self.bn2(out)
        out = self.relu2(out)
        # out = self.dropout2(out)
        
        out = self.fc3(out)
        return out

In [30]:
# Initialize the model, loss function, optimizer, and learning rate scheduler
model = EnhancedClassificationModel(input_size = 2, hidden_size = 128, num_classes=9)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
scheduler = StepLR(optimizer, step_size= .5, gamma=0.9)


In [31]:
# Training loop
num_epochs = 40
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.long())
        loss.backward()
        optimizer.step()
    scheduler.step()  # Adjust learning rate
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item():.4f}")


Epoch 1/40, Loss: 1.8261
Epoch 2/40, Loss: 1.5794
Epoch 3/40, Loss: 1.3697
Epoch 4/40, Loss: 1.2629
Epoch 5/40, Loss: 1.3908
Epoch 6/40, Loss: 1.1539
Epoch 7/40, Loss: 1.0911
Epoch 8/40, Loss: 1.1516
Epoch 9/40, Loss: 1.0371
Epoch 10/40, Loss: 1.0770
Epoch 11/40, Loss: 1.0790
Epoch 12/40, Loss: 0.9789
Epoch 13/40, Loss: 0.9881
Epoch 14/40, Loss: 1.0082
Epoch 15/40, Loss: 0.9615
Epoch 16/40, Loss: 1.1920
Epoch 17/40, Loss: 0.9862
Epoch 18/40, Loss: 1.2604
Epoch 19/40, Loss: 0.9881
Epoch 20/40, Loss: 0.9455
Epoch 21/40, Loss: 0.9308
Epoch 22/40, Loss: 0.8842
Epoch 23/40, Loss: 0.9219
Epoch 24/40, Loss: 0.8840
Epoch 25/40, Loss: 1.0464
Epoch 26/40, Loss: 0.9415
Epoch 27/40, Loss: 1.0066
Epoch 28/40, Loss: 0.8864
Epoch 29/40, Loss: 1.1151
Epoch 30/40, Loss: 0.8990
Epoch 31/40, Loss: 1.0604
Epoch 32/40, Loss: 1.0837
Epoch 33/40, Loss: 0.9718
Epoch 34/40, Loss: 1.0313
Epoch 35/40, Loss: 0.9065
Epoch 36/40, Loss: 1.7730
Epoch 37/40, Loss: 0.9004
Epoch 38/40, Loss: 0.9051
Epoch 39/40, Loss: 0.

In [32]:
# Evaluation
model.eval()  # Set model to evaluation mode
correct, total = 0, 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
# print(total)
# print(correct)
# print(accuracy)
print(f'Accuracy of the model on the test set: {accuracy * 100:.2f}%')

Accuracy of the model on the test set: 91.96%
