In [68]:
import os
import io
import cv2
import csv

import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision
from torchvision import transforms, models, datasets

import matplotlib.pyplot as plt
from PIL import Image
from torch import optim
import glob, numpy as np, pandas as pd

from glob import glob
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SubsetRandomSampler

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [69]:
model = models.vgg16(pretrained=True)
 
train_data_dir = r'train'
test_data_dir = r'test_split/1'

In [70]:
class Fire(Dataset):
    def __init__(self, folder):
        no_fire=[os.path.normpath(i) for i in glob(folder+'/no_fire/*.jpg')]
        fire=[os.path.normpath(i) for i in glob(folder+'/fire/*.jpg')]
        self.fpaths=fire[:2000]+no_fire[:2000]
        self.normalize=transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        from random import shuffle, seed; seed(10);
        shuffle(self.fpaths)
        self.targets=[]
        for fpath in self.fpaths:
            if fpath.startswith(r'train\fire') or fpath.startswith(r'test\fire'):
                self.targets.append(1)
            else:
                self.targets.append(0)

    def __len__(self):
        return len(self.fpaths)

    def __getitem__(self, ix):
        f = self.fpaths[ix]
        target = self.targets[ix]
        im = cv2.imread(f)[:,:,::-1]
        im = cv2.resize(im, (224,224))
        im = torch.tensor(im/255)
        im = im.permute(2,0,1)
        im = self.normalize(im)
        return im.float().to(device), torch.tensor([target]).float().to(device)


In [71]:
def get_model2():
    for param in model.parameters():
        param.requires_grad = False
    counter = 0
    for param in model.parameters():
        counter += 1
        if counter > 22:
            param.requires_grad = True
 
    model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))

    model.classifier = nn.Sequential(nn.Flatten(),
                                     nn.Linear(512, 128),
                                     nn.ReLU(),
                                     nn.Dropout(0.2),
                                     nn.Linear(128, 1),
                                     nn.Sigmoid()
                                     )

    loss = nn.BCELoss()
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    return model.to(device), loss, optim


model, criterion, optimizer = get_model2()

In [72]:
def train_batch(x, y, model, loss, optim):
    model.train(),
    prediction = model(x)
    loss_value = loss(prediction, y)
    loss_value.backward()
    optim.step()
    optim.zero_grad()
    return loss_value.item()


def accuracy(x, y, model):
    model.eval()
    prediction = model(x)
    is_correct = (prediction>0.5) == y
    return is_correct.cpu().numpy().tolist()


In [73]:
fire_dataset = Fire(train_data_dir)
validation_split = 0.2
seed = 10

indxs = list(range(len(fire_dataset)))
split = int(np.floor(validation_split * len(fire_dataset)))
np.random.seed(seed)
np.random.shuffle(indxs)
trn_ind, val_ind = indxs[split:], indxs[:split]

trn_s = SubsetRandomSampler(trn_ind)
val_s = SubsetRandomSampler(val_ind)

trn_dl = DataLoader(fire_dataset,batch_size=32,sampler=trn_s)
vld_dl = DataLoader(fire_dataset,batch_size=32,sampler=val_s)

train_accuracies, train_losses = [], []
val_accuracies = []

In [None]:
for epoch in range(20):
    print(f" epoch {epoch + 1}/20")
    train_epoch_losses, train_epoch_accuracies = [], []
    val_epoch_accuracies = []
    for ix, batch in enumerate(iter(trn_dl)):
        x, y = batch
        batch_loss = train_batch(x, y, model, criterion, optimizer)
        train_epoch_losses.append(batch_loss)
    for ix, batch in enumerate(iter(trn_dl)):
        x, y = batch
        is_correct = accuracy(x, y, model)
        train_epoch_accuracies.extend(is_correct)

    train_epoch_loss = np.array(train_epoch_losses).mean()
    train_epoch_accuracy = np.mean(train_epoch_accuracies)
    for ix, batch in enumerate(iter(vld_dl)):
        x, y = batch
        val_acc = accuracy(x, y, model)
        val_epoch_accuracies.extend(val_acc)
    val_epoch_accuracy = np.mean(val_epoch_accuracies)
    train_losses.append(train_epoch_loss)
    train_accuracies.append(train_epoch_accuracy)
    val_accuracies.append(val_epoch_accuracy)
    print("loss at {} epoch is ".format(epoch + 1) + str(train_epoch_loss))
    print("accuracy at {} epoch is ".format(epoch + 1) + str(train_epoch_accuracy * 100) + "%")
    print("validation accuracy at {} epoch is ".format(epoch + 1) + str(val_epoch_accuracy * 100) + "%")


In [74]:
PATH=r"C:\Users\Наиль\Desktop\vgg16_unfreeze3.pt"
#torch.save(model,PATH)


In [75]:
model = torch.load(PATH)
model.eval()

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [76]:
tests = [os.path.normpath(i) for i in glob('test/*.jpg')]


In [77]:
def transform_image(image_bytes):
    my_transforms = transforms.Compose([transforms.Resize(255),
                                        transforms.CenterCrop(224),
                                        transforms.ToTensor(),
                                        transforms.Normalize(
                                            [0.485, 0.456, 0.406],
                                            [0.229, 0.224, 0.225])])
    image = Image.open(io.BytesIO(image_bytes))
    return my_transforms(image).unsqueeze(0)


def get_prediction(image_bytes):
    tensor = transform_image(image_bytes=image_bytes)
    tensor = tensor.to(device)
    output = model.forward(tensor)

    probs = torch.nn.functional.softmax(output, dim=1)
    conf, classes = torch.max(probs, 1)
    return conf.item(), classes.item()


loader = transforms.Compose([ transforms.ToTensor()])


def image_loader(image_name):    
    image = cv2.imread(image_name)[:,:,::-1]
    image = cv2.resize(image, (224,224))
    image = torch.tensor(image/255)
    image = image.permute(2,0,1)
    normalize=transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    image = normalize(image)
    image = image.unsqueeze(0)  
    return image.float() 


results = []
with torch.no_grad():    
    for test in tests:
        image = image_loader(test)
        image=image.cuda()
        result = 1 if model(image).item() > 0.5 else 0
        #print(str(result) + " in " + test)
        results.append((test[5:],result))

#print(results)


In [78]:
with open('test_lables.csv','w',encoding='UTF8',newline='') as f:
    writer = csv.writer(f)

    for t in results:
        writer.writerow(t)