In [None]:
import numpy as np
import pandas as pd
import math
import torch
import config as cfg
from sksurv.util import Surv
from sklearn.model_selection import train_test_split
from utility.survival import Survival
from tools.regressors import CoxPH, RSF, DeepSurv, DSM, BNNmcd
from tools.feature_selectors import PHSelector
from utility.builder import Builder
from tools.file_reader import FileReader
from tools.data_ETL import DataETL
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from lifelines.statistics import proportional_hazard_test
from tools.evaluator import LifelinesEvaluator
from tools.Evaluations.util import predict_median_survival_time
import config as cfg
from sklearn.preprocessing import StandardScaler
from tools.formatter import Formatter
from xgbse.non_parametric import calculate_kaplan_vectorized
from utility.survival import make_event_times
from utility.data import get_window_size, get_lag
from utility.survival import coverage

matplotlib_style = 'default'
import matplotlib.pyplot as plt; plt.style.use(matplotlib_style)
plt.rcParams.update({'axes.labelsize': 'medium',
                     'axes.titlesize': 'medium',
                     'font.size': 14.0,
                     'text.usetex': True,
                     'text.latex.preamble': r'\usepackage{amsfonts} \usepackage{bm}'})

# Paths
DATASET_PATH_XJTU = "../data/XJTU-SY/csv/"
RAW_DATA_PATH_XJTU = ["../data/XJTU-SY/35Hz12kN/", "../data/XJTU-SY/37.5Hz11kN/", "../data/XJTU-SY/40Hz10kN/"]
RESULTS_PATH = "../results"
PLOTS_PATH = "../plots"
RESULT_PATH_XJTU= "../data/XJTU-SY/results/"

NEW_DATASET = False
DATASET = "xjtu"

PERCENTAGE = 0.25
N_POST_SAMPLES = 1000 # number of samples to draw from MCD posterior
CONDITION_SELECTOR = 0
N_CONDITION = len(RAW_DATA_PATH_XJTU)
PLOTS_PATH = PLOTS_PATH
N_BEARING = cfg.N_REAL_BEARING_XJTU
DATASET_PATH = DATASET_PATH_XJTU
N_BOOT = 0

#For the first time running, a NEW_DATASET is needed
if NEW_DATASET== True:
    Builder(DATASET, N_BOOT).build_new_dataset(bootstrap=N_BOOT)
    
data_util = DataETL(DATASET, N_BOOT)

In [None]:
# Build timeseries data
timeseries_data, boot, info_pack = FileReader(DATASET, DATASET_PATH).read_data(CONDITION_SELECTOR, N_BOOT)

# Make moving average
data = pd.DataFrame()
window_size = get_window_size(N_CONDITION)
lag = get_lag(N_CONDITION)
bearing_indicies =  list(range(1, (N_BEARING*2)+1))
for idx in bearing_indicies:
    event_time = data_util.event_analyzer(idx, info_pack)
    transformed_data = data_util.make_moving_average(timeseries_data, event_time, idx, window_size, lag)
    data = pd.concat([data, transformed_data], axis=0)
    data = data.reset_index(drop=True)

In [None]:
from lifelines import CoxPHFitter
from textwrap import fill
import numpy as np
from lifelines.statistics import proportional_hazard_test, TimeTransformers
from lifelines.utils import format_p_value
from lifelines.utils.lowess import lowess

cph = CoxPHFitter() #penalizer= 0.0001
X_spec = data.loc[:, ~data.columns.isin(['Fca','Fi','Fo','Fr','Frp','FoH', 'FiH', 'FrH', 'FrpH', 'FcaH', 'noise'])]
cph.fit(X_spec, duration_col="Survival_time", event_col= "Event")

# Compute and plot PH
training_df = X_spec
advice = True
show_plots = False
p_value_threshold = 0.05
plot_n_bootstraps = 15
columns = None
residuals = cph.compute_residuals(training_df, kind="scaled_schoenfeld")
test_results = proportional_hazard_test(cph, training_df, time_transform=["rank", "km"], precomputed_residuals=residuals)
residuals_and_duration = residuals.join(training_df[cph.duration_col])
Xs = cph.regressors.transform_df(training_df)
counter = 0
n = residuals_and_duration.shape[0]
axes = []
for variable in cph.params_.index.intersection(columns or cph.params_.index):
    minumum_observed_p_value = test_results.summary.loc[variable, "p"].min()
    
    # plot is done (regardless of test result) whenever `show_plots = True`
    if show_plots:
        axes.append([])
        print()
        print("Bootstrapping lowess lines. May take a moment...")
        print()
        from matplotlib import pyplot as plt

        fig = plt.figure()

        # plot variable against all time transformations.
        for i, (transform_name, transformer) in enumerate(TimeTransformers().iter(["rank", "km"]), start=1):
            p_value = test_results.summary.loc[(variable, transform_name), "p"]

            ax = fig.add_subplot(1, 2, i)

            y = residuals_and_duration[variable]
            tt = transformer(cph.durations, cph.event_observed, cph.weights)[cph.event_observed.values]

            ax.scatter(tt, y, alpha=0.75)

            y_lowess = lowess(tt.values, y.values)
            ax.plot(tt, y_lowess, color="k", alpha=1.0, linewidth=2)

            # bootstrap some possible other lowess lines. This is an approximation of the 100% confidence intervals
            for _ in range(plot_n_bootstraps):
                ix = sorted(np.random.choice(n, n))
                tt_ = tt.values[ix]
                y_lowess = lowess(tt_, y.values[ix])
                ax.plot(tt_, y_lowess, color="k", alpha=0.30)

            best_xlim = ax.get_xlim()
            ax.hlines(0, 0, tt.max(), linestyles="dashed", linewidths=1)
            ax.set_xlim(best_xlim)

            ax.set_xlabel("%s-transformed time\n(p=%.4f)" % (transform_name, p_value), fontsize=10)
            ax.grid(True)
            axes[-1].append(ax)

        fig.suptitle("Scaled Schoenfeld residuals of '%s'" % variable, fontsize=14)
        plt.tight_layout()
        plt.subplots_adjust(top=0.90)
        plt.savefig(f'{PLOTS_PATH}/schoenfeld_residuals_cond_{CONDITION_SELECTOR+1}_{variable}.png', format='png', bbox_inches="tight")

    if np.round(minumum_observed_p_value, 2) > p_value_threshold:
        continue

    counter += 1

    if counter == 1:
        if advice:
            print(
                fill(
                    """The ``p_value_threshold`` is set at %g. Even under the null hypothesis of no violations, some covariates will be below the threshold by chance. This is compounded when there are many covariates. Similarly, when there are lots of observations, even minor deviances from the proportional hazard assumption will be flagged."""
                    % p_value_threshold,
                    width=100,
                )
            )
            print()
            print(
                fill(
                    """With that in mind, it's best to use a combination of statistical tests and visual tests to determine the most serious violations. Produce visual plots using ``check_assumptions(..., show_plots=True)`` and looking for non-constant lines. See link [A] below for a full example.""",
                    width=100,
                )
            )
            print()
        test_results.print_summary()
        print()

    print()
    print(
        "%d. Variable '%s' failed the non-proportional test: p-value is %s."
        % (counter, variable, format_p_value(4)(minumum_observed_p_value)),
        end="\n\n",
    )

    if advice:
        values = Xs["beta_"][variable]
        value_counts = values.value_counts()
        n_uniques = value_counts.shape[0]

        # Arbitrary chosen to check for ability to use strata col.
        # This should capture dichotomous / low cardinality values.
        if n_uniques <= 6 and value_counts.min() >= 5:
            print(
                fill(
                    "   Advice: with so few unique values (only {0}), you can include `strata=['{1}', ...]` in the call in `.fit`. See documentation in link [E] below.".format(
                        n_uniques, variable
                    ),
                    width=100,
                )
            )
        else:
            print(
                fill(
                    """   Advice 1: the functional form of the variable '{var}' might be incorrect. That is, there may be non-linear terms missing. The proportional hazard test used is very sensitive to incorrect functional forms. See documentation in link [D] below on how to specify a functional form.""".format(
                        var=variable
                    ),
                    width=100,
                ),
                end="\n\n",
            )
            print(
                fill(
                    """   Advice 2: try binning the variable '{var}' using pd.cut, and then specify it in `strata=['{var}', ...]` in the call in `.fit`. See documentation in link [B] below.""".format(
                        var=variable
                    ),
                    width=100,
                ),
                end="\n\n",
            )
            print(
                fill(
                    """   Advice 3: try adding an interaction term with your time variable. See documentation in link [C] below.""",
                    width=100,
                ),
                end="\n\n",
            )