In [1]:
import numpy as np
from scipy import stats

class CART:
    """A small CART decision tree supporting regression and classification.

    Splits are axis-aligned tests of the form ``x[feature] <= threshold``.
    With ``mode="regression"`` the split minimising the children's residual
    sum of squares is chosen; any other ``mode`` minimises the size-weighted
    Gini impurity. The fitted tree is stored in ``self.tree`` as nested dicts
    with keys ``'best_split'``, ``'left'`` and ``'right'``; a leaf is stored
    as the bare predicted value.

    Parameters
    ----------
    max_depth : int or None
        Maximum depth of the tree. ``None`` means unlimited; ``0`` yields a
        single leaf.
    min_samples_split : int
        Nodes with fewer samples than this become leaves.
    mode : str
        ``"regression"`` for mean-valued leaves, anything else for
        majority-label leaves.
    """

    def __init__(self, max_depth=None, min_samples_split=2, mode="regression"):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.mode = mode
        self.tree = None

    def fit(self, X, y):
        """Grow the tree from a 2-D feature matrix ``X`` and targets ``y``."""
        self.tree = self._build_tree(np.asarray(X), np.asarray(y))

    def predict(self, X):
        """Return an array with one prediction per row of ``X``."""
        return np.array([self._predict_row(row, self.tree) for row in np.asarray(X)])

    def _leaf_value(self, y):
        """Prediction stored in a leaf: mean target or most frequent label."""
        if self.mode == 'regression':
            return np.mean(y)
        # np.unique sorts the labels, so argmax breaks ties in favour of the
        # smallest label; unlike scipy.stats.mode it also accepts string labels.
        labels, counts = np.unique(y, return_counts=True)
        return labels[np.argmax(counts)]

    def _impurity(self, y):
        """Unnormalised impurity of one child (RSS, or size-weighted Gini)."""
        if self.mode == 'regression':
            return np.sum((y - y.mean()) ** 2)
        return len(y) * self._gini(y)

    def _build_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples ``(X, y)``."""
        num_samples, num_features = X.shape

        # `is not None` (rather than truthiness) so that max_depth=0 is honoured.
        depth_exhausted = self.max_depth is not None and depth >= self.max_depth
        if (
            depth_exhausted
            or num_samples < self.min_samples_split
            or len(np.unique(y)) == 1
        ):
            return self._leaf_value(y)

        best_split = None
        best_left_mask = None
        best_score = float('inf')

        for feature_idx in range(num_features):
            column = X[:, feature_idx]
            for threshold in np.unique(column):
                left_mask = column <= threshold
                n_left = np.count_nonzero(left_mask)
                # A split that leaves one side empty separates nothing.
                if n_left == 0 or n_left == num_samples:
                    continue

                score = (
                    self._impurity(y[left_mask]) + self._impurity(y[~left_mask])
                ) / num_samples

                # Strict '<' keeps the first of several equally good splits.
                if score < best_score:
                    best_score = score
                    best_left_mask = left_mask
                    best_split = (feature_idx, threshold)

        # No feature could separate the samples (e.g. all rows identical).
        if best_split is None:
            return self._leaf_value(y)

        return {
            'best_split': best_split,
            'left': self._build_tree(X[best_left_mask], y[best_left_mask], depth + 1),
            'right': self._build_tree(X[~best_left_mask], y[~best_left_mask], depth + 1),
        }

    def _predict_row(self, row, tree):
        """Walk ``tree`` for a single sample ``row`` and return the leaf value."""
        node = tree
        while isinstance(node, dict):
            feature, threshold = node['best_split']
            node = node['left'] if row[feature] <= threshold else node['right']
        return node

    def _gini(self, y):
        """Gini impurity ``1 - sum(p_k ** 2)`` of the labels in ``y``."""
        _, counts = np.unique(y, return_counts=True)
        p = counts / len(y)
        return 1 - np.sum(p ** 2)


In [None]:
class RandomForest:
    """Bagged ensemble of CART trees with optional out-of-bag (OOB) scoring.

    Every tree is trained on a bootstrap sample of the rows and on a random
    subset of the columns that is drawn once per tree.

    Parameters
    ----------
    n_estimators : int
        Number of trees.
    max_depth, min_samples_split
        Forwarded to each ``CART`` tree.
    max_features : str
        ``'sqrt'`` or ``'log2'`` to use that many columns per tree (at least
        one); any other value uses all columns.
    oob_score : bool
        When true, ``fit`` stores an OOB estimate in ``oob_score_``: the mean
        squared error for regression, the accuracy for classification.
    mode : str
        ``"regression"`` or ``"classification"``.
    random_state : int or None
        Seed for the bootstrap and feature sampling.
    """

    def __init__(self, n_estimators=10, max_depth=None, min_samples_split=2, max_features='sqrt', oob_score=False, mode="regression", random_state=0):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.oob_score = oob_score
        self.oob_score_ = None  # OOB score, filled in by fit() when oob_score is true
        self.mode = mode
        self.random_state = random_state
        # Each entry is [tree, indices of the columns that tree was trained on].
        self.estimators = [[CART(max_depth, min_samples_split, mode), None] for _ in range(n_estimators)]

    @staticmethod
    def _majority_vote(votes):
        """Most frequent label in ``votes``; ties go to the smallest label."""
        labels, counts = np.unique(votes, return_counts=True)
        return labels[np.argmax(counts)]

    def _num_subspace_features(self, num_features):
        """Number of columns each tree sees, never fewer than one."""
        if self.max_features == 'sqrt':
            k = int(np.sqrt(num_features))
        elif self.max_features == 'log2':
            k = int(np.log2(num_features))
        else:
            k = num_features
        # int(log2(1)) == 0 would otherwise train trees on zero columns.
        return max(1, k)

    def fit(self, X, y):
        """Train all trees and, if requested, compute the OOB score.

        If ``oob_score`` is true but no sample was ever out-of-bag,
        ``oob_score_`` is set to NaN.
        """
        X, y = np.asarray(X), np.asarray(y)
        # A private generator gives the same draws as seeding the global one
        # with the same seed, without clobbering global NumPy RNG state.
        rng = np.random.RandomState(self.random_state)
        num_samples, num_features = X.shape
        n_sub = self._num_subspace_features(num_features)
        all_rows = np.arange(num_samples)
        is_regression = self.mode == "regression"

        oob_sum = np.zeros(num_samples)               # summed OOB predictions (regression)
        oob_votes = [[] for _ in range(num_samples)]  # per-sample OOB labels (classification)
        oob_count = np.zeros(num_samples, dtype=int)  # number of trees that scored each sample

        for slot in self.estimators:
            estimator = slot[0]
            sample_idx = rng.choice(num_samples, size=num_samples, replace=True)
            feature_idx = rng.choice(num_features, size=n_sub, replace=False)
            estimator.fit(X[sample_idx][:, feature_idx], y[sample_idx])
            slot[1] = feature_idx

            if not self.oob_score:
                continue

            # Score the rows this tree never saw during training.
            oob_idx = np.setdiff1d(all_rows, sample_idx)
            if oob_idx.size == 0:
                continue
            oob_pred = estimator.predict(X[oob_idx][:, feature_idx])
            oob_count[oob_idx] += 1
            if is_regression:
                oob_sum[oob_idx] += oob_pred
            else:
                for idx, label in zip(oob_idx, oob_pred):
                    oob_votes[idx].append(label)

        if self.oob_score:
            seen = oob_count > 0
            if not seen.any():
                self.oob_score_ = float('nan')
            elif self.mode == "regression":
                # Average only over samples with at least one OOB prediction;
                # the counts must be masked the same way as the sums.
                oob_mean = oob_sum[seen] / oob_count[seen]
                self.oob_score_ = np.mean((oob_mean - y[seen]) ** 2)
            elif self.mode == "classification":
                oob_label = np.array([self._majority_vote(v) for v in oob_votes if v])
                self.oob_score_ = np.mean(oob_label == y[seen])

    def predict(self, X):
        """Aggregate the trees' predictions: majority vote or mean per row."""
        X = np.asarray(X)
        # Rows are samples, columns are trees.
        per_tree = np.array(
            [estimator.predict(X[:, feature_idx]) for estimator, feature_idx in self.estimators]
        ).T

        if self.mode == 'classification':
            return np.array([self._majority_vote(row) for row in per_tree])
        elif self.mode == 'regression':
            return np.mean(per_tree, axis=1)
        return per_tree

In [31]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier as SklearnRF
from sklearn.metrics import accuracy_score

# Load the Iris dataset and hold out 20% of the rows for testing.
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train the hand-written RandomForest (with OOB scoring) and score it.
rf_custom = RandomForest(
    n_estimators=10,
    max_depth=3,
    mode="classification",
    random_state=42,
    oob_score=True,
)
rf_custom.fit(X_train, y_train)
y_pred_custom = rf_custom.predict(X_test)
accuracy_custom = accuracy_score(y_test, y_pred_custom)

# Train scikit-learn's RandomForestClassifier with the same settings as a reference.
rf_sklearn = SklearnRF(n_estimators=10, max_depth=3, random_state=42)
rf_sklearn.fit(X_train, y_train)
y_pred_sklearn = rf_sklearn.predict(X_test)
accuracy_sklearn = accuracy_score(y_test, y_pred_sklearn)

# Report both test accuracies next to the custom forest's OOB accuracy.
print("自实现 RandomForest 分类准确率:", accuracy_custom)
print('oob score:', rf_custom.oob_score_)
print("sklearn RandomForest 分类准确率:", accuracy_sklearn)


120 120
自实现 RandomForest 分类准确率: 1.0
oob score: 0.8739495798319328
sklearn RandomForest 分类准确率: 1.0


In [33]:
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor as SklearnRFRegressor

# Generate a synthetic regression problem and hold out 20% of the rows for testing.
X, y = make_regression(n_samples=100, n_features=5, noise=0.1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train the hand-written RandomForest (with OOB scoring) and score it.
rf_custom = RandomForest(
    n_estimators=10,
    max_depth=3,
    mode="regression",
    random_state=42,
    oob_score=True,
)
rf_custom.fit(X_train, y_train)
y_pred_custom = rf_custom.predict(X_test)
mse_custom = mean_squared_error(y_test, y_pred_custom)

# Train scikit-learn's RandomForestRegressor with the same settings as a reference.
rf_sklearn = SklearnRFRegressor(n_estimators=10, max_depth=3, random_state=42)
rf_sklearn.fit(X_train, y_train)
y_pred_sklearn = rf_sklearn.predict(X_test)
mse_sklearn = mean_squared_error(y_test, y_pred_sklearn)

# Report both test MSEs next to the custom forest's OOB MSE.
print("自实现 RandomForest 回归 MSE:", mse_custom)
print('oob score:', rf_custom.oob_score_)
print("sklearn RandomForest 回归 MSE:", mse_sklearn)

80 80
自实现 RandomForest 回归 MSE: 11104.339410814619
oob score: 13381.455491254059
sklearn RandomForest 回归 MSE: 7577.326411673124


In [29]:
# Scratch check: boolean-mask indexing on an array of variable-length lists.
# Ragged input needs an explicit dtype=object; without it NumPy emits a
# deprecation warning on older versions and raises ValueError from 1.24 on.
a = np.array([[], [1, 2], []], dtype=object)
b = np.array([1, 0, 1])
mask = b > 0
a[mask]

  a = np.array([[], [1, 2], []])


array([list([]), list([])], dtype=object)