In [3]:
from pathlib import Path
import pandas as pd
import os
from torch.utils.data import Dataset,DataLoader
from PIL import Image
from torchvision import transforms as T
import torch.nn as nn
import torch
import torch.nn.functional as F
from sklearn.model_selection import GroupKFold, KFold, StratifiedKFold
from sklearn.model_selection import train_test_split
import numpy as np
from fastprogress.fastprogress import master_bar, progress_bar
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score
from torchvision import models
import pdb
import albumentations as A
from albumentations.pytorch.transforms import ToTensor
import matplotlib.pyplot as plt
import pickle

In [5]:
# !pip install efficientnet_pytorch
# !pip install densenet_pytorch
# !pip install resnet_pytorch

In [6]:
from efficientnet_pytorch import EfficientNet
from densenet_pytorch import DenseNet 
from resnet_pytorch import ResNet 

In [16]:
# Directory that holds the trained checkpoint (.pth) files. Keep the trailing
# slash: checkpoint file names are appended to this string directly.
models_path = '/content/drive/MyDrive/MedathonModels/'

In [8]:
def get_model(model_name='efficientnet-b0',lr=1e-5,wd=0.01,freeze_backbone=False,opt_fn=torch.optim.AdamW,device=None):
    """Build a single-logit classifier and an optimizer for it.

    Args:
        model_name: Backbone identifier. Its prefix selects the wrapper class:
            ``dense*`` -> MedathonDenseNet, ``eff*`` -> MedathonEfficientNet,
            ``res*`` -> MedathonResNet.
        lr: Learning rate passed to ``opt_fn``.
        wd: Weight decay passed to ``opt_fn``.
        freeze_backbone: When True, ``requires_grad`` is switched off for every
            backbone parameter (the classifier head is left trainable).
        opt_fn: Optimizer constructor, called as
            ``opt_fn(params, lr=lr, weight_decay=wd)``.
        device: Target device. Defaults to CUDA when available, otherwise CPU.

    Returns:
        Tuple ``(model, optimizer)``; the model has been moved to ``device``.

    Raises:
        ValueError: If ``model_name`` matches none of the supported prefixes.
    """
    # Resolve the wrapper class first so an unsupported name fails right here
    # with a clear message instead of an UnboundLocalError further down.
    if model_name.startswith('dense'):
        model_cls = MedathonDenseNet
    elif model_name.startswith('eff'):
        model_cls = MedathonEfficientNet
    elif model_name.startswith('res'):
        model_cls = MedathonResNet
    else:
        raise ValueError(
            f"Unsupported model_name {model_name!r}: expected a name starting with "
            "'dense', 'eff' or 'res'"
        )

    if device is None:
        device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    model = model_cls(model_name=model_name)
    if freeze_backbone:
        for parameter in model.backbone.parameters():
            parameter.requires_grad = False

    # Move the model before creating the optimizer so the optimizer is built
    # from the parameters as they live on the target device.
    model = model.to(device)
    opt = opt_fn(model.parameters(),lr=lr,weight_decay=wd)
    return model, opt

In [9]:
class MedathonEfficientNet(nn.Module):
    """EfficientNet feature extractor followed by a one-unit linear head."""

    def __init__(self,model_name='efficientnet-b0',pool_type=F.adaptive_avg_pool2d):
        super().__init__()
        self.pool_type = pool_type
        self.backbone = EfficientNet.from_pretrained(model_name)
        # The head takes as many inputs as the backbone's own `_fc` layer does.
        head_inputs = self.backbone._fc.in_features
        self.classifier = nn.Linear(head_inputs, 1)

    def forward(self,x):
        """Return the head's output for ``x``: one value per sample, shape (batch, 1)."""
        feature_map = self.backbone.extract_features(x)
        # Pool to output size 1, then flatten everything after the batch axis.
        pooled = self.pool_type(feature_map, 1)
        flat = pooled.view(x.size(0), -1)
        return self.classifier(flat)
    
class MedathonDenseNet(nn.Module):
    """DenseNet feature extractor followed by a one-unit linear head."""

    def __init__(self,model_name='densenet121',pool_type=F.adaptive_avg_pool2d):
        super().__init__()
        self.pool_type = pool_type
        self.backbone = DenseNet.from_pretrained(model_name)
        # Size the head from the backbone's own `classifier` layer.
        self.classifier = nn.Linear(self.backbone.classifier.in_features, 1)

    def forward(self,x):
        """Return the head's output for ``x``: one value per sample, shape (batch, 1)."""
        pooled = self.pool_type(self.backbone.extract_features(x), 1)
        batch_size = x.size(0)
        return self.classifier(pooled.view(batch_size, -1))
    
class MedathonResNet(nn.Module):
    """ResNet feature extractor followed by a one-unit linear head."""

    def __init__(self,model_name='resnet18',pool_type=F.adaptive_avg_pool2d):
        super().__init__()
        self.pool_type = pool_type
        self.backbone = ResNet.from_pretrained(model_name)
        # Size the head from the backbone's own `fc` layer.
        backbone_fc = self.backbone.fc
        self.classifier = nn.Linear(backbone_fc.in_features, 1)

    def forward(self,x):
        """Return the head's output for ``x``: one value per sample, shape (batch, 1)."""
        extracted = self.backbone.extract_features(x)
        squeezed = self.pool_type(extracted, 1).view(x.size(0), -1)
        return self.classifier(squeezed)

In [10]:
def get_augs_test(p,height,width):
    """Build the albumentations pipeline used on images at inference time.

    The pipeline resizes to ``(height, width)`` and then applies, each with
    probability ``p``: a 90-degree rotation, a flip, one colour jitter
    (brightness/contrast or hue/saturation/value) and one noise transform.
    No tensor-conversion or normalisation step is part of the pipeline.
    """
    colour_jitter = A.OneOf(
        [
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2),
            A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=50, val_shift_limit=50),
        ],
        p=p,
    )
    noise = A.OneOf([A.IAAAdditiveGaussianNoise(), A.GaussNoise()], p=p)

    steps = [
        A.Resize(height, width),
        A.RandomRotate90(p=p),
        A.Flip(p=p),
        colour_jitter,
        noise,
    ]
    return A.Compose(steps)

In [11]:
class MedathonDataset(Dataset):
    """One-item dataset wrapping a single image file for inference.

    Args:
        im_path: Path of the image file to load.
        transforms: Optional albumentations-style callable, invoked as
            ``transforms(image=ndarray)["image"]``.
        is_test: When True, ``__getitem__`` returns the (transformed) image.
            When False it returns ``None``, because no label source is wired in.
    """

    def __init__(self,im_path,transforms=None,is_test=False):
        self.im_path = im_path
        self.transforms = transforms
        self.is_test = is_test

    def __getitem__(self,idx):
        """Load the wrapped image; ``idx`` must address the dataset's only item.

        Raises:
            IndexError: If ``idx`` is outside ``[-1, 0]``.
        """
        # Plain iteration (`for x in ds`) relies on IndexError to stop; without
        # this check it would reload the same image forever.
        if not -len(self) <= idx < len(self):
            raise IndexError(f"index {idx} is out of range for a dataset of length {len(self)}")

        # The context manager closes the underlying file as soon as the pixel
        # data has been copied into the converted RGB image.
        with Image.open(str(self.im_path)) as handle:
            img = handle.convert("RGB")

        if self.transforms:
            img = self.transforms(image=np.array(img))["image"]
        if self.is_test:
            return img
        # NOTE(review): training mode has no label source here, so nothing is returned.
        return None

    def __len__(self):
        """The dataset always holds exactly one image."""
        return 1

In [None]:
# test_df = pd.DataFrame({"filename":os.listdir('muradatavalid')})
# test_df["label"] = test_df["filename"].apply(lambda x: int(x[0]))
# test_df.to_csv('valid.csv', index=False)

In [12]:
def get_preds(model,test_ds, test_dl, device=None,tta=1):
    """Average sigmoid outputs of ``model`` over ``tta`` passes of ``test_dl``.

    Args:
        model: Network that returns one logit per sample; put into eval mode here.
        test_ds: Dataset behind ``test_dl``; only its length is used, to size
            the result.
        test_dl: Iterable of image batches. Each batch is permuted with
            ``(0, 3, 1, 2)`` before the forward pass, i.e. it is expected to
            arrive channel-last (N, H, W, C).
        device: Device the batches are moved to. Defaults to CUDA when
            available, otherwise CPU. The model is not moved by this function.
        tta: Number of passes over ``test_dl`` to average (must be >= 1). Extra
            passes only differ if the dataset applies random transforms.

    Returns:
        1-D numpy array of length ``len(test_ds)`` with the mean probability
        per sample.

    Raises:
        ValueError: If ``tta`` is smaller than 1.
    """
    if tta < 1:
        # tta == 0 used to divide by zero and hand back NaNs silently.
        raise ValueError(f"tta must be >= 1, got {tta}")
    if device is None:
        device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    model.eval()  # loop-invariant, so set once instead of on every pass
    preds = np.zeros(len(test_ds))
    with torch.no_grad():
        for _ in range(tta):
            pass_preds = []
            for xb in test_dl:
                logits = model(xb.to(device).permute(0,3,1,2).float())
                pass_preds.extend(torch.sigmoid(logits).cpu().numpy().reshape(-1))
            preds += np.array(pass_preds).reshape(-1)
    return preds / tta

In [13]:
def predict_image_class(file_path: str, models, weights, sizes) -> str:
    """Classify one image with a weighted ensemble.

    This is the function contestants are required to fill in: it performs all
    preprocessing and inference steps for a single image.

    Args:
        file_path: Path of the image to classify.
        models: Sequence of networks, each returning one logit per sample.
        weights: Sequence of per-model weights applied to the probabilities.
            The 0.5 decision threshold is applied to the weighted sum as is.
        sizes: Sequence of square input sizes (pixels), one per model.

    Returns:
        ``'normal'`` when the weighted probability is below 0.5, otherwise
        ``'anormal'``.

    Raises:
        ValueError: If ``models`` is empty or the three sequences differ in length.
    """
    if not (len(models) == len(weights) == len(sizes)):
        raise ValueError(
            "models, weights and sizes must have the same length, got "
            f"{len(models)}, {len(weights)} and {len(sizes)}"
        )
    if len(models) == 0:
        # An empty ensemble would otherwise score 0 and report 'normal'.
        raise ValueError("at least one model is required")

    weighted_preds = []
    for model, weight, size in zip(models, weights, sizes):
        test_ds = MedathonDataset(
            im_path=file_path,
            transforms=get_augs_test(p=0.5, height=size, width=size),
            is_test=True,
        )
        # num_workers=0: the dataset holds one image, so starting a worker
        # process on each of the 8 TTA passes would be pure overhead.
        test_dl = DataLoader(dataset=test_ds, batch_size=1, shuffle=False, num_workers=0)
        weighted_preds.append(get_preds(model, test_ds, test_dl, tta=8) * weight)

    pred = np.sum(weighted_preds)
    return 'normal' if pred < 0.5 else 'anormal'

In [None]:
import glob
import os
from pathlib import Path
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
def test(folder_path):
    dense121model, dense121opt = get_model(model_name='densenet121',lr=1e-4,wd=1e-4)
    dense121model.load_state_dict(torch.load(f'{models_path}densenet121_BCEWithLogits_10_0.9453735494127344_224-224_102_-1.pth',map_location=device))
    effnet3model, effnet3opt = get_model(model_name='efficientnet-b3',lr=1e-4,wd=1e-4)
    effnet3model.load_state_dict(torch.load(f'{models_path}efficientnet-b3_BCEWithLogits_10_0.9305413142769728_256-256_28_-1.pth',map_location=device))
    resnet18model, resnet18eopt = get_model(model_name='resnet18',lr=1e-4,wd=1e-4)
    resnet18model.load_state_dict(torch.load(f'{models_path}resnet18_BCEWithLogits_10_0.9164495022403869_512-512_32_-1.pth',map_location=device))
    labels = []
    predictions = []
    # for image_path in glob.glob("{}/*".format(test_dir)):
    for image_path in glob.glob("{}/*/*g".format(test_dir)):
        # Image label from directory name.
        # For example image_path: dandelion/a.jpg label: dandelion
        label = Path(image_path).parent.stem
        # label = "normal" if int(image_path.split('/')[-1][0])==0 else "anormal"
        
        # Predict Image Class 
        prediction = predict_image_class(str(image_path),
                                             models=[resnet18model, dense121model, effnet3model],
                                             weights=[0.45,0.40,0.15],
                                             sizes=[512,224,256])
#         print(image_path.split('/')[-1], prediction)
        # Save Predicted and Real label.
        predictions.append(prediction)
        labels.append(label)
        # print(f"label: {label}, pred: {prediction})")
    return labels, predictions

# Root of the evaluation images. test() globs "<test_dir>/*/*g", so images are
# expected one directory level down and each parent directory name is the label.
test_dir='/content/drive/MyDrive/muradatavalid' #TODO: point this at the folder to evaluate

print(test_dir)
# y_true holds the parent-directory names, y_pred the strings returned by
# predict_image_class for the same images.
y_true, y_pred = test(test_dir)

In [None]:
from sklearn.metrics import classification_report
# Summarise the ensemble's predicted labels against the directory-derived labels.
print(classification_report(y_true, y_pred))