In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
import os
import numpy as np
import torch
import pytorch_lightning as pl
from models.modeling import VisionTransformer, CONFIGS


import torch
import torch.nn as nn

from torchmetrics import R2Score




In [2]:
class RMSELoss(nn.Module):
    """Root-mean-square-error criterion.

    Evaluates ``sqrt(MSE(yhat, y) + eps)``. The small constant ``eps`` is
    added to the mean squared error before the square root is taken
    (presumably to keep the root well-behaved at zero error).

    Args:
        eps: constant added under the square root (default ``1e-10``).
    """

    def __init__(self, eps=1e-10):
        super().__init__()
        self.eps = eps
        self.mse = nn.MSELoss()

    def forward(self,yhat,y):
        """Return the scalar RMSE between predictions ``yhat`` and targets ``y``."""
        mean_squared_error = self.mse(yhat, y)
        return (mean_squared_error + self.eps).sqrt()

In [3]:
from PIL import Image
class data(Dataset):
    """Dataset of three projection images plus a band-gap target per sample.

    Sample ``idx`` is read from the files ``{Img_path}/{idx}_zx``,
    ``{idx}_yz`` and ``{idx}_yx`` (extension ``imageformat``); its target is
    row ``idx`` of the values loaded from ``band_path``.

    Args:
        Img_path: directory containing the view images.
        band_path: text file with the band-gap values, read with ``np.loadtxt``.
        imageformat: file extension of the images (default ``'jpg'``).
        imageSize: every image is resized to ``imageSize x imageSize``.
        num_samples: number of samples the dataset reports via ``len()``.
            Defaults to 12499, the value that used to be hard-coded. It is
            not checked against the number of rows in ``band_path`` or the
            number of image files on disk.
    """

    # File-name suffixes of the three views, in the order they are stacked.
    VIEWS = ('zx', 'yz', 'yx')

    def __init__(self, Img_path , band_path,imageformat='jpg',imageSize=256, num_samples=12499):
        super().__init__()
        self.path = Img_path
        self.imageSize = imageSize
        self.imageformat = imageformat
        self.bandgap = np.loadtxt(band_path, dtype=np.float32)
        self.num_samples = num_samples

    def __len__(self):
        return self.num_samples

    def _load_view(self, idx, view):
        """Load one view of sample ``idx`` as a float32 tensor of shape (3, imageSize, imageSize)."""
        target_size = (self.imageSize, self.imageSize)
        # The context manager guarantees the file handle is released even if
        # decoding fails part-way through.
        with Image.open(f'{self.path}/{idx}_{view}.{self.imageformat}') as img:
            # Force three channels so that grayscale / CMYK / alpha images
            # still give an (H, W, 3) array; plain RGB files are unaffected.
            pixels = np.array(img.convert('RGB').resize(target_size))
        return torch.from_numpy(pixels).permute(2,0,1).type(torch.float32)

    def __getitem__(self, idx):
        """Return ``(views, bandgap)`` where ``views`` has shape (3, 3, imageSize, imageSize)."""
        views = [self._load_view(idx, view) for view in self.VIEWS]
        return torch.stack(views, dim=0), self.bandgap[idx]


In [4]:
class BANGap(nn.Module):
    """Band-gap regressor built from one shared ViT encoder and an MLP head.

    The same ``VisionTransformer`` instance encodes each of the three views of
    a sample; the three outputs are concatenated along the feature axis and
    passed through a small fully connected head that emits one value.

    Args:
        img_size: image size forwarded to ``VisionTransformer``.
        num_classes: ``num_classes`` forwarded to the ViT; the head expects
            ``num_classes * 3`` input features.
    """

    def __init__(self, img_size,num_classes=256):
        super().__init__()
        self.img_size = img_size

        # One encoder, reused for all three views (weights are shared).
        self.Vit = VisionTransformer(
            CONFIGS['ViT-B_16'],
            self.img_size,
            zero_head=True,
            num_classes=num_classes,
        )

        # Regression head: (3 * num_classes) -> 512 -> 256 -> 1.
        head_layers = [
            nn.Linear(num_classes*3, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(),
            nn.Linear(256, 1),
        ]
        self.seq = nn.Sequential(*head_layers)

    def forward(self, x):
        """Predict one value per sample.

        ``x`` is indexed as a 5-D tensor whose second axis holds the three
        views (assumed layout: (batch, 3, channels, H, W) -- TODO confirm
        against the dataset).
        """
        # Slice out all three views first, then encode them in order.
        views = [x[:, view_idx, :, :, :] for view_idx in range(3)]

        encoded = []
        for view in views:
            # The ViT returns a pair; only its first element is used here.
            features, _ = self.Vit(view)
            encoded.append(features)

        return self.seq(torch.cat(encoded, dim=1))





In [5]:
class BandGap(pl.LightningModule):
    """Lightning wrapper that optimises a regression model with an RMSE loss.

    Every train / validation / test step computes RMSE, R2 and MAE between the
    wrapped model's prediction and the target and logs them under
    ``{stage}_RMSELoss``, ``{stage}_r2`` and ``{stage}_MAE``.

    Args:
        lr: learning rate handed to Adam.
        model: the ``nn.Module`` whose output is regressed against the target.
    """

    def __init__(self, lr=1e-3,model=None):
        super().__init__()
        print("Model Initialized")
        self.lr = lr
        # Must be called directly inside __init__: it records the init arguments.
        self.save_hyperparameters()
        self.RMSELoss = RMSELoss()
        self.r2score = R2Score()
        self.MAE = nn.L1Loss()
        self.model = model

    def forward(self, x):
        """Delegate to the wrapped model."""
        return self.model(x)

    def configure_optimizers(self):
        """Adam plus a ReduceLROnPlateau scheduler driven by ``val_RMSELoss``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        sch = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True, threshold=0.0001, threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-010)
        return {
            "optimizer":optimizer,
            "lr_scheduler" : {
                "scheduler" : sch,
                "monitor" : "val_RMSELoss",
            }
        }

    def _compute_and_log_metrics(self, batch, stage):
        """Shared body of the train/val/test steps.

        Runs the forward pass, computes RMSE / R2 / MAE, logs each under the
        ``stage`` prefix and returns the dictionary the step hands back to
        Lightning.
        """
        x, y = batch
        # Add a trailing axis to the targets so they line up with the
        # prediction (assumed to be (batch, 1) -- TODO confirm).
        y = y.unsqueeze(1)
        y_hat = self(x)
        loss = self.RMSELoss(y_hat, y)
        r2 = self.r2score(y_hat, y)
        mae = self.MAE(y_hat, y)
        self.log(f"{stage}_RMSELoss", loss, prog_bar=True, on_step=True)
        self.log(f"{stage}_r2", r2, prog_bar=True, on_step=True)
        self.log(f"{stage}_MAE",mae , prog_bar=True, on_step=True)
        # NOTE(review): the nested "log" entry looks like an older Lightning
        # convention; it is kept unchanged so the return value stays identical.
        return {"loss": loss, "log": {f"{stage}_RMSELoss": loss, f"{stage}_r2": r2}}

    def training_step(self, batch, batch_idx):
        return self._compute_and_log_metrics(batch, "train")

    def validation_step(self, batch, batch_idx):
        return self._compute_and_log_metrics(batch, "val")

    def test_step(self, batch, batch_idx):
        return self._compute_and_log_metrics(batch, "test")
    


In [6]:
class DataLoaderBA(pl.LightningDataModule):
    """DataModule that cuts one dataset into contiguous train/val/test parts.

    By index order: the first 80% of samples are used for training, the next
    10% for validation and the remainder for testing.

    Args:
        data: the dataset to split (must support ``len()`` and indexing).
        batch_size: batch size used by all three loaders.
        num_workers: worker processes used by all three loaders.
    """

    def __init__(self,data, batch_size=32, num_workers=4):
        super().__init__()
        self.data = data
        self.batch_size = batch_size
        self.num_workers = num_workers
        # Pinned host memory is only requested when CUDA is available.
        self.pin_memory = bool(torch.cuda.is_available())

    def setup(self, stage=None):
        """Create the three index-based subsets; ``stage`` is ignored."""
        total = len(self.data)
        train_end = int(total*0.8)
        val_end = int(total*0.9)
        self.train_data = Subset(self.data, np.arange(train_end))
        self.val_data = Subset(self.data, np.arange(train_end, val_end))
        self.test_data = Subset(self.data, np.arange(val_end, total))

    def _make_loader(self, subset, shuffle):
        """Build a DataLoader with this module's shared settings."""
        return DataLoader(
            subset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
            pin_memory=self.pin_memory,
        )

    def train_dataloader(self):
        # Only the training split is shuffled.
        return self._make_loader(self.train_data, True)

    def val_dataloader(self):
        return self._make_loader(self.val_data, False)

    def test_dataloader(self):
        return self._make_loader(self.test_data, False)
    

        

In [7]:
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping

# Keep only the single best checkpoint, ranked by the monitored validation RMSE;
# the file name embeds the epoch and the three validation metrics.
checkpoint_callback = ModelCheckpoint(
    monitor='val_RMSELoss',
    save_top_k=1,
    every_n_epochs=1,
    dirpath='./checkpoints',
    filename='{epoch}-{val_RMSELoss:.4f}-{val_r2:.4f}-{val_MAE:.4f}',
)

# Stop once the validation RMSE has failed to improve by at least 0.001
# for 10 consecutive checks.
early_stop_callback = EarlyStopping(
    monitor='val_RMSELoss',
    mode='min',
    patience=10,
    min_delta=0.001,
    verbose=False,
)


In [8]:
# Dataset of view images and band-gap targets, and the network to evaluate.
d = data("./Images/images_06092020", "./Images/bandgaps.csv")
m = BANGap(256)
model = BandGap(lr=1e-4,model=m)

# Restore the trained weights. `load_from_checkpoint` is a classmethod, so it
# is called on the class (recent Lightning releases refuse the call on an
# instance). The path is built with os.path.join: the previous literal
# contained the invalid escape sequence "\e" and a Windows-only separator.
# NOTE(review): the instance built above is replaced by the restored one;
# this relies on the checkpoint's saved hyper-parameters including `model`.
model = BandGap.load_from_checkpoint(
    os.path.join("checkpoints", "epoch=29-val_loss=0.95-val_r2=0.11-val_MAE=0.75.ckpt")
)

dataset = DataLoaderBA(d, batch_size=32, num_workers=0)
dataset.setup(None)
trainer = pl.Trainer(accelerator='gpu', num_sanity_val_steps=0,devices=1, max_epochs=100,accumulate_grad_batches=1,log_every_n_steps=1,callbacks=[checkpoint_callback,early_stop_callback])


Model Initialized


  rank_zero_warn(


Model Initialized


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [9]:
# trainer.fit(model, dataset)

In [15]:
import time

# Measure the latency of one single-sample forward pass.
# Switch to eval mode before the clock starts so the mode change is not timed.
model.eval()

# Take one batch and keep only its first sample (re-adding the batch axis).
x,y = next(iter(dataset.train_dataloader()))
x = x[0].unsqueeze(0).to(model.device)

# no_grad: inference only, so no autograd graph is built inside the timed region.
with torch.no_grad():
    if x.is_cuda:
        # CUDA kernels launch asynchronously; drain pending work first.
        torch.cuda.synchronize()
    start = time.perf_counter()
    model(x)
    if x.is_cuda:
        torch.cuda.synchronize()
    end = time.perf_counter()
print(end-start)


1.1501328945159912


In [10]:
# Run the Lightning test loop on the test split built by DataLoaderBA.setup
# (the samples from the 90% mark onwards) and keep whatever trainer.test returns.
test = trainer.test(model, dataset.test_dataloader())


You are using a CUDA device ('NVIDIA GeForce RTX 4090') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
  rank_zero_warn(


Testing: 0it [00:00, ?it/s]