In [None]:
# %pip install tqdm
# %pip install python-dotenv
# %pip install torch==2.4.0+cu118
# %pip install scikit_learn==1.2.2
# %pip install ipython
# %pip install pandas
# %pip install numpy
# %pip install matplotlib
# %pip install tabulate
# %pip install scipy
# %pip install git+https://github.com/Louis-Li-dev/ML_tool_kit

In [None]:
# Standard-library modules used for path handling and import-path setup.
import os
import sys
# Make the parent of the current working directory importable. Presumably that is
# where the project-local `utility` and `model` packages live — TODO confirm.
parent_dir = os.path.join(os.getcwd(), '..')
if parent_dir not in sys.path: sys.path.append(parent_dir)
# NOTE(review): star imports hide where names come from. Later cells use `np`,
# `pickle`, `get_files`, `process_and_transform_data` and `evaluate_and_plot`
# without importing them, so they are presumably re-exported by these two
# modules — verify, and prefer explicit imports.
from utility.data_utils import *
from utility.visuals import *
from dotenv import load_dotenv
from model.CNN import ConditionalSegmentationVAE
from mkit.torch_support.tensor_utils import xy_to_tensordataset
from torch import nn
from IPython.display import clear_output
from sklearn.ensemble import RandomForestRegressor
# Read variables from a .env file (if one is found) into the process environment.
load_dotenv()
# os.getenv returns None when DATA_DIR is not set, so downstream code must handle that.
DATA_DIR = os.getenv("DATA_DIR")

- Dataset

In [None]:
# Fail fast when the data directory is unusable.
# DATA_DIR comes from os.getenv and is None when the variable is unset; os.path.exists(None)
# raises TypeError, so the unset case is checked explicitly to surface the intended error.
if not DATA_DIR or not os.path.exists(DATA_DIR):
    raise FileNotFoundError("Make sure the data directory is correctly placed.")

In [None]:
files = get_files(DATA_DIR)
# Give a clear error instead of a bare IndexError when nothing was discovered.
if len(files) == 0:
    raise FileNotFoundError(f"No data files were found in DATA_DIR ({DATA_DIR!r}).")

return_list = []
# Only the first discovered file is processed in this notebook.
file = files[0]
# Derive the city name from the file's base name: text before '.csv', then before the first '_'.
# Backslashes are normalised first so the base name is extracted for both Windows-style
# and POSIX-style paths (splitting on '\\' alone only worked for Windows-style paths).
city_name = os.path.basename(file.replace('\\', '/')).split('.csv')[0].split('_')[0]

# overwrite=True is passed on every run; presumably this regenerates the processed file
# each time — TODO confirm against process_and_transform_data.
path_name = process_and_transform_data(file, resolution=.5, overwrite=True)
# NOTE(security): pickle.load can execute arbitrary code. The path loaded here is the one
# returned by process_and_transform_data above; never point this at an untrusted file.
with open(path_name, 'rb') as f:
    result_dict = pickle.load(f)

# Unpack the processed artefacts used by the rest of the notebook.
labels = result_dict['labels']
encoder = result_dict['encoder']
MAX_LEN = result_dict['max length']
file_name = result_dict['file name']
WIDTH = result_dict['width']
HEIGHT = result_dict['height']

In [None]:
# for idx, label in enumerate(labels):
#     plt.imshow(labels[idx])
#     plt.savefig(f'../fig/{idx}_{file_name}.png')
#     plt.show()


- x y splitting

In [None]:
from sklearn.model_selection import train_test_split

# Fixed seed so the train/test partition is identical after Restart & Run All.
SPLIT_SEED = 42

# Keep only label maps that contain more than one non-zero cell.
unique_labels = [u for u in labels if np.count_nonzero(u != 0) > 1]

padded_labels = []
for label in unique_labels:
    # 0 is treated as background (the filter above counts non-zero cells). It is excluded
    # by value rather than by position, so a map without any 0 does not lose its smallest class.
    unique_vals = np.unique(label)
    unique_vals = unique_vals[unique_vals != 0]
    # One binary (float64) mask per remaining label value, in ascending value order.
    new_vals = [(label == val).astype(np.float64) for val in unique_vals]
    # Pad with all-zero masks until MAX_LEN channels are reached.
    # NOTE(review): a map with more than MAX_LEN distinct values keeps all of its masks,
    # giving samples with differing channel counts — verify MAX_LEN is a true upper bound.
    new_vals.extend(np.zeros(label.shape) for _ in range(len(new_vals), MAX_LEN))
    padded_labels.append(np.array(new_vals))

# Hold out 20% of the samples for testing.
train_labels, test_labels = train_test_split(
    padded_labels, test_size=.2, random_state=SPLIT_SEED
)


In [None]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_channels, img_width, img_height, start_dim, n_layers, latent_dim):
        """
        Fully connected encoder mapping a flattened image to a latent vector.

        Layer widths: input_channels*img_width*img_height -> start_dim, then
        ``n_layers`` layers that each double the width, then a final projection
        to ``latent_dim``. Every layer except the last is followed by Mish.

        Args:
            input_channels (int): Number of input channels (e.g., 3 for RGB).
            img_width (int): Image width.
            img_height (int): Image height.
            start_dim (int): Size of the first hidden layer.
            n_layers (int): Number of hidden layers (each doubling the previous size).
            latent_dim (int): Size of the latent representation.
        """
        super(Encoder, self).__init__()
        input_size = input_channels * img_width * img_height
        # First layer: from flattened input to start_dim
        layers = [nn.Linear(input_size, start_dim), nn.Mish()]
        hidden_dim = start_dim
        # Add n_layers that double the hidden size at each layer
        for _ in range(n_layers):
            next_dim = hidden_dim * 2
            layers.append(nn.Linear(hidden_dim, next_dim))
            layers.append(nn.Mish())
            hidden_dim = next_dim
        # Final layer to produce latent representation (no activation)
        layers.append(nn.Linear(hidden_dim, latent_dim))
        self.encoder = nn.Sequential(*layers)

    def forward(self, x):
        """
        Encode a batch of inputs.

        Args:
            x (torch.Tensor): Batch whose non-batch dimensions multiply to
                input_channels*img_width*img_height, e.g. (B, C, W, H).

        Returns:
            torch.Tensor: Latent codes of shape (B, latent_dim).
        """
        # Flatten everything but the batch dimension. reshape (unlike view) also
        # accepts non-contiguous tensors such as permuted or sliced batches.
        x = x.reshape(x.size(0), -1)
        return self.encoder(x)

class Decoder(nn.Module):
    def __init__(self, output_channels, img_width, img_height, start_dim, n_layers, latent_dim):
        """
        Fully connected decoder mapping a latent vector back to an image tensor.

        Layer widths mirror the encoder: latent_dim -> start_dim * 2**n_layers,
        then ``n_layers`` layers that each halve the width, then a final
        projection to output_channels*img_width*img_height followed by Sigmoid.

        Args:
            output_channels (int): Number of output channels (should match input_channels).
            img_width (int): Image width.
            img_height (int): Image height.
            start_dim (int): Should match the start_dim used in the encoder.
            n_layers (int): Number of halving hidden layers (the encoder's count, reversed).
            latent_dim (int): Size of the latent representation.
        """
        super().__init__()
        # Remember the target image geometry for the reshape in forward().
        self.output_channels = output_channels
        self.img_width = img_width
        self.img_height = img_height

        # Start at the encoder's widest hidden size and work back down.
        width = start_dim * (2 ** n_layers)
        stack = [nn.Linear(latent_dim, width), nn.Mish()]
        for _ in range(n_layers):
            narrower = width // 2
            stack += [nn.Linear(width, narrower), nn.Mish()]
            width = narrower
        # Sigmoid keeps every reconstructed value in [0, 1].
        flat_size = output_channels * img_width * img_height
        stack += [nn.Linear(width, flat_size), nn.Sigmoid()]
        self.decoder = nn.Sequential(*stack)

    def forward(self, z):
        """Decode latent codes into a (B, output_channels, img_width, img_height) tensor."""
        flat = self.decoder(z)
        return flat.view(flat.size(0), self.output_channels, self.img_width, self.img_height)

class Autoencoder(nn.Module):
    def __init__(self, input_channels, img_width, img_height, start_dim, n_layers, latent_dim, output_channels):
        """
        Fully connected autoencoder: an Encoder followed by a Decoder.

        Both halves are built with the same image size, start_dim, n_layers and
        latent_dim; only the channel count differs (input vs. output).
        """
        super().__init__()
        # Arguments shared verbatim by the encoder and the decoder.
        shared = (img_width, img_height, start_dim, n_layers, latent_dim)
        self.encoder = Encoder(input_channels, *shared)
        self.decoder = Decoder(output_channels, *shared)

    def forward(self, x):
        """Encode ``x`` to the latent space and decode it back to image shape."""
        return self.decoder(self.encoder(x))

def generate(input_channels, img_width, img_height, start_dim, n_layers, latent_dim, output_channels, device="cpu", output_type="autoencoder"):
    """
    Build an encoder, decoder, or full autoencoder and move it to ``device``.

    Args:
        input_channels (int): Number of input channels (e.g., 3 for RGB).
        img_width (int): Image width.
        img_height (int): Image height.
        start_dim (int): The size of the first hidden layer.
        n_layers (int): Number of hidden layers (each doubling the dimension in the encoder).
        latent_dim (int): Size of the latent representation.
        output_channels (int): Number of output channels (should match input_channels).
        device (str): 'cpu' or 'cuda'.
        output_type (str): 'encoder', 'decoder', or 'autoencoder'.

    Returns:
        A PyTorch model on the selected device.

    Raises:
        ValueError: If ``output_type`` is not one of the three supported names.
    """
    # Resolve the device first so an invalid device string fails before any model is built.
    target = torch.device(device)
    # Geometry/size arguments common to every model variant.
    shared = (img_width, img_height, start_dim, n_layers, latent_dim)

    if output_type == "encoder":
        return Encoder(input_channels, *shared).to(target)
    if output_type == "decoder":
        return Decoder(output_channels, *shared).to(target)
    if output_type == "autoencoder":
        return Autoencoder(input_channels, *shared, output_channels).to(target)
    raise ValueError("Invalid output_type. Choose from 'encoder', 'decoder', or 'autoencoder'.")


- data processing

In [None]:

BATCH_SIZE = 8

# Autoencoder-style setup: the label maps serve as both input and target.
# With val_ratio set, the helper returns a (train, validation) pair of loaders.
loader, val_loader = xy_to_tensordataset(
    train_labels, train_labels,
    return_loader=True,
    batch_size=BATCH_SIZE,
    input_dtype=torch.float32,
    output_dtype=torch.float32,
    val_ratio=.15,
)

# Held-out loader for the evaluation cells, which reference `test_loader`; without it
# they fail with NameError and `test_labels` is never used.
# NOTE(review): this assumes evaluate_and_plot expects the same (input, target) float32
# TensorDataset layout as the training loader — confirm against xy_to_tensordataset.
test_tensor = torch.tensor(np.array(test_labels), dtype=torch.float32)
test_loader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(test_tensor, test_tensor),
    batch_size=BATCH_SIZE,
    shuffle=False,  # keep evaluation order deterministic
)

- For Machine Learning Models
    - To match the tensor-based data format, every scikit-learn model needs to be wrapped in an `MLWrapper` object.

In [None]:
class MLWrapper(nn.Module):
    """
    Adapter that lets a scikit-learn style estimator consume tensor DataLoaders.

    The wrapped estimator is fitted on flattened samples and always runs on the CPU.
    """

    def __init__(self, model_object = RandomForestRegressor, **args):
        """
        Args:
            model_object: Estimator class (not instance) exposing fit/score/predict.
            **args: Keyword arguments forwarded to ``model_object``'s constructor.
        """
        # nn.Module must be initialised before Module methods such as eval(), to()
        # or parameters() can be called on this wrapper.
        super().__init__()
        self.model = model_object(**args)
        self.device = torch.device('cpu')

    def loader_to_xy(self, loader):
        """
        Extract the full (x, y) arrays behind a DataLoader as 2-D numpy arrays.

        Args:
            loader: DataLoader whose ``dataset`` exposes ``tensors`` as an (x, y) pair.

        Returns:
            tuple: ``x`` of shape (n_samples, n_features); ``y`` flattened to
            (n_samples, n_outputs) when it has more than two dimensions, otherwise unchanged.
        """
        x, y = loader.dataset.tensors
        x, y = np.array(x), np.array(y)
        # Flatten every sample. This equals squeeze(1)+flatten for single-channel input,
        # also accepts multi-channel input, and matches the flattening done in inference().
        x = x.reshape(x.shape[0], -1)
        # scikit-learn estimators reject targets with more than two dimensions.
        if y.ndim > 2:
            y = y.reshape(y.shape[0], -1)
        return x, y

    def fit(self, train_loader, val_loader):
        """
        Fit the wrapped estimator on the training data and print its validation score.

        The printed value is whatever ``self.model.score`` returns; for the default
        RandomForestRegressor that is the R^2 coefficient, not a classification accuracy.
        """
        train_x, train_y = self.loader_to_xy(train_loader)
        val_x, val_y = self.loader_to_xy(val_loader)
        self.model.fit(train_x, train_y)
        accu = self.model.score(val_x, val_y)
        print(accu)

    def inference(self, img):
        """
        Predict for a batch of images.

        Args:
            img: Batch with the sample dimension first, e.g. (B, C, H, W).

        Returns:
            torch.Tensor: The estimator's predictions, one row per sample (flat, not image-shaped).
        """
        # The estimator works on CPU numpy data; detach and move tensors off any accelerator first.
        if torch.is_tensor(img):
            img = img.detach().cpu().numpy()
        batch_size = img.shape[0]
        img = img.reshape(batch_size, -1)
        return torch.tensor(self.model.predict(img))

- VAE

In [None]:
# --- Hyper-parameters -------------------------------------------------------
IMG_CHANNELS = 1    # 1 for grayscale images; use 3 for RGB.
N_EPOCHS = 100      # Upper bound on training epochs; adjust as needed.
LATENT_DIM = 300    # Dimensionality of the latent space.
FEATURE_MAPS = 8    # Base number of feature maps.

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# --- Model ------------------------------------------------------------------
vae_config = dict(
    latent_dim=LATENT_DIM,
    width=WIDTH,
    height=HEIGHT,
    img_channels=IMG_CHANNELS,
    feature_maps=FEATURE_MAPS,
    device=device,
)
model = ConditionalSegmentationVAE(**vae_config)

# Segmentation criterion handed to the training routine.
# NOTE(review): BCEWithLogitsLoss expects raw logits — confirm the model's output is not already sigmoid-activated.
segmentation_loss_fn = nn.BCEWithLogitsLoss().to(device)

# --- Training ---------------------------------------------------------------
# `loader` / `val_loader` are the training and validation DataLoaders built earlier.
# kl_weight and patience are presumably the KL-term weight and early-stopping patience — TODO confirm in train_vae.
training_config = dict(
    train_loader=loader,
    val_loader=val_loader,
    n_epochs=N_EPOCHS,
    seg_criterion=segmentation_loss_fn,
    kl_weight=0.001,
    patience=10,
    device=device,
)
model.train_vae(**training_config)
# Clear this cell's output once training has finished.
clear_output(wait=True)

In [None]:

# Evaluate the trained VAE on the held-out loader and plot the results.
# `test_loader` must already exist in the kernel namespace when this cell runs.
evaluate_and_plot(
    test_loader,
    model=model,
    encoder=encoder,
    title='VAE',
    dataset_name=city_name,
)

- Random Forest

In [None]:


# Baseline: MLWrapper with its default estimator (RandomForestRegressor).
# Note that this rebinds `model`, replacing the VAE trained above.
model = MLWrapper()
model.fit(train_loader=loader, val_loader=val_loader)
# Evaluate the baseline on the same held-out loader as the VAE.
evaluate_and_plot(
    test_loader,
    model=model,
    encoder=encoder,
    title='RF',
    dataset_name=city_name,
)