# Attention 与 KV 缓存
https://www.lanqiao.cn/problems/20010/learning/?contest_id=251

```python
from typing import List, Optional, Tuple
import torch
import torch.nn as nn
import math
import time

device = 'cuda' if torch.cuda.is_available() else 'cpu'


class Config:
    def __init__(self, hidden_size: int = 512, num_attention_heads: int = 8) -> None:
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads


class AttentionWithoutCache(nn.Module):
    def __init__(self, config: Config) -> None:
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        assert self.head_dim * self.num_heads == self.hidden_size, "hidden_size must be divisible by num_heads."

        self.query_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.key_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.value_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.o_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        bsz, seq_len, _ = hidden_states.size()

        query_states = self.query_proj(hidden_states).view(
            bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = self.key_proj(hidden_states).view(
            bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        value_states = self.value_proj(hidden_states).view(
            bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        attn_weights = torch.matmul(
            query_states, key_states.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attn_weights = torch.softmax(attn_weights, dim=-1)

        attn_output = torch.matmul(attn_weights, value_states).transpose(
            1, 2).reshape(bsz, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)

        return attn_output


class AttentionWithCache(nn.Module):
    def __init__(self, config: Config) -> None:
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        assert self.head_dim * \
            self.num_heads == self.hidden_size, "hidden_size must be divisible by num_heads."

        self.query_proj = nn.Linear(
            self.hidden_size, self.hidden_size, bias=False)
        self.key_proj = nn.Linear(
            self.hidden_size, self.hidden_size, bias=False)
        self.value_proj = nn.Linear(
            self.hidden_size, self.hidden_size, bias=False)
        self.o_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)

    def forward(self, hidden_states: torch.Tensor, key_cache: Optional[torch.Tensor] = None, value_cache: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        bsz, q_len, _ = hidden_states.size()

        query_states = self.query_proj(hidden_states).view(
            bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)

        if key_cache is not None:
            key_states = torch.cat([key_cache, self.key_proj(hidden_states).view(
                bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)], dim=2)
        else:
            key_states = self.key_proj(hidden_states).view(
                bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)

        if value_cache is not None:
            value_states = torch.cat([value_cache, self.value_proj(hidden_states).view(
                bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)], dim=2)
        else:
            value_states = self.value_proj(hidden_states).view(
                bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)

        attn_weights = torch.matmul(
            query_states, key_states.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attn_weights = torch.softmax(attn_weights, dim=-1)

        attn_output = torch.matmul(attn_weights, value_states).transpose(
            1, 2).reshape(bsz, q_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)

        return attn_output, key_states, value_states


def generate_input(bsz: int, seq_len: int, hidden_size: int) -> torch.Tensor:
    """生成随机的输入张量。"""
    return torch.randn(bsz, seq_len, hidden_size)


def test_without_cache(model: nn.Module, input_sequence: torch.Tensor, generation_length: int) -> List[torch.Tensor]:
    model.eval()
    with torch.no_grad():
        generated_sequence = []

        for step in range(generation_length):
            output = model(input_sequence)[:,-1:,:]
            generated_sequence.append(output)
            input_sequence = torch.concat((input_sequence, output), dim=1)

        return generated_sequence


def test_with_cache(model: nn.Module, input_sequence: torch.Tensor, generation_length: int) -> List[torch.Tensor]:
    # TODO
    model.eval()
    with torch.no_grad():
        generated_sequence = []
        key_cache = None
        value_cache = None
        
        for step in range(generation_length):
            attn_output, key_cache, value_cache = model(input_sequence, key_cache, value_cache)
            output = attn_output[:, -1:, :]  # Take the last token
            generated_sequence.append(output)
            input_sequence = output  # For next step, we only use the generated token
            
        return generated_sequence


def main():
    config = Config(hidden_size=512, num_attention_heads=8)

    bsz = 10
    seq_len = 10
    hidden_size = config.hidden_size
    input_tensor = generate_input(bsz, 1, hidden_size).to(device)

    # 使用缓存
    attention_layer_with_cache = AttentionWithCache(config).to(device)
    attention_layer_with_cache.load_state_dict(torch.load('model.pth'))
    start_time = time.time()
    with_cache_outputs = test_with_cache(attention_layer_with_cache, input_tensor, seq_len)
    time_with_cache = time.time() - start_time
    print(f"Time with cache: {time_with_cache:.6f} seconds.")

    # 不使用缓存
    attention_layer_without_cache = AttentionWithoutCache(config).to(device)
    attention_layer_without_cache.load_state_dict(torch.load('model.pth'))
    start_time = time.time()
    without_cache_outputs = test_without_cache(attention_layer_without_cache, input_tensor, seq_len)
    time_without_cache = time.time() - start_time
    print(f"Time without cache: {time_without_cache:.6f} seconds.")

    if time_with_cache < time_without_cache:
        print("Using cache is faster.")
    else:
        print("Using cache did not improve performance.")

    for idx, (without_cache_output, with_cache_output) in enumerate(zip(without_cache_outputs, with_cache_outputs)):
        assert torch.allclose(with_cache_output, without_cache_output, atol=0.001)


if __name__ == "__main__":

    main()
```

# 图像分割推理与结果处理
https://www.lanqiao.cn/problems/20009/learning/?contest_id=251

```python 
from collections import OrderedDict
from typing import Any, Dict, List

from cv2 import connectedComponentsWithStats
import numpy as np
from PIL import Image
import torch
from torch import nn
from torchvision import transforms
from torchvision.models import resnet
from torchvision.models.segmentation.fcn import FCN


class FCNHead(nn.Sequential):
    def __init__(self, in_channels: int, channels: int) -> None:
        inter_channels = in_channels // 4
        layers = [
            nn.Conv2d(in_channels, inter_channels, 3, padding=1, bias=False),
            nn.BatchNorm2d(inter_channels),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Conv2d(inter_channels, channels, 1)
        ]

        super(FCNHead, self).__init__(*layers)


class IntermediateLayerGetter(nn.ModuleDict):

    def __init__(self, model: nn.Module, return_layers: Dict[str, str]) -> None:
        if not set(return_layers).issubset([name for name, _ in model.named_children()]):
            raise ValueError("return_layers are not present in model")

        orig_return_layers = return_layers
        return_layers = {k: v for k, v in return_layers.items()}
        layers = OrderedDict()
        for name, module in model.named_children():
            layers[name] = module
            if name in return_layers:
                del return_layers[name]
            if not return_layers:
                break

        super(IntermediateLayerGetter, self).__init__(layers)
        self.return_layers = orig_return_layers

    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        out = OrderedDict()
        for name, module in self.named_children():
            x = module(x)
            if name in self.return_layers:
                out_name = self.return_layers[name]
                out[out_name] = x
        return out


class FCNImageSegmenter(FCN):
    def __init__(self, num_classes: int = 21, **kwargs: Any) -> None:
        backbone = resnet.resnet101(
            weights=None,
            replace_stride_with_dilation=[False, True, True],
        )
        return_layers = {"layer4": "out"}
        return_layers["layer3"] = "aux"
        backbone = IntermediateLayerGetter(
            backbone, return_layers=return_layers)

        inplanes = 1024
        aux_classifier = FCNHead(inplanes, num_classes)
        inplanes = 2048
        classifier = FCNHead(inplanes, num_classes)

        super(FCNImageSegmenter, self).__init__(
            backbone, classifier, aux_classifier)


def load_model(file_path: str) -> FCNImageSegmenter:
    # TODO
    pass


class ImageLoader:
    def __init__(self, mean: List[float] = [0.485, 0.456, 0.406], std: List[float] = [0.229, 0.224, 0.225]) -> None:
        # TODO
        pass
    
def load_model(file_path: str) -> FCNImageSegmenter:
    # TODO
    model = FCNImageSegmenter()
    model.load_state_dict(torch.load(file_path))
    model.eval()
    return model


class ImageLoader:
    def __init__(self, mean: List[float] = [0.485, 0.456, 0.406], std: List[float] = [0.229, 0.224, 0.225]) -> None:
        # TODO
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std)
        ])
    
    def load_image(self, image_path: str) -> torch.Tensor:
        # TODO
        image = Image.open(image_path).convert('RGB')
        image = self.transform(image)
        return image.unsqueeze(0)  # Add batch dimension


    def pred(model: nn.Module, input_batch: torch.Tensor) -> torch.Tensor:
        # TODO
        with torch.no_grad():
            output = model(input_batch)['out']
        return torch.argmax(output.squeeze(0), dim=0)


    def find_connected_components(prediction: np.ndarray, label_mapping: Dict[int, str], connectivity: int=8) -> Dict[str, List[List[List[int]]]]:
        # TODO
        result = {}
        
        for label_num, label_name in label_mapping.items():
            mask = (prediction == label_num).astype(np.uint8)
            num_labels, labels, stats, centroids = connectedComponentsWithStats(mask, connectivity=connectivity)
            
            components = []
            for i in range(1, num_labels):  # Skip background (0)
                # Get coordinates of all pixels in this component
                y, x = np.where(labels == i)
                points = list(zip(y.tolist(), x.tolist()))  # Convert to list of [y,x] pairs
                components.append(points)
            
            if components:
                result[label_name] = components
        
        return result



def main():
    label_mapping = {0: 'Background', 
                     1: 'Aero plane',
                     2: 'Bicycle',
                     3: 'Bird',
                     4: 'Boat',
                     5: 'Bottle',
                     6: 'Bus',
                     7: 'Car',
                     8: 'Cat',
                     9: 'Chair',
                     10: 'Cow',
                     11: 'Dining table',
                     12: 'Dog',
                     13: 'Horse',
                     14: 'Motorbike',
                     15: 'Person',
                     16: 'Potted plant',
                     17: 'Sheep',
                     18: 'Sofa',
                     19: 'Train',
                     20: 'Tv/Monitor'}
    model = load_model('model.pth')
    loader = ImageLoader()
    input_batch = loader.load_image('test.jpg')
    output = pred(model, input_batch)
    components = find_connected_components(output.detach().numpy(), label_mapping)
    
    for label, class_points in components.items():
        for i, points in enumerate(class_points, 1):
            print(label, i, points[:10])


if __name__ == '__main__':
    main()
```

# Pytorch 神经网络
https://www.lanqiao.cn/problems/20008/learning/?contest_id=251

```python
import random
from typing import Tuple
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


def read_csv(file_path: str) -> Tuple[np.ndarray, np.ndarray]:
    #TODO
    df = pd.read_csv(file_path)

    target = df['target']
    featrues = df.drop(columns=['target'])

    return featrues.values,target.values

class Classifier(nn.Module):
    #TODO
    def __init__(self):
        super(Classifier, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(100, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.5),
            
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            
            nn.Linear(64, 5)
        )
    def forward(self, x):
        return self.model(x)


def train_nn(X_train: torch.Tensor, y_train: torch.Tensor) -> Classifier:
    model = Classifier()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    epochs = 150
    for epoch in range(epochs):
        #TODO
        output = model(X_train)
        loss = criterion(output, y_train)

        optimizer.zero_grad()
        
        loss.backward()
        optimizer.step()
    return model

def save_model(model: nn.Module, file_path: str) -> None:
    #TODO
    torch.save(model.state_dict(), file_path)

def main() -> None:

    features, targets = read_csv('train_data.csv')
    X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.2, random_state=42)

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)

    model = train_nn(X_train_tensor, y_train_tensor)
    save_model(model, 'model.pth')


if __name__ == '__main__':
    main()
```

# 提升 XGB 回归模型训练效果
https://www.lanqiao.cn/problems/20007/learning/?contest_id=251

```python
import json
from typing import Tuple
import numpy as np
from sklearn.model_selection import train_test_split
import xgboost as xgb


def read_data(filename: str) -> Tuple[np.ndarray, np.ndarray]:
    #TODO
    with open(filename,'r') as f:
        data = json.load(f)
        features = data['features']
        target = data['target']
        
    features = np.array(features)
    target = np.array(target)

    return (features, target)

def train(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, y_test: np.ndarray):
    model = xgb.XGBRegressor(
        objective='reg:squarederror',
        learning_rate=0.1,
        max_depth=4,
        subsample=0.8,
        reg_lambda=1.0,  # 修正参数名称
        reg_alpha=0.0,   # 修正参数名称
        eval_metric='rmse',  # 将评估指标移至此处
        n_estimators=100
    )

    model.fit(
        X_train, y_train,
        eval_set=[(X_test, y_test)],
        verbose=False  # 禁用训练日志输出
    )

    return model

def remove_least_important_feature(X_train: np.ndarray, X_test: np.ndarray, model: xgb.XGBRegressor) -> Tuple[int, np.ndarray, np.ndarray]:
    # get feature_importances
    importances = model.feature_importances_

    # get least_important feature ids
    least_important_ids = np.argmin(importances)

    # 观察文件数据格式为numpy数组
    X_train_removed = np.delete(X_train, least_important_ids, axis=1)
    X_test_removed = np.delete(X_test, least_important_ids, axis=1)

    return (least_important_ids, X_train_removed, X_test_removed)

def main() -> None:
    X, y = read_data('data.json')
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = train(X_train, y_train, X_test, y_test)
    least_important_index, X_train_reduced, X_test_reduced = remove_least_important_feature(X_train, X_test, model)
    model_retrained = train(X_train_reduced, y_train, X_test_reduced, y_test)

if __name__ == '__main__':
    main()
```

# 自定义K-means距离
https://www.lanqiao.cn/problems/20006/learning/?contest_id=251


```python 
from typing import Callable, Optional, Tuple
import numpy as np
np.random.seed(42)

class CustomKMeans:
    def __init__(self, 
                 n_clusters: int = 3, 
                 max_iter: int = 300, 
                 distance_metric: Callable[[np.ndarray, np.ndarray], np.ndarray] = None) -> None:
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        assert distance_metric is not None
        self.distance_metric = distance_metric
        self.cluster_centers_: Optional[np.ndarray] = None
        self.labels_: Optional[np.ndarray] = None

    def initialize_centroids(self, X: np.ndarray) -> np.ndarray:
        indices = np.random.choice(X.shape[0], self.n_clusters, replace=False)
        return X[indices]

    def assign_clusters(self, X: np.ndarray, centroids: np.ndarray) -> np.ndarray:
        distances = np.array([self.distance_metric(x, centroids) for x in X])
        labels = np.argmin(distances, axis=1)
        return labels

    def compute_centroids(self, X: np.ndarray, labels: np.ndarray) -> np.ndarray:
        centroids = np.zeros((self.n_clusters, X.shape[1]))
        for i in range(self.n_clusters):
            points_in_cluster = X[labels == i]
            centroids[i] = np.mean(points_in_cluster, axis=0) if len(points_in_cluster) > 0 else self.cluster_centers_[i]
        return centroids

    def fit(self, X: np.ndarray, tol: float = 1e-6) -> 'CustomKMeans':

        self.cluster_centers_ = self.initialize_centroids(X)

        for iteration in range(self.max_iter):
            self.labels_ = self.assign_clusters(X, self.cluster_centers_)
            new_centroids = self.compute_centroids(X, self.labels_)
            if np.allclose(self.cluster_centers_, new_centroids, atol=tol):
                break
            self.cluster_centers_ = new_centroids

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        #TODO
        return self.assign_clusters(X, self.cluster_centers_)


def euclidean_distance(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
    #TODO
    return np.linalg.norm(x2-x1, axis=1,ord=2)
def manhattan_distance(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
    #TODO
    return np.sum(np.abs(x2-x1),axis=1)
def chebyshev_distance(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
    #TODO
    return np.max(np.abs(x2-x1),axis=1)


def main() -> None:
    X = np.random.rand(100, 2)

    kmeans_euclidean = CustomKMeans(n_clusters=3, distance_metric=euclidean_distance)
    kmeans_euclidean.fit(X)
    
    kmeans_manhattan = CustomKMeans(n_clusters=3, distance_metric=manhattan_distance)
    kmeans_manhattan.fit(X)
    
    kmeans_chebyshev = CustomKMeans(n_clusters=3, distance_metric=chebyshev_distance)
    kmeans_chebyshev.fit(X)
    
    x = np.random.rand(3, 2)
    euclidean_predict = kmeans_euclidean.predict(x)
    manhattan_predict = kmeans_manhattan.predict(x)
    chebyshev_predict = kmeans_chebyshev.predict(x)
    print(f'{euclidean_predict=},{manhattan_predict=},{chebyshev_predict=}')

if __name__ == '__main__':
    main()
```

# 基于特征阈值划分数据集
https://www.lanqiao.cn/problems/20005/learning/?contest_id=251

```python
import numpy as np

def divide_on_feature(X, feature_i, threshold):
    #TODO
    if X.ndim == 1:
        X = X.reshape(-1,1)

    x = X[:,feature_i]
    
    subset1 = X[x>=threshold]
    subset2 = X[x<threshold]

    return [subset1,subset2]

def main():
    # 示例数据集
    X = np.array([[1, 2],
                  [3, 4],
                  [5, 6],
                  [7, 8],
                  [9, 10]])
    feature_i = 0
    threshold = 5
    
    # 执行数据集划分
    subsets = divide_on_feature(X, feature_i, threshold)
    
    print("Subset 1:")
    print(subsets[0])
    print("\nSubset 2:")
    print(subsets[1])
    return

if __name__ == '__main__':
    main()

```

# 图像旋转
https://www.lanqiao.cn/problems/20004/learning/?contest_id=251  

```python 
import numpy as np

def rotate_image(image, angle):
    # 获取输入图像尺寸
    height, width = image.shape
    
    # 将角度转换为弧度（顺时针旋转使用负角度）
    theta = np.radians(-angle)
    # 计算旋转后图像的边界框大小
    cos_theta = np.abs(np.cos(theta))
    sin_theta = np.abs(np.sin(theta))
    new_width = int(np.ceil(width * cos_theta + height * sin_theta))
    new_height = int(np.ceil(width * sin_theta + height * cos_theta))
    # 对于特定角度，保持原始尺寸
    if angle % 90 == 0:
        new_width = width
        new_height = height
    
    # 创建输出图像
    output = np.zeros((new_height, new_width), dtype=np.uint8)
    
    # 计算输入和输出的中心点
    center_x_in = (width - 1) / 2
    center_y_in = (height - 1) / 2
    center_x_out = (new_width - 1) / 2
    center_y_out = (new_height - 1) / 2
    
    # 从输入图像的每个像素映射到输出位置
    for y in range(height):
        for x in range(width):
            # 转换为以中心为原点的坐标
            src_x = x - center_x_in
            src_y = y - center_y_in     
            # 应用旋转矩阵（顺时针）
            dest_x = src_x * np.cos(theta) + src_y * np.sin(theta)
            dest_y = -src_x * np.sin(theta) + src_y * np.cos(theta)
            
            # 转换到输出坐标系
            dest_x = int(np.round(dest_x + center_x_out))
            dest_y = int(np.round(dest_y + center_y_out))
            
            # 检查边界并赋值
            if (0 <= dest_x < new_width) and (0 <= dest_y < new_height):
                output[dest_y, dest_x] = image[y, x]
    
    return output
def main():
    image = np.array([[100, 150, 200, 50, 75],
                      [80, 120, 160, 40, 60],
                      [180, 210, 240, 90, 110],
                      [20, 30, 40, 10, 15],
                      [130, 170, 220, 70, 95]], dtype=np.uint8)
    rotated_image = rotate_image(image, 90)
    print(rotated_image)

if __name__ == '__main__':
    main()
```