<a href="https://colab.research.google.com/github/Bule-rain/PyTorch-/blob/main/%E6%AC%A2%E8%BF%8E%E4%BD%BF%E7%94%A8_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import zipfile
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import io

# 深度学习相关库
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
import py7zr  # 需要安装: !pip install py7zr
from google.colab import drive

# 设置随机种子
np.random.seed(42)
tf.random.set_seed(42)

# CIFAR-10类别标签
CLASS_NAMES = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

def mount_drive_and_check():
    """挂载Drive并检查文件"""
    print("正在挂载Google Drive...")
    drive.mount('/content/drive')

    # 检查cifar-10.zip文件是否存在
    zip_path = '/content/drive/MyDrive/cifar-10.zip'
    if os.path.exists(zip_path):
        print(f"✅ 找到cifar-10.zip文件: {zip_path}")
        print(f"文件大小: {os.path.getsize(zip_path) / (1024*1024):.1f} MB")
        return zip_path
    else:
        print("❌ 未找到cifar-10.zip文件")
        print("请确保文件路径正确")
        return None

def explore_zip_contents(zip_path):
    """探索zip文件内容"""
    print("正在探索zip文件内容...")

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        file_list = zip_ref.namelist()
        print(f"zip文件中包含 {len(file_list)} 个文件:")
        for file in file_list:
            print(f"  - {file}")

        # 查找7z文件
        sevenZ_files = [f for f in file_list if f.endswith('.7z')]
        csv_files = [f for f in file_list if f.endswith('.csv')]

        print(f"\n找到 {len(sevenZ_files)} 个7z文件:")
        for sz_file in sevenZ_files:
            print(f"  - {sz_file}")

        print(f"找到 {len(csv_files)} 个CSV文件:")
        for csv_file in csv_files:
            print(f"  - {csv_file}")

        return file_list, sevenZ_files, csv_files

def extract_and_load_data(zip_path):
    """解压zip和7z文件，加载 标签CSV + 图片数据"""
    print("正在解压文件...")

    # 创建临时目录
    temp_dir = '/content/temp_cifar'
    os.makedirs(temp_dir, exist_ok=True)

    # 首先解压zip文件
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(temp_dir)
        print("✅ zip文件解压完成")

    # 查找解压后的文件
    extracted_files = os.listdir(temp_dir)
    print(f"解压后的文件: {extracted_files}")

    # 初始化数据变量
    train_labels_df = None  # 存训练标签
    test_images = None      # 存测试图片
    sample_submission = None# 存提交样本
    train_images = None     # 存训练图片（可选，若需要从7z加载训练图）

    # 处理CSV文件（标签、提交样本）
    for file in extracted_files:
        if file.endswith('.csv'):
            file_path = os.path.join(temp_dir, file)
            if 'trainLabels' in file:  # 匹配训练标签文件
                train_labels_df = pd.read_csv(file_path)
                print(f"✅ 加载训练标签数据: {train_labels_df.shape}")
            elif 'sampleSubmission' in file:  # 匹配提交样本
                sample_submission = pd.read_csv(file_path)
                print(f"✅ 加载提交样本: {sample_submission.shape}")

    # 处理7z文件（提取图片）
    for file in extracted_files:
        if file.endswith('.7z'):
            print(f"正在解压7z文件: {file}")
            sevenZ_path = os.path.join(temp_dir, file)

            try:
                with py7zr.SevenZipFile(sevenZ_path, mode='r') as archive:
                    archive.extractall(path=temp_dir)
                    print(f"✅ {file} 解压完成")

                    # 区分训练/测试7z，加载图片
                    if 'train' in file:
                        train_img_dir = os.path.join(temp_dir, 'train')  # 假设解压到 train 目录
                        if os.path.exists(train_img_dir):
                            train_images = []
                            for img_name in sorted(os.listdir(train_img_dir)):
                                img_path = os.path.join(train_img_dir, img_name)
                                img = Image.open(img_path).convert('RGB')
                                img = np.array(img)
                                train_images.append(img)
                            train_images = np.array(train_images)
                            print(f"✅ 加载训练图片 {train_images.shape[0]} 张")
                    elif 'test' in file:
                        test_img_dir = os.path.join(temp_dir, 'test')  # 假设解压到 test 目录
                        if os.path.exists(test_img_dir):
                            test_images = []
                            for img_name in sorted(os.listdir(test_img_dir)):
                                img_path = os.path.join(test_img_dir, img_name)
                                img = Image.open(img_path).convert('RGB')
                                img = np.array(img)
                                test_images.append(img)
                            test_images = np.array(test_images)
                            print(f"✅ 加载测试图片 {test_images.shape[0]} 张")
            except Exception as e:
                print(f"❌ 解压 {file} 失败: {e}")
                continue

    # 整合训练数据（标签 + 图片）
    if train_labels_df is not None and train_images is not None:
        # 确保标签和图片数量匹配（CIFAR-10 训练集 50000 张图 + 50000 条标签）
        if len(train_labels_df) == len(train_images):
            train_df = pd.DataFrame({
                'id': train_labels_df['id'],
                'label': train_labels_df['label'],
                'image': list(train_images)  # 存图片数组
            })
        else:
            print("⚠️ 训练标签和图片数量不匹配，跳过整合")
            train_df = None
    else:
        train_df = None

    # 构造测试数据DataFrame（仅图片，提交时用ID匹配）
    if test_images is not None:
        test_df = pd.DataFrame({
            'id': range(1, len(test_images)+1),  # 假设ID从1开始
            'image': list(test_images)
        })
    else:
        test_df = None

    return train_df, test_df, sample_submission, temp_dir

def cleanup_temp_files(temp_dir):
    """清理临时文件"""
    import shutil
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
        print(f"✅ 临时文件已清理: {temp_dir}")

def load_cifar_data():
    """完整的数据加载流程（适配图片+标签）"""
    # 安装py7zr（如果没有安装）
    try:
        import py7zr
    except ImportError:
        print("正在安装py7zr...")
        os.system("pip install py7zr")
        import py7zr

    # 挂载Drive并检查文件
    zip_path = mount_drive_and_check()
    if not zip_path:
        return None, None, None

    # 探索文件内容
    file_list, sevenZ_files, csv_files = explore_zip_contents(zip_path)

    # 解压并加载数据（图片+标签）
    train_df, test_df, sample_submission, temp_dir = extract_and_load_data(zip_path)

    # 数据概览
    if train_df is not None:
        print(f"\n📊 训练数据概览:")
        print(f"形状: {train_df.shape}")
        print(f"列名: {list(train_df.columns)}")
        print(train_df.head())

    if test_df is not None:
        print(f"\n📊 测试数据概览:")
        print(f"形状: {test_df.shape}")
        print(f"列名: {list(test_df.columns)}")
        print(test_df.head())

    if sample_submission is not None:
        print(f"\n📊 提交样本概览:")
        print(f"形状: {sample_submission.shape}")
        print(f"列名: {list(sample_submission.columns)}")
        print(sample_submission.head())

    # 询问是否清理临时文件
    print(f"\n临时文件保存在: {temp_dir}")
    print("如需清理临时文件，请调用: cleanup_temp_files(temp_dir)")

    return train_df, test_df, sample_submission

def quick_data_analysis(train_df, test_df, sample_submission):
    """快速数据分析（适配图片数据）"""
    print("\n=== 数据分析 ===")

    if train_df is not None:
        print("训练数据信息:")
        print(f"  形状: {train_df.shape}")
        print(f"  列名: {list(train_df.columns)}")
        print(f"  前几行:")
        print(train_df.head(2))

        # 检查标签分布
        if 'label' in train_df.columns:
            label_counts = train_df['label'].value_counts().sort_index()
            print(f"\n标签分布:")
            for label, count in label_counts.items():
                print(f"  {label}: {count} 张")

    if test_df is not None:
        print("\n测试数据信息:")
        print(f"  形状: {test_df.shape}")
        print(f"  列名: {list(test_df.columns)}")
        print(f"  前几行:")
        print(test_df.head(2))

    if sample_submission is not None:
        print("\n提交样本信息:")
        print(f"  形状: {sample_submission.shape}")
        print(f"  列名: {list(sample_submission.columns)}")
        print(f"  前几行:")
        print(sample_submission.head(2))

def preprocess_data(train_df, test_df):
    """数据预处理（适配图片数组）"""
    print("\n=== 数据预处理 ===")

    # 创建标签映射字典
    label_to_int = {
        'airplane': 0, 'automobile': 1, 'bird': 2, 'cat': 3, 'deer': 4,
        'dog': 5, 'frog': 6, 'horse': 7, 'ship': 8, 'truck': 9
    }

    # 分离特征（图片）和标签
    if train_df is not None and 'image' in train_df.columns and 'label' in train_df.columns:
        X_train = np.array(train_df['image'].tolist())  # 转成 numpy 数组
        # 将字符串标签转换为数字标签
        y_train = train_df['label'].map(label_to_int).values
        print(f"标签转换完成：{train_df['label'].iloc[0]} -> {y_train[0]}")
    else:
        X_train, y_train = None, None

    # 处理测试数据（图片）
    if test_df is not None and 'image' in test_df.columns:
        X_test = np.array(test_df['image'].tolist())
        test_ids = test_df['id'].values
    else:
        X_test, test_ids = None, None

    # 重塑为标准图像格式 (32x32x3)
    if X_train is not None and X_train.shape[1:] == (32, 32, 3):
        print("✅ 训练数据已为32x32x3图像格式")
    else:
        if X_train is not None:
            print(f"⚠️  训练图像维度异常，实际形状: {X_train.shape}")

    if X_test is not None and X_test.shape[1:] == (32, 32, 3):
        print("✅ 测试数据已为32x32x3图像格式")
    else:
        if X_test is not None:
            print(f"⚠️  测试图像维度异常，实际形状: {X_test.shape}")

    # 标准化到 [0,1] 范围
    if X_train is not None:
        X_train = X_train.astype('float32') / 255.0
        print(f"✅ 训练数据标准化完成，像素值范围: [{X_train.min():.3f}, {X_train.max():.3f}]")
    if X_test is not None:
        X_test = X_test.astype('float32') / 255.0
        print(f"✅ 测试数据标准化完成，像素值范围: [{X_test.min():.3f}, {X_test.max():.3f}]")

    # 标签one-hot编码
    y_train_onehot = keras.utils.to_categorical(y_train, 10) if y_train is not None else None

    print(f"\n预处理完成:")
    print(f"  训练图像: {X_train.shape if X_train is not None else 'None'}")
    print(f"  训练标签: {y_train_onehot.shape if y_train_onehot is not None else 'None'}")
    print(f"  测试图像: {X_test.shape if X_test is not None else 'None'}")
    print(f"  标签范围: {y_train.min()}-{y_train.max()}" if y_train is not None else "  标签范围: None")

    return X_train, y_train_onehot, X_test, test_ids

def create_model():
    """创建改进的CNN模型"""
    model = keras.Sequential([
        # 第一层卷积块
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 第二层卷积块
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 第三层卷积块
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.Dropout(0.25),

        # 全连接层
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])

    return model

def train_model_enhanced(model, X_train, y_train):
    """增强版模型训练函数"""
    print("\n=== 开始训练 ===")

    # 检查数据
    if X_train is None or y_train is None:
        print("❌ 训练数据未正确加载，无法训练")
        return None, None

    # 检查数据形状
    if len(X_train.shape) != 4 or X_train.shape[1:] != (32, 32, 3):
        print(f"❌ 训练数据形状不正确: {X_train.shape}, 期望: (N, 32, 32, 3)")
        return None, None

    if len(y_train.shape) != 2 or y_train.shape[1] != 10:
        print(f"❌ 标签数据形状不正确: {y_train.shape}, 期望: (N, 10)")
        return None, None

    print(f"✅ 数据检查通过:")
    print(f"   训练数据形状: {X_train.shape}")
    print(f"   标签数据形状: {y_train.shape}")

    # 编译模型
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    try:
        # 分割验证集
        X_train_split, X_val, y_train_split, y_val = train_test_split(
            X_train, y_train,
            test_size=0.2,
            random_state=42,
            stratify=np.argmax(y_train, axis=1)
        )

        print(f"✅ 数据分割完成:")
        print(f"   训练集: {X_train_split.shape[0]} 张")
        print(f"   验证集: {X_val.shape[0]} 张")

        # 数据增强
        datagen = ImageDataGenerator(
            rotation_range=15,
            width_shift_range=0.1,
            height_shift_range=0.1,
            horizontal_flip=True,
            zoom_range=0.1,
            fill_mode='nearest'
        )

        # 回调函数
        callbacks = [
            EarlyStopping(
                monitor='val_accuracy',
                patience=15,
                restore_best_weights=True,
                verbose=1
            ),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=7,
                min_lr=1e-7,
                verbose=1
            )
        ]

        print("🚀 开始训练...")

        # 计算步数
        batch_size = 32  # 减小批次大小避免内存问题
        steps_per_epoch = max(1, len(X_train_split) // batch_size)

        # 训练模型
        history = model.fit(
            datagen.flow(X_train_split, y_train_split, batch_size=batch_size),
            steps_per_epoch=steps_per_epoch,
            epochs=50,  # 减少初始epochs
            validation_data=(X_val, y_val),
            callbacks=callbacks,
            verbose=1
        )

        # 训练完成后的评估
        print("\n=== 训练完成 ===")
        val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
        print(f"✅ 最终验证准确率: {val_accuracy:.4f}")
        print(f"✅ 最终验证损失: {val_loss:.4f}")

        return model, history

    except Exception as e:
        print(f"❌ 训练过程中出现错误: {str(e)}")
        print("可能的解决方案:")
        print("1. 检查数据预处理是否正确")
        print("2. 减小批次大小")
        print("3. 检查GPU内存是否足够")
        return None, None

def plot_training_history(history):
    """绘制训练历史图表"""
    if history is None:
        print("❌ 没有训练历史数据可以绘制")
        return

    try:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

        # 准确率图表
        ax1.plot(history.history['accuracy'], label='训练准确率')
        ax1.plot(history.history['val_accuracy'], label='验证准确率')
        ax1.set_title('模型准确率')
        ax1.set_xlabel('轮次')
        ax1.set_ylabel('准确率')
        ax1.legend()
        ax1.grid(True)

        # 损失图表
        ax2.plot(history.history['loss'], label='训练损失')
        ax2.plot(history.history['val_loss'], label='验证损失')
        ax2.set_title('模型损失')
        ax2.set_xlabel('轮次')
        ax2.set_ylabel('损失')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.show()

    except Exception as e:
        print(f"❌ 绘图时出现错误: {str(e)}")

def prepare_cifar10_data():
    """准备CIFAR-10数据（备用方案：直接从Keras加载）"""
    try:
        # 加载CIFAR-10数据
        (X_train, y_train), (X_test, y_test) = keras.datasets.cifar10.load_data()

        # 数据预处理
        X_train = X_train.astype('float32') / 255.0
        X_test = X_test.astype('float32') / 255.0

        # 标签one-hot编码
        y_train_onehot = keras.utils.to_categorical(y_train, 10)
        y_test_onehot = keras.utils.to_categorical(y_test, 10)

        print(f"✅ 数据加载完成:")
        print(f"   训练集: {X_train.shape}")
        print(f"   测试集: {X_test.shape}")
        print(f"   训练标签: {y_train_onehot.shape}")
        print(f"   测试标签: {y_test_onehot.shape}")

        return X_train, y_train_onehot, X_test, y_test_onehot

    except Exception as e:
        print(f"❌ 数据加载失败: {str(e)}")
        return None, None, None, None

def generate_submission_enhanced(model, X_test, test_ids, sample_submission):
    """增强版提交文件生成函数"""
    print("\n=== 生成提交文件 ===")

    if X_test is None or sample_submission is None:
        print("❌ 测试数据或提交样本未正确加载，无法生成提交文件")
        return None

    if model is None:
        print("❌ 模型未训练，无法生成预测")
        return None

    print(f"📊 开始预测 {len(X_test)} 张测试图片...")

    # 分批预测（避免内存不足）
    batch_size = 1000
    predictions = []

    for i in range(0, len(X_test), batch_size):
        end_idx = min(i + batch_size, len(X_test))
        batch_predictions = model.predict(X_test[i:end_idx], verbose=0)
        predictions.append(batch_predictions)

        # 显示进度
        progress = (end_idx / len(X_test)) * 100
        print(f"预测进度: {progress:.1f}% ({end_idx}/{len(X_test)})")

    # 合并所有预测结果
    predictions = np.vstack(predictions)
    predicted_classes = np.argmax(predictions, axis=1)

    # 将数字标签转回字符串标签
    int_to_label = {
        0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer',
        5: 'dog', 6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'
    }

    predicted_labels = [int_to_label[pred] for pred in predicted_classes]

    # 创建提交文件
    submission = sample_submission.copy()
    submission['label'] = predicted_labels

    # 验证提交文件格式
    print(f"\n📋 提交文件验证:")
    print(f"   形状: {submission.shape}")
    print(f"   列名: {list(submission.columns)}")
    print(f"   ID范围: {submission['id'].min()} - {submission['id'].max()}")
    print(f"   标签类型: {submission['label'].dtype}")

    # 检查预测分布
    print(f"\n📊 预测结果分布:")
    label_counts = submission['label'].value_counts().sort_index()
    for label, count in label_counts.items():
        percentage = (count / len(submission)) * 100
        print(f"   {label}: {count:,} 张 ({percentage:.1f}%)")

    # 保存到Drive
    submission_path = '/content/drive/MyDrive/cifar10_submission.csv'
    submission.to_csv(submission_path, index=False)

    print(f"\n✅ 提交文件已保存: {submission_path}")
    print(f"📁 文件大小: {os.path.getsize(submission_path) / (1024*1024):.1f} MB")

    # 显示提交文件前几行
    print(f"\n📝 提交文件预览:")
    print(submission.head(10))

    return submission

def validate_submission(submission, sample_submission):
    """验证提交文件的正确性"""
    print("\n=== 提交文件验证 ===")

    # 检查形状
    if submission.shape == sample_submission.shape:
        print("✅ 文件形状正确")
    else:
        print(f"❌ 文件形状错误: {submission.shape} vs {sample_submission.shape}")
        return False

    # 检查列名
    if list(submission.columns) == list(sample_submission.columns):
        print("✅ 列名正确")
    else:
        print(f"❌ 列名错误: {list(submission.columns)} vs {list(sample_submission.columns)}")
        return False

    # 检查ID是否完整
    if set(submission['id']) == set(sample_submission['id']):
        print("✅ ID完整")
    else:
        print("❌ ID不完整")
        return False

    # 检查标签是否合法
    valid_labels = {'airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck'}
    if set(submission['label']) <= valid_labels:
        print("✅ 标签合法")
    else:
        invalid_labels = set(submission['label']) - valid_labels
        print(f"❌ 发现非法标签: {invalid_labels}")
        return False

    print("🎉 提交文件验证通过！")
    return True

def main():
    """主函数 - 完整流程（适配CIFAR-10真实数据）"""
    print("🚀 CIFAR-10 图像分类项目开始")
    print("=" * 50)

    # 初始化变量，避免NameError
    X_train, y_train_onehot, X_test, test_ids = None, None, None, None
    model, history, submission = None, None, None

    try:
        # 1. 数据加载
        print("步骤1: 数据加载")
        train_df, test_df, sample_submission = load_cifar_data()

        if train_df is None and test_df is None:
            print("⚠️ 自定义数据加载失败，使用Keras内置CIFAR-10数据集")
            X_train, y_train_onehot, X_test, y_test_onehot = prepare_cifar10_data()
            test_ids = list(range(1, len(X_test) + 1))
            # 创建简单的提交样本格式
            sample_submission = pd.DataFrame({
                'id': test_ids,
                'label': ['airplane'] * len(test_ids)  # 占位符
            })
        else:
            # 2. 数据分析
            print("\n步骤2: 数据分析")
            quick_data_analysis(train_df, test_df, sample_submission)

            # 3. 数据预处理
            print("\n步骤3: 数据预处理")
            X_train, y_train_onehot, X_test, test_ids = preprocess_data(train_df, test_df)

        # 4. 模型创建
        print("\n步骤4: 模型创建")
        model = create_model()
        print("✅ 模型创建完成")
        print(f"模型参数数量: {model.count_params():,}")

        # 显示模型结构
        model.summary()

        # 5. 模型训练
        print("\n步骤5: 模型训练")
        if X_train is not None and y_train_onehot is not None:
            model, history = train_model_enhanced(model, X_train, y_train_onehot)

            if history is not None:
                # 6. 绘制训练曲线
                print("\n步骤6: 训练结果可视化")
                plot_training_history(history)
            else:
                print("❌ 训练失败，无法继续")
                return
        else:
            print("❌ 训练数据不可用，无法训练模型")
            return

        # 7. 生成预测和提交文件
        print("\n步骤7: 生成提交文件")
        if X_test is not None and sample_submission is not None:
            submission = generate_submission_enhanced(model, X_test, test_ids, sample_submission)

            if submission is not None:
                # 8. 验证提交文件
                print("\n步骤8: 验证提交文件")
                is_valid = validate_submission(submission, sample_submission)

                if is_valid:
                    print("🎉 项目完成！提交文件已生成并验证通过")
                else:
                    print("⚠️ 提交文件验证失败，请检查")
            else:
                print("❌ 提交文件生成失败")
        else:
            print("❌ 测试数据不可用，无法生成提交文件")

    except Exception as e:
        print(f"❌ 执行过程中出现错误: {str(e)}")
        import traceback
        traceback.print_exc()

    finally:
        # 清理临时文件（如果存在）
        temp_dir = '/content/temp_cifar'
        if 'temp_dir' in locals():
            try:
                cleanup_temp_files(temp_dir)
            except:
                pass

        print("\n" + "=" * 50)
        print("程序执行完毕")

if __name__ == '__main__':
    # 设置matplotlib后端（适配Colab环境）
    import matplotlib
    matplotlib.use('Agg')  # 或者 'inline' 如果在Jupyter中

    # 执行主函数
    main()

🚀 CIFAR-10 图像分类项目开始
步骤1: 数据加载
正在挂载Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ 找到cifar-10.zip文件: /content/drive/MyDrive/cifar-10.zip
文件大小: 715.4 MB
正在探索zip文件内容...
zip文件中包含 4 个文件:
  - sampleSubmission.csv
  - test.7z
  - train.7z
  - trainLabels.csv

找到 2 个7z文件:
  - test.7z
  - train.7z
找到 2 个CSV文件:
  - sampleSubmission.csv
  - trainLabels.csv
正在解压文件...
✅ zip文件解压完成
解压后的文件: ['train.7z', 'test.7z', 'trainLabels.csv', 'test', 'sampleSubmission.csv', 'train']
✅ 加载训练标签数据: (50000, 2)
✅ 加载提交样本: (300000, 2)
正在解压7z文件: train.7z
✅ train.7z 解压完成
✅ 加载训练图片 50000 张
正在解压7z文件: test.7z
✅ test.7z 解压完成
✅ 加载测试图片 300000 张

📊 训练数据概览:
形状: (50000, 3)
列名: ['id', 'label', 'image']
   id       label                                              image
0   1        frog  [[[59, 62, 63], [43, 46, 45], [50, 48, 43], [6...
1   2       truck  [[[125, 125, 116], [110, 101, 91], [102, 90, 8...
2   3       truck  [[[62, 64,

In [3]:
# 修复后的CIFAR-10模型训练代码

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt

# 6. 创建模型
def create_model():
    """创建改进的CNN模型"""
    model = keras.Sequential([
        # 第一层卷积块
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 第二层卷积块
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 第三层卷积块
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.Dropout(0.25),

        # 全连接层
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])

    return model

# 7. 训练模型
def train_model_enhanced(model, X_train, y_train):
    """增强版模型训练函数"""
    print("\n=== 开始训练 ===")

    # 检查数据
    if X_train is None or y_train is None:
        print("❌ 训练数据未正确加载，无法训练")
        return None, None

    # 检查数据形状
    if len(X_train.shape) != 4 or X_train.shape[1:] != (32, 32, 3):
        print(f"❌ 训练数据形状不正确: {X_train.shape}, 期望: (N, 32, 32, 3)")
        return None, None

    if len(y_train.shape) != 2 or y_train.shape[1] != 10:
        print(f"❌ 标签数据形状不正确: {y_train.shape}, 期望: (N, 10)")
        return None, None

    print(f"✅ 数据检查通过:")
    print(f"   训练数据形状: {X_train.shape}")
    print(f"   标签数据形状: {y_train.shape}")

    # 编译模型
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    try:
        # 分割验证集
        X_train_split, X_val, y_train_split, y_val = train_test_split(
            X_train, y_train,
            test_size=0.2,
            random_state=42,
            stratify=np.argmax(y_train, axis=1)
        )

        print(f"✅ 数据分割完成:")
        print(f"   训练集: {X_train_split.shape[0]} 张")
        print(f"   验证集: {X_val.shape[0]} 张")

        # 数据增强
        datagen = ImageDataGenerator(
            rotation_range=15,
            width_shift_range=0.1,
            height_shift_range=0.1,
            horizontal_flip=True,
            zoom_range=0.1,
            fill_mode='nearest'
        )

        # 回调函数
        callbacks = [
            EarlyStopping(
                monitor='val_accuracy',
                patience=15,
                restore_best_weights=True,
                verbose=1
            ),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=7,
                min_lr=1e-7,
                verbose=1
            )
        ]

        print("🚀 开始训练...")

        # 计算步数
        batch_size = 32  # 减小批次大小避免内存问题
        steps_per_epoch = max(1, len(X_train_split) // batch_size)

        # 训练模型
        history = model.fit(
            datagen.flow(X_train_split, y_train_split, batch_size=batch_size),
            steps_per_epoch=steps_per_epoch,
            epochs=50,  # 减少初始epochs
            validation_data=(X_val, y_val),
            callbacks=callbacks,
            verbose=1
        )

        # 训练完成后的评估
        print("\n=== 训练完成 ===")
        val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
        print(f"✅ 最终验证准确率: {val_accuracy:.4f}")
        print(f"✅ 最终验证损失: {val_loss:.4f}")

        return model, history

    except Exception as e:
        print(f"❌ 训练过程中出现错误: {str(e)}")
        print("可能的解决方案:")
        print("1. 检查数据预处理是否正确")
        print("2. 减小批次大小")
        print("3. 检查GPU内存是否足够")
        return None, None

def plot_training_history(history):
    """绘制训练历史图表"""
    if history is None:
        print("❌ 没有训练历史数据可以绘制")
        return

    try:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

        # 准确率图表
        ax1.plot(history.history['accuracy'], label='训练准确率')
        ax1.plot(history.history['val_accuracy'], label='验证准确率')
        ax1.set_title('模型准确率')
        ax1.set_xlabel('轮次')
        ax1.set_ylabel('准确率')
        ax1.legend()
        ax1.grid(True)

        # 损失图表
        ax2.plot(history.history['loss'], label='训练损失')
        ax2.plot(history.history['val_loss'], label='验证损失')
        ax2.set_title('模型损失')
        ax2.set_xlabel('轮次')
        ax2.set_ylabel('损失')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.show()

    except Exception as e:
        print(f"❌ 绘图时出现错误: {str(e)}")

# 数据准备函数（如果需要）
def prepare_cifar10_data():
    """准备CIFAR-10数据"""
    try:
        # 加载CIFAR-10数据
        (X_train, y_train), (X_test, y_test) = keras.datasets.cifar10.load_data()

        # 数据预处理
        X_train = X_train.astype('float32') / 255.0
        X_test = X_test.astype('float32') / 255.0

        # 标签one-hot编码
        y_train_onehot = keras.utils.to_categorical(y_train, 10)
        y_test_onehot = keras.utils.to_categorical(y_test, 10)

        print(f"✅ 数据加载完成:")
        print(f"   训练集: {X_train.shape}")
        print(f"   测试集: {X_test.shape}")
        print(f"   训练标签: {y_train_onehot.shape}")
        print(f"   测试标签: {y_test_onehot.shape}")

        return X_train, y_train_onehot, X_test, y_test_onehot

    except Exception as e:
        print(f"❌ 数据加载失败: {str(e)}")
        return None, None, None, None

# 主执行代码
if __name__ == "__main__":
    print("=" * 50)
    print("CIFAR-10 CNN模型训练")
    print("=" * 50)

    # 步骤1: 准备数据（如果需要）
    print("\n步骤1：准备数据")
    print("-" * 30)
    # X_train, y_train_onehot, X_test, y_test_onehot = prepare_cifar10_data()

    # 步骤2: 创建模型
    print("\n步骤2：创建模型")
    print("-" * 30)
    model = create_model()
    print(f"✅ 模型创建完成")
    print(f"📊 模型参数量: {model.count_params():,}")
    print(f"📋 模型结构:")
    model.summary()

    # 步骤3: 训练模型（需要确保X_train和y_train_onehot已定义）
    print("\n步骤3：训练模型")
    print("-" * 30)

    # 检查变量是否存在
    try:
        # 这里假设X_train和y_train_onehot已经在之前的代码中定义
        # 如果没有，请取消注释上面的数据准备代码
        model_trained, history = train_model_enhanced(model, X_train, y_train_onehot)

        if model_trained is not None and history is not None:
            print("✅ 模型训练成功完成！")

            # 绘制训练历史
            plot_training_history(history)

            # 保存模型
            try:
                model_path = 'cifar10_model.h5'
                model_trained.save(model_path)
                print(f"✅ 模型已保存到: {model_path}")
            except Exception as e:
                print(f"❌ 模型保存失败: {str(e)}")
                print("尝试保存为SavedModel格式...")
                try:
                    model_trained.save('cifar10_model')
                    print("✅ 模型已保存为SavedModel格式")
                except Exception as e2:
                    print(f"❌ SavedModel保存也失败: {str(e2)}")
        else:
            print("❌ 模型训练失败")

    except NameError as e:
        print(f"❌ 变量未定义: {str(e)}")
        print("请确保X_train和y_train_onehot已经正确加载和预处理")
        print("可以取消注释数据准备代码来加载CIFAR-10数据")

CIFAR-10 CNN模型训练

步骤1：准备数据
------------------------------

步骤2：创建模型
------------------------------
✅ 模型创建完成
📊 模型参数量: 490,922
📋 模型结构:



步骤3：训练模型
------------------------------
❌ 变量未定义: name 'X_train' is not defined
请确保X_train和y_train_onehot已经正确加载和预处理
可以取消注释数据准备代码来加载CIFAR-10数据


In [None]:
# 7. 训练模型
def train_model_enhanced(model, X_train, y_train):
    """增强版模型训练函数"""
    print("\n=== 开始训练 ===")

    if X_train is None or y_train is None:
        print("❌ 训练数据未正确加载，无法训练")
        return None, None

    # 编译模型
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # 分割验证集
    X_train_split, X_val, y_train_split, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=np.argmax(y_train, axis=1)
    )

    print(f"✅ 数据分割完成:")
    print(f"   训练集: {X_train_split.shape[0]} 张")
    print(f"   验证集: {X_val.shape[0]} 张")

    # 数据增强
    datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True,
        zoom_range=0.1,
        fill_mode='nearest'
    )

    # 回调函数
    callbacks = [
        EarlyStopping(
            monitor='val_accuracy',
            patience=15,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=7,
            min_lr=1e-7,
            verbose=1
        )
    ]

    print("🚀 开始训练...")

    # 训练模型
    history = model.fit(
        datagen.flow(X_train_split, y_train_split, batch_size=64),
        steps_per_epoch=len(X_train_split) // 64,
        epochs=100,  # 设置较大值，依靠EarlyStopping自动停止
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1
    )

    # 训练完成后的评估
    print("\n=== 训练完成 ===")
    val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
    print(f"✅ 最终验证准确率: {val_accuracy:.4f}")
    print(f"✅ 最终验证损失: {val_loss:.4f}")

    return model, history

def plot_training_history(history):
    """绘制训练历史图表"""
    if history is None:
        return

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # 准确率图表
    ax1.plot(history.history['accuracy'], label='训练准确率')
    ax1.plot(history.history['val_accuracy'], label='验证准确率')
    ax1.set_title('模型准确率')
    ax1.set_xlabel('轮次')
    ax1.set_ylabel('准确率')
    ax1.legend()
    ax1.grid(True)

    # 损失图表
    ax2.plot(history.history['loss'], label='训练损失')
    ax2.plot(history.history['val_loss'], label='验证损失')
    ax2.set_title('模型损失')
    ax2.set_xlabel('轮次')
    ax2.set_ylabel('损失')
    ax2.legend()
    ax2.grid(True)

    plt.tight_layout()
    plt.show()

# 执行步骤7
print("=" * 50)
print("步骤7：训练模型")
print("=" * 50)

# 训练模型
model, history = train_model_enhanced(model, X_train, y_train_onehot)

if model is not None and history is not None:
    print("✅ 模型训练成功完成！")

    # 绘制训练历史
    plot_training_history(history)

    # 保存模型
    model_path = '/content/drive/MyDrive/cifar10_model.h5'
    model.save(model_path)
    print(f"✅ 模型已保存到: {model_path}")
else:
    print("❌ 模型训练失败")

In [None]:
# 8. 生成提交文件
def generate_submission_enhanced(model, X_test, test_ids, sample_submission):
    """增强版提交文件生成函数"""
    print("\n=== 生成提交文件 ===")

    if X_test is None or sample_submission is None:
        print("❌ 测试数据或提交样本未正确加载，无法生成提交文件")
        return None

    if model is None:
        print("❌ 模型未训练，无法生成预测")
        return None

    print(f"📊 开始预测 {len(X_test)} 张测试图片...")

    # 分批预测（避免内存不足）
    batch_size = 1000
    predictions = []

    for i in range(0, len(X_test), batch_size):
        end_idx = min(i + batch_size, len(X_test))
        batch_predictions = model.predict(X_test[i:end_idx], verbose=0)
        predictions.append(batch_predictions)

        # 显示进度
        progress = (end_idx / len(X_test)) * 100
        print(f"预测进度: {progress:.1f}% ({end_idx}/{len(X_test)})")

    # 合并所有预测结果
    predictions = np.vstack(predictions)
    predicted_classes = np.argmax(predictions, axis=1)

    # 将数字标签转回字符串标签
    int_to_label = {
        0: 'airplane', 1: 'automobile', 2: 'bird', 3: 'cat', 4: 'deer',
        5: 'dog', 6: 'frog', 7: 'horse', 8: 'ship', 9: 'truck'
    }

    predicted_labels = [int_to_label[pred] for pred in predicted_classes]

    # 创建提交文件
    submission = sample_submission.copy()
    submission['label'] = predicted_labels

    # 验证提交文件格式
    print(f"\n📋 提交文件验证:")
    print(f"   形状: {submission.shape}")
    print(f"   列名: {list(submission.columns)}")
    print(f"   ID范围: {submission['id'].min()} - {submission['id'].max()}")
    print(f"   标签类型: {submission['label'].dtype}")

    # 检查预测分布
    print(f"\n📊 预测结果分布:")
    label_counts = submission['label'].value_counts().sort_index()
    for label, count in label_counts.items():
        percentage = (count / len(submission)) * 100
        print(f"   {label}: {count:,} 张 ({percentage:.1f}%)")

    # 保存到Drive
    submission_path = '/content/drive/MyDrive/cifar10_submission.csv'
    submission.to_csv(submission_path, index=False)

    print(f"\n✅ 提交文件已保存: {submission_path}")
    print(f"📁 文件大小: {os.path.getsize(submission_path) / (1024*1024):.1f} MB")

    # 显示提交文件前几行
    print(f"\n📝 提交文件预览:")
    print(submission.head(10))

    return submission

def validate_submission(submission, sample_submission):
    """验证提交文件的正确性"""
    print("\n=== 提交文件验证 ===")

    # 检查形状
    if submission.shape == sample_submission.shape:
        print("✅ 文件形状正确")
    else:
        print(f"❌ 文件形状错误: {submission.shape} vs {sample_submission.shape}")
        return False

    # 检查列名
    if list(submission.columns) == list(sample_submission.columns):
        print("✅ 列名正确")
    else:
        print(f"❌ 列名错误: {list(submission.columns)} vs {list(sample_submission.columns)}")
        return False

    # 检查ID是否完整
    if set(submission['id']) == set(sample_submission['id']):
        print("✅ ID完整")
    else:
        print("❌ ID不完整")
        return False

    # 检查标签是否合法
    valid_labels = {'airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck'}
    if set(submission['label']) <= valid_labels:
        print("✅ 标签合法")
    else:
        invalid_labels = set(submission['label']) - valid_labels
        print(f"❌ 发现非法标签: {invalid_labels}")
        return False

    print("🎉 提交文件验证通过！")
    return True

# 执行步骤8
print("=" * 50)
print("步骤8：生成提交文件")
print("=" * 50)

# 生成提交文件
submission = generate_submission_enhanced(model, X_test, test_ids, sample_submission)

if submission is not None:
    # 验证提交文件
    is_valid = validate_submission(submission, sample_submission)

    if is_valid:
        print("\n🎉 任务完成！")
        print("=" * 50)
        print("✅ 模型训练完成")
        print("✅ 提交文件生成成功")
        print("✅ 文件验证通过")
        print("🚀 可以直接提交到Kaggle!")

        # 最终统计
        print(f"\n📈 最终统计:")
        print(f"   训练样本: 50,000 张")
        print(f"   测试样本: 300,000 张")
        print(f"   模型参数: {model.count_params():,}")
        print(f"   提交文件: /content/drive/MyDrive/cifar10_submission.csv")
    else:
        print("❌ 提交文件验证失败")
else:
    print("❌ 提交文件生成失败")

In [4]:
import zipfile
import os
import py7zr  # 需先安装：!pip install py7zr
from google.colab import drive

# 挂载 Google Drive（若文件在 Drive 中，需此步骤访问文件）
drive.mount('/content/drive')

# 1. 解压 cifar-10.zip 到当前目录（已有的解压逻辑，可根据实际情况调整）
zip_path = '/content/drive/MyDrive/cifar-10.zip'
extract_dir = '.'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)
print("✅ zip 文件已解压到当前目录")

# 2. 定义 7z 文件路径（假设解压后 test.7z 和 train.7z 在当前目录）
test_7z_path = os.path.join(extract_dir, 'test.7z')
train_7z_path = os.path.join(extract_dir, 'train.7z')

# 3. 解压 test.7z
try:
    with py7zr.SevenZipFile(test_7z_path, mode='r') as archive:
        archive.extractall(path=extract_dir)
    print("✅ test.7z 已成功解压")
except Exception as e:
    print(f"❌ 解压 test.7z 失败: {e}")

# 4. 解压 train.7z
try:
    with py7zr.SevenZipFile(train_7z_path, mode='r') as archive:
        archive.extractall(path=extract_dir)
    print("✅ train.7z 已成功解压")
except Exception as e:
    print(f"❌ 解压 train.7z 失败: {e}")

Mounted at /content/drive
✅ zip 文件已解压到当前目录
✅ test.7z 已成功解压
✅ train.7z 已成功解压


In [3]:
# 步骤1：安装py7zr
!pip install py7zr

Collecting py7zr
  Downloading py7zr-1.0.0-py3-none-any.whl.metadata (17 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting brotli>=1.1.0 (from py7zr)
  Downloading Brotli-1.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting pyzstd>=0.16.1 (from py7zr)
  Downloading pyzstd-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.5 kB)
Collecting pyppmd<1.3.0,>=1.1.0 (from py7zr)
  Downloading pyppmd-1.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.4 kB)
Collecting pybcj<1.1.0,>=1.0.0 (from py7zr)
  Downloading pybcj-1.0.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Collecting multivolumefile>=0.2.3 (from py7zr)
  Downloading multivolumefile-0.2.3-py3-none-any.whl.metadata (6.3 kB)
Collecting inflate64<1.1.0,>=1.0.0 (from py7zr)
  Downloading inflate64-1.0.3-cp311-cp311-manylinux_2_17_x86_64.manylinu

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
