## Generate New Files

* Given a template and a definition of the variables, generate a set of new files

In [46]:
from pathlib import Path
import re
from pyDOE import lhs
import numpy as np
import pandas as pd

from scipy import stats

class TemplateProcessor:
    def __init__(self, template_path, verbose=False, output_file_path=None, variables_table_path=None, all_uniform=False):
        self.template_path = Path(template_path)
        if not self.template_path.exists():
            raise FileNotFoundError(f"Template file '{self.template_path}' not found.")
        
        self._verbose = verbose
        self._all_uniform = all_uniform

        self._valid_distributions = {
            'uniform':      {'parameters':2, 'description':('minimum','maximum'), 'types':['int','float']},
            'normal':       {'parameters':2, 'description':('mean','std_var'), 'types':['float']},
            'truncnormal':  {'parameters':4, 'description':('mean','std_var','minimum','maximum'), 'types':['float']},
            'lognormal':    {'parameters':2, 'description':('mean','std_var'), 'types':['float']},
            'triangular':   {'parameters':3, 'description':('minimum','maximum','most_likelly'), 'types':['float']},
            'constant':     {'parameters':1, 'description':('value'), 'types':['int','float','str']},
            'categorical':  {'parameters':2, 'description':('values_list','probabilities_list'), 'types':['int','float','str']},
            'table':        {'parameters':0, 'description':('no parameters'), 'types':['str']}
        }
        # std_dev from mean to define limits of normal distribution variables when using option all_uniform=True
        self._normal_limits_as_uniform = 2.

        self.variables_raw = self._extract_raw_text()
        self.variables = self._parse_variables()

        if variables_table_path is not None:
            self.set_variables_table(variables_table_path)

        self.experiments_table = None
        self._current_distribution = None
        if output_file_path is not None:
            self.set_output_file(output_file_path)
            self.generate_experiments()
            self.create_new_files()

    def __str__(self):
        return str(self.variables)

    def list_valid_distributions(self):
        for k,v in self._valid_distributions.items():
            print(f"'{k}'. parameters: {', '.join(v['description'])}, valid type(s): {', '.join(v['types'])}")

    def _extract_contents(self, text, open=r'\(', close=r'\)'):
        pattern = f'{open}(.*?){close}'
        matches = re.findall(pattern, text)
        return matches
    
    def _custom_split(self, text, sep=',', open='{', close='}'):
        result = []
        current_token = ''
        paren_count = 0

        for char in text:
            if char == sep and paren_count == 0:
                result.append(current_token.strip())
                current_token = ''
            elif char == open:
                paren_count += 1
                current_token += char
            elif char == close:
                paren_count -= 1
                current_token += char
            else:
                current_token += char

        result.append(current_token.strip())
        return result
    
    def _extract_raw_text(self):
        variables_raw = []
        with open(self.template_path, 'r') as file:
            for line_num, line in enumerate(file, start=1):              
                if line.count(r'<\var>') != line.count('<var>'):
                    raise ValueError(f"Unclosed <\\var> <var> at line {line_num}.")
                parts = self._extract_contents(text=line, open=r'<\\var>', close=r'<var>')
                if len(parts) > 0:
                    for part in parts:
                        var = part.strip()
                        if var == '':
                            raise ValueError(f"Empty variable name at line {line_num}.")
                        variables_raw.append(var)
        return variables_raw

    def _transform_variable(self, variable, variable_type):
        if isinstance(variable, list):
            var = list()
            for i in range(len(variable)):
                var.append(self._transform_variable(variable=variable[i], variable_type=variable_type))
                if var[-1] is None:
                    return None
            return var
        else:                
            try:
                var_type = eval(variable_type)
                return var_type(variable)
            except (ValueError, TypeError, NameError):
                return None

    def _parse_distribution(self, text, var_type):
        if text is None:
            if self._verbose:
                print("  No distribution provided. Will assume 'table'.")
            return 'table', list()

        parameters = self._custom_split(text)
        distribution = parameters[0].lower()
        if distribution not in self._valid_distributions:
            raise ValueError(f"Invalid distribution: '{distribution}'.")
        parameters = parameters[1:]
        if len(parameters) != self._valid_distributions[distribution]['parameters']:
            raise ValueError(f"Invalid number of parameters for distribution '{distribution}'. Expected {self._valid_distributions[distribution]['parameters']}, found {len(parameters)}.")

        if var_type is not None:
            if var_type not in self._valid_distributions[distribution]['types']:
                raise ValueError(f"Invalid type ({var_type}) for distribution '{distribution}'. Valid option(s): {', '.join(self._valid_distributions[distribution]['types'])}.")
            
            if distribution == 'categorical':
                param_list = parameters[0].lstrip('{').rstrip('}').split(',')
            else:
                param_list = parameters

            for i in range(len(param_list)):
                new_value = self._transform_variable(variable=param_list[i], variable_type=var_type)
                if new_value is None:
                    raise ValueError(f"Parameters for distribution '{distribution}' must be of type {var_type}. Cannot transform '{param_list[i]}'.")
                param_list[i] = new_value
    
            if distribution == 'categorical':
                parameters[0] = param_list
            else:
                parameters = param_list

        if distribution == 'categorical':
            param_list = parameters[1].lstrip('{').rstrip('}').split(',')
            if len(parameters[0]) != len(param_list):
                raise ValueError(f"Inconsistent number of values in distribution '{distribution}'. Values found: {len(parameters[0])}, associated probabilities: {len(param_list)}.")
            for i in range(len(param_list)):
                new_value = self._transform_variable(variable=param_list[i], variable_type='float')
                if new_value is None:
                    raise ValueError(f"Probabilities for distribution '{distribution}' must be of type float. Cannot transform '{param_list[i]}'.")
                param_list[i] = new_value
            s = sum(param_list)
            cum_list = [sum(param_list[:i+1])/s for i in range(len(param_list))]
            parameters[1] = cum_list

        return distribution, parameters
    
    def _check_variable_type(self, default, distribution, parameters):
        var_type = None
        for test_type in self._valid_distributions[distribution]['types']:
            if var_type is None:
                var_type = test_type
    
                if default is not None:
                    default_mod = self._transform_variable(variable=default, variable_type=test_type)
                    if default_mod is None:
                        var_type = None
                    else:
                        default = default_mod

                param_out = list()
                for param in parameters:
                    if var_type is not None:
                        param_out.append(self._transform_variable(variable=param, variable_type=var_type))
                        if param_out[-1] is None:
                            var_type = None
                
                if var_type is not None:
                    parameters = param_out

                    if self._verbose:
                        print(f"  No type provided. Will assume '{var_type}'.")
                
        return default, distribution, parameters, var_type

    def _parse_variable_options(self, text):
        default = None
        var_type = None

        distribution_text = self._extract_contents(text=text, open=r'\(', close=r'\)')
        if len(distribution_text) > 1:
            raise ValueError(f"Bad distribution options format in: '{text}'. Only one distribution with ( and ) can be defined.")

        options = self._custom_split(text=text, open='(', close=')')
        if len(distribution_text) == 1:
            if '(' not in options[-1] or ')' not in options[-1]:
                raise ValueError(f"Distribution options with ( and ) must be the last information in: '{options}'.")
            options = options[:-1]
            distribution_text = distribution_text[0].strip()
        else:
            distribution_text = None

        if len(options) > 2:
            raise ValueError(f"Bad options format in: '{options}'. Too many options.")
        elif len(options) > 0:
            if len(options) > 1 or options[0] != '':
                default = options[-1].strip()
                if len(options) == 2:
                    var_type = options[0].strip()
                    default = self._transform_variable(variable=default, variable_type=var_type)
                    if default is None:
                        raise ValueError(f"Default value ({options[-1]}) must be of the defined type ({var_type}).")

        distribution, parameters = self._parse_distribution(distribution_text, var_type)

        if var_type is None:
            default, distribution, parameters, var_type = self._check_variable_type(default, distribution, parameters)
        if var_type is None:
            raise ValueError(f"Couldn't find the variable type based on provided data: '{text}'. Possible type(s) for {distribution} are: {', '.join(self._valid_distributions[distribution]['types'])}.")

        return {'active': True, 'distribution': distribution, 'parameters': parameters, 'default': default, 'type': var_type}

    def _parse_key(self, text):
        key = text.split('[')[0].strip()
        if len(key) == 0:
            raise ValueError(f"Undefined variable name in: '{text}'.")
        return key

    def _parse_options(self, text):
        options = self._extract_contents(text=text, open=r'\[', close=r'\]')
        if len(options) > 1:
            raise ValueError(f"Bad options format in: '{text}'. Only one list with [ and ] can be defined.")

        if len(options) == 1:
            options_dict = self._parse_variable_options(options[0])
        else:
            options_dict = self._parse_variable_options('')

        return options_dict

    def _parse_variables(self):
        # General pattern: Variable[type, default, (distribution, par1, par2)]
        variables = {}
        repetition = list()
        for text in self.variables_raw:
            if self._verbose:
                print(f'Command found: {text}')
            key = self._parse_key(text)
            if key in variables:
                if self._verbose and key not in repetition:
                    print(f'  key {key} found more than once. Ignoring options in repetitions.')
                    repetition.append(key)
            else:
                options = self._parse_options(text)
                variables[key] = options
                if self._verbose:
                    print(f'  key: {key}')
                    print(f'  options: {options}')
        return variables

    def set_output_file(self, output_file_path):
        if output_file_path is None:
            self.output_file_path = None
        else:
            try:
                self.output_file_path = Path(output_file_path)
            except (ValueError, TypeError, NameError):
                self.output_file_path = None
                raise ValueError(f"Invalid file path: {str(output_file_path)}")

    def set_variables_table(self, variables_table_path):
        if not Path(variables_table_path).exists():
            print(f"CSV file '{variables_table_path}' not found.")
        else:
            try:
                df = pd.read_csv(variables_table_path, skipinitialspace=True)
                for key in df.columns:
                    if key not in self.variables:
                        print(f"Variable '{key}' not found in template. Will ignore data.")
                        continue
                    if self.variables[key]['distribution'] != 'table':
                        print(f"Variable '{key}' already has a distribution. Will ignore data.")
                        continue
                    self.variables[key]['values'] = list(df[key])
            except (ValueError, TypeError, NameError):
                print(f'Error reading variables table file: {variables_table_path}')

    def _check_generate_experiments(self, n_samples=0):
        tables_n_values = list()
        for data in self.variables.values():
            if data.get('distribution', False) == 'table':
                tables_n_values.append(len(data.get('values', [])))

        if len(tables_n_values) > 0:
            if not min(tables_n_values) == max(tables_n_values):
                print(f"Number of entries in 'table' variables is inconsistent, ranging from {min(tables_n_values)} to {max(tables_n_values)}. Cannot continue.")
                return None         
            tables_n_values = tables_n_values[0]
            if tables_n_values == 0:
                print("No values found for 'table' variables. Cannot continue.")
                return None
        else:
            tables_n_values = n_samples

        if n_samples < 1 and tables_n_values > 0:
            n_samples = tables_n_values
        elif n_samples < 1:
            print(f"Requested sample size ({n_samples}) is smaller than minimum (1). Cannot continue.")
            return None
        elif tables_n_values != n_samples:
            print(f"Number of values in 'table' variables ({tables_n_values}) is different from the requested sample size ({n_samples}). Cannot continue.")
            return None
        
        return n_samples

    def _InvCDF(self, probability, data, all_uniform):
        if min(probability) < 0:
            raise ValueError("Probability lower than zero!")
        if max(probability) > 1:
            raise ValueError("Probability larger than one!")

        distribution = data['distribution']
        parameters = data['parameters']
        
        if distribution == 'constant':
            return parameters[0]
        
        elif distribution == 'categorical':
            if all_uniform:
                n = len(parameters[0])
                p_cum_list = [(i+1) / n for i in range(n)]
            else:
                p_cum_list = parameters[1]

            def cat_prob(x):
                for i, p in enumerate(p_cum_list):
                    if p >= x:
                        return parameters[0][i]
                raise ValueError("Error in categorical probabilities!")
            vec_cat_prob = np.vectorize(cat_prob)
            return vec_cat_prob(probability)

        elif distribution == 'uniform' or (distribution == 'triangular' and all_uniform):
            a = parameters[0]
            b = parameters[1]
            if isinstance(a, int) and isinstance(b, int):
                n_int = b - a + 1
                return a + np.int32(np.floor(probability * n_int))
            else:
                return a + probability * (b - a)
            
        elif distribution == 'triangular':
            # a = parameters[0]
            # b = parameters[1]
            # c = parameters[2]

            loc = parameters[0]
            scale = parameters[1] - parameters[0]
            c = (parameters[2] - parameters[0]) / scale

            return stats.triang.ppf(probability, c, loc=loc, scale=scale)

            # def tri_prob(x):
            #     if x <= (c - a) / (b - a):
            #         return a + (b - a) * (x * (b - a) * (c - a))**0.5
            #     else:
            #         return b - (b - a) * ((1 - x) * (b - a) * (b - c))**0.5
            # vec_tri_prob = np.vectorize(tri_prob)
            # return vec_tri_prob(probability)
            
        elif distribution == 'normal':
            mean = parameters[0]
            std_dev = parameters[1]
            if all_uniform:
                a = mean - std_dev * self._normal_limits_as_uniform
                b = mean + std_dev * self._normal_limits_as_uniform
                return a + probability * (b - a)
            else:
                return stats.norm.ppf(probability, loc=mean, scale=std_dev)
        
        elif distribution == 'truncnormal':
            mean = parameters[0]
            std_dev = parameters[1]
            a = parameters[2]
            b = parameters[3]
            if all_uniform:
                return a + probability * (b - a)
            else:
                return stats.truncnorm.ppf(probability, (a - mean) / std_dev, (b - mean) / std_dev, loc=mean, scale=std_dev)
        
        elif distribution == 'lognormal':
            mean = parameters[0]
            std_dev = parameters[1]
            if all_uniform:
                a = np.log(mean) - std_dev * self._normal_limits_as_uniform
                b = np.log(mean) + std_dev * self._normal_limits_as_uniform
                return np.exp(a + probability * (b - a))
            else:
                return stats.lognorm.ppf(probability, s=std_dev, scale=mean)
        
        raise ValueError(f"Unknown distribution: {distribution}.")
        
    def set_variable_active(self, variable, active=True):
        if variable not in self.variables:
            raise ValueError(f"Unknown variable: {variable}.")
        if not active and self.variables[variable]['default'] is None:
            raise ValueError(f"Cannot deactivate a variable without a default value: {variable}.")
        self.variables[variable]['default'] = active

    def generate_experiments(self, n_samples=0, all_uniform=None):
        if self._verbose:
            print('Cheking parameters')
        n_samples = self._check_generate_experiments(n_samples=n_samples)
        if n_samples < 1:
            self.experiment_table = None
            return
        
        if all_uniform is None:
            all_uniform = self._all_uniform

        if self._verbose:
            print('Building experiments')
        samples = lhs(len(self.variables), samples=n_samples, criterion='maximin', iterations=5)

        if self._verbose:
            print('Calculating inverse CDF')
        df = pd.DataFrame()
        for column_index, var in enumerate(self.variables):
            data = self.variables[var]
            if self._verbose:
                print(f'   {var}: {data['distribution']}')
            if not data['active']:
                df[var] = [data['default']] * n_samples
            elif data['distribution'] == 'table':
                df[var] = data['values']
            else:
                df[var] = self._InvCDF(samples[:, column_index], data, all_uniform)
        self.experiments_table = df
        
    def create_new_files(self, output_file_path=None):
        if output_file_path is not None:
            self.set_output_file(output_file_path)
        if self.output_file_path is None:
            print('Output file not defined. Cannot continue.')
            return
        if self.experiment_table is None:
            self.generate_experiments()
            if self.experiment_table is None:
                print('Experiments table could not be created. Cannot continue.')
                return
              

        pass

# Parse the distributions template (verbose), draw 1000 samples and show the table.
template = TemplateProcessor(template_path=r'..\template\no_errors_distributions.dat',
                             verbose=True)
template.generate_experiments(1000)
print(template.experiments_table)

Command found: var_uniform_int[int,1,(uniform,1,10)]
  key: var_uniform_int
  options: {'active': True, 'distribution': 'uniform', 'parameters': [1, 10], 'default': 1, 'type': 'int'}
Command found: var_uniform[float,1,(uniform,0,1)]
  key: var_uniform
  options: {'active': True, 'distribution': 'uniform', 'parameters': [0.0, 1.0], 'default': 1.0, 'type': 'float'}
Command found: var_normal[float,1,(normal,0,1)]
  key: var_normal
  options: {'active': True, 'distribution': 'normal', 'parameters': [0.0, 1.0], 'default': 1.0, 'type': 'float'}
Command found: var_truncnormal[float,1,(truncnormal,0,1,-1, 1)]
  key: var_truncnormal
  options: {'active': True, 'distribution': 'truncnormal', 'parameters': [0.0, 1.0, -1.0, 1.0], 'default': 1.0, 'type': 'float'}
Command found: var_lognormal[float,1,(lognormal,1,1)]
  key: var_lognormal
  options: {'active': True, 'distribution': 'lognormal', 'parameters': [1.0, 1.0], 'default': 1.0, 'type': 'float'}
Command found: var_triangular[float,1,(triangula

In [47]:
# Save the generated experiments table to 'test.csv'.
template.experiments_table.to_csv('test.csv')

In [26]:
# Parse a template and load the values of its variables from a CSV file, then show the result.
template = TemplateProcessor(template_path=r'..\template\no_errors_table_variables.dat',
                             variables_table_path=r'..\template\no_errors_table_variables.csv',
                             verbose=False)
print(template)

{'var1': {'active': True, 'distribution': 'table', 'parameters': [], 'default': '17', 'type': 'str', 'values': ['0', '1', 'a']}, 'var2': {'active': True, 'distribution': 'table', 'parameters': [], 'default': None, 'type': 'str', 'values': ['1', '1', 'b']}, 'var3': {'active': True, 'distribution': 'table', 'parameters': [], 'default': None, 'type': 'str', 'values': ['2', '1', 'c']}, 'var4': {'active': True, 'distribution': 'table', 'parameters': [], 'default': None, 'type': 'str', 'values': ['3', '1', 'd']}}


## No Errors Test

In [17]:
# Parse the 'no_errors' template with verbose output to trace each variable found.
template = TemplateProcessor(template_path=r'..\template\no_errors.dat',
                             verbose=True)

Command found: No_Default
  No distribution provided. Will assume 'table'.
  No type provided. Will assume 'str'.
  key: No_Default
  options: {'active': True, 'distribution': 'table', 'parameters': [], 'default': None, 'type': 'str'}
Command found: With_Default_12[12]
  No distribution provided. Will assume 'table'.
  No type provided. Will assume 'str'.
  key: With_Default_12
  options: {'active': True, 'distribution': 'table', 'parameters': [], 'default': '12', 'type': 'str'}
Command found: With_Type_int[int,42]
  No distribution provided. Will assume 'table'.
  key: With_Type_int
  options: {'active': True, 'distribution': 'table', 'parameters': [], 'default': 42, 'type': 'int'}
Command found: normal_variable[float,0.5,(normal,0, 2.5)]
  key: normal_variable
  options: {'active': True, 'distribution': 'normal', 'parameters': [0.0, 2.5], 'default': 0.5, 'type': 'float'}
Command found: normal_variable_no_type[0.5,(normal,0, 2.5)]
  No type provided. Will assume 'float'.
  key: normal

## Error Catching Test

In [48]:
import tempfile

def process_temporary_file(text):
    """Write ``text`` to a temporary file and parse it with TemplateProcessor.

    The temporary file is always removed, whether parsing succeeds or not.

    Returns:
        The TemplateProcessor built from the temporary file.
    """
    # Create the file before entering the try block so the cleanup below
    # never refers to an unbound path if the creation itself fails.
    temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
    temp_file_path = Path(temp_file.name)
    try:
        # Close the file before it is parsed and removed.
        with temp_file:
            temp_file.write(text)

        template = TemplateProcessor(template_path=temp_file_path, verbose=False)
        return template

    finally:
        temp_file_path.unlink()

def test_function(func):
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            return result
        except Exception as e:
            return f"Error: {e}"
    return wrapper

def test_template_error(text):
    """Parse ``text`` through a temporary template file and return the processor."""
    return process_temporary_file(text)

@test_function
def test_parse(text):
    """Parse ``text``; errors come back as an 'Error: ...' string via the decorator."""
    parsed = test_template_error(text)
    return parsed

In [49]:
# Template snippets used to exercise the parser; each key names the case being
# tried (the first entry is a well-formed definition).
error_list = {
    'no error': r'<\var>invalid_var[1.5, (normal,0, 2.5)]<var>',
    'bogus text 1': r'<\var>invalid_var[1.5, (normal,0,2.)ABC]<var>',
    'bogus text 2': r'<\var>invalid_var[1.5, (normal,0,2.), ABC]<var>',
    'bogus text 3': r'<\var>invalid_var[1.5, (normal,0,2.)]ABC<var>',
    'bogus text 4': r'<\var>invalid_var[1.5, (normal,0,2.)], ABC<var>',
    'too few parameters': r'<\var>invalid_var[(normal,0)]<var>',
    'too many parameters': r'<\var>invalid_var[(normal,0, 2.5, 7)]<var>',
    'invalid distribution': r'<\var>invalid_var[(nomal,0, 2.5)]<var>',
    'missing comma 1': r'<\var>invalid_var[float 1.5, (normal,0, 2.5)]<var>',
    'missing comma 2': r'<\var>invalid_var[float, 1.5 (normal,0, 2.5)]<var>',
    'missing comma 3': r'<\var>invalid_var[float, 1.5, (normal 0, 2.5)]<var>',
    'missing comma 4': r'<\var>invalid_var[float, 1.5, (normal,0 2.5)]<var>',
    'missing comma 5': r'<\var>invalid_var,float[(normal,0, 2.5)]<var>',
    'missing comma 6': r'<\var>invalid_var, 1.5 [(normal,0, 2.5)]<var>',
    'type inconsistency': r'<\var>invalid_var[str, 1.5, (normal,0, 2.5)]<var>',
    'unclosed var': r'<\var>invalid_var[1.5, (normal,0, 2.5)]var>',
}

# Print each case followed by the parse result or the error message it produced.
for k,v in error_list.items():
    print(f'{k}: {v}')
    print('    '+str(test_parse(v)))
    print()

no error: <\var>invalid_var[1.5, (normal,0, 2.5)]<var>
    {'invalid_var': {'active': True, 'distribution': 'normal', 'parameters': [0.0, 2.5], 'default': 1.5, 'type': 'float'}}

bogus text 1: <\var>invalid_var[1.5, (normal,0,2.)ABC]<var>
    {'invalid_var': {'active': True, 'distribution': 'normal', 'parameters': [0.0, 2.0], 'default': 1.5, 'type': 'float'}}

bogus text 2: <\var>invalid_var[1.5, (normal,0,2.), ABC]<var>
    Error: Distribution options with ( and ) must be the last information in: '['1.5', '(normal,0,2.)', 'ABC']'.

bogus text 3: <\var>invalid_var[1.5, (normal,0,2.)]ABC<var>
    {'invalid_var': {'active': True, 'distribution': 'normal', 'parameters': [0.0, 2.0], 'default': 1.5, 'type': 'float'}}

bogus text 4: <\var>invalid_var[1.5, (normal,0,2.)], ABC<var>
    {'invalid_var': {'active': True, 'distribution': 'normal', 'parameters': [0.0, 2.0], 'default': 1.5, 'type': 'float'}}

too few parameters: <\var>invalid_var[(normal,0)]<var>
    Error: Invalid number of paramet