In [1]:
# Colab-only step: prompt for a file upload into the runtime. The recorded
# output below shows test_dataset0202.zip being saved by this call.
from google.colab import files
uploaded = files.upload()

Saving test_dataset0202.zip to test_dataset0202.zip


In [None]:
# Extract the uploaded archive into the current working directory.
!unzip test_dataset0202.zip

In [None]:
# Create the folder DAGDataset reads from by default ("./datasets").
# -p makes the cell safe to re-run: no error if the folder already exists.
!mkdir -p datasets

In [5]:
# Move the extracted test_dataset/ folder under datasets/, which is the
# default `datasets_folder` that DAGDataset looks in.
!mv test_dataset/ datasets/

# Formal

In [1]:
import yaml
import os
import yaml
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms

In [2]:
class Encoder(nn.Module):
    """Convolutional encoder mapping a 1-channel image to a 1024-dim vector.

    Three conv -> ReLU -> 2x2 max-pool stages are followed by a flatten and a
    linear projection. The linear layer expects 32 * 64 * 64 inputs, i.e. a
    512x512 input image halved three times (512 -> 256 -> 128 -> 64).
    """

    def __init__(self):
        super().__init__()
        # (in_channels, out_channels) of each conv stage; the 3x3 convs with
        # padding=1 keep the spatial size, each pooling step halves it.
        stage_channels = [(1, 16), (16, 32), (32, 32)]

        layers = []
        for in_channels, out_channels in stage_channels:
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        layers.append(nn.Flatten())
        layers.append(nn.Linear(32 * 64 * 64, 1024))  # size: 1024

        self.features = nn.Sequential(*layers)

    def forward(self, x):
        """Run the full feature stack on `x` and return the projected features."""
        return self.features(x)

In [3]:
class Decoder(nn.Module):
    """Three-layer MLP head: input_size -> 256 -> 128 -> output_size.

    ReLU is applied after the first two linear layers; the last layer's
    output is returned without an activation.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_wide, hidden_narrow = 256, 128
        self.fc1 = nn.Linear(input_size, hidden_wide)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_wide, hidden_narrow)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_narrow, output_size)

    def forward(self, x):
        """Apply fc1 -> ReLU -> fc2 -> ReLU -> fc3 to `x`."""
        hidden = self.relu1(self.fc1(x))
        hidden = self.relu2(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
class MultiTailDecoder(nn.Module):
    """Shared MLP trunk with several classification heads and one regression head.

    The trunk maps input_size -> 256 -> 128 (ReLU after each layer). Each entry
    of `classification_sizes` gets its own linear head on the 128-dim trunk
    output; all regression values share a single linear head.

    Args:
        input_size: Width of the incoming feature vector.
        classification_sizes: Iterable with the number of classes of each
            classification head, or None/empty for no classification heads.
        regression_size: Width of the regression head; None or 0 disables it.
    """

    def __init__(self, input_size, classification_sizes=None, regression_size=None):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(256, 128)
        self.relu2 = nn.ReLU()
        # The heads must live in an nn.ModuleList: modules kept in a plain
        # Python list are not registered, so their weights would be missing
        # from .parameters() (never optimized), from state_dict() (never
        # saved) and would not follow .to(device).
        self.classification_tails = nn.ModuleList(
            nn.Linear(128, size) for size in (classification_sizes or [])
        )
        self.regression_tail = nn.Linear(128, regression_size) if regression_size else None

    def forward(self, x):
        """Return `(classification_outputs, regression_output)`.

        `classification_outputs` is a list with one logits tensor per head
        (empty list when there are no heads); `regression_output` is a tensor,
        or None when the regression head is disabled.
        """
        x = self.relu1(self.fc1(x))
        x = self.relu2(self.fc2(x))
        classification_outputs = [tail(x) for tail in self.classification_tails]
        regression_output = self.regression_tail(x) if self.regression_tail is not None else None
        return classification_outputs, regression_output

In [4]:
class EncoderDecoderModel(nn.Module):
    """One shared encoder feeding several named decoders.

    Args:
        encoder: Module that turns the input batch into features.
        decoders: Mapping of decoder name -> decoder module. Either an
            `nn.ModuleDict` or a plain mapping; a plain mapping is wrapped in
            an `nn.ModuleDict` so the decoders' parameters are registered on
            this model (otherwise they would not be trained or saved).
    """

    def __init__(self, encoder, decoders):
        super().__init__()
        self.encoder = encoder
        if not isinstance(decoders, nn.Module):
            decoders = nn.ModuleDict(decoders)
        self.decoders = decoders

    def forward(self, x):
        """Encode `x` and return `{decoder_name: decoder(features)}`.

        The encoder output is flattened to (batch, -1) before decoding.
        Each value is whatever the decoder returns; a MultiTailDecoder
        returns a `(classification_outputs, regression_output)` tuple.
        """
        features = self.encoder(x)
        # reshape (not view) so a non-contiguous encoder output also works
        features = features.reshape(features.size(0), -1)
        return {name: decoder(features) for name, decoder in self.decoders.items()}

In [5]:
class DAGDataset(torch.utils.data.Dataset):
    """Image -> parameter dataset read from `<datasets_folder>/<dataset_name>`.

    Files read from the dataset folder:
        images/       one image per sample
        params/       one `<image stem>.yml` per image with the raw parameters
        ranges.yml    per-parameter spec (`type` plus min/max or `values`)
        decoders.yml  mapping of decoder name -> list of parameter names

    Each item is `(image_tensor, target)` where `target[decoder_name]` holds
    `"classification_targets"` (list of one-hot float tensors) and
    `"regression_target"` (one float tensor of normalized values).
    """

    def __init__(self, dataset_name: str, datasets_folder: str="./datasets", transform=None):
        self.dataset_name = dataset_name
        self.datasets_folder = datasets_folder
        self.dataset_path = os.path.join(self.datasets_folder, self.dataset_name)
        self.images_folder = os.path.join(self.dataset_path, "images")
        self.params_folder = os.path.join(self.dataset_path, "params")
        self.ranges_file_path = os.path.join(self.dataset_path, "ranges.yml")
        self.ranges = None
        self.decoders = None
        # Default pipeline: resize to 512x512, convert to tensor, normalize
        # the single channel with mean 0.5 / std 0.5.
        self.transform = transforms.Compose(
            [transforms.Resize((512, 512)), transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
            ) if transform is None else transform
        self.data = self.load_data()

    def load_data(self):
        """Read ranges/decoders specs and build the `(image_path, target)` list.

        Targets are preprocessed once here and kept as plain Python lists;
        images are only opened lazily in `__getitem__`.
        """
        # Load the ranges from the YAML file
        with open(self.ranges_file_path, 'r') as file:
            self.ranges = yaml.safe_load(file)
        # Load the decoders from the YAML file
        with open(os.path.join(self.dataset_path, "decoders.yml"), 'r') as file:
            self.decoders = yaml.safe_load(file)
        # read images and parameters
        data = []
        # os.listdir order is arbitrary; sort so sample indices are reproducible
        for image_name in sorted(os.listdir(self.images_folder)):
            image_path = os.path.join(self.images_folder, image_name)
            param_path = os.path.join(self.params_folder, os.path.splitext(image_name)[0] + ".yml")
            with open(param_path, 'r') as file:
                param = yaml.safe_load(file)
            # normalize
            param = self.preprocess(param)
            param = self.format_target_to_decoders(param)
            data.append((image_path, param))
        return data

    def format_target_to_decoders(self, target):
        """Group preprocessed parameters by decoder.

        For every decoder in `self.decoders`, float/int values are appended
        and vector values are extended into `regression_target`, while
        states/bool one-hot lists are appended to `classification_targets`,
        all in the order the parameters are listed for that decoder.
        """
        formatted_target = {}
        for decoder_name, decoder_params in self.decoders.items():
            formatted_target[decoder_name] = {
                "classification_targets": [],
                "regression_target": []
            }
            for param_name in decoder_params:
                param_type = self.ranges[param_name]['type']
                if param_type == 'float' or param_type == 'int':
                    formatted_target[decoder_name]['regression_target'].append(target[param_name])
                elif param_type == 'vector':
                    formatted_target[decoder_name]['regression_target'].extend(target[param_name])
                elif param_type == 'states' or param_type == 'bool':
                    formatted_target[decoder_name]['classification_targets'].append(target[param_name])
        return formatted_target

    def preprocess(self, param):
        """Normalize / one-hot encode every parameter listed in `self.ranges`.

        Raises:
            ValueError: If a parameter spec has an unsupported `type`.
        """
        processed_param = {}
        # for float and vector: normalize with min max
        # for states, bool: convert to one hot
        # for ints: treat as float, but round back to int when saving as param
        for param_name, param_spec in self.ranges.items():
            if param_spec['type'] == 'float' or param_spec['type'] == 'int' or param_spec['type'] == 'vector':
                processed_param[param_name] = self.normalize(param[param_name], param_spec)
            elif param_spec['type'] == 'states' or param_spec['type'] == 'bool':
                processed_param[param_name] = self.one_hot(param[param_name], param_spec)
            else:
                raise ValueError(f"Unsupported parameter type: {param_spec['type']}")
        return processed_param

    def normalize(self, value, param_spec):
        """Min-max scale a float/int value, or each of a vector's x/y/z components.

        Scalars use the spec's `min`/`max`; vectors use `xmin`/`xmax`,
        `ymin`/`ymax`, `zmin`/`zmax` for components 0, 1, 2.

        Raises:
            ValueError: If the spec type is not float, int or vector.
        """
        if param_spec['type'] == 'float' or param_spec['type'] == 'int':
            return (value - param_spec['min']) / (param_spec['max'] - param_spec['min'])
        elif param_spec['type'] == 'vector':
            return [(value[i] - param_spec[f'{dim}min']) / (param_spec[f'{dim}max'] - param_spec[f'{dim}min']) for i, dim in enumerate(['x', 'y', 'z'])]
        else:
            raise ValueError(f"Unsupported parameter type: {param_spec['type']}")

    def one_hot(self, value, param_spec):
        """One-hot encode a `states` value (by its index in `values`) or a bool.

        Bools are encoded as `[1, 0]` for truthy and `[0, 1]` for falsy.

        Raises:
            ValueError: If the spec type is not states or bool (or, from
                `list.index`, if a states value is not in `values`).
        """
        if param_spec['type'] == 'states':
            index = param_spec['values'].index(value)
            return [1 if i == index else 0 for i in range(len(param_spec['values']))]
        elif param_spec['type'] == 'bool':
            # make bools onehot too to make it consistent
            return [1, 0] if value else [0, 1]
        else:
            raise ValueError(f"Unsupported parameter type: {param_spec['type']}")

    @staticmethod
    def _target_to_tensors(target):
        """Return a new target dict with float32 tensors; `target` is not modified."""
        return {
            decoder_name: {
                "classification_targets": [
                    torch.tensor(one_hot_target, dtype=torch.float32)
                    for one_hot_target in parts["classification_targets"]
                ],
                "regression_target": torch.tensor(parts["regression_target"], dtype=torch.float32),
            }
            for decoder_name, parts in target.items()
        }

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample `idx`: grayscale image (transformed) and its tensor targets."""
        image_path, target = self.data[idx]
        with Image.open(image_path) as image:
            sample = image.convert('L')

        if self.transform:
            sample = self.transform(sample)

        # Build the tensors in a fresh dict. Converting in place would
        # overwrite the cached lists in self.data, so every later access of
        # the same index would re-wrap tensors with torch.tensor(tensor).
        return sample, self._target_to_tensors(target)

In [6]:
# Build the dataset from ./datasets/test_dataset (the default datasets_folder).
# Construction reads ranges.yml, decoders.yml and every params file up front.
dataset = DAGDataset("test_dataset")

In [7]:
# Peek at the first (image, target) pair to sanity-check the preprocessing.
dataset[0]

(tensor([[[1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          ...,
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.]]]),
 {'Bm Base Shape': tensor([0., 1.]),
  'Bm Size': tensor([0.2754, 0.3033, 0.5714]),
  'Floor Ledge Extrusion X': tensor(0.7033),
  'Floor Ledge Extrusion Z': tensor(0.8891),
  'Floor Ledge Size X': tensor(0.4263),
  'Floor Ledge Size Z': tensor(0.1645),
  'Has Floor Ledge': tensor([1., 0.]),
  'Has Window Ledge': tensor([1., 0.]),
  'Num Floors': tensor(0.7500),
  'Num Windows Each Side': tensor(0.2500),
  'Rf Base Shape': tensor([0., 1., 0.]),
  'Rf Size': tensor([0.5063, 0.8231, 0.9543]),
  'Window Divided Horizontal': tensor([1., 0.]),
  'Window Divided Vertical': tensor([0., 1.]),
  'Window Interpanel Offset Percentage Y': tensor(0.8371),
  'Window Interpanel Offset Percentage Z': tensor(0.9874),
  'Window Ledge Extr

In [8]:
# Split into train / val / test: 80% / 10% / whatever remains.
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
# Seed the split so the same samples land in each subset on every run;
# otherwise results are not comparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

# Create DataLoader
batch_size = 32
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation loaders are not shuffled: order does not matter for evaluation,
# and fixed batches keep the reported average loss deterministic.
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Instantiate the shared convolutional encoder (it takes no constructor arguments).
encoder = Encoder()

In [9]:
# Load the ranges from the YAML file
with open('./datasets/test_dataset/ranges.yml', 'r') as file:
    ranges = yaml.safe_load(file)

# Output width for each parameter type whose size does not depend on the spec:
# scalars take 1 value, vectors 3 (x, y, z), bools 2 (binary one-hot).
fixed_output_sizes = {'float': 1, 'int': 1, 'vector': 3, 'bool': 2}

# Create a mapping between parameter names and output sizes
parameter_output_mapping = {}
for param_name, param_specs in ranges.items():
    param_type = param_specs['type']
    if param_type == 'states':
        # one output per possible state
        parameter_output_mapping[param_name] = len(param_specs['values'])
    elif param_type in fixed_output_sizes:
        parameter_output_mapping[param_name] = fixed_output_sizes[param_type]

In [None]:
# Load the decoders' params from the YAML file
with open('./datasets/test_dataset/decoders.yml', 'r') as file:
    decoders_params = yaml.safe_load(file)

# One MultiTailDecoder per decoder group: bool/states parameters each get a
# classification tail, float/int/vector parameters share one regression tail.
decoders = nn.ModuleDict()
for decoder_name, param_names in decoders_params.items():
    classification_tails = [
        (param_name, parameter_output_mapping[param_name])
        for param_name in param_names
        if ranges[param_name]['type'] in ('bool', 'states')
    ]
    regression_tails = [
        (param_name, parameter_output_mapping[param_name])
        for param_name in param_names
        if ranges[param_name]['type'] not in ('bool', 'states')
    ]
    classification_sizes = [size for _, size in classification_tails]
    regression_size = sum(size for _, size in regression_tails)
    # 1024 is the width of the Encoder's final linear layer
    decoders[decoder_name] = MultiTailDecoder(1024, classification_sizes, regression_size)

# decoders = nn.ModuleDict({
#     param_name: Decoder(4096, output_size)
#     for param_name, output_size in parameter_output_mapping.items()
# })

In [None]:
# Combine the shared encoder with the per-group decoders into a single model.
model = EncoderDecoderModel(encoder, decoders)

In [10]:
# Define loss function and optimizer
# for regression, use MSELoss, for classification, use CrossEntropyLoss
class EncDecsLoss(nn.Module):
    """Total loss of the model: the sum of `decoder_loss` over every decoder.

    `outputs` and `targets` are both keyed by decoder name; each output is
    passed to `decoder_loss` together with the target of the same name.
    """

    def __init__(self, decoders):
        super().__init__()
        self.decoders = decoders

    def forward(self, outputs, targets):
        total = 0.0
        for name in outputs:
            total = total + decoder_loss(name, outputs[name], targets[name])
        return total
    
def classification_loss(output, target):
    """Cross-entropy between logits `output` and `target`, averaged over the batch."""
    return nn.functional.cross_entropy(output, target)

def regression_loss(output, target):
    """Mean squared error between `output` and `target` over all elements."""
    return nn.functional.mse_loss(output, target)

# def find_param_name_in(target, param_name):
#     for decoder_name, decoder_outputs in target.items():
#         for i, classification_target in enumerate(decoder_outputs['classification_targets']):
#             if param_name == 
#     return None

def _gated_batch_mean(per_sample_loss, keep):
    """Batch mean of `per_sample_loss` where samples with `keep == False` count as 0."""
    masked = torch.where(keep, per_sample_loss, torch.zeros_like(per_sample_loss))
    return masked.sum() / keep.numel()


def decoder_loss(decoder_name, decoder_output, target):
    '''
    Loss of one decoder: cross-entropy per classification tail plus MSE on
    the regression tail.

    Gating: for decoders whose name contains "Ledge" or "Panel", a sample
    whose bool tail (one-hot target of width 2) is predicted wrongly stops
    contributing to every loss term that comes after that tail, i.e. the
    remaining classification tails and the regression tail. The wrong
    tail's own cross-entropy is always kept.

    The check is done per sample with argmax(dim=1). A plain argmax() over
    the whole batch returns one index into the flattened tensor, which does
    not say whether any individual prediction is right.

    TODO: for "Panel" decoders, discard only the interpanel-offset part of
    the regression loss instead of the whole regression loss.

    Args:
        decoder_name: Name of the decoder (used only for the gating rule).
        decoder_output: `(classification_outputs, regression_output)` as
            returned by the multi-tail decoder (a tuple, not a dict).
        target: Dict with "classification_targets" (list aligned with the
            classification outputs) and "regression_target".

    Returns:
        The summed loss (a tensor, or 0.0 if the decoder produced nothing).
    '''
    classification_outputs = decoder_output[0]
    regression_output = decoder_output[1]
    gated = "Ledge" in decoder_name or "Panel" in decoder_name
    keep = None  # per-sample bool mask; None means every sample still counts
    loss = 0.0
    if classification_outputs:
        for i, output in enumerate(classification_outputs):
            gt = target["classification_targets"][i]
            if keep is None:
                loss = loss + nn.functional.cross_entropy(output, gt)
            else:
                per_sample = nn.functional.cross_entropy(output, gt, reduction="none")
                loss = loss + _gated_batch_mean(per_sample, keep)
            # check for bools by checking one-hot vector of size 2
            if gated and gt.dim() == 2 and gt.size(1) == 2:
                correct = gt.argmax(dim=1) == output.argmax(dim=1)
                keep = correct if keep is None else keep & correct
    if regression_output is not None:
        if keep is None:
            loss = loss + nn.functional.mse_loss(regression_output, target["regression_target"])
        else:
            per_element = nn.functional.mse_loss(
                regression_output, target["regression_target"], reduction="none"
            )
            per_sample = per_element.reshape(per_element.size(0), -1).mean(dim=1)
            loss = loss + _gated_batch_mean(per_sample, keep)
    return loss

# Combined multi-decoder loss, and an Adam optimizer (lr=0.001) over the
# parameters that are registered on `model`.
criterion = EncDecsLoss(decoders)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training loop with train and val
epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0  # reset after every periodic print
    epoch_loss = 0.0    # accumulated over the whole epoch
    num_batches = 0
    for i, data in enumerate(train_loader, 0):
        inputs, targets = data
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        batch_loss = loss.item()
        running_loss += batch_loss
        epoch_loss += batch_loss
        num_batches += 1
        if i % 2000 == 1999:  # print every 2000 mini-batches
            print(f"[{epoch + 1}, {i + 1}] loss: {running_loss / 2000:.3f}")
            running_loss = 0.0
    # The periodic print above never fires when an epoch has fewer than 2000
    # batches, so always report the epoch's mean training loss as well.
    if num_batches:
        print(f"[{epoch + 1}] train loss: {epoch_loss / num_batches:.3f}")
    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for i, data in enumerate(val_loader, 0):
            inputs, targets = data
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item()
    # A small dataset can yield an empty validation split (10% rounds down to 0);
    # avoid dividing by zero in that case.
    if len(val_loader) > 0:
        print(f"Validation loss: {val_loss / len(val_loader)}")
    else:
        print("Validation loss: n/a (validation split is empty)")

print("Finished Training")

# Save your trained model if needed
torch.save(model.state_dict(), "encDecModel.pth")

  target[key] = torch.tensor(value, dtype=torch.float32)
  return F.mse_loss(input, target, reduction=self.reduction)


In [None]:
# Evaluate on the held-out test split: average of the per-batch losses,
# computed in eval mode with gradient tracking switched off.
model.eval()
batch_losses = []
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        batch_losses.append(loss.item())
    test_loss = sum(batch_losses, 0.0)
    print(f"Test loss: {test_loss / len(test_loader)}")
print("Finished Testing")