In [None]:
!git clone https://github.com/chenkenanalytic/handwritting_data_all.git

In [None]:
!cat /content/handwritting_data_all/all_data.zip* > /content/handwritting_data_all/all_data.zip

In [None]:
!unzip -O big5 /content/handwritting_data_all/all_data.zip -d "/content"

In [None]:
import os
import cv2
import numpy as np
import random
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
import shutil


# Module-level accumulators that countfile() fills in place.
trianImages = []
trianLabels = []
testImages = []
testLabels = []


def countfile(dir_path):
    """Collect the PNG files under ``dir_path`` and split them 80/20.

    Every directory is handled on its own: its ``.png`` files (extension
    matched case-insensitively) are shuffled, the first 80% are appended to
    ``trianImages``/``trianLabels`` and the remainder to
    ``testImages``/``testLabels``. Sub-directories are processed recursively
    with the same rule. The label of a file is the part of its name before
    the first underscore.

    Args:
        dir_path: Directory to scan.
    """
    found = []
    for name in os.listdir(dir_path):
        candidate = os.path.join(dir_path, name)
        if os.path.isdir(candidate):
            # Each sub-directory performs its own shuffle and split.
            countfile(candidate)
            continue
        if os.path.isfile(candidate) and name.lower().endswith('.png'):
            found.append((candidate, name.split('_')[0]))

    random.shuffle(found)

    # Everything before the cutoff is training data, the rest is test data.
    cutoff = int(len(found) * 0.8)
    for path, tag in found[:cutoff]:
        trianImages.append(path)
        trianLabels.append(tag)
    for path, tag in found[cutoff:]:
        testImages.append(path)
        testLabels.append(tag)


# Populate the train/test path and label lists from the extracted dataset.
countfile('/content/cleaned_data')

# Output locations for generated samples.
training_folder = '/content/TrainingSamples'
testFolder = '/content/TestSamples'

# Create whichever output folder is still missing.
for folder in (training_folder, testFolder):
    if not os.path.exists(folder):
        os.makedirs(folder)

def load_image(path):
    """Read the image at ``path`` with OpenCV.

    Args:
        path: Location of the image file.

    Returns:
        Whatever ``cv2.imread`` yields for the path; callers treat ``None``
        as "could not be loaded".
    """
    # A None result is passed straight through to the caller.
    return cv2.imread(path)

def translation(img, original_filename, save_dir):
    """Shift the image by a random offset and save it.

    The horizontal and vertical offsets are drawn independently from
    +/- one eighth of the image width and height respectively.

    Args:
        img: Image array whose first two dimensions are (height, width).
        original_filename: Base name (no extension) used for the output file.
        save_dir: Directory the result is written to.

    Returns:
        Path of the written file, ``<save_dir>/<original_filename>_trans.png``.
    """
    rows, cols = img.shape[:2]

    max_dx = int(cols / 8)
    max_dy = int(rows / 8)
    dx = random.randint(-max_dx, max_dx)
    dy = random.randint(-max_dy, max_dy)

    # 2x3 affine matrix describing a pure translation by (dx, dy).
    shift_matrix = np.float32([[1, 0, dx], [0, 1, dy]])
    shifted = cv2.warpAffine(img, shift_matrix, (cols, rows))

    out_path = os.path.join(save_dir, f'{original_filename}_trans.png')
    cv2.imwrite(out_path, shifted)
    return out_path

def rotation(img, original_filename, save_dir):
    """Rotate the image about its centre by a random angle and save it.

    The angle is drawn uniformly from [-30, 30]; the output keeps the
    original width and height.

    Args:
        img: Image array whose first two dimensions are (height, width).
        original_filename: Base name (no extension) used for the output file.
        save_dir: Directory the result is written to.

    Returns:
        Path of the written file, ``<save_dir>/<original_filename>_rot.png``.
    """
    rows, cols = img.shape[:2]

    degrees = random.uniform(-30, 30)
    centre = (cols / 2, rows / 2)
    spin = cv2.getRotationMatrix2D(centre, degrees, 1)
    rotated = cv2.warpAffine(img, spin, (cols, rows))

    out_path = os.path.join(save_dir, f'{original_filename}_rot.png')
    cv2.imwrite(out_path, rotated)
    return out_path

def scaling(img, original_filename, save_dir):
    """Zoom the image content in or out about its centre and save it.

    A scale factor is drawn uniformly from [0.8, 1.2] and applied with an
    affine warp onto a canvas of the original size, so the written image
    keeps its width and height while the content really changes scale.
    (Resizing by the factor and then resizing back to the original size
    cancels the scaling and leaves only interpolation loss, and it fails
    when a scaled dimension truncates to zero.)

    Args:
        img: Image array whose first two dimensions are (height, width).
        original_filename: Base name (no extension) used for the output file.
        save_dir: Directory the result is written to.

    Returns:
        Path of the written file, ``<save_dir>/<original_filename>_scale.png``.
    """
    height, width = img.shape[:2]

    scale_factor = random.uniform(0.8, 1.2)

    # Angle 0 turns the rotation matrix into a pure scaling about the centre.
    scale_matrix = cv2.getRotationMatrix2D((width / 2, height / 2), 0, scale_factor)
    img_scaled = cv2.warpAffine(img, scale_matrix, (width, height))

    save_path = os.path.join(save_dir, f'{original_filename}_scale.png')
    cv2.imwrite(save_path, img_scaled)
    return save_path

def smoothing(img, original_filename, save_dir):
    """Blur the image with a randomly sized Gaussian kernel and save it.

    The square kernel size is picked from 3, 5 or 7.

    Args:
        img: Image array to blur.
        original_filename: Base name (no extension) used for the output file.
        save_dir: Directory the result is written to.

    Returns:
        Path of the written file, ``<save_dir>/<original_filename>_smooth.png``.
    """
    k = random.choice([3, 5, 7])
    out_path = os.path.join(save_dir, f'{original_filename}_smooth.png')
    cv2.imwrite(out_path, cv2.GaussianBlur(img, (k, k), 0))
    return out_path

def brightness_adjustment(img, original_filename, save_dir):
    """Multiply pixel values by a random gain and save the result.

    The gain is drawn uniformly from [0.7, 1.3] and applied through
    ``cv2.convertScaleAbs`` with no additive offset.

    Args:
        img: Image array to adjust.
        original_filename: Base name (no extension) used for the output file.
        save_dir: Directory the result is written to.

    Returns:
        Path of the written file, ``<save_dir>/<original_filename>_bright.png``.
    """
    gain = random.uniform(0.7, 1.3)
    out_path = os.path.join(save_dir, f'{original_filename}_bright.png')
    cv2.imwrite(out_path, cv2.convertScaleAbs(img, alpha=gain, beta=0))
    return out_path


# Augmentations applied, in this order, to every successfully loaded image.
augmentation_functions = [translation, rotation, scaling, smoothing, brightness_adjustment]


def process_single_image(args):
    """Apply every augmentation to one image.

    Args:
        args: Tuple ``(img_path, index, save_dir, label)``. ``index`` is
            accepted but not used.

    Returns:
        A list of ``(saved_path, label)`` pairs, one per augmentation, or an
        empty list when the image cannot be loaded or any step raises.
    """
    img_path, _index, save_dir, label = args
    try:
        img = load_image(img_path)
        if img is None:
            return []

        # Output files are named after the source file without its extension.
        stem, _ext = os.path.splitext(os.path.basename(img_path))

        return [
            (augment(img, stem, save_dir), label)
            for augment in augmentation_functions
        ]
    except Exception as e:
        # Best effort: report the failure and let the caller skip this image.
        print(f"Image processing {img_path} Error: {str(e)}")
        return []

def augment_training_data(trianImages, trianLabels, save_dir, max_workers=None):
    """Augment every training image in parallel worker processes.

    Args:
        trianImages: Paths of the source images.
        trianLabels: Labels paired position-by-position with ``trianImages``.
        save_dir: Directory the augmented images are written to; created if
            it does not exist.
        max_workers: Number of worker processes. Defaults to the CPU count
            capped at 8.

    Returns:
        ``(augmented_images, augmented_labels)``: two parallel lists holding
        the path and label of every augmented image. Results are gathered in
        completion order, so the ordering is not tied to the input order.
    """
    os.makedirs(save_dir, exist_ok=True)

    tasks = [
        (img_path, i, save_dir, label)
        for i, (img_path, label) in enumerate(zip(trianImages, trianLabels))
    ]

    augmented_images = []
    augmented_labels = []
    if not tasks:
        # Nothing to do; avoid starting a process pool for no work.
        return augmented_images, augmented_labels

    if max_workers is None:
        # os.cpu_count() may return None when the count cannot be determined.
        max_workers = min(os.cpu_count() or 1, 8)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_image, task) for task in tasks]

        for future in as_completed(futures):
            # Each result is a (possibly empty) list of (path, label) pairs.
            for path, label in future.result():
                augmented_images.append(path)
                augmented_labels.append(label)

    return augmented_images, augmented_labels

# Write augmented copies of every training image into training_folder and
# collect their paths and labels.
# NOTE(review): the dataset-building cells visible later in this file read
# trianImages/trianLabels, not these augmented lists -- confirm whether the
# augmented samples are meant to be part of training.
augmented_trainImages, augmented_trainLabels = augment_training_data(trianImages, trianLabels, training_folder)





In [None]:
import tensorflow as tf
# ``tf`` is only a local alias, so the import must name the real package.
from tensorflow.keras import layers, models

# Default input size and class count used by create_cnn_model().
IMG_HEIGHT = 28
IMG_WIDTH = 28
NUM_CLASSES = 10

def create_cnn_model(img_height=IMG_HEIGHT, img_width=IMG_WIDTH, num_classes=NUM_CLASSES):
    """Build an uncompiled convolutional classifier for single-channel images.

    Args:
        img_height: Input image height.
        img_width: Input image width.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A ``Sequential`` model: three convolution layers (the first two each
        followed by max pooling), then a dense head with dropout and a
        softmax output.
    """
    feature_extractor = [
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(img_height, img_width, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
    ]
    classifier_head = [
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax'),
    ]
    return models.Sequential(feature_extractor + classifier_head)


model = create_cnn_model()
model.summary()

In [None]:
def load_and_preprocess_image(image_path, label):
    """Load one image file as a normalised single-channel tensor.

    Args:
        image_path: Path of the image file.
        label: Label passed through unchanged.

    Returns:
        ``(image, label)`` where ``image`` is float32, resized to
        ``IMG_HEIGHT`` x ``IMG_WIDTH`` and divided by 255.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_image(raw_bytes, channels=1, expand_animations=False)

    # Declare the channel dimension explicitly before resizing.
    decoded.set_shape([None, None, 1])
    resized = tf.image.resize(decoded, [IMG_HEIGHT, IMG_WIDTH])

    normalised = tf.cast(resized, tf.float32) / 255.0
    return normalised, label

def create_dataset(image_paths, labels, batch_size=32, is_training=True):
    """Build a batched ``tf.data`` pipeline from image paths and labels.

    Args:
        image_paths: Paths of the image files.
        labels: Labels paired position-by-position with ``image_paths``.
        batch_size: Number of examples per batch.
        is_training: When true, the examples are shuffled with a buffer as
            large as the whole dataset.

    Returns:
        A dataset that yields batches of ``(image, label)``.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((image_paths, labels)).map(
        load_and_preprocess_image,
        num_parallel_calls=tf.data.AUTOTUNE,
    )

    if is_training:
        pipeline = pipeline.shuffle(buffer_size=len(image_paths))

    # Prefetch batches ahead of time to improve training throughput.
    return pipeline.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)


# Number of examples per batch for both pipelines.
BATCH_SIZE = 32

# Only the training pipeline is shuffled.
# NOTE(review): the labels passed here are the filename-derived strings
# collected by countfile(), not integer class ids -- confirm they are encoded
# before being used with a sparse categorical loss.
train_dataset = create_dataset(trianImages, trianLabels, BATCH_SIZE, is_training=True)
test_dataset = create_dataset(testImages, testLabels, BATCH_SIZE, is_training=False)

In [None]:

# Compile the CNN: Adam optimizer, sparse categorical cross-entropy loss and
# accuracy as the reported metric.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)



In [None]:

def create_fixed_model(input_shape=(28, 28), num_classes=10):
    """Build an uncompiled fully connected classifier.

    Args:
        input_shape: Shape of one input example.
        num_classes: Number of units in the final layer.

    Returns:
        A ``Sequential`` model that casts its input to float32, flattens it
        and applies Dense(128, relu), Dropout(0.2) and a final Dense layer
        with no activation.
    """
    keras_layers = tf.keras.layers
    stack = [
        keras_layers.InputLayer(input_shape=input_shape),
        # Cast to float32 inside the model.
        keras_layers.Lambda(lambda x: tf.cast(x, tf.float32)),
        keras_layers.Flatten(),
        keras_layers.Dense(128, activation='relu'),
        keras_layers.Dropout(0.2),
        keras_layers.Dense(num_classes),
    ]
    return tf.keras.models.Sequential(stack)


model_fixed = create_fixed_model(input_shape=(28, 28), num_classes=10)
# The final Dense layer of create_fixed_model() has no activation, so the
# model outputs logits; the loss must be told so, otherwise the outputs are
# treated as probabilities and the loss is computed incorrectly.
model_fixed.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

In [None]:
import tensorflow as tf
import numpy as np
from sklearn.preprocessing import LabelEncoder

def debug_and_fix_model():
    """Encode the labels, build the input pipelines and compile a classifier.

    Reads the module-level ``trianImages``, ``trianLabels``, ``testImages``
    and ``testLabels`` lists.

    Returns:
        ``(model, train_dataset, test_dataset, label_encoder)`` where
        ``model`` is a compiled dense network whose final layer outputs
        logits (no activation), the datasets yield batches of
        ``(image, integer_label)`` and ``label_encoder`` maps between the
        original labels and the integer class ids.
    """
    # Fit on the union of both splits so every label has an id.
    label_encoder = LabelEncoder()
    label_encoder.fit(trianLabels + testLabels)

    # Convert labels to integers.
    trianLabels_int = label_encoder.transform(trianLabels)
    testLabels_int = label_encoder.transform(testLabels)

    def load_and_preprocess_image(image_path, label):
        """Decode one file into a 28x28x1 float32 image scaled by 1/255."""
        image = tf.io.read_file(image_path)
        image = tf.image.decode_image(image, channels=1, expand_animations=False)
        image = tf.image.resize(image, [28, 28])
        image.set_shape([28, 28, 1])
        image = tf.cast(image, tf.float32) / 255.0
        label = tf.cast(label, tf.int32)
        return image, label

    BATCH_SIZE = 32

    def build_pipeline(paths, int_labels, shuffle):
        """Map, optionally shuffle, batch and prefetch one split."""
        dataset = tf.data.Dataset.from_tensor_slices((paths, int_labels))
        dataset = dataset.map(load_and_preprocess_image,
                              num_parallel_calls=tf.data.AUTOTUNE)
        if shuffle:
            dataset = dataset.shuffle(len(paths))
        dataset = dataset.batch(BATCH_SIZE)
        return dataset.prefetch(tf.data.AUTOTUNE)

    # Only the training split is shuffled.
    train_dataset = build_pipeline(trianImages, trianLabels_int, shuffle=True)
    test_dataset = build_pipeline(testImages, testLabels_int, shuffle=False)

    num_classes = len(label_encoder.classes_)

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(num_classes)
    ])

    # The last Dense layer has no activation, so the model emits logits.
    # The loss must be configured with from_logits=True; the plain string
    # 'sparse_categorical_crossentropy' assumes probabilities and would
    # compute the loss incorrectly.
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy']
    )

    model.summary()

    return model, train_dataset, test_dataset, label_encoder


# Build the encoded datasets and the compiled model; this rebinds the
# module-level model, train_dataset and test_dataset names.
print("Start the model training process...")
model, train_dataset, test_dataset, label_encoder = debug_and_fix_model()


# Number of passes over the training dataset.
EPOCHS = 10
print(f"\n=== Training begins, total {EPOCHS} epochs ===")

# The test dataset is also used as the validation data during training.
history = model.fit(
        train_dataset,
        epochs=EPOCHS,
        validation_data=test_dataset,
        verbose=1
)

print("Training successfully completed!")

# Final evaluation on the same test dataset that served as validation data.
test_loss, test_accuracy = model.evaluate(test_dataset, verbose=0)
print(f"\nmodel evaluation results:")
print(f"Test set loss: {test_loss:.4f}")
print(f"Test set accuracy: {test_accuracy:.4f}")

# Persist the model together with the label encoder so predicted class ids
# can be mapped back to the original labels.
model.save('chinese_char_model.h5')
import pickle
with open('label_encoder.pkl', 'wb') as f:
    pickle.dump(label_encoder, f)


