In [None]:
working_on_kaggle = False

In [None]:
if working_on_kaggle:
    !pip install --quiet gdown
    !apt-get install -y fonts-noto-cjk > /dev/null

    import os
   
    from getpass import getpass
    
    token = getpass('Your GitHub token: ')
    username = "iamlucaconti"
    repo_name = "PDDLR-algorithm"
    git_url = f"https://{username}:{token}@github.com/giankev/{repo_name}.git"
    os.system(f"git clone {git_url} /kaggle/working/{repo_name}")
    %cd PDDLR-algorithm/

import sys
sys.path.append('./scr/')

# Import

In [None]:
import os
import re
import numpy as np
import pandas as pd

import cv2
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
from tqdm import tqdm
import gdown
import tarfile
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torchvision import transforms
from torchvision.transforms.functional import to_pil_image


from pdlpr import PDLPR 

from trainer import train, set_seed, evaluate_model

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Globals

In [None]:
provinces = ["皖", "沪", "津", "渝", "冀", "晋", "蒙", "辽", "吉", "黑", "苏", "浙", "京", "闽", "赣",
             "鲁", "豫", "鄂", "湘", "粤", "桂", "琼", "川", "贵", "云", "藏", "陕", "甘", "青", "宁", "新", "警", "学", "O"]

alphabets = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N',
             'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'O']

ads = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R',
       'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'O']

unique_chars = set(provinces[:-1] + alphabets[:-1] + ads[:-1])  # escludi 'O'
char_list = sorted(list(unique_chars))  # ordinamento per coerenza
char_list = ["-"] + char_list
char2idx = {char: i for i, char in enumerate(char_list)}
idx2char = {i: c for c, i in char2idx.items()}

num_classes = len(char_list)
print("Num classes: ", num_classes)

# Functions

## Decoding and encoding plates

In [None]:
def decode_plate(s):
    idx   = list(map(int, s.split("_")))
    try:
        return provinces[idx[0]] + alphabets[idx[1]] + "".join(ads[i] for i in idx[2:])
    except Exception:
        return None


def decode_ccpd_label(label_str, provinces, alphabets, ads):
    """Decodifica stringa del tipo '0_0_22_27_27_33_16' in targa es. '皖AWWX6G' """
    indices = list(map(int, label_str.strip().split('_')))
    if len(indices) != 7:
        raise ValueError("Label must contain 7 indices")

    province = provinces[indices[0]]
    alphabet = alphabets[indices[1]]
    ad_chars = [ads[i] for i in indices[2:]]

    return province + alphabet + ''.join(ad_chars)

def encode_plate(plate_str, char2idx):
    """Converte la stringa '皖AWWX6G' in lista di indici [3, 12, 30, 30, ...]"""
    return [char2idx[c] for c in plate_str]

def decode_plate_from_list(label_indices, idx2char):
    """Converte una lista di indici [3, 12, 30, ...] nella stringa '皖AWWX6G'"""
    return ''.join([idx2char[i] for i in label_indices])

def greedy_decode(logits, blank_index, idx2char):
    preds = logits.argmax(dim=2)  # (B, T)
    decoded_batch = []
    for pred in preds:
        chars = []
        prev = None
        for p in pred:
            p = p.item()
            if p != blank_index and p != prev:
                chars.append(idx2char[p])
            prev = p
        decoded_batch.append(''.join(chars))
    return decoded_batch


## Image processing

In [None]:
def split_bbox(bbox_str):
    # Split on one or more underscores
    tokens = re.split(r'_+', bbox_str)
    if len(tokens) == 4 and all(t.isdigit() for t in tokens):
        return tuple(map(int, tokens))
    return (None,) * 4


def crop_and_resize(img, x1, y1, x2, y2):
    # Controlla che il bounding box sia valido
    if x2 <= x1 or y2 <= y1:
        return None
    
    # Ritaglia
    cropped_img = img[y1:y2, x1:x2]

    # Controlla che l'immagine ritagliata non sia vuota
    if cropped_img.size == 0:
        return None

    # Resize a 48x144
    try:
        return cv2.resize(cropped_img, (144, 48))
    except Exception as e:
        return None

## Dataframe

In [None]:

def download_and_extract_dataset(url, output_path, extract_path, extracted_folder_path):
    """
    Downloads and extracts a dataset if not already present.

    Args:
        url (str): Google Drive URL of the dataset.
        output_path (str): Path where the .tar file will be saved.
        extract_path (str): Directory where the archive will be extracted.
        extracted_folder_path (str): Expected folder resulting from extraction.
    """
    
    # Download the dataset if it doesn't already exist
    if not os.path.exists(output_path):
        print("Downloading the dataset...")
        gdown.download(url, output_path, fuzzy=True, quiet=False)
    else:
        print("Dataset already exists, download skipped.")

    # Extract the dataset if the folder doesn't already exist
    if not os.path.exists(extracted_folder_path):
        print("Extracting the dataset...")
        os.makedirs(extract_path, exist_ok=True)
        with tarfile.open(output_path) as tar:
            tar.extractall(path=extract_path)
        print("Extraction completed.")
    else:
        print("Dataset folder already exists, extraction skipped.")


def create_dataframe(folder_path, char2idx):
    all_files = sorted(os.listdir(folder_path))
    jpg_files = [f for f in all_files if f.endswith('.jpg')]

    rows = []
    for fname in jpg_files:
        parts = fname[:-4].split("-")
        if len(parts) < 6:
            continue

        try:
            x1, y1, x2, y2 = split_bbox(parts[2])
            plate = decode_plate(parts[4])
            label = encode_plate(plate, char2idx)
        except Exception as e:
            print(f"Errore con file {fname}: {e}")
            continue

        rows.append({
            "image_path": os.path.join(folder_path, fname),
            "x1_bbox": x1, "y1_bbox": y1,
            "x2_bbox": x2, "y2_bbox": y2,
            "plate_number": plate,
            "label": label
        })

    return pd.DataFrame(rows)


def create_cropped_dataframe_inference(df, cropped_folder):
    """
    Crea un nuovo DataFrame con le immagini ritagliate e ridimensionate.

    Args:
        df (pd.DataFrame): DataFrame originale con bounding box e info.
        cropped_folder (str): Cartella dove salvare le immagini ritagliate.
        crop_and_resize_fn (function): Funzione che riceve (img, x1, y1, x2, y2) e restituisce l'immagine ritagliata e ridimensionata.

    Returns:
        pd.DataFrame: Nuovo DataFrame con path immagini ritagliate, plate_number e label.
    """

    os.makedirs(cropped_folder, exist_ok=True)
    cropped_rows = []

    for i, row in tqdm(df.iterrows(), total=len(df)):
        image_path = row["image_path"]
        img = cv2.imread(image_path)
        if img is None:
            print(f"Immagine non trovata o corrotta: {image_path}")
            continue

        try:
            x1 = int(float(row["x1_pred"]))
            y1 = int(float(row["y1_pred"]))
            x2 = int(float(row["x2_pred"]))
            y2 = int(float(row["y2_pred"]))
        except Exception as e:
            print(f"Errore nei bounding box per {image_path}: {e}")
            continue

        resized_img = crop_and_resize(img, x1, y1, x2, y2)
        if resized_img is None:
            print(f"Errore nel crop/resize dell'immagine: {image_path}")
            continue

        cropped_path = os.path.join(cropped_folder, f"cropped_{i}.jpg")
        cv2.imwrite(cropped_path, resized_img)

        cropped_rows.append({
            "image_path": cropped_path,
            "plate_number": row["plate_number"],
            "label": row["label"]
        })

    return pd.DataFrame(cropped_rows)

# Test phase

In [None]:
def infer_and_evaluate(model, image_tensor, target_indices, char2idx, idx2char, device='cuda'):
    model = model.to(device)
    model.eval()

    # Assumiamo batch_size = 1
    images = image_tensor.unsqueeze(0).to(device)       # (1, C, H, W)
    targets = [target_indices.to(device)]               # list of tensors
    target_lengths = torch.tensor([len(t) for t in targets], dtype=torch.long, device=device)
    targets_concat = torch.cat(targets)                 # flatten targets

    # Forward pass
    logits = model(images)                              # (1, T, C)

    # Decoding (greedy)
    blank_idx = char2idx['-']
    decoded = greedy_decode(logits, blank_idx, idx2char)

    # Prepare input for CTC loss
    log_probs = F.log_softmax(logits, dim=2).permute(1, 0, 2)  # (T, N, C)
    input_lengths = torch.full(size=(1,), fill_value=log_probs.size(0), dtype=torch.long).to(device)

    # CTC Loss
    ctc_loss_fn = nn.CTCLoss(blank=blank_idx, zero_infinity=True)
    loss = ctc_loss_fn(log_probs, targets_concat, input_lengths, target_lengths)

    # Print summary
    print(f"Predetta: {decoded[0]}")
    print(f"Target:   {decode_plate_from_list(target_indices.tolist(), idx2char)}")
    print(f"CTC Loss: {loss.item():.4f}")
    print(f"Len pred: {len(decoded[0])}, Len true: {target_lengths.item()}")

    return decoded[0], loss.item()

In [None]:
NUM_WORKERS = 0
SEED = 42
BATCH_SIZE = 128
TEST_BATCH_SIZE = 5
set_seed(SEED)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

pdlpr_checkpoint_path = "pdlpr_checkpoints/checkpoint_epoch30.pt"
yolo_checkpoint_path = ""

test_output_path = 'dataset/ccpd_test.tar'
test_extract_path = 'dataset'
test_folder_path = os.path.join(test_extract_path, 'ccpd_test')
test_cropped_folder = 'dataset/ccpd_test_cropped'

# Adatta i percorsi se stai lavorando su Kaggle
if working_on_kaggle:
    NUM_WORKERS = 2
    pdlpr_checkpoint_path = '/kaggle/working/' + pdlpr_checkpoint_path
    test_output_path = '/kaggle/working/' + test_output_path
    test_extract_path = '/kaggle/working/' + test_extract_path
    test_folder_path = os.path.join(test_extract_path, 'ccpd_test')
    test_cropped_folder = '/kaggle/working/' + test_cropped_folder

# Crea cartelle se non esistono
os.makedirs(os.path.dirname(test_output_path), exist_ok=True)
os.makedirs(test_cropped_folder, exist_ok=True)


if yolo_checkpoint_path is not None and not os.path.isfile(yolo_checkpoint_path):
    raise FileNotFoundError(f"YOLOvs checkpoint file not found: {yolo_checkpoint_path}")

if pdlpr_checkpoint_path is not None and not os.path.isfile(pdlpr_checkpoint_path):
    raise FileNotFoundError(f"PDLPR checkpoint file not found: {pdlpr_checkpoint_path}")


# Dataset

## Download and extraction 

In [None]:
# URL_TEST di download
file_id_test = '1PnYtN0P6m36LmjztvhVmVLqZwZAp9Q3X'
url_test = f'https://drive.google.com/uc?id={file_id_test}'

download_and_extract_dataset(url_test, test_output_path, test_extract_path, test_folder_path)

if os.path.exists(test_folder_path):
    subfolders = [name for name in os.listdir(test_folder_path)
                  if os.path.isdir(os.path.join(test_folder_path, name))]
    subfolders = sorted(subfolders)
    print(f"Subfolders in '{test_folder_path}':")
    # for folder in subfolders:
    #     print(f"- {folder}")
else:
    print(f"The folder '{test_folder_path}' does not exist.")

## DataLoader

In [None]:
# Solo normalizzazione
test_transform = transforms.Compose([
    transforms.Resize((48, 144)),
    transforms.ToTensor(),
])

# Dataset personalizzato
class PlateDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = Image.open(row["image_path"]).convert("RGB")
        
        if self.transform:
            image = self.transform(image)
        
        label = row["label"]  # list
        label_tensor = torch.tensor(label, dtype=torch.long)
        return image, label_tensor

In [None]:
# TODO: carica il modello YOLO



# TODO: carica il modello pdlpr. 
pdlpr_model = PDLPR().to(device)
pdlpr_model.eval()  

if pdlpr_checkpoint_path and os.path.isfile(pdlpr_checkpoint_path):
    checkpoint = torch.load(pdlpr_checkpoint_path, map_location=device)
    pdlpr_model.load_state_dict(checkpoint['weights'])
    print(f"Checkpoint caricato da {pdlpr_checkpoint_path}")
else:
    raise FileNotFoundError(f"Checkpoint non trovato: {pdlpr_checkpoint_path}")


for subfolder in subfolders:
    print(f"\nEvaluation on CCPD_{subfolder}")
    subfolder_path = os.path.join(test_folder_path, subfolder)
    sub_df = create_dataframe(subfolder_path, char2idx)

    # TODO: preprocessing immagini per YOLO 
    
    # TODO: fare prediction con YOLO

    # TODO. salvare i bounding box come colonne di sub_df. Le colonne si devono chiamare x1_pred, y1_pred, x2_pred, y2_pred 


    # Creazione delle immagini croppate
    cropped_subfolder =  os.path.join(test_cropped_folder, subfolder)
    os.makedirs(cropped_subfolder, exist_ok=True)
    # Creazione delle dataset con le immagini croppate
    cropped_sub_df = create_cropped_dataframe_inference(sub_df, cropped_subfolder)
    # Creazione del dataloader per PDLPR model
    test_dataset = PlateDataset(cropped_sub_df, transform=test_transform)
    test_loader = DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    
    test_loss, test_char_acc, test_seq_acc = evaluate_model(pdlpr_model, test_loader, char2idx, "cuda")