In [1]:
# from beam_search import ctc_beam_search
import torch
import numpy as np

# 支持字符集：0~9
CHARS = "0123456789"
CHAR2IDX = {c: i for i, c in enumerate(CHARS)}
IDX2CHAR = {i: c for i, c in enumerate(CHARS)}
NUM_CLASSES = len(CHARS) + 1  # CTC中需要加一个 "blank"


def ctc_beam_search(log_probs, beam_width=10):
    """
    Beam Search 解码 CTC 输出
    log_probs: Tensor (T, C) - 每个时间步的 log softmax 概率
    返回: 最可能的字符序列字符串
    """
    T, C = log_probs.shape
    beam = [("", 0.0)]  # 每个元素: (当前序列, log概率)

    for t in range(T):
        new_beam = {}
        for seq, score in beam:
            for c in range(C):
                char = IDX2CHAR.get(c, "") if c < NUM_CLASSES - \
                    1 else ""  # blank 不加字符
                new_seq = seq + char if c != NUM_CLASSES - 1 else seq
                new_score = score + log_probs[t, c].item()

                if new_seq in new_beam:
                    new_beam[new_seq] = np.logaddexp(
                        new_beam[new_seq], new_score)
                else:
                    new_beam[new_seq] = new_score

        # 选出前 beam_width 个最高概率路径
        beam = sorted(new_beam.items(), key=lambda x: x[1], reverse=True)[
            :beam_width]

    return beam[0][0]  # 返回最高得分序列


def encode_label(label_str):
    """
    将字符串标签（如 "012345"）转为数字索引列表 [0,1,2,3,4,5]
    """
    return [CHAR2IDX[c] for c in label_str]


def decode_label(label_idx_list):
    """
    将预测数字索引（去重CTC输出）转为字符串，如 [0,1,1,2,2,3] -> "0123"
    """
    res = []
    prev = -1
    for idx in label_idx_list:
        if idx != prev and idx != NUM_CLASSES - 1:  # 排除blank
            res.append(IDX2CHAR[idx])
        prev = idx
    return ''.join(res)


def pad_label(label_list, max_len=6):
    """
    将标签列表pad到固定长度，适用于DataLoader（虽然CTC本身不要求）
    """
    return label_list + [NUM_CLASSES - 1] * (max_len - len(label_list))  # 用blank填充


def decode_output(log_probs, method='beam', beam_width=10):
    """
    解码模型输出
    - log_probs: Tensor(T, C)
    - method: 'argmax' or 'beam'
    """
    if method == 'argmax':
        pred = torch.argmax(log_probs, dim=1).cpu().numpy().tolist()
        return decode_label(pred)  # 原CTC贪婪解码
    elif method == 'beam':
        return ctc_beam_search(log_probs, beam_width=beam_width)
    else:
        raise ValueError("Unsupported decode method.")


def decode_ctc_postprocess(log_probs, blank=10, max_repeat=2):
    """
    改进版 CTC 解码器：保留重复，但限制重复次数（防止重复拖尾）
    Args:
        log_probs: (T, C)
        blank: blank 标签索引
        max_repeat: 每个数字最多重复次数（避免 111111 → 111）

    Returns:
        字符串预测结果
    """
    preds = torch.argmax(log_probs, dim=1).detach().cpu().numpy().tolist()

    result = []
    last_char = None
    repeat_count = 0

    for p in preds:
        if p == blank:
            last_char = None
            repeat_count = 0
            continue

        if p == last_char:
            repeat_count += 1
        else:
            repeat_count = 1
            last_char = p

        if repeat_count <= max_repeat:
            result.append(p)

    return ''.join(str(c) for c in result)



In [2]:
import torch
import torch.nn as nn

class CRNN(nn.Module):
    def __init__(self, img_height=32, num_classes=11, rnn_hidden=128, rnn_layers=2):
        super().__init__()

        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),    # Layer 1
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # H → H/2

            nn.Conv2d(64, 128, kernel_size=3, padding=1),  # Layer 2
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # H → H/4

            nn.Conv2d(128, 256, kernel_size=3, padding=1), # Layer 3
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # H → H/8

            nn.Conv2d(256, 256, kernel_size=3, padding=1), # Layer 4
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # H → H/16
        )

        # feature_compression: (B, W, 256×H') → (B, W, 256)
        self.feature_compression = nn.Linear(256 * (img_height // 16), 256)

        self.rnn = nn.LSTM(
            input_size=256,
            hidden_size=rnn_hidden,
            num_layers=rnn_layers,
            bidirectional=True,
            dropout=0.1,
            batch_first=False
        )

        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(rnn_hidden * 2, num_classes)

    def forward(self, x):
        conv = self.cnn(x)                  # (B, 256, H', W')
        B, C, H, W = conv.size()
        conv = conv.permute(0, 3, 1, 2)     # (B, W, C, H)
        conv = conv.reshape(B, W, C * H)    # (B, T, 256×H')
        conv = self.feature_compression(conv)  # (B, T, 256)
        conv = conv.permute(1, 0, 2)        # (T, B, 256)

        rnn_out, _ = self.rnn(conv)
        rnn_out = self.dropout(rnn_out)
        output = self.fc(rnn_out)           # (T, B, num_classes)
        return output


In [3]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd
from torchvision import transforms
# from utils import encode_label
import random


from torchvision import transforms
from torchvision.transforms import InterpolationMode
import torch
import random


def get_train_transform(img_height, img_width):
    return transforms.Compose([
        transforms.Resize((img_height, img_width)),
        transforms.RandomRotation(degrees=2),
        transforms.ToTensor(),
        transforms.Normalize((0.5,)*3, (0.5,)*3)
    ])


def get_test_transform(img_height, img_width):
    return transforms.Compose([
        transforms.Resize((img_height, img_width), interpolation=InterpolationMode.BICUBIC),
        transforms.ToTensor(),
        transforms.Normalize((0.5,)*3, (0.5,)*3)
    ])


class MeterDigitDataset(Dataset):
    def __init__(self, csv_file, image_dir, img_height=32, img_width=128, is_train=True):
        self.data = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.img_height = img_height
        self.img_width = img_width
        self.is_train = is_train

        # self.transform = transforms(img_height, img_width)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        filename = row['filename']
        label_str = str(row['label']).zfill(6)

        # 加载图像
        img_path = os.path.join(self.image_dir, filename)
        image = Image.open(img_path).convert('RGB')
        image = self.transform(image)

        # 编码标签
        label_encoded = encode_label(label_str)

        label_length = len(label_encoded)
        input_length = self.img_width // 16

        return {
            'image': image,
            'label': torch.tensor(label_encoded),
            'label_length': torch.tensor(label_length),
            'input_length': torch.tensor(input_length),
            'filename': filename
        }


In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
# from crnn_model import CRNN
# from crnn_model import CRNNTransformer
# from dataset import MeterDigitDataset
# from utils import NUM_CLASSES
from torch.utils.data import Subset, random_split
# from utils import decode_label
# from dataset import MeterDigitDataset, get_train_transform, get_test_transform
# from utils import decode_output
from torch.optim.lr_scheduler import ReduceLROnPlateau


def train_model(
    image_dir,
    label_csv,
    epochs=40,
    batch_size=32,
    lr=1e-4,
    img_height=32,
    img_width=128,
    checkpoint_dir='/kaggle/working/'
):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    full_dataset = MeterDigitDataset(
        csv_file=label_csv,
        image_dir=image_dir,
        img_height=img_height,
        img_width=img_width
    )

    total_len = len(full_dataset)
    train_len = int(total_len * 0.9)
    val_len = total_len - train_len

    train_indices, val_indices = torch.utils.data.random_split(
        range(total_len), [train_len, val_len], generator=torch.Generator().manual_seed(42)
    )

    train_dataset = Subset(full_dataset, train_indices)
    train_dataset.dataset.transform = get_train_transform(img_height, img_width)

    val_dataset = Subset(full_dataset, val_indices)
    val_dataset.dataset.transform = get_test_transform(img_height, img_width)

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    model = CRNN(img_height=img_height, num_classes=NUM_CLASSES).to(device)
    criterion = nn.CTCLoss(blank=NUM_CLASSES - 1, zero_infinity=True)
    optimizer = Adam(model.parameters(), lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

    best_loss = float('inf')
    best_acc = 0.0
    os.makedirs(checkpoint_dir, exist_ok=True)

    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0

        for batch in train_loader:
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            label_lengths = batch['label_length'].to(device)
            outputs = model(images)
            input_lengths = torch.full(size=(images.size(0),), fill_value=outputs.size(0), dtype=torch.long).to(device)

            optimizer.zero_grad()
            log_probs = nn.functional.log_softmax(outputs, dim=2)
            loss = criterion(log_probs, labels, input_lengths, label_lengths)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)

        # === 验证 ===
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_loader:
                images = batch['image'].to(device)
                labels = batch['label'].to(device)
                label_lengths = batch['label_length'].to(device)

                outputs = model(images)
                input_lengths = torch.full(size=(images.size(0),), fill_value=outputs.size(0), dtype=torch.long).to(device)
                log_probs = nn.functional.log_softmax(outputs, dim=2)
                loss = criterion(log_probs, labels, input_lengths, label_lengths)
                val_loss += loss.item()

                preds = torch.argmax(log_probs, dim=2).permute(1, 0)

                for i in range(images.size(0)):
                    log_prob = log_probs[:, i, :]
                    pred_str = decode_output(log_prob, method='argmax')[:6].ljust(6, '0')
                    start = int(sum(label_lengths[:i]))
                    end = start + int(label_lengths[i])
                    true_seq = labels[start:end].detach().cpu().numpy().tolist()
                    true_str = decode_label(true_seq)[:6].ljust(6, '0')

                    if pred_str[:5] == true_str[:5]:
                        correct += 1
                    total += 1

        acc = correct / total
        avg_val_loss = val_loss / len(val_loader)
        print(f"Epoch {epoch}: train_loss = {avg_loss:.4f} | val_loss = {avg_val_loss:.4f} | val_acc = {acc:.2%}")

        scheduler.step(avg_val_loss)

        # === 保存 val_loss 最低的模型 ===
        if avg_val_loss < best_loss:
            best_loss = avg_val_loss
            torch.save(model.state_dict(), os.path.join(checkpoint_dir, 'new_best_model.pth'))
            print("Best model (val_loss) updated.")

        # === 保存 val_acc 最高的模型 ===
        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), os.path.join(checkpoint_dir, 'best_acc_model.pth'))
            print("Best model (val_acc) updated.")


# === 用于CTC的collate函数：拼接label序列 ===


def collate_fn(batch):
    # 自动拼接 label、image 等变量，处理不等长序列
    images = torch.stack([b['image'] for b in batch])
    labels = torch.cat([b['label'] for b in batch])
    label_lengths = torch.stack([b['label_length'] for b in batch])
    input_lengths = torch.stack([b['input_length'] for b in batch])
    filenames = [b['filename'] for b in batch]
    return {
        'image': images,
        'label': labels,
        'label_length': label_lengths,
        'input_length': input_lengths,
        'filename': filenames
    }


# === 主函数入口 ===
if __name__ == '__main__':
    image_dir = '/kaggle/input/numdigit/cropped_digits'
    label_csv = '/kaggle/input/numdigit/labels.csv'

    train_model(
        image_dir=image_dir,
        label_csv=label_csv,
        epochs=45,
        batch_size=16,
        lr=1e-4
    )


Epoch 1: train_loss = 2.3936 | val_loss = 2.3417 | val_acc = 0.00%
Best model (val_loss) updated.
Epoch 2: train_loss = 2.1705 | val_loss = 2.0887 | val_acc = 0.00%
Best model (val_loss) updated.
Epoch 3: train_loss = 1.8503 | val_loss = 1.8188 | val_acc = 0.00%
Best model (val_loss) updated.
Epoch 4: train_loss = 1.4580 | val_loss = 1.5807 | val_acc = 1.19%
Best model (val_loss) updated.
Best model (val_acc) updated.
Epoch 5: train_loss = 1.0927 | val_loss = 1.0921 | val_acc = 25.00%
Best model (val_loss) updated.
Best model (val_acc) updated.
Epoch 6: train_loss = 0.8223 | val_loss = 0.7650 | val_acc = 70.24%
Best model (val_loss) updated.
Best model (val_acc) updated.
Epoch 7: train_loss = 0.6109 | val_loss = 0.6448 | val_acc = 71.43%
Best model (val_loss) updated.
Best model (val_acc) updated.
Epoch 8: train_loss = 0.4716 | val_loss = 0.6258 | val_acc = 78.57%
Best model (val_loss) updated.
Best model (val_acc) updated.
Epoch 9: train_loss = 0.3781 | val_loss = 0.6108 | val_acc = 7

In [3]:
import os
import torch
import pandas as pd
from PIL import Image
from torchvision import transforms
# from model import CRNN        
# from utils import decode_output 

def load_model(model_path, device, img_height=32, num_classes=11):
    model = CRNN(img_height=img_height, num_classes=num_classes)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model

def get_transform(img_height=32, img_width=128):
    return transforms.Compose([
        transforms.Resize((img_height, img_width)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,)*3, (0.5,)*3)
    ])

def predict_folder(image_folder, model_path, output_csv, img_height=32, img_width=128):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(model_path, device, img_height)

    transform = get_transform(img_height, img_width)
    results = []

    image_files = sorted([f for f in os.listdir(image_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])

    for filename in image_files:
        image_path = os.path.join(image_folder, filename)
        image = Image.open(image_path).convert('RGB')
        image = transform(image).unsqueeze(0).to(device)  # (1, 3, H, W)

        with torch.no_grad():
            output = model(image)        # (T, B, C)
            log_probs = torch.nn.functional.log_softmax(output, dim=2)
            pred = decode_output(log_probs[:, 0, :], method='argmax')[:6].ljust(6, '0')
            # pred = decode_output(log_probs[:, 0, :], method='argmax')[:6].zfill(6)
            # pred = decode_output_no_merge(log_probs[:, 0, :], blank=NUM_CLASSES - 1)[:6].ljust(6, '0')
            # pred = decode_ctc_postprocess(log_probs, blank=10, max_repeat=3)[:6].ljust(6, '0')
            # pred = decode_output(log_probs[:, 0, :])  # 解码第一个 batch 的预测
            # ✅ 在最后一位前插入小数点，如 '09575.9'
            pred = pred[:5] + '.' + pred[5]

        results.append({'id': filename, 'number': pred})

    df = pd.DataFrame(results)
    df.to_csv(output_csv, index=False)
    print(f"预测结果已保存至：{output_csv}")

if __name__ == '__main__':
    image_folder = '/kaggle/input/numdigit/cropped_digits_test'              
    model_path = '/kaggle/input/numdigit/best_acc_model.pth'     
    output_csv = 'prediction_results.csv'

    predict_folder(image_folder, model_path, output_csv)


预测结果已保存至：prediction_results.csv
