In [None]:
import pandas as pd
from pathlib import Path
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from copy import deepcopy
import numpy as np
import warnings
warnings.filterwarnings('ignore')

##  数据读取

In [None]:
# Data loading
# Columns that are cast to the pandas nullable 'Int16' dtype after a CSV is read.
columns_to_int16 = [
    '日期',
    '时间',
    '生产线编号',
    '物料推送气缸状态',
    '物料推送数',
    '物料待抓取数',
    '放置容器数',
    '容器上传检测数',
    '填装检测数',
    '填装定位器状态',
    '物料抓取数',
    '填装旋转数',
    '填装下降数',
    '填装数',
    '加盖检测数',
    '加盖定位数',
    '推盖数',
    '加盖下降数',
    '加盖数',
    '拧盖检测数',
    '拧盖定位数',
    '拧盖下降数',
    '拧盖旋转数',
    '拧盖数',
    '合格数',
    '不合格数',
    '机器状态',
]
# Columns that are cast to the pandas nullable 'Int32' dtype:
# '0_Duration' through '10_Duration'.
columns_to_int32 = ['%d_Duration' % k for k in range(11)]

def transfer_data_int(df):
    """Cast the known integer columns of ``df`` to pandas nullable integer dtypes.

    Columns listed in the module-level ``columns_to_int16`` become ``Int16`` and
    those listed in ``columns_to_int32`` become ``Int32``.

    The frame is modified in place; the same object is returned for convenience.
    """
    for dtype_name, column_group in (('Int16', columns_to_int16),
                                     ('Int32', columns_to_int32)):
        df[column_group] = df[column_group].astype(dtype_name)
    return df

def load_and_convert(file_paths):
    """Read a batch of CSV files and cast their integer columns.

    Parameters
    ----------
    file_paths : iterable of path objects exposing ``.stem``.

    Returns
    -------
    dict
        Maps each path's stem to the frame returned by
        ``transfer_data_int(pd.read_csv(path))``.
    """
    frames_by_stem = {}
    for csv_path in file_paths:
        stem = csv_path.stem
        frames_by_stem[stem] = transfer_data_int(pd.read_csv(csv_path))
    return frames_by_stem

# Training / dev files.
# Path.glob yields matches in a filesystem-dependent order, so the paths are
# sorted first; otherwise the positional slices below could put different files
# into train_data_1 / train_data_2 / dev_data from one machine or run to the next.
processed_files = sorted(Path('temp_data/train').glob('M*.csv'))
train_data_1 = load_and_convert(processed_files[:4])    # first 4 files
train_data_2 = load_and_convert(processed_files[4:8])   # next 4 files
dev_data = load_and_convert(processed_files[8:])        # all remaining files

# Files to predict on (sorted for the same reproducibility reason)
predicted_files = sorted(Path('temp_data/test').glob('M*.csv'))
test_data = load_and_convert(predicted_files)

In [None]:
# Feature selection: the columns fed to the models, in this order.
feature_columns = [
    '物料推送气缸状态',
    '物料推送数',
    '物料待抓取数',
    '放置容器数',
    '容器上传检测数',
    '填装检测数',
    '填装定位器状态',
    '物料抓取数',
    '填装旋转数',
    '填装下降数',
    '加盖检测数',
    '加盖定位数',
    '推盖数',
    '加盖下降数',
    '拧盖检测数',
    '拧盖定位数',
    '拧盖下降数',
    '拧盖旋转数',
    '拧盖数',
    # '合格数', '不合格数',   (kept disabled)
    # '0_Duration' through '10_Duration'
    *('%d_Duration' % k for k in range(11)),
]

In [None]:
def _as_single_frame(frames):
    """Concatenate a dict of DataFrames into one; return a DataFrame unchanged.

    The pass-through keeps this cell re-runnable: after the first run the
    ``*_data`` names already hold DataFrames, and calling ``.values()`` on a
    DataFrame fails because ``DataFrame.values`` is an array attribute, not a
    method.
    """
    if isinstance(frames, pd.DataFrame):
        return frames
    return pd.concat(frames.values(), ignore_index=True)


# Merge the per-file frames of each split into one frame with a fresh 0..n-1 index.
train_data_1 = _as_single_frame(train_data_1)
train_data_2 = _as_single_frame(train_data_2)
dev_data = _as_single_frame(dev_data)

# Feature matrix / label vector for each split; the label column is '机器状态'.
X_train1, y_train1 = train_data_1[feature_columns], train_data_1['机器状态']
X_train2, y_train2 = train_data_2[feature_columns], train_data_2['机器状态']
X_dev, y_dev = dev_data[feature_columns], dev_data['机器状态']

# test_data stays a dict keyed by file stem; the two test files are kept separate.
X_test_1, y_test_1 = test_data["M201"][feature_columns],test_data["M201"]['机器状态']
X_test_2, y_test_2 = test_data["M202"][feature_columns],test_data["M202"]['机器状态']


## 模型评估指标

In [None]:
from sklearn.metrics import accuracy_score, recall_score

# Fault accuracy
def Fault_Accuracy(y_,y_pred_):
    """Accuracy computed only over the samples whose true label is non-zero.

    Both inputs are converted to integer NumPy arrays; samples with true label 0
    are dropped before scoring.

    Parameters
    ----------
    y_ : array-like
        True labels.
    y_pred_ : array-like
        Predicted labels, aligned position-by-position with ``y_``.

    Returns
    -------
    float
        ``accuracy_score`` over the non-zero-labelled samples, or ``nan`` when
        ``y_`` has no non-zero label (the metric is undefined then, and scoring
        an empty slice is not something to rely on).
    """
    y_np = np.array(y_).astype(int)
    y_pred_np = np.array(y_pred_).astype(int)
    fault_index = np.where(y_np != 0)[0]

    # Nothing to score: report "undefined" explicitly instead of passing
    # empty arrays to accuracy_score.
    if fault_index.size == 0:
        return float('nan')

    return accuracy_score(y_np[fault_index], y_pred_np[fault_index])

# Alarm accuracy (only whether an alarm is raised counts, not which label it carries)
def Warning_Accuracy(y_,y_pred_):
    """Accuracy after collapsing every non-zero label to 1.

    Both inputs are converted to integer arrays and binarised (0 stays 0, any
    other value becomes 1); the result is ``accuracy_score`` on the binarised
    true and predicted labels.
    """
    true_alarm = (np.array(y_).astype(int) != 0).astype(int)
    pred_alarm = (np.array(y_pred_).astype(int) != 0).astype(int)
    return accuracy_score(true_alarm, pred_alarm)

def Recall(y_,y_pred_):
    """Print the recall of each class label 0..9, one label per line.

    Inputs are converted to integer arrays and passed to ``recall_score`` with
    ``average=None`` and ``labels=[0, ..., 9]``.  Nothing is returned.
    """
    class_ids = list(range(10))
    truth = np.array(y_).astype(int)
    guess = np.array(y_pred_).astype(int)

    per_class = recall_score(truth, guess, average=None, labels=class_ids)
    print("单个类别的召回率:")
    # One "<label> : <recall>" line per class, in label order
    for class_id, value in zip(class_ids, per_class):
        print('%d : %.4f '%(int(class_id), float(value)))

## 增量学习

In [None]:
# Parameter definitions
# Initial training: 10-class softmax objective.
# XGBoost's logging switch is `verbosity` (0 = silent).  The previous
# 'verbose': -1 entry is not an XGBoost parameter, so it did not silence
# anything; 'verbosity': 0 matches what the XGBClassifier calls in this
# notebook already pass.
xgb_params_01 = {'objective': 'multi:softmax',
                 'num_class': 10,
                 'learning_rate': 0.08,
                 'verbosity': 0,}

# Incremental step: same base settings, plus the options that make xgb.train
# run the 'refresh' updater over an existing model (process_type='update')
# with refresh_leaf enabled.
xgb_params_02 = {'objective': 'multi:softmax',
                 'num_class': 10,
                 'learning_rate': 0.08,
                 'verbosity': 0,
                 'process_type': 'update',
                 'updater': 'refresh',
                 'refresh_leaf': True}

In [None]:
import xgboost as xgb
from xgboost import DMatrix

def incremental_learning_start(X_train, y_train, X_test):
    """Train the initial booster and predict ``X_test`` with it.

    Runs ``xgb.train`` for 100 boosting rounds using the module-level
    ``xgb_params_01``.

    Returns
    -------
    tuple
        ``(model, y_pred)``: the trained booster and its predictions for
        ``X_test``.
    """
    train_matrix = xgb.DMatrix(X_train, label=y_train)
    eval_matrix = xgb.DMatrix(X_test)

    booster = xgb.train(xgb_params_01, train_matrix, num_boost_round=100)
    predictions = booster.predict(eval_matrix)

    print('初模型训练完毕')
    return booster, predictions

def incremental_learning_updata(X_train, y_train, X_test, model):
    """Update an existing booster with a new batch and predict ``X_test``.

    Calls ``xgb.train`` with the module-level ``xgb_params_02`` and
    ``xgb_model=model`` for 100 rounds, then predicts with the result.

    NOTE(review): ``xgb_params_02`` sets ``process_type='update'``; presumably
    ``num_boost_round`` should then not exceed the number of rounds already in
    ``model`` (100 in ``incremental_learning_start``) - confirm against the
    XGBoost parameter docs.

    Returns
    -------
    tuple
        ``(model, y_pred)``: the booster returned by ``xgb.train`` and its
        predictions for ``X_test``.
    """
    new_batch = xgb.DMatrix(X_train, label=y_train)
    eval_matrix = xgb.DMatrix(X_test)

    refreshed = xgb.train(xgb_params_02, new_batch, num_boost_round=100, xgb_model=model)
    predictions = refreshed.predict(eval_matrix)

    print('增量训练完毕')
    return refreshed, predictions

In [None]:
# Train the initial booster on the first training batch, predict the dev set,
# and report overall accuracy, fault-only accuracy and alarm accuracy.
model, y_pred1 = incremental_learning_start(X_train1, y_train1, X_dev)
print('Accuracy : %.6f' % (accuracy_score(y_dev, y_pred1)))
print('Fault_Accuracy : %.6f' % (Fault_Accuracy(y_dev, y_pred1)))
print('Warning_Accuracy : %.6f' % (Warning_Accuracy(y_dev, y_pred1)))
# Per-class recall is printed by Recall itself.
Recall(y_dev, y_pred1)

In [None]:
# Update the booster with the second training batch and score the dev set again.
# NOTE: `model` is rebound to the updated booster here, so re-running this cell
# on its own applies the update on top of the already-updated model.
model, y_pred2 = incremental_learning_updata(X_train2, y_train2, X_dev, model)
print('Accuracy : %.6f' % (accuracy_score(y_dev, y_pred2)))
print('Fault_Accuracy : %.6f' % (Fault_Accuracy(y_dev, y_pred2)))
print('Warning_Accuracy : %.6f' % (Warning_Accuracy(y_dev, y_pred2)))
# Per-class recall is printed by Recall itself.
Recall(y_dev, y_pred2)


## 经验学习

In [None]:
from xgboost import XGBClassifier

def xgb_experience_learning_start(X_train, y_train, X_test, num_class=10):
    """
    Fit the initial classifier and predict ``X_test``.

    An ``XGBClassifier`` (multi:softmax, ``num_class`` classes, learning rate
    0.08) is fitted on ``(X_train, y_train)``.

    Returns ``(y_pred, model)``: note the predictions come first.
    """
    classifier_options = dict(
        objective='multi:softmax',
        num_class=num_class,
        learning_rate=0.08,
        use_label_encoder=False,
        verbosity=0,
    )
    classifier = XGBClassifier(**classifier_options)
    classifier.fit(X_train, y_train)
    predictions = classifier.predict(X_test)
    return predictions, classifier

def xgb_experience_learning(X_train_new, y_train_new, X_test, prev_model, num_class=10):
    """
    Train a new classifier that sees the previous model's prediction as a feature.

    ``prev_model.predict`` is run on ``X_train_new`` and on ``X_test`` as given,
    and each result is attached as a ``'prev_pred'`` column on a copy of the
    corresponding frame; the caller's frames are left untouched.  A fresh
    ``XGBClassifier`` (multi:softmax, ``num_class`` classes, learning rate 0.08)
    is then fitted on the augmented training frame.

    Returns ``(y_pred, model)``: predictions for the augmented ``X_test`` and
    the newly fitted classifier.
    """
    # DataFrame.assign returns a new frame, so the inputs are not modified.
    train_with_prev = X_train_new.assign(prev_pred=prev_model.predict(X_train_new))
    test_with_prev = X_test.assign(prev_pred=prev_model.predict(X_test))

    # Fit the new model on the augmented features
    classifier_options = dict(
        objective='multi:softmax',
        num_class=num_class,
        learning_rate=0.08,
        use_label_encoder=False,
        verbosity=0,
    )
    classifier = XGBClassifier(**classifier_options)
    classifier.fit(train_with_prev, y_train_new)

    # Predict with the new model
    predictions = classifier.predict(test_with_prev)
    return predictions, classifier

In [None]:
# Fit the initial classifier on the first training batch, predict the dev set,
# and report overall accuracy, fault-only accuracy and alarm accuracy.
y_pred_1, model_1 = xgb_experience_learning_start(X_train1, y_train1, X_dev)
print('Accuracy : %.6f' % (accuracy_score(y_dev, y_pred_1)))
print('Fault_Accuracy : %.6f' % (Fault_Accuracy(y_dev, y_pred_1)))
print('Warning_Accuracy : %.6f' % (Warning_Accuracy(y_dev, y_pred_1)))
# Per-class recall is printed by Recall itself.
Recall(y_dev, y_pred_1)


In [None]:
# Train a second classifier on the second batch, with model_1's predictions
# added as an extra feature, and score it on the dev set.
y_pred_2, model_2 = xgb_experience_learning(X_train2, y_train2, X_dev, model_1)
print('Accuracy : %.6f' % (accuracy_score(y_dev, y_pred_2)))
print('Fault_Accuracy : %.6f' % (Fault_Accuracy(y_dev, y_pred_2)))
print('Warning_Accuracy : %.6f' % (Warning_Accuracy(y_dev, y_pred_2)))
# Per-class recall is printed by Recall itself.
Recall(y_dev, y_pred_2)