# 92-separate-constraints-into-dag

In [1]:
import networkx as nx
from gpx import Model, Variable
from dataclasses import dataclass

In [2]:
import pint

In [3]:
m = Variable('m', 'count')
lam = Variable('\\lambda', 'count/hr')
t = Variable('t_{process}', 'min', 'process time')


# Some OO ideas

## Parametric inputs

In [9]:
import gpkit
import networkx as nx
from gpkit import VarKey, keydict
import pint
from pint import UnitRegistry
ureg = UnitRegistry()
from typing import List, Dict, Union
from dataclasses import dataclass
from adce import adnumber

In [3]:
@dataclass
class ParametricVariable:
    'defines the nodes'
    name: str  # name of the variable
    varkey: VarKey  # the gp varkey
    qty: pint.Quantity

    is_input: bool=False  # whether the variable is an input variable
    is_leaf: bool=False  # is the variable a leaf

    magnitude: float
    unit: str
    
    base_unit_val: float  # the value in base units
    adnum: adnumber  # the differentiable number

    defining_constraint: ParametricConstraint = None  # if the variable is not an input, this should be defined

def update_value(self, quantity=None):
    'update the value properties'
    if quantity:
        self.qty = quantity
        self.magnitude = getattr(quantity, 'magnitude', quantity)
        self.units = getattr(quantity, 'Units', '')
    else:
        self.qty = self.magnitude * self.ureg(self.unit)
    
    self.base_unit_val = self.qty.to_base_units().magnitude
    self.adnum = adnumber(self.base_unit_val)

In [10]:
class ParametricConstraint:
    '''a parametric constraint

    - this class also defines the edge of the graph
    '''
    def __init__(self, constraint_as_list:list, inputvars: Dict[VarKey, ParametricVariable]) -> None:        
        self.inputvars = inputvars  # input variables
        self.constraint: list = constraint_as_list  # the constraint represented as a list of 
        self.outputvar: ParametricVariable  # the output variable

        # the function representation of the constraint
        self.func = None 
        # run the factory
        self.func_factory()

        # self.sym_convert()
        # self.func_convert()


    def func_factory(self, constraint_as_list:list=None):
        'function factory to create the evaluate function for the class'
        
        precedence = TOKEN_PRECEDENCE
        expression = constraint_as_list if constraint_as_list else self.constraint

        #TODO:  change the power operator

        #TODO:  is this operating on varkeys or strings?
    
        def apply_operation(self, operators, operands):
            operator = operators.pop()
            operand2 = operands.pop()
            operand1 = operands.pop()
            if operator == '+':
                operands.append(operand1 + operand2)
            elif operator == '-':
                operands.append(operand1 - operand2)
            elif operator == '*':
                operands.append(operand1 * operand2)
            elif operator == '/':
                operands.append(operand1 / operand2)
            elif operator == '^':
                operands.append(operand1 ** operand2)

        def math_function(**variables):
            operators = []
            operands = []
            
            for token in expression:
                if token in precedence:
                    while (operators and operators[-1] in precedence and
                        precedence[operators[-1]] >= precedence[token]):
                        apply_operation(operators, operands)
                    operators.append(token)
                # elif token is number:
                else:
                    operands.append(variables[token])
            
            while operators:
                apply_operation(operators, operands)
            
            return operands[0]
        
        # Set the math function
        self.func = math_function


    def get_subgraph(self) -> nx.DiGraph:
        'get the directed subgraph for the constraint'
        subgraph = nx.DiGraph()
        # add the edges from all the inputs to the output var and color by the constraint
        subgraph.add_edges_from([(iv, self.outputvar) for iv in self.inputvars.values()], constraint=self)

        return subgraph


    def evaluate(self, inputs:Dict[VarKey, ParametricVariable]) -> Dict[VarKey, ParametricVariable]:
        'evaluate the constraint for the inputs (quantites and adnumbers) and save to the output variable'
        qty_inputs = {str(k) : v.qty for k,v in inputs.items()}
        adnum_inputs = {str(k) : v.adnumber for k,v in inputs.items()}

        # evaluate with pint quantites
        self.outputvar.qty = self.func(**qty_inputs)

        # evaluate with adnumbers
        self.outputvar.adnum = self.func(**adnum_inputs)


    def adiff(self) -> keydict:
        'perform the automatic differentiation aand return keydict of gradients'
        pass


    def update_output_var(self, outputvar:ParametricVariable):
        'update the outputvar variable'
        self.outputvar = outputvar
        outputvar.defining_constraint = self

NameError: name 'ParametricVariable' is not defined

In [5]:
class ParametricInputs:
    'holds all the parametric inputs and the graph'
    def __init__(self):
        self.constraints: List[ParametricConstraint] = []  # list of the constraints
        self.inputs = []  # parametric inputs
        self.terminal_nodes = []  # terminal nodes
        self.graph = nx.DiGraph()  # the graph of the parametic models

        self.nodes: List[ParametricVariable]


    def add_constraint(self, constraint: ParametricConstraint):
        # update the list of inputs
        pass


    def create_graph(self):
        'update the graph from the constraints'
        # self.graph = nx.compose()
        self.graph = nx.compose_all([pc.get_subgraph() for pc in self.constraints])
        return self.graph


    def add_terminal_node(self, node):
        pass


    def get_substitutions(self) -> Dict[VarKey, pint.Quantity]:
        'get the substitution dict to send to the gpkit model'
        # get the graph in a topological sort order
        n: ParametricVariable
        for n in self.graph.topological_sort():
            if n.is_input:
                # this does not need to be calculated
                continue
            # otherwise evaluate the constraint that defines the variable
            n.defining_constraint.evaluate(inputs=self.nodes)

            #TODO:  if the node has 0 connectivity, color it a terminal node
            

        # generate a substitution dict from the nodes (which should all be calculated now)        
        return {k.varkey : v.qty for k,v in self.nodes.items()}


    def _get_sorted_graph(self):
        'get the graph in a topological sort order'
        # the networkx gives the sorted list of nodes
        # if the node is an input, it can be skipped
        # if the node is not an input, need to evaluate the constraint
        pass


    def update_results(self, sol) -> dict:
        'update the results from the gpx solve'
        # add the values to the solutions

        # update the sensitivities
        # sum up the log sensitivities
        pass

    def check_graph(self):
        'checks to make sure the graph is indeed acyclic'
        # all leaf nodes g√∂back to nodes that are in the list of input nodes

In [14]:
from gpkit import ureg

## Test with an input

In [1]:
from gpkit import Model

In [None]:
# a little model
m = Variable('m', 'count')
lam = Variable('lambda', 'count/hr')
t = Variable('t_process', 'min', 'process time')

In [None]:
constr = [lam*t == m]
m = Model()

# Auto differentiation

In [12]:
from adce import adnumber

In [37]:
x = 13*ureg('min/m')

In [38]:
x_ad = adnumber(x.to_base_units().magnitude)

In [48]:
y = 11*ureg('m')
y_ad = adnumber(y.to_base_units().magnitude)

In [49]:
y*x

In [41]:
z_ad = x_ad*y_ad

In [43]:
z_ad.gradient(y_ad)

[780.0]

In [44]:
print(y_ad)
print(z_ad)

ad(10)
ad(7800.0)


In [46]:
z_ad.gradient(y_ad)[0]*y_ad.real/z_ad.real

1.0

# Expression Factory

In [50]:
def create_math_function(expression):
    precedence = {'+': 1, '-': 1, '*': 2, '/': 2, '^': 3}
    
    def apply_operation(operators, operands):
        operator = operators.pop()
        operand2 = operands.pop()
        operand1 = operands.pop()
        if operator == '+':
            operands.append(operand1 + operand2)
        elif operator == '-':
            operands.append(operand1 - operand2)
        elif operator == '*':
            operands.append(operand1 * operand2)
        elif operator == '/':
            operands.append(operand1 / operand2)
        elif operator == '^':
            operands.append(operand1 ** operand2)
    
    def math_function(**variables):
        operators = []
        operands = []
        
        i = 0
        while i < len(expression):
            token = expression[i]
            if token in precedence:
                while (operators and operators[-1] in precedence and
                       precedence[operators[-1]] >= precedence[token]):
                    apply_operation(operators, operands)
                operators.append(token)
            else:
                operands.append(variables[token])
            i += 1
        
        while operators:
            apply_operation(operators, operands)
        
        return operands[0]
    
    return math_function



# variables = {'a': 5, 'b': 10, 'c': 3, 'd': 2, 'e': 3}
# result = math_func(**variables)
# print(result)  # Output: -9


In [5]:
def create_math_function(expression):
    precedence = {'+': 1, '-': 1, '*': 2, '/': 2, '^': 3}
    
    def apply_operation(operators, operands):
        operator = operators.pop()
        operand2 = operands.pop()
        operand1 = operands.pop()
        if operator == '+':
            operands.append(operand1 + operand2)
        elif operator == '-':
            operands.append(operand1 - operand2)
        elif operator == '*':
            operands.append(operand1 * operand2)
        elif operator == '/':
            operands.append(operand1 / operand2)
        elif operator == '^':
            operands.append(operand1 ** operand2)
    
    def math_function(**variables):
        operators = []
        operands = []
        
        for token in expression:
            if token in precedence:
                while (operators and operators[-1] in precedence and
                       precedence[operators[-1]] >= precedence[token]):
                    apply_operation(operators, operands)
                operators.append(token)
            else:
                operands.append(variables[token])
        
        while operators:
            apply_operation(operators, operands)
        
        return operands[0]
    
    return math_function



# variables = {'a': 5, 'b': 10, 'c': 3, 'd': 2, 'e': 3}
# result = math_func(**variables)
# print(result)  # Output: -9


In [6]:
expression = ['a', '+', 'b', '-', 'c', '*', 'd', '^', 'e']
math_func = create_math_function(expression)

In [7]:
variables = {'a': 5, 'b': 100, 'c': 3, 'd': 2, 'e': 3}
print(math_func(**variables))

81


In [58]:
expression = ['a', '*', '3', '^', '2', '-', 'b']
math_func = create_math_function(expression)

In [60]:
variables = {'a': adnumber(5), 'b': adnumber(15)}
math_func(**variables)

KeyError: '3'

- interpret the operand, if a number to just be a number
- interpret the carat to be split up