In [39]:
from typing import Union
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import mean_absolute_error , mean_squared_error



class Extruder:
    """Predict extruder torque/energy and search for set-point corrections.

    A feature scaler and two regression models (one for energy, one for
    torque) are loaded from disk.  :meth:`start` predicts torque and energy
    for the current operating point, compares them with the reference values
    and, when the error is not below the requested threshold, grid-searches
    screw speed and solid content for the combination whose predictions are
    closest to the references.

    Attributes set by :meth:`start`:
        torque, energy: predictions for the current operating point.
        error_mse, error_mae: errors of those predictions vs. the references.
        delta_sp, delta_sc: suggested screw-speed / solid-content changes.
        best_mae / best_mse: lowest grid error found (only for the mode used).
        torque_pred, energy_pred: predictions at the suggested operating point.
    """

    # Half-open [start, stop) bounds of the grid search, as given to np.arange.
    SOLID_CONTENT_RANGE = (70, 85)
    SCREW_SPEED_RANGE = (800, 1200)

    def __init__(self, energy_ref: float, torque_ref: float,
                 screw_speed: float, solid_content: float, mass_flow: float,
                 scaler_path: str, energy_model_path: str, torque_model_path: str,
                 screw_speed_gs: float = 50, solid_content_gs: float = 2) -> None:
        """Store the operating point and load the scaler and both models.

        Args:
            energy_ref: Target value for the predicted energy.
            torque_ref: Target value for the predicted torque.
            screw_speed: Current screw speed.
            solid_content: Current solid content.
            mass_flow: Current mass flow (kept fixed during the search).
            scaler_path: File holding the fitted feature scaler.
            energy_model_path: File holding the fitted energy model.
            torque_model_path: File holding the fitted torque model.
            screw_speed_gs: Grid-search step for screw speed.
            solid_content_gs: Grid-search step for solid content.
        """
        self.energy_ref = energy_ref
        self.torque_ref = torque_ref
        self.screw_speed = screw_speed
        self.solid_content = solid_content
        self.mass_flow = mass_flow

        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code while doing so -- only pass paths to files you trust.
        self.scaler = joblib.load(scaler_path)
        self.energy_model = joblib.load(energy_model_path)
        self.torque_model = joblib.load(torque_model_path)

        self.screw_speed_gs = screw_speed_gs
        self.solid_content_gs = solid_content_gs

        self.energy = None
        self.torque = None
        self.error_mae = None
        self.error_mse = None
        self.delta_sc = None
        self.delta_sp = None
        self.best_mae = None
        self.best_mse = None
        # Initialised here so the pred_* properties are safe before start().
        self.torque_pred = None
        self.energy_pred = None

    @staticmethod
    def _errors_against_refs(energies, torques, energy_ref, torque_ref):
        """Return ``(mae, mse)`` arrays, one entry per candidate point.

        Each entry is the mean over the two targets (energy, torque) of the
        absolute / squared deviation from its reference, i.e. the value
        ``mean_absolute_error([e, t], [e_ref, t_ref])`` (resp. squared) would
        give for that single point.
        """
        # reshape(2, -1) also copes with models that return (n, 1) columns.
        predictions = np.asarray([energies, torques], dtype=float).reshape(2, -1)
        references = np.array([[energy_ref], [torque_ref]], dtype=float)
        deviation = predictions - references
        return np.abs(deviation).mean(axis=0), np.square(deviation).mean(axis=0)

    @staticmethod
    def _as_feature_column(value):
        """Convert a scalar / list / tuple / ndarray into a 1-D float array.

        Raises:
            TypeError: if ``value`` is not numeric or array-like.
        """
        is_scalar = (isinstance(value, (int, float, np.integer, np.floating))
                     and not isinstance(value, bool))
        if not (is_scalar or isinstance(value, (list, tuple, np.ndarray))):
            raise TypeError("Not valid type for features")
        try:
            return np.asarray(value, dtype=float).ravel()
        except (TypeError, ValueError) as err:
            raise TypeError("Not valid type for features") from err

    def _search_for_best_params(self, sp_step: float, sc_step, torque_ref: float, enregy_ref,
                                screw_speed: float, solid_content: float, mass_flow: float,
                                mode: str):
        """Grid-search screw speed and solid content for the lowest error.

        Args:
            sp_step: Step between screw-speed candidates.
            sc_step: Step between solid-content candidates.
            torque_ref: Target torque.
            enregy_ref: Target energy (parameter name kept for compatibility).
            screw_speed: Current screw speed, used to express the delta.
            solid_content: Current solid content, used to express the delta.
            mass_flow: Mass flow applied to every candidate.
            mode: ``"mae"`` or ``"mse"`` (case and surrounding blanks ignored).

        Returns:
            ``(delta_screw_speed, delta_solid_content, best_error)``.

        Raises:
            ValueError: if ``mode`` is neither ``"mae"`` nor ``"mse"``.
        """
        metric = mode.lower().strip()
        if metric not in ("mae", "mse"):
            # Fail before running the (potentially large) batch prediction.
            raise ValueError("invalid mode!")

        # NOTE: with a fractional step np.arange may yield values carrying
        # small floating-point rounding errors.
        sc_axis = np.arange(*self.SOLID_CONTENT_RANGE, sc_step)
        sp_axis = np.arange(*self.SCREW_SPEED_RANGE, sp_step)
        sc_grid, sp_grid = np.meshgrid(sc_axis, sp_axis)
        sc_candidates = sc_grid.flatten()
        sp_candidates = sp_grid.flatten()
        mass_flows = np.full(sp_candidates.shape, mass_flow, dtype=float)

        torques, energies = self._predict(sp_candidates, sc_candidates, mass_flows)

        maes, mses = self._errors_against_refs(energies, torques, enregy_ref, torque_ref)
        errors = maes if metric == "mae" else mses

        best = int(np.argmin(errors))
        delta_screw_speed = sp_candidates[best] - screw_speed
        delta_solid_content = sc_candidates[best] - solid_content
        return delta_screw_speed, delta_solid_content, errors[best]

    def _find_error(self, energy_ref: float, energy: float, torque_ref: float, torque: float):
        """Return ``(mse, mae)`` of one (energy, torque) prediction vs. its references."""
        maes, mses = self._errors_against_refs(energy, torque, energy_ref, torque_ref)
        return float(np.mean(mses)), float(np.mean(maes))

    def _predict(self, screw_speed: Union[float, list, np.ndarray],
                 solid_content: Union[float, list, np.ndarray],
                 mass_flow: Union[float, list, np.ndarray]):
        """Predict torque and energy for one or many operating points.

        Each argument may be a number, a list/tuple of numbers or an ndarray;
        scalars are broadcast against the array arguments.

        Returns:
            ``(torque, energy)`` exactly as returned by the two models'
            ``predict`` (note the order: torque first).

        Raises:
            TypeError: if an argument is not numeric / array-like.
            ValueError: if the arguments cannot be broadcast to one length.
        """
        columns = [self._as_feature_column(value)
                   for value in (mass_flow, screw_speed, solid_content)]
        # Feature order expected by the scaler: mass flow, screw speed, solid content.
        features = np.column_stack(np.broadcast_arrays(*columns))
        features_scaled = self.scaler.transform(features)
        energy = self.energy_model.predict(features_scaled)
        torque = self.torque_model.predict(features_scaled)
        return torque, energy

    def start(self, mse_thresh: float = 1000.0, mae_thresh: float = 0):
        """Evaluate the current operating point and, if needed, search for a better one.

        MAE mode is used when ``mae_thresh`` is non-zero, otherwise MSE mode
        with ``mse_thresh``.

        Returns:
            ``(0, 0, current_error)`` when the current error is already below
            the threshold, otherwise
            ``(delta_screw_speed, delta_solid_content, best_error)`` from the
            grid search (even if ``best_error`` still exceeds the threshold).
        """
        self.torque, self.energy = self._predict(
            float(self.screw_speed), float(self.solid_content), float(self.mass_flow))
        self.error_mse, self.error_mae = self._find_error(
            self.energy_ref, self.energy, self.torque_ref, self.torque)

        if mae_thresh:
            mode, threshold, current_error = "mae", mae_thresh, self.error_mae
        else:
            mode, threshold, current_error = "mse", mse_thresh, self.error_mse
        print(f"error mode: {mode}")

        if current_error < threshold:
            # Nothing to change: keep state consistent with the returned
            # deltas instead of leaving values from an earlier run behind.
            self.delta_sp, self.delta_sc = 0, 0
            self.torque_pred, self.energy_pred = self.torque, self.energy
            return 0, 0, current_error

        self.delta_sp, self.delta_sc, best_error = self._search_for_best_params(
            self.screw_speed_gs, self.solid_content_gs,
            self.torque_ref, self.energy_ref,
            self.screw_speed, self.solid_content, self.mass_flow, mode=mode)
        if mode == "mae":
            self.best_mae = best_error
        else:
            self.best_mse = best_error

        if best_error > threshold:
            print(f"condition not satisfied!\nyou can decrease steps for sp and sc \nbest params for {mode} = {best_error}\ndelta_screw_speed = {self.delta_sp} \ndelta_solid_content = {self.delta_sc}")
        else:
            print(f"condition satisfied! \n best params for {mode} = {best_error}\ndelta_screw_speed = {self.delta_sp} \ndelta_solid_content = {self.delta_sc}")

        self.torque_pred, self.energy_pred = self._predict(
            float(self.screw_speed + self.delta_sp),
            float(self.solid_content + self.delta_sc),
            float(self.mass_flow))

        return self.delta_sp, self.delta_sc, best_error

    @property
    def pred_energy(self):
        """Energy predicted at the operating point chosen by the last :meth:`start` (``None`` before)."""
        return self.energy_pred

    @property
    def pred_torque(self):
        """Torque predicted at the operating point chosen by the last :meth:`start` (``None`` before)."""
        return self.torque_pred

                




    












        




In [40]:
# Locations of the persisted scaler and the two fitted models.
scaler_path = "scaler_ext.joblib"
energy_model_path = "model_Energy.h5"
torque_model_path = "model_Torque.h5"

# Build the extruder for the current operating point, using a fine search grid
# (screw-speed step 5, solid-content step 0.1).
extruder = Extruder(
    energy_ref=26857.9711783368,
    torque_ref=8.1217,
    screw_speed=400,
    solid_content=10,
    mass_flow=2.28,
    scaler_path=scaler_path,
    energy_model_path=energy_model_path,
    torque_model_path=torque_model_path,
    screw_speed_gs=5,
    solid_content_gs=0.1,
)

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [41]:
# Run the evaluation in MAE mode: a non-zero mae_thresh selects MAE, and 10 is
# the error threshold the result is checked against.
extruder.start(mae_thresh= 10)



exit pred
error mode: mae
bbessst 83.79999999999922 1105
searchedddddddddddddddddd
condition not satisfied!
you can decrease steps for sp and sc 
best params for mae = 97.94686161218067
delta_screw_speed = 705 
delta_solid_content = 73.79999999999922




(705, 73.79999999999922, 97.94686161218067)

In [42]:
# Display the energy prediction exposed by the pred_energy property.
extruder.pred_energy

array([27053.822557])

In [43]:
# Display the torque prediction exposed by the pred_torque property.
extruder.pred_torque

array([8.07935544])