In [1]:
#utility functions
import subprocess,random

def write_to_file(file_name,trace):
    with open(file_name,'w') as f:
        f.write(trace)


#parameters are file names
def execute_clingo(trace, automata):
    script = "./clingo " + trace + " " + automata
    process = subprocess.Popen(script,stdout=subprocess.PIPE,shell=True)
    return process


#need better definition
#check if no notes are left isolated
def check_all_included(connections,final_state):
    for s in range(final_state):
        s_included = False
        for c in connections:
            if( s in c):
                s_included = True
        if not s_included:
            return False
    return True

#mutual exclusive condition needed for the deltas
# return a list of tuple , (from state, number of MX)
# e.g. [(0,2)] means state0 need MX for two state
def mutual_required(deltas):
    from_states = []
    for d in deltas:
        from_states.append(d[0])
    result = []
    for f in from_states:
        count = from_states.count(f)
        if (count>1) :
            result.append((f,count))
    return list(set(result))
        

In [2]:
#only works for integers, take input limit, number of conditions needed
# return list of conditions, guaranteed to be mutually exclusive
def generate_conditions(lower_bound,upper_bound,num):
    if(upper_bound - lower_bound +1 < num):
        print "not so possible to generate so many conditions"
        return []
    results = []
    
    #list of ranges available : [[l,u],[l,u],[l,u]]
    ranges = [[lower_bound,upper_bound]]
    while(len(results)<num):
        #choose a range from list of continuous ranges
        chose_range = ranges[random.randint(0,len(ranges)-1)]
        low = chose_range[0]
        upp = chose_range[1]
        l = random.randint(low,upp)
        u = random.randint(l,upp)
        results.append([l,u])
        
        #update ranges
        left_upper = l-1
        right_lower = u+1
        ranges.remove([low,upp])
        if(left_upper >= low):
            ranges.append([low,left_upper])
        if(right_lower <= upp):
            ranges.append([right_lower,upp])
        
        #reset if requirement can't be met
        possible_ranges = 0
        for r in ranges:
            possible_ranges += r[1]-r[0]
        if(possible_ranges+len(results) < num):
            results = []
            ranges = [[lower_bound,upper_bound]]
    
    return results

In [32]:
import random
#ASSUMPTIONS: only one input, represented by number
#input should include an trace_length 

    
cond_param = 'C'
time_param = 'T'
def gen_states(output, states):
    for s in states:
        output += "state(" + s + ")."
    output += "\n\n"
    return output 

def gen_condition_tostr(output,conditions):
    for c in range(len(conditions)):
        output += "condition"+ str(c) + "("+ time_param + ","+ cond_param+"):-"
        output +=  str(conditions[c][0])+" <= "+cond_param+", "
        output += cond_param+" <= " + str(conditions[c][1]) + ", "
        output += "input("+time_param + ","+cond_param+"). \n"
    output += '\n'
    return output

def gen_deltas_tostr(output,deltas,states):
    for d in range(len(deltas)):
        from_st = deltas[d][0]
        to_st = deltas[d][1]
        mx_conditions = []
        output += "delta("+ states[from_st]+ "," + time_param + ","+ cond_param+ ","+ states[to_st]+ "):-"
        output += "condition"+ str(d) +"("+ time_param + ","+ cond_param +").\n"
    output += '\n'
    return output

def gen_state_trans(output, low, init_state):
    output += "st(" + str(low) + "," + init_state+ ").\n"
    output += "st(T,Y):- st(T-1,S),state(S),state(Y),delta(S,T,C,Y).\n\n"
    return output

#make sure at least there are automatas can run a valid trace
def gen_deltas_valid_path(states):
    deltas = []
    final_state = len(states)-1
    #select an state that is not initial state
    init_c = random.randint(1,final_state) 
    deltas.append((0,init_c))
    # state that is not accpt state
    acpt_c = random.randint(0,final_state-1) 
    deltas.append((acpt_c,final_state))

    #add connection between above two states, 
    #unless they are same or, initial state goes to accpt directly
    if(init_c != acpt_c and not(init_c == final_state and acpt_c == 0)):
        deltas.append((init_c,acpt_c))
        
    return deltas

def gen_include_all_states(deltas, states):
    final_state = len(states)-1
    #After there is a valid path from init to accpt
    #make sure there is no state not connected
    while(not check_all_included(deltas,final_state)):
        from_state = random.randint(0,final_state-2)
        to_state = random.randint(1,final_state-1)
        deltas.append((from_state,to_state))                   

    deltas =  list(set(deltas))
    deltas.sort()
    return deltas

def gen_constraints(output,final_state):
    output += ":- not st(T," + final_state + ")," + "trace_length(T).\n"
    return output
    
# all states can goto error state if none of it's conditions are met
def gen_error_state_conditions(output, error_conditions,states):
    output += "state(error).\n"
    for s in range(len(states)-1):
        output += "delta("+ states[s]+ "," + time_param + ","+ cond_param + ",error):-"
        for c in range(len(error_conditions[s])-1):
            condition_number = error_conditions[s][c]
            
            output +=  "not condition"+ str(c) +"("+ time_param + ","+ cond_param +"),"
        output += "input(" + time_param + ","+ cond_param +").\n"
    return output
    
class Automata:

    #assume first is the initial state and 
    #the last one is the accepting state
    
    states = []
    in_low = 0
    minStates = 2
    
    # to be improved if they works perfectly
    maxCondition = 3
    maxStates = 5
    in_upp = 10
    
    
    def __init__(self, states, in_low, in_upp):
        if (len(states)<self.minStates):
            print "states number not enough, will use default random states"
        self.states = states
        self.in_low = in_low
        self.in_upp = in_upp
        

    
    def generate_automata(self, file_name):
        output = ""
        
        #generate random states if not provided
        if((self.states == []) or (len(self.states) < self.minStates)):
            num_states = random.randint(self.minStates,self.maxStates+1)
            for i in range(num_states):
                self.states.append("state"+str(i))
                
        #limiting inputs
        output += "in_limit(" + str(self.in_upp) + "). \n"
        
        output = gen_states(output, self.states)
        
        #delta generation , list of tuples
        deltas = gen_deltas_valid_path(self.states)
        
        deltas = gen_include_all_states(deltas, self.states)
        
        #generate conditions along with deltas
        mx_needed = mutual_required(deltas)
        #index corresponding to deltas
        conditions = [[]]*len(deltas)
        #index corresponding to states, elements is condition numbers
        error_conditions = [[]]*len(self.states)
        for mx in mx_needed:
            #generate that many conditions, can add a random extra number
            # but then need to pick mx[1] out of them, TODO later
            mx_conditions = generate_conditions(self.in_low,self.in_upp,mx[1])
            for d in range(len(deltas)):
                #if this delta need MX
                from_st = deltas[d][0]
                if (from_st == mx[0]) :
                    if(len(mx_conditions) == 0):
                        print "something wrong in calculating MX?"
                    conditions[d] = mx_conditions[0]
                    error_conditions[from_st] = error_conditions[from_st]+[d]
                    del mx_conditions[0]
               
        #fill up non-mx required conditions
        #TODO add more complex conditions
        for c in range(len(conditions)):
            if (conditions[c]==[]):
                conditions[c] = generate_conditions(self.in_low,self.in_upp,1)[0]
                from_st = deltas[c][0]
                error_conditions[from_st] = [c]
                
#         print deltas
#         print conditions
        
        #output 
        output = gen_condition_tostr(output,conditions)
        output = gen_deltas_tostr(output,deltas,self.states)
        output = gen_state_trans(output,self.in_low, self.states[0])
        output = gen_error_state_conditions(output, error_conditions,self.states)
        output = gen_constraints(output,self.states[-1])
        
        output += "#hide.\n#show st/2.\n"
        
        
        
        ###temporaray:
        output += "trace_length(3).\n input(1,2).input(2,3).input(3,1)"
        return output
                       
    
            
auto = Automata(["state0","state1","state2","state3","state4"],0,10)
print auto.generate_automata('')

in_limit(10). 
state(state0).state(state1).state(state2).state(state3).state(state4).

condition0(T,C):-6 <= C, C <= 9, input(T,C). 
condition1(T,C):-8 <= C, C <= 8, input(T,C). 
condition2(T,C):-10 <= C, C <= 10, input(T,C). 
condition3(T,C):-8 <= C, C <= 10, input(T,C). 
condition4(T,C):-0 <= C, C <= 1, input(T,C). 

delta(state0,T,C,state2):-condition0(T,C).
delta(state1,T,C,state1):-condition1(T,C).
delta(state1,T,C,state4):-condition2(T,C).
delta(state2,T,C,state1):-condition3(T,C).
delta(state2,T,C,state3):-condition4(T,C).

st(0,state0).
st(T,Y):- st(T-1,S),state(S),state(Y),delta(S,T,C,Y).

state(error).
delta(state0,T,C,error):-input(T,C).
delta(state1,T,C,error):-not condition0(T,C),input(T,C).
delta(state2,T,C,error):-not condition0(T,C),input(T,C).
delta(state3,T,C,error):-input(T,C).
:- not st(T,state4),trace_length(T).
#hide.
#show st/2.
trace_length(3).
 input(1,2).input(2,3).input(3,1)


In [4]:
import os
import random
#test if the trace is acceptable or not,
#automata is the file name of automata in ASP
def verify(trace, automata_file):

    #write trace to file
    tmp_file = './trace_tmp.lp'
    write_to_file(tmp_file, trace)
        
    #execute script, call clingo
    res_clingo = execute_clingo(tmp_file, automata_file)
    
    output = ''
    a = res_clingo.stdout.readline()
    #check result
    result = False
    while(a)  :
        output += a
        a = res_clingo.stdout.readline()
        if ("SATISFIABLE" in a):
            result = True
    
    os.remove(tmp_file)
    print output
    return result

# set of possible inputs, outputs and
# and the max length of trace
def generate_trace(inputs,outputs,length):
    trace = ""
    inputs_example = "time"
    
    for i in range(length):
        trace += inputs_example + "("+str(i)+")." 
    return trace 

automata_file = './asp_automatas/simple_alarm.lp'
print verify(generate_trace(1,0,5),automata_file)



False


In [41]:
al = [[3,2],[6,0],[2,3]]
# al = [3,2,1]
al.sort()
print al

[[2, 3], [3, 2], [6, 0]]
