In [34]:
from typing import Optional, Any
from graphviz import Digraph


# Classes definition

In [35]:
class State:
    """
    Transducer's automaton state representation
    """
    def __init__(self, name)-> None:
        self.name = name
        
    def __repr__(self)-> str:
        return self.name
    

class Transition:
    """
    Represents state + input -> result 
    """
    def __init__(self, state: State, input, output)-> None:
        """
        Create portion of transition change
        :param state: state of automaton
        :param input: input 
        :param output: state for transition function or output value for output function
        """
        self.output = output
        self.input = input
        self.state = state
        self.name = str(state) + "+" + str(input) + "->" + str(output)
        
    def __repr__(self)-> str:
        return self.name 
        

class Transducer:
    def __init__(self, name: str, inputs: set, outputs: set, states: set,
                 init_state: State, transitions_f: list, output_f: list)-> None:
        """
        
        :param inputs: set of possible inputs
        :param outputs: set of possible outputs
        :param states: set of states
        :param init_state: initial transducer state
        :param transitions_f: list of transitions for states
        :param output_f: list of transitions for outputs
        """
        super().__init__()
        self.name = name
        self.output_f = output_f
        self.transitions_f = transitions_f
        self.init_state = init_state
        self.states = states
        self.outputs = outputs
        self.inputs = inputs
    
    def get_transaction(self, state: State, inp)-> Optional[State]:
        """
        Searches through out of all defined transactions for a transaction
        :param state: current state of automaton
        :param inp: input of automaton
        :return: state of automaton after transaction or None
        """
        for trans in self.transitions_f:
            if trans.state.name == state.name and trans.input == inp:
                return trans.output
        return None
                
    
    def get_output(self, state: State, inp) -> Optional[Any]:
        """
        
        :param state: 
        :param inp: 
        :return: output of output function or None
        """
        for trans in self.output_f:
            if trans.state.name == state.name and trans.input == inp:
                return trans.output
        return None


## Create target transducer

In [36]:
# Create target transducer form paper
# States
s0 = State('s0')
s1 = State('s1')
s2 = State('s2')

# Inputs
cleansq = 'cleansq'
cleanrd = 'cleanrd'
greensq = 'greensq'
yellowrd = 'yellowrd'

# Outputs
sqclean = 'sqclean'
rdclean = 'rdclean'
sqgreen = 'sqgreen'
rdyellow = 'rdyellow'

# Transitions
transitions = set()
transitions.add(Transition(s0, cleansq, s1))
transitions.add(Transition(s0, cleanrd, s2))
transitions.add(Transition(s1, greensq, s0))
transitions.add(Transition(s2, yellowrd, s0))

# Outputs
outputs = set()
outputs.add(Transition(s0, cleansq, sqclean))
outputs.add(Transition(s0, cleanrd, rdclean))
outputs.add(Transition(s1, greensq, sqgreen))
outputs.add(Transition(s0, yellowrd, rdyellow))

input_alphabet = {cleansq, cleanrd, greensq, yellowrd}
output_alphabet = {sqclean, rdclean, sqgreen, rdyellow}

target_transducer = Transducer('Target', input_alphabet, output_alphabet,
                               {s0, s1, s2}, s0, transitions, outputs)


## Produce an automaton of the target transducer

In [37]:
def get_transducer_automaton(transducer: Transducer):
    from IPython.display import Image
    dot = Digraph(comment='The Automaton of %s transducer' % transducer.name)

    for state in list(transducer.states):
        dot.node(state.name, state.name)

    for trans in list(transducer.transitions_f):
        dot.edge(trans.state.name, trans.output.name, trans.input)

    filename = '../log/%s_transducer' % transducer.name
    dot.render(filename, format='png')
    return Image(url= filename + '.png')

In [38]:
get_transducer_automaton(target_transducer)

## Create production resources P

In [39]:
# Cleaner
s01 = State('s01')

transitions = set()
transitions.add(Transition(s01, cleansq, s01))
transitions.add(Transition(s01, cleanrd, s01))

outputs = set()
outputs.add(Transition(s01, cleansq, sqclean))
outputs.add(Transition(s01, cleanrd, rdclean))

cleaner = Transducer("Cleaner", {cleansq, cleanrd}, {sqclean, rdclean}, {s01}, s01, transitions, outputs)
get_transducer_automaton(cleaner)

In [40]:
# Painter of square parts into green color
s02 = State('s02')

transitions = set()
transitions.add(Transition(s02, greensq, s02))

outputs = set()
outputs.add(Transition(s02, greensq, sqgreen))

paint_green = Transducer("PaintGreen", {greensq}, {sqgreen}, {s02}, s02, transitions, outputs)
get_transducer_automaton(paint_green)

In [41]:
# Painter of round parts into yellow color
s03 = State('s03')

transitions = set()
transitions.add(Transition(s03, yellowrd, s03))

outputs = set()
outputs.add(Transition(s03, yellowrd, rdyellow))

paint_yellow = Transducer("PaintYellow", {yellowrd}, {rdyellow}, {s03}, s03, transitions, outputs)
get_transducer_automaton(paint_yellow)

In [42]:
# Production resources
P = [cleaner, paint_green, paint_yellow]
# Error state
s_err = State('s_err')
out_err = "<error>"

# Safety game for orchestration problem
G = ({inputs}, {1, ..., m} Q, q0, ro)
## Define game state and game error state

In [43]:
class GameState(State):
    """
    State defined as cartesian product of state of the target transducer
    and manufacturing resources transducers
    """
    def __init__(self, target_state: State, states: list)-> None:
        super().__init__(GameState._generate_state_name(target_state, states))
        self.target_state = target_state
        self.states = states
    
    @staticmethod
    def _generate_state_name(target_state: State, states: list)-> str:
        n = ",".join([state.name for state in states])
        return target_state.name + ";" + n

class GameErrorState(State):
    """
    Error state
    """
    def __init__(self)-> None:
        super().__init__('q_err')


## Build game state space and list of transitions between them
Q = S x S1 x...xSm + q_err


In [53]:
q_err = GameErrorState()
q0 = GameState(target_transducer.init_state, [prod_transd.init_state for prod_transd in P])

game_states = dict()
game_states[q_err.name] = q_err
game_transactions = dict()
explored_states = dict()

def test_target_for_state(curr_state: GameState):
    
    # If in error state stop exploring
    if curr_state == q_err:
        print("Called to explore error state. Skip")
        return
    
    # If already explored then stop
    if explored_states.get(curr_state.name) is not None:
        print("Already in explored states: %s Skip" % curr_state.name)
        return
    else:
        explored_states[curr_state.name] = curr_state
    
    for input in input_alphabet:
        
        new_state = target_transducer.get_transaction(curr_state.target_state, input)
        target_output = target_transducer.get_output(curr_state.target_state, input)
        
        if new_state is None:
            new_state = s_err
            target_output = out_err
        
        # Check for res output
        for h in range(len(P)):
            res_transd = P[h]
            res_curr_state = curr_state.states[h]
            
            new_res_state = res_transd.get_transaction(res_curr_state, input)
            res_output = res_transd.get_output(res_curr_state, input)
            
            if new_res_state is None:
                new_res_state = s_err
                res_output = out_err
            
            if res_output == target_output:
                new_res_states = list(curr_state.states)
                new_res_states[h] = new_res_state
                new_game_state = GameState(new_state, new_res_states)
            else:
                new_game_state = q_err
            
            if game_states.get(new_game_state.name, None) is None:
                game_states[new_game_state.name] = new_game_state
            else:
                new_game_state = game_states[new_game_state.name]
            
            transition = Transition(curr_state, input, new_game_state)
            if game_transactions.get(transition.name, None) is None:
                game_transactions[transition.name] = transition
            
            # Explore states if automatons aren't in error state
            if new_res_state != s_err and new_state != s_err:
                test_target_for_state(new_game_state)
            
# Run game states generation from target init state
test_target_for_state(q0)

print("After game state space exploration following states were found:")
for g_s_name in game_states.keys():
    print(game_states[g_s_name])

print("\nand following transactions:")
for g_t_name in game_transactions.keys():
    print(game_transactions[g_t_name])

Called to explore error state. Skip
Already in explored states: s0;s01,s02,s03 Skip
After game state space exploration following states were found:
q_err
s_err;s_err,s02,s03
s_err;s01,s_err,s03
s2;s01,s02,s03
s_err;s01,s02,s_err
s1;s01,s02,s03
s0;s01,s02,s03

 and following transactions:
s0;s01,s02,s03+yellowrd->s_err;s_err,s02,s03
s0;s01,s02,s03+yellowrd->s_err;s01,s_err,s03
s0;s01,s02,s03+yellowrd->q_err
s0;s01,s02,s03+cleanrd->s2;s01,s02,s03
s2;s01,s02,s03+yellowrd->q_err
s2;s01,s02,s03+cleanrd->q_err
s2;s01,s02,s03+cleanrd->s_err;s01,s_err,s03
s2;s01,s02,s03+cleanrd->s_err;s01,s02,s_err
s2;s01,s02,s03+greensq->s_err;s_err,s02,s03
s2;s01,s02,s03+greensq->q_err
s2;s01,s02,s03+greensq->s_err;s01,s02,s_err
s2;s01,s02,s03+cleansq->q_err
s2;s01,s02,s03+cleansq->s_err;s01,s_err,s03
s2;s01,s02,s03+cleansq->s_err;s01,s02,s_err
s0;s01,s02,s03+cleanrd->q_err
s0;s01,s02,s03+greensq->s_err;s_err,s02,s03
s0;s01,s02,s03+greensq->q_err
s0;s01,s02,s03+greensq->s_err;s01,s02,s_err
s0;s01,s02,s03+cle