# 05 Pooling Layer

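## Learning Objectives

1. Understand what a pooling layer does (downsampling, translation invariance)
2. Implement the forward and backward passes of Max Pooling
3. Implement Average Pooling
4. Understand the gradient "routing" mechanism in Max Pooling backpropagation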

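## Why Do We Need Pooling Layers?

1. **Downsampling**: shrinks the spatial size of the feature maps, which cuts the computation in later layers and the parameter count of any fully connected layer that follows (the pooling layer itself has no parameters)
2. **Translation invariance**: if an object shifts slightly, the pooled features stay similar
3. **Larger receptive field**: after a stride-2 pooling layer, each step in a later layer spans twice as many input pixels, so the receptive field of subsequent convolution layers grows about twice as fast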
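The translation-invariance point can be illustrated with a small sketch. The helper `maxpool_2x2` is a hypothetical name introduced only for this example, and it assumes a 2-D input with even side lengths. A shift that stays inside one window leaves the pooled map unchanged, while a shift across a window boundary does not, so the invariance is only partial.

```python
import numpy as np

def maxpool_2x2(img):
    """Hypothetical helper: non-overlapping 2x2 max pooling on a 2-D array."""
    H, W = img.shape
    return img.reshape(H // 2, 2, W // 2, 2).max(axis=(1, 3))

# A single bright pixel, then the same pixel shifted one step to the right.
a = np.zeros((4, 4))
a[0, 0] = 1.0
b = np.zeros((4, 4))
b[0, 1] = 1.0

# Both positions fall inside the same 2x2 window, so the pooled maps match.
same_window = np.array_equal(maxpool_2x2(a), maxpool_2x2(b))

# Shifting by two pixels crosses a window boundary and changes the output.
c = np.zeros((4, 4))
c[0, 2] = 1.0
crossed_boundary = np.array_equal(maxpool_2x2(a), maxpool_2x2(c))

print(same_window, crossed_boundary)  # True False
```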

In [None]:
import numpy as np
import matplotlib.pyplot as plt

np.random.seed(42)
print("Pooling Layer module loaded!")

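## Part 1: Max Pooling Forward Pass

### Definition

Max Pooling takes the maximum value inside each window.

- Input: $(N, C, H, W)$
- Output: $(N, C, H', W')$

where, for a window of size $k_H \times k_W$ and stride $S$ with no padding:
$$H' = \left\lfloor \frac{H - k_H}{S} \right\rfloor + 1$$
$$W' = \left\lfloor \frac{W - k_W}{S} \right\rfloor + 1$$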
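As a quick check of the output-size formula, the sketch below evaluates it along one axis. `pool_output_size` is a hypothetical helper introduced for illustration, and it assumes no padding. The floor matters whenever $H - k_H$ is not a multiple of $S$: the trailing rows or columns that do not fill a whole window are dropped.

```python
def pool_output_size(size, kernel, stride):
    """Hypothetical helper: number of window positions along one axis, no padding."""
    if size < kernel:
        raise ValueError("input is smaller than the pooling window")
    return (size - kernel) // stride + 1

print(pool_output_size(4, 2, 2))  # 2
print(pool_output_size(5, 2, 2))  # 2, the last row/column is dropped
print(pool_output_size(7, 3, 2))  # 3, overlapping windows
```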

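### Formula

$$Y[n, c, i, j] = \max_{p \in [0, k_H), q \in [0, k_W)} X[n, c, i \cdot S + p, j \cdot S + q]$$

**Important**: the forward pass must record which input position supplied the maximum for each output position, because the backward pass needs it.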
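The formula and the bookkeeping of max positions can also be written without explicit loops. The following is a minimal sketch, not the implementation used in this notebook: it assumes a square window `k`, relies on `numpy.lib.stride_tricks.sliding_window_view` (available in NumPy 1.20 and later), and uses the hypothetical name `maxpool2d_forward_windows`.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def maxpool2d_forward_windows(X, k, stride):
    """Hypothetical sketch: max pooling that returns values and argmax coordinates."""
    # All k x k windows, then keep every `stride`-th one along H and W.
    # Resulting shape: (N, C, H_out, W_out, k, k)
    windows = sliding_window_view(X, (k, k), axis=(2, 3))[:, :, ::stride, ::stride]
    N, C, H_out, W_out = windows.shape[:4]

    flat = windows.reshape(N, C, H_out, W_out, k * k)
    Y = flat.max(axis=-1)
    arg = flat.argmax(axis=-1)  # index inside the window; the first max wins on ties

    # Convert window-local indices to absolute input coordinates.
    p, q = np.divmod(arg, k)
    h_idx = np.arange(H_out)[:, None] * stride + p
    w_idx = np.arange(W_out)[None, :] * stride + q
    return Y, h_idx, w_idx

X_demo = np.arange(1, 17, dtype=float).reshape(1, 1, 4, 4)
Y_demo, h_demo, w_demo = maxpool2d_forward_windows(X_demo, k=2, stride=2)
print(Y_demo[0, 0])  # maxima 6, 8, 14, 16
print(h_demo[0, 0])  # row of each maximum: 1, 1, 3, 3
print(w_demo[0, 0])  # column of each maximum: 1, 3, 1, 3
```

The loop-based version in the next cell computes the same quantities and is easier to follow step by step.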

In [None]:
def maxpool2d_forward_naive(X, pool_size, stride=None):
    """
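    Max Pooling forward pass (naive version).

    Parameters
    ----------
    X : np.ndarray, shape (N, C, H, W)
    pool_size : int or tuple
        Size of the pooling window
    stride : int or None
        Stride; defaults to pool_size (the window height kH if pool_size is a tuple)

    Returns
    -------
    Y : np.ndarray, shape (N, C, H', W')
    cache : dict
        Holds the max positions for use in the backward pass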
    """
    N, C, H, W = X.shape
    
    if isinstance(pool_size, int):
        kH, kW = pool_size, pool_size
    else:
        kH, kW = pool_size
    
    if stride is None:
        stride = kH
    
    H_out = (H - kH) // stride + 1
    W_out = (W - kW) // stride + 1
    
    Y = np.zeros((N, C, H_out, W_out))
    
    # Record where each maximum came from (needed for the backward pass).
    # For every output element, store the input coordinates (h_idx, w_idx).
    max_indices = np.zeros((N, C, H_out, W_out, 2), dtype=int)
    
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    h_start = i * stride
                    w_start = j * stride
                    
                    # Extract the window
                    window = X[n, c, h_start:h_start+kH, w_start:w_start+kW]
                    
                    # Find the maximum and its position
                    Y[n, c, i, j] = np.max(window)
                    
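                    # Convert the window-relative argmax to absolute input coordinates
                    # (np.argmax returns the first maximum on ties)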
                    max_pos = np.unravel_index(np.argmax(window), window.shape)
                    max_indices[n, c, i, j, 0] = h_start + max_pos[0]
                    max_indices[n, c, i, j, 1] = w_start + max_pos[1]
    
    cache = {'X_shape': X.shape, 'max_indices': max_indices, 
             'pool_size': (kH, kW), 'stride': stride}
    
    return Y, cache

# Test
X = np.array([[[[1, 2, 3, 4],
                [5, 6, 7, 8],
                [9, 10, 11, 12],
                [13, 14, 15, 16]]]], dtype=float)

Y, cache = maxpool2d_forward_naive(X, pool_size=2, stride=2)

print("Input:")
print(X[0, 0])
print(f"\nInput shape: {X.shape}")
print("\nOutput:")
print(Y[0, 0])
print(f"\nOutput shape: {Y.shape}")
print("\nInput position (h, w) of the maximum for each output:")
print(cache['max_indices'][0, 0, :, :, :].reshape(2, 2, 2))

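## Part 2: Max Pooling Backward Pass

### Core Idea

The gradient of the max operation has one key property: **the gradient flows only to the position of the maximum**.

For each output position $(i, j)$:
- Look up the input position $(h^*, w^*)$ that held the maximum
- Add $dY[n, c, i, j]$ to $dX[n, c, h^*, w^*]$
- Every other position in the window receives zero gradient from this output

### Formula

$$\frac{\partial L}{\partial X[n, c, h, w]} = \sum_{(i, j) \,:\, (h^*_{ij},\, w^*_{ij}) = (h, w)} \frac{\partial L}{\partial Y[n, c, i, j]}$$

where $(h^*_{ij}, w^*_{ij})$ is the argmax position of window $(i, j)$. The sum matters when windows overlap ($S < k$), since one input can then be the maximum of several windows.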
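The sum in the formula is a scatter-add. The sketch below shows one way to express it with `np.add.at`, which accumulates repeated indices (a plain fancy-indexed `+=` would keep only one contribution per index). The function name `maxpool2d_backward_scatter` and the separate `h_idx` / `w_idx` arrays are assumptions made for this example. The toy case uses overlapping windows so that one input receives gradient from two outputs.

```python
import numpy as np

def maxpool2d_backward_scatter(dY, h_idx, w_idx, X_shape):
    """Hypothetical sketch: route each dY entry to the argmax coordinate of its window."""
    dX = np.zeros(X_shape)
    # Full-shape index grids for the batch and channel axes.
    n_idx, c_idx, _, _ = np.indices(dY.shape)
    # Unbuffered accumulation: repeated (n, c, h, w) targets are summed.
    np.add.at(dX, (n_idx, c_idx, h_idx, w_idx), dY)
    return dX

# 1x3 input x = [1, 5, 2], window 1x2, stride 1.
# Windows are [1, 5] and [5, 2]; both pick the middle element (column 1).
h_idx = np.zeros((1, 1, 1, 2), dtype=int)
w_idx = np.array([[[[1, 1]]]])
dY_demo = np.array([[[[10.0, 20.0]]]])

dX_demo = maxpool2d_backward_scatter(dY_demo, h_idx, w_idx, (1, 1, 1, 3))
print(dX_demo[0, 0])  # the middle element receives 10 + 20 = 30
```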

In [None]:
def maxpool2d_backward_naive(dY, cache):
    """
    Max Pooling backward pass.

    Parameters
    ----------
    dY : np.ndarray, shape (N, C, H_out, W_out)
        Gradient with respect to the output
    cache : dict
        Cache returned by the forward pass

    Returns
    -------
    dX : np.ndarray, shape (N, C, H, W)
        Gradient with respect to the input
    """
    X_shape = cache['X_shape']
    max_indices = cache['max_indices']
    
    N, C, H, W = X_shape
    _, _, H_out, W_out = dY.shape
    
    dX = np.zeros(X_shape)
    
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    # Look up the position of the maximum
                    h_idx = max_indices[n, c, i, j, 0]
                    w_idx = max_indices[n, c, i, j, 1]
                    
                    # The gradient flows only to the max position
                    dX[n, c, h_idx, w_idx] += dY[n, c, i, j]
    
    return dX

# Test
dY = np.ones((1, 1, 2, 2))
dX = maxpool2d_backward_naive(dY, cache)

print("dY (all ones):")
print(dY[0, 0])
print("\ndX (gradient flows only to the max positions):")
print(dX[0, 0])
print("\nNote: only the positions of the maxima (6, 8, 14, 16) in the input receive gradient")

In [None]:
# Visualize the gradient flow through Max Pooling
fig, axes = plt.subplots(1, 4, figsize=(16, 4))

# Input
ax = axes[0]
im = ax.imshow(X[0, 0], cmap='Blues', vmin=0, vmax=16)
for i in range(4):
    for j in range(4):
        ax.text(j, i, f'{int(X[0,0,i,j])}', ha='center', va='center')
ax.set_title('Input X')
ax.set_xticks([])
ax.set_yticks([])

# Max Pooling output
ax = axes[1]
im = ax.imshow(Y[0, 0], cmap='Blues', vmin=0, vmax=16)
for i in range(2):
    for j in range(2):
        ax.text(j, i, f'{int(Y[0,0,i,j])}', ha='center', va='center')
ax.set_title('MaxPool Output Y')
ax.set_xticks([])
ax.set_yticks([])

# dY
ax = axes[2]
dY_test = np.array([[[[1, 2], [3, 4]]]], dtype=float)
im = ax.imshow(dY_test[0, 0], cmap='Reds', vmin=0, vmax=4)
for i in range(2):
    for j in range(2):
        ax.text(j, i, f'{int(dY_test[0,0,i,j])}', ha='center', va='center')
ax.set_title('Gradient dY')
ax.set_xticks([])
ax.set_yticks([])

# dX
dX_test = maxpool2d_backward_naive(dY_test, cache)
ax = axes[3]
im = ax.imshow(dX_test[0, 0], cmap='Reds', vmin=0, vmax=4)
for i in range(4):
    for j in range(4):
        val = dX_test[0, 0, i, j]
        if val > 0:
            ax.text(j, i, f'{int(val)}', ha='center', va='center', fontweight='bold')
ax.set_title('Gradient dX (routed to max positions)')
ax.set_xticks([])
ax.set_yticks([])

plt.tight_layout()
plt.show()

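## Part 3: Average Pooling

Average Pooling takes the mean of each window.

### Forward Pass
$$Y[n, c, i, j] = \frac{1}{k_H \cdot k_W} \sum_{p=0}^{k_H-1} \sum_{q=0}^{k_W-1} X[n, c, i \cdot S + p, j \cdot S + q]$$

### Backward Pass
The gradient is **split evenly** among all positions in the window. An input position sums the contributions of every window $(i, j)$ that covers it:
$$\frac{\partial L}{\partial X[n, c, h, w]} = \frac{1}{k_H \cdot k_W} \sum_{(i, j) \,:\, (h, w) \in \text{window}(i, j)} dY[n, c, i, j]$$

With non-overlapping windows ($S = k_H = k_W$) each position belongs to exactly one window, so the sum has a single term.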
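For the common non-overlapping case, both passes reduce to a reshape and a repeat. This is a minimal sketch under the assumptions that the window is square, the stride equals the window size, and $H$ and $W$ are multiples of `k`. The function names are hypothetical and are not used elsewhere in this notebook.

```python
import numpy as np

def avgpool_forward_nonoverlap(X, k):
    """Hypothetical sketch: average pooling with window k and stride k."""
    N, C, H, W = X.shape
    # Split H and W into (blocks, k) and average inside each k x k block.
    return X.reshape(N, C, H // k, k, W // k, k).mean(axis=(3, 5))

def avgpool_backward_nonoverlap(dY, k):
    """Each input position receives dY / k**2 from the single window covering it."""
    return np.repeat(np.repeat(dY, k, axis=2), k, axis=3) / (k * k)

X_demo = np.arange(1, 17, dtype=float).reshape(1, 1, 4, 4)
Y_demo = avgpool_forward_nonoverlap(X_demo, 2)
dY_demo = np.array([[[[1.0, 2.0], [3.0, 4.0]]]])
dX_demo = avgpool_backward_nonoverlap(dY_demo, 2)

print(Y_demo[0, 0])   # block means 3.5, 5.5, 11.5, 13.5
print(dX_demo[0, 0])  # blocks filled with 0.25, 0.5, 0.75, 1.0
```

The total gradient is preserved: `dX_demo.sum()` equals `dY_demo.sum()`, because each output's gradient is divided into `k * k` equal shares.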

In [None]:
def avgpool2d_forward(X, pool_size, stride=None):
    """
    Average Pooling forward pass.
    """
    N, C, H, W = X.shape
    
    if isinstance(pool_size, int):
        kH, kW = pool_size, pool_size
    else:
        kH, kW = pool_size
    
    if stride is None:
        stride = kH
    
    H_out = (H - kH) // stride + 1
    W_out = (W - kW) // stride + 1
    
    Y = np.zeros((N, C, H_out, W_out))
    
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    h_start = i * stride
                    w_start = j * stride
                    window = X[n, c, h_start:h_start+kH, w_start:w_start+kW]
                    Y[n, c, i, j] = np.mean(window)
    
    cache = {'X_shape': X.shape, 'pool_size': (kH, kW), 'stride': stride}
    return Y, cache

def avgpool2d_backward(dY, cache):
    """
    Average Pooling backward pass.
    """
    X_shape = cache['X_shape']
    kH, kW = cache['pool_size']
    stride = cache['stride']
    
    N, C, H, W = X_shape
    _, _, H_out, W_out = dY.shape
    
    dX = np.zeros(X_shape)
    
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    h_start = i * stride
                    w_start = j * stride
                    
                    # Split the gradient evenly across the window
                    dX[n, c, h_start:h_start+kH, w_start:w_start+kW] += dY[n, c, i, j] / (kH * kW)
    
    return dX

# Test (reuses dY_test from the visualization cell above)
Y_avg, cache_avg = avgpool2d_forward(X, pool_size=2, stride=2)
dX_avg = avgpool2d_backward(dY_test, cache_avg)

print("Input:")
print(X[0, 0])
print("\nAverage Pooling output:")
print(Y_avg[0, 0])
print("\ndY:")
print(dY_test[0, 0])
print("\ndX (Average Pooling):")
print(dX_avg[0, 0])
print("\nNote: each gradient is split evenly over the 4 positions of its window")

## Part 4: Complete Pooling Classes

In [None]:
class MaxPool2D:
    """
    Max Pooling layer.
    """
    
    def __init__(self, pool_size, stride=None):
        if isinstance(pool_size, int):
            self.pool_size = (pool_size, pool_size)
        else:
            self.pool_size = pool_size
        
        self.stride = stride if stride is not None else self.pool_size[0]
        self.cache = None
    
    def forward(self, X):
        N, C, H, W = X.shape
        kH, kW = self.pool_size
        S = self.stride
        
        H_out = (H - kH) // S + 1
        W_out = (W - kW) // S + 1
        
        Y = np.zeros((N, C, H_out, W_out))
        max_indices = np.zeros((N, C, H_out, W_out, 2), dtype=int)
        
        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        h_start = i * S
                        w_start = j * S
                        window = X[n, c, h_start:h_start+kH, w_start:w_start+kW]
                        Y[n, c, i, j] = np.max(window)
                        max_pos = np.unravel_index(np.argmax(window), window.shape)
                        max_indices[n, c, i, j, 0] = h_start + max_pos[0]
                        max_indices[n, c, i, j, 1] = w_start + max_pos[1]
        
        self.cache = (X.shape, max_indices)
        return Y
    
    def backward(self, dY):
        X_shape, max_indices = self.cache
        N, C, H, W = X_shape
        _, _, H_out, W_out = dY.shape
        
        dX = np.zeros(X_shape)
        
        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        h_idx = max_indices[n, c, i, j, 0]
                        w_idx = max_indices[n, c, i, j, 1]
                        dX[n, c, h_idx, w_idx] += dY[n, c, i, j]
        
        return dX
    
    def __repr__(self):
        return f"MaxPool2D(pool_size={self.pool_size}, stride={self.stride})"


class AvgPool2D:
    """
    Average Pooling layer.
    """
    
    def __init__(self, pool_size, stride=None):
        if isinstance(pool_size, int):
            self.pool_size = (pool_size, pool_size)
        else:
            self.pool_size = pool_size
        
        self.stride = stride if stride is not None else self.pool_size[0]
        self.cache = None
    
    def forward(self, X):
        N, C, H, W = X.shape
        kH, kW = self.pool_size
        S = self.stride
        
        H_out = (H - kH) // S + 1
        W_out = (W - kW) // S + 1
        
        Y = np.zeros((N, C, H_out, W_out))
        
        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        h_start = i * S
                        w_start = j * S
                        Y[n, c, i, j] = np.mean(X[n, c, h_start:h_start+kH, w_start:w_start+kW])
        
        self.cache = (X.shape, )
        return Y
    
    def backward(self, dY):
        X_shape, = self.cache
        N, C, H, W = X_shape
        kH, kW = self.pool_size
        S = self.stride
        _, _, H_out, W_out = dY.shape
        
        dX = np.zeros(X_shape)
        
        for n in range(N):
            for c in range(C):
                for i in range(H_out):
                    for j in range(W_out):
                        h_start = i * S
                        w_start = j * S
                        dX[n, c, h_start:h_start+kH, w_start:w_start+kW] += dY[n, c, i, j] / (kH * kW)
        
        return dX
    
    def __repr__(self):
        return f"AvgPool2D(pool_size={self.pool_size}, stride={self.stride})"

# Test
maxpool = MaxPool2D(2, stride=2)
avgpool = AvgPool2D(2, stride=2)

print(maxpool)
print(avgpool)

## Part 5: Gradient Check
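The check in this part compares the analytic gradient with a central-difference estimate. For a scalar loss $L$, a small step $\epsilon$, and the unit tensor $e_k$ that is 1 at input element $k$ and 0 elsewhere:

$$\frac{\partial L}{\partial X_k} \approx \frac{L(X + \epsilon\, e_k) - L(X - \epsilon\, e_k)}{2\epsilon}$$

The two gradients are then compared with a relative error, where $a$ is the analytic value and $b$ the numerical one:

$$\text{rel\_error} = \max_k \frac{|a_k - b_k|}{|a_k| + |b_k| + 10^{-8}}$$

One caveat applies to Max Pooling specifically: the max is not differentiable where two elements of a window tie. If the two largest values in a window differ by less than about $2\epsilon$, the perturbation can change which element is the maximum and the numerical estimate becomes unreliable. With random Gaussian inputs and $\epsilon = 10^{-5}$ this is unlikely, but it can happen with integer-valued or constant test data.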

In [None]:
def gradient_check_maxpool(X, pool_size, stride, eps=1e-5):
    """
    Gradient check for Max Pooling.
    """
    pool = MaxPool2D(pool_size, stride)
    
    # Forward pass
    Y = pool.forward(X)
    
    # Assume loss = sum(Y^2)
    dY = 2 * Y
    
    # Backward pass
    dX = pool.backward(dY)
    
    # Numerical gradient (central differences)
    dX_numerical = np.zeros_like(X)
    X_test = X.copy()
    
    for idx in range(X.size):
        multi_idx = np.unravel_index(idx, X.shape)
        old_val = X_test[multi_idx]
        
        X_test[multi_idx] = old_val + eps
        pool_new = MaxPool2D(pool_size, stride)
        Y_plus = pool_new.forward(X_test)
        loss_plus = np.sum(Y_plus ** 2)
        
        X_test[multi_idx] = old_val - eps
        pool_new = MaxPool2D(pool_size, stride)
        Y_minus = pool_new.forward(X_test)
        loss_minus = np.sum(Y_minus ** 2)
        
        X_test[multi_idx] = old_val
        
        dX_numerical[multi_idx] = (loss_plus - loss_minus) / (2 * eps)
    
    diff = np.abs(dX - dX_numerical)
    rel_error = np.max(diff / (np.abs(dX) + np.abs(dX_numerical) + 1e-8))
    
    print("=== Max Pooling gradient check ===")
    print(f"Max absolute error: {np.max(diff):.2e}")
    print(f"Max relative error: {rel_error:.2e}")
    print(f"Passed: {rel_error < 1e-5}")
    
    return rel_error < 1e-5

# Test
X_test = np.random.randn(2, 2, 4, 4)
gradient_check_maxpool(X_test, pool_size=2, stride=2)

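## Exercises

### Exercise 1: Implement Global Average Pooling

Global Average Pooling averages each channel over its entire spatial extent. It is often used at the end of a CNN in place of fully connected layers.

Input: $(N, C, H, W)$ → Output: $(N, C)$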

In [None]:
class GlobalAvgPool2D:
    """
    Global Average Pooling
    
    Averages each channel over its spatial dimensions.
    Input: (N, C, H, W)
    Output: (N, C)
    """
    
    def __init__(self):
        self.cache = None
    
    def forward(self, X):
        """
        Forward pass.
        """
        N, C, H, W = X.shape
        
        # Solution: average over the H and W axes
        Y = np.mean(X, axis=(2, 3))  # (N, C)
        
        self.cache = X.shape
        return Y
    
    def backward(self, dY):
        """
        Backward pass.
        
        dY: (N, C)
        dX: (N, C, H, W)
        """
        N, C, H, W = self.cache
        
        # Solution: spread the gradient evenly over all H * W spatial positions
        dX = dY[:, :, np.newaxis, np.newaxis] / (H * W)
        dX = np.broadcast_to(dX, (N, C, H, W)).copy()
        
        return dX

# Test
gap = GlobalAvgPool2D()
X_test = np.random.randn(2, 3, 4, 4)
Y = gap.forward(X_test)

print(f"Input shape: {X_test.shape}")
print(f"Output shape: {Y.shape}")

# Verify
print("\nCheck: mean of the first channel of the first sample")
print(f"Manual computation: {np.mean(X_test[0, 0]):.6f}")
print(f"GAP output: {Y[0, 0]:.6f}")

In [None]:
# Gradient check for Global Average Pooling (samples up to 20 random elements)
def gradient_check_gap(X, eps=1e-5):
    gap = GlobalAvgPool2D()
    
    Y = gap.forward(X)
    dY = 2 * Y  # loss = sum(Y^2)
    dX = gap.backward(dY)
    
    dX_numerical = np.zeros_like(X)
    X_test = X.copy()
    
    num_checks = min(20, X.size)
    indices = np.random.choice(X.size, num_checks, replace=False)
    
    for idx in indices:
        multi_idx = np.unravel_index(idx, X.shape)
        old_val = X_test[multi_idx]
        
        X_test[multi_idx] = old_val + eps
        gap_new = GlobalAvgPool2D()
        Y_plus = gap_new.forward(X_test)
        loss_plus = np.sum(Y_plus ** 2)
        
        X_test[multi_idx] = old_val - eps
        gap_new = GlobalAvgPool2D()
        Y_minus = gap_new.forward(X_test)
        loss_minus = np.sum(Y_minus ** 2)
        
        X_test[multi_idx] = old_val
        
        dX_numerical[multi_idx] = (loss_plus - loss_minus) / (2 * eps)
    
    max_error = 0
    for idx in indices:
        multi_idx = np.unravel_index(idx, X.shape)
        error = abs(dX[multi_idx] - dX_numerical[multi_idx]) / (abs(dX[multi_idx]) + abs(dX_numerical[multi_idx]) + 1e-8)
        max_error = max(max_error, error)
    
    print(f"Global Average Pooling gradient check - max relative error: {max_error:.2e}")
    print(f"Passed: {max_error < 1e-5}")

gradient_check_gap(X_test)

### Exercise 2: Compare the Effects of Max Pooling and Average Pooling

In [None]:
# Build a structured test image
def create_test_image():
    """Build a test image with edges and texture."""
    img = np.zeros((16, 16))
    
    # Add a few features
    img[2:6, 2:6] = 1       # square in the upper left
    img[10:14, 10:14] = 1   # square in the lower right
    img[6:10, :] = 0.5      # horizontal stripe
    
    # Add some Gaussian noise
    img += np.random.randn(16, 16) * 0.1
    
    return img[np.newaxis, np.newaxis, :, :]  # (1, 1, 16, 16)

img = create_test_image()

# Apply the two pooling types
maxpool = MaxPool2D(2, stride=2)
avgpool = AvgPool2D(2, stride=2)

img_maxpool = maxpool.forward(img)
img_avgpool = avgpool.forward(img)

# Visualize
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

ax = axes[0]
im = ax.imshow(img[0, 0], cmap='gray')
ax.set_title(f'Original ({img.shape[2]}x{img.shape[3]})')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046)

ax = axes[1]
im = ax.imshow(img_maxpool[0, 0], cmap='gray')
ax.set_title(f'Max Pooling ({img_maxpool.shape[2]}x{img_maxpool.shape[3]})')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046)

ax = axes[2]
im = ax.imshow(img_avgpool[0, 0], cmap='gray')
ax.set_title(f'Average Pooling ({img_avgpool.shape[2]}x{img_avgpool.shape[3]})')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046)

plt.tight_layout()
plt.show()

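print("Observations:")
print("- Max Pooling keeps the strongest activation (highest value) in each window, so bright edges and texture stay sharp")
print("- Average Pooling gives a smoother result but can wash out fine detail")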

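### Exercise 3: Flatten Layer

The Flatten layer reshapes the multi-dimensional feature maps into a one-dimensional vector per sample, bridging the convolutional layers and the fully connected layers.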

In [None]:
class Flatten:
    """
    Flatten layer.

    Reshapes (N, C, H, W) into (N, C*H*W).
    """
    
    def __init__(self):
        self.cache = None
    
    def forward(self, X):
        """
        Forward pass.
        """
        self.cache = X.shape
        N = X.shape[0]
        return X.reshape(N, -1)
    
    def backward(self, dY):
        """
        Backward pass.
        """
        return dY.reshape(self.cache)

# Test
flatten = Flatten()
X_test = np.random.randn(2, 3, 4, 4)
Y = flatten.forward(X_test)

print(f"Input shape: {X_test.shape}")
print(f"Output shape: {Y.shape}")
print(f"Expected: (2, {3*4*4})")

# Backward pass
dY = np.random.randn(*Y.shape)
dX = flatten.backward(dY)
print(f"\ndX shape: {dX.shape} (should match the input)")

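## Summary

In this notebook we covered:

### What Pooling Layers Do
1. Downsample, reducing computation
2. Provide some translation invariance
3. Enlarge the receptive field of later layers

### Max Pooling vs Average Pooling

| Property | Max Pooling | Average Pooling |
|----------|-------------|-----------------|
| Forward | Takes the maximum | Takes the mean |
| Backward | Gradient flows only to the max position | Gradient is split evenly |
| Character | Keeps the strongest feature | Produces a smooth result |
| Typical use | Intermediate CNN layers | GAP at the final layer |

### Keys to Backpropagation

- **Max Pooling**: record the position of each maximum; the gradient is "routed" to that position
- **Average Pooling**: the gradient is divided by the window size and spread uniformly

### Next Steps

We now have all the basic building blocks of a CNN! Next we will assemble them into a complete **LeNet** network.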