In [47]:
import numpy as np
import torch
import subprocess
from itertools import product
import string

def get_data(task_name, batch=1000):
    A, b, Z, Z2, hidden, hidden2 = load_tensors(task_name)
    Z = np.round(Z); Z2 = np.round(Z2)
    data = torch.load(f"../tasks/{task_name}/data.pt")
    inputs = data[0].detach().numpy()
    outputs = data[1].detach().numpy()

    if len(data[0].shape) == 2:
        inputs_last = data[0][:,[-1]].detach().numpy()
        input_dim_larger_than_one_flag = False
    else:
        input_dim = data[0].shape[2]
        if input_dim > 1:
            input_dim_larger_than_one_flag = True
        else:
            input_dim_larger_than_one_flag = False
        inputs_last = data[0][:,-1].detach().numpy()
    outputs_last = data[1][:,[-1]].detach().numpy()
    if len(outputs_last.shape) == 3:
        outputs_last = outputs_last[:,:,0]
    if batch == None:
        return A, b, Z, Z2, hidden, hidden2, inputs, inputs_last, outputs, outputs_last, input_dim_larger_than_one_flag
    else:
        return A, b, Z[:batch], Z2[:batch], hidden[:batch], hidden2[:batch], inputs[:batch], inputs_last[:batch], outputs[:batch], outputs_last[:batch], input_dim_larger_than_one_flag

def load_tensors(task):
    # Construct the file paths
    a_path = f"./tasks/{task}/A_best.pt"
    b_path = f"./tasks/{task}/b_best.pt"
    z_path = f"./tasks/{task}/Z_best.pt"
    z2_path = f"./tasks/{task}/Z2_best.pt"
    hidden = f"./tasks/{task}/hidden.pt"
    hidden2 = f"./tasks/{task}/hidden2.pt"

    # Load the tensors
    A = torch.load(a_path)
    b = torch.load(b_path)
    Z = torch.load(z_path)
    Z2 = torch.load(z2_path)

    hidden = torch.load(hidden)
    hidden2 = torch.load(hidden2)
    return A, b, Z, Z2, hidden, hidden2

def get_symbolic_formula(task):
    A, b, Z, Z2, hidden, hidden2, inputs, inputs_last, outputs, outputs_last, _ = get_data(task)
    hidden_dim = Z.shape[1]
    input_dim = inputs_last.shape[1]

    data = np.loadtxt(f'./tasks/{task}/output.txt', dtype=int)
    hidden_max = np.max(data, axis=0)[:hidden_dim]
    batch = 1000
    np.savetxt(f'./tasks/{task}/output.txt', data[:batch], delimiter=" ", newline=" \n", fmt="%d")

    for i in range(hidden_dim):
        data = np.loadtxt(f'./tasks/{task}/hidden{i}.txt', dtype=int)
        np.savetxt(f'./tasks/{task}/hidden{i}.txt', data[:batch], delimiter=" ", newline=" \n", fmt="%d")
        
    parities = list(product(*[['+','-']]*hidden_dim))
    
    def change_parity(parity):
        for i in range(hidden_dim):
            if parity[i] == '-':
                data = np.loadtxt(f'./tasks/{task}/output.txt', dtype=int)
                data[:,i] = hidden_max[i] - data[:,i]
                np.savetxt(f'./tasks/{task}/output.txt', data[:batch], delimiter=" ", newline=" \n", fmt="%d")

                for j in range(hidden_dim):
                    data = np.loadtxt(f'./tasks/{task}/hidden{j}.txt', dtype=int)
                    data[:,i] = hidden_max[i] - data[:,i]
                    if j == i:
                        data[:,-1] = hidden_max[i] - data[:,-1]
                    np.savetxt(f'./tasks/{task}/hidden{j}.txt', data[:batch], delimiter=" ", newline=" \n", fmt="%d")

    symbolic_formulas = []

    for parity in parities:
        # change parity
        print('trying parity ',parity)
        change_parity(parity)

        # regressing
        subprocess.run(["g++", "-o", "main", "./symbolic_regression/main.cpp", "-std=c++11"])
        # (input, hidden) -> hidden
        for i in range(hidden_dim):
            subprocess.run(["./main", f"{task}", f"/hidden{i}", str(input_dim+hidden_dim)])
        # hidden -> output
        subprocess.run(["./main", f"{task}", "/output", str(hidden_dim)])

        # check success
        success = 0
        for i in range(hidden_dim):
            success += (np.loadtxt(f'./tasks/{task}/hidden{i}_symbolic.txt', dtype=str).shape[0] > 0)
        success += (np.loadtxt(f'./tasks/{task}/output_symbolic.txt', dtype=str).shape[0] > 0)

        print('success = ',success)

        if success == hidden_dim + 1:
            for i in range(hidden_dim):
                symbolic_formulas.append(np.loadtxt(f'./tasks/{task}/hidden{i}_symbolic.txt', dtype=str)[0])
            symbolic_formulas.append(np.loadtxt(f'./tasks/{task}/output_symbolic.txt', dtype=str)[0])
            
            # change A & b
            for i in range(hidden_dim):
                if parity[i] == '-':
                    A[i,:] = - A[i,:]
                    b = b - hidden_max[i] * A[i,:]
                torch.save(A, f"./tasks/{task}/A_best.pt")
                torch.save(b, f"./tasks/{task}/b_best.pt")
            
            break

        # revert parity
        change_parity(parity)


    if success < hidden_dim + 1:
        for i in range(hidden_dim):
            symbolic_formulas.append('a')
        symbolic_formulas.append('a')
        
    return symbolic_formulas


def RPN2function(RPN):
    #RPN = "bA<>a-A"
    function_string = ""
    stack = []
    
    char0 = ['a','b','c','d','e','f','g','h','i','j','k','1','2','3','4','5','6','7','8','9']
    char1 = ['>','<','~','H','D','A','U','V']
    char2 = ['+','*','-','%']

    def add_parenthesis(string):
        if len(string) > 1:
            string = "(" + string + ")"
        return string

    for i in range(len(RPN)):
        char = RPN[i]
        if char in char0:
            stack.append(char)
        if char in char1:
            if char == 'H' or char == 'D' or char == 'A':
                stack[-1] = char + "(" + stack[-1] + ")"
            if char == '>':
                stack[-1] = stack[-1] + "+1"
            if char == '<':
                stack[-1] = stack[-1] + "-1"
            if char == 'U':
                stack[-1] = stack[-1] + "+100"
            if char == 'V':
                stack[-1] = stack[-1] + "-100"
            if char == '~':
                stack[-1] = "-" + add_parenthesis(stack[-1])
        if char in char2:
            stack[-2] = add_parenthesis(stack[-2]) + char + add_parenthesis(stack[-1])
            stack.pop()


    function_string = stack[0].replace('A', 'abs').replace('D', 'delta').replace('H','heaviside')
    return function_string


def delta(x):
        return x == 0

def heaviside(x):
    return x > 0

In [48]:
# produce program

def produce_program_symbolic_regression(task_name, print_code=False, function_strings=None, h_round=None):
    data = get_data(task_name)
    A, b, Z, Z2, hidden, hidden2, inputs, inputs_last, outputs, outputs_last, _ = data
    
    num_seq = inputs.shape[1]
    num_data = inputs_last.shape[0]
    dim_hidden = Z.shape[1]
    if len(inputs_last.shape) == 1:
        dim_input = 1
    else:
        dim_input = inputs_last.shape[1]
        
    if dim_hidden + dim_input > 6 or dim_hidden > 2:
        print("input_dim + hidden_dim > 6 or hidden_dim > 2, skip...")
        code = "success_flag = 0 # input_dim + hidden_dim > 6 or hidden_dim > 2 for symbolic regression"
        with open(f'./programs/{task}.txt'.replace('_','-'),"w") as f:
            f.writelines(code)
        return code, 0
        
    if function_strings == None:
        symbolic_formulas = get_symbolic_formula(task_name)
        function_strings = [RPN2function(formula) for formula in symbolic_formulas]

    if h_round == None:
        # hidden should be initialized to be the 
        h = - np.round(np.matmul(b, np.linalg.inv(A))).astype('int')
    else:
        for i in range(dim_hidden):
            h = - np.matmul(b, np.linalg.inv(A))
            if len(h.shape) == 2:
                h = h[0]
            if h_round[i] == '+':
                h[i] = np.ceil(h[i]).astype('int')
            else:
                h[i] = np.floor(h[i]).astype('int')
    
    variables = string.ascii_lowercase[:14] # hidden variables denoted as a,b,c,d,...; input variable denoted as x (or x1 & x2); output variable denoted as y.
    # initialize_code (initialize hidden states)
    initialize_code = ""
    if len(h.shape) == 2:
        h = h[0]
    for i in range(dim_hidden):
        initialize_code += f"{variables[i]} = {h[i]};"
        
    # hidden code
    hidden_code = ""
    
    for i in range(dim_hidden):
        hidden_i_code = f"next_{variables[i]} = {function_strings[i]}"
        if i == 0:
            hidden_code = hidden_i_code
        else:
            hidden_code += "\n        " + hidden_i_code
            
    hidden_code += "\n        "
            
    for i in range(dim_hidden):
        hidden_code += f"{variables[i]} = next_{variables[i]};"
            
    
    # output code
    output_code = f"y = {function_strings[-1]}"
    
    if dim_input == 2:
        function_code = f"""
def f(s,t):
    {initialize_code}
    ys = []
    for i in range({num_seq}):
        {variables[dim_hidden]} = s[i]; {variables[dim_hidden+1]} = t[i];
        {hidden_code}
        {output_code}
        ys.append(y)
    return ys
    """
        
    if dim_input == 1:
        function_code = f"""
def f(s):
    {initialize_code}
    ys = []
    for i in range({num_seq}):
        {variables[dim_hidden]} = s[i]
        {hidden_code}
        {output_code}
        ys.append(y)
    return ys
    """

    
    preprocess_code = f"""
import numpy as np
data = get_data(\"{task_name}\")
A, b, Z, Z2, hidden, hidden2, inputs, inputs_last, outputs, outputs_last, _ = data
num_example = inputs.shape[0]
    """
    
    if dim_input == 1:
        verify_code = f"""
wrong = 0
for i in range(int(num_example*0.01)):
#for i in range(int(num_example)):
    if i % 100000 == 0:
        print(i)
    out_pred = f(inputs[i])
    wrong += np.sum(1 - (np.array(outputs[i]) == np.array(out_pred)))
    
if wrong == 0:
    success_flag = 1
    print('verification success')
else:
    success_flag = 0
    print('verification failure')
"""
    
    if dim_input == 2:
        verify_code = f"""
wrong = 0
for i in range(int(num_example*0.01)):
#for i in range(int(num_example)):
    out_pred = f(inputs[i,:,0], inputs[i,:,1])
    wrong += np.sum(1 - (np.array(outputs[i]) == np.array(out_pred)))
    
if wrong == 0:
    success_flag = 1
    print('verification success')
else:
    success_flag = 0
    print('verification failure')
"""
    
    code = f"""
{preprocess_code}
{function_code}
{verify_code}
"""
    if print_code == True:
        print(code)
    with open(f'./programs/{task}.txt'.replace('_','-'),"w") as f:
        f.writelines(function_code)
    return code, function_strings
    

In [56]:
'''task = 'rnn_majority0_1_numerical'
code, function_strings = produce_program_symbolic_regression(task, print_code=True)
exec(code)
success_flag
code, function_strings = produce_program_symbolic_regression(task, print_code=True, h_shift=-1, function_strings=function_strings)
exec(code)
success_flag'''

trying parity  ('+',)


};
^


symbolic regressing
symbolic regressing
success =  1
trying parity  ('-',)


  success += (np.loadtxt(f'./tasks/{task}/hidden{i}_symbolic.txt', dtype=str).shape[0] > 0)
};
^


symbolic regressing


KeyboardInterrupt: 