In [1]:
import re

import numpy  as np
import pandas as pd

from IPython.display import display, Markdown, Latex
from scipy           import stats

from pyparsing import (
    Literal,
    Word,
    Group,
    Forward,
    alphas,
    alphanums,
    Regex,
    ParseException,
    CaselessKeyword,
    Suppress,
    delimitedList,
)

# Sympy não é safe segundo a documentação!
# https://docs.sympy.org/latest/modules/core.html#module-sympy.core.sympify

# Exemplo de uso do pyparsing
#https://github.com/pyparsing/pyparsing/blob/master/examples/fourFn.py

print('Done')

Done


# Funções de transformação e operações

In [21]:
epsilon = 1e-12

# Colocando funções numpy nos operadores
opn = {
    "+": np.add,
    "-": np.subtract,
    "*": np.multiply,
    "/": np.true_divide,
    "^": np.power,
    'idasd': lambda x: x
}

fn = {
    "sin": np.sin,
    "cos": np.cos,
    
    #"tan": np.tan,
    "exp": np.exp,
    
    #"abs": np.abs,
    
    "tanh": np.tanh,
    "id"  : lambda x: x,
    "log" : lambda x: 0 if np.isnan(np.log(x)) else np.log(x),
    
    #"trunc": int,
    #"round": round,
    
    "SQRTABS": lambda x: np.sqrt(np.abs(x)), # Sem ponto para não gerar problema no parser
    #"sgn": lambda a: -1 if a < -epsilon else 1 if a > epsilon else 0,
    
    # functionsl with multiple arguments
    #"multiply": lambda a, b: a * b,
    #"hypot": np.hypot,
    
    # functions with a variable number of arguments
    #"all": lambda *a: all(a),
}

# Implementação do parser

> Adaptado de: https://github.com/pyparsing/pyparsing/blob/master/examples/fourFn.py

In [83]:
exprStack = []

def push_first(toks):
    exprStack.append(toks[0])

def push_unary_minus(toks):
    for t in toks:
        if t == "-":
            exprStack.append("unary -")
            
        else:
            break


bnf = None

def BNF():
    """
    expop   :: '^'
    multop  :: '*' | '/'
    addop   :: '+' | '-'
    integer :: ['+' | '-'] '0'..'9'+
    atom    :: PI | E | real | fn '(' expr ')' | '(' expr ')'
    factor  :: atom [ expop factor ]*
    term    :: factor [ multop factor ]*
    expr    :: term [ addop term ]*
    """
    global bnf
    
    if not bnf:
        # use CaselessKeyword for e and pi, to avoid accidentally matching
        # functions that start with 'e' or 'pi' (such as 'exp'); Keyword
        # and CaselessKeyword only match whole words
        e = CaselessKeyword("E")
        pi = CaselessKeyword("PI")
        
        # fnumber = Combine(Word("+-"+nums, nums) +
        #                    Optional("." + Optional(Word(nums))) +
        #                    Optional(e + Word("+-"+nums, nums)))
        # or use provided pyparsing_common.number, but convert back to str:
        # fnumber = ppc.number().addParseAction(lambda t: str(t[0]))
        
        fnumber = Regex(r"[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?")
        ident = Word(alphas, alphanums + "_$")

        plus, minus, mult, div = map(Literal, "+-*/")
        lpar, rpar = map(Suppress, "()")
        addop  = plus | minus
        multop = mult | div
        expop  = Literal("^")

        expr      = Forward()
        expr_list = delimitedList(Group(expr))
        
        # add parse action that replaces the function identifier with a (name, number of args) tuple
        def insert_fn_argcount_tuple(t):
            fn = t.pop(0)
            num_args = len(t[0])
            t.insert(0, (fn, num_args))

        fn_call = (ident + lpar - Group(expr_list) + rpar).setParseAction(
            insert_fn_argcount_tuple
        )
        atom = (
            addop[...]
            + (
                (fn_call | pi | e | fnumber | ident).setParseAction(push_first)
                | Group(lpar + expr + rpar)
            )
        ).setParseAction(push_unary_minus)

        # by defining exponentiation as "atom [ ^ factor ]..." instead of "atom [ ^ atom ]...", we get right-to-left
        # exponents, instead of left-to-right that is, 2^3^2 = 2^(3^2), not (2^3)^2.
        factor = Forward()
        factor <<= atom + (expop + factor).setParseAction(push_first)[...]
        term = factor + (multop + factor).setParseAction(push_first)[...]
        expr <<= term + (addop + term).setParseAction(push_first)[...]
        bnf = expr
        
    return bnf

def evaluate_stack(s):
    op, num_args = s.pop(), 0
    
    if isinstance(op, tuple):
        op, num_args = op
    
    if op == "unary -":
        return -evaluate_stack(s)
    
    if op in "+-*/^":
        # note: operands are pushed onto the stack in reverse order
        op2 = evaluate_stack(s)
        op1 = evaluate_stack(s)

        aux =  opn[op](op1, op2)
    
        # Provavelmente deu uma operação inválida no expoente ou divisão
        # Controlando limites dos valores nas operações básicas
        if np.isnan(aux):
            return 0.0
        elif aux > 1e+150:
            return 1e+200
        elif aux < -1e+150:
            return -1e+200
        else:
            return aux
        
    elif op == "PI":
        return math.pi  # 3.1415926535
    
    elif op == "E":
        return math.e  # 2.718281828
    
    elif op in fn:
        # note: args are pushed onto the stack in reverse order
        args = reversed([evaluate_stack(s) for _ in range(num_args)])
        aux  = fn[op](*args)
    
        # Controlando resultado da aplicação de função
        if np.isnan(aux):
            return 0.0
        elif aux > 1e+150:
            return 1e+200
        elif aux < -1e+150:
            return -1e+200
        else:
            return aux
        
    elif op[0].isalpha():
        raise Exception("invalid identifier '%s'" % op)
        
    else:
        # try to evaluate as int first, then as float if int fails
        try:
            return int(op)
        
        except ValueError:
            return float(op)
        
# Função de teste que eles fornecem
def test(s, expected):
    exprStack[:] = []
    
    try:
        results = BNF().parseString(s, parseAll=True)
        val = evaluate_stack(exprStack[:])
        
    except ParseException as pe:
        print(s, "failed parse:", str(pe))
        
    except Exception as e:
        print(s, "failed eval:", str(e), exprStack)
        
    else: # Se deu certo, vamos ignorar
        if val == expected:
            return "ok"
        else:
            return "not ok"
        
        #if val == expected:
        #    print(s, "=", val, results, "=>", exprStack)
        #else:
        #    print(s + "!!!", val, "!=", expected, results, "=>", exprStack)   
        #return val
    
# Parse de expressões.
def parse(s):
    exprStack[:] = []
    
    try:
        results = BNF().parseString(s, parseAll=True)
        val = evaluate_stack(exprStack[:])
        
    except ParseException as pe:
        #print(s, "failed parse:", str(pe))
        print("failed parse")
        
        return 0.0
    
    except Exception as e:
        #print(s, "failed eval:", str(e), exprStack)
        print("failed eval", str(e))
        
        return 0.0
    
    else:
        if np.isnan(val):
            print('Eval deu nan')
            
            return 0.0
        
        elif np.isinf(val):
            print('Eval deu inf')
            
            return np.exp(300)
        
        return val

# Testagem do parser implementado

Criação de casos tentando capturar entradas problemáticas do parser, e testagem para verificar se ele trata devidamente os casos infinitos e NaN.

Se um teste falhar, aparecerá uma descrição da entrada que ocorreu a falha.

Warnings do numpy não são falhas - o parser deve encontrar quando esses problemas acontecem e fazer o tratamento corretamente.

In [93]:
# Testando expressões parecidas com o que encontraremos em expressões do FEAT/ITEA/SymTree
import itertools

#Algumas observações para testar
xs_testing = [
    [ 1.0,  2.0,   1.0],
    [ 0.0,  0.0,   0.0],
    [-1.0,  0.0,   1.0],
    [-2.0, -3.0, -10.0],
    [-5.5, -0.0,   5.5],
]

# Funções de transformação para testar
fs_testing = [
    'id',
    'sin',
    'cos',
    'tanh',
    'log',
    'SQRTABS'
]

# Coeficientes para testar (apesar de não ser utilizado no cálculo do disentanglement)
cs_testing = [
      0.0,
     -1.0,
      1.0,
     10.0,
    -10.0,
]

# Termos para testar
ts_testing = [
    [ 0,  0,  0],
    [ 1,  0,  0],
    [ 0, -1,  0],
    [-1, -1, -1],
    [ 0, -0,  0],
    [ 1,  2,  3],
    [-1,  2, -3]
]

# Testando todas as combinações
for x, f, c, t in itertools.product(xs_testing, fs_testing, cs_testing, ts_testing):
    
    # Testando interação esperada -----------------------------------------------------
    interaction = []
    for x_, t_ in zip(x, t):
        interaction.append(f'{x_}^{t_}')
        
    # Aplicar os mesmos tratamentos que fazemos no parser:
    # nan -> 0;inf -> 1e+200; -inf -> 1e-200
    safe_expected = np.prod(np.power(x, t))
    
    if np.isnan(safe_expected):
        safe_expected = 0.0
    elif safe_expected > 1e+150:
        safe_expected = 1e+200
    elif safe_expected < -1e+150:
        safe_expected = -1e+200
    else:
        pass
    
    test_result = test(f'{" * ".join(interaction)}', safe_expected)
    
    if test_result is not "ok":
        print("Erro no teste - interação")
        print("String passada ao parse:", f'{" * ".join(interaction)}')
        print("Resultado do parse:", parse(f'{" * ".join(interaction)}'))
        print("Avaliação esperada:", safe_expected)

    # Testando função de transformação aplicada na interação anterior  --------------------
    safe_expected = fn[f](safe_expected)
        
    # Aplicar os mesmos tratamentos que fazemos no parser:
    # nan -> 0;inf -> 1e+200; -inf -> 1e-200
    
    if np.isnan(safe_expected):
        safe_expected = 0.0
    elif safe_expected > 1e+150:
        safe_expected = 1e+200
    elif safe_expected < -1e+150:
        safe_expected = -1e+200
    else:
        pass
    
    test_result = test(f'{f}({" * ".join(interaction)})', safe_expected)
    
    if test_result is not "ok":
        print("Erro no teste - transformação")
        print("String passada ao parse:", f'{f}({" * ".join(interaction)})')
        print("Resultado do parse:", parse(f'{f}({" * ".join(interaction)})'))
        print("Avaliação esperada:", safe_expected)

    # Testando coeficiente  --------------------
    safe_expected = c*(safe_expected)
        
    # Aplicar os mesmos tratamentos que fazemos no parser:
    # nan -> 0;inf -> 1e+200; -inf -> 1e-200
    
    if np.isnan(safe_expected):
        safe_expected = 0.0
    elif safe_expected > 1e+150:
        safe_expected = 1e+200
    elif safe_expected < -1e+150:
        safe_expected = -1e+200
    else:
        pass
    
    test_result = test(f'{c}*{f}({" * ".join(interaction)})', safe_expected)
    
    if test_result is not "ok":
        print("Erro no teste - coeficiente")
        print("String passada ao parse:", f'{c}*{f}({" * ".join(interaction)})')
        print("Resultado do parse:", parse(f'{c}*{f}({" * ".join(interaction)})'))
        print("Avaliação esperada:", safe_expected)



# Desentanglement para o ITEA

In [94]:
def Z_ITEA(itexpr_str, Xtrain):
    
    # Tirando o ponto de sqrt.abs para não dar problema no parser
    itexpr_str = itexpr_str.replace('sqrt.abs', 'SQRTABS')
    
    # Separando termos (soma só ocorre entre eles)
    itexpr_its = itexpr_str.split(' + ')
    
    nterms          = len(itexpr_its)
    nsamples, nvars = Xtrain.shape
    
    #print(nsamples, nvars, nterms)
    
    Z = np.zeros( (nsamples, nterms) )
    
    # Cada linha de Z é uma observação
    for row in range(nsamples):

        # Cada coluna de uma linha é um termo IT da expressão
        for col, it in enumerate(itexpr_its):
            with_vars = it

            # Criar uma string substituindo as variáveis x0, x1, ... pelo observado
            for i in range(nvars):
                with_vars = with_vars.replace(f'x{i}', str(Xtrain[row, i]))
            
            # Avaliar e guardar na posição
            Z[row, col] = parse(with_vars)
    
    # Calcular correlações e tirar média
    disentanglements = []
    
    for col, _ in enumerate(itexpr_its):
        for comparison, _ in enumerate(itexpr_its):
            if col != comparison:
                corr, p = stats.pearsonr( Z[:, col], Z[:, comparison] )
                
                # Correlação de pearson: divide pelo desvio padrão,
                # se for 0 gera nan. Existência do pearson não é garantida.
                corr = 0.0 if np.isnan(corr) else corr
                
                disentanglements.append(corr**2)
        
    return np.mean(disentanglements)

In [95]:
# Testando para todas as expressões

resultsDF = pd.read_csv('stats.csv')
results   = resultsDF.to_dict('list')

for i, expr in enumerate(results['expr']):
    #print(f'../datasets/airfoil-train-{i//6}.dat')
    df_train = pd.read_csv(f'../datasets/airfoil-train-{i//6}.dat', sep=',', header=None)

    # Extração das variáveis explicatórias e variável alvo
    Xtrain = df_train.iloc[:, :-1].values

    #print('-----------------')
    print(f'Expressão n {i}:\t', end='')
    
    try:
        disentanglement = Z_ITEA(expr, Xtrain)
        print(disentanglement)
    except Exception as e:
        print('NAN', str(e))
        continue
        
    #for it in expr.split(' + '):
    #    print(it)

Expressão n 0:	



0.11399708019008042
Expressão n 1:	0.09459572637388415
Expressão n 2:	0.10963764613420612
Expressão n 3:	0.07675637708815754
Expressão n 4:	0.11486082872216924
Expressão n 5:	0.0967313681444404
Expressão n 6:	0.13335104442173393
Expressão n 7:	0.1465736502226571
Expressão n 8:	0.14837106722650048
Expressão n 9:	0.12550658599965808
Expressão n 10:	0.13483736150418457
Expressão n 11:	0.10436546825525861
Expressão n 12:	0.15944178313370427
Expressão n 13:	0.129112953811258
Expressão n 14:	0.0943298580397617
Expressão n 15:	0.11038883597850994
Expressão n 16:	0.1332899261874345
Expressão n 17:	0.11455310142398568
Expressão n 18:	0.11707433975507682
Expressão n 19:	0.10661452794278467
Expressão n 20:	0.10519888812429685
Expressão n 21:	0.12881990661733028
Expressão n 22:	0.12257140760135567
Expressão n 23:	0.12291231357814589
Expressão n 24:	0.1365050961659008
Expressão n 25:	0.17417529811673618
Expressão n 26:	0.1498247367137459
Expressão n 27:	0.1462550264273897
Expressão n 28:	0.11684960

# Disentanglement para o FEAT

In [96]:
def Z_FEAT(feat_str, Xtrain):
    
    # Arrumando a raiz quadrada sqrt(|x|)
    
    #print(feat_str)
    
    feat_str = feat_str.replace('sqrt(|', 'SQRTABS((')
    feat_str = feat_str.replace('|', ')')
    
    # Regex para quebrar no offset, vai sempre gerar uma lista com 2 elementos
    # o primeiro vazio e o segundo com o restante
    feat_str = re.split(r'^-?[0-9]\d*\.\d+\*\(offset\)', feat_str)[1]

    #print('-----')


    # Regex para pegar os termos. Novamente o primeiro é nulo. Vai quebrar nos coeficientes
    feat_strs = re.split(r'\s\+\s-?\d+\.\d+\*', feat_str)[1:]

    #for s in feat_strs:
    #    print(s)

    
    nterms          = len(feat_strs)
    nsamples, nvars = Xtrain.shape
    
    #print(nsamples, nvars, nterms)
    
    Z = np.zeros( (nsamples, nterms) )
    
    # Cada linha de Z é uma observação
    for row in range(nsamples):

        # Cada coluna de uma linha é um termo IT da expressão
        for col, feat in enumerate(feat_strs):
            with_vars = feat

            # Criar uma string substituindo as variáveis x_0, x_1,.. pelo observado
            for i in range(nvars):
                with_vars = with_vars.replace(f'x_{i}', str(Xtrain[row, i]))
            
            # Avaliar e guardar na posição
            
            Z[row, col] = parse(with_vars)
    
    disentanglements = []
    
    for col, _ in enumerate(feat_strs):
        for comparison, _ in enumerate(feat_strs):
            if col != comparison:
                corr, p = stats.pearsonr( Z[:, col], Z[:, comparison] )
                
                # Correlação de pearson: divide pelo desvio padrão
                # se for 0 gera nan. Existência do pearson não é garantida.
                corr = 0.0 if np.isnan(corr) else corr
                
                disentanglements.append(corr**2)
        
    #print(disentanglements, np.mean(disentanglements))
    return np.mean(disentanglements)

In [97]:
# Testando para todas as expressões

resultsDF = pd.read_csv('feat-airfoil-resultsregression.csv')
results   = resultsDF.to_dict('list')

for i, expr in enumerate(results['Expression']):
    #print(f'../datasets/airfoil-train-{i//6}.dat')
    
    df_train = pd.read_csv(f'../datasets/airfoil-train-{i//6}.dat', sep=',', header=None)

    # Extração das variáveis explicatórias e variável alvo
    Xtrain = df_train.iloc[:, :-1].values
    
    #print('-----------------')
    print(f'Expressão n {i}:\t', end='')

    try:
        disentanglement = Z_FEAT(expr, Xtrain)
        print(disentanglement)
    except Exception as e:
        print('NAN', str(e))
        continue

Expressão n 0:	



KeyboardInterrupt: 