In [1]:
import os
import os.path as path
import numpy as np
import pandas as pd
import abc

import operator
from queue import Queue
from copy import deepcopy
from collections import defaultdict

import torch
import torch.nn as nn
import torch.utils as utils
import pytorch_lightning as pl
import torch.nn.functional as F

from torch.utils.data import DataLoader, Dataset

import sklearn.metrics as metrics


In [2]:
import logging

# Notebook-wide logger; everything at DEBUG and above goes to outputs.log.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Re-running this cell must not stack file handlers: each extra handler would
# write every record to outputs.log one more time.
for _staleHandler in [h for h in logger.handlers if isinstance(h, logging.FileHandler)]:
    logger.removeHandler(_staleHandler)
    _staleHandler.close()

# mode='w' truncates the log file on open, so no separate truncation is needed.
fileHandler = logging.FileHandler('outputs.log', mode='w')
fileHandler.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)

logger.addHandler(fileHandler)


In [3]:

class Vertex:
    """Generic directed-graph vertex holding outgoing and incoming edges.

    Attributes:
        data: arbitrary payload attached to the vertex.
        dests (list): vertices this vertex points to.
        srcs (list): vertices that point to this vertex.
    """

    def __init__(self, dests: list = None, data=None):
        """
        Args:
            dests: initial outgoing edges. A fresh list is created when
                omitted so that vertices never share one default list.
            data: optional payload.
        """
        self.data = data
        self.dests = [] if dests is None else dests
        self.srcs = []

    def add_dest(self, dest):
        """Add the edge self -> dest and record the back-reference on dest.

        Raises:
            AssertionError: if the edge already exists in either direction list.
        """
        assert dest not in self.dests
        assert self not in dest.srcs
        self.dests.append(dest)
        dest.srcs.append(self)

    def remove_edge(self, dest):
        """Remove the edge self -> dest if present; a missing edge is ignored."""
        if dest in self.dests:
            self.dests.remove(dest)
        # Keep the back-reference in sync with what add_dest recorded.
        dest_srcs = getattr(dest, 'srcs', None)
        if dest_srcs is not None and self in dest_srcs:
            dest_srcs.remove(self)
        
class FlowchartNode (Vertex):
    """Node of a flowchart graph: a positioned, labelled vertex that can carry
    a torch component.

    Attributes:
        position (list): coordinates of the node (mutated in place by NodeGroup.move).
        label (str): display name of the node.
        dests (list): nodes this node feeds into.
        srcs (list): nodes feeding into this node.
        torchComponent: component attached to the node, None until assigned.
    """

    def __init__ (self, position:list, label:str = 'layer', dests:list = None, srcs:list = None):
        """
        Args:
            position: coordinates of the node.
            label: display name.
            dests: initial outgoing edges; a fresh list per node when omitted.
            srcs: initial incoming edges; a fresh list per node when omitted.

        Raises:
            ValueError: if any entry of ``dests`` is not a FlowchartNode.
        """
        # Default to per-instance lists: a shared default list would make every
        # node created without explicit edges share the same edges.
        dests = [] if dests is None else dests
        srcs = [] if srcs is None else srcs

        if not all (isinstance (dest, FlowchartNode) for dest in dests):
            raise ValueError ('Destinations of FlowchartNode must be of the same class')

        self.data = None
        self.dests = dests
        self.srcs = srcs
        self.position = position

        self.torchComponent = None
        self.label = label

    def __repr__ (self):
        # Entries without a label (e.g. plain strings) are shown as-is instead of raising.
        return 'FlowchartNode(position={}, label={}, dests={}, srcs={})'.format(
            self.position, self.label,
            ', '.join([str(getattr(d, 'label', d)) for d in self.dests]),
            ', '.join([str(getattr(s, 'label', s)) for s in self.srcs])
        )
        
class NodeGroup:
    """A collection of nodes that can be translated together."""

    def __init__ (self, nodes):
        self.nodes = nodes

    def move (self, x=0, y=0):
        """Shift every member's position in place by (x, y)."""
        offsets = (x, y)
        for member in self.nodes:
            for axis, delta in enumerate (offsets):
                member.position[axis] += delta


class FlowchartGraph:
    """Directed graph of flowchart nodes with a start node and an end (output) node.

    Nodes are linked through their ``dests`` (outgoing) and ``srcs`` (incoming)
    lists. The graph can be searched, rendered as text, and compiled into a
    TorchGraph once every node has a torch component attached.
    """

    def __init__(self):
        # Explicit fresh lists so the placeholder never shares edge lists with
        # any other node.
        self.startNode = FlowchartNode ([0, 0], dests=[], srcs=[])

    @classmethod
    def fromDict (cls, inpDict, startNode, endNode, torchComponentsDict=None):
        """Build a graph from an adjacency mapping.

        Args:
            inpDict (dict): label -> list of destination labels. Every
                destination label must also be a key of ``inpDict``.
            startNode: label of the input node.
            endNode: label of the output node.
            torchComponentsDict (dict, optional): label -> component to attach
                to the node with that label.

        Returns:
            FlowchartGraph: graph whose start and end nodes are set.
        """
        f = cls ()

        # Each node gets its own edge lists; node i is placed at [i, i].
        nodes = {
            label: FlowchartNode ([i, i], label=label, dests=[], srcs=[])
            for i, label in enumerate (inpDict)
        }

        for label, dests in inpDict.items():
            for dest in dests:
                cls.addEdge (nodes[label], nodes[dest])

        for label, component in (torchComponentsDict or {}).items():
            nodes[label].torchComponent = component

        f.startNode = nodes[startNode]
        f._endNode = nodes[endNode]

        return f

    @property
    def endNode (self):
        """Output node of the graph; asserts that it has been defined."""
        assert hasattr(self, '_endNode'), 'call setEndNode to define '\
                                        + 'output point, necessary before '\
                                        + 'certain methods'

        return self._endNode

    def setEndNode (self, node:'FlowchartNode'):
        """Define the output node of the graph."""
        assert isinstance(node, FlowchartNode)
        self._endNode = node

    def searchGraph (self, **kwargs):
        """Breadth-first search from the start node.

        Args:
            **kwargs: attribute=value filters; a node matches when every named
                attribute equals the given value. No filters matches all nodes.

        Returns:
            list: matching nodes in breadth-first order, each at most once.
        """
        seen = set()
        results = []
        queue = [self.startNode]
        while queue:
            node = queue.pop (0)
            if node in seen:
                continue
            seen.add (node)

            if all (getattr(node, attr) == query for attr, query in kwargs.items()):
                results.append (node)
            queue.extend (node.dests)

        return results

    def graphView (self):
        """Return one text line per execution stage, numbered from 1 at the start node."""
        excecuteOrder = self.generateExcecutionOrder()
        return '\n'.join (
            'Index: {}: {}'.format (
                len(excecuteOrder)-index,
                ', '.join ([node.label for node in nodes]))
            for index, nodes in excecuteOrder.items()
        )

    def generateExcecutionOrder (self):
        """Group nodes by their distance from the end node.

        Walks backwards from the end node. A node's order is one more than the
        largest order among its destinations, and the end node has order 0.

        Returns:
            dict: order -> list of nodes, sorted with the highest order first,
            which is the order in which the nodes must be executed.
        """
        excOrder = {}
        queue = [self.endNode]

        while queue:
            node = queue.pop(0)
            # Ignore non-node entries (e.g. string placeholders) that may
            # appear in a srcs list.
            if isinstance (node, str):
                continue
            # A node's order never changes once assigned, so do not reprocess it.
            if node in excOrder:
                continue
            if all (dest in excOrder for dest in node.dests):
                excOrder[node] = max ([-1] + [excOrder[dest] for dest in node.dests])+1
                queue.extend (node.srcs)

        sortedExcOrder = defaultdict (list)
        for node, order in excOrder.items():
            sortedExcOrder[order].append (node)
        return dict(sorted(sortedExcOrder.items(), reverse=True))

    def compileTorchGraph (self, input_dim):
        """Initialize every node's torch component and wrap the graph in a TorchGraph.

        Args:
            input_dim: dimension of the input fed to the start node.
        """
        self.initialize_torch_components (input_dim)
        return TorchGraph (self.startNode, self.endNode, self.generateExcecutionOrder())

    def initialize_torch_components (self, input_dim):
        """Initialize each node's torch component exactly once.

        The start node is initialized with ``input_dim``. Every other node
        reachable from it is initialized with the ``output_dim`` of its
        sources, which must all agree.
        """
        self.startNode.torchComponent.initialize_torch(input_dim)
        seen = {self.startNode}
        queue = list (self.startNode.dests)
        while queue:
            node = queue.pop(0)
            # A node reachable through several paths is enqueued several times;
            # initializing it again would rebuild its torch modules.
            if node in seen:
                continue
            seen.add (node)
            queue.extend (node.dests)

            srcs = node.srcs
            assert all (srcs[0].torchComponent.output_dim == src.torchComponent.output_dim for src in srcs), \
                'all sources of a node must share the same output_dim'
            node.torchComponent.initialize_torch (srcs[0].torchComponent.output_dim)

    @staticmethod
    def addEdge (src, dest):
        """Connect src -> dest, updating both nodes' edge lists.

        Raises:
            AssertionError: if the connection already exists.
        """
        assert (dest not in src.dests) and (src not in dest.srcs), \
            f'cannot add edge; duplicate connection {src.label=}, {dest.label=}'

        src.dests.append (dest)
        dest.srcs.append (src)

    @classmethod
    def countNodes (cls, node, visited=None):
        """Count the distinct nodes reachable from ``node``, including itself.

        Args:
            node: node to start counting from.
            visited (list, optional): nodes already counted; extended in place.
                A fresh list seeded with ``node`` is used when omitted, so
                repeated calls are independent of each other.
        """
        if visited is None:
            visited = [node]

        count = 1
        for dest in node.dests:
            if dest not in visited:
                visited.append (dest)
                count += cls.countNodes (dest, visited)

        return count

    @classmethod
    def maxDepth (cls, node):
        """Number of nodes on the longest path starting at ``node``; -1 for None."""
        if node is None:
            return -1

        return max ([0] + [cls.maxDepth (dest) for dest in node.dests]) + 1

    def __str__ (self):
        return self.graphView()

    def __repr__ (self):
        return (
            'FlowchartGraph(nodes: {}, depth: {})'
                .format (
                    self.countNodes(self.startNode),
                    self.maxDepth(self.startNode)
            )
        )
        
        
class TorchComponent (nn.Module):
    """Base class for the torch building blocks attached to flowchart nodes.

    Subclasses are expected to set ``output_dim`` in ``__init__``, build their
    torch modules in ``initialize_torch`` and implement ``forward``.

    NOTE: the ``abc.abstractmethod`` markers below are documentation only;
    the class does not use an ABC metaclass, so they are not enforced.
    """

    # Parameter names (and their expected types) that verify_inputs requires.
    nessisary_params = {'example':type}

    def __init__ (self, ) -> None:
        """Set up the module with safe defaults.

        Vars:
            self.output_dim (int): dimension of the layer's output, None until
                a subclass sets it.
            self.initialized (bool): False until initialize_torch has run.
        """
        super().__init__()

        # Defaults so that __repr__ works on any component, even before a
        # subclass has assigned its own values.
        self.output_dim = None
        self.initialized = False

    def verify_inputs (self):
        """Check that ``self.params`` holds exactly the keys of ``nessisary_params``.

        Returns:
            bool: True when the key sets match, False otherwise (including
            when no ``params`` attribute has been set).
        """
        params = getattr (self, 'params', None)
        if params is None:
            return False
        return set(params.keys()) == set(self.nessisary_params.keys())

    @abc.abstractmethod
    def initialize_torch (self, input_dim) -> None:
        """ Initializes a torch module with params
            Sets self.initialized = True
            Inputs:
                input_dim (int): dimension of the input layer's output

        """
        return

    @abc.abstractmethod
    def forward (self, batch) -> torch.Tensor:
        """ Runs the module and returns the output.
            Must be called after self.initialize_torch

            Inputs:
                batch (torch.Tensor): output from previous layer

            Returns:
                (torch.Tensor): layer output

        """
        return

    def __repr__ (self):
        return f'{type(self).__name__}(initialized: {self.initialized}, output_dim: {self.output_dim})'
    
class LinearComponent (TorchComponent):
    """Stack of ``nn.Linear`` layers applied one after another.

    The first layer maps the input dimension to ``size``; every further layer
    maps ``size`` to ``size``. No activation is applied between layers.
    """

    def __init__ (self, size, num_layers=1):
        """
        Args:
            size (int): output dimension of every layer in the stack.
            num_layers (int): number of linear layers. The first layer is
                always created, so values below 1 still give one layer.

        Vars:
            self.num_layers (int): requested number of layers
            self.output_dim (int): dimension of the layer's output
            self.initialized (bool): False until initialize_torch is called
        """
        super().__init__()

        self.num_layers = num_layers
        self.output_dim = size

        self.initialized = False

    def initialize_torch (self, input_dim):
        """Create the linear layers for an input of dimension ``input_dim``.

        Calling it again replaces the existing layers with new ones.
        """
        self.input_dim = input_dim
        self.linear_modules = nn.ModuleList (
            [
                nn.Linear (input_dim, self.output_dim),
                *[
                    nn.Linear(self.output_dim, self.output_dim)
                    for _ in range (self.num_layers-1)
                ]
            ])
        self.initialized = True

    def forward (self, batch):
        """Run the batch through every linear layer.

        Inputs:
            batch (torch.Tensor): output from the previous layer; it is cast
                to float before the first layer.

        Returns:
            list: single-element list holding the output tensor.

        Raises:
            RuntimeError: if initialize_torch has not been called yet.
        """
        if not self.initialized:
            raise RuntimeError ('initialize_torch must be called before forward')

        batch = batch.float()

        logger.debug (f'batch type= {batch.dtype}, {batch.shape=}')
        for i, layer in enumerate (self.linear_modules):
            logger.debug (f'starting linear layer {i} with batch type {batch.dtype}, size {batch.shape}')
            batch = layer (batch)

        logger.debug ('linear layer forward pass complete')

        # Wrapped in a list because TorchGraph.forward extends its input list
        # with each source's output.
        return [batch]
                    
        
# Example topology as label -> destination labels: 'a' fans out to 'b' and 'c',
# the two branches rejoin at 'e', and 'f' is the output node.
# (An earlier four-node example assigned to the same name was overwritten
# before use and has been dropped.)
inpDict = {
    'a':['b','c'],
    'b':['d'],
    'c':['e'],
    'd':['e'],
    'e':['f'],
    'f':[]
}


g = FlowchartGraph.fromDict (inpDict, 'a', 'f')


In [4]:
# Look the output node up by label and set it as the end node. fromDict was
# already given 'f' as the end node, so this only demonstrates the two calls.
g.setEndNode(g.searchGraph (label='f')[0])

In [5]:
# Execution order: maps each order index to its nodes, highest index first
# (the start node), down to index 0 (the end node).
g.generateExcecutionOrder()

{4: [FlowchartNode(position=[0, 0], label=a, dests=b, c, srcs=],
 3: [FlowchartNode(position=[1, 1], label=b, dests=d, srcs=a],
 2: [FlowchartNode(position=[2, 2], label=c, dests=e, srcs=a,
  FlowchartNode(position=[3, 3], label=d, dests=e, srcs=b],
 1: [FlowchartNode(position=[4, 4], label=e, dests=f, srcs=c, d],
 0: [FlowchartNode(position=[5, 5], label=f, dests=, srcs=e]}

In [6]:
# Text view of the same ordering, numbered upwards from the start node.
print (g.graphView())

Index: 1: a
Index: 2: b
Index: 3: c, d
Index: 4: e
Index: 5: f


In [7]:
class TorchGraph (pl.LightningModule):
    """LightningModule that runs the torch components of a flowchart graph.

    Nodes are executed in the order given by ``excecutionOrder``. Each node's
    component receives the outputs of its source nodes, and the start node
    receives the arguments passed to ``forward``.
    """

    # Loss classes used when no criterion has been assigned explicitly.
    default_criterions = {
        'binary': nn.BCELoss,
        'regression':nn.MSELoss,
        'multiclass':nn.CrossEntropyLoss

    }

    # Metrics selectable through add_metric; see log_metrics for the format.
    available_metrics = {
        'accuracy': {
            'criterion': metrics.accuracy_score,
            'kwargs': {},
            'optimization_operator': operator.gt
        }
    }

    def __init__ (self, startNode, endNode, excecutionOrder):
        """
        Args:
            startNode: node that receives the inputs of ``forward``.
            endNode: node whose first output is the model output.
            excecutionOrder (dict): order index -> list of nodes, iterated in
                insertion order.
        """
        super().__init__()

        self.startNode = startNode
        self.endNode = endNode
        self.excecutionOrder = excecutionOrder

        # Nodes are plain objects, so their components must be registered
        # explicitly; otherwise self.parameters() is empty and nothing is
        # trained or moved between devices.
        self.components = nn.ModuleList (
            node.torchComponent
            for nodes in excecutionOrder.values()
            for node in nodes
            if isinstance (node.torchComponent, nn.Module)
        )

        self.best_metric_scores = defaultdict (dict)
        self.saved_min_loss = np.inf   # minimum over every logged loss
        self.saved_min_losses = {}     # minimum per step type

        self.selected_metrics = {}

    def add_metric (self, metrics: list[str] | str):
        """Select one or more metrics (by name) from ``available_metrics``.

        Unknown names are skipped with a warning.
        """
        if isinstance(metrics, str):
            metrics = [metrics]

        for metric in metrics:
            if metric not in self.available_metrics:
                logger.warning (f'unknown metric {metric!r} ignored; '
                                f'available: {list(self.available_metrics)}')
                continue

            self.selected_metrics[metric] = self.available_metrics[metric]

    @property
    def criterion (self):
        """Loss function; built from ``get_criterion`` on first use unless one was assigned."""
        # Assigning an nn.Module to an attribute of an nn.Module stores it in
        # self._modules without calling the property setter, so look there first.
        assigned = self._modules.get ('criterion')
        if assigned is not None:
            return assigned

        if not hasattr(self, '_criterion'):
            self._criterion = self.get_criterion()()
        return self._criterion

    @criterion.setter
    def criterion (self, criterion):
        self._criterion = criterion

    def get_criterion (self):
        """Pick the default loss class from the end node's output dimension."""
        if self.endNode.torchComponent.output_dim > 1:
            return self.default_criterions['multiclass']

        else:
            return self.default_criterions['regression']

    def forward (self, *args, labels=None):
        """Run the graph.

        Args:
            *args: inputs handed to the start node's component.
            labels (torch.Tensor, optional): targets; cast to float32 before
                the loss is computed.

        Returns:
            tuple: (loss, outputs). loss is 0 when no labels are given.
        """
        nodeOutputs = {'inputs':args}

        for nodes in self.excecutionOrder.values():
            for node in nodes:
                # The start node reads the call arguments. The node's own srcs
                # list is left untouched so the flowchart graph stays usable.
                srcKeys = ['inputs'] if node is self.startNode else node.srcs

                inputs = []
                for src in srcKeys:
                    inputs.extend (nodeOutputs[src])

                nodeOutputs[node] = node.torchComponent (*inputs)

        outputs = nodeOutputs[self.endNode][0]

        if self.endNode.torchComponent.output_dim == 1:
            # Drop only the trailing feature axis; a full squeeze would also
            # drop the batch axis for a batch of one.
            outputs = outputs.squeeze (-1)

        loss = 0
        if labels is not None:
            labels = labels.type (torch.float32)
            logger.debug (f'{self.criterion=}, {outputs.shape=}, {labels.shape=}, {outputs.dtype}, {labels.dtype}')
            loss = self.criterion (outputs, labels)

        return loss, outputs

    def configure_optimizers (self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)

    def _step (self, batch, step_type):
        """Shared train/valid/test step: forward pass plus loss logging."""
        src, trg = batch
        loss, out = self (src, labels=trg)
        self.log('loss/{}'.format(step_type), loss, prog_bar=True, logger=True)

        return {'loss': loss, 'y_hat': out, 'labels': trg}

    def training_step (self, batch, batch_idx):
        return self._step (batch, 'train')

    def validation_step (self, valid_batch, batch_idx):
        return self._step (valid_batch, 'valid')

    def test_step (self, test_batch, batch_idx):
        return self._step (test_batch, 'test')

    def training_epoch_end(self, outputs):
        self.log_metrics(outputs, 'train')

    def validation_epoch_end(self, outputs):
        self.log_metrics(outputs, 'valid')

    def test_epoch_end(self, outputs):
        self.log_metrics(outputs, 'test')

    def log_metrics (self, outputs, step_type):
        """ Logs metrics defined in self.selected_metrics

        format of selected_metrics:
            Dict: {
                name: str,
                metric: Dict {
                    criterion: function (out, trg, kwargs)
                    kwargs: any extra key word arguments
                            to be passed into criterion
                    optimization_operator: function (int, int)
                                           simply an operator such as lt or gt
                                           defines which argument should be saved
                                           to self.best_metric_scores
                }
            }

        Args:
            outputs (list): dicts returned by _step for every batch of the epoch.
            step_type (str): 'train', 'valid' or 'test'; used in the log keys.
        """
        if not outputs:
            return

        labels, preds = [], []

        for output in outputs:
            labels.extend (output["labels"].detach().cpu())
            preds.extend (output["y_hat"].detach().cpu())

        labels = torch.stack(labels).int()
        preds = torch.stack(preds)

        for name, metric in self.selected_metrics.items():
            metric_score = metric['criterion'] (preds, labels, **metric['kwargs'])

            # No best score exists the first time a metric is seen for a step type.
            best = self.best_metric_scores [step_type].get (name)
            if best is None or metric['optimization_operator'] (metric_score, best):
                self.best_metric_scores [step_type][name] = metric_score

            self.log ('{}/{}'.format(name, step_type), metric_score)
            self.log('best {}/{}'.format(name, step_type),
                     self.best_metric_scores[step_type][name])

        mean_loss = torch.stack([d['loss'] for d in outputs]).detach().mean()
        self.log (f'min_loss/{step_type}', self.min_loss (mean_loss, step_type))

    def min_loss (self, loss, step_type=None):
        """Track the smallest loss seen so far.

        Args:
            loss: latest loss value.
            step_type (str, optional): when given, the minimum is tracked
                separately for that step type.

        Returns:
            float: minimum for ``step_type`` if given, otherwise the minimum
            over every loss passed in.
        """
        loss = float (loss)
        if loss < self.saved_min_loss:
            self.saved_min_loss = loss

        if step_type is None:
            return self.saved_min_loss

        previous = self.saved_min_losses.get (step_type, np.inf)
        if loss < previous:
            previous = loss
        self.saved_min_losses[step_type] = previous
        return previous
        
    
class WineDataset (Dataset):
    """Dataset over a wine-quality table.

    Every column except ``'quality'`` is a feature; ``'quality'`` is the target.

    Attributes:
        src (pd.DataFrame): feature columns.
        trg (pd.Series): target column.
        datasetPercent (float | None): fraction of the full dataset this
            object represents, used only for display.
    """

    def __init__ (self, data, datasetPercent: float=None):
        self.datasetPercent = datasetPercent

        self.src = data.loc [:, data.columns != 'quality']
        self.trg = data.loc [:, 'quality']

    def __getitem__(self, idx):
        """Return ``[features, target]`` tensors for the row at position ``idx``."""
        # Convert through numpy so torch receives a plain array rather than a
        # pandas object.
        return [
            torch.tensor (np.asarray (self.src.iloc[idx])),
            torch.tensor (np.asarray (self.trg.iloc[idx]))
        ]

    def __len__(self):
        assert len(self.src) == len(self.trg)
        return len(self.trg)

    def __repr__(self):
        # datasetPercent is optional; do not fail when it was not provided.
        if self.datasetPercent is None:
            return 'WineDataset (unknown % of full dataset)'
        return f'WineDataset ({self.datasetPercent*100}% of full dataset)'


class WineDataModule (pl.LightningDataModule):
    """Data module that reads a ';'-delimited wine-quality CSV and serves
    train / validation / test dataloaders over consecutive slices of it.
    """

    def __init__ (self, dataPath, splits = None, batch_size = 32, shuffle=False, num_workers=0):
        """
        Args:
            dataPath: path of the CSV file.
            splits: three fractions (train, valid, test) of the dataset.
                Defaults to 0.8 / 0.1 / 0.1 when omitted.
            batch_size (int): batch size of every dataloader.
            shuffle (bool): whether the training dataloader shuffles.
            num_workers (int): worker processes per dataloader.
        """
        super().__init__()

        self.batch_size = batch_size
        self.dataPath = dataPath
        self.splits = [0.8, 0.1, 0.1] if splits is None else list (splits)

        self.shuffle = shuffle
        self.num_workers = num_workers

    def setup (self, stage: str = None):
        """Read the CSV and build the three datasets.

        Raises:
            ValueError: if ``splits`` does not hold exactly three fractions.
        """
        if len (self.splits) != 3:
            raise ValueError ('Splits must be of length 3: trainset, validset, testset')

        dataset = pd.read_csv (self.dataPath, delimiter=';')

        splits = self.split_data (dataset, self.splits)
        self.trainset, self.validset, self.testset = self.build_datasets (splits)

    def build_datasets (self, splits):
        """Wrap each split produced by split_data in a WineDataset."""
        return [
            WineDataset (
                split['dataset'],
                datasetPercent=split['split_percent']
            ) for split in splits
        ]

    def split_data (self, dataset, splits):
        """Cut ``dataset`` into consecutive, non-overlapping slices.

        Args:
            dataset: sliceable table (e.g. DataFrame) to split.
            splits: fraction of the rows for each slice, in order.

        Returns:
            list: one ``{'dataset': slice, 'split_percent': fraction}`` dict
            per entry of ``splits``.
        """
        datasets = list()
        dataLen = len(dataset)

        start = 0
        cumulative = 0.0
        for split in splits:
            cumulative += split
            # Boundaries come from the running total, so no row is dropped
            # between slices. Rounding absorbs float error in the sum
            # (e.g. 0.8 + 0.1 + 0.1 < 1.0).
            end = min (dataLen, max (start, round (cumulative * dataLen)))
            datasets.append (
                {
                    'dataset':dataset[start:end],
                    'split_percent':split
                 }
            )
            start = end

        return datasets

    def train_dataloader (self):
        return DataLoader (
            self.trainset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=self.shuffle
            )

    def val_dataloader (self):
        return DataLoader (
            self.validset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False
            )

    def test_dataloader (self):
        return DataLoader (
            self.testset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False
            )
        
        

In [8]:
# Location of the white-wine quality CSV. Set the WINE_DATA_PATH environment
# variable to run the notebook on another machine; the original local path is
# kept as the fallback.
dataPath = os.environ.get ('WINE_DATA_PATH', r'C:\Code\ML_GUI\winequality-white.csv')
df = pd.read_csv (dataPath, delimiter =';')

# 80% train / 10% validation / 10% test.
dm = WineDataModule (dataPath, [0.8, 0.1, 0.1], batch_size = 32)
dm.setup()
dl = dm.train_dataloader()

In [9]:
# Peek at the first training batch only. `batch` stays bound after the loop
# and is reused further down for a manual forward pass.
for batch in dl:
    # batch[0] holds the features, batch[1] the 'quality' targets.
    print (batch[0].shape)
    print (batch[1].shape)
    
    break

torch.Size([32, 11])
torch.Size([32])


In [10]:
# Two-node chain: 'Linear1' feeds 'Linear2', which is the output node.
inpDict = dict (
    Linear1=['Linear2'],
    Linear2=[],
)

# Component attached to each node, keyed by node label.
torchComponentsDict = dict (
    Linear1=LinearComponent (size=24, num_layers=2),
    Linear2=LinearComponent (size=1, num_layers=1),
)

g = FlowchartGraph.fromDict (
    inpDict,
    'Linear1',
    'Linear2',
    torchComponentsDict,
)
print (g)

Index: 1: Linear1
Index: 2: Linear2


In [11]:
# Compile the flowchart into a trainable model; the input dimension is the
# number of feature columns in the training split.
graph = g.compileTorchGraph(dm.trainset.src.shape[1])

In [12]:
# graph.criterion = nn.L1Loss() 

In [13]:
# Manual forward pass on the batch captured above: features in, targets as
# labels; returns the loss and the model outputs.
loss, out = graph (batch[0], labels = batch[1])