In [1]:
import json
import os

import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image, ImageOps
from pytorch_lightning.callbacks import ModelCheckpoint
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import models, transforms
from torchvision.transforms import v2

from base import BaseModel

In [2]:
# Read the good-light annotations (image name -> raw label string) from disk.
json_file_path = '/teamspace/studios/this_studio/data/final_labels_good_light_4551.json'

with open(json_file_path, 'r') as f:
    data_dict = json.load(f)

# Collapse the raw labels to two classes: every label other than 'aligned'
# is treated as 'not_aligned'.
mapped_labels = {
    img_name: ('aligned' if label == 'aligned' else 'not_aligned')
    for img_name, label in data_dict.items()
}

In [3]:
# Read the bad-light annotations (image name -> raw label string) from disk.
json_file_path = '/teamspace/studios/this_studio/data/label_bad_light.json'

with open(json_file_path, 'r') as f:
    data_dict = json.load(f)

# Collapse the raw labels to two classes: every label other than 'aligned'
# is treated as 'not_aligned'.
mapped_labels2 = {
    img_name: ('aligned' if label == 'aligned' else 'not_aligned')
    for img_name, label in data_dict.items()
}

In [4]:
class CustomImageDataset(Dataset):
    """Image files paired with a binary alignment target.

    Args:
        annotations: Mapping of image file name to a label string.
        img_dir: Directory that each file name is joined onto.
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, annotations, img_dir, transform=None):
        # 'aligned' becomes the positive target 1.0; any other label becomes 0.0.
        # Targets are stored as float tensors of shape (1,).
        self.img_labels = [
            (file_name, torch.tensor([1.]) if tag == 'aligned' else torch.tensor([0.]))
            for file_name, tag in annotations.items()
        ]
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of annotated images."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return ``(image, target)``."""
        file_name, target = self.img_labels[idx]
        # Force three channels regardless of the mode the file was saved in.
        image = Image.open(os.path.join(self.img_dir, file_name)).convert('RGB')
        if not self.transform:
            return image, target
        return self.transform(image), target

In [5]:
class TrDataset(Dataset):
    """Wrap a dataset and apply ``transformations`` to the input of every sample.

    Args:
        base_dataset: Indexable dataset whose items are ``(input, target)`` pairs.
        transformations: Callable applied to the input; the target is passed through.
    """

    def __init__(self, base_dataset, transformations):
        super().__init__()
        self.transformations = transformations
        self.base = base_dataset

    def __len__(self):
        """Return the length of the wrapped dataset."""
        return len(self.base)

    def __getitem__(self, idx):
        """Return ``(transformations(input), target)`` for the wrapped item at ``idx``."""
        sample, target = self.base[idx]
        transformed = self.transformations(sample)
        return transformed, target

In [6]:
def cut_in_half(img):
    """Keep the right half of a PIL image, halve its resolution and equalize it.

    The crop runs from ``x = width // 2`` to the right edge over the full
    height. The crop is then resized to half its own width and height with
    LANCZOS resampling, and the result is passed through ``ImageOps.equalize``.

    Args:
        img: PIL image to process.

    Returns:
        The cropped, downscaled and equalized PIL image.
    """
    full_w, full_h = img.size
    right_half = img.crop((full_w // 2, 0, full_w, full_h))
    half_w, half_h = right_half.size
    shrunk = right_half.resize((half_w // 2, half_h // 2), Image.Resampling.LANCZOS)
    return ImageOps.equalize(shrunk)

In [7]:
# Preprocessing applied to the training split: crop/downscale/equalize via
# cut_in_half, convert to a tensor, then normalize with the ImageNet channel
# statistics. No fixed-size Resize is applied, and the augmentations listed
# below are currently disabled.
transform = transforms.Compose([
    cut_in_half,
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # v2.RandomPerspective(distortion_scale=0.6, p=1.0),
    # v2.RandomAffine(degrees=(30, 70), translate=(0.1, 0.3), scale=(0.5, 0.75)),
    # v2.ElasticTransform(alpha=250.0),
    # v2.RandomRotation(degrees=(0, 90))
])

# Preprocessing applied to the validation split: the same deterministic steps.
transform2 = transforms.Compose([
    cut_in_half,
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [8]:
# Image directories for the two lighting conditions.
img_dir = '/teamspace/studios/this_studio/data/train_set/good_light'
img_dir2 = '/teamspace/studios/this_studio/data/train_set/bad_light'

# One dataset per lighting condition, concatenated into a single pool.
dataset = CustomImageDataset(mapped_labels, img_dir)
dataset2 = CustomImageDataset(mapped_labels2, img_dir2)
dataset = torch.utils.data.ConcatDataset([dataset, dataset2])

# 80/20 train/validation split. The generator is seeded so that the split is
# identical after every kernel restart; with an unseeded split, images can move
# between train and validation from run to run, which makes validation metrics
# (and the checkpoint selected on them) non-reproducible.
split_seed = 42
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Preprocessing is attached after the split so each subset gets its own pipeline.
train_dataset = TrDataset(train_dataset, transform)
val_dataset = TrDataset(val_dataset, transform2)

# Data loaders: shuffled mini-batches for training, one sample at a time in
# fixed order for validation.
batch_size = 32

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=10,
    persistent_workers=True,
    prefetch_factor=4,
)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=10)

In [9]:
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# ImageNet-pretrained ResNet-18. The explicit weights enum replaces the
# deprecated `pretrained=True` flag and selects the same IMAGENET1K_V1 weights.
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

# Replace the ImageNet classification head with a single-output linear layer:
# the binary aligned / not_aligned decision is expressed as one logit (the
# training summary lists BCEWithLogitsLoss as the loss).
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 1)

model = model.to(device)

# Alternative backbone (disabled): EfficientNetV2-M followed by ReLU and a
# single-output linear layer stacked on the backbone's existing classifier output.
# model = models.efficientnet_v2_m('DEFAULT')
# num_ftrs = model.classifier[-1].out_features
# m2 = nn.Linear(num_ftrs, 1)
# model = nn.Sequential(model,nn.ReLU(), m2).to(device)
# model = torch.compile(model)

torch.set_float32_matmul_precision('high')  # alternative: 'medium'

Using device: cuda




In [10]:
# Lightning module wrapping the network (BaseModel is imported from the local `base` module).
m = BaseModel(model, lr=0.0001)

# Keep only the single checkpoint with the highest monitored 'val_f_beta'.
checkpoint = ModelCheckpoint(
    monitor='val_f_beta',
    mode='max',
    save_top_k=1,
    dirpath='checkpoints',
    filename='model-{epoch:02d}-{val_f_beta:.3f}',
)

trainer = pl.Trainer(
    max_epochs=50,
    accelerator="auto",
    callbacks=[checkpoint],
    # precision="16-mixed",
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [11]:
# Train the model, running validation with val_loader during fitting.
trainer.fit(m, train_dataloaders=train_loader, val_dataloaders=val_loader)

/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/pytorch_lightning/callbacks/model_checkpoint.py:654: Checkpoint directory /teamspace/studios/this_studio/checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name       | Type              | Params | Mode 
---------------------------------------------------------
0 | model      | ResNet            | 11.2 M | train
1 | loss       | BCEWithLogitsLoss | 0      | train
2 | f_beta     | BinaryFBetaScore  | 0      | train
3 | accuracy   | BinaryAccuracy    | 0      | train
4 | precision  | BinaryPrecision   | 0      | train
5 | recall     | BinaryRecall      | 0      | train
6 | last_layer | Linear            | 2      | train
---------------------------------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.708    Total estimated model params size (MB)
74        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

In [22]:
# Restore the Lightning module from a saved checkpoint, reusing the `model`
# object as the wrapped network.
# NOTE(review): the ModelCheckpoint callback configured in this notebook writes
# files named 'model-{epoch}-{val_f_beta}', so 'best.ckpt' was presumably
# renamed or copied by hand -- confirm which run it came from.
m = BaseModel.load_from_checkpoint(
    "/teamspace/studios/this_studio/checkpoints/best.ckpt",
    model=model,
)

In [23]:
import os
import torch
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import torch.nn as nn

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# Evaluation preprocessing: cut_in_half, tensor conversion and ImageNet
# normalization (no fixed-size Resize).
transform = transforms.Compose([
    cut_in_half,
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Root folder of the example set; ImageFolder expects one sub-folder per class.
data_dir = '/teamspace/studios/this_studio/data/example_set'

# ImageFolder yields integer class indices. Index 0 is mapped to the target
# 1.0 and every other index to 0.0, as float tensors of shape (1,).
# NOTE(review): this assumes index 0 is the 'aligned' folder -- confirm with
# dataset.class_to_idx.
dataset = datasets.ImageFolder(
    root=data_dir,
    transform=transform,
    target_transform=lambda class_idx: torch.tensor([1.] if class_idx == 0 else [0.]),
)
dataset_loader = DataLoader(dataset, batch_size=1, shuffle=False)

# Evaluate the restored module on the example set; the returned metrics are
# displayed as the cell output.
trainer.test(m, dataset_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Using device: cuda


/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:424: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'val_f_beta': 0.8290155529975891,
  'val_accuracy': 0.9493243098258972,
  'val_precision': 0.8421052694320679,
  'val_recall': 0.7804877758026123}]