In [1]:
# !pip install mlflow==2.1.1
# !pip install openpyxl
# !pip install boto3

In [2]:
import mlflow
import mlflow.sklearn
import mlflow.onnx
from mlflow.models.signature import infer_signature
import pandas as pd
import dask.dataframe as dd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import statsmodels.api as sm
from tqdm.notebook import tqdm
import catboost as cb
import numpy as np
import pandas as pd
from scipy import stats
from scipy.special import boxcox1p, inv_boxcox1p
from sklearn.metrics import mean_squared_error, mean_absolute_error
import os
import gc


os.environ['MLFLOW_TRACKING_URI'] = 'postgresql+psycopg2://postgres:JD643JcviPvhnRtbf@tis5000.vniizht.lan:5433/hakaton'
os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://tis5000.vniizht.lan:9000"
os.environ['AWS_ACCESS_KEY_ID'] = 'hakaton'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'JD643JcviPvhnRtbf'
mlflow.set_experiment("exg-machine-downtime-forecast")
client = mlflow.tracking.MlflowClient()



In [3]:
PATH_TO_X_TRAIN = 'data/datasets/X_train.parquet'
PATH_TO_Y_TRAIN = 'data/datasets/y_train.parquet'
PATH_TO_MESSAGES = 'data/datasets/messages.xlsx'

In [4]:
import dask.dataframe as dd

X_train_full = dd.read_parquet(PATH_TO_X_TRAIN, engine="pyarrow")
y_train_full = dd.read_parquet(PATH_TO_Y_TRAIN, engine="pyarrow")


In [5]:
def get_single_exgauster_columns_dicts(X_train, y_train):
    
    all_columns = list(X_train.columns)
    x_columns_dict = {}
    for exg_number in [4, 5, 6, 7, 8, 9]:
        exg_name = f'ЭКСГАУСТЕР {exg_number}'
        x_columns_dict[exg_number] = [col for col in all_columns if exg_name in col]
        
    all_columns = list(y_train.columns)
    y_columns_dict = {}
    for exg_number in [4, 5, 6, 7, 8, 9]:
        exg_name = f'№{exg_number}'
        y_columns_dict[exg_number] = [col for col in all_columns if exg_name in col]
        
    return x_columns_dict, y_columns_dict

x_columns_dict, y_columns_dict = get_single_exgauster_columns_dicts(X_train_full, y_train_full)

In [6]:
from warnings import simplefilter

simplefilter(action="ignore", category=pd.errors.PerformanceWarning)


def add_features(X_train_full, x_columns_dict, exg_number):
    
    features = X_train_full[x_columns_dict[exg_number]].compute()
    features["date"] = features.index.date
    
    features[f'ЭКСГАУСТЕР {exg_number}. ВИБРАЦИЯ НА ОПОРЕ 4'] = features[f'ЭКСГАУСТЕР {exg_number}. ВИБРАЦИЯ НА ОПОРЕ 4'].abs()
    
    def compute_window_features(data, name_f):
        windows = ['7D', '30D']
        for f in tqdm(name_f):
            for window in windows:
                data[f"{f}_{window}_mean"] = data[f].rolling(window, min_periods=1).mean()
                data[f"{f}_{window}_std"] = data[f].rolling(window, min_periods=1).std()
                data[f"{f}_{window}_median"] = data[f].rolling(window, min_periods=1).median()
                data[f"{f}_{window}_max"] = data[f].rolling(window, min_periods=1).max()
            # data[f"{f}_1D_chg_mean"] = data[f"{f}"]/data[f"{f}_1D_mean"]
            data[f"{f}_7D_chg_mean"] = data[f"{f}"]/data[f"{f}_7D_mean"]
            data[f"{f}_30D_chg_mean"] = data[f"{f}"]/data[f"{f}_30D_mean"]
            # data[f"{f}_1D_chg_median"] = data[f"{f}"]/data[f"{f}_1D_median"]
            data[f"{f}_7D_chg_median"] = data[f"{f}"]/data[f"{f}_7D_median"]
            data[f"{f}_30D_chg_median"] = data[f"{f}"]/data[f"{f}_30D_median"]        
            data[f"{f}_diff_between_values"] = data[f"{f}"] / data[f"{f}"].rolling(1, min_periods=1).mean()
        return data
    
    x_data_train = compute_window_features(features, x_columns_dict[exg_number])    
    return x_data_train


def add_target(y_train_full, y_columns_dict, exg_number, tm):
    
    messages = pd.read_excel(PATH_TO_MESSAGES)
    messages["ДАТА_НАЧАЛА_НЕИСПРАВНОСТИ"] = pd.to_datetime(messages["ДАТА_НАЧАЛА_НЕИСПРАВНОСТИ"])
    messages["ДАТА_УСТРАНЕНИЯ_НЕИСПРАВНОСТИ"] = pd.to_datetime(messages["ДАТА_УСТРАНЕНИЯ_НЕИСПРАВНОСТИ"])

    fail_dates = messages[
        (messages["ВИД_СООБЩЕНИЯ"] == "M1")&
        (messages["ИМЯ_МАШИНЫ"] == f"ЭКСГАУСТЕР А/М №{exg_number}")&
        (True if not tm else messages["НАЗВАНИЕ_ТЕХ_МЕСТА"] == f"{tm}")&
        ((messages["ОПИСАНИЕ"] != "ТО")|
        (messages["ОПИСАНИЕ"] != "ТО(замена редуктора газовой задвижки №4)")|
        (messages["ОПИСАНИЕ"] != 'ТО согласованное')|
        (messages["ОПИСАНИЕ"] != 'Согласованное ТО')|
        (messages["ОПИСАНИЕ"] != 'ТО(согласованное)')|
        (messages["ОПИСАНИЕ"] != 'ТО (замена щеток на эл/двиг. эксг-ра)'))
    ]
    
    if tm:
        labels = y_train_full[f'Y_ЭКСГАУСТЕР А/М №{exg_number}_{tm}'].compute()
    else:
        labels = y_train_full[y_columns_dict[exg_number]].compute()
        labels = ((labels==1).sum(axis=1) > 0).astype(int)
    
    
    idx = 0
    template = pd.Series(np.zeros(labels.shape[0]))
    template.index = labels.index
    for i in range(fail_dates.shape[0]):
        dt_start = fail_dates.iloc[i]["ДАТА_НАЧАЛА_НЕИСПРАВНОСТИ"]
        dt_end = fail_dates.iloc[i]["ДАТА_УСТРАНЕНИЯ_НЕИСПРАВНОСТИ"]

        diff = dt_start - labels.iloc[idx:].index
        res = (diff).days*24*3600 + diff.seconds
        res = pd.Series(res)
        res = res[res > 0]
        template.iloc[idx:idx+res.shape[0]] = res.values
        idx += res.shape[0]
        first_zero = labels.index[idx]
        shape_fail = labels[(labels.index >= first_zero)&(labels.index <=  dt_end)].shape[0]
        idx += shape_fail
        
    template[idx:] = None
    template = template.dropna()
    template = template[template != 0]
    
    template = pd.DataFrame(template, columns=["y"])
    return template


def get_train_test_data(X_train_full, y_train_full, x_columns_dict, y_columns_dict, exg_number, tm):
    
    print('adding features')
    data = add_features(X_train_full, x_columns_dict, exg_number)
    
    print('adding targets')
    template = add_target(y_train_full, y_columns_dict, exg_number, tm)
    
    data = data.merge(template, left_index=True, right_index=True)
    
    print('train test split')
    X_train = data.loc[data.index < pd.to_datetime("2021-01-01")]
    X_test = data.loc[data.index >= pd.to_datetime("2021-01-01")]

    
    print('x y split')
    y_train = X_train["y"]
    X_train.drop(columns=["y","date"], inplace=True)
    
    y_test = X_test["y"]

    X_test.drop(columns=["y","date"], inplace=True)
    
    print('delete unused data')
    del data
    
    return X_train, X_test, y_train, y_test
    

In [7]:
def custom_metric(y_true, y_pred, alpha):
    
    weight_sum = 0.0
    error = 0.0
    
    for i in range(len(y_pred)):
        weight_sum += 1
        error += (y_pred[i] - y_true[i]) ** 2 * (1 / (y_true[i] * alpha))
    
    return np.sqrt(error / weight_sum)


class OrgRmseObjective(object):
    
    def __init__(self, alpha):
        self.alpha = alpha
        
    def calc_ders_range(self, approxes, targets, weights):
        assert len(approxes) == len(targets)

        result = []
        der1 = (targets - approxes) * (1 / (targets * self.alpha))
        der2 = -1 * (1 / (targets * self.alpha))

        result = [(d1, d2) for (d1, d2) in zip(der1, der2)]
        return result


class OrgRmseMetric(object):
    
    def __init__(self, alpha):
        self.alpha = alpha
        
    def get_final_error(self, error, weight):
        return np.sqrt(error / (weight + 1e-38))

    def is_max_optimal(self):
        return False

    def evaluate(self, approxes, target, weight):
        assert len(approxes) == 1
        assert len(target) == len(approxes[0])

        approx = approxes[0]

        error_sum = 0.0
        weight_sum = len(approx)

        error_sum = (((approx - target) ** 2) * (1 / (target * self.alpha))).sum()

        return error_sum, weight_sum

In [8]:
def make_model(X_train_full, y_train_full, exg_number, tm: str = None):

    depth = 5
    iterations = 3000
    learning_rate = 0.005
    od_type = "Iter"
    od_wait = 100
    lambd = 0.7
    model_name = 'catboost'
    artifact_path = "model"
    alpha = 10
    
    X_train, X_test, y_train, y_test = get_train_test_data(X_train_full, y_train_full, x_columns_dict, y_columns_dict, exg_number, tm)
    
    print(y_train.shape)
    print(y_test.shape)
    
    print('boxcox')
    y_train_boxcox = boxcox1p(y_train, lambd)
    y_test_boxcox = boxcox1p(y_test, lambd)

    # ПОДШИПНИК ОПОРНЫЙ №2 ЭКСГ. №4
    print('Starting train')
    
    run_name = f'exg_{exg_number}_boxcox' if not tm else f'exg_{exg_number}_{tm}_boxcox'
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        experiment_id = run.info.experiment_id
        print("MLflow:")
        print("  run_id:", run_id)
        print("  experiment_id:", experiment_id)

        # MLflow params
        print("Parameters:")
        print("  depth:", depth)
        print("  learning_rate:", learning_rate)
        print("  iterations:", iterations)
        mlflow.log_param("depth", depth)
        mlflow.log_param("learning_rate", learning_rate)
        mlflow.log_param("iterations", iterations)
        mlflow.log_param("od_type", od_type)
        mlflow.log_param("od_wait", od_wait)
        mlflow.log_param("lambd_boxcox", lambd)
        mlflow.log_param("loss_alpha", alpha)

        # Create and fit model
        model = cb.CatBoostRegressor(
            depth=depth, iterations=iterations, learning_rate=learning_rate, loss_function=OrgRmseObjective(alpha=alpha),
            task_type='CPU', random_seed=13, verbose=50, eval_metric=OrgRmseMetric(alpha=alpha),
            od_type=od_type, od_wait=od_wait
        )

        model.fit(X_train, y_train_boxcox,
                        eval_set=(X_test, y_test_boxcox))
        if tm:
            model_save_name = f"models/cb_regressor_exg_{exg_number}_{tm}_boxcox_org_loss.cbm"
            preds_df_name = f'test_preds/y_preds_exg_{exg_number}_{tm}.csv'
            y_df_save_name = f'test_preds/y_preds_exg_{exg_number}_{tm}.csv'
        else:
            model_save_name = f"models/cb_regressor_exg_{exg_number}_boxcox_org_loss.cbm"
            preds_df_name = f'test_preds/y_preds_exg_{exg_number}.csv'
            y_df_save_name = f'test_preds/y_preds_exg_{exg_number}.csv'
        
        model.save_model(model_save_name)
        
        predictions = model.predict(X_test)
        signature = infer_signature(X_test, predictions)
        metrics = {
           "mse": mean_absolute_error(y_test_boxcox, predictions),
           "rmse": mean_squared_error(y_test_boxcox, predictions, squared=False),
           "mae": mean_squared_error(y_test_boxcox, predictions),
           "target_metric": custom_metric(y_test_boxcox, predictions, alpha=alpha),
           "best_iteration": model.get_best_iteration()
        }
        
        mlflow.log_metrics(metrics)
        mlflow.catboost.log_model(model, artifact_path, signature=signature)
        
        # task_1 part
        preds_df = pd.DataFrame(predictions)
        y_df = pd.DataFrame(y_test_boxcox)

        preds_df.to_csv(preds_df_name)
        y_df.to_csv(y_df_save_name)
                
        del X_test
        del y_test
        del X_train
        del y_train
        gc.collect()
        

In [11]:
def make_all_models():
    
    for exg_number in [4, 5, 6, 7, 8, 9]:
        print(f'training model for exgauster {exg_number}')
        make_model(X_train_full, y_train_full, exg_number)

In [13]:
make_all_models()

training model for exgauster 7
adding features


  0%|          | 0/16 [00:00<?, ?it/s]

adding targets
train test split
x y split


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_train.drop(columns=["y","date"], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_test.drop(columns=["y","date"], inplace=True)


delete unused data
(6181308,)
(716505,)
boxcox
Starting train
MLflow:
  run_id: ab164797acee437dbcd2057a0968b442
  experiment_id: 8
Parameters:
  depth: 5
  learning_rate: 0.005
  iterations: 3000


  _check_train_params(params)
  _check_train_params(params)


0:	learn: 145.0354660	test: 61.1646135	best: 61.1646135 (0)	total: 5.91s	remaining: 4h 55m 9s
50:	learn: 141.0099450	test: 57.5610102	best: 57.5610102 (50)	total: 5m 15s	remaining: 5h 3m 59s
100:	learn: 136.5711323	test: 55.1973381	best: 55.1973381 (100)	total: 10m 34s	remaining: 5h 3m 20s
150:	learn: 131.9561558	test: 53.4288433	best: 53.4288433 (150)	total: 15m 54s	remaining: 5h
200:	learn: 127.1408344	test: 51.6314015	best: 51.6314015 (200)	total: 21m 10s	remaining: 4h 54m 49s
250:	learn: 122.6617655	test: 50.1049273	best: 50.1049273 (250)	total: 26m 26s	remaining: 4h 49m 35s
300:	learn: 118.5150367	test: 48.8659251	best: 48.8659251 (300)	total: 31m 44s	remaining: 4h 44m 32s
350:	learn: 114.6863445	test: 48.0163277	best: 48.0163277 (350)	total: 37m 5s	remaining: 4h 39m 55s
400:	learn: 111.0987167	test: 47.3841512	best: 47.3841512 (400)	total: 42m 26s	remaining: 4h 35m 6s
450:	learn: 107.4851934	test: 46.9720375	best: 46.9720375 (450)	total: 47m 46s	remaining: 4h 30m 3s
500:	learn: 1

