In [None]:
import os
import glob
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
import pytorch_lightning as pl
from torch.utils.data import Dataset,DataLoader
from torchvision import transforms
import torchvision.models as vismodels


#%load_ext tensorboard
#%tensorboard --logdir ./logs

In [None]:
# Preprocessing pipelines keyed by split name.
# Both splits finish with ToTensor followed by the same per-channel
# mean/std normalisation; only the 'train' pipeline adds a random
# horizontal flip.
data_transforms = dict(
    train=transforms.Compose(
        [
            transforms.Resize((256, 256)),  # training images are resized to 256x256
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
    val=transforms.Compose(
        [
            transforms.Resize((224, 224)),  # evaluation images are resized to 224x224
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
)

In [None]:
class PneumoniaDataset(Dataset):
    """Image dataset read from a ``root/<class_name>/*.jpeg`` folder layout.

    Every sub-directory of ``root`` is scanned for ``.jpeg`` files. A sample is
    labelled 1 when the directory that directly contains it is named
    ``PNEUMONIA`` and 0 otherwise.

    Args:
        root: Directory holding one sub-directory per class.
        transforms: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned.

    Attributes:
        imagepth: Image file paths, sorted so indices are reproducible.
        transforms: The callable passed in (or ``None``).
    """

    # Name of the class folder whose images get label 1.
    POSITIVE_CLASS = "PNEUMONIA"

    def __init__(self,root,transforms=None):
        imagepth = []
        # os.listdir / glob return entries in arbitrary, OS-dependent order;
        # sorting keeps ``dataset[i]`` stable between runs and machines.
        for typ in sorted(os.listdir(root)):
            class_dir = os.path.join(root, typ)
            if not os.path.isdir(class_dir):
                continue  # stray files next to the class folders are ignored
            # glob.escape protects against glob metacharacters in the path.
            pattern = os.path.join(glob.escape(class_dir), "*.jpeg")
            imagepth += sorted(glob.glob(pattern))
        self.imagepth = imagepth
        self.transforms = transforms

    @staticmethod
    def _label_from_path(img_path):
        """Return 1 if the image's parent directory is the positive class, else 0."""
        # Compare the parent directory name instead of searching for the
        # substring "/PNEUMONIA/": the substring test breaks with Windows
        # separators and mislabels everything when an ancestor of ``root``
        # happens to carry that name.
        parent = os.path.basename(os.path.dirname(img_path))
        return 1 if parent == PneumoniaDataset.POSITIVE_CLASS else 0

    def __len__(self):
        """Number of image files found under ``root``."""
        return len(self.imagepth)

    def __getitem__(self,idx):
        """Return ``(image, target)`` for the sample at ``idx``.

        ``image`` is the RGB image after ``self.transforms`` (when set);
        ``target`` is a float32 tensor of shape ``(1,)`` holding 0.0 or 1.0.
        """
        img_path = self.imagepth[idx]
        img = Image.open(img_path).convert("RGB")
        target = torch.tensor([self._label_from_path(img_path)], dtype=torch.float32)
        if self.transforms:
            img = self.transforms(img)
        return img,target
        

In [None]:
# Build the two datasets: the train folder uses the "train" pipeline,
# the test folder uses the "val" pipeline.
train_ds = PneumoniaDataset(
    "../input/chest-xray-pneumonia/chest_xray/train",
    transforms=data_transforms["train"],
)
test_ds = PneumoniaDataset(
    "../input/chest-xray-pneumonia/chest_xray/test",
    transforms=data_transforms["val"],
)
# Report how many image files were found for each dataset.
print(f'train_ds size is {len(train_ds)} test_ds size is {len(test_ds)}')

In [None]:
# Sanity check: fetch sample 1002 and display its target (element 1 of the
# returned (image, target) pair) as a plain int.
int(train_ds[1002][1])

In [None]:
def initialize_model(model_path=None,pretrained=False,loc=None):
    """Build a DenseNet-121 whose classifier produces a single output.

    Args:
        model_path: Optional path to a saved ``state_dict`` for the modified
            network. When ``None`` no weights are loaded from disk.
        pretrained: Forwarded to ``vismodels.densenet121``.
        loc: Forwarded to ``torch.load`` as ``map_location``.

    Returns:
        The DenseNet-121 module with ``classifier`` replaced by
        ``nn.Linear(in_features, 1)``.
    """
    model = vismodels.densenet121(pretrained=pretrained)
    # Swap the stock classification head for a single-output linear layer,
    # keeping the input width of the original head.
    model.classifier = nn.Linear(model.classifier.in_features, 1)
    if model_path is not None:
        # NOTE(review): torch.load unpickles the file, which can execute
        # arbitrary code. Only load checkpoints from a source you trust.
        state_dict = torch.load(model_path, map_location=loc)
        model.load_state_dict(state_dict)

    return model

In [None]:
#from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.metrics.functional import accuracy,precision_recall
#from pytorch_lightning.metrics import MetricCollection, Accuracy, Precision, Recall

#metric_collection = MetricCollection([
#    Accuracy(),
#    Precision(num_classes=2, average='macro'),
#    Recall(num_classes=2, average='macro')
#])
#logger = TensorBoardLogger("logs", name="my_model")


In [None]:

class LitModel(pl.LightningModule):
    """Lightning wrapper that trains a network as a binary classifier.

    The wrapped network's output is treated as a raw logit: the loss is
    binary cross-entropy computed from logits, and accuracy / precision /
    recall are computed on the sigmoid of those logits.

    Args:
        pmodel: The network to train. Its output must have the same shape as
            the float targets supplied by the dataloader.
        lr: Learning rate passed to Adam.
    """

    def __init__(self,pmodel,lr=1e-3):
        super().__init__()
        self.pmodel = pmodel
        self.lr = lr

    def forward(self, x):
        """Return the wrapped network's raw output for ``x`` (no sigmoid applied)."""
        return self.pmodel(x)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters using ``self.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def _shared_step(self, batch):
        """Compute ``(loss, accuracy, precision, recall)`` for one ``(x, y)`` batch."""
        x, y = batch
        logits = self.pmodel(x)
        # The logits variant fuses sigmoid and BCE into one numerically stable
        # op, avoiding log(0) saturation when the network is very confident.
        loss = F.binary_cross_entropy_with_logits(logits, y)

        # The metric functions receive probabilities and integer targets.
        probs = torch.sigmoid(logits)
        tar = y.int()
        acc = accuracy(probs, tar)
        pre, rec = precision_recall(probs, tar)
        return loss, acc, pre, rec

    def _log_epoch_metrics(self, metrics):
        """Log every ``name -> value`` pair once per epoch, on the progress bar and logger."""
        for name, value in metrics.items():
            self.log(name, value, on_step=False, on_epoch=True, prog_bar=True, logger=True)

    def training_step(self, train_batch, batch_idx):
        """Run one training batch, log epoch-level metrics and return the loss."""
        loss, acc, pre, rec = self._shared_step(train_batch)
        # Key names are kept exactly as they were; callbacks or dashboards may
        # reference them.
        self._log_epoch_metrics({
            "train_acc": acc,
            "train_recall": rec,
            "train_prec": pre,
            "train_loss": loss,
        })
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Run one validation batch and log epoch-level metrics (returns nothing)."""
        loss, acc, pre, rec = self._shared_step(val_batch)
        self._log_epoch_metrics({
            "val_acc": acc,
            "val_prec": pre,
            "val_rec": rec,
            "val_loss": loss,
        })

  

        



In [None]:
!git clone https://github.com/AwaisIsane/Pneumonia-Detection.git
densenet = initialize_model(model_path = "./Pneumonia-Detection/Pneumonia2.pt")
batch_size = 32
#densenet = initialize_model(pretrained=True)
train_dl = DataLoader(train_ds,batch_size=batch_size,num_workers=4)
test_dl = DataLoader(test_ds,batch_size=batch_size,num_workers=4)

In [None]:
#Pmodel = LitModel(densenet)
#Pmodel = LitModel(densenet,lr = 1e-3)
# Wrap the loaded DenseNet directly. Reading `Pmodel.pmodel` here only worked
# when `Pmodel` was left over from an earlier kernel session; on a fresh
# kernel (Restart & Run All) it raised NameError.
Pmodel = LitModel(densenet,lr=1e-4)
#trainer = pl.Trainer(max_epochs=10,gpus=1)#25+10(e-3)+25(e-4)

In [None]:
# Write the wrapped network's state_dict (weights only, not the LitModel) to disk.
torch.save(obj=Pmodel.pmodel.state_dict(), f="Pneumoniaden.pt")

In [None]:
# Lightning trainer configured for at most 25 epochs and one GPU.
trainer = pl.Trainer(max_epochs=25,gpus=1)#25+

In [None]:
# Train Pmodel on train_dl. test_dl is passed in the validation-loader
# position, so the val_* metrics logged by LitModel are computed on the test
# split.
trainer.fit(Pmodel,train_dl,test_dl)

In [None]:
# Grab the network held by the Lightning wrapper and write its state_dict
# (weights only) to Pneumonia2.pt.
cp = Pmodel.pmodel
torch.save(obj=cp.state_dict(), f="Pneumonia2.pt")

In [None]:
#efficient net
!pip install efficientnet_pytorch

from efficientnet_pytorch import EfficientNet
effnet = EfficientNet.from_pretrained('efficientnet-b3')
in_features = effnet._fc.in_features
effnet._fc = nn.Linear(in_features,1)
Emodel = LitModel(effnet,lr = 1e-3)

In [None]:
# Rebind Emodel to a new LitModel around the network the current Emodel
# already holds, this time with learning rate 1e-4.
Emodel = LitModel(Emodel.pmodel,lr = 1e-4)

In [None]:
# New trainer (at most 25 epochs, one GPU) for the EfficientNet model.
trainer = pl.Trainer(max_epochs=25,gpus=1)
# test_dl is passed in the validation-loader position, so the val_* metrics
# are computed on the test split.
trainer.fit(Emodel,train_dl,test_dl)