In [1]:
import os

from series_extraction.excel_loader import ExcelLoader
from series_extraction.excel_validator import ExcelValidator
from series_extraction.excel_cleaner import ExcelCleaner
from series_extraction.table_finder import TableFinder
from series_extraction.series_extractor import SeriesExtractor
from series_extraction.excel_compatibility_checker import ExcelCompatibilityChecker
from series_extraction.series_iterator import SeriesIterator
from series_extraction.series_mapper import SeriesMapper

from ast_building.formula_parser import FormulaParser
from ast_building.series_implementer import SeriesImplementer

In [2]:
# Project whose raw/reduced workbooks are processed by this notebook.
project_name = 'test_excel_1'

# The data folder is `<parent of the working directory>/data`.
current_directory = os.getcwd()
parent_directory = os.path.abspath(os.path.join(current_directory, os.pardir))
data_directory = os.path.join(parent_directory, 'data')

# Raw and reduced workbooks live in sibling sub-folders of the data folder.
excel_raw_file_path = os.path.join(
    data_directory, "excel_files_raw", "{}_raw.xlsx".format(project_name)
)
excel_reduced_filepath = os.path.join(
    data_directory, "excel_files_reduced", "{}_reduced.xlsx".format(project_name)
)

In [3]:
# Load both workbooks up front; the raw one is needed again for the compatibility check.
excel_raw = ExcelLoader.load_file(excel_raw_file_path)
excel_reduced = ExcelLoader.load_file(excel_reduced_filepath)

In [4]:
is_valid = ExcelValidator.validate_excel(excel_reduced)

In [5]:
if not is_valid:
    raise Exception("Excel file is not valid")

In [6]:
excel_reduced_clean = ExcelCleaner.clean_excel(excel_reduced)

In [7]:
extracted_tables, data = TableFinder.find_tables(excel_reduced_clean)

In [8]:
series_data = SeriesExtractor.extract_table_details(extracted_tables, data)

In [9]:
is_compatible = ExcelCompatibilityChecker.check_file(excel_raw, excel_reduced, extracted_tables)

In [10]:
if not is_compatible:
    raise Exception("Excel file is not compatible")

In [11]:
series_dict = SeriesExtractor.extract_series(extracted_tables=extracted_tables, data=data)

In [12]:
series_dict

{'Sheet1': [Series(series_id='Sheet1|horizontal_column_1|12|2', worksheet=Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None), series_header='horizontal_column_1', formulas=['=B3', '=C3'], values=[1, 2], header_location=<HeaderLocation.LEFT: 'left'>, series_starting_cell=Cell(column=3, row=12, coordinate='C12', value=None, value_type=None), series_length=2, series_data_type=<SeriesDataType.INT: 'int'>),
  Series(series_id='Sheet1|horizontal_column_2|13|2', worksheet=Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None), series_header='horizontal_column_2', formulas=['=B4', '=C4'], values=[3, 4], header_location=<HeaderLocation.LEFT: 'left'>, series_starting_cell=Cell(column=3, row=13, coordinate='C13', value=None, value_type=None), series_length=2, series_data_type=<SeriesDataType.INT: 'int'>),
  Series(series_id='Sheet1|col_1|2|2', worksheet=Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None), series_header='col_1', formulas=[N

In [13]:
series_mapping = SeriesMapper.map_series(series_dict)

In [14]:
series_mapping

{Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None): {Cell(column=3, row=12, coordinate=None, value=None, value_type=None): (0,
   Series(series_id='Sheet1|horizontal_column_1|12|2', worksheet=Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None), series_header='horizontal_column_1', formulas=['=B3', '=C3'], values=[1, 2], header_location=<HeaderLocation.LEFT: 'left'>, series_starting_cell=Cell(column=3, row=12, coordinate='C12', value=None, value_type=None), series_length=2, series_data_type=<SeriesDataType.INT: 'int'>)),
  Cell(column=4, row=12, coordinate=None, value=None, value_type=None): (1,
   Series(series_id='Sheet1|horizontal_column_1|12|2', worksheet=Worksheet(sheet_name='Sheet1', workbook_file_path=None, worksheet=None), series_header='horizontal_column_1', formulas=['=B3', '=C3'], values=[1, 2], header_location=<HeaderLocation.LEFT: 'left'>, series_starting_cell=Cell(column=3, row=12, coordinate='C12', value=None, value_type=None), se

In [15]:
series_iterator = SeriesIterator.iterate_series(series_dict)

In [16]:
series_list = [series for series in series_iterator if series.formulas != [None, None]]

The series ID is the concatenation of `sheet_name`, `header_name`, `header_row_index` and `header_column_index`, joined with `|` (for example, `Sheet3|col_4|1|4`).

In [17]:
# For every series, parse its first two formulas and rewrite their range nodes
# through a SeriesImplementer bound to the series' own sheet.
for series in series_list:
    formula_1 = series.formulas[0]
    formula_1_ast = FormulaParser.parse_formula(formula_1)
    series_implementer = SeriesImplementer(series_mapping, sheet_name = series.worksheet.sheet_name)
    formula_1_ast_series = series_implementer.replace_range_nodes(formula_1_ast)

    formula_2 = series.formulas[1]
    formula_2_ast = FormulaParser.parse_formula(formula_2)
    formula_2_ast_series = series_implementer.replace_range_nodes(formula_2_ast)

    sheet_name = series.worksheet.sheet_name

    # Bound to its own name on purpose: assigning this to `series_list` would
    # replace the list being iterated, so the cell would loop over a different
    # collection the next time it is run.
    sheet_series_list = series_dict.get(sheet_name)


In [18]:
import xlcalculator
import ast
from typing import List
from objects import Series


class SeriesRangeDelta:
    """Plain container for the per-step change between two range references.

    Attributes:
        start_row_index_delta: change of the range's start row index per step.
        end_row_index_delta: change of the range's end row index per step.
        start_column_index_delta: change of the start column index per step.
        end_column_index_delta: change of the end column index per step.
        start_row_index: start row index of the reference the deltas apply to.
        end_row_index: end row index of the reference the deltas apply to.
    """

    def __init__(
        self,
        start_row_index_delta,
        end_row_index_delta,
        start_column_index_delta,
        end_column_index_delta,
        start_row_index,
        end_row_index,
    ):
        self.start_row_index_delta = start_row_index_delta
        self.end_row_index_delta = end_row_index_delta
        self.start_column_index_delta = start_column_index_delta
        self.end_column_index_delta = end_column_index_delta
        self.start_row_index = start_row_index
        self.end_row_index = end_row_index

    @property
    def end_column_index__delta(self):
        """Backward-compatible alias for the historically misspelled attribute name."""
        return self.end_column_index_delta

    @end_column_index__delta.setter
    def end_column_index__delta(self, value):
        # Writes through the old name keep the correctly spelled attribute in sync.
        self.end_column_index_delta = value


class ASTGenerator:
    """Builds the n-th formula AST of a series from its first two formula ASTs.

    The two ASTs are walked in parallel. Where both hold a range node, the
    difference between the two nodes is used as a per-step delta and applied
    ``n - 1`` times to the first node. A range node's ``tvalue`` is handled as
    the string form of ``(series_ids, (start_row_index, end_row_index))``.
    """

    def __init__(
        self,
        formula_1_ast_series: xlcalculator.ast_nodes.ASTNode,
        formula_2_ast_series: xlcalculator.ast_nodes.ASTNode,
        series_list: List[Series],
    ):
        """Keep the two reference ASTs and the series searched for shifted series IDs."""
        self.series_list = series_list
        self.formula_1_ast_series = formula_1_ast_series
        self.formula_2_ast_series = formula_2_ast_series

    @staticmethod
    def extract_tuples(node_value: str):
        """Evaluate a node's ``tvalue`` string as a Python literal and return it."""
        parsed_value = ast.literal_eval(node_value)
        return parsed_value

    def get_delta_between_nodes(self, node1_value: str, node2_value: str):
        """Return the delta between two range ``tvalue`` strings.

        Returns ``None`` when either value carries ``(None, None)`` as its row
        indexes; otherwise returns the result of ``calculate_deltas``.
        """
        first_parsed = self.extract_tuples(node1_value)
        second_parsed = self.extract_tuples(node2_value)

        no_rows = (None, None)
        if first_parsed[1] == no_rows or second_parsed[1] == no_rows:
            return None
        return self.calculate_deltas(first_parsed, second_parsed)

    @staticmethod
    def calculate_deltas(
        node1_tuple: tuple[tuple[str], tuple[int, int]],
        node2_tuple: tuple[tuple[str], tuple[int, int]],
    ) -> SeriesRangeDelta:
        """Compute row and column deltas from node 1 to node 2.

        Row deltas are differences of the row-index pairs. Column deltas are
        differences of the trailing ``|``-separated integer of the first
        (start) and last (end) series ID of each node. The start and end row
        indexes stored on the result are those of node 1.
        """
        first_ids, first_rows = node1_tuple
        second_ids, second_rows = node2_tuple

        row_delta_start = second_rows[0] - first_rows[0]
        row_delta_end = second_rows[1] - first_rows[1]

        # The last "|"-separated field of a series ID is read as its column index.
        column_delta_start = int(second_ids[0].split("|")[-1]) - int(
            first_ids[0].split("|")[-1]
        )
        column_delta_end = int(second_ids[-1].split("|")[-1]) - int(
            first_ids[-1].split("|")[-1]
        )

        return SeriesRangeDelta(
            row_delta_start,
            row_delta_end,
            column_delta_start,
            column_delta_end,
            first_rows[0],
            first_rows[-1],
        )

    def apply_delta_to_range_node(
        self,
        node1: xlcalculator.ast_nodes.ASTNode,
        node2: xlcalculator.ast_nodes.ASTNode,
        n: int,
    ):
        """Dispatch on the type of ``node1`` and return the extrapolated node.

        Range nodes are shifted only when ``node2`` is a range node too.
        Function and operator nodes are rebuilt recursively. Any other node
        is returned unchanged.
        """
        nodes = xlcalculator.ast_nodes

        if isinstance(node1, nodes.RangeNode) and isinstance(node2, nodes.RangeNode):
            return self.process_range_node(node1, node2, n)

        if hasattr(node1, "args") and isinstance(node1, nodes.FunctionNode):
            return self.process_function_node(node1, node2, n)

        if hasattr(node1, "left") and isinstance(node1, nodes.OperatorNode):
            return self.process_operator_node(node1, node2, n)

        return node1

    def process_range_node(self, node1, node2, n):
        """Shift ``node1`` by the delta to ``node2``; return ``node1`` if there is no delta."""
        delta = self.get_delta_between_nodes(node1.tvalue, node2.tvalue)
        if not delta:
            return node1

        return self.update_range_node(
            node1,
            delta.start_row_index_delta,
            delta.end_row_index_delta,
            delta.start_column_index_delta,
            delta.end_column_index__delta,
            delta.start_row_index,
            delta.end_row_index,
            n,
        )

    def update_range_node(
        self,
        node1,
        start_row_index_delta,
        end_row_index_delta,
        start_column_index_delta,
        end_column_index_delta,
        start_row_index,
        end_row_index,
        n,
    ):
        """Create a new range node whose ``tvalue`` is ``node1``'s advanced ``n - 1`` steps.

        Every series ID of ``node1`` is shifted by
        ``start_column_index_delta * (n - 1)``; the row indexes are advanced by
        their respective deltas times ``n - 1``.
        """
        steps = n - 1

        original_ids = self.extract_tuples(node1.tvalue)[0]
        shifted_ids = []
        for original_id in original_ids:
            shifted_ids.append(
                self.add_column_delta_to_series_id(
                    original_id, start_column_index_delta * steps
                )
            )

        shifted_rows = (
            start_row_index + start_row_index_delta * steps,
            end_row_index + end_row_index_delta * steps,
        )
        new_tvalue = str((tuple(shifted_ids), shifted_rows))

        token = xlcalculator.tokenizer.f_token(
            tvalue=new_tvalue, ttype="operand", tsubtype="range"
        )
        return xlcalculator.ast_nodes.RangeNode(token)

    def add_column_delta_to_series_id(self, series_id: str, column_delta: int):
        """Return the ID of the series found ``column_delta`` columns away.

        ``series_id`` is split into four ``|``-separated fields. The first
        series in ``self.series_list`` whose first and third fields equal
        those of ``series_id`` and whose fourth field equals the fourth field
        of ``series_id`` plus ``column_delta`` is used. If none matches,
        ``series_id`` is returned unchanged.
        """
        sheet, _header, third_field, fourth_field = series_id.split("|")
        wanted_fourth_field = str(int(fourth_field) + column_delta)

        for candidate in self.series_list:
            candidate_id = candidate.series_id
            fields = candidate_id.split("|")
            if (
                fields[0] == sheet
                and fields[2] == third_field
                and fields[3] == wanted_fourth_field
            ):
                return candidate_id
        return series_id

    def process_function_node(self, node1, node2, n):
        """Rebuild a function node, pairing each argument with ``node2``'s by position."""
        rebuilt_args = []
        for position, argument in enumerate(node1.args):
            rebuilt_args.append(
                self.apply_delta_to_range_node(argument, node2.args[position], n)
            )

        rebuilt = xlcalculator.ast_nodes.FunctionNode(node1.token)
        rebuilt.args = rebuilt_args
        return rebuilt

    def process_operator_node(self, node1, node2, n):
        """Rebuild an operator node; a falsy operand of ``node1`` becomes ``None``."""
        left_operand = None
        if node1.left:
            left_operand = self.apply_delta_to_range_node(node1.left, node2.left, n)

        right_operand = None
        if node1.right:
            right_operand = self.apply_delta_to_range_node(node1.right, node2.right, n)

        rebuilt = xlcalculator.ast_nodes.OperatorNode(node1.token)
        rebuilt.left = left_operand
        rebuilt.right = right_operand
        return rebuilt

    def get_nth_formula(self, n: int) -> xlcalculator.ast_nodes.ASTNode:
        """Return the AST extrapolated to position ``n`` (``n=1`` applies zero steps)."""
        first_ast = self.formula_1_ast_series
        second_ast = self.formula_2_ast_series
        return self.apply_delta_to_range_node(first_ast, second_ast, n=n)


class FormulaGenerator:
    """Factory for ASTGenerator objects built from two formula ASTs."""

    @staticmethod
    def get_ast_generator(
        formula_1_ast_series: xlcalculator.ast_nodes.ASTNode,
        formula_2_ast_series: xlcalculator.ast_nodes.ASTNode,
        series_list: List[Series],
    ) -> ASTGenerator:
        """Return a new ASTGenerator for the two given formula ASTs and series list."""
        return ASTGenerator(
            formula_1_ast_series=formula_1_ast_series,
            formula_2_ast_series=formula_2_ast_series,
            series_list=series_list,
        )


In [25]:
series

Series(series_id='Sheet3|col_4|1|4', worksheet=Worksheet(sheet_name='Sheet3', workbook_file_path=None, worksheet=None), series_header='col_4', formulas=['=Sheet3!A2*Sheet3!B2', '=Sheet3!A3*Sheet3!B3'], values=[24, 24], header_location=<HeaderLocation.TOP: 'top'>, series_starting_cell=Cell(column=4, row=2, coordinate='D2', value=None, value_type=None), series_length=2, series_data_type=<SeriesDataType.INT: 'int'>)

In [26]:
# Worked example: the first two formulas of a series on Sheet3.
formula_1 = "=Sheet3!A2*Sheet3!B2"
formula_2 = "=Sheet3!A3*Sheet3!B3"

# `series` must already be bound by an earlier cell; only its sheet name is used here.
sheet_name = series.worksheet.sheet_name

formula_1_ast = FormulaParser.parse_formula(formula_1)
series_implementer = SeriesImplementer(series_mapping, sheet_name=sheet_name)
formula_1_ast_series = series_implementer.replace_range_nodes(formula_1_ast)

formula_2_ast = FormulaParser.parse_formula(formula_2)
formula_2_ast_series = series_implementer.replace_range_nodes(formula_2_ast)

# All series of that sheet are handed to the generator for series-ID lookups.
series_list = series_dict.get(sheet_name)

ast_generator = FormulaGenerator.get_ast_generator(
    formula_1_ast_series, formula_2_ast_series, series_list
)

# Regenerate positions 1 and 2 from the two reference ASTs.
formula_1_ast_new = ast_generator.get_nth_formula(n=1)
formula_2_ast_new = ast_generator.get_nth_formula(n=2)

In [27]:
print(formula_1_ast_new)
print(formula_2_ast_new)

((('Sheet3|col_1|1|1',), (0, 0))) * ((('Sheet3|col_2|1|2',), (0, 0)))
((('Sheet3|col_1|1|1',), (1, 1))) * ((('Sheet3|col_2|1|2',), (1, 1)))
