# Przygotowanie danych treningowych



In [None]:
# Mount Google Drive into the Colab filesystem so the dataset under
# /content/drive is reachable from the cells below.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pathlib

# Root folder of the image dataset on the mounted Drive.
dest_path = "/content/drive/MyDrive/sunflowers"
p = pathlib.Path(dest_path)

# Alias under which the dataset root is passed to the dataset objects.
ds_path = dest_path

# Użyteczne zależności

In [None]:
# IPython shell escape: install PyTorch Lightning into the notebook runtime.
!pip install pytorch-lightning

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import torchvision
from torchvision import transforms
from torchvision.transforms import Compose
from torchvision import datasets, models, transforms
from torchvision.transforms import (
    Compose,
    Lambda,
    Normalize,
    RandomHorizontalFlip,
    RandomVerticalFlip,
    Resize,
    ToTensor,
)

import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn.functional as F

import pytorch_lightning as pl
from pytorch_lightning import LightningDataModule
from pytorch_lightning import seed_everything, Trainer
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt

import os
from PIL import Image
from random import sample
import numpy as np
from glob import glob
import cv2
from torchmetrics import Accuracy


In [None]:
# Fix the global random seed for repeatable runs; workers=True additionally
# asks Lightning to derive seeds for DataLoader worker processes.
seed_everything(42, workers=True)

INFO:lightning_fabric.utilities.seed:Global seed set to 42


42

# Trening



In [None]:
class ComponentDataset(Dataset):
    """Image-folder dataset that also yields size descriptors of each image.

    The expected layout is one sub-directory per class, each holding that
    class's image files. If ``path`` contains a sub-directory named after
    ``train_val`` (e.g. ``path/train``), that sub-directory becomes the
    dataset root, so the ``'train'`` and ``'val'`` datasets read different
    folders. Otherwise ``path`` itself is the root, and every split sees the
    same files.

    Each item is ``(image, label, original_size, ratio)`` where

    * ``image`` is the RGB image after ``transform`` (if given),
    * ``label`` is the index of the image's parent folder in ``self.classes``,
    * ``original_size`` is ``tensor([height / REF_HEIGHT, width / REF_WIDTH])``,
    * ``ratio`` is ``height / width`` as a float tensor.

    Args:
        path: Dataset root (or the parent of the per-split folders).
        transform: Callable applied to the PIL image; falsy to skip.
        target_transform: Callable applied to the integer label; falsy to skip.
        train_val: Split name, used to select ``path/<train_val>`` if present.
    """

    # Divisors applied to the raw height / width before they reach the model.
    # NOTE(review): presumably the reference (max?) image size of the
    # training data -- confirm where 1228 x 1231 comes from.
    REF_HEIGHT = 1228
    REF_WIDTH = 1231

    def __init__(self, path, transform, target_transform=None, train_val = 'train'):
        self.train_val = train_val
        self.paths, self.classes = self.get_paths(path)
        self.transform = transform
        self.target_transform = target_transform

    def get_paths(self, path):
        """Return ``(file_paths, class_names)`` found under the dataset root.

        Class names are the sorted, non-hidden immediate sub-directories of
        the root; only files located directly inside a class directory are
        collected, so every returned path has a label in ``class_names``.
        """
        split = getattr(self, 'train_val', None)
        root = os.path.join(path, split) if split else path
        if not os.path.isdir(root):
            # No per-split folder: fall back to the flat layout.
            root = path

        # Hidden folders (e.g. notebook checkpoint directories) are not classes.
        classes = sorted(
            entry for entry in os.listdir(root)
            if not entry.startswith('.')
            and os.path.isdir(os.path.join(root, entry))
        )

        paths = []
        for class_name in classes:
            class_dir = os.path.join(root, class_name)
            # Sorted so the sample order does not depend on the filesystem.
            paths.extend(
                os.path.join(class_dir, name)
                for name in sorted(os.listdir(class_dir))
                if os.path.isfile(os.path.join(class_dir, name))
            )
        return paths, classes

    def __len__(self):
        """Number of image files in the dataset."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` as ``(image, label, original_size, ratio)``."""
        image_path = self.paths[idx]
        # convert() materialises the pixel data, so the file handle can be
        # released as soon as the block exits.
        with Image.open(image_path) as raw:
            image = raw.convert('RGB')

        width, height = image.size  # PIL reports (width, height)
        original_size = torch.tensor(
            [height / self.REF_HEIGHT, width / self.REF_WIDTH]
        )
        ratio = torch.tensor(height / width).float()

        # The class is encoded by the name of the folder holding the file.
        label = self.classes.index(pathlib.Path(image_path).parent.name)

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, int(label), original_size, ratio

In [None]:
# Preprocessing shared by training and inference: fixed 224x224 input,
# tensor conversion, then per-channel normalisation (these are the usual
# ImageNet mean/std values, matching the pretrained backbone).
data_transforms = Compose([
    Resize((224, 224)),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Prefer the GPU when one is visible to PyTorch.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# One dataset object per split, keyed by split name.
image_datasets = {
    split: ComponentDataset(
        path=ds_path,
        transform=data_transforms,
        target_transform=None,
        train_val=split,
    )
    for split in ('train', 'val')
}



In [None]:
import torch
from torch.utils.data import DataLoader
from pytorch_lightning import LightningDataModule

class ComponentDataModule(LightningDataModule):
    """Data module serving pre-built ``'train'`` and ``'val'`` datasets.

    Args:
        image_dataset: Mapping with the keys ``'train'`` and ``'val'``, each
            holding a dataset.
        batch_size: Batch size used by every dataloader.

    There is no separate test split: ``test_dataloader`` serves the
    validation dataset.
    """

    def __init__(self, image_dataset, batch_size=32):
        super().__init__()
        self.image_dataset = image_dataset
        self.batch_size = batch_size
        # Bound in setup(); defined here so the attributes always exist.
        self.train_dataset = None
        self.val_dataset = None

    def setup(self, stage=None):
        """Bind the datasets needed for ``stage``.

        The validation dataset is also bound for ``'validate'`` and
        ``'test'``, so those entry points work on a module that has not been
        through ``fit`` first.
        """
        if stage in (None, 'fit'):
            self.train_dataset = self.image_dataset['train']
        if stage in (None, 'fit', 'validate', 'test'):
            self.val_dataset = self.image_dataset['val']

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        """Unshuffled loader over the validation dataset."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        """Unshuffled loader over the validation dataset (used as test data)."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size)


In [None]:
# Wrap the two datasets in a data module (default batch size of 32).
my_data_module = ComponentDataModule(image_datasets)

In [None]:
class ConcatModel(nn.Module):
    """Frozen ResNet-34 features joined with size descriptors, then an MLP.

    The pretrained ResNet-34 (minus its final fully connected layer) turns
    the image into a feature vector. That vector is concatenated with the two
    ``org_sizes`` values and the ``ratios`` value of each sample and passed
    through a three-layer MLP that outputs one score per class.

    Args:
        num_classes: Width of the output layer. Defaults to 6, the value the
            layer was originally hard-coded to.
            NOTE(review): the accuracy metric in this notebook is configured
            for 3 classes -- confirm which number is intended.
    """

    # org_sizes contributes 2 values per sample and ratios contributes 1.
    EXTRA_FEATURES = 3

    def __init__(self, num_classes=6):
        super().__init__()
        # NOTE(review): recent torchvision releases prefer the `weights=`
        # argument over `pretrained=True`; kept for compatibility.
        model_ft = models.resnet34(pretrained=True)
        # Input width of the layer being dropped == backbone feature size.
        backbone_dim = model_ft.fc.in_features
        self.resnet = torch.nn.Sequential(*(list(model_ft.children())[:-1]))

        # forward() runs the backbone under no_grad, so it is never trained;
        # mark its parameters accordingly so parameter counts and optimizers
        # reflect that.
        # NOTE(review): the backbone is not switched to eval() here, so its
        # BatchNorm running statistics still change while the model is in
        # training mode -- confirm whether that is wanted.
        for param in self.resnet.parameters():
            param.requires_grad_(False)

        self.mlp = nn.Sequential(
            nn.Linear(backbone_dim + self.EXTRA_FEATURES, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x, org_sizes, ratios):
        """Return class scores of shape ``(batch, num_classes)``.

        Args:
            x: Image batch fed to the ResNet backbone.
            org_sizes: Per-sample size pair, shape ``(batch, 2)``.
            ratios: Per-sample aspect ratio, shape ``(batch,)``.
        """
        with torch.no_grad():
            features = self.resnet(x)
        # (batch, C, 1, 1) -> (batch, C)
        features = torch.flatten(features, start_dim=1)
        emb = torch.cat((features, org_sizes, ratios.unsqueeze(-1)), dim=1)
        return self.mlp(emb)

In [None]:
# Per-batch histories appended to by ClassifingModel while it runs; the
# plotting cells read them after training. Entries are 0-dim tensors.
val_losses = []
train_losses = []
train_acc = []
val_acc = []


class ClassifingModel(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a classifier.

    Every batch is a 4-tuple ``(inputs, labels, org_sizes, ratios)``; the
    wrapped model is called as ``model(inputs, org_sizes, ratios)`` and its
    output is scored against ``labels``.

    Args:
        model: Network returning one score per class for each sample.
        learning_rate: Learning rate handed to Adam.
        loss_function: Callable invoked as ``loss_function(outputs, labels)``.
        num_classes: Number of classes the accuracy metric is configured for.
            NOTE(review): keep this in sync with the number of distinct
            labels and the model's output width.
    """

    def __init__(
            self,
            model: nn.Module,
            learning_rate,
            loss_function,
            num_classes: int = 3,
            ) -> None:
        super().__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.loss_function = loss_function
        self.optimizer = None
        self.acc = Accuracy(task="multiclass", num_classes=num_classes)

    def _shared_step(self, batch):
        """Run the model on one batch and return ``(loss, batch_accuracy)``."""
        inputs, labels, org_sizes, ratios = batch
        outputs = self.model(inputs, org_sizes, ratios)
        _, preds = torch.max(outputs, 1)
        loss = self.loss_function(outputs, labels)
        acc = self.acc(preds, labels)
        return loss, acc

    def training_step(self, batch, batch_idx):
        """Compute the training loss, record it, and return it for backprop."""
        loss, acc = self._shared_step(batch)
        # Store a detached copy: keeping the live loss would keep the whole
        # autograd graph of every step alive for the rest of the run.
        train_losses.append(loss.detach())
        train_acc.append(acc)
        self.log_dict({'train_loss': loss.item()})
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and record the validation loss/accuracy for one batch."""
        loss, acc = self._shared_step(batch)
        val_losses.append(loss.detach())
        val_acc.append(acc)
        self.log('val_loss', loss, prog_bar=True, on_epoch=True, on_step=False)
        return loss

    def test_step(self, batch, batch_idx):
        """Log the test loss and accuracy for one batch."""
        loss, acc = self._shared_step(batch)
        self.log_dict({'test_loss': loss.item()})
        self.log_dict({'test_acc': acc.item()})
        return loss, acc

    def configure_optimizers(self):
        """Adam over all module parameters, with no LR scheduler."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return [optimizer], []

In [None]:
# Callback that records the optimizer's learning rate at every training step.
lr_monitor = LearningRateMonitor(logging_interval='step')


In [None]:
# Hyper-parameters and building blocks of the training run.
lr = .0027
loss_function = nn.CrossEntropyLoss()
model = ConcatModel()

# Lightning wrapper that owns the optimisation loop for `model`.
classifier = ClassifingModel(
    model=model,
    learning_rate=lr,
    loss_function=loss_function,
)



In [None]:
# Training driver: 50 epochs, logging at every step, one validation batch as
# a sanity check before training starts.
trainer = Trainer(
    # "auto" picks the GPU when one is present and falls back to the CPU
    # otherwise; a hard-coded "gpu" raises on CPU-only runtimes.
    accelerator="auto",
    num_sanity_val_steps=1,
    max_epochs=50,
    log_every_n_steps=1,
    callbacks=[lr_monitor]
)


INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [None]:
# Train the classifier on the batches served by the data module.
trainer.fit(classifier, my_data_module)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name          | Type               | Params
-----------------------------------------------------
0 | model         | ConcatModel        | 21.5 M
1 | loss_function | CrossEntropyLoss   | 0     
2 | acc           | MulticlassAccuracy | 0     
-----------------------------------------------------
21.5 M    Trainable params
0         Non-trainable params
21.5 M    Total params
85.802    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.


In [None]:
# Evaluate with the data module's test loader, which serves the validation
# dataset (there is no separate test split).
# NOTE(review): the Trainer was given no checkpoint callback with a monitored
# metric, so 'best' appears to resolve to the default checkpoint; the log
# below shows the last epoch (epoch=49) being restored -- confirm.
trainer.test(classifier, datamodule= my_data_module, ckpt_path='best')

INFO:pytorch_lightning.utilities.rank_zero:Restoring states from the checkpoint path at /content/lightning_logs/version_2/checkpoints/epoch=49-step=2150.ckpt
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.utilities.rank_zero:Loaded model weights from the checkpoint at /content/lightning_logs/version_2/checkpoints/epoch=49-step=2150.ckpt


Testing: 0it [00:00, ?it/s]

[{'test_loss': 0.0645001158118248, 'test_acc': 0.9704360961914062}]

# Ewaluacja

In [None]:
def get_image(image):
    """Turn one PIL image into a single-sample batch for the model.

    Returns ``(image, original_size, ratio)``, each with a leading batch
    dimension of 1: the transformed image, the height/width scaled by
    1228/1231, and the height-to-width ratio.
    """
    width, height = image.size  # PIL reports (width, height)

    size_features = torch.tensor([height / 1228, width / 1231])
    aspect = torch.tensor(height / width).float()
    pixels = data_transforms(image)

    return pixels.unsqueeze(0), size_features.unsqueeze(0), aspect.unsqueeze(0)


In [None]:
# Define paths to the base images and their corresponding binary masks
base_dir = "/content/drive/MyDrive/test"
mask_dir = "/content/drive/MyDrive/test_masks"

# Define output directory for the annotated images
# NOTE(review): this is the same folder as mask_dir -- confirm that results
# are really meant to be written next to the masks.
output_dir = "/content/drive/MyDrive/test_masks"

# Box/label colour per predicted class index, in OpenCV's BGR order.
# Predictions outside this table are left unannotated.
class_colors = {
    0: (0, 255, 0),
    1: (0, 160, 255),
    2: (0, 0, 255),
}

# Inference mode: BatchNorm must use its stored statistics. In training mode
# it would normalise each single crop by its own statistics and also
# overwrite the running statistics that are saved with the model afterwards.
model.eval()
model_device = next(model.parameters()).device

counter = 0
# Sorted so result_<n>.jpg maps to the same source image on every run.
for file in sorted(os.listdir(base_dir)):
    if not file.endswith(".jpg"):
        continue

    # Load the base image and its binary mask (same stem, .png extension)
    base_img_path = os.path.join(base_dir, file)
    base_img = cv2.imread(base_img_path)
    mask_path = os.path.join(mask_dir, os.path.splitext(file)[0] + ".png")
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    # cv2.imread signals an unreadable/missing file by returning None.
    if base_img is None or mask is None:
        print(f"Skipping {file}: could not read image or mask ({mask_path})")
        counter += 1
        continue

    # OpenCV loads BGR, while the model was trained on RGB images. Crops are
    # taken from this untouched RGB copy, so boxes drawn on base_img for
    # earlier contours never leak into later crops.
    rgb_img = cv2.cvtColor(base_img, cv2.COLOR_BGR2RGB)

    # One external contour per connected region of the mask
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)

        # Classify the region inside the bounding box
        crop = Image.fromarray(rgb_img[y:y + h, x:x + w])
        image, original_size, ratio = get_image(crop)

        with torch.no_grad():
            outputs = model(
                image.to(model_device),
                original_size.to(model_device),
                ratio.to(model_device),
            )
        _, predicted = torch.max(outputs, 1)
        class_index = predicted.item()

        # Annotate the BGR image that will be written to disk
        color = class_colors.get(class_index)
        if color is not None:
            cv2.rectangle(base_img, (x, y), (x+w, y+h), color, 5)
            cv2.putText(base_img, str(class_index), (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 5)

    # Write once per image, after all of its regions have been annotated.
    if contours:
        cv2.imwrite(output_dir + "/result_" + str(counter) + ".jpg", base_img)
    counter += 1

# Wykres straty

In [None]:
# Convert the recorded per-batch training losses from tensors to floats.
train_loss = [step_loss.item() for step_loss in train_losses]

In [None]:
# Convert the recorded per-batch validation losses from tensors to floats.
val_loss = [step_loss.item() for step_loss in val_losses]

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# train_loss / val_loss hold one value per batch, while the x axis is in
# epochs: reduce both to the mean of the per-batch values of each epoch.
train_batches = len(my_data_module.train_dataloader())
val_batches = len(my_data_module.val_dataloader())

# Training: keep whole epochs only (drops a trailing partial epoch).
train_epochs = len(train_loss) // train_batches
train_loss_per_epoch = (
    np.asarray(train_loss[:train_epochs * train_batches])
    .reshape(train_epochs, train_batches)
    .mean(axis=1)
)

# Validation: the history starts with the Trainer's sanity-check batches, so
# the surplus entries are dropped from the front.
val_epochs = len(val_loss) // val_batches
val_loss_per_epoch = (
    np.asarray(val_loss[len(val_loss) - val_epochs * val_batches:])
    .reshape(val_epochs, val_batches)
    .mean(axis=1)
)

plt.figure(figsize=(10,5))
plt.title("Training and Validation Loss")
plt.plot(val_loss_per_epoch, label="val")
plt.plot(train_loss_per_epoch, label="train")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# Wykres dokładności

In [None]:
# Convert the recorded per-batch training accuracies from tensors to floats.
train_accuracy = [step_acc.item() for step_acc in train_acc]

In [None]:
# Convert the recorded per-batch validation accuracies from tensors to floats.
val_accuracy = [step_acc.item() for step_acc in val_acc]

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# train_accuracy / val_accuracy hold one value per batch, while the x axis is
# in epochs: reduce both to the mean of the per-batch values of each epoch
# instead of sampling them with hard-coded strides.
train_batches = len(my_data_module.train_dataloader())
val_batches = len(my_data_module.val_dataloader())

# Training: keep whole epochs only (drops a trailing partial epoch).
train_epochs = len(train_accuracy) // train_batches
train_accuracy_per_epoch = (
    np.asarray(train_accuracy[:train_epochs * train_batches])
    .reshape(train_epochs, train_batches)
    .mean(axis=1)
)

# Validation: the history starts with the Trainer's sanity-check batches, so
# the surplus entries are dropped from the front.
val_epochs = len(val_accuracy) // val_batches
val_accuracy_per_epoch = (
    np.asarray(val_accuracy[len(val_accuracy) - val_epochs * val_batches:])
    .reshape(val_epochs, val_batches)
    .mean(axis=1)
)

plt.figure(figsize=(10,5))
plt.title("Training and Validation Accuracy")
plt.plot(val_accuracy_per_epoch, label="val")
plt.plot(train_accuracy_per_epoch, label="train")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# Zapisywanie modelu

In [None]:
# Persist only the network weights (state dict), not the Lightning wrapper.
path_to_save = '/content/drive/MyDrive/neural_network/growth_phases_model.pth'
# torch.save does not create missing folders, so make sure the target exists.
os.makedirs(os.path.dirname(path_to_save), exist_ok=True)
torch.save(model.state_dict(), path_to_save)