In [1]:
import sys
import os
fname = r'C:\Users\agmontesb\Documents\GitHub\excel\tests\test_base_workbook.py'
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(fname), '..')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(fname), r'..\tests')))

In [2]:
import pytest
import openpyxl as px
import pandas as pd
import itertools
import re
import inspect
from typing import Any, Literal, Optional

In [3]:

from excel_workbook import (
    ExcelWorkbook, ExcelTable, 
    cell_address, cell_pattern, 
    tbl_address, rgn_pattern,
    XlErrors, TABLE_DATA_MAP, EMPTY_CELL, CIRCULAR_REF,
    tbl_pattern, rgn_pattern, flatten_sets
    )

from tests.utilities import TableComparator


In [4]:
from tests.fixtures import static_workbook as base_workbook

In [5]:
def pass_anchors(coord1, coord2):
    anchors = ['$' if x[0] == '$' else '' for x in cell_pattern.match(coord1).groups()[1:]]
    sheet_name, *coords = cell_pattern.findall(coord2.replace('$', ''))[0]
    sheet = '' if not sheet_name else f"'{sheet_name}'!"
    return sheet + ''.join(x for tup in zip(anchors, coords) for x in tup)


In [6]:
def offset_rng(cells: str | list[str], col_offset: int = 0, row_offset: int = 0, 
                   disc_cell: str | None = None, tbl: Optional['ExcelTable'] = None) -> str | dict[str, str]:
    if bflag := isinstance(cells, str):
        cells = [cells]

    disc_sht = [None, tbl.parent.title] if tbl else [None,]
    predicate = lambda x: True
    # Cuando se eliminen celdas, se debe asegurar que el offset no sobrepase los límites de la tabla
    rmin, cmin = 1, ord('A')
    if disc_cell:
        sht, disc_cell = tbl_address(disc_cell)
        row, col = cell_address(disc_cell)
        rmin = int(row)
        cmin = ord(col)
        if sht not in disc_sht:
            disc_sht = [sht]
        if col_offset == 0 and row_offset:
            predicate = lambda x: int(cell_address(x)[0]) >= int(cell_address(disc_cell)[0])
        if col_offset and row_offset == 0:
            predicate = lambda x: ord(cell_address(x)[1]) >= ord(cell_address(disc_cell)[1])
        else:
            predicate = lambda x: '{0: >4s}{1}'.format(*cell_address(x)) >= '{0: >4s}{1}'.format(*cell_address(disc_cell))
        if col_offset == 0 and row_offset:
            predicate = lambda x: int(cell_address(x)[0]) >= int(cell_address(disc_cell)[0])
        if col_offset and row_offset == 0:
            predicate = lambda x: ord(cell_address(x)[1]) >= ord(cell_address(disc_cell)[1])
        else:
            predicate = lambda x: '{0: >4s}{1}'.format(*cell_address(x)) >= '{0: >4s}{1}'.format(*cell_address(disc_cell))

    try:
        filter_rng, filter_sht, filter_cells = zip(
            *[
                (x, *tbl_addr) for x in cells 
                if (tbl_addr := tbl_address(x)) and tbl_addr[0] in disc_sht 
            ]
        )
    except ValueError:
        answ = {}
    else:
        ndx = pd.Index(
            [x for x in itertools.chain(*[y.split(':') for y in filter_cells]) if predicate(x)]
        )
        db = ndx.str.extract(cell_pattern, expand=True).set_index(ndx)

        mask = ~db.row.str.contains('$', regex=False)
        fnc = lambda x: str(max(rmin, int(x.strip('$')) + row_offset))
        db.loc[mask, 'row'] = db.loc[mask, 'row'].apply(fnc)
        
        mask = ~db.col.str.contains('$', regex=False)
        fnc = lambda x: chr(max(cmin, ord(x) + col_offset))
        db.loc[mask, 'col'] = db.loc[mask, 'col'].apply(fnc)

        db['cell'] = db.col + db.row
        cells_map = db.cell.to_dict()
        values = [':'.join(map(lambda x: cells_map.get(x, x), key.split(':'))) for key in filter_cells]
        values = [(f"'{sht}'!" if sht else '') + value for sht, value in zip(filter_sht, values)]
        answ = dict(zip(filter_rng, values))
    return answ.get(cells[0], cells[0]) if bflag else answ
 

In [7]:
def set_field(self, changes: dict[str, Any], *, field: Literal['code', 'cell']= 'code', err_ref_code: str | None = None, data=None):
    df = data if isinstance(data, pd.DataFrame) else self.data
    ws = self.parent
    if field == 'code':
        if not (gmask := df.code.isin(list(changes.keys()))).any():
            return df
        ftn_sets = flatten_sets(df.loc[gmask].dependents)
        mask = df.code.isin(ftn_sets)
        pattern = rgn_pattern
        def replacement(m):
            if ':' in m[0]:
                m = tbl_pattern.match(m[0])
                linf, lsup = m[0].replace('$', '').split(':')
                if m[1]:
                    linf, lsup = map(lambda x: f"'{m[1]}'!{x}", (linf, lsup))
                if all(x in changes for x in (linf, lsup)):
                    return err_ref_code
                linf = changes.get(linf, linf)
                lsup = changes.get(lsup, lsup)
                if m[1]:
                    lsup = lsup.split('!')[1]
                linf, lsup = map(lambda tpl: pass_anchors(*tpl), zip(m[0].split(':'), (linf, lsup)))
                sub_str = ':'.join([linf, lsup])
                return sub_str
            return (err_ref_code or changes[m[0]]) if m[0] in changes else m[0]

        df.loc[mask, 'fml'] = df[mask].fml.str.replace(
            pattern,
            replacement,
            regex=True
        )            
        new_codes = err_ref_code or [changes[key] for key in df.loc[mask].code]
        df.loc[gmask, 'code'] = new_codes
        if err_ref_code:
            # Aseguramos que el código de error esta registrado en la tabla.
            # Si no existe, se crea un registro para el error y si existe no hace nada
            ws = self.parent
            cell_link = ws.parent[f'#{tbl_address(err_ref_code)[0]}'].title
            cell_link = f"'{cell_link}'!{XlErrors.REF_ERROR.code}"
            ws.register_code(self, cell_link, [], default_value=XlErrors.REF_ERROR)
            if df.code.tolist().count(err_ref_code) <= 1:
                return
            err_code = err_ref_code
            err_cell = cell_link.replace(f"'{ws.title}'!", '')
            # Se eliminan las filas duplicadas que resultan de la eliminación de enlaces a 
            # celdas inexistentes
            df.drop_duplicates(subset=['code'], inplace=True, keep=False)
            # Se identifican los errors origens
            mask = df.fml.fillna('').str.contains(err_code)
            if mask.any():
                err_cells, err_codes = zip(*df.loc[mask].code.items())
                err_cells = list(err_cells)
                df.loc[err_cells, 'value'] = XlErrors.REF_ERROR
                error_origens = set(err_codes)
                df.loc[err_cell, ['code', 'fml','dependents', 'res_order', 'ftype', 'value']] = [err_code, '', error_origens, 0, '#', XlErrors.REF_ERROR]
                # Se eliminan en el campo dependents las referencias a los códigos a ser eliminados
                if (mask := df.dependents.apply(lambda x: bool((x if isinstance(x, set) else set()) & set(old_codes)))).any():
                    df.loc[mask, 'dependents'] = df.loc[mask].dependents.apply(lambda x: set(x) - set(old_codes))

    else:  # field == 'cell'
        mask = df.code.isin(changes.keys())
        keys = df.loc[mask, 'code'].apply(changes.get)
        df.rename(index=keys, inplace=True)
    return df


In [8]:
wb = inspect.unwrap(base_workbook)()
ws = wb.sheets[1]
tbl = ws.tables[1]
df = tbl.data

In [9]:
df = tbl.data

In [10]:
to_delete = [cell for row in range(1, 21) if (cell := f'G{row}') in df.index]
to_delete


['G2', 'G6', 'G13', 'G14', 'G15', 'G16', 'G17']

In [11]:
disc_cell = offset_rng('G1', col_offset=-1)
changes = {
    df.loc[cell].code: df.loc[off_cell].code 
    for cell, off_cell in offset_rng(to_delete, disc_cell=disc_cell, col_offset=-1).items()
    if off_cell in df.index
}
changes

{'B2': 'B15', 'B16': 'B12', 'B17': 'B10', 'B3': 'B11', 'B4': 'B1'}

In [12]:
new_df = set_field(tbl, changes, field='code', err_ref_code="'sheet1'!ZZ0", data=tbl.data.copy())
new_df.head()

Unnamed: 0_level_0,fml,dependents,res_order,ftype,value,code
cell,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
F13,,"{B5, B3}",0,#,25,B15
F14,,"{B1, B6}",0,#,10,B12
F15,,"{B1, B7}",0,#,15,B10
F16,,"{B8, B1}",0,#,80,B11
F17,=+B11+B10+B12,{},1,$,105,B1


In [13]:
(TableComparator(tbl.data) ^ TableComparator(new_df)).sort_values('cell')

Unnamed: 0,cell,fml,dependents,res_order,ftype,value,code
6,G13,=+'sheet1'!A17,set(),1,$,100,B2
7,G14,,"{""B4"", ""B6""}",0,#,25,B16
8,G15,,"{""B4"", ""B7""}",0,#,38,B17
9,G16,=+'sheet1'!A1 + B15,"{""B8"", ""B4""}",1,$,2438,B3
10,G17,=+B3+B17+B16,set(),2,$,2501,B4
14,H14,=+SUM(B12:B16),"{""B9""}",1,H,35,B6
9,H14,=+SUM(B12:B12),"{""B9""}",1,H,35,B6
15,H15,=+SUM(B10:B17),"{""B9""}",1,H,53,B7
10,H15,=+SUM(B10:B10),"{""B9""}",1,H,53,B7
16,H16,=+SUM(B11:B3),"{""B9""}",2,$,2518,B8


In [15]:
a = dict(uno=1, dos=2, tres=3)
b = dict.fromkeys(a, 2222)
b

{'uno': 2222, 'dos': 2222, 'tres': 2222}