In [None]:
#default_exp translator

# Translator
> Translate Excel formulas on sheets to Python code.

In [None]:
#export
from ebb.parser import parse, parse_value, Ref, parse_formula, InfixOp, PrefixOp, PostfixOp, Function
from ebb.util import colname_to_num, num_to_colname

In [None]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Translation
Evaluation is useful, but in the end the core seems to be translation.

Goal of this part: given a table-like grid of cells, write an equivalent Python program that can be run on the input data (cells without any dependencies) to generate the output (cells in the last column, or cells that no other cell depends on).

In [None]:
#export

from collections.abc import Iterable

def construct_tuple(x, y):
    """Concatenate ``x`` and ``y`` into a single flat tuple.

    A string or a non-iterable value counts as one element; any other
    iterable contributes each of its items.
    """
    def as_tuple(value):
        # Strings are iterable, but are treated as atomic values here.
        if isinstance(value, str):
            return (value,)
        if isinstance(value, Iterable):
            return tuple(value)
        return (value,)

    left = as_tuple(x)
    right = as_tuple(y)
    return left + right

# Maps an InfixOp's `op` to a callable that combines the two translated
# operands (looked up by `translate`).
# The '+' and '*' keys carry a literal backslash -- presumably the parser
# reports operators as regex-escaped tokens; verify against ebb.parser.
# NOTE(review): the generated text is not parenthesised, so the grouping of
# nested operations may be lost in the output string -- confirm how the
# parser represents parentheses.
infix_translate_map = {
    '\\+': lambda x, y: f'{x} + {y}',
    '\\*': lambda x, y: f'{x} * {y}',
    ',': construct_tuple

}
# No prefix or postfix operators are translated yet; `translate` raises
# KeyError for any PrefixOp/PostfixOp until entries are added here.
prefix_translate_map = {}
postfix_translate_map = {}

def choose(index, *values):
    """Return the ``index``-th of ``values``, counting from 1.

    Args:
        index: 1-based position of the value to pick.
        *values: Candidate values.

    Returns:
        ``values[index - 1]``.

    Raises:
        ValueError: If ``index`` is smaller than 1 or larger than the number
            of values.
    """
    if 1 <= index <= len(values):
        return values[index - 1]
    raise ValueError(f'CHOOSE index {index} is out of range for {len(values)} value(s)')
        
# Maps a Function node's `name` to the Python callable that handles it.
# `translate` calls the entry with the translated arguments, so CHOOSE is
# evaluated while translating rather than emitted as source text.
function_translate_map = {
    'CHOOSE': choose,
}

# We save the variables in a tuple because it's convenient to use ordering
# to compare formulas. If they have the same AST structure we can just check
# them for consistency one by one.
def translate(tree, variables=tuple()):  # Todo(Rik): str might translate to ref.
    """Translate a parsed formula tree into Python code.

    Args:
        tree: A parsed formula node (``InfixOp``, ``PrefixOp``, ``PostfixOp``,
            ``Function`` or ``Ref``) or a plain ``int``/``float``/``bool``/``str``.
        variables: Tuple of ``Ref`` objects collected so far.

    Returns:
        A ``(code, variables)`` pair. Plain values are returned unchanged, a
        ``Ref`` becomes ``tree.to_string()``, and operators/functions return
        whatever the matching ``*_translate_map`` entry produces. ``variables``
        is the incoming tuple extended with every ``Ref`` met during the
        traversal. For infix nodes the incoming tuple is handed to both
        operands, so a non-empty ``variables`` shows up once per operand.

    Raises:
        NotImplementedError: For the range operator ``:``.
        KeyError: If an operator or function has no entry in its translate map.
        TypeError: If ``tree`` is none of the supported node types.
    """
    if isinstance(tree, (int, float, bool, str)):
        return tree, variables
    elif isinstance(tree, InfixOp) and tree.op == ':':
        # Todo(Rik): handle ranges in formulas
        # `NotImplemented` is a comparison sentinel, not an exception; raising
        # it would fail with an unrelated TypeError.
        raise NotImplementedError('Ranges in formulas are not supported yet')
    elif isinstance(tree, InfixOp):
        t_left, var_left = translate(tree.left, variables)
        t_right, var_right = translate(tree.right, variables)
        return infix_translate_map[tree.op](t_left, t_right), var_left + var_right
    elif isinstance(tree, PrefixOp):
        t_arg, var_arg = translate(tree.arg, variables)
        return prefix_translate_map[tree.op](t_arg), var_arg
    elif isinstance(tree, PostfixOp):
        t_arg, var_arg = translate(tree.arg, variables)
        return postfix_translate_map[tree.op](t_arg), var_arg
    elif isinstance(tree, Function):
        t_args, var_args = translate(tree.args, variables)
        # Gymnastics to handle one-argument functions
        # Todo(Rik): zero-argument functions?
        if not isinstance(t_args, tuple): t_args = (t_args,)
        return function_translate_map[tree.name](*t_args), var_args
    elif isinstance(tree, Ref):  # Create and return variable name
        return tree.to_string(), variables + (tree,)
    # Fail loudly instead of returning None, which callers would only trip
    # over later when unpacking the result.
    raise TypeError(f'Cannot translate node of type {type(tree).__name__}')

In [None]:
translate(parse('= CHOOSE(1, 1, 2, 3)'))

1 (1, 2, 3)


(1, ())

That covers translating a single cell. Next example is detecting when output cells are similar. In the below example, it should recognize that the first and second rows are the same function.

In [None]:
#export
def equivalent(tree, other):
    """Check whether two parsed formulas have the same structure.

    Plain values must be equal, operators and function names must match, and
    sub-trees are compared recursively. Any two ``Ref`` nodes are considered
    equivalent: which cells they point at is checked separately by comparing
    the variable tuples returned by ``translate``.

    Args:
        tree: A parsed formula node or plain value.
        other: The node or value to compare against.

    Returns:
        ``True`` if both trees have the same shape, ``False`` otherwise
        (including for node types this function does not know).

    Raises:
        NotImplementedError: For the range operator ``:``.
    """
    if type(tree) != type(other): return False
    if isinstance(tree, (int, float, bool, str)):  # Todo(Rik): maybe str parses to Ref?
        return tree == other
    elif isinstance(tree, InfixOp) and tree.op == ':':
        # Todo(Rik): handle ranges in formulas
        # `NotImplemented` is a comparison sentinel, not an exception; raising
        # it would fail with an unrelated TypeError.
        raise NotImplementedError('Ranges in formulas are not supported yet')
    elif isinstance(tree, InfixOp):
        return (tree.op == other.op
                and equivalent(tree.left, other.left)
                and equivalent(tree.right, other.right))
    elif isinstance(tree, PrefixOp) or isinstance(tree, PostfixOp):
        return (tree.op == other.op
                and equivalent(tree.arg, other.arg))
    elif isinstance(tree, Function):  # Todo(Rik): consider turning ops into function calls?
        return (tree.name == other.name
               and equivalent(tree.args, other.args))
    elif isinstance(tree, Ref):  # Create and return variable name
        return True
    # Unknown node types are conservatively treated as not equivalent
    # (previously an implicit, equally falsy, None).
    return False

You can still bamboozle this algorithm by putting `=A1+B1` and then `=C14+D37`. However, since the ordering of variables in translate is deterministic (and we used a tuple!), we can then check for equivalence of variables.

In [None]:
assert equivalent(parse('=A1+B1'), parse('=A2+B2'))

In [None]:
#export
def consistent(these_vars, other_vars):
    """Tell whether two variable tuples are the same references shifted by a fixed number of rows.

    Returns ``False`` when the tuples differ in length, when any pair of
    variables sits in different columns, or when the pairs do not all share
    one row offset (this includes two empty tuples). Otherwise returns that
    offset, ``these_vars[i].row - other_vars[i].row``.
    """
    # Todo(Rik): deal with fixed rows/columns
    # Yeahhh wouldn't I like an Option<i32> here...
    if len(these_vars) != len(other_vars):
        return False

    pairs = list(zip(these_vars, other_vars))
    for mine, theirs in pairs:
        if mine.column != theirs.column:
            return False

    row_offsets = {mine.row - theirs.row for mine, theirs in pairs}
    if len(row_offsets) != 1:
        return False
    return these_vars[0].row - other_vars[0].row

In [None]:
# The delta is `these_vars` row minus `other_vars` row: row 1 against row 2 is -1.
tree, these_vars = translate(parse('=A1+B1'))
other, other_vars = translate(parse('=A2+B2'))
assert consistent(these_vars, other_vars) == -1

# Larger gaps are reported as-is.
tree, these_vars = translate(parse('=A1+B1'))
other, other_vars = translate(parse('=A5+B5'))
assert consistent(these_vars, other_vars) == -4

# Swapping the order flips the sign.
tree, these_vars = translate(parse('=A3+B3'))
other, other_vars = translate(parse('=A1+B1'))
assert consistent(these_vars, other_vars) == 2

# A different column (B vs C) makes the variables inconsistent.
tree, these_vars = translate(parse('=A1+B1'))
other, other_vars = translate(parse('=A2+C3'))
assert not consistent(these_vars, other_vars)

# `consistent` only looks at the variables, so a different operator (* vs +)
# does not matter here; comparing structure is the job of `equivalent`.
tree, these_vars = translate(parse('=A1*B1'))
other, other_vars = translate(parse('=A2+B2'))
assert consistent(these_vars, other_vars)

In [None]:
def is_copied_across(column):
    """Return True if every formula in ``column`` is the first one shifted down row by row.

    Each later formula must have the same structure as the first
    (``equivalent``) and its variables must sit exactly ``n`` rows below those
    of the first, where ``n`` is its distance from the top (``consistent``).
    """
    first, *rest = column
    first_ast = parse(first)
    _, first_vars = translate(first_ast)
    for offset, formula in enumerate(rest, start=1):
        formula_ast = parse(formula)
        if not equivalent(first_ast, formula_ast):
            return False
        _, formula_vars = translate(formula_ast)
        if consistent(formula_vars, first_vars) != offset:
            return False
    return True

In [None]:
# Same formula, shifted one row down: copied across.
assert is_copied_across(['=A1+B1', '=A2+B2'])
# Second entry refers to row 3 instead of row 2: row offset does not match its position.
assert not is_copied_across(['=A1+B1', '=A3+B3'])
# Different operator: structure differs.
assert not is_copied_across(['=A1+B1', '=A2*B2'])
# Holds for longer columns as long as every row shifts by one.
assert is_copied_across(['=A1+B1', '=A2+B2', '=A3+B3'])
# Different column and mismatched rows: variables are inconsistent.
assert not is_copied_across(['=A1+B1', '=A2+C3'])

In [None]:
def write_function(name, code, variables):
    """Render a one-line Python function as source text.

    ``variables`` are objects with a ``to_string()`` method; their strings
    become the parameter list and ``code`` becomes the returned expression.
    """
    parameters = ', '.join(variable.to_string() for variable in variables)
    signature = f'def {name}({parameters}):'
    body = f'  return {code}'
    return signature + '\n' + body

row = [3, 4, '=A1+B1']
code, these_vars = translate(parse(row[2]))
print(write_function('adder', code, these_vars))

def adder(A1, B1):
  return A1 + B1


In [None]:
def column_to_code(colname, sheet):
    """Translate a formula column into a Python function plus the code that applies it per row.

    Args:
        colname: Spreadsheet column letter(s) of the formula column, e.g. ``'D'``.
        sheet: List of rows; each row is a list of cell values/formulas.

    Returns:
        A string holding the generated function, a list comprehension that
        applies it to the input data of every row, and the computed results.

    Raises:
        NotImplementedError: If a formula in the column is not the first
            formula copied down (different structure, or variables not
            shifted by exactly its row offset).
    """
    colnum = colname_to_num(colname)

    function_name = f'calculate_{colname}'

    column_asts = [parse(row[colnum]) for row in sheet]
    column_codes, column_vars = zip(*[translate(ast) for ast in column_asts])

    head_ast, *tail_asts = column_asts
    head_code = column_codes[0]
    head_vars, *tail_vars = column_vars

    for i, (other_ast, other_vars) in enumerate(zip(tail_asts, tail_vars)):
        if not equivalent(head_ast, other_ast) or consistent(other_vars, head_vars) != i + 1:
            # `i` indexes the rows after the first one; report the 1-based
            # spreadsheet row so it lines up with references such as A2.
            raise NotImplementedError(f'Inconsistent column formula in row {i + 2}')

    input_data = [[sheet[r.row][r.column] for r in variables] for variables in column_vars]
    variable_text = ', '.join(var.to_string() for var in head_vars)

    function_body = write_function(function_name, head_code, head_vars)
    # Run the generated definition in an explicit namespace: relying on
    # exec()/eval() sharing the enclosing function's locals stops working
    # with the Python 3.13 locals() semantics.
    namespace = {}
    exec(function_body, globals(), namespace)
    function = namespace[function_name]
    function_result = [function(*row) for row in input_data]
    result = '\n'.join([
        function_body,
        '',
        f'{colname} = [{function_name}({variable_text}) for {variable_text} in {input_data}]',
        f'\nResult: {function_result}'
    ])
    return result

In [None]:
sheet = [
    [3, 4, 7, '=A1+B1*C1'],
    [4, 5, 8, '=A2+B2*C2']
]
print(column_to_code('D', sheet))

def calculate_D(A1, B1, C1):
  return A1 + B1 * C1

D = [calculate_D(A1, B1, C1) for A1, B1, C1 in [[3, 4, 7], [4, 5, 8]]]

Result: [31, 44]


Next, chaining functions. One cell = one line of code, I suppose. :)

In [None]:
#export
from collections import deque
import re

def is_value(cell):
    """Return True for a plain value, False for a formula (a string starting with ``=``)."""
    if isinstance(cell, str):
        return not cell.startswith('=')
    return True


def cell_to_code(ref, sheet, header):
    """Translate the cell at ``ref`` together with every formula cell it depends on.

    Args:
        ref: ``Ref`` of the cell to translate; ``ref.row``/``ref.column``
            index into ``sheet``.
        sheet: List of rows; each row is a list of cell values/formulas.
        header: Column names; the i-th name replaces the i-th column letter
            in the generated code.

    Returns:
        A ``(code, variables)`` pair. ``code`` is a list of lines: one
        ``<cell> = <expression>`` assignment per formula cell that was
        followed, ending with the expression for ``ref`` itself.
        ``variables`` is a deque of the distinct ``Ref`` objects that point
        at plain values, i.e. the inputs the generated code needs.
    """
    code, variables = translate(parse(sheet[ref.row][ref.column]))
    code = deque([code])
    to_map = [r for r in variables if not is_value(sheet[r.row][r.column])]

    # Todo(Rik): circular reference protection. Requires maintaining a tree,
    # rather than just a flat list. I.e. track the chain of things that
    # led us to calculating this cell and see if it includes something
    # this cell refers to.
    done = {ref}

    inputs = deque()

    def add_inputs(refs):
        # Keep first-seen order but drop repeats: a cell used twice in one
        # formula (e.g. =A1*A1) must become a single function parameter.
        for r in refs:
            if is_value(sheet[r.row][r.column]) and r not in inputs:
                inputs.append(r)

    add_inputs(variables)
    while to_map:
        this_ref = to_map.pop()
        if this_ref in done or this_ref in inputs: continue

        this_code, these_vars = translate(parse(sheet[this_ref.row][this_ref.column]))

        add_inputs(these_vars)
        to_map.extend([r for r in these_vars if not is_value(sheet[r.row][r.column]) and r not in to_map])

        # Dependencies found later are prepended so that they are assigned
        # before the lines that were generated earlier.
        code.appendleft(f'{this_ref.to_string()} = {this_code}')
        done.add(this_ref)

    def translate_variables(line, header):
        names = {num_to_colname(i): name for i, name in enumerate(header)}
        if not names:
            return line
        # Rename all columns in one pass. Substituting column by column could
        # rewrite a name that an earlier substitution had just produced (a
        # header called 'B' for column A) and would match 'A1' inside 'AA1'.
        letters = '|'.join(re.escape(c) for c in sorted(names, key=len, reverse=True))
        pattern = rf'(?<![A-Za-z0-9_])({letters})([0-9]+)'
        return re.sub(pattern, lambda m: f'{names[m.group(1)]}{m.group(2)}', line)
    code = [translate_variables(line, header) for line in code]
    return code, inputs

In [None]:
sheet = [
    ['3', '4', '=A1+D2', '0'],
    ['4', '5', '=A2+D3', '=A1+D1'],
]

code, variables = cell_to_code(Ref(row=0, column=2), sheet, ['A', 'B', 'C', 'D'])
code, variables

(['D2 = A1 + D1', 'A1 + D2'],
 deque([Ref(row=0, column=0, fixed_row=False, fixed_column=False),
        Ref(row=0, column=3, fixed_row=False, fixed_column=False)]))

In [None]:
#export
def write_multiline_function(name, code, variables):
    """Render a multi-statement Python function as source text.

    ``variables`` are the parameter names. Every item of ``code`` except the
    last becomes a statement in the body; the last item is returned.
    """
    parameters = ', '.join(variables)
    *statements, returned = code
    source = [f'def {name}({parameters}):']
    for statement in statements:
        source.append(f'    {statement}')
    source.append(f'    return {returned}')
    return '\n'.join(source)

In [None]:
print(write_multiline_function('foo', code, [v.to_string() for v in variables]))

def foo(A1, D1):
    D2 = A1 + D1
    return A1 + D2


In this case, we would expect some kind of "start-up" rows, and then an eventually stable set of cell formulas. Especially gnarly when there are two regimes.

In [None]:
#export
def find_transitions(column):
    """Return the set of row indices at which the formula structure changes.

    Every cell is parsed and compared (``equivalent``) with the most recent
    structurally different cell above it; the index of each cell that
    differs is recorded.
    """
    parsed = [parse(cell) for cell in column]
    current, *following = parsed
    changes = set()
    for index, ast in enumerate(following, start=1):
        if equivalent(ast, current):
            continue
        current = ast
        changes.add(index)
    return changes

column = ['=A1+B1', '=A2+B2', '=A3*B3', '=A4*B4']
find_transitions(column) 

{2}

That routine will find cells that are copied across. Now we can extract the loop body (as separate from initialization).

In [None]:
sheet = [
    [3, 4, 0, '=A1+B1', '=C1*D1'],
    [4, 5, 3, '=A2+B2', '=C2*D2'],
    [7, 8, 2, '=A3+B3', '=E1*C3*D3'],
    [7, 8, 2, '=A4+B4', '=E2*C4*D4'],
]

value_cols = [(i, col) for i, col in enumerate(zip(*sheet)) if all(is_value(cell) for cell in col)]
calc_cols = [(i, col) for i, col in enumerate(zip(*sheet)) if not all(is_value(cell) for cell in col)]
value_cols, calc_cols

([(0, (3, 4, 7, 7)), (1, (4, 5, 8, 8)), (2, (0, 3, 2, 2))],
 [(3, ('=A1+B1', '=A2+B2', '=A3+B3', '=A4+B4')),
  (4, ('=C1*D1', '=C2*D2', '=E1*C3*D3', '=E2*C4*D4'))])

In [None]:
#export
import operator as op
from functools import reduce

def join_sets(sets):
    """Return the union of all ``sets`` as a new set; the inputs are left untouched."""
    merged = set()
    for members in sets:
        merged |= members
    return merged

def join_dicts(dicts):
    """Merge ``dicts`` into one new dict; for duplicate keys the later dict wins."""
    merged = {}
    for mapping in dicts:
        merged = {**merged, **mapping}
    return merged

join_sets([{1, 2, 3}, {4, 5, 6}]), join_dicts([{3: 4}, {4: 5}])

({1, 2, 3, 4, 5, 6}, {3: 4, 4: 5})

There's an implicit ordering here, or "compatibility": some kind of intersection operation on the blocks. Column 3 has no transition, but column 4 has one at index 2, so overall we see a transition at index 2 (the transition sets of all columns are unioned).

In case we had something like `[[0, 1], [2, 3]]`, `[[0], [1, 2, 3]]`, we would have gotten transitions at 1 and 2. Overlapping transforms are not interesting.

In [None]:
def sheet_to_code(sheet, has_header=True):
    """Generate Python source that recomputes the formula columns of ``sheet``.

    The rows are split into blocks wherever the structure of any formula
    column changes. For every block one function per formula column is
    generated, followed by a loop over the rows of that block.

    Args:
        sheet: List of rows. When ``has_header`` is true the first row holds
            the column names and is removed before formulas are resolved.
        has_header: Whether the first row is a header. Without one, the
            column letters are used as names.

    Returns:
        The generated source as a single string.
    """
    if has_header:
        header, *sheet = sheet
    else:
        header = [num_to_colname(i) for i, _ in enumerate(sheet[0])]

    calc_cols = [(i, col) for i, col in enumerate(zip(*sheet)) if not all(is_value(cell) for cell in col)]
    # A set has no defined order; sort so the blocks come out contiguous and ascending.
    transitions = sorted(join_sets([find_transitions(col) for _, col in calc_cols]))
    block_ranges = list(zip([0]+transitions, transitions+[len(sheet)]))

    def variable_name(v):
        # cell_to_code names cells with 1-based rows (Ref(row=0) is written
        # as '<name>1'), so parameters must use row + 1 to match the bodies.
        return f'{header[v.column]}{v.row + 1}'

    result = ['output = []']
    for start, end in block_ranges:
        to_calc = []
        for colnum, _ in calc_cols:
            colname = num_to_colname(colnum)
            function_name = f'calculate_{header[colnum]}{start+1}'
            code, variables = cell_to_code(Ref(row=start, column=colnum), sheet, header)
            named_vars = [variable_name(v) for v in variables]

            to_calc.append((colname, function_name, named_vars))
            result.append('')
            result.append(write_multiline_function(function_name, code, named_vars))

        # NOTE(review): from here on `variables` is whatever the last formula
        # column left behind (and is unbound when there are no formula
        # columns) -- confirm whether the inputs of all columns should be
        # combined instead.

        # Create loop body
        loop_body = []
        # Assign non-local variables, if any
        loop_body.extend([
            f'{variable_name(v)} = output[-{start-v.row}][{v.column}]'
            for v in variables if v.row != start
        ])

        # Calculate results which determined we needed to calculate
        for colname, function_name, named_vars in to_calc:
            call_variables = ', '.join(named_vars)
            loop_body.append(f'{colname}{start+1} = {function_name}({call_variables})')

        # Add to output
        # NOTE(review): this lists every column by letter (A1, B1, ...), but
        # value columns are bound under their header names by the loop --
        # the generated code likely still needs work here.
        sorted_columns = ', '.join([f'{num_to_colname(i)}{start+1}' for i in range(len(sheet[start]))])
        loop_body.append(f'output.append([{sorted_columns}])')

        # Put loop body in results
        loop_vars = ', '.join(variable_name(v) for v in variables if v.row == start)
        loop_data = [[row[v.column] for v in variables if v.row==start] for row in sheet[start:end]]

        result.extend(['', f'for {loop_vars} in {loop_data}:'])
        result.extend(['    '+stmt for stmt in loop_body])
    return '\n'.join(result)

sheet = [
    ['shares_outstanding', 'ebitda', 'price', 'stock_price', 'to_buy'],
    [3, 4, 0, '=A1+B1', '=C1*D1'],
    [4, 5, 3, '=A2+B2', '=C2*D2'],
    [7, 8, 2, '=A3+B3', '=E1*C3*D3'],
    [7, 8, 2, '=A4+B4', '=E2*C4*D4'],
]
from pprint import pprint
pprint(sheet)
print(sheet_to_code(sheet))

[['shares_outstanding', 'ebitda', 'price', 'stock_price', 'to_buy'],
 [3, 4, 0, '=A1+B1', '=C1*D1'],
 [4, 5, 3, '=A2+B2', '=C2*D2'],
 [7, 8, 2, '=A3+B3', '=E1*C3*D3'],
 [7, 8, 2, '=A4+B4', '=E2*C4*D4']]
output = []

def calculate_stock_price1(shares_outstanding0, ebitda0):
    return shares_outstanding1 + ebitda1

def calculate_to_buy1(price0, shares_outstanding0, ebitda0):
    stock_price1 = shares_outstanding1 + ebitda1
    return price1 * stock_price1

for price0, shares_outstanding0, ebitda0 in [[0, 3, 4], [3, 4, 5]]:
    D1 = calculate_stock_price1(shares_outstanding0, ebitda0)
    E1 = calculate_to_buy1(price0, shares_outstanding0, ebitda0)
    output.append([A1, B1, C1, D1, E1])

def calculate_stock_price3(shares_outstanding2, ebitda2):
    return shares_outstanding3 + ebitda3

def calculate_to_buy3(price2, shares_outstanding2, ebitda2, price0, shares_outstanding0, ebitda0):
    stock_price1 = shares_outstanding1 + ebitda1
    to_buy1 = price1 * stock_price1
    stock_price3 = s

# Making a small demo
Leverage ipydatagrid (from Bloomberg) to show a small demo with a sheet on the left and code on the right. Next up probably for loops. :)

Also could think about a backwards connection, i.e. editing the python code and updating the Excel sheet as necessary. What to do when putting a new line though... Something to think about.

For now though, I think continuing the row-interpretation work and making it go in the demo might be good!

In [None]:
# Larger example with two formula regimes. It is overwritten by the
# single-row sheet below, so it is currently unused in the demo.
sheet = [
    [3, 4, 0, '=A1+B1', '=C1*D1'],
    [4, 5, 3, '=A2+B2', '=C2*D2'],
    [7, 8, 2, '=A3+B3', '=E1*C3*D3'],
    [7, 8, 2, '=A4+B4', '=E2*C4*D4'],
]

# The sheet the demo actually uses: one row, no header row.
# NOTE(review): sheet_to_code defaults to has_header=True and would treat
# this row as the header -- confirm the demo calls it as intended.
sheet = [[2, 3, 4, '=A1*B1+C1']]

In [None]:
from ipydatagrid import DataGrid
import ipywidgets as widgets
import pandas as pd

datagrid = DataGrid(pd.DataFrame(sheet), editable=True, base_column_size=128, base_row_size=30,
                    layout={'height': '200px', 'width': '600px'})
code_output = widgets.Output(layout={'border': '1px solid black'})

def update_df(cell):
    """Write an edited grid cell back into ``sheet`` and re-render the generated code.

    ``cell`` is the change event passed by the data grid; its ``'row'``,
    ``'column'`` and ``'value'`` entries are used.
    """
    new_value = cell['value']
    target_row = sheet[cell['row']]
    target_row[cell['column']] = new_value
    with code_output:
        # Replace the previous rendering instead of appending to it.
        code_output.clear_output()
        update_code(sheet)
    
def update_code(sheet):
    """Generate the code for ``sheet``, print it, run it, and print the resulting ``output``."""
    code = sheet_to_code(sheet)
    print(code)
    # Execute in an explicit namespace: relying on exec()/eval() sharing this
    # function's locals stops working with the Python 3.13 locals() semantics.
    # NOTE(review): this executes code generated from the sheet's contents;
    # only use it with sheets you trust.
    namespace = {}
    exec(code, namespace)
    print(f'Value of output: {namespace["output"]}')
        
    
datagrid.on_cell_change(update_df)
with code_output:
    update_code(sheet)
    
widgets.Box(children=[datagrid, code_output])

Box(children=(DataGrid(auto_fit_params={'area': 'all', 'padding': 30, 'numCols': None}, base_column_size=128, …

In [None]:
from nbdev.export import notebook2script; notebook2script()

Converted Untitled.ipynb.
Converted evaluator.ipynb.
Converted parser.ipynb.
Converted translator.ipynb.
Converted util.ipynb.
