In [1]:
import numpy as np

def buffer(x: np.ndarray):
    """BUF gate: hand the input back untouched (the same object, not a copy)."""
    passthrough = x
    return passthrough

def inverter(x: np.ndarray):
    """INV gate: element-wise logical negation of ``x``."""
    negated = np.logical_not(x)
    return negated

def AND(x: np.ndarray):
    """AND gate over the first two entries (rows) of ``x``; further entries are ignored."""
    first, second = x[0], x[1]
    return np.logical_and(first, second)
    
def NAND(x: np.ndarray):
    """NAND gate over the first two entries (rows) of ``x``; further entries are ignored."""
    first, second = x[0], x[1]
    both = np.logical_and(first, second)
    return np.logical_not(both)

def OR(x: np.ndarray):
    """OR gate over the first two entries (rows) of ``x``; further entries are ignored."""
    first, second = x[0], x[1]
    return np.logical_or(first, second)
    
def NOR(x: np.ndarray):
    """NOR gate over the first two entries (rows) of ``x``; further entries are ignored."""
    first, second = x[0], x[1]
    either = np.logical_or(first, second)
    return np.logical_not(either)

# Dispatch table: gate-type name as it appears in a netlist line -> the
# function that evaluates that gate. 'INPUT' and 'OUTPUT' have no entry here.
basics = dict(
    BUF=buffer,
    INV=inverter,
    AND=AND,
    NAND=NAND,
    OR=OR,
    NOR=NOR,
)

class Circuit:
    """Gate-level netlist with logic simulation, stuck-at fault simulation
    and a simple test-pattern search.

    Attributes set by the methods below:
        file: the path handed to the constructor.
        circuit: dict built by ``parse``. ``'INPUT'`` / ``'OUTPUT'`` map to
            lists of node numbers exactly as listed in the file; every gate
            maps ``'<GATE>_<line index>'`` to ``{'IN': [nodes], 'OUT': node}``.
        length: largest integer found in the file plus two, so node numbers
            can be used directly as array indices (index 0 and the last index
            are padding).

    Gate evaluation is looked up in the module-level ``basics`` table.
    """

    # initialize the circuit
    # :param file: a file detailing the circuit gates, nodes, and I/O
    def __init__(self, file):
        self.parse(file)
        self.file = file

    # parse the input file
    def parse(self, file):
        """Read the netlist at ``file`` into ``self.circuit`` and ``self.length``.

        Each non-blank line is split on runs of whitespace. A line whose first
        token is ``INPUT`` or ``OUTPUT`` stores the remaining tokens as a list
        of ints; any other line is a gate whose last token is the output node
        and whose middle tokens are the input nodes.

        The previous implementation called ``circuit.replace('  ', ' ')``
        without keeping the result (a no-op) and then skipped the empty token
        produced by a double space after INPUT/OUTPUT via ``info[2:]``.
        Splitting on whitespace gives the same lists for that double-space
        layout, also accepts single-space I/O lines, and no longer crashes on
        stray double spaces inside gate lines.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if a node field of a gate or I/O line is not an integer.
        """
        with open(file) as f:
            lines = f.read().split('\n')

        self.circuit = {}
        largest = 0

        # the line index (blank lines included) is part of each gate key
        for i, element in enumerate(lines):
            info = element.split()
            # ignore empty / whitespace-only lines
            if not info:
                continue

            # {INPUT: [1, 2, 3...]}
            if info[0] in ('INPUT', 'OUTPUT'):
                self.circuit[info[0]] = [int(tok) for tok in info[1:]]
            # {GATE_#: {IN: [1, 2], OUT: 3}}
            else:
                self.circuit[f'{info[0]}_{i}'] = {
                    'IN': [int(tok) for tok in info[1:-1]],
                    'OUT': int(info[-1]),
                }

            # track the largest number in the file; non-numeric tokens
            # (the gate / section names) are skipped
            for tok in info:
                try:
                    largest = max(largest, int(tok))
                except ValueError:
                    continue

        # add two for direct indexing
        # i.e. index 0 and -1 are ignored, so index 1 -> circuit[1]
        self.length = largest + 2

    # simulate the circuit until steady state is reached
    def simulate(self, input_vector):
        """Simulate the fault-free circuit for ``input_vector``.

        ``input_vector`` is a string (it is concatenated into the printed
        banner) whose characters are paired, in order, with the nodes listed
        under ``'INPUT'``. All nodes start at 0; every gate is then re-evaluated
        from a snapshot of the previous sweep until a sweep changes nothing.

        Prints the input, the initial all-zero state and its length, and the
        output string. Returns the values at the ``'OUTPUT'`` nodes with the
        last listed entry dropped.
        """
        # initialize the circuit with all 0s
        self.start = np.asarray([0]*self.length)
        self.next = np.copy(self.start)

        print('Input: ' + input_vector)
        # add the input values
        for bit, index in zip(input_vector, self.circuit['INPUT']):
            self.next[index] = int(bit)

        print(self.start)
        print(len(self.start))
        # while not in a steady state, continue looping over the gates in the circuit
        while not np.array_equal(self.start, self.next):
            self.start = np.copy(self.next)
            for element in self.circuit:
                if element in ('INPUT', 'OUTPUT'):
                    continue
                gate = self.circuit[element]
                evaluate = basics[element.split('_')[0]]
                self.next[gate['OUT']] = evaluate(self.start[gate['IN']])

        output_nodes = self.circuit['OUTPUT']
        print('Output: ', ''.join([str(a) for a in self.next[output_nodes][:-1]]))
        return self.next[output_nodes[:-1]]

    def _parse_faults(self, file):
        """Read a fault list and return ``(nodes, values)`` as two int lists.

        The first line of the file is skipped; every following non-blank line
        contributes its first field as the node and its second as the value.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if a field is not an integer.
            IndexError: if a line has fewer than two fields.
        """
        with open(file) as f:
            rows = [line.split() for line in f]

        nodes, values = [], []
        for fields in rows[1:]:
            if not fields:
                continue
            nodes.append(int(fields[0]))
            values.append(int(fields[1]))
        return nodes, values

    def simulate_faults(self, input_vector, file=None, fault_nodes=None, fault_values=None, verbose=True):
        """Simulate ``input_vector`` once per fault and append a report to a log.

        One copy of the circuit (one array column) is simulated per fault, with
        the faulty node forced to its stuck value after every sweep.

        Args:
            input_vector: sequence of 0/1 characters (or ints), paired in order
                with the nodes listed under ``'INPUT'``.
            file: optional fault-list path; when given it overrides
                ``fault_nodes`` and ``fault_values`` (see ``_parse_faults``).
            fault_nodes: nodes to fault. Defaults to every index in
                ``range(self.length)``.
            fault_values: stuck values, one per entry of ``fault_nodes``. When
                omitted, every node is simulated stuck at 0 and stuck at 1.
            verbose: when true (the default) the report is also printed.

        The output of the LAST simulated column is used as the fault-free
        reference. With the default fault list that column forces the last
        array index, which ``parse`` reserves as padding.
        NOTE(review): with a user-supplied fault list the last entry is still
        treated as the reference - confirm that is intended.

        The report is appended to ``fault_analysis_<last component of
        self.file>`` in the current directory.

        Returns:
            None.

        Raises:
            ValueError: if an element of ``input_vector`` is not an integer
                literal (e.g. the failure message of ``find_test_pattern``).
        """
        if file is not None:
            fault_nodes, fault_values = self._parse_faults(file)
        else:
            if fault_nodes is None:
                fault_nodes = list(range(self.length))
            if fault_values is None:
                # every node gets a stuck-at-0 and a stuck-at-1 copy
                fault_values = [0, 1]*len(fault_nodes)
                fault_nodes = sorted(list(fault_nodes)*2)

        self.fault_values = fault_values
        self.fault_nodes = fault_nodes

        # initialize N copies of the circuit with all 0s
        # each row is N copies of that node
        self.start = np.zeros((self.length, len(fault_values)), dtype=int)
        self.next = np.copy(self.start)

        # add the input values
        for bit, index in zip(input_vector, self.circuit['INPUT']):
            try:
                value = int(bit)
            except ValueError as err:
                raise ValueError(
                    f'input_vector must contain only 0/1 values, got {input_vector!r}'
                ) from err
            self.next[index, :] = value

        # add the faults into the initial state
        faults = list(zip(fault_nodes, fault_values))
        for col, (node, value) in enumerate(faults):
            self.next[node, col] = value

        # while not in a steady state, continue looping over the gates in the circuit
        while not np.array_equal(self.start, self.next):
            self.start = np.copy(self.next)
            for element in self.circuit:
                if element in ('INPUT', 'OUTPUT'):
                    continue
                gate = self.circuit[element]
                evaluate = basics[element.split('_')[0]]
                self.next[gate['OUT']] = evaluate(self.start[gate['IN']])
            # reset faults; gates only read the snapshot in self.start, so
            # re-forcing once per sweep gives the same state as re-forcing
            # after every gate
            for col, (node, value) in enumerate(faults):
                self.next[node, col] = value

        output_nodes = self.circuit['OUTPUT']
        outputs = [
            ''.join([str(int(a)) for a in self.next[output_nodes, n][:-1]])
            for n in range(len(fault_nodes))
        ]
        reference = outputs[-1]
        detected = [z for z, out in enumerate(outputs[:-1]) if out != reference]
        count = len(detected)

        # Without a fault file four entries are discounted - presumably the
        # two padding indices (0 and the last) times two stuck values; confirm.
        population = len(fault_nodes) if file else len(fault_nodes) - 4
        percent = int(100*count/population) if population > 0 else 0

        report = [
            f'Circuit: {self.file}',
            f'Input Vector: {input_vector}',
            f'Fault-Free Output: {reference}',
            f'FAULTS DETECTED: {count} ({percent}%)',
        ]
        report += [
            f'    {int(fault_nodes[z])} stuck at {int(fault_values[z])}'
            for z in detected
        ]

        # last path component, so both 'dir/name.txt' and 'name.txt' work
        log_name = f'fault_analysis_{self.file.rsplit("/", 1)[-1]}'
        with open(log_name, 'a') as f:
            f.write('\n'.join(report) + '\n\n')

        if verbose:
            for line in report:
                print(line)
        return None

    def where_input(self, node):
        """Return every key of ``self.circuit`` that lists ``node``.

        ``'INPUT'`` / ``'OUTPUT'`` are included when ``node`` appears in their
        list; a gate is included when ``node`` is one of its ``'IN'`` nodes.
        Keys are returned in dictionary order.
        """
        users = []
        for element, entry in self.circuit.items():
            if element in ('INPUT', 'OUTPUT'):
                if node in entry:
                    users.append(element)
            elif node in entry['IN']:
                users.append(element)
        return users

    def where_output(self, node):
        """Return the first key of ``self.circuit`` associated with ``node``.

        ``'INPUT'`` / ``'OUTPUT'`` match when ``node`` appears in their list; a
        gate matches when ``node`` is its ``'OUT'`` node. Returns ``None`` when
        nothing matches.
        """
        for element, entry in self.circuit.items():
            if element in ('INPUT', 'OUTPUT'):
                if node in entry:
                    return element
            # 'OUT' is a single int, so compare rather than test membership
            elif node == entry['OUT']:
                return element
        return None

    def controlling_value(self, gate):
        """Return the controlling input value for a gate type name.

        1 for OR/NOR, 0 for AND/NAND, ``None`` for anything else (including
        BUF and INV).
        """
        if gate in ('OR', 'NOR'):
            return 1
        if gate in ('AND', 'NAND'):
            return 0
        return None

    def find_test_pattern(self, fault_node, fault_value, *, verbose=False):
        """Search for a primary-input assignment that exposes a stuck-at fault.

        The node ``fault_node`` is set to the opposite of ``fault_value``; for
        each gate fed by that node, ``propagate`` tries to push the value
        forward until some ``'OUTPUT'`` node has a value. Primary inputs (the
        ``'INPUT'`` list without its last entry) are then assigned one by one
        by ``test_inputs``, trying 0 first and 1 only if 0 is rejected by
        ``imply``; an earlier choice is never revisited.

        Args:
            fault_node: node number under test.
            fault_value: the stuck value (0 or 1) being tested for.
            verbose: print a trace of assignments and conflicts.

        Returns:
            A string with one character per primary input, or the literal
            string ``'Failed to generate test vector'`` when no path works.
        """

        # function for checking implications for inputs
        def imply(i, v, s, *, verbose=False):
            # Assign value v to node i on a copy of state s and forward-evaluate
            # every gate fed by i whose output becomes determined.
            # Returns the updated state, or None on a conflict.

            # make sure i, v doesn't violate s
            if s[i]!=v and s[i] is not None:
                if verbose:
                    print('assignment violated initial state', i, v, s)
                return None
            
            # find gates to forward propagate
            input_gates = self.where_input(i)
            
            s0 = s.copy()
            
            s0[i] = v
            if verbose:
                print(f'updating {i} with value {v}')
            
            for input_gate in input_gates:
                # ignore input and output
                if input_gate in ['INPUT', 'OUTPUT']:
                    continue
                
                # define I/O nodes for convenience
                ins = self.circuit[input_gate]['IN']
                out = self.circuit[input_gate]['OUT']
                
                # if controlling value is v or all inputs are set, evaluate 
                # and potentially propagate output if no conflicts
                if (self.controlling_value(input_gate.split('_')[0])==v
                    or None not in s0[ins]):
                    
                    # evaluate state
                    # when the first input is still unset the inputs are
                    # passed in reversed order so the set one comes first
                    if s0[ins][0]==None:
                        s0[out] = int(basics[input_gate.split('_')[0]](s0[ins][::-1]))
                    else:
                        jj = basics[input_gate.split('_')[0]](s0[ins])
                        # BUF returns its (array) input; take the single element
                        if isinstance(jj, np.ndarray):
                            jj = jj[0]
                        s0[out] = int(jj)
                                            
                    # if value conflicts with previous assignment, error!
                    if s[out]!=None and s[out]!=s0[out]:
                        if verbose:
                            print('conflict with previous assignment')
                        return None
                    # if value conflicts with the fault assignment, error!
                                         
                    # propagate output and assign to state
                    s0 = imply(out, s0[out], s0.copy(), verbose=verbose)
                    
                    if s0 is None:
                        return None
                    
                # else can not propagate value further, so continue looping over gates
                else:
                    continue
            return s0
                                                               
        def propagate(i, v, g, s, fn, fv, *, verbose=False):
            # i: input node
            # v: value to assign
            # g: gate to propagate down
            # s: current circuit state
            # fn: the fault node being tested
            # fv: the fault value
            # Returns the resulting state, or None on a conflict.
                        
            # check that assignment doesn't violate state
            if s[i]!=v and s[i] is not None:
                if verbose:
                    print('assignment violated initial state', i, v, s, fn, fv)
                return None
                
            # if assignment doesn't violate state, assign value
            s[i] = v
            
            # find gates to forward propagate
            input_gates = self.where_input(i)
                        
            # loop over relevant gates
            for input_gate in [g]:
                # ignore input and output
                if input_gate in ['INPUT', 'OUTPUT']:
                    continue
                
                # create a temporary state for comparison
                s0 = s.copy()
                                
                # define I/O nodes for convenience
                ins = self.circuit[input_gate]['IN']
                out = self.circuit[input_gate]['OUT']
                
                # imply forward from fault node, update objective while implying
                if i == fn:  # if node to be set is fault node
                    
                    # identify controlling value for the gate
                    c = self.controlling_value(input_gate.split('_')[0])
                    
                    # assign non-controlling value to other input
                    temp = ins.copy()
                    temp.remove(i)
                    if len(temp)!=0:  # buffers/inverters trivial
                        temp = temp[0]
                        s0[temp] = int(not c)
                        if verbose:
                            print(f'updating {temp} with value {s0[temp]}, back')
                        
                        # stop if back-propagating breaks things
                        if (s[temp] is not None) and (s0[temp]!=s[temp]):
                            if verbose:
                                print('back-propagating violation', s, s0, temp)
                            return None
                    
                    # evaluate gate output
                    jj = basics[input_gate.split('_')[0]](s0[ins])
                    if isinstance(jj, np.ndarray):
                        jj = jj[0]
                    s0[out] = int(jj)
                    
                    if verbose:
                        print(f'updating {out} with value {s0[out]}, forward')
                    
                    # if value conflicts with previous assignment, stop
                    if s[out] is not None and s[out]!=s0[out]:
                        if verbose:
                            print('conflict with previous assignment')
                        return None
                    
                    # if value conflicts with the fault testing, stop
                    if s0[fn] == fv:
                        if verbose:
                            print('conflict with fault testing')
                        return None
                    
                    # an output node has a value: the path reached an output
                    if any([o is not None for o in s0[self.circuit['OUTPUT']]]):
                        return s0
                    
                    # treat the output as an error and propagate
                    # basically tracking D/Dbar
                    temp = s0
                    
                    # a failed branch falls back to s0; the result of the
                    # last gate tried is the one kept
                    for gate in self.where_input(out):
                        temp = propagate(out, s0[out], gate, s0[:], out, int(not s0[out]), verbose=verbose)
                        if temp is None:
                            temp = s0
                    s0 = temp
                    
                    
                    if s0 is None:
                        return None
                    else:
                        s = s0
                        
                # else can not propagate value further, so continue looping
                else:
                    s = s0
                    continue
                    
            # when all loops/recursion complete, return the state of the circuit
            return s
                
        
        # establish inputs
        primary_inputs = self.circuit['INPUT'][:-1]
        
        # use None values for initial states
        state = np.full(self.length, None)

        # add value to test for fault
        state[fault_node] = int(not fault_value)
            
        ref_state = state.copy()
            
        fault_inputs = self.where_input(fault_node)
        
        def test_inputs(input_nodes, current_state, *, verbose=False):
            # Assign the remaining primary inputs one at a time: 0 first, 1 only
            # if 0 is rejected. Returns the final state, or None if an input
            # accepts neither value.
            if len(input_nodes)==0 and current_state is not None:
                return current_state
            elif current_state is None:
                return current_state
            else:
                # try input = 0
                temp_state = imply(input_nodes[0], 0, current_state.copy(), verbose=verbose)
                if verbose:
                    print(temp_state)
                if temp_state is None:
                    temp_state = imply(input_nodes[0], 1, current_state.copy(), verbose=verbose)
                return test_inputs(input_nodes[1:], temp_state, verbose=verbose)
                
        
        # outer loop iterates over paths for fault propagation
        for fault_input in fault_inputs:
            # reset state to reference for each iteration
            state = ref_state
            
            # try to propagate fault to an output through gate fault_input
            state = propagate(fault_node, int(not fault_value), fault_input, state.copy(), fault_node, fault_value, verbose=verbose)
            
            if verbose:
                print(state)
            
            # if there is an error, go to next path
            if state is None:
                continue
            # NOTE(review): the original comment said "if none of the outputs
            # are set", but this condition skips the path when ALL indexed
            # output entries are set - confirm which was intended.
            elif all([o is not None for o in state[self.circuit['OUTPUT']]]):
                continue
            # if there is no error and at least one of the output states are set, start PI assignment
            elif any([o is not None for o in state[self.circuit['OUTPUT']]]):
                temp = state.copy()
                # test values
                # will return final state after input assignment
                temp = test_inputs(primary_inputs, temp, verbose=verbose)
                # if error, no inputs worked
                if temp is None:
                    continue
                elif temp is not None:
                    return ''.join([str(a) for a in temp[primary_inputs]])
            else:
                continue
                    
        return 'Failed to generate test vector'


## S27

In [2]:
# Build a Circuit from the netlist file 'Circuit Inputs/s27.txt'.
file = 'Circuit Inputs/s27.txt'
c = Circuit(file)

In [3]:
# Search for an input vector that tests node 11 for stuck-at-0.
input_vector = c.find_test_pattern(11, 0)
# bare expression: echo the result as the cell output
input_vector

'0000001'

In [4]:
# No fault list is passed, so every node index is simulated stuck at 0 and at 1;
# the report is appended to a fault_analysis_* file and printed.
c.simulate_faults(input_vector)

Circuit: Circuit Inputs/s27.txt
Input Vector: 0000001
Fault-Free Output: 0011
FAULTS DETECTED: 13 (32%)
    1 stuck at 1
    3 stuck at 1
    5 stuck at 0
    7 stuck at 1
    8 stuck at 1
    9 stuck at 1
    10 stuck at 0
    11 stuck at 0
    12 stuck at 1
    14 stuck at 0
    15 stuck at 1
    19 stuck at 1
    20 stuck at 1


## S298f_2

In [6]:
# Build a Circuit from the netlist file 'Circuit Inputs/s298f_2.txt'.
file = 'Circuit Inputs/s298f_2.txt'
c = Circuit(file)

In [7]:
# Search for an input vector that tests node 68 for stuck-at-0.
input_vector = c.find_test_pattern(68, 0)
# bare expression: echo the result as the cell output
input_vector

'Failed to generate test vector'

In [None]:
# NOTE(review): the recorded output of the previous cell is the failure message,
# not a bit string. simulate_faults converts each character with int(), so this
# call would raise ValueError on that value - confirm before running.
c.simulate_faults(input_vector)

## S344f_2

In [None]:
# Build a Circuit from the netlist file 'Circuit Inputs/s344f_2.txt'.
file = 'Circuit Inputs/s344f_2.txt'
c = Circuit(file)

In [9]:
# Search for an input vector that tests node 91 for stuck-at-0.
input_vector = c.find_test_pattern(91, 0)
# bare expression: echo the result as the cell output
input_vector

'01010100000000000'

In [None]:
# No fault list is passed, so every node index is simulated stuck at 0 and at 1;
# the report is appended to a fault_analysis_* file and printed.
c.simulate_faults(input_vector)

## S349f_2

In [11]:
# Build a Circuit from the netlist file 'Circuit Inputs/s349f_2.txt'.
file = 'Circuit Inputs/s349f_2.txt'
c = Circuit(file)

In [12]:
# Search for an input vector that tests node 179 for stuck-at-0.
input_vector = c.find_test_pattern(179, 0)
# bare expression: echo the result as the cell output
input_vector

'000000000000000000000000'

In [None]:
# No fault list is passed, so every node index is simulated stuck at 0 and at 1;
# the report is appended to a fault_analysis_* file and printed.
c.simulate_faults(input_vector)