In [None]:

# # Meter Reading Recognition Model Training
# 
# This notebook provides an interactive way to train recognition models for meter reading using PaddleOCR.

# ## 1. Environment Setup

In [None]:

import os
import sys
import glob
import subprocess
import time
import random
import matplotlib.pyplot as plt
from IPython.display import display, Image, clear_output
import numpy as np

# Ensure we're in the PaddleOCR root directory
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname("__file__"), "../.."))
os.chdir(ROOT_DIR)
print(f"Working directory: {os.getcwd()}")

# Check if key directories exist
assert os.path.exists('ppocr'), "Not in PaddleOCR root directory!"
assert os.path.exists('tools'), "PaddleOCR tools directory not found!"

# ## 2. Dataset Selection

In [None]:

# Default dataset paths (can be changed)
REC_DATASET_DIR = "dataset/rec_dataset_1"
DET_DATASET_DIR = "dataset/det_dataset_1"
TRAIN_DATA_DIR = "train_data/meter_detection"

# Cell for selecting dataset (edit these variables)
rec_dataset_dir = REC_DATASET_DIR  # Change this to your recognition dataset directory
det_dataset_dir = DET_DATASET_DIR  # Change this to your detection dataset directory 
train_data_dir = TRAIN_DATA_DIR    # Change this to your train data directory

# Create necessary directories
os.makedirs(rec_dataset_dir, exist_ok=True)
os.makedirs(os.path.join(rec_dataset_dir, "images"), exist_ok=True)

# Check if recognition dataset exists
if not os.path.exists(os.path.join(rec_dataset_dir, "train_list.txt")) or not os.path.exists(os.path.join(rec_dataset_dir, "test_list.txt")):
    print(f"Recognition dataset not found in {rec_dataset_dir}.")
    print("You'll need to create it by extracting text regions from detection results.")
else:
    # Count image files and labels
    train_labels = os.path.join(rec_dataset_dir, "train_list.txt")
    test_labels = os.path.join(rec_dataset_dir, "test_list.txt")
    
    train_count = 0
    test_count = 0
    
    if os.path.exists(train_labels):
        with open(train_labels, 'r') as f:
            train_count = len(f.readlines())
    
    if os.path.exists(test_labels):
        with open(test_labels, 'r') as f:
            test_count = len(f.readlines())
    
    images_count = len(glob.glob(os.path.join(rec_dataset_dir, "images/*.jpg")))
    
    print(f"Recognition dataset statistics:")
    print(f"  - Training samples: {train_count}")
    print(f"  - Test samples: {test_count}")
    print(f"  - Image files: {images_count}")
    
    # Check if placeholder labels are present
    if train_count > 0:
        with open(train_labels, 'r') as f:
            first_line = f.readline().strip()
            if "###" in first_line:
                print("\nWARNING: Dataset contains placeholder labels ('###').")
                print("You need to replace them with actual text labels before training.")

# ## 3. Data Preparation

In [None]:

# Extract recognition dataset from detection results if needed
def extract_recognition_dataset():
    """Extract recognition dataset from detection results"""
    if not os.path.exists(det_dataset_dir):
        print(f"Error: Detection dataset not found at {det_dataset_dir}")
        return False
    
    if not os.path.exists(os.path.join(train_data_dir, "train_label.txt")) or not os.path.exists(os.path.join(train_data_dir, "test_label.txt")):
        print(f"Error: Detection labels not found in {train_data_dir}")
        return False
    
    extract_cmd = f"""python model_training/rec_train/scripts/extract_meter_readings.py \
        --det_data_dir={det_dataset_dir} \
        --rec_output_dir={rec_dataset_dir} \
        --train_data={os.path.join(train_data_dir, "train_label.txt")} \
        --test_data={os.path.join(train_data_dir, "test_label.txt")} \
        --margin=5"""
    
    print(f"Extraction command: {extract_cmd}")
    
    # Uncomment to actually run the extraction
    # result = subprocess.run(extract_cmd, shell=True)
    # return result.returncode == 0
    
    print("Extraction command is ready. Uncomment to execute.")
    return False

# Check if dataset needs to be extracted
if not os.path.exists(os.path.join(rec_dataset_dir, "train_list.txt")) or not os.path.exists(os.path.join(rec_dataset_dir, "test_list.txt")):
    print("Recognition dataset not found. Preparing extraction command.")
    extracted = extract_recognition_dataset()
    if extracted:
        print("Recognition dataset extraction completed successfully.")
    else:
        print("Dataset extraction prepared but not executed. Uncomment to run.")
else:
    print("Recognition dataset already exists. Skipping extraction.")

# ## 4. Visualize Dataset

In [None]:

# Visualize recognition dataset
def visualize_recognition_dataset(dataset_dir, max_samples=5):
    """Visualize samples from the recognition dataset"""
    train_list = os.path.join(dataset_dir, "train_list.txt")
    
    if not os.path.exists(train_list):
        print(f"Training list not found: {train_list}")
        return
    
    with open(train_list, 'r') as f:
        samples = f.readlines()
    
    # Randomly select samples to visualize
    if len(samples) > max_samples:
        samples = random.sample(samples, max_samples)
    
    print(f"Visualizing {len(samples)} samples from recognition dataset:")
    
    for i, sample in enumerate(samples):
        parts = sample.strip().split('\t')
        if len(parts) >= 2:
            img_path = parts[0]
            label = parts[1]
            
            full_img_path = os.path.join(dataset_dir, "images", img_path)
            if os.path.exists(full_img_path):
                print(f"Sample {i+1}: {img_path}, Label: {label}")
                display(Image(filename=full_img_path))
            else:
                print(f"Image not found: {full_img_path}")

# Uncomment to visualize dataset
# visualize_recognition_dataset(rec_dataset_dir)

# ## 5. Character Dictionary

In [None]:

# Check character dictionary
dict_path = "model_training/rec_train/meter_dict.txt"

if not os.path.exists(dict_path):
    print(f"Character dictionary not found: {dict_path}")
    print("Creating a default dictionary with digits and common symbols...")
    
    # Create default dictionary
    default_chars = "0123456789.,-/:"
    os.makedirs(os.path.dirname(dict_path), exist_ok=True)
    
    with open(dict_path, 'w') as f:
        for char in default_chars:
            f.write(f"{char}\n")
    
    print(f"Created default dictionary with {len(default_chars)} characters: {default_chars}")
else:
    # Read existing dictionary
    with open(dict_path, 'r') as f:
        chars = [line.strip() for line in f if line.strip()]
    
    print(f"Character dictionary found with {len(chars)} characters:")
    print(''.join(chars))

# ## 6. Training Configuration

In [None]:

# Training configuration
# You can modify these parameters
config = {
    # Basic parameters
    "mode": "standard",  # "standard" or "distillation"
    "gpu_ids": "0",      # Comma-separated GPU IDs
    "max_epochs": 500,   # Maximum epochs
    
    # Dataset parameters
    "rec_dataset_dir": rec_dataset_dir,
    
    # Learning parameters
    "learning_rate": 0.001,
    "batch_size": 64,
    "save_epoch_step": 5,
    "eval_batch_step": 500,
    
    # Model parameters
    "pretrained_model": "pretrain_models/en_PP-OCRv3_rec_train/best_accuracy",
    "dict_path": dict_path,
    "model_save_dir": "./model_training/rec_train/output/meter_rec",  # For standard mode
}

# Update parameters based on mode
if config["mode"] == "distillation":
    config["model_save_dir"] = "./model_training/rec_train/output/meter_rec_distillation"

# Print configuration
print("Training Configuration:")
for key, value in config.items():
    print(f"  {key}: {value}")

# ## 7. Download Pretrained Model (if needed)

In [None]:

# Check and download pretrained model if needed
def check_download_pretrained(pretrained_path):
    """Check if pretrained model exists, download if needed"""
    if os.path.exists(pretrained_path) or os.path.exists(f"{pretrained_path}.pdparams"):
        print(f"Pretrained model found at {pretrained_path}")
        return True
    
    # For built-in pretrained models, download them
    if pretrained_path.startswith("pretrain_models/"):
        model_name = pretrained_path.split("/")[1]
        print(f"Downloading pretrained model: {model_name}")
        
        # Different download URLs based on model type
        if "en_PP-OCRv3_rec_train" in model_name:
            url = "https://paddleocr.bj.bcebos.com/PP-OCRv3/english/en_PP-OCRv3_rec_train.tar"
        else:
            print(f"Unknown pretrained model: {model_name}")
            return False
        
        # Create directory and download
        os.makedirs("pretrain_models", exist_ok=True)
        download_cmd = f"wget {url} -P pretrain_models/"
        extract_cmd = f"tar -xf pretrain_models/{os.path.basename(url)} -C pretrain_models/"
        
        print(f"Running: {download_cmd}")
        subprocess.run(download_cmd, shell=True)
        
        print(f"Running: {extract_cmd}")
        subprocess.run(extract_cmd, shell=True)
        
        return os.path.exists(pretrained_path) or os.path.exists(f"{pretrained_path}.pdparams")
    
    return False

# Check pretrained model
if not check_download_pretrained(config["pretrained_model"]):
    print(f"Warning: Pretrained model {config['pretrained_model']} not found and couldn't be downloaded.")
    print("You may need to download it manually or specify a different pretrained model.")

# ## 8. Prepare Configuration Files

In [None]:

# Prepare config files for training
def prepare_config_files():
    """Copy and prepare configuration files"""
    # Create config directory
    config_dir = "configs/rec/meter"
    os.makedirs(config_dir, exist_ok=True)
    
    # Copy standard and distillation config files
    config_files = {
        "standard": "model_training/rec_train/configs/meter_PP-OCRv3_rec.yml",
        "distillation": "model_training/rec_train/configs/meter_PP-OCRv3_rec_distillation.yml"
    }
    
    for mode, source in config_files.items():
        if os.path.exists(source):
            target = os.path.join(config_dir, os.path.basename(source))
            copy_cmd = f"cp {source} {target}"
            print(f"Copying config {source} to {target}")
            subprocess.run(copy_cmd, shell=True)
        else:
            print(f"Config file not found: {source}")
    
    # Copy character dictionary to utils directory
    dict_target = "ppocr/utils/meter_dict.txt"
    os.makedirs(os.path.dirname(dict_target), exist_ok=True)
    if os.path.exists(config["dict_path"]):
        copy_cmd = f"cp {config['dict_path']} {dict_target}"
        print(f"Copying dictionary to {dict_target}")
        subprocess.run(copy_cmd, shell=True)

# Prepare config files
prepare_config_files()

# ## 9. Start Training

In [None]:

# Generate training command
def generate_training_cmd(config):
    """Generate the training command based on configuration"""
    if config["mode"] == "standard":
        cmd = f"""python -m paddle.distributed.launch --gpus='{config["gpu_ids"]}' tools/train.py \
            -c configs/rec/meter/meter_PP-OCRv3_rec.yml \
            -o Global.pretrained_model={config["pretrained_model"]} \
            Global.character_dict_path={config["dict_path"]} \
            Global.save_model_dir={config["model_save_dir"]} \
            Train.dataset.data_dir={config["rec_dataset_dir"]} \
            Train.dataset.label_file_list=['{os.path.join(config["rec_dataset_dir"], "train_list.txt")}'] \
            Eval.dataset.data_dir={config["rec_dataset_dir"]} \
            Eval.dataset.label_file_list=['{os.path.join(config["rec_dataset_dir"], "test_list.txt")}'] \
            Train.loader.batch_size_per_card={config["batch_size"]} \
            Optimizer.lr.values=[{config["learning_rate"]},{config["learning_rate"]/10}] \
            Global.epoch_num={config["max_epochs"]} \
            Global.save_epoch_step={config["save_epoch_step"]} \
            Global.eval_batch_step=[0, {config["eval_batch_step"]}]"""
    else:  # distillation mode
        cmd = f"""python -m paddle.distributed.launch --gpus='{config["gpu_ids"]}' tools/train.py \
            -c configs/rec/meter/meter_PP-OCRv3_rec_distillation.yml \
            -o Architecture.Models.Teacher.pretrained={config["pretrained_model"]} \
            Architecture.Models.Student.pretrained={config["pretrained_model"]} \
            Global.character_dict_path={config["dict_path"]} \
            Global.save_model_dir={config["model_save_dir"]} \
            Train.dataset.data_dir={config["rec_dataset_dir"]} \
            Train.dataset.label_file_list=['{os.path.join(config["rec_dataset_dir"], "train_list.txt")}'] \
            Eval.dataset.data_dir={config["rec_dataset_dir"]} \
            Eval.dataset.label_file_list=['{os.path.join(config["rec_dataset_dir"], "test_list.txt")}'] \
            Train.loader.batch_size_per_card={config["batch_size"]} \
            Optimizer.lr.values=[{config["learning_rate"]},{config["learning_rate"]/10}] \
            Global.epoch_num={config["max_epochs"]} \
            Global.save_epoch_step={config["save_epoch_step"]} \
            Global.eval_batch_step=[0, {config["eval_batch_step"]}]"""
    return cmd

training_cmd = generate_training_cmd(config)
print("Generated training command:")
print(training_cmd)

# Uncomment the following to execute training
# def run_with_output(cmd):
#     """Run a command and display output in real-time"""
#     process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
#     for line in iter(process.stdout.readline, ''):
#         print(line, end='')
#     return process.wait()
# 
# print("Starting training...")
# run_with_output(training_cmd)

# ## 10. Evaluation

In [None]:

# Evaluation after training
def generate_eval_cmd(config):
    """Generate evaluation command"""
    model_dir = config["model_save_dir"]
    eval_cmd = f"""python tools/eval.py \
        -c configs/rec/meter/meter_PP-OCRv3_rec.yml \
        -o Global.checkpoints={model_dir}/best_accuracy \
        Global.character_dict_path={config["dict_path"]} \
        Eval.dataset.data_dir={config["rec_dataset_dir"]} \
        Eval.dataset.label_file_list=['{os.path.join(config["rec_dataset_dir"], "test_list.txt")}']"""
    return eval_cmd

eval_cmd = generate_eval_cmd(config)
print("Evaluation command:")
print(eval_cmd)

# Uncomment to run evaluation
# print("Running evaluation...")
# run_with_output(eval_cmd)

# ## 11. Inference Example

In [None]:

# Run inference on a sample image
def generate_infer_cmd(config, sample_image):
    """Generate inference command for a sample image"""
    model_dir = config["model_save_dir"]
    infer_cmd = f"""python tools/infer_rec.py \
        -c configs/rec/meter/meter_PP-OCRv3_rec.yml \
        -o Global.infer_img={sample_image} \
        Global.checkpoints={model_dir}/best_accuracy \
        Global.character_dict_path={config["dict_path"]}"""
    return infer_cmd

# Choose a sample image (modify path as needed)
sample_image = os.path.join(rec_dataset_dir, "images/sample.jpg")
if not os.path.exists(sample_image):
    # Try to find any image
    sample_images = glob.glob(os.path.join(rec_dataset_dir, "images/*.jpg"))
    if sample_images:
        sample_image = sample_images[0]
        print(f"Using sample image: {sample_image}")
    else:
        print("No sample images found for inference.")
        sample_image = None

if sample_image:
    infer_cmd = generate_infer_cmd(config, sample_image)
    print("Inference command:")
    print(infer_cmd)
    
    # Uncomment to run inference
    # print("Running inference...")
    # run_with_output(infer_cmd)
    # print("Inference result should be displayed above.")

# ## 12. Export Model for Deployment

In [None]:

# Export the model for deployment
def generate_export_cmd(config):
    """Generate model export command"""
    model_dir = config["model_save_dir"]
    export_dir = os.path.join(model_dir, "inference")
    export_cmd = f"""python tools/export_model.py \
        -c configs/rec/meter/meter_PP-OCRv3_rec.yml \
        -o Global.pretrained_model={model_dir}/best_accuracy \
        Global.save_inference_dir={export_dir} \
        Global.character_dict_path={config["dict_path"]}"""
    return export_cmd

export_cmd = generate_export_cmd(config)
print("Export command:")
print(export_cmd)

# Uncomment to export model
# print("Exporting model...")
# run_with_output(export_cmd) 