# CIFAR-10 Efficient Pipeline for CAPTCHA (Optimized)

This notebook implements an optimized pipeline using:

- Teacher: EfficientNetB0 pre-trained on ImageNet
- Student: MobileNetV2 (alpha=0.5)
- Advanced augmentations: RandomFlip, Rotation, Zoom, Cutout, Mixup
- Mixed precision (when available)
- AdamW optimizer, label smoothing, cosine LR with warmup
- Knowledge distillation (teacher -> student)
- Pruning and post-training quantization (optional)
- TTA and full evaluation (accuracy, precision/recall/F1, confusion matrix)

**Chosen configuration:** TARGET_SIZE=96, STUDENT_ALPHA=0.5, export pipeline with pruning+quantization enabled.
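
The feature list mentions a cosine learning-rate schedule with warmup, while the training cells in this notebook rely on `ReduceLROnPlateau`. As a minimal sketch of what such a schedule could look like (the function name `cosine_warmup_lr` and the parameters `base_lr`, `warmup_steps`, `total_steps`, and `min_lr` are illustrative assumptions, not part of the notebook):

```python
import math

def cosine_warmup_lr(step, base_lr=1e-3, warmup_steps=500, total_steps=5000, min_lr=1e-6):
    """Hypothetical helper: linear warmup to base_lr, then cosine decay to min_lr."""
    if step < warmup_steps:
        # Linear ramp from base_lr / warmup_steps up to base_lr
        return base_lr * (step + 1) / warmup_steps
    # Fraction of the decay phase completed, clipped to [0, 1]
    progress = min(1.0, (step - warmup_steps) / max(1, total_steps - warmup_steps))
    return min_lr + 0.5 * (base_lr - min_lr) * (1.0 + math.cos(math.pi * progress))

for s in (0, 499, 500, 2750, 5000):
    print(s, f"{cosine_warmup_lr(s):.6g}")
```

In Keras, a function like this would typically be wrapped in a `LearningRateSchedule` subclass or a callback; the step counts above are placeholders to tune against the real number of batches per epoch.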

---

In [None]:
# Setup and imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow_datasets as tfds
import os
print('TensorFlow version:', tf.__version__)

from google.colab import drive
drive.mount('/content/drive')

# Example: save the checkpoints to a path on Drive
checkpoint_filepath = '/content/drive/MyDrive/meu_projeto/best_teacher.h5'
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(checkpoint_filepath, monitor='val_accuracy', save_best_only=True, verbose=1),
    # ... other callbacks
]

TensorFlow version: 2.19.0
Mounted at /content/drive


In [None]:
# Mixed precision (will speed up on supported GPUs)
try:
    from tensorflow.keras import mixed_precision
    # Enable mixed_float16 for faster training on the GPU
    mixed_precision.set_global_policy('mixed_float16')
    print('Mixed precision enabled: mixed_float16')
except Exception as e:
    print('Mixed precision not enabled:', e)


Mixed precision enabled: mixed_float16
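
To illustrate why a mixed-precision setup keeps some computations in float32, the following NumPy sketch (not part of the notebook; it only uses `numpy` dtypes to mimic half precision) shows two float16 limits: small increments are rounded away, and large values overflow.

```python
import numpy as np

# float16 keeps roughly 3 decimal digits: a tiny update added to 1.0 is lost
w16 = np.float16(1.0) + np.float16(1e-4)
w32 = np.float32(1.0) + np.float32(1e-4)
print('float16 update lost:', w16 == np.float16(1.0))
print('float32 update lost:', w32 == np.float32(1.0))

# float16 overflows above 65504, for example when exponentiating a large logit
with np.errstate(over='ignore'):
    e16 = np.exp(np.float16(12.0))
e32 = np.exp(np.float32(12.0))
print('float16 overflow:', np.isinf(e16), '| float32 overflow:', np.isinf(e32))
```

This is the motivation for giving the final softmax layer `dtype='float32'` in the model-building cell.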


In [None]:
# Configuration
TARGET_SIZE = 96
BATCH_SIZE = 128
STUDENT_ALPHA = 0.5
EPOCHS_TEACHER = 10   # Reduced for a quick example. Raise it for real runs (e.g. 30-50).
EPOCHS_STUDENT = 15   # Reduced for a quick example. Raise it for real runs (e.g. 80-100).
USE_PRUNING = True
USE_QUANTIZATION = True
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
# Data pipeline with augmentations
NUM_CLASSES = 10

def preprocess_train(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.resize(image, [TARGET_SIZE, TARGET_SIZE])
    return image, tf.one_hot(label, NUM_CLASSES)

def preprocess_eval(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.resize(image, [TARGET_SIZE, TARGET_SIZE])
    return image, tf.one_hot(label, NUM_CLASSES)

# Augmentation layers (Keras preprocessing)
data_augmentation = keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.08),
    layers.RandomZoom(0.08),
])

def cutout(img, size=16):
    # Zero out a square patch (up to size x size) centered at a random pixel
    img = tf.identity(img)
    h = tf.shape(img)[0]
    w = tf.shape(img)[1]
    y = tf.random.uniform([], 0, h, dtype=tf.int32)
    x = tf.random.uniform([], 0, w, dtype=tf.int32)
    y1 = tf.clip_by_value(y - size//2, 0, h)
    y2 = tf.clip_by_value(y + size//2, 0, h)
    x1 = tf.clip_by_value(x - size//2, 0, w)
    x2 = tf.clip_by_value(x + size//2, 0, w)

    # The slices (part1, part2_left, etc.) inherit the image dtype,
    # which may be float16 under the mixed-precision policy
    part1 = img[:y1]
    part2_left = img[y1:y2, :x1]
    part2_right = img[y1:y2, x2:]

    # Critical fix: give the zero block the same dtype as the image
    zero_block = tf.zeros([y2-y1, x2-x1, 3], dtype=part2_left.dtype)

    # Rebuild the middle band with the zero block in place of the patch
    part2 = tf.concat([part2_left, zero_block, part2_right], axis=1)

    part3 = img[y2:]
    return tf.concat([part1, part2, part3], axis=0)

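def mixup(batch_images, batch_labels, alpha=0.2):
    if alpha <= 0:
        return batch_images, batch_labels
    # Sample lam ~ Beta(alpha, alpha) with TF ops so a fresh value is drawn per batch;
    # np.random.beta would run only once, when Dataset.map traces this function.
    g1 = tf.random.gamma([], alpha)
    g2 = tf.random.gamma([], alpha)
    lam = g1 / (g1 + g2)
    batch_size = tf.shape(batch_images)[0]
    index = tf.random.shuffle(tf.range(batch_size))
    # Match the image dtype (it may be float16 under mixed precision)
    lam_img = tf.cast(lam, batch_images.dtype)
    mixed_images = lam_img * batch_images + (1 - lam_img) * tf.gather(batch_images, index)
    mixed_labels = lam * batch_labels + (1 - lam) * tf.gather(batch_labels, index)
    return mixed_images, mixed_labels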

def prepare_datasets(batch_size=BATCH_SIZE):
    (ds_train, ds_test), ds_info = tfds.load('cifar10', split=['train', 'test'], as_supervised=True, with_info=True)

    train_ds_aug = ds_train.map(preprocess_train, num_parallel_calls=AUTOTUNE)
    train_ds_aug = train_ds_aug.map(lambda x,y: (data_augmentation(x, training=True), y), num_parallel_calls=AUTOTUNE)
    train_ds_aug = train_ds_aug.map(lambda x,y: (cutout(x, size=16), y), num_parallel_calls=AUTOTUNE)

    train_ds = train_ds_aug.shuffle(50000).batch(batch_size).map(lambda x, y: mixup(x, y), num_parallel_calls=AUTOTUNE).prefetch(AUTOTUNE)

    test_ds = ds_test.map(preprocess_eval, num_parallel_calls=AUTOTUNE).batch(batch_size).prefetch(AUTOTUNE)
    return train_ds, test_ds

train_ds, test_ds = prepare_datasets()
print('Train batches:', tf.data.experimental.cardinality(train_ds).numpy())
print('Test batches:', tf.data.experimental.cardinality(test_ds).numpy())




Dataset cifar10 downloaded and prepared to /root/tensorflow_datasets/cifar10/3.0.2. Subsequent calls will reuse this data.
Train batches: 391
Test batches: 79
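
The Cutout and Mixup steps in the cell above can be hard to follow in graph-mode TensorFlow. The NumPy sketch below mirrors the same two ideas on a toy batch; the helper names `cutout_np` and `mixup_np` are hypothetical and exist only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

def cutout_np(img, size=16):
    """Zero a square patch (clipped at the borders) centered at a random pixel."""
    h, w = img.shape[:2]
    y, x = rng.integers(0, h), rng.integers(0, w)
    y1, y2 = max(y - size // 2, 0), min(y + size // 2, h)
    x1, x2 = max(x - size // 2, 0), min(x + size // 2, w)
    out = img.copy()
    out[y1:y2, x1:x2, :] = 0.0
    return out

def mixup_np(images, labels, alpha=0.2):
    """Blend each sample with a shuffled partner using one lam ~ Beta(alpha, alpha) per batch."""
    lam = rng.beta(alpha, alpha)
    idx = rng.permutation(len(images))
    mixed_images = lam * images + (1 - lam) * images[idx]
    mixed_labels = lam * labels + (1 - lam) * labels[idx]
    return mixed_images, mixed_labels

# Toy batch: four all-ones images with one-hot labels for classes 0..3
images = np.ones((4, 96, 96, 3), dtype=np.float32)
labels = np.eye(10, dtype=np.float32)[[0, 1, 2, 3]]

patched = cutout_np(images[0])
mixed_x, mixed_y = mixup_np(images, labels)
print('zeroed pixels:', int((patched[..., 0] == 0).sum()))
print('label row sums:', mixed_y.sum(axis=1))
```

Each mixed label row still sums to 1, which is why the pipeline one-hot encodes labels and trains with `CategoricalCrossentropy` rather than a sparse loss.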


In [None]:
# Build teacher: EfficientNetB0 pretrained on ImageNet
def build_teacher(input_shape=(TARGET_SIZE, TARGET_SIZE, 3), num_classes=NUM_CLASSES):
    inputs = layers.Input(shape=input_shape)
    # Keras EfficientNet normalizes its own inputs and expects pixels in [0, 255];
    # the data pipeline yields [0, 1], so scale back up before the backbone.
    x = layers.Rescaling(255.0)(inputs)
    base = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet', input_shape=input_shape)
    x = base(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.3)(x)
    # Under mixed precision, keep the final layer in float32 for numerical stability
    outputs = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)
    model = models.Model(inputs=inputs, outputs=outputs, name='teacher_effnetb0')
    return model

# Build student: MobileNetV2 with alpha
def build_student(input_shape=(TARGET_SIZE, TARGET_SIZE, 3), num_classes=NUM_CLASSES, alpha=STUDENT_ALPHA):
    base = tf.keras.applications.MobileNetV2(include_top=False, weights=None, input_shape=input_shape, alpha=alpha)
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dropout(0.3)(x)
    # For mixed_precision, the final layer must be float32
    outputs = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)
    model = models.Model(inputs=base.input, outputs=outputs, name='student_mobilenetv2')
    return model

teacher = build_teacher()
student = build_student()
teacher.summary()
student.summary()


Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
16705208/16705208 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step


In [None]:
# Optimizer and callbacks for the teacher
try:
    # AdamW is exposed directly as tf.keras.optimizers.AdamW in recent TF releases
    opt_teacher = tf.keras.optimizers.AdamW(learning_rate=1e-4, weight_decay=1e-4)
except AttributeError:
    # Fall back to Adam where AdamW is unavailable
    opt_teacher = tf.keras.optimizers.Adam(1e-4)

teacher.compile(optimizer=opt_teacher,
                loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0),
                metrics=['accuracy'])

# Callbacks: checkpoint to the Drive path so a later session can resume from it
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(checkpoint_filepath, monitor='val_accuracy', save_best_only=True, verbose=1),
    tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=8, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=1e-6)
]

print('Teacher compiled.')

# Load previously saved weights, if any, before starting a new training run
try:
    teacher.load_weights(checkpoint_filepath)
    print("Previous weights loaded successfully. Resuming training.")
except Exception:
    print("No weights found, starting from scratch.")

Teacher compiled.
No weights found, starting from scratch.


In [None]:
# Train the teacher (crucial!)
print(f'Starting teacher training for {EPOCHS_TEACHER} epochs...')

history_teacher = teacher.fit(
    train_ds,
    validation_data=test_ds,
    epochs=EPOCHS_TEACHER,
    callbacks=callbacks
)

# Load the best weights (same file the checkpoint callback writes) for distillation
teacher.load_weights(checkpoint_filepath)
print('Teacher trained and best weights loaded.')

Starting teacher training for 10 epochs...
Epoch 1/10
126/391 ━━━━━━━━━━━━━━━━━━━━ 8:15:39 112s/step - accuracy: 0.1691 - loss: 2.3628

In [None]:
# Knowledge distillation implementation
class Distiller(keras.Model):
    def __init__(self, student, teacher, temperature=4.0, alpha=0.5):
        super().__init__()
        self.student = student
        self.teacher = teacher
        self.temperature = temperature
        self.alpha = alpha
        # Track metrics explicitly so the custom steps do not rely on compiled-metric internals
        self.loss_tracker = keras.metrics.Mean(name='loss')
        self.acc_metric = keras.metrics.CategoricalAccuracy(name='categorical_accuracy')

    @property
    def metrics(self):
        # Listing the metrics here lets Keras reset them at the start of each epoch
        return [self.loss_tracker, self.acc_metric]

    def compile(self, optimizer, student_loss_fn, distillation_loss_fn):
        super().compile(optimizer=optimizer)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn

    def _soften(self, probs):
        # Both models end in softmax, so temperature scaling is applied to log-probabilities:
        # softmax(log(p) / T) equals softmax(logits / T).
        log_p = tf.math.log(tf.clip_by_value(probs, 1e-7, 1.0))
        return tf.nn.softmax(log_p / self.temperature, axis=1)

    def train_step(self, data):
        x, y = data
        teacher_preds = self.teacher(x, training=False)
        with tf.GradientTape() as tape:
            student_preds = self.student(x, training=True)
            s_loss = self.student_loss_fn(y, student_preds)
            t_soft = self._soften(teacher_preds)
            s_soft = self._soften(student_preds)
            d_loss = self.distillation_loss_fn(t_soft, s_soft)
            loss = self.alpha * s_loss + (self.temperature**2) * (1.0 - self.alpha) * d_loss
        grads = tape.gradient(loss, self.student.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.student.trainable_variables))
        self.loss_tracker.update_state(loss)
        self.acc_metric.update_state(y, student_preds)

        return {m.name: m.result() for m in self.metrics} | {'student_loss': s_loss, 'distillation_loss': d_loss}

    def test_step(self, data):
        x, y = data
        y_pred = self.student(x, training=False)
        # Report the hard-label loss on validation data so val_loss is available to callbacks
        self.loss_tracker.update_state(self.student_loss_fn(y, y_pred))
        self.acc_metric.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}

# Prepare the distiller and compile it
try:
    opt_student = tf.keras.optimizers.AdamW(learning_rate=1e-3, weight_decay=1e-4)
except AttributeError:
    opt_student = tf.keras.optimizers.Adam(1e-3)

student_loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1)
distill_loss = tf.keras.losses.KLDivergence()

distiller = Distiller(student=student, teacher=teacher, temperature=4.0, alpha=0.5)
distiller.compile(optimizer=opt_student,
                  student_loss_fn=student_loss, distillation_loss_fn=distill_loss)

print('Distiller compiled.')
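
As a worked illustration of the distillation objective, `alpha * hard_loss + T^2 * (1 - alpha) * KL(teacher_soft || student_soft)`, here is a NumPy sketch on made-up logits. Because both networks in this notebook end in a softmax layer, the sketch recovers temperature-scaled distributions from probabilities via `softmax(log(p) / T)`; the helper names and the toy values are assumptions for illustration only.

```python
import numpy as np

def softmax(z, axis=-1):
    z = z - z.max(axis=axis, keepdims=True)  # subtract the max for numerical stability
    e = np.exp(z)
    return e / e.sum(axis=axis, keepdims=True)

def soften(probs, T):
    """Temperature-scale probabilities: softmax(log(p) / T) == softmax(logits / T)."""
    return softmax(np.log(np.clip(probs, 1e-7, 1.0)) / T)

def kl_div(p, q):
    """Batch mean of KL(p || q)."""
    return float(np.mean(np.sum(p * (np.log(p) - np.log(q)), axis=-1)))

T, alpha = 4.0, 0.5
teacher_logits = np.array([[5.0, 1.0, 0.0], [0.5, 2.5, 0.0]])
student_logits = np.array([[2.0, 1.5, 0.5], [1.0, 1.0, 1.0]])
y_true = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])

t_prob, s_prob = softmax(teacher_logits), softmax(student_logits)
# Softening the probabilities matches softening the logits directly
assert np.allclose(soften(t_prob, T), softmax(teacher_logits / T))

hard_loss = float(np.mean(-np.sum(y_true * np.log(s_prob), axis=-1)))
soft_loss = kl_div(soften(t_prob, T), soften(s_prob, T))
total = alpha * hard_loss + (T ** 2) * (1.0 - alpha) * soft_loss
print(f'hard={hard_loss:.4f} soft={soft_loss:.4f} total={total:.4f}')
```

The `T ** 2` factor compensates for the gradients of the softened term shrinking roughly as `1 / T^2`, so the two loss terms stay on a comparable scale when the temperature changes.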


In [None]:
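# Train the student via distillation (crucial!)
print(f'Starting student training (distillation) for {EPOCHS_STUDENT} epochs...')

# Student-specific callbacks: the teacher's list would write to the teacher checkpoint
# file and monitors 'val_accuracy', which the Distiller does not report under that name.
student_callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='val_categorical_accuracy', mode='max',
                                     patience=8, restore_best_weights=True),
]

history_distill = distiller.fit(
    train_ds,
    validation_data=test_ds,
    epochs=EPOCHS_STUDENT,
    callbacks=student_callbacks
)

print('Student trained via distillation.')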


In [None]:
# Pruning (optional): retrain with sparsity
if USE_PRUNING:
    try:
        import tensorflow_model_optimization as tfmot
        pruning_params = {
            'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.0,
                                                                     final_sparsity=0.5,
                                                                     begin_step=200,
                                                                     end_step=2000)
        }
        student_for_prune = tfmot.sparsity.keras.prune_low_magnitude(student, **pruning_params)

        # The model must be re-compiled after applying the pruning wrapper
        student_for_prune.compile(optimizer=opt_student, loss=student_loss, metrics=['accuracy'])

        pruning_callbacks = [
            tfmot.sparsity.keras.UpdatePruningStep(),
            # Optional TensorBoard summaries; PruningSummaries requires a log directory
            # ('pruning_logs' is an arbitrary example path)
            tfmot.sparsity.keras.PruningSummaries(log_dir='pruning_logs')
        ]

        print('Pruning model prepared (student_for_prune).')
        print('To train with pruning, call student_for_prune.fit(...) and pass pruning_callbacks.')
    except Exception as e:
        print('Pruning not available:', e)
else:
    print('Pruning disabled by config.')

# Post-training quantization helper
def convert_to_tflite(model, quantize=False, filename='student.tflite', representative_dataset=None):
    # tf.lite is available through the global TensorFlow import
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    if quantize:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        if representative_dataset is not None:
            converter.representative_dataset = representative_dataset
            # For full integer quantization, uncomment the lines below:
            # converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
            # converter.inference_input_type = tf.int8
            # converter.inference_output_type = tf.int8

    tflite_model = converter.convert()
    with open(filename, 'wb') as f:
        f.write(tflite_model)
    print('Saved TFLite model to', filename)
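
To make the pruning configuration above more concrete, the sketch below models how a polynomial-decay schedule could ramp sparsity from 0.0 to 0.5 between steps 200 and 2000, and how a magnitude mask zeroes the smallest weights. The exponent `power=3` and the helper names are assumptions for illustration; the library also updates masks only at a configurable frequency, which this sketch ignores.

```python
import numpy as np

def polynomial_sparsity(step, initial=0.0, final=0.5, begin_step=200, end_step=2000, power=3):
    """Assumed form of a polynomial-decay sparsity schedule (power=3 is an assumption)."""
    if step <= begin_step:
        return initial
    if step >= end_step:
        return final
    progress = (step - begin_step) / (end_step - begin_step)
    return final + (initial - final) * (1.0 - progress) ** power

def magnitude_mask(weights, sparsity):
    """Return a 0/1 mask that drops the smallest-magnitude fraction `sparsity` of weights."""
    k = int(round(sparsity * weights.size))
    if k == 0:
        return np.ones_like(weights)
    threshold = np.sort(np.abs(weights).ravel())[k - 1]
    return (np.abs(weights) > threshold).astype(weights.dtype)

rng = np.random.default_rng(0)
w = rng.normal(size=(64, 64))
for step in (0, 200, 1100, 2000):
    s = polynomial_sparsity(step)
    mask = magnitude_mask(w, s)
    print(f'step={step:5d} target={s:.4f} achieved={1.0 - mask.mean():.4f}')
```

The schedule prunes quickly at first and slows down near `end_step`, which gives the remaining weights time to adapt during fine-tuning.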


In [None]:
# Evaluation utilities (run after training student)
def evaluate_student(model, test_dataset):
    print('\n--- Final Student Evaluation ---')
    loss, acc = model.evaluate(test_dataset, verbose=2)
    print(f'Test accuracy: {acc:.4f}, Test loss: {loss:.4f}')

    # collect y_true, y_pred
    y_true = np.concatenate([y.numpy() for x, y in test_dataset], axis=0)
    y_true = np.argmax(y_true, axis=1) # Convert one-hot labels to class indices
    y_pred_probs = model.predict(test_dataset)
    y_pred = np.argmax(y_pred_probs, axis=1)

    print('\nClassification report (precision/recall/F1 per class):\n')
    print(classification_report(y_true, y_pred, digits=4))

    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10,8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix - Student Model (CIFAR-10)')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()
    return y_true, y_pred, cm

# Test-time augmentation (TTA) example
def tta_predict(model, image_batch, n_aug=8):
    preds = []
    for _ in range(n_aug):
        aug = data_augmentation(image_batch, training=True)
        pred = model.predict(aug)
        preds.append(pred)
    return np.mean(np.stack(preds, axis=0), axis=0)

print('Evaluation utilities ready.')
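
For readers who want to see how the numbers in the classification report relate to the confusion matrix, here is a small NumPy sketch. The helper name `per_class_metrics` and the toy counts are made up for illustration; rows are true classes and columns are predicted classes, matching `sklearn.metrics.confusion_matrix`.

```python
import numpy as np

def per_class_metrics(cm):
    """Precision, recall, and F1 per class from a confusion matrix (rows = true, cols = predicted)."""
    cm = np.asarray(cm, dtype=np.float64)
    tp = np.diag(cm)
    predicted = cm.sum(axis=0)   # column sums: how often each class was predicted
    actual = cm.sum(axis=1)      # row sums: how often each class truly occurs
    precision = np.divide(tp, predicted, out=np.zeros_like(tp), where=predicted > 0)
    recall = np.divide(tp, actual, out=np.zeros_like(tp), where=actual > 0)
    denom = precision + recall
    f1 = np.divide(2 * precision * recall, denom, out=np.zeros_like(tp), where=denom > 0)
    return precision, recall, f1

# Toy 3-class matrix (made-up counts for illustration only)
cm = np.array([[8, 1, 1],
               [2, 6, 2],
               [0, 3, 7]])
precision, recall, f1 = per_class_metrics(cm)
accuracy = np.trace(cm) / cm.sum()
print('precision:', precision.round(3))
print('recall:   ', recall.round(3))
print('f1:       ', f1.round(3))
print('accuracy: ', round(float(accuracy), 3))
```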


In [None]:
# Save the student model (save the student, not the Distiller wrapper)
def save_and_export(model, save_dir='saved_student_model', quantize=USE_QUANTIZATION):
    os.makedirs(save_dir, exist_ok=True)

    model.save(os.path.join(save_dir, 'student_keras.h5'), include_optimizer=False)
    print('Saved Keras model to', save_dir)

    # Representative dataset generator for quantization
    def rep_gen():
        for images, labels in train_ds.take(100):
            # The converter calibrates on float32 inputs; augmented batches may be float16
            yield [tf.cast(images, tf.float32)]

    # Export float TFLite (baseline)
    convert_to_tflite(model, quantize=False, filename=os.path.join(save_dir, 'student_float.tflite'))

    # Export quantized TFLite (for maximum deployment efficiency)
    if quantize:
        try:
            convert_to_tflite(model, quantize=True, filename=os.path.join(save_dir, 'student_quant.tflite'), representative_dataset=rep_gen)
            print('Quantized export complete. student_quant.tflite is the optimized model.')
        except Exception as e:
            print('Quantized conversion failed:', e)

print('Save/export utilities ready.')
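
Post-training quantization maps float values to 8-bit integers using a scale and a zero point, and the representative dataset is what lets the converter estimate value ranges. The NumPy sketch below shows a simplified affine int8 scheme; it is an illustration under stated assumptions (hypothetical helper names, a single per-tensor range), not the converter's exact algorithm.

```python
import numpy as np

def quantize_int8(x):
    """Affine (asymmetric) quantization of a float array to int8; returns (q, scale, zero_point)."""
    lo = min(float(x.min()), 0.0)   # the representable range must include 0
    hi = max(float(x.max()), 0.0)
    scale = (hi - lo) / 255.0 if hi > lo else 1.0
    zero_point = int(round(-128 - lo / scale))
    q = np.clip(np.round(x / scale) + zero_point, -128, 127).astype(np.int8)
    return q, scale, zero_point

def dequantize(q, scale, zero_point):
    """Map int8 codes back to approximate float values."""
    return (q.astype(np.float32) - zero_point) * scale

rng = np.random.default_rng(0)
activations = rng.uniform(-1.0, 3.0, size=1000).astype(np.float32)
q, scale, zp = quantize_int8(activations)
error = float(np.abs(dequantize(q, scale, zp) - activations).max())
print(f'scale={scale:.5f} zero_point={zp} max_abs_error={error:.5f}')
```

The reconstruction error stays on the order of one quantization step (`scale`), which is why a calibration set that covers the real input range matters: a range that is too wide wastes resolution, and one that is too narrow clips values.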


In [None]:
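# --- FINAL RUN: EVALUATION AND EXPORT ---

# 1. Evaluate the trained model and print the metrics (accuracy, precision, recall, F1).
#    The student was trained through the Distiller and never compiled on its own,
#    so compile it here before evaluate() is called.
student.compile(loss=tf.keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])
evaluate_student(student, test_ds)

# 2. Export the models to TFLite (float and quantized) for deployment
save_and_export(student, save_dir='cpt_student_artifacts')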


## Final notes

- **Efficiency metrics**: The `accuracy`, `precision`, `recall`, and `f1-score` values in the classification report measure predictive quality. `student_quant.tflite` is the product of the deployment **efficiency** optimization (smaller size, faster inference).
- **Training**: For higher accuracy, increase `EPOCHS_TEACHER` and `EPOCHS_STUDENT`.
- **CAPTCHAs**: To apply this to a real CAPTCHA, you will need to adapt the preprocessing functions and the model's output format if your CAPTCHA has multiple characters or is sequence-based.
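
As one possible way to adapt the output format for a multi-character CAPTCHA, the sketch below encodes a fixed-length string as one one-hot row per character position. The alphabet `CHARSET`, the length `CAPTCHA_LEN`, and both helper names are assumptions for illustration, not part of this notebook.

```python
import numpy as np

CHARSET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'  # assumed alphabet
CAPTCHA_LEN = 5                                    # assumed fixed length
CHAR_TO_IDX = {c: i for i, c in enumerate(CHARSET)}

def encode_captcha(text):
    """One-hot encode a fixed-length string into shape (CAPTCHA_LEN, len(CHARSET))."""
    if len(text) != CAPTCHA_LEN:
        raise ValueError(f'expected {CAPTCHA_LEN} characters, got {len(text)}')
    target = np.zeros((CAPTCHA_LEN, len(CHARSET)), dtype=np.float32)
    for pos, ch in enumerate(text):
        target[pos, CHAR_TO_IDX[ch]] = 1.0
    return target

def decode_captcha(probs):
    """Pick the most likely character at each position."""
    return ''.join(CHARSET[i] for i in np.argmax(probs, axis=-1))

target = encode_captcha('7G2KQ')
print(target.shape, decode_captcha(target))
```

With this encoding, the model would need one softmax output per position instead of a single 10-way head; variable-length or sequence-based CAPTCHAs generally call for a different output design.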