In [3]:
import os
import sys
import logging
import pickle
import mlflow
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import SGDRegressor, Ridge, BayesianRidge
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import LinearSVR
from sklearn.kernel_ridge import KernelRidge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from warnings import filterwarnings
filterwarnings('ignore')

from minio import Minio

# settings
# MinIO connection details are read from the environment; an unset variable
# falls back to an empty string.
MINIO_HOST = os.getenv('MINIO_HOST', '')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', '')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', '')

# MLflow tracking / S3 artifact-store settings. These assignments overwrite
# whatever the process environment already holds for the same keys.
# NOTE(review): the empty values look like redacted placeholders - confirm the
# real endpoints/credentials are supplied before running.
os.environ.update({
    'MLFLOW_TRACKING_URI': "",
    'MLFLOW_S3_ENDPOINT_URL': "",
    'AWS_ACCESS_KEY_ID': '',
    'AWS_SECRET_ACCESS_KEY': '',
})

In [4]:
# Send INFO-and-above records to stdout so they show up in the cell output,
# and expose a logger named after the current module for the cells below.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    stream=sys.stdout,
)
log = logging.getLogger(__name__)

In [42]:
class VesselMEFuelConsumptionRegression(object):
    """Baseline regression pipeline for the ``ME_FO_consumption`` column.

    The class bundles the steps used in this notebook: loading one ship's CSV
    from MinIO, min-max scaling the numerical features and the target,
    splitting into train/test sets, fitting a dictionary of estimators and
    logging the fitted models, their metrics and the scalers to MLflow.
    """

    def __init__(self):
        """Configure storage location, scalers and the columns that are used."""
        self.random_state = 42
        self.bucket = 'uploads'
        self.directory = 'data'
        self.features_scaler = MinMaxScaler()
        self.y_scaler = MinMaxScaler()

        # Column parsed as a date when the CSV is read.
        self.date_col = ['measurement_time']

        # Columns that are min-max scaled in `_preprocess`.
        self.numerical_columns = [
             'cyl_chargeair_press', 'AE_FO_inlet_flow',
             'draught_aft_side', 'AE_FO_inlet_Temp',
             'engine_speed','DG_1_power',
             'DG_2_power','DG_4_power',
             'CAC_CW_HT_pressure', 'CAC_in_Low_Temperature_CW_temp',
             'propeller_shaft_output', 'propeller_shaft_rpm',
             'propeller_shaft_thrust', 'cyl_chargeair_temp',
             'ship_speed_actual', 'Ship_SpeedLOG',
             'cyl_exh_gas_temp_mean','torque',
             'AE_FO_outlet_flow', 'AE_FO_outlet_Temp',
             'Eng_in_HTCW_press', 'Eng_in_Jacket_HTCW_temp',
             'Eng_out_Jacket_HTCW_temp', 'Eng_Relative_load',
             'FO_Rack_position', 'FO_inlet_press',
             'fueloil_inlet_temperature', 'ME_FO_inlet_flow',
             'ME_FO_outlet_Temp', 'ME_FO_outlet_flow',
             'LO_Filter_P', 'LO_filter_in_press',
             'LO_in_press', 'LO_in_temp',
             'LO_out_temp_TC', 'LO_cooler_CW_out_temp'
        ]
        # Columns that are loaded but passed through without scaling.
        self.categorical_columns = [
            'DG_1_condition','DG_2_condition',
            'DG_3_condition','DG_4_condition',
            'ship_inclination'
        ]
        # Regression target.
        self.monitoring_col_name = 'ME_FO_consumption'
        self.monitoring_col = [self.monitoring_col_name]

        # Everything read from the CSV; any other column in the file is ignored.
        self.columns_used = self.date_col + self.numerical_columns + self.categorical_columns + self.monitoring_col
        self.for_normalization_cols = self.numerical_columns

    def load_dataset_(self, ship_id):
        """Read ``<directory>/<ship_id>.csv`` from the MinIO bucket into a DataFrame.

        Args:
            ship_id: Identifier used to build the object name.

        Returns:
            DataFrame restricted to ``self.columns_used`` with the date column
            parsed.
        """
        client = Minio(
            MINIO_HOST,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            secure=False
        )
        data_path = f'{self.directory}/{ship_id}.csv'
        response = client.get_object(self.bucket, data_path)
        try:
            return pd.read_csv(response, parse_dates=self.date_col, usecols=self.columns_used)
        finally:
            # The MinIO response holds a pooled HTTP connection; it has to be
            # closed and released explicitly, even when parsing fails.
            response.close()
            response.release_conn()

    def _preprocess(self, df):
        """Drop incomplete rows, scale features and target, drop constant columns.

        Both scalers are (re)fitted on the frame passed in, so they are updated
        as a side effect. The input frame itself is left untouched.

        NOTE(review): the scalers are fitted before the train/test split done
        in `_data_preparation`, so test rows influence the scaling.

        Args:
            df: Raw DataFrame containing the configured columns.

        Returns:
            A new, scaled DataFrame without columns whose value never changes.
            This can also remove the target or date column if it is constant.

        Raises:
            ValueError: If no rows remain after dropping missing values.
        """
        # Work on an explicit copy so the column assignments below never write
        # through to (or warn about) the caller's frame.
        cleaned = df.dropna().copy()
        if cleaned.empty:
            raise ValueError(
                'No rows left after dropping missing values; cannot fit the scalers.'
            )

        cleaned[self.for_normalization_cols] = self.features_scaler.fit_transform(
            cleaned[self.for_normalization_cols]
        )
        cleaned[self.monitoring_col] = self.y_scaler.fit_transform(
            cleaned[self.monitoring_col]
        )

        # remove zero variance data: keep a column only if at least one row
        # differs from the first row.
        varies = (cleaned != cleaned.iloc[0]).any()
        return cleaned.loc[:, varies]

    def _data_preparation(self, normalized_df, test_size=0.20):
        """Split a preprocessed frame into train/test features and target.

        Args:
            normalized_df: Output of `_preprocess`.
            test_size: Fraction of rows held out for testing.

        Returns:
            Tuple ``(X_train, X_test, Y_train, Y_test)``. ``X`` holds every
            column except the target and date columns; ``Y`` is a one-column
            DataFrame with the target.
        """
        excluded = self.monitoring_col + self.date_col
        feature_names = [name for name in normalized_df.columns if name not in excluded]
        Y = normalized_df[self.monitoring_col]
        X = normalized_df[feature_names]
        return train_test_split(
            X, Y, test_size=test_size, random_state=self.random_state
        )

    def _metrics_gen(self, true_value, predicted):
        """Return ``(mae, mse, r2)`` for the given true and predicted values."""
        mae = mean_absolute_error(true_value, predicted)
        mse = mean_squared_error(true_value, predicted)
        r2 = r2_score(true_value, predicted)
        return mae, mse, r2

    def _store_scalers(self,
                       features_sc_path='feature_scaler.pkl',
                       y_sc_path='y_scaler.pkl'
                      ):
        """Pickle the feature and target scalers to the given file paths."""
        # Context managers guarantee the files are flushed and closed before
        # anything else (e.g. the MLflow artifact upload) reads them.
        with open(features_sc_path, 'wb') as features_file:
            pickle.dump(self.features_scaler, features_file)
        with open(y_sc_path, 'wb') as y_file:
            pickle.dump(self.y_scaler, y_file)

    @staticmethod
    def _log_to_mlflow(
        experiment_name,
        trained_models_dict,
        metrics_dict,
        scaler_features_path='feature_scaler.pkl',
        scaler_y_path='y_scaler.pkl',
        params = None
    ):
        """Log scalers, models, metrics and optional params in one MLflow run.

        Args:
            experiment_name: MLflow experiment to log under.
            trained_models_dict: Mapping of model name to fitted estimator.
                Each model is logged and registered under its name.
            metrics_dict: Mapping of model name to a ``{metric: value}`` dict.
                It must contain an entry for every model name.
            scaler_features_path: Path of the pickled feature scaler artifact.
            scaler_y_path: Path of the pickled target scaler artifact.
            params: Optional ``{name: value}`` dict logged as run parameters.
        """
        mlflow.set_experiment(experiment_name)
        with mlflow.start_run():

            mlflow.log_artifact(scaler_features_path)
            mlflow.log_artifact(scaler_y_path)

            for model_name, model_obj in trained_models_dict.items():
                mlflow.sklearn.log_model(
                    model_obj,
                    model_name,
                    registered_model_name=model_name
                )
                # All models share one run, so metric keys are prefixed with
                # the model name; otherwise only one model's values would be
                # distinguishable.
                for metric_name, metric_value in metrics_dict[model_name].items():
                    mlflow.log_metric(f'{model_name}_{metric_name}', metric_value)

            if params:
                for param_name, param_value in params.items():
                    mlflow.log_param(param_name, param_value)

    def _train(self, models, X_train,X_test,Y_train,Y_test):
        """Fit every model, log its scores and persist the scalers.

        Args:
            models: Mapping of model name to an unfitted estimator exposing
                ``fit``, ``predict`` and ``score``. Estimators are fitted in
                place.
            X_train, X_test: Feature sets.
            Y_train, Y_test: Target sets.

        Returns:
            Tuple ``(models, metrics)`` where ``metrics`` maps each model name
            to ``{'mae': ..., 'mse': ..., 'r2': ...}`` computed on the test set.
        """
        metrics = {}

        for model_name, model in models.items():
            model.fit(X_train, Y_train)
            predicted = model.predict(X_test)

            mae, mse, r2 = self._metrics_gen(Y_test, predicted)

            log.info(f'{model_name} MAE: {mae}')
            log.info(f'{model_name} MSE: {mse}')
            log.info(f'{model_name} R2: {r2}')
            log.info(f'{model_name} Train Score: {model.score(X_train, Y_train)}')
            log.info(f'{model_name} Test Score: {model.score(X_test, Y_test)}')

            metrics[model_name] = {'mae': mae, 'mse': mse, 'r2': r2}

        # Written to the default paths that `_log_to_mlflow` uploads from.
        self._store_scalers()
        return models, metrics

In [43]:
# Build the pipeline, pull one ship's measurements from MinIO, scale them and
# split the result into train/test features and targets.
regr_obj = VesselMEFuelConsumptionRegression()
data = regr_obj.load_dataset_('ship_1')
processed_data = regr_obj._preprocess(data)
X_train, X_test, Y_train, Y_test = regr_obj._data_preparation(processed_data)

In [44]:
# Candidate regressors, keyed by the name used in the logs and in the MLflow
# model registry. Every stochastic estimator is seeded from the pipeline's
# `random_state` so that re-running the notebook reproduces the same scores
# (SGDRegressor shuffles the training data and was previously unseeded).
input_models = {
    'sgd': SGDRegressor(random_state=regr_obj.random_state),
    'ridge': Ridge(alpha=1.0),
    # Disabled candidates, kept for reference:
    # 'random_forest': RandomForestRegressor(max_depth=2, random_state=regr_obj.random_state),
    # 'KNN': KNeighborsRegressor(n_neighbors=2),
    # 'svr': LinearSVR(random_state=regr_obj.random_state, tol=1e-05),
    'bayesian_ridge': BayesianRidge(),
    'gradient_boosting_regressor': GradientBoostingRegressor(random_state=regr_obj.random_state),
}
models, metrics = regr_obj._train(input_models, X_train, X_test, Y_train, Y_test)

2022-11-30 13:45:31,113 - __main__ - INFO - sgd MAE: 0.006401958883048676
2022-11-30 13:45:31,114 - __main__ - INFO - sgd MSE: 0.0001282662628884933
2022-11-30 13:45:31,115 - __main__ - INFO - sgd R2: 0.9988257037215477
2022-11-30 13:45:31,154 - __main__ - INFO - sgd Train Score: 0.9988629498293933
2022-11-30 13:45:31,166 - __main__ - INFO - sgd Test Score: 0.9988257037215477
2022-11-30 13:45:31,294 - __main__ - INFO - ridge MAE: 0.003520304349171177
2022-11-30 13:45:31,295 - __main__ - INFO - ridge MSE: 4.4521071325636664e-05
2022-11-30 13:45:31,296 - __main__ - INFO - ridge R2: 0.9995924031214984
2022-11-30 13:45:31,348 - __main__ - INFO - ridge Train Score: 0.9996185323456126
2022-11-30 13:45:31,365 - __main__ - INFO - ridge Test Score: 0.9995924031214984
2022-11-30 13:45:31,908 - __main__ - INFO - bayesian_ridge MAE: 0.002362239083642028
2022-11-30 13:45:31,909 - __main__ - INFO - bayesian_ridge MSE: 2.0132378326462697e-05
2022-11-30 13:45:31,909 - __main__ - INFO - bayesian_ridge 

In [46]:
# Record the fitted models, their test metrics and the pickled scalers in a
# single MLflow run under the baseline experiment.
experiment_name = 'fuel_consumption_baseline_regression'
regr_obj._log_to_mlflow(experiment_name, models, metrics)

2022-11-30 13:47:24,093 - botocore.credentials - INFO - Found credentials in environment variables.


Registered model 'sgd' already exists. Creating a new version of this model...
2022/11/30 13:47:35 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: sgd, version 5
Created version '5' of model 'sgd'.
Registered model 'ridge' already exists. Creating a new version of this model...
2022/11/30 13:47:38 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: ridge, version 5
Created version '5' of model 'ridge'.
Registered model 'bayesian_ridge' already exists. Creating a new version of this model...
2022/11/30 13:48:11 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: bayesian_ridge, version 5
Created version '5' of model 'bayesian_ridge'.
Registered model 'gradient_boosting_regressor' already exists. Creating a new version of