# **WEIBULL HAI PHA**
*******************************

# **Cài đặt thư viện và biểu diễn dữ liệu**

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Read data from the CSV file
file_path = 'D:/Code ĐA2/Data/ai4i2020.csv'

# Load the dataset
df = pd.read_csv(file_path)

# Display basic information about the dataset
df.info(), df.head()


ModuleNotFoundError: No module named 'pandas'

In [None]:
import pandas as pd

# Assume df is your DataFrame and 'Tool wear [min]' is the column you want to check
count_zeros = (df['Tool wear [min]'] == 0).sum()

# Print the result
print(f"Number of elements equal to 0 in the 'Tool wear [min]' column: {count_zeros}")

# Remove rows where the value is 0 in the 'Tool wear [min]' column
df = df[df['Tool wear [min]'] != 0]

# Print the DataFrame after removal
print("DataFrame after removing rows with 0 values in the 'Tool wear [min]' column:")
print(df)

# **Tính toán giá trị tham số**

In [None]:
pip install numpy scipy matplotlib

# **Cố định t0 = const**

In [None]:
import numpy as np
from scipy.optimize import minimize

# Data from the DataFrame
data = df['Tool wear [min]'].values  # Observation times
events = df['Machine failure'].values  # Event indicator (1: event occurred, 0: censored)

# Fixed value of t_0
t0 = 150

# Log-likelihood function for the two-phase Weibull model
def weibull_two_phase_log_likelihood(params, data, events, t0):
    lambda1, p1, lambda2, p2 = params

    if lambda1 <= 0 or lambda2 <= 0 or p1 <= 0 or p2 <= 0:  # Check for valid parameter values
        return -np.inf

    log_likelihood = 0
    for i in range(len(data)):
        if data[i] <= 0:  # Skip invalid values
            continue

        t = data[i]
        if events[i] == 1:  # Event occurred
            if t <= t0:
                # Phase 1
                log_likelihood += (np.log(p1) + np.log(lambda1) +
                                   (p1 - 1) * np.log(lambda1 * t) - 
                                   (lambda1 * t) ** p1)
            else:
                # Phase 2
                f1_part = p1 * lambda1 * (lambda1 * t) ** (p1 - 1)
                f2_part = p2 * lambda2 * (lambda2 * (t - t0)) ** (p2 - 1)
                exp_part = np.exp(-(lambda1 * t) ** p1 - (lambda2 * (t - t0)) ** p2)
                log_likelihood += np.log(f1_part + f2_part) + np.log(exp_part)
        else:  # Event did not occur (censored)
            if t <= t0:
                # Phase 1
                log_likelihood += -((lambda1 * t) ** p1)
            else:
                # Phase 2
                log_likelihood += -((lambda1 * t0) ** p1 + (lambda2 * (t - t0)) ** p2)

    return log_likelihood

# Negative log-likelihood function
def weibull_two_phase_negative_log_likelihood(params, data, events, t0):
    return -weibull_two_phase_log_likelihood(params, data, events, t0)

# Initial parameters
initial_params = [0.01, 3, 0.03, 3]  # [lambda1, p1, lambda2, p2]

# Bounds: parameters must all be positive
bounds = [(1e-5, None), (1e-5, None), (1e-5, None), (1e-5, None)]

# Optimization
result = minimize(weibull_two_phase_negative_log_likelihood, initial_params,
                  args=(data, events, t0), bounds=bounds, method='L-BFGS-B')

# Check the result
if result.success:
    lambda1_est, p1_est, lambda2_est, p2_est = result.x
    #print(f'Initial parameters: {initial_params}')
    print(f'Estimated two-phase Weibull parameters:')
    print(f'  Phase 1 - Scale (lambda1): {lambda1_est}, Shape (p1): {p1_est}')
    print(f'  Phase 2 - Scale (lambda2): {lambda2_est}, Shape (p2): {p2_est}')
else:
    print("Optimization did not converge:")
    print(result.message)


In [None]:
# Function to calculate the value of S(t)
def S(t, p1, lambda1, p2, lambda2, t0):
    if t < t0:
        return np.exp(-(t * lambda1) ** p1)  # Phase 1
    else:
        return np.exp(-(t0 * lambda1) ** p1 - ((t - t0) * lambda2) ** p2)  # Phase 2

# Create a time array from 0 to 300
t_values = np.linspace(0, 300, 500)

# Calculate S(t) values for each t
S_values = [S(t, p1_est, lambda1_est, p2_est, lambda2_est, t0) for t in t_values]

# Plot the graph
plt.figure(figsize=(8, 6))
plt.plot(t_values, S_values, label=r'$S(t)$', color='b')
plt.axvline(x=t0, color='r', linestyle='--', label='Threshold $t_0$')
plt.title('Survival function graph $S(t)$ with estimated parameters')
plt.xlabel('Time $t$')
plt.ylabel('Survival function $S(t)$')
plt.legend()
plt.grid(True)
plt.show()


## **Coi t0 là một tham số**

In [None]:
import numpy as np
from scipy.optimize import minimize

# Data from DataFrame
data = df['Tool wear [min]'].values  # Observation time
events = df['Machine failure'].values  # Event indicator variable (1: event occurred, 0: censored)

# Log-likelihood function for the two-phase Weibull model with t0 as a parameter
def weibull_two_phase_log_likelihood(params, data, events):
    lambda1, p1, lambda2, p2, t0 = params

    if lambda1 <= 0 or lambda2 <= 0 or p1 <= 0 or p2 <= 0 or t0 <= 0:  # Check for valid values
        return -np.inf

    log_likelihood = 0
    for i in range(len(data)):
        if data[i] <= 0:  # Skip invalid values
            continue

        t = data[i]
        if events[i] == 1:  # Event occurred
            if t <= t0:
                # Phase 1
                log_likelihood += (np.log(p1) + np.log(lambda1) +
                                   (p1 - 1) * np.log(lambda1 * t) -
                                   (lambda1 * t) ** p1)
            else:
                # Phase 2
                f1_part = p1 * lambda1 * (lambda1 * t) ** (p1 - 1)
                f2_part = p2 * lambda2 * (lambda2 * (t - t0)) ** (p2 - 1)
                exp_part = np.exp(-(lambda1 * t) ** p1 - (lambda2 * (t - t0)) ** p2)
                log_likelihood += np.log(f1_part + f2_part) + np.log(exp_part)
        else:  # Event did not occur (censored)
            if t <= t0:
                # Phase 1
                log_likelihood += -((lambda1 * t) ** p1)
            else:
                # Phase 2
                log_likelihood += -((lambda1 * t0) ** p1 + (lambda2 * (t - t0)) ** p2)

    return log_likelihood

# Negative log-likelihood function
def weibull_two_phase_negative_log_likelihood(params, data, events):
    return -weibull_two_phase_log_likelihood(params, data, events)

# Initial parameters
initial_params = [0.01, 2, 0.01, 2, 150]  # [lambda1, p1, lambda2, p2, t0]

# Constraints: all parameters must be positive, and t0 must not exceed the maximum value of data
bounds = [(1e-5, None), (1e-5, None), (1e-5, None), (1e-5, None), (1e-5, max(data))]

# Optimization
result = minimize(weibull_two_phase_negative_log_likelihood, initial_params,
                  args=(data, events), bounds=bounds, method='L-BFGS-B')

# Check the result
if result.success:
    lambda1_est, p1_est, lambda2_est, p2_est, t0_est = result.x
    #print(f'Initial parameters: {initial_params}')
    print(f'Estimated Weibull two-phase parameters:')
    print(f'  Phase 1 - Scale (lambda1): {lambda1_est}, Shape (p1): {p1_est}')
    print(f'  Phase 2 - Scale (lambda2): {lambda2_est}, Shape (p2): {p2_est}')
    print(f'  Phase transition point (t0): {t0_est}')
else:
    print("Optimization did not converge:")
    print(result.message)


In [None]:
# Function to calculate S(t)
def S(t, p1, lambda1, p2, lambda2, t0):
    if t < t0:
        return np.exp(-(t * lambda1) ** p1)  # Phase 1
    else:
        return np.exp(-(t0 * lambda1) ** p1 - ((t - t0) * lambda2) ** p2)  # Phase 2

# Create time values from 0 to 300
t_values = np.linspace(0, 300, 500)

# Calculate S(t) for each t
S_values = [S(t, p1_est, lambda1_est, p2_est, lambda2_est, t0) for t in t_values]

# Plot the graph
plt.figure(figsize=(8, 6))
plt.plot(t_values, S_values, label=r'$S(t)$', color='b')
#plt.axvline(x=t0, color='r', linestyle='--', label='Threshold $t_0$')
plt.title('Survival function graph $S(t)$ with estimated parameters')
plt.xlabel('Time $t$')
plt.ylabel('Survival function $S(t)$')
plt.legend()
plt.grid(True)
plt.show()


# **Sử dụng thuật toán để tính giá trị t_0**

In [None]:
import pandas as pd
import numpy as np
from scipy.stats import linregress

def find_phase_boundary(csv_file, column_name, A, B, delta):
    # Read data from CSV file
    data = pd.read_csv(csv_file)
    failure_times = data[column_name].values
    N = len(failure_times)
    
    # Check input conditions
    if 2 * A > N + 1 or B > N - 2 * A + 2:
        raise ValueError("Invalid parameters: Check constraints for A and B.")

    # Calculate Kaplan-Meier failure probabilities
    failure_times_sorted = np.sort(failure_times)
    prob_failure = np.arange(1, N + 1) / (N + 1)  # Using simple percentiles

    # Loop through each k to split data and calculate slope differences
    slope_differences = []
    for k in range(A, N - A + 1):
        # Split data
        x1, y1 = failure_times_sorted[:k], prob_failure[:k]
        x2, y2 = failure_times_sorted[k:], prob_failure[k:]

        # Skip if all x values in one part are the same
        if len(set(x1)) == 1 or len(set(x2)) == 1:
            continue

        # Fit a linear regression for each part
        slope1, _, _, _, _ = linregress(x1, np.log(-np.log(1 - y1)))
        slope2, _, _, _, _ = linregress(x2, np.log(-np.log(1 - y2)))

        # Calculate slope difference
        delta_s = slope2 - slope1
        slope_differences.append((k, delta_s))

    # Filter values with |Δs| > δ
    significant_differences = [(k, delta_s) for k, delta_s in slope_differences if abs(delta_s) > delta]

    # Return results
    if len(significant_differences) >= B:
        # Get the B largest |Δs|
        significant_differences = sorted(significant_differences, key=lambda x: abs(x[1]), reverse=True)[:B]
        k_values = [item[0] for item in significant_differences]
        delta_s_values = [item[1] for item in significant_differences]
        return True, k_values, delta_s_values
    else:
        return False, [], []

# Path to CSV file
csv_file = "D:/Code ĐA2/Data/ai4i2020.csv"
column_name = "Tool wear [min]"  # Name of the column to process
A = 50  # Parameter A (at least 5)
B = 1  # Parameter B (number of suggested results)
delta = 0.5  # Slope difference threshold

# Call the function
flag, k_values, delta_s_values = find_phase_boundary(csv_file, column_name, A, B, delta)

# Print results
if flag:
    print("Suggested phase boundaries:")
    for i, (k, delta_s) in enumerate(zip(k_values, delta_s_values), 1):
        print(f"{i}. k = {k}, Δs = {delta_s:.4f}")
else:
    print("No significant phase difference could be found.")


In [None]:
import numpy as np
from scipy.optimize import minimize

# Data from DataFrame
data = df['Tool wear [min]'].values  # Observation time
events = df['Machine failure'].values  # Event indicator variable (1: event occurred, 0: censored)

# Fixed value of t_0
t0 = 189

# Log-likelihood function for the two-phase Weibull model
def weibull_two_phase_log_likelihood(params, data, events, t0):
    lambda1, p1, lambda2, p2 = params

    if lambda1 <= 0 or lambda2 <= 0 or p1 <= 0 or p2 <= 0:  # Check for valid values
        return -np.inf

    log_likelihood = 0
    for i in range(len(data)):
        if data[i] <= 0:  # Skip invalid values
            continue

        t = data[i]
        if events[i] == 1:  # Event occurred
            if t <= t0:
                # Phase 1
                log_likelihood += (np.log(p1) + np.log(lambda1) +
                                   (p1 - 1) * np.log(lambda1 * t) -
                                   (lambda1 * t) ** p1)
            else:
                # Phase 2
                f1_part = p1 * lambda1 * (lambda1 * t) ** (p1 - 1)
                f2_part = p2 * lambda2 * (lambda2 * (t - t0)) ** (p2 - 1)
                exp_part = np.exp(-(lambda1 * t) ** p1 - (lambda2 * (t - t0)) ** p2)
                log_likelihood += np.log(f1_part + f2_part) + np.log(exp_part)
        else:  # Censored event
            if t <= t0:
                # Phase 1
                log_likelihood += -((lambda1 * t) ** p1)
            else:
                # Phase 2
                log_likelihood += -((lambda1 * t0) ** p1 + (lambda2 * (t - t0)) ** p2)

    return log_likelihood

# Negative log-likelihood function
def weibull_two_phase_negative_log_likelihood(params, data, events, t0):
    return -weibull_two_phase_log_likelihood(params, data, events, t0)

# Initial parameters
initial_params = [0.01, 3, 0.03, 3]  # [lambda1, p1, lambda2, p2]

# Constraints: all parameters must be positive
bounds = [(1e-5, None), (1e-5, None), (1e-5, None), (1e-5, None)]

# Optimization
result = minimize(weibull_two_phase_negative_log_likelihood, initial_params,
                  args=(data, events, t0), bounds=bounds, method='L-BFGS-B')

# Check the results
if result.success:
    lambda1_est, p1_est, lambda2_est, p2_est = result.x
    #print(f'Initial parameters: {initial_params}')
    print(f'Estimated two-phase Weibull parameters:')
    print(f'  Phase 1 - Scale (lambda1): {lambda1_est}, Shape (p1): {p1_est}')
    print(f'  Phase 2 - Scale (lambda2): {lambda2_est}, Shape (p2): {p2_est}')
else:
    print("Optimization did not converge:")
    print(result.message)


In [None]:
# Function to calculate the survival function S(t)
def S(t, p1, lambda1, p2, lambda2, t0):
    if t < t0:
        return np.exp(-(t * lambda1) ** p1)  # Phase 1
    else:
        return np.exp(-(t0 * lambda1) ** p1 - ((t - t0) * lambda2) ** p2)  # Phase 2

# Generate a time series from 0 to 300
t_values = np.linspace(0, 300, 500)

# Calculate S(t) values for each t
S_values = [S(t, p1_est, lambda1_est, p2_est, lambda2_est, t0) for t in t_values]

# Plot the graph
plt.figure(figsize=(8, 6))
plt.plot(t_values, S_values, label=r'$S(t)$', color='b')
plt.axvline(x=t0, color='r', linestyle='--', label='Threshold $t_0$')
plt.title('Survival function graph $S(t)$ with estimated parameters')
plt.xlabel('Time $t$')
plt.ylabel('Survival function $S(t)$')
plt.legend()
plt.grid(True)
plt.show()


# **Tối ưu mô hình**

Mô hình Weibull để tính hàm S(t)

In [None]:
data = df['Tool wear [min]'].values  # Thời gian quan sát
events = df['Machine failure'].values  # Biến chỉ thị sự kiện (1: sự kiện xảy ra, 0: censored)

# Hàm log-likelihood cho phân phối Weibull
def weibull_log_likelihood(params, data, events):
    lambda_, p = params
    log_likelihood = 0
    
    if lambda_ <= 0:  # Kiểm tra giá trị lambda
        return -np.inf  # Trả về -inf nếu lambda không hợp lệ

    for i in range(len(data)):
        if data[i] <= 0:  # Bỏ qua giá trị không hợp lệ
            continue

        if events[i] == 1:  # Sự kiện xảy ra
            log_likelihood += (np.log(p) + np.log(lambda_) + 
                               (p - 1) * np.log(lambda_ * data[i]) - 
                               (lambda_ * data[i]) ** p)
        else:  # Sự kiện không xảy ra (censored)
            log_likelihood -= (lambda_ * data[i]) ** p

    return log_likelihood

# Hàm negative log-likelihood cho phân phối Weibull
def weibull_negative_log_likelihood(params, data, events):
    return -weibull_log_likelihood(params, data, events)  # Trả về negative log-likelihood

# Tham số khởi tạo
initial_params = [1.0, 5.0]  # [lambda, p] khởi tạo giá trị dương hợp lý

# Sử dụng minimize để tìm tham số tối ưu
result = minimize(weibull_negative_log_likelihood, initial_params, args=(data, events), 
                  bounds=((1e-5, None), (1e-5, None)), method='L-BFGS-B')

# Lấy tham số ước lượng
lambda_est, p_est = result.x
print(f'Tham số khởi tạo: \n(lambda,p): {initial_params}')
print(f'Ước lượng tham số Weibull:\nThang đo (lambda): {lambda_est}\nHình dạng (p): {p_est}')


# Generate values of t from 0 to 500
t_values = np.linspace(0, 1500, 100)
# Calculate S(t) = exp(-lambda * t)^p
S_t_values = np.exp(-(lambda_est * t_values) ** p_est)


Tính toán điểm khủy tay

In [None]:
import pandas as pd
import numpy as np
from scipy.stats import linregress
from scipy.optimize import minimize

def weibull_log_likelihood(params, data, events):
    lambda_, p = params
    log_likelihood = 0

    if lambda_ <= 0:  # Kiểm tra giá trị lambda
        return -np.inf  # Trả về -inf nếu lambda không hợp lệ

    for i in range(len(data)):
        if data[i] <= 0:  # Bỏ qua giá trị không hợp lệ
            continue

        if events[i] == 1:  # Sự kiện xảy ra
            log_likelihood += (np.log(p) + np.log(lambda_) +
                               (p - 1) * np.log(lambda_ * data[i]) -
                               (lambda_ * data[i]) ** p)
        else:  # Sự kiện không xảy ra (censored)
            log_likelihood -= (lambda_ * data[i]) ** p

    return log_likelihood

def weibull_negative_log_likelihood(params, data, events):
    return -weibull_log_likelihood(params, data, events)

def find_phase_boundary_weibull(csv_file, column_name, event_column, A, B, delta):
    # Đọc dữ liệu từ file CSV
    data = pd.read_csv(csv_file)
    failure_times = data[column_name].values
    events = data[event_column].values
    N = len(failure_times)

    # Tìm tham số Weibull
    initial_params = [1.0, 5.0]  # [lambda, p] khởi tạo
    result = minimize(weibull_negative_log_likelihood, initial_params, args=(failure_times, events), 
                      bounds=((1e-5, None), (1e-5, None)), method='L-BFGS-B')
    lambda_est, p_est = result.x

    # Tính xác suất sống sót S(t)
    survival_probs = np.exp(-(lambda_est * failure_times) ** p_est)

    # Giới hạn giá trị S(t) để tránh lỗi log
    epsilon = 1e-10
    survival_probs = np.clip(survival_probs, epsilon, 300)

    # Lặp qua từng k để chia dữ liệu và tính độ chênh lệch dốc
    slope_differences = []
    for k in range(A, N - A + 1):
        # Phân chia dữ liệu
        x1, y1 = failure_times[:k], survival_probs[:k]
        x2, y2 = failure_times[k:], survival_probs[k:]

        # Bỏ qua nếu tất cả giá trị x trong một phần giống nhau
        if len(set(x1)) == 1 or len(set(x2)) == 1:
            continue

        # Khớp đường thẳng cho mỗi phần
        slope1, _, _, _, _ = linregress(x1, np.log(-np.log(y1)))
        slope2, _, _, _, _ = linregress(x2, np.log(-np.log(y2)))

        # Tính độ chênh lệch dốc
        delta_s = slope2 - slope1
        slope_differences.append((k, delta_s))

    # Lọc các giá trị với |Δs| > δ
    significant_differences = [(k, delta_s) for k, delta_s in slope_differences if abs(delta_s) > delta]

    # Trả về kết quả
    if len(significant_differences) >= B:
        # Lấy B giá trị lớn nhất của |Δs|
        significant_differences = sorted(significant_differences, key=lambda x: abs(x[1]), reverse=True)[:B]
        k_values = [item[0] for item in significant_differences]
        delta_s_values = [item[1] for item in significant_differences]
        return True, k_values, delta_s_values
    else:
        return False, [], []

# Đường dẫn tới file CSV
csv_file = "D:/Code ĐA2/Data/ai4i2020.csv"
column_name = "Tool wear [min]"  # Tên cột dữ liệu cần xử lý
event_column = "Machine failure"  # Cột chỉ thị sự kiện
A = 25  # Tham số A (tối thiểu 5)
B = 1  # Tham số B (số kết quả gợi ý)
delta = 0.1  # Ngưỡng độ chênh lệch dốc

# Gọi hàm xử lý
flag, k_values, delta_s_values = find_phase_boundary_weibull(csv_file, column_name, event_column, A, B, delta)

# In kết quả
if flag:
    print("Suggested phase boundaries:")
    for i, (k, delta_s) in enumerate(zip(k_values, delta_s_values), 1):
        print(f"{i}. k = {k}, Δs = {delta_s:.4f}")
else:
    print("No significant phase difference could be found.")
