最长连续相同字符子串

In [1]:
def longest_consecutive_chars(s):
    """Find the longest run of one repeated character in ``s``.

    Returns a ``(character, run_length)`` tuple. When several runs share
    the maximum length, the earliest one is reported. A falsy input
    (for example the empty string) yields ``('', 0)``.
    """
    if not s:
        return '', 0

    best_char, best_len = s[0], 1
    run_start = 0  # index at which the run currently being scanned began

    for pos in range(1, len(s)):
        if s[pos] != s[run_start]:
            # A different character starts a new run at this position.
            run_start = pos
            continue
        run_len = pos - run_start + 1
        # Strict comparison keeps the earliest run on ties.
        if run_len > best_len:
            best_char, best_len = s[run_start], run_len

    return best_char, best_len

# Test
print(longest_consecutive_chars("aaabbc"))  # Output: ('a', 3)

('a', 3)


寻找Local Max Value

In [3]:
def find_local_max(arr, n):
    """Collect every local maximum of ``arr`` for a half-window of ``n``.

    ``arr[i]`` is a local maximum when it is the largest value of the
    window ``arr[i-n : i+n+1]``, the window rises strictly up to ``arr[i]``
    and falls strictly after it. Only positions that have ``n`` neighbours
    on both sides are considered.

    Returns the list of qualifying values in order of appearance.
    """
    peaks = []

    for center in range(n, len(arr) - n):
        left, right = center - n, center + n

        # The candidate must be the largest value of its window.
        if arr[center] != max(arr[left:right + 1]):
            continue

        # No step on the left side may stay flat or go down.
        rising = not any(
            arr[k] <= arr[k - 1] for k in range(left + 1, center + 1)
        )
        # No step on the right side may stay flat or go up.
        falling = not any(
            arr[k] >= arr[k - 1] for k in range(center + 1, right + 1)
        )

        if rising and falling:
            peaks.append(arr[center])

    return peaks

# Test
arr = [1, 3, 10, 4, 2, 19, 5, 5]
n = 2
# 19 is rejected: the two values before it (4, 2) are not strictly increasing.
print(find_local_max(arr, n))  # Output: [10]

[10]


补全Bootstrap算法代码

bootstrap函数：有放回地随机抽取n个样本索引

In [2]:
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from random import randint, seed

def bootstrap(n: int) -> list[int]:
    """Step 1: draw a bootstrap sample of row indices.

    Picks ``n`` indices uniformly from ``0 .. n-1`` with replacement using
    ``random.randint``, so the same index may appear more than once.
    """
    sampled = []
    for _ in range(n):
        sampled.append(randint(0, n - 1))
    return sampled

def fit(classifiers, x, y):
    """Step 2: train each classifier on its own bootstrap resample.

    For every classifier a fresh index sample of size ``len(x)`` is drawn
    with ``bootstrap`` and the matching rows of ``x`` and ``y`` are passed
    to that classifier's ``fit`` method. Classifiers are fitted in place.
    """
    sample_count = len(x)
    for model in classifiers:
        drawn = bootstrap(sample_count)
        resampled_x, resampled_y = [], []
        for row in drawn:
            resampled_x.append(x[row])
            resampled_y.append(y[row])
        model.fit(resampled_x, resampled_y)

def predict(classifiers, x):
    """Step 3: assign class labels by a majority vote of the base classifiers.

    Args:
        classifiers: fitted objects exposing ``predict(x)``; each call must
            return one label per sample.
        x: samples to classify, forwarded unchanged to every classifier.

    Returns:
        A plain Python list with the winning label for each sample. When
        several labels receive the same number of votes, the smallest label
        (in sorted order) wins.

    Labels may be of any sortable type (integers, strings, ...); they are
    not required to be non-negative integers.
    """
    # Rows are classifiers, columns are samples.
    votes = np.array([clf.predict(x) for clf in classifiers])

    def _majority(column):
        # np.unique returns sorted labels, so argmax picks the smallest
        # label among those tied for the highest count.
        labels, counts = np.unique(column, return_counts=True)
        return labels[counts.argmax()]

    winners = [_majority(votes[:, j]) for j in range(votes.shape[1])]
    return np.array(winners).tolist()

def solution(x_train, y_train, x_test, n_estimators):
    """Step 4: build, train and query a bagged ensemble of decision trees.

    Creates ``n_estimators`` ``DecisionTreeClassifier(random_state=0)``
    instances, trains them with ``fit`` on ``(x_train, y_train)`` and returns
    whatever ``predict`` produces for ``x_test``.
    """
    # Seed Python's RNG before any bootstrap draw is made.
    seed(42)
    ensemble = []
    for _ in range(n_estimators):
        ensemble.append(DecisionTreeClassifier(random_state=0))
    fit(ensemble, x_train, y_train)
    predicted_labels = predict(ensemble, x_test)
    return predicted_labels

In [3]:
# Draw one bootstrap sample of five indices (each in 0..4, repeats allowed).
bootstrap(5)

[2, 0, 0, 0, 3]

Linear Regression

In [None]:
class LinearRegressionGD:
    """Linear regression fitted by batch or stochastic gradient descent."""

    def __init__(self, learning_rate=0.01, n_iter=1000,
                 fit_intercept=True, method='batch'):
        """
        learning_rate: step size applied to every gradient update
        n_iter: number of gradient updates performed by ``fit``
        fit_intercept: whether a constant column of ones is prepended to X
        method: 'batch' (full-data gradient) or 'sgd' (one random sample
            per update)
        """
        self.learning_rate = learning_rate
        self.n_iter = n_iter
        self.fit_intercept = fit_intercept
        self.method = method
        self.theta = None
        self.loss_history = []  # (iteration, loss) pairs from the last fit

    def _add_intercept(self, X):
        """Prepend a column of ones so theta[0] acts as the bias term."""
        return np.c_[np.ones(X.shape[0]), X]

    def _compute_loss(self, X, y):
        """Return the halved mean squared error, sum((X@theta - y)^2) / (2m)."""
        m = len(y)
        y_pred = X @ self.theta
        loss = (1/(2*m)) * np.sum((y_pred - y) ** 2)
        return loss

    def fit(self, X, y, verbose=False):
        """Train the model and return ``self``.

        Args:
            X: 2-D array-like of shape (samples, features).
            y: targets; flattened to 1-D, so a column vector is accepted.
            verbose: print the loss every time it is recorded.

        Raises:
            ValueError: if ``self.method`` is neither 'batch' nor 'sgd'.
        """
        if self.method not in ('batch', 'sgd'):
            # Without this check an unknown method would silently leave
            # theta at zero for every iteration.
            raise ValueError(
                f"method must be 'batch' or 'sgd', got {self.method!r}"
            )

        X = np.asarray(X, dtype=float)
        # A (m, 1) target would broadcast against the (m,) prediction into
        # an (m, m) matrix, so always work with a flat vector.
        y = np.asarray(y, dtype=float).ravel()

        if self.fit_intercept:
            X = self._add_intercept(X)

        m, n = X.shape
        self.theta = np.zeros(n)
        # Start a fresh history so repeated fits do not pile up entries.
        self.loss_history = []

        for i in range(self.n_iter):
            if self.method == 'batch':
                gradient = (1/m) * X.T @ (X @ self.theta - y)
            else:
                # 'sgd': gradient of a single uniformly chosen sample.
                idx = np.random.randint(m)
                x_i = X[idx:idx+1]  # slicing keeps the 2-D shape
                y_i = y[idx:idx+1]
                gradient = x_i.T @ (x_i @ self.theta - y_i)
            self.theta -= self.learning_rate * gradient

            # Record the loss every 100 iterations and on the last one.
            if i % 100 == 0 or i == self.n_iter - 1:
                loss = self._compute_loss(X, y)
                self.loss_history.append((i, loss))
                if verbose:
                    print(f"Iteration {i}: loss = {loss:.6f}")

        return self

    def predict(self, X):
        """Return ``X @ theta`` (with the intercept column added if enabled)."""
        X = np.asarray(X, dtype=float)
        if self.fit_intercept:
            X = self._add_intercept(X)
        return X @ self.theta

Logistic Regression

In [None]:
import numpy as np

class LogisticRegression:
    """Binary logistic regression trained with batch gradient descent."""

    def __init__(self, learning_rate=0.01, n_iter=1000, fit_intercept=True):
        """
        learning_rate: step size applied to every gradient update
        n_iter: number of gradient updates performed by ``fit``
        fit_intercept: whether a constant column of ones is prepended to X
        """
        self.learning_rate = learning_rate
        self.n_iter = n_iter
        self.fit_intercept = fit_intercept
        self.theta = None

    def _add_intercept(self, X):
        """Prepend a column of ones so theta[0] acts as the bias term."""
        intercept = np.ones((X.shape[0], 1))
        return np.hstack((intercept, X))

    def _sigmoid(self, z):
        """Logistic function; z is clipped to [-500, 500] so exp cannot overflow."""
        z = np.clip(z, -500, 500)
        return 1 / (1 + np.exp(-z))

    def fit(self, X, y):
        """Train on (X, y) and return ``self``.

        Args:
            X: 2-D array-like of shape (samples, features).
            y: 0/1 labels; flattened to 1-D, so a column vector is accepted.
        """
        X = np.asarray(X, dtype=float)
        # A (m, 1) label array would broadcast against the (m,) predictions
        # into an (m, m) matrix, so always work with a flat vector.
        y = np.asarray(y, dtype=float).ravel()

        if self.fit_intercept:
            X = self._add_intercept(X)

        n_samples, n_features = X.shape
        self.theta = np.zeros(n_features)

        for _ in range(self.n_iter):
            h = self._sigmoid(np.dot(X, self.theta))
            # Gradient of the mean log-loss with respect to theta.
            gradient = np.dot(X.T, (h - y)) / n_samples
            self.theta -= self.learning_rate * gradient

        return self

    def predict_proba(self, X):
        """Return the predicted probability of class 1 for every row of X."""
        X = np.asarray(X, dtype=float)
        if self.fit_intercept:
            X = self._add_intercept(X)
        return self._sigmoid(np.dot(X, self.theta))

    def predict(self, X, threshold=0.5):
        """Return 1 where the predicted probability is >= threshold, else 0."""
        proba = self.predict_proba(X)
        return (proba >= threshold).astype(int)

TP = 80, FN = 20, FP = 15, TN = 85
Recall = TP / (TP + FN) = 80 / (80 + 20) = 0.8 或 80%
FPR = FP / (FP + TN) = 15 / (15 + 85) = 0.15 或 15%（其中 N = FP + TN 为实际负样本总数，即 FPR = FP / N）

3 问题：如果验证集损失（validation loss）显著高于训练集损失（training loss），可能的原因是什么？

回答要点：这是过拟合（Overfitting） 的典型标志。模型过度学习了训练数据中的噪声和细节，导致在未见过的验证数据上泛化能力变差。处理方法包括：获取更多数据、进行数据增强、增加正则化（如L1/L2）、使用Dropout（对神经网络）、或简化模型复杂度。

4 问题：在处理分类问题时，如何有意识地增加模型的偏差（Bias）并减少方差（Variance）？

回答要点：这通常是为了解决过拟合（高方差）。方法包括：简化模型（如减少树的最大深度、减少神经网络层数）、增加正则化强度、减少特征数量、或使用Bagging类方法（如随机森林）来平均多个高方差模型的结果（注意：Bagging主要降低方差，对偏差的影响很小）。

In [None]:
神经网络输出计算题
1. MLP输出计算
问题：

输入层：3个神经元，输入为 [1.0, 0.5, -0.2]

隐藏层：2个神经元，激活函数为线性

输出层：1个神经元，激活函数为sigmoid

权重：

W1 = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]

b1 = [0.1, 0.2]

W2 = [[0.7], [0.8]]

b2 = [0.3]

答案：

步骤1：计算隐藏层输出
h1 = 1.0*0.1 + 0.5*0.3 + (-0.2)*0.5 + 0.1 = 0.1 + 0.15 - 0.1 + 0.1 = 0.25
h2 = 1.0*0.2 + 0.5*0.4 + (-0.2)*0.6 + 0.2 = 0.2 + 0.2 - 0.12 + 0.2 = 0.48

步骤2：计算输出层输入
z = 0.25*0.7 + 0.48*0.8 + 0.3 = 0.175 + 0.384 + 0.3 = 0.859

步骤3：应用sigmoid
output = 1 / (1 + e^(-0.859)) ≈ 1 / (1 + 0.424) ≈ 1 / 1.424 ≈ 0.702

答案：0.702（保留三位小数）

In [12]:
# Hidden layer (linear activation): weighted sum of the inputs plus bias.
h1 = 1.0*0.1 + 0.5*0.3 + (-0.2)*0.5 + 0.1
h2 = 1.0*0.2 + 0.5*0.4 + (-0.2)*0.6 + 0.2
# Output pre-activation; uses the rounded literals 0.25 / 0.48 rather than h1 / h2.
z = 0.25*0.7 + 0.48*0.8 + 0.3
# Sigmoid of the literal 0.859 (not of the variable z).
output = 1 / (1 + np.exp(-0.859))
h1, h2, z, output

(0.25, 0.48000000000000004, 0.859, np.float64(0.7024516833527672))

### Implementing Gradient Descent in Python: The Cost Function

In [None]:
import numpy as np

def cost(X, y, theta):
    """Return the mean squared error of the linear model ``X @ theta``.

    Args:
        X: design matrix of shape (m, n).
        y: targets with the same shape as ``X.dot(theta)``.
        theta: parameter vector.
    """
    m = len(y)
    residuals = X.dot(theta) - y
    return (1/m) * np.sum(np.square(residuals))


def gradient_descent(X, y, theta, alpha, iterations):
    """Run a fixed number of batch gradient-descent updates.

    Args:
        X: design matrix of shape (m, n).
        y: targets with the same shape as ``X.dot(theta)``.
        theta: initial parameters, shape (n, 1) or (n,); any n is accepted.
        alpha: learning rate.
        iterations: number of updates to perform (there is no early stop).

    Returns:
        (theta, cost_history, theta_history) where ``cost_history`` has
        shape (iterations,) and ``theta_history`` has shape (iterations, n),
        both recorded after each update.
    """
    m = len(y)
    cost_history = np.zeros(iterations)
    # One history column per parameter instead of a fixed width of two.
    theta_history = np.zeros((iterations, np.size(theta)))
    for i in range(iterations):
        prediction = np.dot(X, theta)
        theta = theta - (1/m)*alpha*(X.T.dot((prediction - y)))  # gradient step
        theta_history[i, :] = np.ravel(theta)
        cost_history[i] = cost(X, y, theta)
    return theta, cost_history, theta_history

# Synthetic data: 100 points following y = 4 + 3x plus standard normal noise.
X = 2 * np.random.rand(100,1)
y = 4 +3 * X+np.random.randn(100,1)

lr = 0.01 # Learning rate
n_iter = 1000 # Number of gradient-descent iterations
theta = np.random.randn(2,1) # Randomly initialized parameters (bias, slope)
X_b = np.c_[np.ones((len(X),1)),X] # Prepend a column of ones for the bias term
theta, cost_history, theta_history = gradient_descent(X_b,y,theta,lr,n_iter) # Run gradient descent

In [None]:
def sigmoid(z):
    """Return the logistic function 1 / (1 + exp(-z)), element-wise for arrays."""
    denominator = 1 + np.exp(-z)
    return 1 / denominator

def cost_function(h, y):
    """Return the mean binary cross-entropy between probabilities ``h`` and labels ``y``.

    ``h`` is clipped to [1e-15, 1 - 1e-15] before taking logarithms so that
    predictions of exactly 0 or 1 give a finite loss instead of nan/inf.
    """
    eps = 1e-15
    h = np.clip(h, eps, 1 - eps)
    return (-y * np.log(h) - (1 - y) * np.log(1 - h)).mean()

def logistic_regression(X, y, num_iterations, learning_rate):
    """Fit logistic-regression weights with batch gradient descent.

    Args:
        X: feature matrix of shape (samples, features); a column of ones is
            prepended internally for the intercept.
        y: 0/1 labels, one per sample.
        num_iterations: number of gradient updates.
        learning_rate: step size of each update.

    Returns:
        The weight vector ``theta`` with the intercept as its first entry.

    The loss is printed on every 10000th iteration (including iteration 0).
    """
    # Add intercept to X
    intercept = np.ones((X.shape[0], 1))
    X = np.concatenate((intercept, X), axis=1)

    # Weights initialization
    theta = np.zeros(X.shape[1])

    for i in range(num_iterations):
        h = sigmoid(np.dot(X, theta))
        gradient = np.dot(X.T, (h - y)) / y.size
        theta -= learning_rate * gradient

        # Evaluate the post-update loss only when it is actually reported,
        # rather than running an extra forward pass on every iteration.
        if i % 10000 == 0:
            loss = cost_function(sigmoid(np.dot(X, theta)), y)
            print(f'Loss: {loss}\t')

    return theta

def predict_prob(X, theta):
    """Return sigmoid(X_with_intercept @ theta) for every row of ``X``.

    A column of ones is placed in front of ``X`` so that ``theta[0]`` is
    used as the intercept.
    """
    ones_column = np.ones((X.shape[0], 1))
    design = np.concatenate((ones_column, X), axis=1)
    scores = np.dot(design, theta)
    return sigmoid(scores)

def predict(X, theta, threshold=0.5):
    """Return a boolean array: True where ``predict_prob`` is >= ``threshold``."""
    probabilities = predict_prob(X, theta)
    return probabilities >= threshold


### Classification Algorithms and Metrics
#### k-Nearest Neighbors (k-NN) Algorithm

In [3]:
import math

# The 'euclidean_distance' function computes the Euclidean distance between two points
def euclidean_distance(point1, point2):
    """Return the Euclidean distance between two points.

    Coordinates are paired with ``zip``, so only the dimensions present in
    both points contribute to the result.
    """
    squared_total = sum((p - q) ** 2 for p, q in zip(point1, point2))
    return math.sqrt(squared_total)

# Quick check on a 3-4-5 right triangle
point1 = (1, 2) # Coordinates of the first point
point2 = (4, 6) # Coordinates of the second point
print(euclidean_distance(point1, point2)) # 5.0

from collections import Counter

def k_nearest_neighbors(data, query, k, distance_fn):
    """Classify ``query`` by a majority vote among its ``k`` nearest neighbours.

    Args:
        data: sequence of ``(features, label)`` pairs.
        query: the point to classify.
        k: number of neighbours that take part in the vote.
        distance_fn: callable ``distance_fn(features, query)`` returning a
            comparable distance.

    Returns:
        The most common label among the ``k`` closest points.
    """
    # Each entry carries the position as the second field, so equal
    # distances are ordered by position and labels are never compared.
    ranked = sorted(
        (distance_fn(features, query), position, label)
        for position, (features, label) in enumerate(data)
    )

    # Tally the labels of the k closest entries, nearest first.
    votes = Counter(label for _, _, label in ranked[:k])
    winner, _ = votes.most_common(1)[0]
    return winner

# Define the dataset (training set)
# Each element of the dataset is a tuple (features, label)
data = [
    ((2, 3), 0),
    ((5, 4), 0),
    ((9, 6), 1),
    ((4, 7), 0),
    ((8, 1), 1),
    ((7, 2), 1)
]
query = (5, 3)  # Point to classify

# Classify the query using its three nearest neighbours
predicted_label = k_nearest_neighbors(data, query, k=3, distance_fn=euclidean_distance)
print(predicted_label)  # Expected class label is 0

5.0
0


#### Implementing Naive Bayes Classifier

In [4]:
import pandas as pd

def calculate_prior_probabilities(y):
    """Return the relative frequency of every class in the label Series ``y``.

    The result is a pandas Series indexed by class label.
    """
    priors = y.value_counts(normalize=True)
    return priors

def calculate_likelihoods(X, y):
    """Estimate P(feature value | class) from raw frequencies (no smoothing).

    Args:
        X: DataFrame of categorical features.
        y: Series of class labels aligned with ``X``.

    Returns:
        ``{column: {class: Series}}`` where each Series maps the values
        observed for that class to their relative frequency.
    """
    class_labels = y.unique()
    table = {}
    for column in X.columns:
        per_class = {}
        for class_ in class_labels:
            # Values of this column restricted to rows of the current class.
            values = X[y == class_][column]
            per_class[class_] = values.value_counts() / len(values)
        table[column] = per_class
    return table

def naive_bayes_classifier(X_test, priors, likelihoods):
    """Predict a class for every row of ``X_test`` with Naive Bayes.

    Args:
        X_test: DataFrame of samples to classify.
        priors: Series of class priors indexed by class label.
        likelihoods: ``{column: {class: Series}}`` of conditional
            probabilities.

    Returns:
        A list with the highest-scoring class for each row.

    A value missing from a class's likelihood Series is scored with
    ``1 / (number of known values + 1)``.
    """
    predicted = []
    for _, row in X_test.iterrows():
        posterior = {}
        for class_ in priors.index:
            score = priors[class_]
            for feature in X_test.columns:
                seen = likelihoods[feature][class_]
                fallback = 1 / (len(seen) + 1)
                score *= seen.get(row[feature], fallback)
            posterior[class_] = score

        # Pick the class with the largest unnormalised posterior.
        predicted.append(max(posterior, key=posterior.get))

    return predicted

def calculate_likelihoods_with_smoothing(X, y):
    """Estimate P(feature value | class) with Laplace (add-1) smoothing.

    Args:
        X: DataFrame of categorical features.
        y: Series of class labels aligned with ``X``.

    Returns:
        ``{column: {class: Series}}`` where each Series maps every value of
        the column (seen anywhere in ``X``) to its smoothed probability.
        Each Series sums to 1.
    """
    likelihoods = {}
    for column in X.columns:
        # All values the feature takes in the training data, any class.
        vocabulary = X[column].unique()
        likelihoods[column] = {}
        for class_ in y.unique():
            class_data = X[y == class_][column]
            # Values absent from this class must still appear with count 0,
            # otherwise they receive no smoothed probability at all.
            counts = class_data.value_counts().reindex(vocabulary, fill_value=0)
            total_count = len(class_data) + len(vocabulary)
            likelihoods[column][class_] = (counts + 1) / total_count
    return likelihoods

# Toy weather data: two categorical features and the class to predict.
data = {
    'Temperature': ['Hot', 'Hot', 'Cold', 'Hot', 'Cold', 'Cold', 'Cold'],
    'Humidity': ['High', 'High', 'Normal', 'Normal', 'High', 'Normal', 'Normal'],
    'Weather': ['Sunny', 'Sunny', 'Snowy', 'Rainy', 'Snowy', 'Snowy', 'Sunny']
}
df = pd.DataFrame(data)

# Split features and labels
X = df[['Temperature', 'Humidity']]
y = df['Weather']

# Calculate prior probabilities
priors = calculate_prior_probabilities(y)

# Calculate likelihoods with smoothing
likelihoods = calculate_likelihoods_with_smoothing(X, y)

# New observation
X_test = pd.DataFrame([{'Temperature': 'Cold', 'Humidity': 'Normal'}])

# Make prediction
prediction = naive_bayes_classifier(X_test, priors, likelihoods)
print("Predicted Weather: ", prediction[0])  # Output: Predicted Weather:  Snowy

Predicted Weather:  Snowy


In [9]:
# y, priors, X, X.columns, likelihoods

In [10]:
import math

def calculate_prior_probabilities(y):
    """
    Calculate prior probabilities P(Class) as relative label frequencies.

    Args:
        y: list of class labels
    Returns:
        dict: {class: probability}, keyed in order of first appearance
    """
    total = len(y)

    tallies = {}
    for label in y:
        if label in tallies:
            tallies[label] += 1
        else:
            tallies[label] = 1

    return {label: tally / total for label, tally in tallies.items()}


def calculate_likelihoods(X, y):
    """
    Calculate likelihoods P(Feature|Class) without smoothing.

    Args:
        X: list of lists, shape [n_samples, n_features]
        y: list of class labels, length n_samples
    Returns:
        tuple: (likelihoods, class_counts) where
            likelihoods is {feature_index: {class: {feature_value: probability}}}
            class_counts is {class: number of samples with that label}
    """
    classes = list(set(y))
    n_features = len(X[0]) if X else 0

    # One empty value table per (feature, class) pair.
    likelihoods = {
        feat_idx: {class_: {} for class_ in classes}
        for feat_idx in range(n_features)
    }

    # Number of training samples per class.
    class_counts = dict.fromkeys(classes, 0)
    for label in y:
        class_counts[label] += 1

    # Raw occurrence counts of every feature value within its class.
    for row_idx, features in enumerate(X):
        label = y[row_idx]
        for feat_idx, value in enumerate(features):
            bucket = likelihoods[feat_idx][label]
            bucket[value] = bucket.get(value, 0) + 1

    # Turn the counts into relative frequencies within each class.
    for per_class in likelihoods.values():
        for label, bucket in per_class.items():
            denominator = class_counts[label]
            for value in bucket:
                bucket[value] = bucket[value] / denominator

    return likelihoods, class_counts


def calculate_likelihoods_with_smoothing(X, y):
    """
    Calculate likelihoods P(Feature|Class) with Laplace (add-1) smoothing.

    Args:
        X: list of lists, shape [n_samples, n_features]
        y: list of class labels, length n_samples
    Returns:
        tuple: (likelihoods, class_counts, unique_feature_values) where
            likelihoods is {feature_index: {class: {feature_value: probability}}}
            class_counts is {class: number of samples with that label}
            unique_feature_values is a list holding, per feature, the list
                of distinct values seen in X
    """
    classes = list(set(y))
    n_features = len(X[0]) if X else 0

    # Distinct values observed for each feature across all samples.
    unique_feature_values = [
        list({sample[feat_idx] for sample in X})
        for feat_idx in range(n_features)
    ]

    # Every (feature, class) table starts with a zero count for each value.
    likelihoods = {
        feat_idx: {
            class_: dict.fromkeys(unique_feature_values[feat_idx], 0)
            for class_ in classes
        }
        for feat_idx in range(n_features)
    }
    class_counts = dict.fromkeys(classes, 0)

    # Count samples per class and feature values per class.
    for row_idx, features in enumerate(X):
        label = y[row_idx]
        class_counts[label] += 1
        for feat_idx, value in enumerate(features):
            likelihoods[feat_idx][label][value] += 1

    # Laplace smoothing: (count + 1) / (samples in class + distinct values).
    for feat_idx, per_class in likelihoods.items():
        vocabulary_size = len(unique_feature_values[feat_idx])
        for label, bucket in per_class.items():
            denominator = class_counts[label] + vocabulary_size
            for value, count in bucket.items():
                bucket[value] = (count + 1) / denominator

    return likelihoods, class_counts, unique_feature_values


def naive_bayes_classifier(X_test, priors, likelihoods, use_log=True):
    """
    Classify test samples using Naive Bayes.

    Args:
        X_test: sequence of samples, each a sequence of feature values
        priors: dict {class: probability}
        likelihoods: dict {feature_index: {class: {feature_value: probability}}}
        use_log: if True, add log probabilities instead of multiplying
            probabilities, which avoids floating-point underflow
    Returns:
        list: predicted class label for every sample. Ties go to the class
        that comes first in ``priors``.

    A feature value missing from a class's table is scored with 1e-10.
    """
    predictions = []

    for sample in X_test:
        best_class = None
        # Start below every possible score in both modes, so a class is
        # always chosen even when all linear scores are exactly 0.
        best_score = -float('inf')

        for class_, prior in priors.items():
            score = math.log(prior) if use_log else prior

            for feat_idx, feat_value in enumerate(sample):
                if feat_idx in likelihoods and class_ in likelihoods[feat_idx]:
                    prob = likelihoods[feat_idx][class_].get(feat_value, 1e-10)
                    if use_log:
                        score += math.log(prob)
                    else:
                        score *= prob

            if score > best_score:
                best_score = score
                best_class = class_

        predictions.append(best_class)

    return predictions


def predict_proba(X_test, priors, likelihoods, use_log=True):
    """
    Get normalised class probability estimates for each sample.

    Args:
        X_test: sequence of samples, each a sequence of feature values
        priors: dict {class: probability}
        likelihoods: dict {feature_index: {class: {feature_value: probability}}}
        use_log: if True, accumulate log probabilities and normalise with
            the log-sum-exp trick
    Returns:
        list of dicts: [{class: probability}, ...], one dict per sample

    A feature value missing from a class's table is scored with 1e-10.
    """
    predictions = []
    unique_classes = list(priors.keys())

    for sample in X_test:
        class_scores = {}

        # Raw (log or linear) joint score of the sample with every class.
        for class_ in unique_classes:
            score = math.log(priors[class_]) if use_log else priors[class_]

            for feat_idx, feat_value in enumerate(sample):
                if feat_idx in likelihoods and class_ in likelihoods[feat_idx]:
                    prob = likelihoods[feat_idx][class_].get(feat_value, 1e-10)
                    if use_log:
                        score += math.log(prob)
                    else:
                        score *= prob

            class_scores[class_] = score

        if use_log:
            # Subtract the maximum before exponentiating so that at least
            # one term equals exp(0) and the sum cannot underflow to zero.
            max_score = max(class_scores.values())
            exp_scores = {
                class_: math.exp(score - max_score)
                for class_, score in class_scores.items()
            }
            total = sum(exp_scores.values())
            probabilities = {
                class_: exp_score / total
                for class_, exp_score in exp_scores.items()
            }
        else:
            total = sum(class_scores.values())
            probabilities = {
                class_: score / total
                for class_, score in class_scores.items()
            }

        predictions.append(probabilities)

    return predictions


# Example usage with simple data
def example():
    """Run the Naive Bayes helpers on a small PlayTennis data set.

    Prints predictions made without and with Laplace smoothing, followed by
    the class probabilities of the first test sample, and returns the
    predictions obtained with smoothing.
    """
    # Training data: [Outlook, Temperature, Humidity, Wind]
    X_train = [
        ['Sunny', 'Hot', 'High', 'Weak'],
        ['Sunny', 'Hot', 'High', 'Strong'],
        ['Overcast', 'Hot', 'High', 'Weak'],
        ['Rain', 'Mild', 'High', 'Weak'],
        ['Rain', 'Cool', 'Normal', 'Weak'],
        ['Rain', 'Cool', 'Normal', 'Strong'],
        ['Overcast', 'Cool', 'Normal', 'Strong'],
        ['Sunny', 'Mild', 'High', 'Weak'],
        ['Sunny', 'Cool', 'Normal', 'Weak'],
        ['Rain', 'Mild', 'Normal', 'Weak'],
        ['Sunny', 'Mild', 'Normal', 'Strong'],
        ['Overcast', 'Mild', 'High', 'Strong'],
        ['Overcast', 'Hot', 'Normal', 'Weak'],
        ['Rain', 'Mild', 'High', 'Strong']
    ]
    
    # Target: PlayTennis? (Yes/No)
    y_train = ['No', 'No', 'Yes', 'Yes', 'Yes', 'No', 'Yes', 'No', 'Yes', 'Yes', 'Yes', 'Yes', 'Yes', 'No']
    
    # Test data
    X_test = [
        ['Sunny', 'Cool', 'High', 'Strong'],
        ['Rain', 'Mild', 'High', 'Weak']
    ]
    
    # Calculate probabilities
    print("=== Naive Bayes Classification Example ===")
    
    # Without smoothing; the class counts returned alongside are not needed here
    print("\n1. Without Smoothing:")
    priors = calculate_prior_probabilities(y_train)
    likelihoods, _ = calculate_likelihoods(X_train, y_train)
    
    predictions = naive_bayes_classifier(X_test, priors, likelihoods, use_log=True)
    print(f"Test sample 1: {X_test[0]} -> Predicted: {predictions[0]}")
    print(f"Test sample 2: {X_test[1]} -> Predicted: {predictions[1]}")
    
    # With smoothing; the same priors are reused
    print("\n2. With Laplace Smoothing:")
    likelihoods_smooth, _, _ = calculate_likelihoods_with_smoothing(X_train, y_train)
    
    predictions_smooth = naive_bayes_classifier(X_test, priors, likelihoods_smooth, use_log=True)
    print(f"Test sample 1: {X_test[0]} -> Predicted: {predictions_smooth[0]}")
    print(f"Test sample 2: {X_test[1]} -> Predicted: {predictions_smooth[1]}")
    
    # Get probability estimates
    print("\n3. Probability Estimates for Test Sample 1:")
    proba = predict_proba([X_test[0]], priors, likelihoods_smooth, use_log=True)
    for class_, prob in proba[0].items():
        print(f"  P({class_} | features) = {prob:.4f}")
    
    return predictions_smooth

# Run the demo only when this code is executed as the main module.
if __name__ == "__main__":
    example()

=== Naive Bayes Classification Example ===

1. Without Smoothing:
Test sample 1: ['Sunny', 'Cool', 'High', 'Strong'] -> Predicted: No
Test sample 2: ['Rain', 'Mild', 'High', 'Weak'] -> Predicted: Yes

2. With Laplace Smoothing:
Test sample 1: ['Sunny', 'Cool', 'High', 'Strong'] -> Predicted: No
Test sample 2: ['Rain', 'Mild', 'High', 'Weak'] -> Predicted: Yes

3. Probability Estimates for Test Sample 1:
  P(No | features) = 0.7201
  P(Yes | features) = 0.2799


Naive Bayes实现

In [None]:
import numpy as np
from collections import defaultdict

class NaiveBayesClassifier:
    """Naive Bayes for discrete (categorical) features.

    Attributes:
        class_priors: {class: P(class)} estimated by ``fit``.
        feature_likelihoods: {(class, feature_index): {value: P(value | class)}}.
    """

    def __init__(self):
        self.class_priors = {}
        # Maps (class, feature_index) to a {value: probability} dict.
        self.feature_likelihoods = defaultdict(dict)

    def fit(self, X, y):
        """Estimate priors and per-feature likelihoods from the training data.

        Args:
            X: 2-D array-like of shape (samples, features) with discrete values.
            y: 1-D array-like of class labels.

        Any estimates from a previous call are discarded first.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        # Reset so that classes from an earlier fit cannot leak into predict.
        self.class_priors = {}
        self.feature_likelihoods = defaultdict(dict)

        # Priors P(y) as relative class frequencies.
        total_samples = len(y)
        classes, counts = np.unique(y, return_counts=True)
        for cls, cnt in zip(classes, counts):
            self.class_priors[cls] = cnt / total_samples

        # Likelihoods P(x | y) as relative value frequencies within a class.
        for cls in classes:
            X_cls = X[y == cls]
            for feature_idx in range(X.shape[1]):
                feature_vals, val_counts = np.unique(X_cls[:, feature_idx], return_counts=True)
                probabilities = val_counts / len(X_cls)
                self.feature_likelihoods[(cls, feature_idx)] = dict(zip(feature_vals, probabilities))

    def predict(self, X):
        """Return a list with the most probable class for every sample in X.

        Scores are accumulated as log probabilities so that samples with
        many features do not underflow to 0 for every class. A feature value
        that was never seen for a class is scored with probability 1e-6.
        """
        predictions = []
        for sample in X:
            log_posteriors = {}
            for cls, prior in self.class_priors.items():
                log_score = np.log(prior)
                for feature_idx, feature_val in enumerate(sample):
                    prob_dict = self.feature_likelihoods.get((cls, feature_idx), {})
                    log_score += np.log(prob_dict.get(feature_val, 1e-6))
                log_posteriors[cls] = log_score
            predictions.append(max(log_posteriors, key=log_posteriors.get))
        return predictions

In [None]:
def fit(self, X, y, alpha=1):
    """Estimate priors and Laplace-smoothed likelihoods for discrete features.

    Written as a method: ``self`` receives the results in
    ``self.class_priors`` ({class: P(class)}) and
    ``self.feature_likelihoods`` ({(class, feature_index): {value: prob}}).

    Args:
        X: 2-D array-like of shape (samples, features) with discrete values.
        y: 1-D array-like of class labels.
        alpha: additive smoothing strength (1 gives Laplace smoothing).
    """
    X = np.asarray(X)
    y = np.asarray(y)

    # Priors P(y) as relative class frequencies.
    total_samples = len(y)
    classes, counts = np.unique(y, return_counts=True)
    self.class_priors = {cls: cnt / total_samples for cls, cnt in zip(classes, counts)}
    self.feature_likelihoods = {}

    for cls in classes:
        X_cls = X[y == cls]
        n_cls = len(X_cls)
        for feature_idx in range(X.shape[1]):
            # Occurrences of each value within the current class.
            feature_vals, val_counts = np.unique(X_cls[:, feature_idx], return_counts=True)
            # Every value the feature takes anywhere in the training set.
            all_vals = np.unique(X[:, feature_idx])
            V = len(all_vals)

            # Additive smoothing: (count + alpha) / (n_cls + alpha * V).
            probabilities = {}
            for val, count in zip(feature_vals, val_counts):
                probabilities[val] = (count + alpha) / (n_cls + alpha * V)
            # Values never seen with this class still get the smoothed floor.
            for val in all_vals:
                if val not in probabilities:
                    probabilities[val] = alpha / (n_cls + alpha * V)

            self.feature_likelihoods[(cls, feature_idx)] = probabilities

#### Bernoulli Naive Bayes Classifier

In [None]:
import numpy as np

class NaiveBayes():
    """Bernoulli Naive Bayes scored in log space.

    ``forward`` estimates the parameters; ``predict`` returns the highest
    scoring class per sample. The scoring formula weights ``log p`` by the
    feature value and ``log (1 - p)`` by one minus the feature value.
    """

    def __init__(self, smoothing=1.0):
        self.smoothing = smoothing
        self.classes = None
        self.priors = None       # {class: log prior}
        self.likelihoods = None  # {class: (log p, log (1 - p)) per feature}

    def forward(self, X, y):
        """Estimate log priors and smoothed per-feature log likelihoods."""
        self.classes, class_counts = np.unique(y, return_counts=True)
        n_total = len(y)

        self.priors = {}
        for position, cls in enumerate(self.classes):
            self.priors[cls] = np.log(class_counts[position] / n_total)

        self.likelihoods = {}
        for cls in self.classes:
            rows = X[y == cls]
            # Smoothed fraction of rows in which each feature is set.
            numerator = np.sum(rows, axis=0) + self.smoothing
            denominator = rows.shape[0] + 2 * self.smoothing
            p_one = numerator / denominator
            self.likelihoods[cls] = (np.log(p_one), np.log(1 - p_one))

    def _compute_posterior(self, sample):
        """Return the class whose log posterior is largest for ``sample``."""
        scores = {}
        for cls in self.classes:
            log_p1, log_p0 = self.likelihoods[cls]
            log_likelihood = np.sum(sample * log_p1 + (1 - sample) * log_p0)
            scores[cls] = self.priors[cls] + log_likelihood
        return max(scores, key=scores.get)

    def predict(self, X):
        """Return an array with the predicted class of every row in ``X``."""
        labels = []
        for sample in X:
            labels.append(self._compute_posterior(sample))
        return np.array(labels)

#### Gaussian Naive Bayes Classifier

In [None]:
import numpy as np

def gaussian_naive_bayes(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray) -> np.ndarray:
    """
    Classify test samples with a Gaussian Naive Bayes model.

    Every feature is modelled per class as an independent normal
    distribution whose mean and variance come from the training rows of
    that class; 1e-9 is added to every variance before it is used.

    Args:
        X_train: Training features (shape: N_train x D)
        y_train: Training labels (shape: N_train)
        X_test: Test features (shape: N_test x D)

    Returns:
        Predicted class labels for X_test (shape: N_test)
    """
    classes = np.unique(y_train)
    n_train = X_train.shape[0]
    n_classes, n_features = len(classes), X_train.shape[1]

    # Per-class prior, feature means and feature variances.
    priors = np.zeros(n_classes)
    means = np.zeros((n_classes, n_features))
    variances = np.zeros((n_classes, n_features))
    for idx, c in enumerate(classes):
        members = X_train[y_train == c]
        priors[idx] = members.shape[0] / n_train
        means[idx] = np.mean(members, axis=0)
        variances[idx] = np.var(members, axis=0)

    # Keeps the divisions and logarithms below defined for constant features.
    variances = variances + 1e-9

    def _log_posterior(x, idx):
        # log P(class) + sum over features of log N(x | mean, variance)
        log_prior = np.log(priors[idx])
        log_likelihood = -0.5 * np.sum(
            np.log(2 * np.pi * variances[idx])
            + (x - means[idx]) ** 2 / variances[idx]
        )
        return log_prior + log_likelihood

    predictions = []
    for x in X_test:
        scores = [_log_posterior(x, idx) for idx in range(n_classes)]
        predictions.append(classes[np.argmax(scores)])

    return np.array(predictions)

### Implementing Decision Tree Building in Python

In [13]:
def gini_index(groups, classes):
    """Return the size-weighted Gini impurity of a candidate split.

    Args:
        groups: iterable of row groups; the class label is the last item
            of every row.
        classes: the class values to account for.

    Returns:
        The sum over groups of ``(1 - sum_c p_c^2) * group_size / total``,
        where ``p_c`` is the share of class ``c`` in the group. Empty
        groups contribute nothing.
    """
    n_instances = float(sum(len(group) for group in groups))
    gini = 0.0
    for group in groups:
        size = len(group)
        if size == 0:
            continue
        labels = [row[-1] for row in group]
        score = 0.0
        for class_val in classes:
            # Share of the group's rows that belong to this class.
            p = labels.count(class_val) / size
            score += p * p
        gini += (1.0 - score) * (size / n_instances)
    return gini


def test_split(index, value, dataset):
    left, right = list(), list()
    for row in dataset:
        if row[index] < value:
            left.append(row)
        else:
            right.append(row)
    return left, right


def get_split(dataset):
    """Exhaustively search for the split with the lowest weighted Gini index.

    Every value found in every feature column (all columns but the last,
    which holds the class) is tried as a threshold. The first candidate that
    reaches the lowest score wins.

    Returns:
        dict with 'index' (feature column), 'value' (threshold) and 'groups'
        (the (left, right) rows produced by test_split). When there is no
        feature column the placeholders 999 / 999 / None are returned.
    """
    labels = list(set(row[-1] for row in dataset))
    best = {'index': 999, 'value': 999, 'groups': None}
    lowest = 999
    n_features = len(dataset[0]) - 1  # last column is the class
    for column in range(n_features):
        for candidate in dataset:
            threshold = candidate[column]
            halves = test_split(column, threshold, dataset)
            impurity = gini_index(halves, labels)
            if impurity < lowest:
                lowest = impurity
                best = {'index': column, 'value': threshold, 'groups': halves}
    return best


# Toy data, one row per person: [age, movie genre, decision (watch or not)].
# The last column is the class label that get_split leaves out of the features.

dataset = [
    [18, 1, 0],
    [20, 0, 1],
    [23, 2, 1],
    [25, 1, 1],
    [30, 1, 0],
]

# Search every feature/value pair and report the split with the lowest Gini index.
split = get_split(dataset)
print('\nBest Split:')
print('Column Index: %s, Value: %s' % ((split['index']), (split['value'])))
# 'groups' holds the (left, right) rows produced by the winning split.
print(split['groups'])
# Output: Column Index: 0, Value: 20


Best Split:
Column Index: 0, Value: 20
([[18, 1, 0]], [[20, 0, 1], [23, 2, 1], [25, 1, 1], [30, 1, 0]])


In [17]:
import math

def gini_index(groups, classes):
    """Weighted Gini impurity of the groups produced by a split.

    Each non-empty group contributes (1 - sum of squared class shares) scaled
    by its fraction of all rows. The class label is the last element of each
    row. Returns 0 when there are no rows at all.
    """
    total = float(sum(len(group) for group in groups))
    if total == 0:
        return 0

    impurity = 0.0
    for members in groups:
        count = len(members)
        if count == 0:
            continue

        # Sum of squared class shares inside this group.
        purity = 0.0
        for label in classes:
            share = sum(1 for row in members if row[-1] == label) / count
            purity += share * share

        impurity += (1.0 - purity) * (count / total)

    return impurity


def test_split(index, value, dataset):
    """Split dataset based on feature threshold"""
    left, right = [], []
    for row in dataset:
        if row[index] < value:
            left.append(row)
        else:
            right.append(row)
    return left, right


def get_split(dataset):
    """Find the best split point for dataset.

    Candidate thresholds are the midpoints between consecutive distinct
    values of each feature column (every column except the last, which holds
    the class). The candidate with the lowest weighted Gini index wins; ties
    keep the first one found.

    Returns:
        dict with 'index' (feature column), 'value' (threshold) and 'groups'
        ((left, right) rows), or None when no split is possible: the dataset
        is empty, all rows share one class, or no feature has two distinct
        values. Returning None in the last case matters because callers
        unpack 'groups' and would otherwise crash on a None placeholder.
    """
    if not dataset:
        return None

    class_values = list(set(row[-1] for row in dataset))
    if len(class_values) == 1:
        return None  # All same class, nothing to separate

    b_index, b_value, b_score, b_groups = None, None, float('inf'), None

    for index in range(len(dataset[0]) - 1):  # Exclude class column
        # Distinct values of this feature, in ascending order
        feature_values = sorted({row[index] for row in dataset})

        # Test thresholds halfway between consecutive values
        for low, high in zip(feature_values, feature_values[1:]):
            threshold = (low + high) / 2.0
            groups = test_split(index, threshold, dataset)
            gini = gini_index(groups, class_values)

            # Strict comparison keeps the earliest of equally good splits
            if gini < b_score:
                b_index, b_value, b_score, b_groups = index, threshold, gini, groups

    if b_groups is None:
        # Mixed classes but identical feature values: no threshold separates them.
        return None

    return {'index': b_index, 'value': b_value, 'groups': b_groups}


def create_terminal(group):
    """Return the majority class of the rows in group, or None if it is empty.

    The class label is the last element of every row.
    """
    if group:
        labels = [row[-1] for row in group]
        # Most frequent label among the distinct ones present.
        return max(set(labels), key=labels.count)
    return None


def build_tree(train, max_depth, min_size):
    """Build a decision tree from the training rows.

    Returns the root node dict produced by get_split after it has been
    expanded in place by recurse_split (starting at depth 1). When get_split
    finds nothing to split on, the result of create_terminal(train) is
    returned instead.
    """
    tree = get_split(train)
    if tree is not None:
        recurse_split(tree, max_depth, min_size, 1)
        return tree
    return create_terminal(train)


def recurse_split(node, max_depth, min_size, depth):
    """Expand a split node in place until a stopping rule is hit.

    The node's temporary 'groups' entry is consumed and replaced by 'left'
    and 'right' children, each either a nested split dict or a terminal
    class value. A side becomes terminal when it has at most min_size rows,
    when depth has reached max_depth, or when get_split finds no split for it.
    Does nothing when node is None.
    """
    if node is None:
        return

    left, right = node['groups']
    node.pop('groups')  # the raw rows are not needed once the children exist

    # One side empty: both children share the majority class of all rows.
    if not (left and right):
        leaf = create_terminal(left + right)
        node['left'] = leaf
        node['right'] = leaf
        return

    # Depth budget exhausted: close both sides.
    if depth >= max_depth:
        node['left'] = create_terminal(left)
        node['right'] = create_terminal(right)
        return

    # The left subtree is fully built before the right one is examined.
    for side, rows in (('left', left), ('right', right)):
        child = get_split(rows) if len(rows) > min_size else None
        if child:
            node[side] = child
            recurse_split(child, max_depth, min_size, depth + 1)
        else:
            node[side] = create_terminal(rows)


def predict(node, row):
    """Return the class predicted for row by the tree rooted at node.

    Internal nodes are dicts with 'index', 'value', 'left' and 'right';
    anything that is not a dict is treated as a leaf and returned as the
    class. Rows with row[index] < value follow the left branch.
    """
    while isinstance(node, dict):
        branch = 'left' if row[node['index']] < node['value'] else 'right'
        node = node[branch]
    return node
# def predict(node, sample):
#     """
#     Predict class for a single sample by traversing the tree
    
#     Args:
#         node: Current node in the tree (starts with root)
#         sample: Feature values [f1, f2, ...] (without class label)
    
#     Returns: Predicted class
#     """
#     # If we've reached a leaf node (not a dictionary), return the class
#     if not isinstance(node, dict):
#         return node
    
#     # Get the feature index and threshold value for this split
#     feature_index = node['index']      # Which feature to check
#     threshold = node['value']          # Threshold value for the split
    
#     # Get the feature value from the sample
#     feature_value = sample[feature_index]
    
#     # Decide which child to go to based on the comparison
#     if feature_value < threshold:
#         return predict(node['left'], sample)   # Go left
#     else:
#         return predict(node['right'], sample)  # Go right

# Test the corrected code
# Each row is [feature 0, feature 1, class]; the last column is the label.
dataset = [
    [5, 3, 0], [6, 3, 0], [6, 4, 0], [10, 3, 1],
    [11, 4, 1], [12, 8, 0], [5, 5, 0], [12, 4, 1]
]

print("Building tree...")
# max_depth=2 allows one further split below the root; min_size=1 turns
# single-row groups into leaves.
tree = build_tree(dataset, max_depth=2, min_size=1)

print("\nTree structure:")
def print_tree(node, depth=0):
    """Print the tree rooted at node, indenting two spaces per level.

    Split nodes (dicts) are shown as "[Feature i < v]" followed by their left
    and then right subtree; leaves are shown as "[Class c]".
    """
    indent = "  " * depth
    if not isinstance(node, dict):
        print(f"{indent}[Class {node}]")
        return
    print(f"{indent}[Feature {node['index']} < {node['value']:.1f}]")
    for side in ('left', 'right'):
        print_tree(node[side], depth + 1)

print_tree(tree)

print("\nMaking predictions:")
# Expected classes follow the tree printed in the recorded run
# (root: feature 0 < 8.0; right child: feature 1 < 6.0).
test_samples = [
    [7, 3],   # Should predict 0 (feature 0 < 8.0)
    [11, 5],  # Should predict 1 (feature 0 >= 8.0, feature 1 < 6.0)
    [13, 9]   # Should predict 0 (feature 0 >= 8.0, feature 1 >= 6.0)
]

for sample in test_samples:
    # predict only reads the feature columns named by the tree, so the
    # appended dummy class value is never looked at.
    prediction = predict(tree, sample + [-1])
    print(f"Sample {sample} → Predicted class: {prediction}")

Building tree...

Tree structure:
[Feature 0 < 8.0]
  [Class 0]
  [Feature 1 < 6.0]
    [Class 1]
    [Class 0]

Making predictions:
Sample [7, 3] → Predicted class: 0
Sample [11, 5] → Predicted class: 1
Sample [13, 9] → Predicted class: 0


Decision Tree 原理与实现

In [None]:
import numpy as np

class DecisionTree:
    """Classification tree that greedily splits on Gini impurity reduction.

    The fitted tree is stored in `self.tree` as nested dicts:
      * leaf:  {'type': 'leaf', 'value': label}
      * node:  {'type': 'node', 'feature': column, 'threshold': t,
                'left': subtree, 'right': subtree}
    Samples with x[feature] <= threshold go left, the rest go right.
    Labels must be non-negative integers because np.bincount is used to
    count them.
    """

    def __init__(self, max_depth=5, min_samples_split=2):
        """Store the stopping parameters; the tree is built by fit()."""
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    def fit(self, X, y):
        """Build the tree from features X (n_samples x n_features) and labels y."""
        self.tree = self._grow_tree(X, y)

    def predict(self, X):
        """Return an array with the predicted label of every row in X."""
        return np.array([self._predict_one(x, self.tree) for x in X])

    # ========== Core recursive function ==========
    def _grow_tree(self, X, y, depth=0):
        """Recursively build and return the subtree for the given samples."""
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))

        # Stopping conditions: depth limit, too few samples, or a pure node
        if (depth >= self.max_depth or
                n_samples < self.min_samples_split or
                n_classes == 1):
            return {'type': 'leaf', 'value': self._most_common_label(y)}

        # Look for the best split; None means no threshold separates the samples
        best_feat, best_thresh = self._best_split(X, y)
        if best_feat is None:
            return {'type': 'leaf', 'value': self._most_common_label(y)}

        # Recursively build both subtrees
        left_idx = X[:, best_feat] <= best_thresh
        right_idx = X[:, best_feat] > best_thresh

        left_subtree = self._grow_tree(X[left_idx], y[left_idx], depth + 1)
        right_subtree = self._grow_tree(X[right_idx], y[right_idx], depth + 1)

        return {
            'type': 'node',
            'feature': best_feat,
            'threshold': best_thresh,
            'left': left_subtree,
            'right': right_subtree
        }

    # ========== Key helper functions ==========
    def _best_split(self, X, y):
        """Return the (feature, threshold) with the highest gain, or (None, None).

        Only thresholds that leave samples on both sides are considered.
        """
        best_gain = -1
        split_feat, split_thresh = None, None

        for feat in range(X.shape[1]):
            # With a "<=" rule the largest value sends every sample left and
            # leaves the right child empty, which would later crash on
            # np.bincount([]).argmax(). Every smaller distinct value splits
            # the samples into two non-empty parts, so drop only the maximum.
            thresholds = np.unique(X[:, feat])[:-1]
            for thresh in thresholds:
                gain = self._information_gain(X, y, feat, thresh)
                if gain > best_gain:
                    best_gain = gain
                    split_feat, split_thresh = feat, thresh

        return split_feat, split_thresh

    def _information_gain(self, X, y, feat, thresh):
        """Return the Gini impurity reduction of splitting on feat <= thresh.

        A split that leaves one side empty scores 0.
        """
        parent_gini = self._gini(y)

        left_idx = X[:, feat] <= thresh
        right_idx = X[:, feat] > thresh

        if len(y[left_idx]) == 0 or len(y[right_idx]) == 0:
            return 0

        n = len(y)
        n_left, n_right = len(y[left_idx]), len(y[right_idx])

        left_gini = self._gini(y[left_idx])
        right_gini = self._gini(y[right_idx])

        # Children are weighted by their share of the parent's samples
        child_gini = (n_left / n) * left_gini + (n_right / n) * right_gini

        return parent_gini - child_gini

    def _gini(self, y):
        """Return the Gini impurity 1 - sum(p_i^2) of labels y (0 when empty)."""
        n = len(y)
        if n == 0:
            return 0
        proportions = np.bincount(y) / n
        return 1 - np.sum(proportions ** 2)

    def _most_common_label(self, y):
        """Return the most frequent label in y (the smallest one on ties)."""
        return np.bincount(y).argmax()

    def _predict_one(self, x, node):
        """Follow the tree from node down to a leaf and return its label."""
        if node['type'] == 'leaf':
            return node['value']

        if x[node['feature']] <= node['threshold']:
            return self._predict_one(x, node['left'])
        else:
            return self._predict_one(x, node['right'])

# ========== Usage example ==========
if __name__ == "__main__":
    # Build a small, linearly separable dataset
    X = np.array([[1, 2], [2, 3], [3, 3], [6, 5], [7, 8], [8, 8]])
    y = np.array([0, 0, 0, 1, 1, 1])
    
    # Train
    tree = DecisionTree(max_depth=3)
    tree.fit(X, y)
    
    # Predict
    X_test = np.array([[2.5, 3], [7.5, 7]])
    predictions = tree.predict(X_test)
    print(f"Predictions: {predictions}")  # expected output: [0, 1]

In [None]:
import numpy as np
from collections import Counter

class DecisionTree:
    """Single decision tree (simplified) with random feature sub-sampling.

    The fitted tree is either a bare label (leaf) or a tuple
    (feature, threshold, left_subtree, right_subtree); samples with
    x[feature] <= threshold go left. Labels must be non-negative integers
    because np.bincount is used to count them.
    """

    def __init__(self, max_depth=5, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    def fit(self, X, y):
        """Grow the tree from features X and labels y."""
        self.tree = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for the given samples."""
        n_samples, n_features = X.shape

        # Stopping conditions: depth limit, too few samples, or a pure node
        must_stop = (
            depth >= self.max_depth
            or n_samples < self.min_samples_split
            or len(np.unique(y)) == 1
        )
        if must_stop:
            return np.bincount(y).argmax()

        # Random subset of floor(sqrt(n_features)) features for this node
        candidates = np.random.choice(n_features, int(np.sqrt(n_features)), replace=False)

        # Search the candidate features for the split with the largest gain
        top_gain, top_feat, top_thresh = -1, None, None
        for feat in candidates:
            column = X[:, feat]
            for thresh in np.unique(column):
                left = y[column <= thresh]
                right = y[column > thresh]

                # A threshold that leaves one side empty is not a split
                if len(left) == 0 or len(right) == 0:
                    continue

                gain = self._gini_gain(y, left, right)
                if gain > top_gain:
                    top_gain, top_feat, top_thresh = gain, feat, thresh

        # No split improves purity: make this node a leaf
        if top_gain <= 0:
            return np.bincount(y).argmax()

        # Build the left subtree first, then the right one
        goes_left = X[:, top_feat] <= top_thresh
        goes_right = ~goes_left
        left_tree = self._grow_tree(X[goes_left], y[goes_left], depth + 1)
        right_tree = self._grow_tree(X[goes_right], y[goes_right], depth + 1)

        return (top_feat, top_thresh, left_tree, right_tree)

    def _gini_gain(self, parent, left, right):
        """Return the Gini impurity reduction from splitting parent into left/right."""
        def gini(arr):
            if len(arr) == 0:
                return 0
            p = np.bincount(arr) / len(arr)
            return 1 - np.sum(p ** 2)

        n = len(parent)
        n_l, n_r = len(left), len(right)
        weighted_children = n_l / n * gini(left) + n_r / n * gini(right)
        return gini(parent) - weighted_children

    def predict_one(self, x, node):
        """Walk from node down to a leaf and return its label."""
        while isinstance(node, tuple):  # anything that is not a tuple is a leaf
            feat, thresh, left, right = node
            node = left if x[feat] <= thresh else right
        return node

    def predict(self, X):
        """Return an array with the predicted label of every row in X."""
        return np.array([self.predict_one(x, self.tree) for x in X])

### Gradient Descent: Building Optimization Algorithms from Scratch
#### Implementing Stochastic Gradient Descent (SGD)

In [None]:
# Importing Necessary Library
import numpy as np

# Linear regression problem: fit y = m * x + b to six points with
# stochastic gradient descent (one randomly chosen sample per update).
X = np.array([0, 1, 2, 3, 4, 5]) 
Y = np.array([0, 1.1, 1.9, 3, 4.2, 5.2])  

# Model initialization
m = np.random.randn()  # Initialize the slope (random number)
b = np.random.randn()  # Initialize the intercept (random number)

learning_rate = 0.01  # Define the learning rate
epochs = 10000  # Number of single-sample updates (not full passes over the data)

# SGD implementation
for _ in range(epochs):
    random_index = np.random.randint(len(X))  # select a random sample
    x = X[random_index]
    y = Y[random_index]
    pred = m * x + b  # Calculate the predicted y
    # Gradients of the per-sample loss 0.5 * (pred - y)**2
    # with respect to m (slope) and b (intercept)
    grad_m = (pred - y) * x 
    grad_b = (pred - y)
    m -= learning_rate * grad_m  # Update m using the calculated gradient
    b -= learning_rate * grad_b  # Update b using the calculated gradient

import matplotlib.pyplot as plt

# Plot the data points
plt.scatter(X, Y, color = "m", marker = "o", s = 30)

# Predicted line for the model, using the fitted slope and intercept
y_pred = m * X + b

# Plotting the predicted line
plt.plot(X, y_pred, color = "g")

# Adding labels to the plot
plt.xlabel('X')
plt.ylabel('Y')

plt.show()

#### Implementing Mini-Batch Gradient Descent in Python

A distinguishing feature of MBGD is its capacity to tune the size of the mini-batches. MBGD behaves as Batch Gradient Descent if the batch size equates to the dataset size. If the batch size is 1, it acts like SGD. However, a mini-batch size between 10 and 1000 is typically selected in practice.

In [None]:
def gradient_descent(X, y, learning_rate=0.01, batch_size=16, epochs=100):
    """Fit linear-regression weights with mini-batch gradient descent.

    Minimises the mean squared error of X.dot(theta) against y. The rows are
    reshuffled every epoch and consumed in slices of `batch_size`.

    Args:
        X: Feature matrix of shape (m, n).
        y: Targets of shape (m, 1). A 1-D array of length m is accepted and
            treated as a column; left as-is it would broadcast against the
            (batch, 1) predictions into a (batch, batch) residual.
        learning_rate: Step size.
        batch_size: Rows per mini-batch; the last batch of an epoch may be
            smaller.
        epochs: Number of passes over the data.

    Returns:
        theta, the fitted weights, with shape (n, 1).
    """
    m, n = X.shape
    if np.ndim(y) == 1:
        y = np.reshape(y, (-1, 1))
    theta = np.random.randn(n, 1)  # random initialization

    for _ in range(epochs):
        shuffled_indices = np.random.permutation(m)
        X_shuffled = X[shuffled_indices]
        y_shuffled = y[shuffled_indices]

        for start in range(0, m, batch_size):
            xi = X_shuffled[start:start + batch_size]
            yi = y_shuffled[start:start + batch_size]

            # Average over the rows actually in this batch. Dividing by the
            # nominal batch_size would under-weight a short final batch.
            gradients = 2 / len(xi) * xi.T.dot(xi.dot(theta) - yi)
            theta = theta - learning_rate * gradients

    return theta

from sklearn.metrics import mean_absolute_error

# Apply function to some data
X = np.random.rand(100, 3)
# NOTE: the three X[:, i] terms below have shape (100,), so adding the
# (100, 1) noise broadcasts this y to (100, 100). It is overwritten with a
# correctly shaped (100, 1) target a few lines further down.
y = 5 * X[:, 0] - 3 * X[:, 1] + 2 * X[:, 2] + np.random.randn(100, 1)  # sample linear regression problem
# true_theta = np.array([5, -3, 2]).reshape(-1, 1)
# y = X.dot(true_theta) + np.random.randn(100, 1) * 0.5  # Added some noise

print((5 * X[:, 0]).shape)

# Reshape the noiseless signal to a column before adding the noise so that
# y ends up with shape (100, 1).
linear_combo = 5 * X[:, 0] - 3 * X[:, 1] + 2 * X[:, 2]
y = linear_combo.reshape(-1, 1) + np.random.randn(100, 1)


theta = gradient_descent(X, y)

print("X: ", X.shape, X, "y: ", y.shape, y)
print("theta: ", theta.shape, theta)

# Predict and calculate MAE
predictions = X.dot(theta)
mae = mean_absolute_error(y, predictions)
print(f"MAE: {mae}")  # e.g. MAE: 1.0887166179544072 (varies, the data is random)


# GET THE FINAL COEFFICIENTS HERE:
print("="*60)
print("FINAL COEFFICIENTS FROM GRADIENT DESCENT:")
print("="*60)

# Method 1: Direct print
print(f"\nTheta (coefficients) vector:\n{theta}")

# Method 2: Formatted print (more readable)
print(f"\nFormatted coefficients:")
for i in range(len(theta)):
    print(f"θ{i} (coefficient for X[:, {i}]): {theta[i, 0]:.6f}")

# Method 3: Compare with the coefficients used to generate y (5, -3, 2)
print(f"\nComparison with true values used to generate y:")
true_coefficients = [5, -3, 2]
for i in range(len(theta)):
    print(f"θ{i}: True={true_coefficients[i]}, Estimated={theta[i, 0]:.6f}, "
          f"Error={abs(true_coefficients[i] - theta[i, 0]):.6f}")

# Method 4: Create a prediction equation
print(f"\nFinal prediction equation:")
terms = []
for i in range(len(theta)):
    terms.append(f"{theta[i, 0]:.4f}*X{i+1}")
print(f"y_pred = {' + '.join(terms)}")

# Calculate error (same computation as the MAE printed earlier)
predictions = X.dot(theta)
mae = mean_absolute_error(y, predictions)
print(f"\nMean Absolute Error: {mae:.6f}")

#### Add Momentum to Gradient Descent

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def func(x):
    """Objective to minimise: the square of x."""
    return x ** 2

def grad_func(x):
    """Derivative of func at x, i.e. twice x."""
    return 2 * x

# Compare plain gradient descent with momentum on func, both starting at 4.0.
gamma = 0.9           # momentum coefficient: share of the previous velocity kept
learning_rate = 0.01
v = 0                 # velocity, starts at rest
epochs = 50

theta_plain = 4.0  
theta_momentum = 4.0

# Parameter value recorded at the start of every epoch
history_plain = []    
history_momentum = []    

for _ in range(epochs):
    # Plain gradient descent step
    history_plain.append(theta_plain)
    gradient = grad_func(theta_plain)
    theta_plain = theta_plain - learning_rate * gradient

    # Momentum step: the velocity blends the old velocity with the new gradient
    history_momentum.append(theta_momentum)
    gradient = grad_func(theta_momentum)
    v = gamma * v + learning_rate * gradient
    theta_momentum = theta_momentum - v

# Plot the cost func(theta) per epoch for both optimisers
plt.figure(figsize=(12, 7))
plt.plot([func(theta) for theta in history_plain], label='Gradient Descent')
plt.plot([func(theta) for theta in history_momentum], label='Momentum-based Gradient Descent')
plt.xlabel('Epoch')
plt.ylabel('Cost')
plt.legend()
plt.grid()
plt.show()

#### RMSProp in Python Code (Root Mean Square Propagation)

In [None]:
def RMSProp(learning_rate, rho, epsilon, grad, s_prev):
    """Compute one RMSProp step.

    Args:
        learning_rate: Step size.
        rho: Decay rate of the running average of squared gradients.
        epsilon: Small constant added to the denominator.
        grad: Current gradient.
        s_prev: Running average of squared gradients from the previous step.

    Returns:
        (updates, s): the amount to subtract from the parameters and the new
        running average.
    """
    grad_squared = np.power(grad, 2)
    s = rho * s_prev + (1 - rho) * grad_squared

    # Scale each coordinate by the root of its running squared gradient.
    denominator = np.sqrt(s) + epsilon
    return learning_rate * grad / denominator, s

def f(x, y):
    """Objective: sum of the squares of x and y."""
    squared_x, squared_y = x**2, y**2
    return squared_x + squared_y

def df(x, y):
    """Gradient of f at (x, y), returned as the array [2x, 2y]."""
    partials = [2*x, 2*y]
    return np.array(partials)

# Minimise f with RMSProp, starting from (5, 4).
coordinates = np.array([5.0, 4.0])
learning_rate = 0.1
rho = 0.9
epsilon = 1e-6
max_epochs = 100

# Running average of squared gradients, one entry per coordinate
s_prev = np.array([0, 0])

for epoch in range(max_epochs + 1):
    grad = df(coordinates[0], coordinates[1])
    updates, s_prev = RMSProp(learning_rate, rho, epsilon, grad, s_prev)
    coordinates -= updates  # in-place step on the float array
    if epoch % 20 == 0:
        print(f"Epoch {epoch}, current state: {coordinates}")

#### ADAM (Adaptive Moment Estimation)

In [None]:
def ADAM(beta1, beta2, epsilon, grad, m_prev, v_prev, learning_rate, t=None):
    """Compute one Adam step.

    Args:
        beta1: Decay rate of the first-moment (mean) estimate.
        beta2: Decay rate of the second-moment (uncentred variance) estimate.
        epsilon: Small constant added to the denominator.
        grad: Current gradient.
        m_prev: First-moment estimate from the previous step.
        v_prev: Second-moment estimate from the previous step.
        learning_rate: Step size.
        t: Optional 1-based step counter. When given, the moment estimates
            are bias-corrected by 1 / (1 - beta**t) before computing the
            update, which offsets their zero initialisation during the first
            steps. When omitted, the raw (biased) estimates are used.

    Returns:
        (updates, m, v): the amount to subtract from the parameters and the
        new, uncorrected moment estimates to pass to the next call.

    Raises:
        ValueError: if t is given and is smaller than 1.
    """
    if t is not None and t < 1:
        raise ValueError("t must be >= 1 when bias correction is requested")

    # Update biased first-moment estimate
    m = beta1 * m_prev + (1 - beta1) * grad

    # Update biased second raw moment estimate
    v = beta2 * v_prev + (1 - beta2) * np.power(grad, 2)

    if t is None:
        m_hat, v_hat = m, v
    else:
        m_hat = m / (1 - np.power(beta1, t))  # Correct the bias of the first moment
        v_hat = v / (1 - np.power(beta2, t))  # Correct the bias of the second moment

    # Calculate updates
    updates = learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)
    return updates, m, v

def f(x, y):
    """Objective: x squared plus y squared."""
    value = x ** 2 + y ** 2
    return value

def df(x, y):
    """Gradient of f at (x, y) as the array [2x, 2y]."""
    grad_x = 2 * x
    grad_y = 2 * y
    return np.array([grad_x, grad_y])

# Minimise f with ADAM, starting from (3, 4).
coordinates = np.array([3.0, 4.0])
learning_rate = 0.02
beta1 = 0.9
beta2 = 0.9999
epsilon = 1e-8
max_epochs = 150

# First and second moment estimates, one entry per coordinate
m_prev = np.array([0, 0])
v_prev = np.array([0, 0])

for epoch in range(max_epochs + 1):
    grad = df(coordinates[0], coordinates[1])
    # As called here, the moment estimates are used without bias correction.
    updates, m_prev, v_prev = ADAM(beta1, beta2, epsilon, grad, m_prev, v_prev, learning_rate)
    coordinates -= updates  # in-place step on the float array
    if epoch % 30 == 0:
        print(f"Epoch {epoch}, current state: {coordinates}")

### Ensemble Methods from Scratch

Bagging, or bootstrap aggregating, is a technique in ensemble learning that aims to reduce the variance of the machine learning model. The essence of bagging involves generating multiple subsets from the original dataset and then using these subsets to train separate models. Note that the subsets are chosen with replacement, so it is possible to have duplicate data points in a single subset. The final prediction is then made by aggregating the predictions from these individual models. Essentially, it is a voting for the best answer: the final class prediction is the class that was predicted by the majority of votes.



In [None]:
import numpy as np
from scipy import stats
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

# Load the data
iris = datasets.load_iris()
X = iris.data
y = iris.target

# Split the data into train and test sets (20% held out, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)

# Parameters
n_models = 100  # number of bagged trees
# One distinct seed per tree, passed as its random_state below
random_states = [i for i in range(n_models)]

# Helper function for bootstrapping
def bootstrapping(X, y):
    """Return a bootstrap resample of (X, y).

    Draws as many row indices as X has rows, with replacement, from NumPy's
    global random generator and applies them to both arrays.
    """
    size = X.shape[0]
    picked = np.random.choice(size, size, replace=True)
    return X[picked], y[picked]

# Helper function for bagging prediction
def predict(X, models):
    """Return the majority-vote prediction of the fitted models for X.

    Args:
        X: Samples to classify, passed unchanged to every model.predict.
        models: Fitted estimators exposing predict(X).

    Returns:
        Array with the most frequent prediction per sample, shaped like a
        single model's prediction. On ties scipy.stats.mode picks the
        smallest value.
    """
    predictions = np.array([model.predict(X) for model in models])
    majority = stats.mode(predictions, axis=0)[0]
    # Older SciPy releases keep the reduced axis (shape (1, n)) while newer
    # ones drop it (shape (n,)); normalise to the per-model shape.
    return np.asarray(majority).reshape(predictions.shape[1:])

# Create a list to store all the tree models
tree_models = []

# Iteratively train decision trees on bootstrapped samples
for i in range(n_models):
    X_, y_ = bootstrapping(X_train, y_train)
    # Shallow trees (depth 2), each with its own seed
    tree = DecisionTreeClassifier(max_depth=2, random_state=random_states[i])
    tree.fit(X_, y_)
    tree_models.append(tree)

# Predict on the test set by majority vote over all trees
y_pred = predict(X_test, tree_models)

# Print the accuracy
print("Accuracy: ", accuracy_score(y_test, y_pred))

#### Implementing the Random Forest in Python

In [None]:
import numpy as np
from scipy import stats
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

class RandomForest:
    """Bagged ensemble of sklearn decision trees with majority voting."""

    def __init__(self, n_trees=100, max_depth=None, random_state=None):
        """Configure the forest.

        Args:
            n_trees: Number of trees to train.
            max_depth: Depth limit passed to every DecisionTreeClassifier.
            random_state: Seed. It determines both the per-tree seeds in
                `random_states` and the bootstrap samples, so a seeded forest
                is reproducible end to end.
        """
        self.n_trees = n_trees
        self.max_depth = max_depth
        # A private generator keeps the forest independent of NumPy's global
        # RNG. Its first draw produces the per-tree seeds.
        self._rng = np.random.RandomState(random_state)
        self.random_states = self._rng.randint(0, 10000, size=n_trees)
        self.trees = []

    def bootstrapping(self, X, y):
        """Return a bootstrap resample of (X, y): n rows drawn with replacement."""
        n_samples = X.shape[0]
        idxs = self._rng.choice(n_samples, n_samples, replace=True)
        return X[idxs], y[idxs]

    def fit(self, X, y):
        """Train n_trees trees, each on its own bootstrap resample.

        Trees from an earlier fit are discarded, so refitting replaces the
        forest instead of growing it.
        """
        self.trees = []
        for i in range(self.n_trees):
            X_, y_ = self.bootstrapping(X, y)
            tree = DecisionTreeClassifier(max_depth=self.max_depth, random_state=self.random_states[i])
            tree.fit(X_, y_)
            self.trees.append(tree)

    def predict(self, X):
        """Return the majority vote of all trees for every row of X."""
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        majority = stats.mode(tree_preds, axis=0)[0]
        # Older SciPy keeps the reduced axis (1, n), newer SciPy drops it;
        # normalise to the shape of a single tree's prediction.
        return np.asarray(majority).reshape(tree_preds.shape[1:])

from sklearn import datasets
from sklearn.model_selection import train_test_split

# Evaluate the forest on the iris dataset with a fixed 80/20 split.
iris = datasets.load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)

# 100 depth-2 trees, combined by majority vote
rf = RandomForest(n_trees=100, max_depth=2, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)

print("Accuracy: ", accuracy_score(y_test, y_pred))

RandomForest

In [None]:
class RandomForest:
    """Random forest built from the hand-written DecisionTree class."""

    def __init__(self, n_estimators=100, max_depth=10, min_samples_split=2):
        """Store the ensemble size and the parameters passed to every tree."""
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.trees = []

    def fit(self, X, y):
        """Train n_estimators trees, each on a bootstrap resample of (X, y).

        Trees from an earlier fit are discarded, so refitting replaces the
        forest instead of growing it. Progress is printed every 20 trees.
        """
        self.trees = []
        n_samples = X.shape[0]

        for i in range(self.n_estimators):
            # 1. Bootstrap sample (random draw with replacement)
            indices = np.random.choice(n_samples, n_samples, replace=True)
            X_boot = X[indices]
            y_boot = y[indices]

            # 2. Train a decision tree on the resample
            tree = DecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split
            )
            tree.fit(X_boot, y_boot)
            self.trees.append(tree)

            # Optional: report progress
            if (i+1) % 20 == 0:
                print(f"Trained tree {i+1}/{self.n_estimators}")

    def predict(self, X):
        """Return the majority vote of all trees for every row of X."""
        # Row t holds the predictions of tree t for every sample
        tree_preds = np.array([tree.predict(X) for tree in self.trees])

        # Majority vote per sample (column); Counter breaks ties in favour
        # of the label seen first among the trees.
        final_predictions = [
            Counter(tree_preds[:, i]).most_common(1)[0][0]
            for i in range(X.shape[0])
        ]

        return np.array(final_predictions)

First, let's define our terms. Boosting is a technique in which several weak learners are combined to create a strong learner, thereby improving our predictive model. AdaBoost largely follows the same principle. However, it introduces an important twist: it adapts by focusing more on instances that were incorrectly predicted in previous iterations by assigning them higher weights.

In [None]:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

class AdaBoost:
    """AdaBoost for binary classification with depth-1 decision trees (stumps)."""

    def __init__(self, num_learners=10, learning_rate=1):
        """Store the number of boosting rounds and the factor applied to each learner weight."""
        self.num_learners = num_learners
        self.learning_rate = learning_rate
        self.models = []
        self.model_weights = []

    def fit(self, X, y):
        """Train up to num_learners stumps on re-weighted samples.

        Args:
            X: Feature matrix of shape (n_samples, n_features).
            y: Labels in {0, 1}; they are mapped to {-1, 1} internally.

        Learners from an earlier fit are discarded. Boosting stops early as
        soon as a stump's weighted error exceeds 0.5.
        """
        n_samples, _ = X.shape
        W = np.ones(n_samples) / n_samples  # Initialize weights uniformly
        y = y * 2 - 1  # Convert y to {-1, 1}

        self.models = []
        self.model_weights = []

        for _ in range(self.num_learners):
            tree = DecisionTreeClassifier(max_depth=1)
            tree.fit(X, y, sample_weight=W)

            pred = tree.predict(X)
            missed = pred != y
            error = W.dot(missed)  # weighted share of misclassified samples
            if error > 0.5:
                break

            # A perfect stump has error 0, which would make the log below
            # divide by zero; clip so its weight is large but finite.
            error = max(error, 1e-10)

            beta = self.learning_rate * np.log((1 - error) / error)  # Learner weight
            W = W * np.exp(beta * missed)  # Up-weight the misclassified samples
            W = W / W.sum()  # Normalize weights

            self.models.append(tree)
            self.model_weights.append(beta)

    def predict(self, X):
        """Return a boolean array: True where the weighted vote is positive.

        True corresponds to original label 1 and False to label 0. With no
        trained learners every entry is False.
        """
        # Weighted aggregate of the {-1, 1} predictions of all learners
        Hx = np.zeros(X.shape[0])
        for h, beta in zip(self.models, self.model_weights):
            Hx = Hx + beta * h.predict(X)
        return Hx > 0

data = make_classification(n_samples=1000)  # Creates a synthetic dataset
X = data[0]  # features
y = data[1]  # labels

# Split data into training and testing datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

ada = AdaBoost(num_learners=10, learning_rate=0.5)  # Initialize AdaBoost model
ada.fit(X_train, y_train)  # Train the model
pred = ada.predict(X_test)  # boolean array (weighted vote > 0)
print('AdaBoost accuracy:', accuracy_score(y_test, pred))  # Accuracy as correct predictions over total predictions

### Unsupervised Learning and Clustering
#### k-Means Clustering

In [None]:
import random

# Toy dataset with 2D points
data = [(2,3), (5,3.4), (1.3,1), (3,4), (2,3.5), (7,5)]

# k-Means settings
k = 2  
# Initial centers: k distinct data points chosen at random (unseeded, so
# every run can start from different centers)
centers = random.sample(data, k)  

print(centers)

# Definition of Euclidean distance
def distance(point1, point2):
    """Euclidean distance between two points, using their first two coordinates."""
    dx = point1[0] - point2[0]
    dy = point1[1] - point2[1]
    return (dx**2 + dy**2)**0.5

# k-Means algorithm
def k_means(data, centers, k):
    """Cluster 2-D points with Lloyd's k-means algorithm.

    Args:
        data: Iterable of 2-D points.
        centers: Initial centers, one per cluster.
        k: Number of clusters; must equal len(centers).

    Returns:
        (clusters, centers): the points grouped per cluster, and the centers
        those points were assigned to in the final iteration. Iteration stops
        once no center moves by 0.0001 or more.
    """
    while True:
        clusters = [[] for _ in range(k)]

        # Assign data points to the closest center (first one on ties)
        for point in data:
            distances = [distance(point, center) for center in centers]
            index = distances.index(min(distances))
            clusters[index].append(point)

        # Update centers to be the mean of points in a cluster
        new_centers = []
        for cluster, old_center in zip(clusters, centers):
            if not cluster:
                # A cluster that attracted no points has no mean; keep its
                # previous center instead of dividing by zero.
                new_centers.append(old_center)
                continue
            size = len(cluster)
            center = (sum(point[0] for point in cluster) / size,
                      sum(point[1] for point in cluster) / size)
            new_centers.append(center)

        # Break loop if centers don't change significantly
        if max([distance(new, old) for new, old in zip(new_centers, centers)]) < 0.0001:
            break
        else:
            centers = new_centers
    return clusters, centers

clusters, centers = k_means(data, centers, k)

# Let's print the cluster centers
# (the sample values shown below come from one run; the initial centers are
# random, so other runs may report different clusters)
for i, center in enumerate(centers):
    print(f"Cluster{i+1} center is : {center}")
# Cluster1 center is : (2.66, 2.98)
# Cluster2 center is : (7.0, 5.0)

# Let's print the clusters
for i, cluster in enumerate(clusters):
    print(f"Cluster{i+1} points are : {cluster}")
# Cluster1 points are : [(2, 3), (5, 3.4), (1.3, 1), (3, 4), (2, 3.5)]
# Cluster2 points are : [(7, 5)]

import matplotlib.pyplot as plt

# One colour per cluster index
colors = ['r', 'g', 'b', 'y', 'c', 'm']
fig, ax = plt.subplots()

# Plot points, coloured by the cluster they belong to
for i, cluster in enumerate(clusters):
    for point in cluster:
        ax.scatter(*point, color=colors[i])

# Plot centers as large black crosses
for i, center in enumerate(centers):
    ax.scatter(*center, color='black', marker='x', s=300)

ax.set_title('Clusters and their centers')
plt.show()

K-Means

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def initialize_centroids(X, k):
    """Pick k distinct rows of X as the initial centroids.

    The choice is deterministic: rows are taken from a permutation generated
    with a fixed seed of 42. A private generator is used so that calling this
    function does not reseed NumPy's global random state as a side effect;
    the selected rows are the same as with a global reseed.

    Args:
        X: Data matrix of shape (n_samples, n_features).
        k: Number of centroids.

    Returns:
        Array of shape (k, n_features) holding the chosen rows.
    """
    rng = np.random.RandomState(42)
    random_indices = rng.permutation(X.shape[0])
    centroids = X[random_indices[:k]]
    return centroids

def compute_distances(X, centroids):
    """Return the Euclidean distance from every data point to every centroid.

    The result has one row per point in X and one column per centroid.
    """
    n_points, n_centroids = X.shape[0], centroids.shape[0]
    table = np.zeros((n_points, n_centroids))
    for column in range(n_centroids):
        offsets = X - centroids[column]
        table[:, column] = np.linalg.norm(offsets, axis=1)
    return table

def assign_clusters(distances):
    """Return, for every row of distances, the index of its smallest entry.

    With rows as data points and columns as centroids this is the index of
    each point's nearest centroid (the first one on ties).
    """
    nearest = np.argmin(distances, axis=1)
    return nearest

def compute_centroids(X, labels, k):
    """Return the mean of the points assigned to each of the k clusters.

    Row i of the (k, n_features) result is the mean of the rows of X whose
    label equals i.
    """
    n_features = X.shape[1]
    means = np.zeros((k, n_features))
    for cluster in range(k):
        members = X[labels == cluster]
        means[cluster, :] = members.mean(axis=0)
    return means

def kmeans(X, k, max_iters=100):
    """K-means clustering algorithm.

    Args:
        X: Data matrix of shape (n_samples, n_features).
        k: Number of clusters.
        max_iters: Maximum number of assignment/update rounds.

    Returns:
        (centroids, labels): the final centroids and, for every row of X,
        the index of its nearest returned centroid.
    """
    centroids = initialize_centroids(X, k)

    for _ in range(max_iters):
        old_centroids = centroids
        labels = assign_clusters(compute_distances(X, centroids))
        centroids = compute_centroids(X, labels, k)

        # If centroids do not change, we have converged
        if np.all(centroids == old_centroids):
            break

    # Assign against the centroids that are actually returned. After
    # convergence this reproduces the labels from the last round; it also
    # covers max_iters == 0 and runs that stop before converging, where the
    # in-loop labels would be missing or refer to the previous centroids.
    labels = assign_clusters(compute_distances(X, centroids))

    return centroids, labels

# Example usage
if __name__ == "__main__":
    # Generate some synthetic data: 300 points around 3 blob centers
    from sklearn.datasets import make_blobs
    X, _ = make_blobs(n_samples=300, centers=3, cluster_std=0.60, random_state=0)
    print(X.shape, X[:3])

    # Run the K-means algorithm
    k = 3
    centroids, labels = kmeans(X, k)

    # Visualize the results: points coloured by cluster label
    plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis')
    plt.scatter(centroids[:, 0], centroids[:, 1], s=300, c='red')  # Centroids
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.title('K-means Clustering')
    plt.show()

#### Mini-Batch K-Means Algorithm

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Seed NumPy's global RNG so the synthetic data and later random draws are reproducible.
np.random.seed(0)
# Two Gaussian blobs of 100 two-dimensional points each (unit scale),
# one centered at (3, 3) and one at (-3, -3), stacked into a (200, 2) array.
data = np.vstack([np.random.normal(loc=3, scale=1, size=(100,2)), np.random.normal(loc=-3, scale=1, size=(100,2))])

def euclidean_distance(a, b):
    """Return the Euclidean (L2) distance between ``a`` and ``b``.

    The norm is taken over the last axis, so broadcast inputs yield one
    distance per leading index.
    """
    difference = a - b
    return np.linalg.norm(difference, axis=-1)

def initialize_centers(data, k):
    """Pick ``k`` rows of ``data`` at random to serve as initial cluster centers.

    Rows are drawn *without* replacement, so no row is selected twice:
    picking the same row twice would start two clusters on the same spot
    and one of them could never attract any points.  Sampling with
    replacement is kept only as a fallback for ``k > len(data)``, where
    distinct rows cannot be provided.

    Args:
        data: 2-D array with one sample per row.
        k: Number of centers to draw.

    Returns:
        Array of shape ``(k, data.shape[1])``; a copy, not a view of ``data``.
    """
    n_points = len(data)
    idx = np.random.choice(n_points, size=k, replace=k > n_points)
    return data[idx, :]

# Implement mini-batch K-Means
def mini_batch_kMeans(data, k, iterations=10, batch_size=20):
    """Estimate ``k`` cluster centers with mini-batch K-Means.

    Every iteration draws a random batch, assigns each batch point to its
    nearest center and moves that center toward the points it received.
    Each center keeps a count of all points ever assigned to it and is
    updated as the running mean of those points.  Overwriting the center
    with the current batch mean would throw away everything learned from
    earlier batches and leave the result dependent on the last batch only.

    Args:
        data: 2-D array with one sample per row.
        k: Number of clusters.
        iterations: Number of mini-batches to process.
        batch_size: Number of points drawn (with replacement) per batch.

    Returns:
        Float array of shape ``(k, data.shape[1])`` with the final centers.
    """
    # Float copy: with integer data the averaged centers would otherwise be
    # truncated when written back into an integer array.
    centers = np.array(initialize_centers(data, k), dtype=float)
    # Number of points each center has absorbed so far.
    counts = np.zeros(k)

    for _ in range(iterations):
        idx = np.random.choice(len(data), size=batch_size)
        batch = data[idx, :]
        # (batch_size, k) table of point-to-center distances via broadcasting.
        dists = euclidean_distance(batch[:, None, :], centers[None, :, :])
        labels = np.argmin(dists, axis=1)
        for i in range(k):
            members = batch[labels == i]
            m = len(members)
            if m > 0:
                counts[i] += m
                # Incremental mean over all points seen by this center:
                # new = old + (sum(members) - m * old) / total_count
                centers[i] += (members.sum(axis=0) - m * centers[i]) / counts[i]
    return centers

# Estimate two cluster centers on the synthetic data with the default
# number of iterations and batch size.
centers = mini_batch_kMeans(data, k=2)

# Plot the raw points, then overlay the estimated centers as large,
# semi-transparent red markers.
plt.scatter(data[:, 0], data[:, 1], s=50)
plt.scatter(centers[:, 0], centers[:, 1], c='red', s=200, alpha=0.5)
plt.show()

#### Principal Component Analysis (PCA)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

np.random.seed(0)
# Create a 200-point 3D dataset: random normal points mixed by a random
# 3x3 matrix, so the three coordinates are correlated.
X = np.dot(np.random.random(size=(3, 3)), np.random.normal(size=(3, 200))).T
# Plot the original dataset
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.scatter(X[:,0], X[:,1], X[:,2])
plt.title("Scatter Plot of Original Dataset")
plt.show()

# Standardize every feature to zero mean and unit standard deviation
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X = (X - X_mean) / X_std

# Covariance matrix of the features (np.cov expects variables in rows)
cov_matrix = np.cov(X.T)

# Eigen-decomposition. The covariance matrix is symmetric, so use eigh:
# it guarantees real eigenvalues and orthonormal eigenvectors, whereas the
# general-purpose eig may return complex values because of round-off.
# (Eigenvectors are only defined up to sign, so a projected axis may come
# out mirrored compared with another solver; the geometry is unchanged.)
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

# Order the (eigenvalue, eigenvector) pairs by decreasing eigenvalue.
# Sorting indices rather than the tuples themselves avoids comparing the
# eigenvector arrays, which raises when two eigenvalues are equal.
order = np.argsort(eigenvalues)[::-1]
eigen_pairs = [(eigenvalues[i], eigenvectors[:, i]) for i in order]

# Projection matrix: the two leading principal directions as columns
# (works for any number of input features, not only 3)
W = np.column_stack([vector for _, vector in eigen_pairs[:2]])
# Project the standardized data onto the principal directions
X_pca = X.dot(W)

plt.figure()
plt.scatter(X_pca[:, 0],X_pca[:, 1])
plt.title("Scatter Plot of Transformed Dataset Using PCA")
plt.show()

#### DBSCAN (Density-Based Spatial Clustering of Applications with Noise) algorithm

In [None]:
# Small hand-made 2-D dataset (11 points, one per row) used to demonstrate DBSCAN.
data_points = np.array([
    [1.2, 1.9], [2.1, 2], [2, 3.5], [3.3, 3.9], [3.2, 5.1],
    [8.5, 7.9], [8.1, 7.8], [9.5, 6.5], [9.5, 7.2], [7.7, 8.6],
    [6.0, 6.0]
])

def euclidean_distance(a, b):
    """Return the straight-line (L2) distance between ``a`` and ``b``, taken over the last axis."""
    delta = a - b
    distance = np.linalg.norm(delta, axis=-1)
    return distance

def dbscan(data, Eps, MinPt):
    """Cluster ``data`` with DBSCAN and return one cluster id per point.

    A point is a *core* point when at least ``MinPt`` other points (the
    point itself is not counted) lie within distance ``Eps`` of it.  Each
    unlabeled core point starts a new cluster, which then grows breadth
    first: every unlabeled neighbor joins the cluster, and neighbors that
    are core points themselves have their own neighbors visited in turn.

    Args:
        data: 2-D array-like with one point per row.
        Eps: Neighborhood radius (inclusive).
        MinPt: Minimum number of neighbors, excluding the point itself,
            required for a core point.

    Returns:
        list[int]: Cluster id of every point.  Ids start at 1; points not
        reachable from any core point keep the label 0.
    """
    from collections import deque

    data = np.asarray(data, dtype=float)
    n_points = len(data)
    point_label = [0] * n_points

    # point_count[i] lists the indices of all points within Eps of point i,
    # excluding i itself.
    point_count = []
    for i in range(n_points):
        within = np.linalg.norm(data - data[i], axis=-1) <= Eps
        within[i] = False
        point_count.append(np.flatnonzero(within).tolist())

    # Core points in index order; the set gives O(1) membership tests
    # during cluster expansion.
    core = [i for i in range(n_points) if len(point_count[i]) >= MinPt]
    is_core = set(core)

    ID = 1
    for point in core:
        # Skip core points already absorbed by an earlier cluster.
        if point_label[point] != 0:
            continue
        point_label[point] = ID
        queue = deque([point])
        while queue:
            for y in point_count[queue.popleft()]:
                if point_label[y] == 0:
                    point_label[y] = ID
                    # Only core points propagate the cluster further;
                    # border points are labeled but not expanded.
                    if y in is_core:
                        queue.append(y)
        ID += 1

    return point_label

# Cluster the demo points with radius Eps=2 and MinPt=2.
labels = dbscan(data_points, 2, 2)

# Draw each point colored by its label: cluster 1 in red, cluster 2 in green,
# and every other label in blue (this includes 0, the value dbscan leaves on
# points it never assigns to a cluster).
for i in range(len(labels)):
    if labels[i] == 1:
        plt.scatter(data_points[i][0], data_points[i][1], s=100, c='r')
    elif labels[i] == 2:
        plt.scatter(data_points[i][0], data_points[i][1], s=100, c='g')
    else:
        plt.scatter(data_points[i][0], data_points[i][1], s=100, c='b')

plt.show()

This code iterates over the core points and examines the neighbors of each one. If a neighbor has not yet been assigned to a cluster, it is assigned to the current core point's cluster. Neighbors that are themselves core points are placed in a queue so that the same process is repeated for them. Once every point reachable in this way has been labeled, the algorithm moves on to the next cluster. The final output lists the cluster ID of every point (0 for points that were never assigned to a cluster).

#### Neural Network

In [55]:
def sigmoid(x):
    """Logistic function ``1 / (1 + e^-x)``, applied element-wise."""
    denominator = 1 + np.exp(-x)
    return 1.0 / denominator

def sigmoid_derivative(x):
    """Derivative of the sigmoid, expressed through the sigmoid's *output*.

    ``x`` is expected to already be ``sigmoid(z)``; the derivative with
    respect to ``z`` is then ``x * (1 - x)``.
    """
    complement = 1.0 - x
    return x * complement

class NeuralNetwork:
    """Small fully-connected network: inputs -> 4 sigmoid units -> 1 sigmoid output.

    The training data is stored on the instance; ``train`` repeatedly runs a
    full-batch forward pass followed by one gradient step on the squared
    error.  There are no separate bias terms.
    """

    def __init__(self, x, y, learning_rate=0.1):
        self.input = x
        self.y = y
        self.learning_rate = learning_rate
        # Weights start as uniform random values in [0, 1).
        n_features = self.input.shape[1]
        self.weights1 = np.random.rand(n_features, 4)
        self.weights2 = np.random.rand(4, 1)
        self.output = np.zeros(self.y.shape)

    def feedforward(self):
        # Forward pass on the stored training inputs; the hidden activations
        # and the output are cached for backprop().
        hidden_pre = np.dot(self.input, self.weights1)
        self.layer1 = sigmoid(hidden_pre)
        output_pre = np.dot(self.layer1, self.weights2)
        self.output = sigmoid(output_pre)

    def backprop(self):
        # Error signal at the output layer: derivative of the squared error
        # times the sigmoid slope.
        output_delta = 2 * (self.y - self.output) * sigmoid_derivative(self.output)
        # Propagate the signal back through the second weight matrix.
        hidden_delta = np.dot(output_delta, self.weights2.T) * sigmoid_derivative(self.layer1)

        d_weights2 = np.dot(self.layer1.T, output_delta)
        d_weights1 = np.dot(self.input.T, hidden_delta)

        # The deltas use (y - output), so adding them moves the weights
        # in the direction that reduces the error.
        self.weights1 += self.learning_rate * d_weights1
        self.weights2 += self.learning_rate * d_weights2

    def train(self, epochs):
        # One forward pass and one weight update per epoch.
        for _ in range(epochs):
            self.feedforward()
            self.backprop()

    def predict(self, new_input):
        # Same forward pass as feedforward(), on arbitrary inputs and
        # without touching the cached training activations.
        hidden = sigmoid(np.dot(new_input, self.weights1))
        return sigmoid(np.dot(hidden, self.weights2))

# Training inputs: four samples with three features each; the third column
# is 1 for every sample.
# NOTE(review): the constant column presumably stands in for a bias term,
# since the network has no explicit biases — confirm intent.
X = np.array([[0, 0, 1],
              [0, 1, 1],
              [1, 0, 1],
              [1, 1, 1]])
# Targets: the XOR of the first two input columns.
Y = np.array([[0], [1], [1], [0]])
nn = NeuralNetwork(X, Y)

# Train for 10000 full-batch epochs, then compare the prediction for each
# training sample with its expected target.
nn.train(10000)
print("\nPredictions:")
for i, x in enumerate(X):
    print(f"Input: {x} ---> Prediction: {nn.predict(np.array([x]))}, Expected: {Y[i]}")


Predictions:
Input: [0 0 1] ---> Prediction: [[0.01466264]], Expected: [0]
Input: [0 1 1] ---> Prediction: [[0.9647285]], Expected: [1]
Input: [1 0 1] ---> Prediction: [[0.9648335]], Expected: [1]
Input: [1 1 1] ---> Prediction: [[0.04059226]], Expected: [0]


https://www.deep-ml.com/problems

#### Implement Decision Tree for Regression

In [None]:
import numpy as np

def decision_tree_regressor(X_train, y_train, X_test, max_depth=2, min_samples_split=2):
    """
    Fit a regression tree on the training data and predict the test rows.

    Splits are binary tests ``feature <= threshold``; candidate thresholds
    are the midpoints between consecutive distinct feature values, and the
    split with the largest reduction in mean squared error is chosen.  Leaves
    predict the mean target of their training samples.

    Args:
        X_train: Training features, shape (n_samples, n_features)
        y_train: Training targets, shape (n_samples,)
        X_test: Test features, shape (m_samples, n_features)
        max_depth: Maximum depth of the tree
        min_samples_split: Minimum samples required to split a node

    Returns:
        List of predictions for X_test, rounded to 4 decimal places
    """
    features = np.array(X_train, dtype=float)
    targets = np.array(y_train, dtype=float)
    queries = np.array(X_test, dtype=float)

    def node_mse(values):
        # Mean squared deviation from the mean; an empty node counts as pure.
        if len(values) == 0:
            return 0.0
        return np.mean((values - np.mean(values))**2)

    def make_leaf(values):
        return {'leaf': True, 'value': float(np.mean(values))}

    def choose_split(X, y):
        # Returns (feature index, threshold) of the best split, or None when
        # the node is too small or no split strictly reduces the MSE.
        n_samples, n_features = X.shape
        if n_samples < min_samples_split:
            return None

        parent_mse = node_mse(y)
        winner = None
        winner_gain = 0

        for col in range(n_features):
            column = X[:, col]
            levels = np.unique(column)
            # Midpoints between neighboring distinct values, in ascending order.
            for low, high in zip(levels[:-1], levels[1:]):
                cut = (low + high) / 2

                goes_left = column <= cut
                goes_right = ~goes_left
                n_left = np.sum(goes_left)
                n_right = np.sum(goes_right)
                if n_left == 0 or n_right == 0:
                    continue

                children_mse = (
                    n_left * node_mse(y[goes_left]) + n_right * node_mse(y[goes_right])
                ) / n_samples
                gain = parent_mse - children_mse

                # Strict comparison: on ties the earliest candidate wins.
                if gain > winner_gain:
                    winner_gain = gain
                    winner = (col, cut)

        return winner

    def grow(X, y, depth):
        stop = depth >= max_depth or len(y) < min_samples_split or len(np.unique(y)) == 1
        if stop:
            return make_leaf(y)

        chosen = choose_split(X, y)
        if chosen is None:
            return make_leaf(y)

        col, cut = chosen
        goes_left = X[:, col] <= cut
        goes_right = ~goes_left

        return {
            'leaf': False,
            'feature': col,
            'threshold': cut,
            'left': grow(X[goes_left], y[goes_left], depth + 1),
            'right': grow(X[goes_right], y[goes_right], depth + 1)
        }

    def descend(node, x):
        # Walk from the root to a leaf following the split tests.
        while not node['leaf']:
            branch = 'left' if x[node['feature']] <= node['threshold'] else 'right'
            node = node[branch]
        return node['value']

    root = grow(features, targets, 0)
    return [round(descend(root, row), 4) for row in queries]

#### Train Logistic Regression with Gradient Descent

In [None]:
import numpy as np

def train_logreg(X: np.ndarray, y: np.ndarray, learning_rate: float, iterations: int) -> tuple[list[float], list[float]]:
    """
    Train logistic regression using gradient descent with sum-based BCE loss.

    Args:
        X: Feature matrix of shape (n_samples, n_features)
        y: Binary labels of shape (n_samples,)
        learning_rate: Step size for gradient descent
        iterations: Number of training iterations

    Returns:
        Tuple of (coefficients, losses) where:
        - coefficients: List of learned weights (bias first, then feature weights)
        - losses: List of sum-based BCE loss values at each iteration,
          evaluated on the predictions made before that iteration's update.
          Probabilities are clipped away from exactly 0 and 1 for the loss
          only, so the values stay finite even when predictions saturate.
    """
    # Smallest distance from 0 or 1 allowed inside the logarithms.
    eps = 1e-15

    def sigmoid(x):
        # For very negative x, exp(-x) overflows to inf; 1 / (1 + inf) is
        # still the correct limit 0.0, so only the warning is silenced.
        with np.errstate(over="ignore"):
            return 1 / (1 + np.exp(-x))

    # Reshape y to column vector
    y = y.reshape(-1, 1)

    # Add bias column (ones) as FIRST column
    X = np.hstack((np.ones((X.shape[0], 1)), X))

    # Initialize coefficients to ZEROS
    B = np.zeros((X.shape[1], 1))

    losses = []

    for _ in range(iterations):
        # Forward pass: compute predictions
        y_pred = sigmoid(X @ B)

        # Gradient descent update (uses the unclipped predictions)
        gradient = X.T @ (y_pred - y)
        B -= learning_rate * gradient

        # Compute SUM-based BCE loss (not mean). Without clipping, a
        # prediction of exactly 0 or 1 gives 0 * log(0) = nan.
        p = np.clip(y_pred, eps, 1 - eps)
        loss = -np.sum(y * np.log(p) + (1 - y) * np.log(1 - p))
        losses.append(float(loss))

    coefficients = [float(b) for b in B.flatten()]
    return coefficients, losses

In [None]:
我现在有interview的一些资料，根据下面资料生成interview时的面试题和对应答案，

第二个OA: 70分钟，6道选择题+1道填空题＋3道code。
选择题基本上都是ML相关题，比如要你算recall，LDA和PCA区别，overfitting处理啥的。填空题是给你一个NN结构和input，你算最后output。最搞心态的是，前面都是linear function，最后输出层它搞一个sigmoid function，然后算出来x还是个小数。你告诉我这没计算器怎么算？最后结果要求小数点后三位。


第一道code：给一个数组和一个区间n，算有哪些local max value。比如[1,3,10,4,2,19,5,5]和区间n=2，10是一个local max value，因为[1,3,10,4,2]里10最大，然后10前面的数字严格单调递增，后面的数字严格单调递减（大于等于，小于等于都不行）。所以19不是。
第二道code：手写bootstrap算法，补全code块（不准对已有code任何改动）。给你多个sklearn的classifiers，x和y。输出经过majority voting后的predicted y。答案大概长这样：
from random import randint, seed
from sklearn.tree import DecisionTreeClassifier
import numpy as np

def bootstrap(n: int) -> list[int]:
    """
    Step 1: Draw a bootstrap sample for one base classifier.

    Returns ``n`` indices, each chosen uniformly from ``0 .. n-1`` with
    replacement, so an index may appear several times or not at all.
    """
    last_index = n - 1
    return [randint(0, last_index) for _ in range(n)]

def fit(classifiers: list[DecisionTreeClassifier], x: list[list[float]], y: list[int]):
    """
    Step 2: Fit every classifier on its own bootstrap resample of (x, y).

    A fresh set of indices is drawn per classifier, so each one is trained
    on a different resample of the same size as the training set.
    """
    total = len(x)
    for model in classifiers:
        picks = bootstrap(total)
        sampled_x = []
        sampled_y = []
        # Gather features and labels through the same indices so that the
        # pairs stay aligned.
        for i in picks:
            sampled_x.append(x[i])
            sampled_y.append(y[i])
        model.fit(sampled_x, sampled_y)

def predict(classifiers: list[DecisionTreeClassifier], x: list[list[float]]) -> list[int]:
    """
    Step 3: Assign class labels by a majority vote of the base classifiers.

    Each classifier predicts every sample; for each sample the label
    predicted most often wins, and ties go to the smallest label.  Votes are
    counted with ``np.unique`` rather than ``np.bincount`` so that negative
    class labels (for example -1 / +1) are handled instead of raising.

    Args:
        classifiers: Fitted classifiers exposing ``predict``.
        x: Samples to classify, one feature list per sample.

    Returns:
        One predicted label per sample.
    """
    # Rows: classifiers, columns: samples.
    predictions = np.array([clf.predict(x) for clf in classifiers])

    def majority(votes):
        # np.unique returns the labels sorted ascending, so argmax (first
        # maximum) resolves ties toward the smallest label.
        values, counts = np.unique(votes, return_counts=True)
        return values[np.argmax(counts)]

    final_predictions = np.apply_along_axis(majority, axis=0, arr=predictions)
    return final_predictions.tolist()

def solution(x_train: list[list[float]], y_train: list[int], x_test: list[list[float]], n_estimators: int) -> list[int]:
    """
    Step 4: Train a bagged ensemble of decision trees and predict the test set.

    The random module is seeded first so the bootstrap samples, and with
    them the predictions, are reproducible.
    """
    seed(42)
    ensemble = []
    for _ in range(n_estimators):
        ensemble.append(DecisionTreeClassifier(random_state=0))
    fit(ensemble, x_train, y_train)
    voted = predict(ensemble, x_test)
    return voted
复制代码
第三道code：手动实现Naive Bayes算法。到这我已经没时间了。


OA2


5 道选择
问Random Forest和XGBoost的区别
怎么increase bias and reduce variance
LDA和PCA分别的适用场景
validation loss is significantly higher than training loss，问有可能是什么原因
给了4个confusion matrices, 选出所有recall >= 90%同时FPR < 10%的



1道填空
一个有1个hidden layer的MLP，hidden activation是linear，output activation是sigmoid，给了input和所有weights，算output



3道coding
find the longest contiguous substring consisting of the same character：找到连续出现最多次的character，返回这个character和连续出现的次数
Bootstrap
Decision Tree

70分钟10题， CodeSignal Online Assessment。时间很紧，楼主也不记得原题了，请大家谅解。几个tips
1- coding基本写出来就不错了。时间太紧了，不要worry about optimality，除非test 没过再回来改。ML coding 的description非常的长，可能看起来会很花时间，所以要复习一下tree algorithm自己写一写，再去。可以先做coding，再回来做选择题。
2 - multiple choices 考ML fundamental 考的非常非常细。如果不是很confident就不要花太多时间纠结了。


7个Multiple Choices，抱歉不是全记得了，就写几个记得 ，TLDR 硬币基地很喜欢考各种tree algorithm，复习好了tree再去做这个tech screen

1 - naive bayes 和 knn的优劣（比如说curse of dimensionality, multicollinearity 方面）
2- implement forward propagation for 3 layers of simple feedforward with linear/no activation function in first 2 layers and sigmoid in last layers. 这题有点变态，因为没说能让用calculator，楼主很诚实， 所以楼主纠结了一下sigmoid 估算。
3- ensemble algorithm的优劣（比如说是不是training intensive，是不是容易overfit）
4- random forest和GBT 的优劣 (比如说inference time，overfitting，difficult to train)
5 - LDA 和PCA的优劣和区别


3个coding。
第一题coding非常简单，求string中longest consecutive sequence with identical characters 长度. 比如说aaabbc -> 3 。
第二题要求implement random forest里面的bootstrapping，bagging 的training和prediction。
第三道题楼主，没时间做了，也是implement random forest里面的其他的一些component
楼主的Multiple choices是随便答的因为没有复习，coding做到后面也没太多时间做了，总之没啥准备。希望想去的同学还是准备好再做这个。
楼主确实做的很烂，主要是真的很久没有用到各种tree algorithm了，但是recruiter说senior 这个算过，staff不算过。

70 分钟10道题
前几个都是MLE八股文， 还有算MLP output, 最后是两道coding, 一道 logistic regression, test case 全过， 第二道naive bayes, 没时间写了直接提交了。

70 分钟 10 道题目

1-7 是一些简单的ml basics, 问一些model的不同，怎么处理各种情况，比如overfitting
8 一个非常简单的leetcode，大概是找字符串里面最后一个最大的连续重复的substring
9 不让用numpy然后手写gradient descent，但是给的蛮简单的，是一个linear function然后迭代的formula也都给了，稍微注意一下matrix dimension就好了
10 最后一题也是手写，写kmeans，我只写到了更新cluster center，最后的main function没时间了， 但是最后也过了

OA2是mle coding+basics；有选择+填空+coding；影响最深的是有个填空题算loss，精确到小数点后三位… 还没给计算器，实在不知道这题的意义在哪里
coding是 decision tree，logistic regression和knn；里面给了些implementation然后让你填上剩下的、我觉得这种对我非常不友好，不如让我写一个…
还有就是之前看instruction说不用run，就没看test case是不是pass；太蠢了，太久没做这种OA了；不知道自己咋想的
反正fail了，move forward