In [1]:
from dataclasses import dataclass, field
from Features import Feature_Info
from Data import IncomingData, Missing_info
from Bounds import Bounds
from Dual_Annealing_Optimization import Dual_Annealing_Optimization

# from Postprocessing import Avoid_small_changes
import numpy as np
import pandas as pd
from typing import List, Protocol
import os.path as osp
from shutil import move

from utils import read_json, read_pickle, write_json
from data_mapper import MapperHandler, read_input_folder, put_data
import re
import datetime
import logging

from validators import NOConnectionError, connection_check
from similarity import search_dates

from Checker import Np_KPI_checker, Ng_KPI_checker, Original_product_checker, Optimized_product_checker
from reporter import *

In [3]:
# Shared record layout: timestamp, level, calling function, line number, message.
formatstr = '%(asctime)s: %(levelname)s: %(funcName)s Line: %(lineno)d %(message)s'
datestr = '%m/%d/%Y %H:%M:%S'

# Root logger: INFO and above goes to data_download.log and to the console stream.
root_handlers = [logging.FileHandler('data_download.log'), logging.StreamHandler()]
logging.basicConfig(level=logging.INFO, format=formatstr, datefmt=datestr, handlers=root_handlers)

# Second logger used to track missing variables; its own handler writes to missing.log.
handler = logging.FileHandler('missing.log')
handler.setFormatter(logging.Formatter(fmt=formatstr, datefmt=datestr))

missing_logger = logging.getLogger("missing_logger")
missing_logger.addHandler(handler)
missing_logger.setLevel(logging.WARNING)

In [4]:
# Read the config files and the pickled model needed for setup.
config = read_json("../Model/info.json")
reporter_info = read_json('../Model/reporter_info.json')
optimizer_info = read_json('../Model/p2_optimizer.json')
final_model = read_pickle('../Model/xgb_pacol2_sasol_main_product_30mins.pkl')

# Currently disabled:
# previous_lims_ids = read_json('../Model/previous_lims.json')
# previous_lims_ids = {int(k):v for k,v in previous_lims_ids.items()}

# NOTE: additional config used by the similarity check
short_to_long_map = read_json('../Model/short_to_long_name_map.json')

# Flatten every group of object ids from the config into a single list.
object_ids = [
    object_id
    for group in ('controllable', 'noncontrollable', 'additional_ids', 'lims_ids')
    for object_id in config[group]
]

# Separator joining an object name and a property name in column labels.
# FIXME: figure out a better place for this constant.
sep = '___'

  from pandas import MultiIndex, Int64Index


In [5]:
# Initialize reporters.
# TODO: this setup needs to be improved.
def _make_reporter(reporter_cls, info_key):
    """Build ``reporter_cls`` from the positional arguments stored under ``info_key`` in ``reporter_info``."""
    return reporter_cls(*reporter_info[info_key])


live_connection_checker = _make_reporter(LiveConnectionChecker, 'LiveConnection')
missing_tag_checker = _make_reporter(MissingDataChecker, 'MissingTagCount')
new_directive_checker = _make_reporter(NewDirectiveChecker, 'NewDirective')
predicted_kpi_checker = _make_reporter(PredictedKPIChecker, 'PredictedKPI')
optimized_kpi_checker = _make_reporter(OptimizedKPIChecker, 'OptimizedKPI')
np_KPI_checker = _make_reporter(NPSpecificConsChecker, 'NPSpecificCons')
ng_KPI_checker = _make_reporter(NGSpecificConsChecker, 'NGSpecificCons')

In [6]:
# Build the mapper handler, then read/filter the mapper in one call using the
# extra positional and keyword arguments stored in the config.
mh = MapperHandler('Datapoint Name', 'Datapoint Id', 'Property Name', 'Property Id')
read_params = config['read_params']
(mapper, sub, name_to_id, id_to_name,
 final_object_ids, final_property_ids) = mh.read_filter_create_extract(
    *read_params['args'],
    mapper_path=config['mapper_path'],
    object_ids=object_ids,
    property_ids=config['property_ids'],
    **read_params['kwargs'],
)

# Iterable over the '.csv' files of the input folder; consumed by the ingestion loop.
reader = read_input_folder(
    '../Input-30min/',
    '.csv',
    object_ids=final_object_ids,
    property_ids=final_property_ids,
    id_to_name=id_to_name,
    sep=sep,
)

In [7]:
# Containers for the incoming files and their processed data, split by connection status.
files = []
new_frames = []
no_connection_files = []

# Timestamp embedded in the file name, e.g. "2023_04_05_18_00".
# Raw string: '\d' in a normal string literal is an invalid escape sequence.
file_stamp_pattern = re.compile(r'\d{4,}_\d{2,}_\d{2,}_\d{2,}_\d{2,}')

for incoming_file, processed_data in reader:

    # Average over 60-minute bins ('60min' is the non-deprecated spelling of '60T').
    resample_data = processed_data.resample('60min', closed='right').mean()
    # Replace the index with the timestamp(s) parsed from the file name.
    file_stamps = file_stamp_pattern.findall(incoming_file)
    date_time = pd.to_datetime(file_stamps, format="%Y_%m_%d_%H_%M")
    resample_data.index = date_time
    # Epoch of the first row; used as the key for the live-connection reporter.
    epoch = resample_data.index[0].timestamp()

    try:
        connection_check(resample_data)
    except NOConnectionError as e:
        # Route the failure through logging so it ends up in the log file as well.
        logging.warning(str(e))
        # NOTE: add functionality to save file out if the data is missing and set the live connection boolean to false
        logging.info(f"Moving file: {incoming_file} to the Error folder and setting the live connection boolean to false")
        # NOTE(review): `args` is not defined anywhere in this notebook, so this line
        # raises NameError when reached — define `args` or use the input folder path.
        move(osp.join(args.input_dir, incoming_file), osp.join('../Error', incoming_file))
        # NOTE: if connection fails then update the checker
        live_connection_checker.add_value(epoch, 0)
        no_connection_files.append(resample_data)
    else:
        files.append(incoming_file)
        new_frames.append(resample_data)
        # NOTE: this is technically redundant since the default is 1 ...
        live_connection_checker.add_value(epoch, 1)

# pd.concat raises an opaque "No objects to concatenate" on an empty list; fail with context instead.
if not new_frames:
    raise ValueError("No incoming file passed the connection check; nothing to process.")
new_data = pd.concat(new_frames)

Moved failed files: True
starting file: SasolDataExportedAt2023_04_05_18_00.csv


In [8]:
# Previously stored tag values (per the file name); handed to missing_filling below.
previous_data_path = '../Model/previous_tag_vals.csv'
previous_data = pd.read_csv(previous_data_path, index_col='Date', parse_dates=['Date'])

features = Feature_Info(optimizer_info)
incoming_data = IncomingData(new_data, features)
missing_info = Missing_info(incoming_data)

# Do missing operation here: log missing information for every incoming timestamp.
for timestamp in incoming_data.value.index:
    missing_info.missing_log(timestamp, missing_tag_checker)

nomissing_data = missing_info.missing_filling(previous_data)

# Uncomment this before go live
# processed_data.to_csv(previous_vals_path, index_label='Date')
# NOTE(review): `previous_vals_path` is not defined in this notebook — presumably
# `previous_data_path` was meant; confirm before enabling the line above.

logging.info('Generating Bounds')
bounds = Bounds(nomissing_data).final_bounds()

logging.info('Running model and optimization')

04/06/2023 09:54:19: INFO: <cell line: 19> Line: 19 Generating Bounds
04/06/2023 09:54:19: INFO: <cell line: 22> Line: 22 Running model and optimization


In [9]:
class OptimizationLowDifferenceError(Exception):
    """
    Signals that an optimized value is too close to its original value.

    Parameters
    ----------
    percent_difference : float
        Change from the original value, in percent.
    min_changing_rate : float
        Minimum change required by IES, in percent.
    feature_name : str
        Name of the tag the values belong to.
    """

    def __init__(self, percent_difference, min_changing_rate, feature_name) -> None:
        self.feature_name = feature_name
        self.min_changing_rate = min_changing_rate
        self.percent_difference = percent_difference

    def __str__(self):
        # Both rates are rendered with two decimals.
        message = f"Optimization values for {self.feature_name} within {self.min_changing_rate:.2f}% of original values. Optimization {self.percent_difference:.2f}% different."
        return message


@dataclass(frozen=True)
class Post_process:
    """
    Compares optimized control values with the original ones at a timestamp.

    Attributes
    ----------
    optimal_controls_vals :
        Optimized values of the controllable tags.
    nomissing_data :
        Data object providing ``get_control_vals(timestamp)`` and
        ``features.controllable`` (tag name -> settings holding a ``'min_rate'`` entry).
    """
    optimal_controls_vals: np.ndarray
    # Quoted so the annotation is not evaluated when the class is defined.
    nomissing_data: "IncomingData"

    def calculate_percentage_changes(self, timestamp: datetime.datetime) -> np.ndarray:
        """
        Calculate the relative change between original and optimized control values.

        Parameters
        ----------
        timestamp :
            Timestamp whose original control values are looked up.

        Returns
        -------
        percent_difference :
            Flat array of absolute relative changes, as fractions (0.01 means 1%).
        """
        original_controls_vals = self.nomissing_data.get_control_vals(timestamp)
        # The small offset belongs in the denominator, where it guards against a
        # zero original value. Added to the ratio (as before) it biased every
        # result and made equal-sized increases and decreases compare differently.
        zero_guard = 0.001
        percent_difference = (
            np.abs(self.optimal_controls_vals - original_controls_vals)
            / (np.abs(original_controls_vals) + zero_guard)
        )

        # np.asarray accepts both pandas objects and plain arrays.
        return np.asarray(percent_difference).flatten()

    def track_different_controls(self, timestamp: datetime.datetime) -> List[str]:
        """
        Return the controllable tags whose change exceeds their ``'min_rate'`` threshold.

        Tags at or below the threshold are reported through a logged warning.

        Parameters
        ----------
        timestamp :
            Timestamp whose original control values are compared.

        Returns
        -------
        different_tags :
            Names of the tags that changed by more than their threshold.
        """
        logger = logging.getLogger(__name__)

        controllable = self.nomissing_data.features.controllable
        # Built once instead of on every loop iteration.
        tag_names = list(controllable.keys())
        min_rate_change_threshold = np.array([item['min_rate'] for item in controllable.values()])
        percentage_differences = self.calculate_percentage_changes(timestamp)

        different_tags = []
        for controllable_index, difference in enumerate(percentage_differences):
            threshold = min_rate_change_threshold[controllable_index]
            if difference > threshold:
                different_tags.append(tag_names[controllable_index])
            else:
                # The exception type is only used to format the message; values are shown in percent.
                logger.warning(str(OptimizationLowDifferenceError(
                    difference * 100, threshold * 100, tag_names[controllable_index])))

        return different_tags

    def domain_fix(self) -> List[str]:
        '''
        Placeholder for incorporating domain requirements: remove a few
        directives based on some additional kpis and return a condensed
        different_tags list.

        NOTE: not implemented yet — the method has no body and returns None.
        '''
        
        

In [10]:
class Domain_requirements_failure(Exception):
    """
    Signals that a domain requirement is not met.

    Parameters
    ----------
    actual_KPI : value that was observed
    required_KPI : value the requirement asks for
    requirement_number : identifier of the requirement, shown in the message
    """

    def __init__(self, actual_KPI, required_KPI,requirement_number) -> None:
        self.requirement_number = requirement_number
        self.required_KPI = required_KPI
        self.actual_KPI = actual_KPI

    def __str__(self):
        message = f'Domain requirement_{self.requirement_number} need value {self.required_KPI}, the actual is {self.actual_KPI}.'
        return message

def domain_requirement_I(data: pd.DataFrame, timestamp: datetime.datetime, different_tags: list):
    """
    Apply domain requirement I to the list of changed tags.

    If the 'T8.PACOL2.6FIC-1402_A Augusta___Value' reading at ``timestamp`` is
    below 3 and the tag is in ``different_tags``, the tag is removed from the
    list (in place) and a warning is logged.

    Returns the same ``different_tags`` list object.
    """
    logger = logging.getLogger(__name__)
    actual_KPI = data.at[timestamp, 'T8.PACOL2.6FIC-1402_A Augusta___Value']
    below_limit = actual_KPI < 3
    if below_limit and 'T8.PACOL2.6FIC-1402_A Augusta' in different_tags:
        different_tags.remove('T8.PACOL2.6FIC-1402_A Augusta')
        # The exception type only serves to format the warning text.
        failure = Domain_requirements_failure(actual_KPI, 3, 'I')
        logger.warning(str(failure))
    return different_tags

def domain_requirement_II(data: pd.DataFrame, timestamp: datetime.datetime, different_tags: list):
    """
    Apply domain requirement II to the list of changed tags.

    If the 'T8.PACOL2.6FIC-1402 Augusta___Value' reading at ``timestamp`` is
    below 3 and the tag is in ``different_tags``, the tag is removed from the
    list (in place) and a warning is logged.

    Returns the same ``different_tags`` list object.
    """
    logger = logging.getLogger(__name__)
    actual_KPI = data.at[timestamp, 'T8.PACOL2.6FIC-1402 Augusta___Value']
    below_limit = actual_KPI < 3
    if below_limit and 'T8.PACOL2.6FIC-1402 Augusta' in different_tags:
        different_tags.remove('T8.PACOL2.6FIC-1402 Augusta')
        # The exception type only serves to format the warning text.
        failure = Domain_requirements_failure(actual_KPI, 3, 'II')
        logger.warning(str(failure))
    return different_tags

# def domain_requirement_III(data: pd.DataFrame, timestamp: datetime.datetime, different_tags: list):
#     logger = logging.getLogger(__name__)
#     try:
#         if ((actual_KPI:= (np.abs(new_data.at[timestamp, 'T8.PACOL2.6TI-400_4 Augusta___Value'] - new_data.at[timestamp, 'T8.PACOL2.6TI-400_6 Augusta___Value'])>0)) & ('T8.PACOL2.6TIC-403 Augusta' in different_tags):
#             different_tags.remove('T8.PACOL2.6TIC-403 Augusta')
#             raise Domain_requirements_failure(actual_KPI,3,'III')
#     except Domain_requirements_failure as e:
#         logger.warning(str(e))
#     return different_tags

In [11]:
# timestamp -> controllable tags whose optimized value differs enough from the original
time_dif_controls_dict = {}

Optimization = Dual_Annealing_Optimization(nomissing_data, final_model)

# One optimization result per row of the data, in index order.
result = [None] * nomissing_data.value.shape[0]
for i, timestamp in enumerate(nomissing_data.value.index):
    bound = bounds[i]

    optimal_controls_vals = Optimization.run_optimization(timestamp, bound)

    filter_small_changes = Post_process(optimal_controls_vals, nomissing_data)
    # track_different_controls takes the timestamp as a required argument;
    # calling it without one raises TypeError.
    different_tags = filter_small_changes.track_different_controls(timestamp)
    # Keep the per-timestamp tag list; the dict was created but never filled before.
    time_dif_controls_dict[timestamp] = different_tags

    result[i] = optimal_controls_vals
    # Add required KPI values

    # Optional: add domain requirements and operations
    

In [12]:
# NOTE: `timestamp` and `optimal_controls_vals` are not set in this cell; they carry
# over from earlier cells (the optimization loop when the notebook runs top to bottom).
data_values = nomissing_data.value
checkers = [
    Np_KPI_checker(data_values, timestamp, np_KPI_checker),
    Ng_KPI_checker(data_values, timestamp, ng_KPI_checker),
    Original_product_checker(data_values, timestamp, predicted_kpi_checker, Optimization),
    Optimized_product_checker(data_values, timestamp, predicted_kpi_checker, Optimization, optimal_controls_vals),
]

# Show each checker's value, one per line.
for checker in checkers:
    checker_value = checker.get_checker_value()
    print(checker_value)

0.9168371247665071
465.49976630955126
8.358605
8.420508


In [None]:
# Final results to be written out: a dataframe indexed by timestamp whose columns
# are objectName{sep}PropertyName. Only the directives are saved — no kpi values
# or byproduct values.
controllable_vars = [control + sep + "Value" for control in optimizer_info['controllable'].keys()]

# this should be the end result of running the optimization
final: pd.DataFrame = pd.DataFrame(data=result, index=new_data.index, columns=controllable_vars)

# Historical data: index by date, rename columns through the short->long map,
# then append the '___Value' suffix to every column.
historical_data = (
    pd.read_csv('../Model/final_historical_data.csv', parse_dates=['Date'])
    .set_index('Date')
    .rename(columns=short_to_long_map)
)
historical_data.columns = [col + '___Value' for col in historical_data.columns]

In [None]:
# For every output timestamp, look up matches for its directives in the historical data.
# TODO: swap out the controllable_vars with the more general tags_of_interest
similar_datapoints = {
    timestamp: search_dates(
        historical_data.loc[:, controllable_vars],
        final.loc[timestamp, controllable_vars],
    )
    for timestamp in final.index
}

In [None]:
# The NP/NG specific-consumption reporters are created earlier in this notebook as
# `np_KPI_checker` / `ng_KPI_checker`; the `*_specific_cons_checker` names were undefined.
info_reporter = AdditionalInfoReporter([
    live_connection_checker,
    missing_tag_checker,
    new_directive_checker,
    np_KPI_checker,
    ng_KPI_checker,
    predicted_kpi_checker,
    optimized_kpi_checker,
])
out_prefix = 'data_pacol2_'
logging.info('Saving results')
for timestamp in final.index:

    ## Note: Error lies in here!!!
    # NOTE(review): `time_dif_tags_dict` and `args` are not defined anywhere in this
    # notebook, so this call raises NameError. The optimization cell creates
    # `time_dif_controls_dict`; presumably that is the intended source — confirm,
    # and check whether its tag names need the `sep + "Value"` suffix used by `final`.
    live_format, columns, file_name, this_epoch = put_data(final.loc[[timestamp], time_dif_tags_dict[timestamp]], args.output_dir, name_to_id, sep=sep, output_property_id=config['output_property_id'])
    # get all the additional info
    additional_info = info_reporter.report(epoch=this_epoch)
    # append it to the ds info
    live_format.extend(additional_info)

    # Find validation point
    matches = similar_datapoints[timestamp]
    if matches.shape[0] != 0 and new_directive_checker.value[timestamp.timestamp()] != 0:
        # Relabel the first match with the current timestamp. A renamed copy is used
        # so the entry cached in `similar_datapoints` is not mutated.
        matches = matches.rename(index={matches.index[0]: timestamp})
        # NOTE(review): `name_to_id_val` is not defined in this notebook, and 53 is an
        # unexplained property id — confirm both.
        val_live_format, _, _, _ = put_data(matches, args.output_dir, name_to_id_val, sep=sep, output_property_id=53)
        live_format.extend(val_live_format)

    # format to df and save as a ';'-separated file without header or index
    out = pd.DataFrame(live_format, columns=columns)
    file_path = osp.join(args.output_dir, out_prefix+file_name)
    out.to_csv(file_path, sep=';', header=False, index=False)

In [None]:
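# TODO: check `args` — it is referenced above (`args.input_dir`, `args.output_dir`) but never defined in this notebook.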