In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import sklearn
import pytorch_lightning as pl
import warnings
from pytorch_lightning.loggers import MLFlowLogger
from pytorch_lightning.loggers.base import LightningLoggerBase
from pytorch_lightning.callbacks import Callback
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from torch import nn
from torch.utils.data import DataLoader
from typing import Optional, List, Dict, Any, Callable

# Notebook-wide configuration: silence all warnings, seed the global RNGs through Lightning,
# and render floats in pandas output with three decimals.
warnings.filterwarnings(action="ignore")
pl.seed_everything(42)
pd.set_option('display.float_format', lambda value: f'{value:.3f}')

In [None]:
def normalize(data):
    """Fit a fresh ``MinMaxScaler`` (default settings) on ``data`` and return the scaled copy.

    A new scaler is created on every call, so nothing is remembered between calls.
    """
    return MinMaxScaler().fit_transform(data)

def standarize(data):
    """Fit a fresh ``StandardScaler`` (default settings) on ``data`` and return the scaled copy.

    A new scaler is created on every call, so nothing is remembered between calls.
    """
    return StandardScaler().fit_transform(data)

# Load Data

In [None]:
class DataModule(pl.LightningDataModule):
    """Lightning data module backed by a single CSV file.

    The last column of the CSV is used as the class label and every other column as a feature.
    ``prepare_data`` loads the file, applies ``transforms`` and makes a stratified train/test
    split; ``setup`` carves a stratified validation split out of the training part and builds
    the ``(x, y)`` sample lists that the dataloaders iterate over.

    NOTE(review): the transforms are applied to the full feature matrix *before* the
    train/test split, so any statistics they fit also see the test rows. Confirm this is intended.

    Args:
        data_dir: Path of the CSV file (passed straight to ``pd.read_csv``).
        test_size: ``test_size`` forwarded to the train/test ``train_test_split``.
        val_size: ``test_size`` forwarded to the train/validation ``train_test_split``.
        train_batch_size: Batch size of the training dataloader.
        val_batch_size: Batch size of the validation dataloader.
        transforms: Callables applied in order to the feature matrix; ``None`` means no transform.
        no_batch: When true, ``setup`` sets both batch sizes to the full length of their split.
    """

    def __init__(self, data_dir: str, test_size: float = 0.3, val_size: float = 0.1, train_batch_size: int = 64,
                 val_batch_size: int = 64, transforms: Optional[List[Callable]] = None, no_batch: bool = False):
        super().__init__()
        self.data_dir = data_dir
        self.test_size = test_size
        self.val_size = val_size
        self.train_batch_size = train_batch_size
        self.val_batch_size = val_batch_size
        self.no_batch = no_batch
        # Own copy: avoids a shared mutable default and aliasing the caller's list.
        self.transforms = list(transforms) if transforms is not None else []

    def prepare_data(self):
        """Read the CSV, transform the features and create the train/test tensors."""
        self.full_data = pd.read_csv(self.data_dir)
        values = self.full_data.values
        X = values[:, :-1]
        y = values[:, -1]
        for transform in self.transforms:
            X = transform(X)
        # Kept for inspection/plotting of the transformed feature matrix.
        self.transformed_data = X

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=self.test_size, stratify=y)
        # The raw splits live in private attributes so that `setup` can be called repeatedly:
        # `train_data` / `test_data` are later replaced by per-sample lists.
        self._train_split = (torch.tensor(X_train).float(), torch.tensor(y_train).long())
        self._test_split = (torch.tensor(X_test).float(), torch.tensor(y_test).long())

        self.train_data = self._train_split
        self.test_data = self._test_split

    def setup(self, stage: Optional[str] = None):
        """Build the per-sample lists for the requested stage (``'fit'``, ``'test'`` or both when ``None``)."""
        if not hasattr(self, '_train_split'):
            # `setup` was reached without `prepare_data` having run on this instance.
            self.prepare_data()

        if stage == 'fit' or stage is None:
            X_train, y_train = self._train_split
            X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=self.val_size, stratify=y_train)
            self.train_data = list(zip(X_train, y_train))
            self.val_data = list(zip(X_val, y_val))

            if self.no_batch:
                # One batch per epoch: the batch is the whole split.
                self.train_batch_size = len(self.train_data)
                self.val_batch_size = len(self.val_data)

        if stage == 'test' or stage is None:
            X_test, y_test = self._test_split
            self.test_data = list(zip(X_test, y_test))

    def train_dataloader(self):
        """Shuffled loader over the training samples."""
        return DataLoader(self.train_data, self.train_batch_size, shuffle=True, num_workers=1, pin_memory=True)

    def val_dataloader(self):
        """Ordered loader over the validation samples."""
        return DataLoader(self.val_data, self.val_batch_size, shuffle=False, num_workers=1, pin_memory=True)

    def test_dataloader(self):
        """Ordered loader over the test samples (DataLoader's default batch size)."""
        return DataLoader(self.test_data, shuffle=False, num_workers=1, pin_memory=True)

In [None]:
# Build the data module used for exploration and load the CSV immediately,
# so that the cells below can inspect `full_data` / `transformed_data`.
datamodule = DataModule(data_dir='', transforms=[normalize], no_batch=True)
datamodule.prepare_data()

In [None]:
# First five rows of the raw CSV as read by pandas (no transform applied).
datamodule.full_data.head(5)

In [None]:
# Column dtypes and non-null counts of the raw frame.
datamodule.full_data.info()

In [None]:
# Summary statistics of the raw (untransformed) columns.
datamodule.full_data.describe()

# Visualisation

In [None]:
def plot_hist(data, x, y, figsize=(13, 13), bins=15):
    """Draw one histogram per transformed feature on an ``x`` by ``y`` grid of axes.

    Args:
        data: Object exposing ``transformed_data`` (2-D array, one column per feature) and
            ``full_data`` (frame whose ``columns`` supply the subplot titles).
        x: Number of subplot rows.
        y: Number of subplot columns.
        figsize: Figure size forwarded to ``plt.subplots``.
        bins: Number of histogram bins.

    Features are placed row by row; the grid must offer at least as many axes as there are
    feature columns, otherwise indexing the grid raises ``IndexError``.
    """
    # squeeze=False keeps `axes` two-dimensional even for a single row or a single column,
    # so the [row, col] indexing below works for every grid shape.
    fig, axes = plt.subplots(x, y, figsize=figsize, squeeze=False)
    for i in range(data.transformed_data.shape[1]):
        row, col = divmod(i, y)
        axis = axes[row, col]
        axis.hist(data.transformed_data[:, i], bins=bins)
        axis.set_title(data.full_data.columns[i])
    plt.show()

In [None]:
# Histogram of every transformed feature on a 2 x 4 grid of axes.
plot_hist(datamodule, 2, 4)

# Models

In [None]:
class Classifier(pl.LightningModule):
    """Lightning wrapper that trains ``model`` as a classifier with cross-entropy loss.

    Every step returns the batch loss plus a ``(predicted_labels, targets)`` pair; the
    ``*_epoch_end`` hooks gather these pairs over the epoch and log F1 (and accuracy for test).

    NOTE(review): ``f1_score`` is called with its default averaging, which only supports
    binary targets. Confirm the label column is binary.

    Args:
        model: Network mapping a feature batch to per-class scores.
        lr: Learning rate handed to Adam.
    """

    def __init__(self, model, lr=1e-3):
        super().__init__()
        self.model = model
        self.lr = lr

    def forward(self, x):
        # Call the module itself rather than `.forward` so that registered hooks run.
        return self.model(x)

    def shared_step(self, x, y):
        """Return ``(scores, cross-entropy loss)`` for one batch."""
        pred = self(x)
        criterion = nn.CrossEntropyLoss()
        loss = criterion(pred, y)
        return pred, loss

    def _step(self, batch, loss_key, score_key):
        """Run ``shared_step`` on ``batch`` and package the loss and the label pair under the given keys."""
        x, y = batch
        pred, loss = self.shared_step(x, y)
        # Predicted label = index of the highest score; detached, since it is only used for metrics.
        predicted = pred.detach().argmax(dim=1)
        return {loss_key: loss, score_key: (predicted, y)}

    @staticmethod
    def _gather(outputs, score_key):
        """Flatten the per-batch ``(predicted, targets)`` pairs into ``(targets, predictions)`` lists."""
        preds = []
        targets = []
        for output in outputs:
            predicted, y_data = output[score_key]
            preds.extend(predicted.detach().cpu().numpy())
            targets.extend(y_data.detach().cpu().numpy())
        return targets, preds

    def training_step(self, train_batch, batch_idx):
        return self._step(train_batch, 'loss', 'train_score')

    def training_epoch_end(self, outputs):
        # NOTE(review): only the loss of the last batch of the epoch is logged (not the epoch mean).
        loss_nd = np.double(outputs[-1]['loss'].detach().cpu().numpy())

        y, preds = self._gather(outputs, 'train_score')
        f1 = f1_score(y, preds)
        self.log('loss', loss_nd, logger=True)
        self.log('train_f1', f1, prog_bar=True, logger=True)

    def validation_step(self, val_batch, val_batch_idx):
        return self._step(val_batch, 'val_loss', 'val_score')

    def validation_epoch_end(self, outputs):
        y, preds = self._gather(outputs, 'val_score')
        f1 = f1_score(y, preds)
        self.log('val_f1', f1, prog_bar=True, logger=True)

    def test_step(self, test_batch, test_batch_idx):
        return self._step(test_batch, 'test_loss', 'test_score')

    def test_epoch_end(self, outputs):
        y, preds = self._gather(outputs, 'test_score')
        acc = accuracy_score(y, preds)
        f1 = f1_score(y, preds)
        self.log('accuracy', acc, prog_bar=True, logger=True)
        self.log('f1', f1, prog_bar=True, logger=True)

    def configure_optimizers(self):
        """Adam on the wrapped model's parameters with an exponential LR decay (gamma 0.95)."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        return [optimizer], [scheduler]

In [None]:
class Model(pl.LightningModule):
    """Skeleton for a network: carries a ``name`` and forwards inputs through ``self._nn``.

    ``_nn`` is not assigned in this class; presumably a subclass sets it before
    ``forward`` is used (TODO confirm).
    """

    def __init__(self):
        super().__init__()

        # Empty placeholder name.
        self.name = ''

    def forward(self, x):
        network = self._nn
        return network(x)

# Evaluation

In [None]:
def generate(elem: Any, count: int, step: Any):
    """Return ``count`` values starting at ``elem``, each one being the previous value plus ``step``.

    Args:
        elem: First value of the sequence.
        count: Number of values to produce; zero or a negative count yields an empty list.
        step: Increment added (with ``+``) to obtain the next value.

    Returns:
        A list ``[elem, elem + step, elem + step + step, ...]`` of length ``count``.
    """
    output = []
    current = elem
    for _ in range(count):
        output.append(current)
        # Rebind with `+` instead of `+=`: an in-place add on a mutable value (list, array)
        # would modify the caller's `elem` and make every stored entry alias the same object.
        current = current + step
    return output

In [None]:
class Evaluator:
    """Trains, checkpoints and tests one ``Classifier`` per supplied model.

    Args:
        models: Instantiated networks; each is wrapped in its own ``Classifier``.
        version: Prefix used in the checkpoint path ``'model/' + version + model.name``.
        trainer_params: Keyword arguments forwarded to every ``pl.Trainer``.
        logger: Logger class (or factory); called once per model with ``logger_params``.
        logger_params: Keyword arguments for ``logger``.
        callbacks: Callback classes (or factories); instantiated once per model.
        callbacks_params: Keyword arguments for the callback at the same index. Callbacks
            without a matching entry are instantiated without arguments.
    """

    def __init__(self, models: List[pl.LightningModule], version: str, trainer_params: Optional[Dict[str, Any]] = None,
                 logger: Callable[..., LightningLoggerBase] = MLFlowLogger, logger_params: Optional[Dict[str, Any]] = None,
                 callbacks: Optional[List[Callable[..., Callback]]] = None,
                 callbacks_params: Optional[List[Dict[str, Any]]] = None):
        trainer_params = {} if trainer_params is None else trainer_params
        logger_params = {} if logger_params is None else logger_params
        callbacks = [] if callbacks is None else callbacks
        callbacks_params = [] if callbacks_params is None else callbacks_params

        self.models = models
        self.classifiers = [Classifier(model) for model in models]
        self.version = version
        self.paths = ['model/' + version + model.name for model in models]
        self.loggers = [logger(**logger_params) for _ in models]
        self.trainer_params = trainer_params
        # A separate set of callback instances per model, so no state is shared between runs.
        self.callbacks = [self._build_callbacks(callbacks, callbacks_params) for _ in models]

    @staticmethod
    def _build_callbacks(callbacks, callbacks_params):
        """Instantiate every callback class with its parameters (``{}`` when none were supplied)."""
        built = []
        for index, callback in enumerate(callbacks):
            params = callbacks_params[index] if index < len(callbacks_params) else {}
            built.append(callback(**params))
        return built

    def _make_trainer(self, logger, callbacks=None):
        """Create a ``pl.Trainer`` from ``trainer_params``, adding ``callbacks`` to any already configured there."""
        kwargs = dict(self.trainer_params)
        if callbacks:
            kwargs['callbacks'] = list(kwargs.get('callbacks') or []) + list(callbacks)
        return pl.Trainer(logger=logger, **kwargs)

    def train(self, min_lr: float, datamodule: Callable[..., pl.LightningDataModule], transforms: Optional[List[Callable]] = None,
              find_lr: bool = True, verbose: bool = False, no_batch: bool = False):
        """Fit every classifier and save a checkpoint to its path.

        Args:
            min_lr: Lower bound for the LR finder, or the learning rate itself when ``find_lr`` is false.
            datamodule: Factory called as ``datamodule(no_batch=..., transforms=...)`` to obtain a
                fresh data module for each LR search and each fit.
            transforms: Feature transforms forwarded to the data module factory.
            find_lr: Run the trainer's LR finder and use its suggestion.
            verbose: Print the learning rate chosen by the LR finder.
            no_batch: Forwarded to the data module factory.
        """
        transforms = [] if transforms is None else transforms
        for path, classifier, logger, callbacks in zip(self.paths, self.classifiers, self.loggers, self.callbacks):
            # The callbacks built in __init__ are handed to the trainer here; without this they would never run.
            self.trainer = self._make_trainer(logger, callbacks)
            if find_lr:
                lr_finder = self.trainer.tuner.lr_find(classifier, min_lr=min_lr, datamodule=datamodule(no_batch=no_batch, transforms=transforms), early_stop_threshold=None)
                classifier.lr = lr_finder.suggestion()
            else:
                classifier.lr = min_lr

            if verbose and find_lr:
                print(f'Best lr: {classifier.lr}')

            self.trainer.fit(classifier, datamodule(no_batch=no_batch, transforms=transforms))
            self.trainer.save_checkpoint(path)

    def test(self, datamodule: Callable[..., pl.LightningDataModule], transforms: List[Callable], no_batch: bool = False):
        """Reload every classifier from its checkpoint and run the trainer's test loop on a fresh data module."""
        for path, model, logger in zip(self.paths, self.models, self.loggers):
            self.trainer = self._make_trainer(logger)
            classifier = Classifier.load_from_checkpoint(path, model=model)
            self.trainer.test(classifier, datamodule=datamodule(no_batch=no_batch, transforms=transforms))

In [None]:
class LinearSearch:
    """Runs one train/test round per index of a set of parallel trainer-parameter lists.

    ``linear_params`` maps a trainer keyword to a list of values; round ``i`` uses the
    ``i``-th value of every list. ``linear_params_count`` is the number of rounds.
    """

    def __init__(self, linear_params, linear_params_count):
        self.linear_params, self.linear_params_count = linear_params, linear_params_count

    def search(self, models: List[pl.LightningModule], in_size: int, out_size: int, datamodule: pl.LightningDataModule, versions: List[str], min_lr: float = 1e-03, transforms: List[Callable] = [], 
               find_lr=False, verbose=False, callbacks: List[Callback] = [], callbacks_params: List[Dict[str, Any]] = [{}]):
        """Build fresh models for each round, then train and test them with that round's parameters.

        Each entry of ``models`` is called as ``model(in_size, out_size)``; ``versions[i]`` names
        round ``i``. The remaining arguments are handed on to ``Evaluator``.
        """
        for round_index in range(self.linear_params_count):
            round_models = [build(in_size, out_size) for build in models]
            round_params = {name: self.linear_params[name][round_index] for name in self.linear_params.keys()}
            if verbose:
                print(round_params)
            runner = Evaluator(
                round_models,
                versions[round_index],
                round_params,
                callbacks=callbacks,
                callbacks_params=callbacks_params,
            )
            runner.train(min_lr, datamodule, transforms, find_lr=find_lr, verbose=verbose)
            runner.test(datamodule, transforms)

In [None]:
# Search configuration (template values: fill in `models`, `in_size`, `out_size` and `data_dir`).
models = []  # Model classes/factories; LinearSearch calls each one as model(in_size, out_size)
min_lr = 1e-03
in_size = 0
out_size = 0
data_dir = ''  # Path of the CSV file read by DataModule


def datamodule(**kwargs):
    """Build a fresh ``DataModule`` for ``data_dir``.

    ``Evaluator`` calls this with only ``no_batch`` and ``transforms``, so the required
    ``data_dir`` argument has to be bound here.
    """
    return DataModule(data_dir, **kwargs)


transforms = [normalize]
callbacks = []  # Callback classes (not instances)
callbacks_params = [{}]  # Keyword arguments for the callback at the same index
max_epochs = [20, 30, 40, 50, 75, 100]
# One search round per entry of `max_epochs`; all other parameter lists are sized to match.
linear_params_count = len(max_epochs)
linear_params = {
    'gpus': generate(1, linear_params_count, 0),
    'max_epochs': max_epochs,
    'gradient_clip_val': generate(0.5, linear_params_count, 0.),
    # Plain repetition keeps real booleans (adding booleans would turn them into ints).
    'stochastic_weight_avg': [True] * linear_params_count,
    'amp_level': generate('O3', linear_params_count, ''),
    'precision': generate(16, linear_params_count, 0)
}
versions = [str(max_epoch) + 'epoch_' for max_epoch in linear_params['max_epochs']]

In [None]:
# Run the search: one train/test round per parameter index, without LR finder and without verbose output.
linear_search = LinearSearch(linear_params=linear_params, linear_params_count=linear_params_count)
linear_search.search(
    models=models,
    in_size=in_size,
    out_size=out_size,
    datamodule=datamodule,
    versions=versions,
    min_lr=min_lr,
    transforms=transforms,
    find_lr=False,
    verbose=False,
    callbacks=callbacks,
    callbacks_params=callbacks_params,
)