In [1]:
from openpyxl import Workbook, load_workbook
from openpyxl.writer.excel import save_virtual_workbook
import os
import pandas as pd
import numpy as np
import scipy as sp
from scipy import interpolate
from array import array
from datetime import datetime, timedelta
from pycel import ExcelCompiler
from dateutil.relativedelta import relativedelta
import re
from statistics import mean
from anytree import Node, RenderTree, find, Resolver, PostOrderIter

In [2]:
class SheetAnalyzer:
    """
    Read one worksheet of an Excel workbook and turn it into a plain dict.

    Three kinds of sheets are handled by getRawDataStorage():
      * product sheets: metadata above BASE_ELEMENTS_ROW, specification
        headers on BASE_ELEMENTS_ROW / UNIT_ROW / CURVE, point values below;
      * operation sheets: the sheet name contains "operation";
      * global constant sheets: everything else without points/specifications.

    Sheets whose name starts with DELIMITER_SHEET_UNFOLLOW are ignored.
    """

    BASE_ELEMENTS_ROW = 17  # Location of base's elements
    UNIT_ROW = 18  # Location of units
    CURVE = 19  # Location of Curve's interpolations
    METADATA_COL = "A"
    POINT_N_COL = "B"
    DATE_COL = "D"
    PRODUCT_NAME = "Product-Type"
    PRODUCT_PARENT = "SubType"
    DELIMITER_SHEET_UNFOLLOW = "_"

    def __init__(self, input, sheet_name) -> None:
        """
        Open the workbook twice: once with openpyxl (raw cell contents) and
        once with pycel (formula evaluation).

        :param input: path of the workbook file
        :param sheet_name: name of the worksheet to analyze
        """
        self.wb = load_workbook(input)
        self.evaluator = ExcelCompiler(filename=input)
        self.sheet_name = sheet_name
        self.ws = self.wb[self.sheet_name]

        self.specifications = []
        self.points = []
        self.data = {}

    ### Util's functions

    def log_interp1d(self, xx, yy, kind='linear'):
        """
        Build an interpolator that is linear in log10/log10 space.

        :param xx: known abscissas (must be > 0 for the logarithm)
        :param yy: known ordinates (must be > 0 for the logarithm)
        :param kind: interpolation kind forwarded to scipy's interp1d
        :return: callable mapping an abscissa to the interpolated ordinate
        """
        logx = np.log10(xx)
        logy = np.log10(yy)
        lin_interp = interpolate.interp1d(logx, logy, kind=kind, fill_value="extrapolate")
        log_interp = lambda zz: np.power(10.0, lin_interp(np.log10(zz)))
        return log_interp

    def evaluate(self, cell):
        """
        Evaluate a cell of this sheet through the formula evaluator.

        :param cell: cell object exposing a ``coordinate`` attribute
        :return: evaluated value; floats are rounded to 3 decimals
        """
        result = self.evaluator.evaluate(self.sheet_name + "!" + cell.coordinate)
        if isinstance(result, float):
            result = round(result, 3)
        return result

    ### Helper's functions

    def _fullFillPointsWithDates(self, points: list):
        """
        Fill the missing dates of ``points`` in place.

        The first point that carries a date is the reference; every point
        without a date gets the reference date shifted by one year per index
        of distance (negative before the reference, positive after).
        If no point has a date, nothing is changed.

        :param points: list of dicts with at least a "date" key
        :return: the same list
        """
        reference = next(
            ((index, point["date"]) for index, point in enumerate(points) if point["date"] is not None),
            None,
        )
        if reference is None:
            return points

        ref_index, ref_date = reference
        for index, point in enumerate(points):
            if point["date"] is None:
                # A negative offset naturally handles the points located before the reference.
                point["date"] = ref_date + relativedelta(years=index - ref_index)

        return points

    def _getPointsWithDates(self):
        """
        Return a list of dicts (row, point_n, date) for every cell of
        POINT_N_COL holding an int, a float or a formula.

        NOTE(review): the whole column is scanned, including the metadata rows
        located above BASE_ELEMENTS_ROW - confirm that a numeric metadata
        value can never be mistaken for a point.
        """
        points = []
        for cell in self.ws[self.POINT_N_COL]:
            raw = cell.value
            # Exact type checks on purpose: booleans are not point numbers.
            is_number = type(raw) is int or type(raw) is float
            is_formula = type(raw) is str and raw.startswith("=")
            if is_number or is_formula:
                points.append({
                    "row": cell.row,
                    "point_n": self.evaluate(cell),
                    "date": self.ws[self.DATE_COL + str(cell.row)].value
                })

        return self._fullFillPointsWithDates(points)

    def _getSpecifications(self):
        """
        Return a list of dicts (column, specification_name, unit, interpolation)
        built from the non-empty cells of BASE_ELEMENTS_ROW.
        The first two non-empty cells are dropped because they never belong
        to the specifications.
        """
        headers = [cell for cell in self.ws[self.BASE_ELEMENTS_ROW] if cell.value is not None]
        return [{
            "column": header.column,
            "specification_name": header.value,
            "unit": self.ws.cell(row=self.UNIT_ROW, column=header.column).value,
            "interpolation": self.ws.cell(row=self.CURVE, column=header.column).value
        } for header in headers[2:]]

    def _parseCellValue(self, cell):
        """
        Convert the raw content of a value cell to a usable value.

        :return: the evaluated formula, the percentage as a fraction, the
                 number itself, or None when the cell holds nothing usable
                 (empty, "#REF!", free text, malformed percentage, ...)
        """
        raw = cell.value
        if raw is None or raw == "#REF!":
            return None
        if isinstance(raw, str):
            if raw.startswith("="):
                # Formulas are evaluated as a whole, even when they contain a "%" sign.
                return self.evaluator.evaluate(self.sheet_name + "!" + cell.coordinate)
            if "%" in raw:
                try:
                    return float(raw.replace("%", "")) / 100
                except ValueError:
                    return None
            return None
        if isinstance(raw, (float, int)):
            return raw
        return None

    def _getValuesBySpecificiation(self, specification):
        """
        Return the known values of one specification column.

        :param specification: dict with a "column" key (1-based column index)
        :return: list of dicts (row, value), one per point whose cell holds a
                 usable value; the other points are left out so that they can
                 be interpolated instead of inheriting a stale value
        """
        result = []
        for point in self.points:
            cell = self.ws.cell(row=point["row"], column=specification["column"])
            value = self._parseCellValue(cell)
            if value is not None:
                result.append({
                    "row": point["row"],
                    "value": value
                })
        return result

    def _evaluateInterpolation(self, specification):
        """
        Compute one value per point for a specification.

        * "CONST" (or no interpolation with exactly one known value): the
          first known value is repeated for every point;
        * two known values or more: missing points are interpolated on the
          row number, linearly by default or in log/log space for "LOG";
        * otherwise an empty list is returned.
        """
        values = self._getValuesBySpecificiation(specification)
        mode = specification["interpolation"]

        if (mode == "CONST" and len(values) > 0) or (mode is None and len(values) == 1):
            return [values[0]["value"]] * len(self.points)

        if len(values) < 2:
            return []

        rows = [item["row"] for item in values]
        known = [item["value"] for item in values]
        if mode == "LOG":
            interpolator = self.log_interp1d(rows, known)
        else:
            # Linear interpolation by default
            interpolator = interpolate.interp1d(rows, known, fill_value="extrapolate")

        # Known points keep their own value, the others are interpolated.
        value_by_row = dict(zip(rows, known))
        return [
            value_by_row[point["row"]] if point["row"] in value_by_row
            else float(interpolator(point["row"]))
            for point in self.points
        ]

    def _addInterpolations(self):
        """
        Add a "values" list (one value per point) to every specification.
        """
        for specification in self.specifications:
            specification["values"] = self._evaluateInterpolation(specification)

    def _getBaseElementMetaData(self):
        """
        Return a dict {metadata_name: metadata_value} read from columns
        METADATA_COL / "B" on the rows located above BASE_ELEMENTS_ROW.
        Rows with an empty name or an empty value are skipped.
        """
        metadata = {}
        for cell in self.ws[self.METADATA_COL]:
            if cell.value is None or cell.row >= self.BASE_ELEMENTS_ROW:
                continue
            # Direct cell access instead of rebuilding the whole "B" column for every row.
            value = self.ws["B" + str(cell.row)].value
            if value is not None:
                metadata[cell.value] = value
        return metadata

    def _collectSections(self, build_entry):
        """
        Group the rows of the sheet by section.

        A section starts on each non-empty cell of column "A" and spans the
        rows below it up to the next section header or the end of the sheet
        (both rows right before the next header and the last row included).
        Only rows with a non-empty "B" cell produce an entry.

        :param build_entry: callable(row_number) -> dict
        :return: dict {section_name: [entries]}
        """
        headers = [(cell.row, cell.value) for cell in self.ws["A"] if cell.value is not None]
        sheet_end = len(self.ws["B"]) + 1

        sections = {}
        for position, (header_row, label) in enumerate(headers):
            if position + 1 < len(headers):
                stop = headers[position + 1][0]
            else:
                stop = sheet_end
            sections[label] = [
                build_entry(row)
                for row in range(header_row + 1, stop)
                if self.ws["B" + str(row)].value is not None
            ]
        return sections

    def _getGlobalConstants(self):
        """
        Return all global constants of the sheet as
        {section: [{"constant_name", "value"}]}.
        """
        return self._collectSections(lambda row: {
            "constant_name": self.ws["B" + str(row)].value,
            "value": self.evaluate(self.ws["C" + str(row)])
        })

    def _getOperations(self):
        """
        Return all operations of the sheet as
        {category: [{"operation_name", "operation", "unit"}]}.
        """
        return self._collectSections(lambda row: {
            "operation_name": self.ws["B" + str(row)].value,
            "operation": self.evaluate(self.ws["C" + str(row)]),
            "unit": self.ws["D" + str(row)].value
        })

    def isOperationSheet(self):
        """Return True when the sheet name contains "operation" (case-insensitive)."""
        return "operation" in self.sheet_name.lower()

    # Access Functions

    def getRawDataStorage(self):
        """
        Return the dict storage of the sheet, or None for an ignored sheet
        (name starting with DELIMITER_SHEET_UNFOLLOW).

        :raises Exception: when a product sheet has no metadata
        """
        if self.sheet_name.startswith(self.DELIMITER_SHEET_UNFOLLOW):
            return None

        self.points = self._getPointsWithDates()
        self.specifications = self._getSpecifications()

        self.data["label"] = self.sheet_name
        self.data["metadatas"] = self._getBaseElementMetaData()

        if not self.points and not self.specifications:
            if self.isOperationSheet():
                self.data["operations"] = self._getOperations()
            else:
                # Global Constants Sheet
                self.data["constants"] = self._getGlobalConstants()
        else:
            if self.data["metadatas"] == {}:
                raise Exception("Metadatas are missing...")
            self._addInterpolations()
            self.data["specifications"] = self.specifications

        return self.data

    def getSpecificationByName(self, name):
        """
        Return the specification whose name matches ``name``
        (case-insensitive), or None when there is none.
        """
        wanted = name.lower()
        # str() keeps the lookup safe when a header cell holds a number.
        return next(
            (item for item in self.specifications if str(item["specification_name"]).lower() == wanted),
            None,
        )

class SheetInterpreter:
    """
    Convert all operations written in the sheet mini-language into plain
    mathematical expressions.

    Mini-language:
      * ``[name]`` or ``[sheet.name]``: value of a specification;
      * ``{name}`` or ``{category.name}``: another operation, inlined between
        parentheses.
    """

    # Raw strings: same patterns as before, without invalid escape sequences.
    FCN_PATTERN = r'\{[\(\) \.a-zA-Z0-9]+\}'
    VAR_PATTERN = r'\[[ \(\)a-zA-Z0-9\.]+\]'
    # Upper bound on inlining passes per operation; only reached by cyclic references.
    MAX_EXPANSION_PASSES = 100

    def __init__(self, sheets_tree) -> None:
        """
        :param sheets_tree: object exposing ``root`` (tree of sheet nodes) and
                            ``operation_sheets`` (analyzers of operation sheets)
        """
        self.sheets_tree = sheets_tree

    # Helper functions
    def _findNode(self, name):
        """Return the tree node whose name matches ``name`` (case-insensitive), or None."""
        wanted = name.lower()
        return find(self.sheets_tree.root, lambda candidate: candidate.name.lower() == wanted)

    def findOperation(self, operation_category, operation_name):
        """
        Find an operation by its category and its name (both case-insensitive).

        Every operation sheet holding the category is searched, so an
        operation declared in a later sheet is still found.

        :return: the operation dict, or None when it does not exist
        """
        wanted_category = operation_category.lower()
        wanted_name = operation_name.lower()
        for analyzer in self.sheets_tree.operation_sheets:
            for category, operations in analyzer.data["operations"].items():
                if category.lower() != wanted_category:
                    continue
                for operation in operations:
                    if operation["operation_name"].lower() == wanted_name:
                        return operation
        return None

    def replaceVarByValue(self, word, node):
        """
        Return the numeric value of the variable ``word`` for ``node``.

        ``word`` is a ``[name]`` or ``[sheet.name]`` token. When the node does
        not define the specification, the values found in its children are
        summed (0 for a node without children).

        NOTE(review): for ``[sheet.name]`` the sheet is only checked to exist
        in the tree; the value is still looked up starting from ``node`` -
        confirm this is the intended behavior.

        :raises Exception: when the qualifying sheet is not in the tree
        """
        variable = word.replace("[", "").replace("]", "")
        parts = variable.split('.')

        if len(parts) == 2:
            if self._findNode(parts[0]) is None:
                raise Exception("The sheet "+ parts[0]+ " doesn't map in the tree...")
            variable = parts[1]

        spec = node.analyzer.getSpecificationByName(variable)

        # If the value is not defined here, search the children and sum all their values.
        if spec is None:
            return sum(self.replaceVarByValue(word, child) for child in node.children)

        if spec["interpolation"] == "CONST":
            return spec["values"][0]
        # Average of the interpolated values.
        return mean(spec["values"])

    def _replaceVars(self, expression, node):
        """Return ``expression`` with every ``[var]`` token replaced by its value for ``node``."""
        for token in re.finditer(self.VAR_PATTERN, expression):
            expression = expression.replace(token.group(0), str(self.replaceVarByValue(token.group(0), node)))
        return expression

    def _resolveReference(self, fcn_name, operations, operation_category, origin_node):
        """
        Locate the operation referenced by ``{fcn_name}``.

        :return: tuple (operation dict, node used for its variables,
                 category used to qualify its own local references);
                 the first two items are None when the reference is unknown
        """
        parts = fcn_name.split('.')
        if len(parts) == 1:
            # Local reference: same category as the operation being expanded.
            target = next((op for op in operations if op["operation_name"] == fcn_name), None)
            return target, (origin_node if target is not None else None), operation_category
        if len(parts) == 2:
            # Qualified reference: operation of another category.
            target = self.findOperation(parts[0], parts[1])
            return target, (self._findNode(parts[0]) if target is not None else None), parts[0]
        return None, None, None

    def replaceFcnByVar(self, operations, operation_category):
        """
        Expand, in place, every ``{operation}`` reference and every ``[var]``
        of the given operations.

        :param operations: list of operation dicts of one category
        :param operation_category: name of the category (must match a tree
                                   node, otherwise nothing is changed)
        :return: the same ``operations`` list
        :raises Exception: when an argument is None, when a reference cannot
                           be resolved, or when references are cyclic
        """
        if operation_category is None or operations is None:
            raise Exception('ReplaceFcnByVar needs operation_category and operations')

        origin_wks = self._findNode(operation_category)
        if origin_wks is None:
            return operations

        for operation in operations:
            # Evaluated cells may hold a plain number (or nothing): there is nothing to expand.
            if not isinstance(operation["operation"], str):
                continue

            # Replace first all vars [] by value
            operation["operation"] = self._replaceVars(operation["operation"], origin_wks)

            # Replace all fcn {} by their own (parenthesized) expression
            passes = 0
            while True:
                matches = list(re.finditer(self.FCN_PATTERN, operation["operation"]))
                if not matches:
                    break

                passes += 1
                if passes > self.MAX_EXPANSION_PASSES:
                    raise Exception("Cyclic operation reference in '" + str(operation["operation_name"]) + "'")

                replaced = False
                for match in matches:
                    fcn_name = match.group(0).replace("{", "").replace("}", "").strip()
                    according_op, wks, target_category = self._resolveReference(
                        fcn_name, operations, operation_category, origin_wks
                    )
                    if according_op is None or wks is None:
                        continue

                    if isinstance(according_op["operation"], str):
                        # Qualify the local {} of the referenced operation with ITS category,
                        # so they are still resolvable once inlined elsewhere.
                        for inner in re.finditer(self.FCN_PATTERN, according_op["operation"]):
                            inner_name = inner.group(0).replace("{", "").replace("}", "").strip()
                            if len(inner_name.split(".")) == 1:
                                according_op["operation"] = according_op["operation"].replace(
                                    inner.group(0), "{" + target_category + "." + inner_name + "}"
                                )

                        # Transform all [] by value to avoid legacy interpretation problems
                        according_op["operation"] = self._replaceVars(according_op["operation"], wks)

                    operation["operation"] = operation["operation"].replace(
                        match.group(0), "(" + str(according_op["operation"]) + ")"
                    )
                    replaced = True

                if not replaced:
                    # Nothing could be expanded: looping again would never terminate.
                    unresolved = ", ".join(m.group(0) for m in matches)
                    raise Exception(
                        "Unable to resolve " + unresolved + " in operation '" + str(operation["operation_name"]) + "'"
                    )

        return operations

    # Access functions
    def evaluateOperationValues(self):
        """
        Replace, in place, every {} and [] expression of every operation
        sheet so that the operations can be evaluated next.

        :return: dict {category: operations} gathering all operation sheets
                 (for a duplicated category the first sheet wins, as in
                 findOperation); empty dict when there is no operation sheet
        """
        # Search all {} operations and inline them
        for o_wks in self.sheets_tree.operation_sheets:
            for operation_category, operations in o_wks.data["operations"].items():
                if self._findNode(operation_category) is not None:
                    self.replaceFcnByVar(operations, operation_category)

        # Replace the remaining [] of every sheet and gather the result
        merged = {}
        for o_wks in self.sheets_tree.operation_sheets:
            for operation_category, operations in o_wks.data["operations"].items():
                wks = self._findNode(operation_category)
                if wks is not None:
                    for operation in operations:
                        if isinstance(operation["operation"], str):
                            operation["operation"] = self._replaceVars(operation["operation"], wks)
                merged.setdefault(operation_category, operations)
        return merged

class SheetTree:
    """
    Discover the workbooks of a folder, analyze their sheets and organize the
    product sheets as a tree (parent/child relation given by the
    PRODUCT_PARENT metadata). Operation sheets are kept aside in
    ``operation_sheets``.
    """

    # File extensions handed over to load_workbook; anything else in the folder is ignored.
    EXCEL_EXTENSIONS = (".xlsx", ".xlsm", ".xltx", ".xltm")

    def __init__(self, path) -> None:
        """
        :param path: folder of the workbooks, appended as-is to the current
                     working directory (e.g. "/input/")
        """
        self.path = os.getcwd() + path
        self.root = Node("root")
        self.all_sheet = None
        self.operation_sheets = []

    def getAllFiles(self):
        """Return the names of the files located directly in the folder ([] if none)."""
        return next(os.walk(self.path), (None, None, []))[2]  # [] if no file

    def _getWorkbookFiles(self):
        """Return the files of the folder that look like Excel workbooks (lock files excluded)."""
        return [
            file for file in self.getAllFiles()
            if file.lower().endswith(self.EXCEL_EXTENSIONS) and not file.startswith("~$")
        ]

    def getAllSheets(self):
        """Return a dict {file_name: [sheet names]} for every workbook of the folder."""
        sheets = {}
        for file in self._getWorkbookFiles():
            # Read-only mode is enough to list the sheets; such workbooks must be closed explicitly.
            workbook = load_workbook(os.path.join(self.path, file), read_only=True)
            try:
                sheets[file] = workbook.sheetnames
            finally:
                workbook.close()
        return sheets

    def analyzeAllSheet(self):
        """
        Analyze every sheet of every workbook.

        :return: dict {file_name: [{sheet_name: analyzer}]}; sheets for which
                 getRawDataStorage() returns None are left out
        """
        result = {}
        all_sheet = self.getAllSheets()

        for file, sheet_names in all_sheet.items():
            result[file] = []
            for sheet_name in sheet_names:
                # os.path.join works whether or not self.path ends with a separator.
                analyzer = SheetAnalyzer(os.path.join(self.path, file), sheet_name)
                if analyzer.getRawDataStorage() is not None:
                    result[file].append({sheet_name: analyzer})
        return result

    def mapSheetToTree(self):
        """
        Analyze all sheets and rebuild the tree under ``self.root``.

        Operation sheets are collected in ``self.operation_sheets``; sheets
        without metadata are skipped. A node without PRODUCT_PARENT is
        attached to the root, otherwise to the first node whose product name
        equals its parent name. A node whose parent name matches nothing
        stays detached from the tree.

        The root and the operation sheets are reset first, so calling the
        method again (e.g. re-running a notebook cell) does not duplicate
        anything.
        """
        self.root = Node("root")
        self.operation_sheets = []
        self.all_sheet = self.analyzeAllSheet()

        # Create all nodes
        entries = []
        for sheets in self.all_sheet.values():
            for sheet in sheets:
                for sheet_name, analyzer in sheet.items():
                    if analyzer.isOperationSheet():
                        self.operation_sheets.append(analyzer)
                        continue

                    metadatas = analyzer.data["metadatas"]
                    if metadatas == {}:
                        continue

                    product_name = metadatas[analyzer.PRODUCT_NAME]
                    entries.append((
                        metadatas.get(analyzer.PRODUCT_PARENT),
                        product_name,
                        Node(product_name, analyzer=analyzer),
                    ))

        # First node of each product name, to resolve the parents in one lookup.
        first_node_by_name = {}
        for _, product_name, node in entries:
            first_node_by_name.setdefault(product_name, node)

        # Add parent for all nodes
        for parent_name, _, node in entries:
            if parent_name is None:
                node.parent = self.root
            elif parent_name in first_node_by_name:
                node.parent = first_node_by_name[parent_name]

In [4]:
# Analyze every workbook found under <current working directory>/input/ and
# organize the product sheets as a tree (operation sheets are kept aside).
tree = SheetTree("/input/")
tree.mapSheetToTree()

In [5]:
# NOTE: this only binds a second name to the same SheetTree object - no copy
# is made, so anything mutated through `tree_copy` is also mutated in `tree`.
tree_copy = tree

In [6]:
# Expand the {operation} and [variable] tokens of the operation sheets.
# The operation strings are rewritten in place; the cell displays the result.
interpret = SheetInterpreter(tree_copy)
interpret.evaluateOperationValues()

{'Cell': [{'operation_name': 'Volume',
   'operation': '205*72*174/1000000',
   'unit': 'dm3'},
  {'operation_name': 'Weight',
   'operation': '0.7833333333333333*(205*72*174/1000000)',
   'unit': 'cm3'},
  {'operation_name': 'Nominal Capacity',
   'operation': '(0.7833333333333333*(205*72*174/1000000)) * 65',
   'unit': 'Wh'},
  {'operation_name': 'Energy density (volumetric)',
   'operation': '0.7833333333333333*65',
   'unit': 'Wh/dm3'}],
 'BatteryPack': [{'operation_name': 'Width',
   'operation': '12*4*92+92',
   'unit': 'mm'},
  {'operation_name': 'Volume',
   'operation': '194*225*(12*4*92+92)/1000000',
   'unit': 'mm'},
  {'operation_name': 'Weight',
   'operation': '(10+1)*12*(0.7833333333333333*(205*72*174/1000000))',
   'unit': 'kg'},
  {'operation_name': 'Nominal Capacity',
   'operation': '12*((0.7833333333333333*(205*72*174/1000000)) * 65)',
   'unit': 'Wh'},
  {'operation_name': 'Nominal Power',
   'operation': '(12*((0.7833333333333333*(205*72*174/1000000)) * 65))/896',