In [1]:
import tensorflow as tf

from tensorflow.keras.datasets import mnist

from tensorflow.keras.utils import to_categorical

from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense



# Carregar os dados

(x_train, y_train), (x_test, y_test) = mnist.load_data()



# Pr√©-processamento

x_train = x_train.reshape(-1, 28, 28, 1).astype("float32") / 255.0

x_test  = x_test.reshape(-1, 28, 28, 1).astype("float32") / 255.0

y_train = to_categorical(y_train, 10)

y_test  = to_categorical(y_test, 10)



# Definir a CNN

model = Sequential([

    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),

    MaxPooling2D(pool_size=(2, 2)),

    Conv2D(64, kernel_size=(3, 3), activation='relu'),

    MaxPooling2D(pool_size=(2, 2)),

    Flatten(),

    Dense(128, activation='relu'),

    Dense(10, activation='softmax')

])



# Compilar o modelo

model.compile(optimizer='adam',

              loss='categorical_crossentropy',

              metrics=['accuracy'])



# Treinar o modelo

model.fit(x_train, y_train,

          epochs=5,

          batch_size=128,

          validation_split=0.2)



# Avaliar no conjunto de teste

test_loss, test_accuracy = model.evaluate(x_test, y_test)

print(f"Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}")

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m35s[0m 3us/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/5
[1m375/375[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m17s[0m 36ms/step - accuracy: 0.9273 - loss: 0.2430 - val_accuracy: 0.9796 - val_loss: 0.0709
Epoch 2/5
[1m375/375[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m13s[0m 34ms/step - accuracy: 0.9797 - loss: 0.0650 - val_accuracy: 0.9853 - val_loss: 0.0507
Epoch 3/5
[1m375/375[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m13s[0m 34ms/step - accuracy: 0.9862 - loss: 0.0447 - val_accuracy: 0.9868 - val_loss: 0.0467
Epoch 4/5
[1m375/375[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m12s[0m 31ms/step - accuracy: 0.9893 - loss: 0.0348 - val_accuracy: 0.9880 - val_loss: 0.0422
Epoch 5/5
[1m375/375[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m11s[0m 29ms/step - accuracy: 0.9913 - loss: 0.0278 - val_accuracy: 0.9873 - va

### Config (dimens√µes, batch, classes)

In [1]:
import os, csv, json, math, random, pathlib
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# ==== CONFIG ====
H, W = 180, 180
IMAGE_SIZE = (H, W)
BATCH = 64
NUM_CLASSES = 2
CLASS_NAMES = ("Cat", "Dog")   # nomes como est√£o nas pastas
CLASS_MAP = {name: i for i, name in enumerate(CLASS_NAMES)}
INV_CLASS_MAP = {v:k for k,v in CLASS_MAP.items()}

tau = 0.8          # threshold inicial mais brando
lambda_u_max = 1.0 # peso m√°ximo da perda unsupervisionada
c_u, c_i = 1.0, 1.0  # coef. UCB
alpha = 0.9          # EMA


### Helpers de leitura e record builders

In [2]:
def _safe_decode_resize_jpeg(path, image_size=IMAGE_SIZE):
    raw = tf.io.read_file(path)
    # O PetImages tem JPEGs (√†s vezes corrompidos). Se tiver PNG misturado use decode_image.
    img = tf.image.decode_jpeg(raw, channels=3)
    img = tf.image.resize(img, image_size)
    img = tf.cast(img, tf.float32)  # 0..255 (normalizamos no modelo)
    return img

IGNORE_ERRORS = tf.data.experimental.ignore_errors()

def build_records_from_dir(root_dir, class_map=CLASS_MAP, with_ids=True, seed=42):
    """
    Varre .../Cat/*.jpg e .../Dog/*.jpg gerando records {sid, path, label}
    """
    rng = np.random.RandomState(seed)
    records = []
    sid = 0
    for name, idx in class_map.items():
        cls_dir = os.path.join(root_dir, name)
        for fn in os.listdir(cls_dir):
            if not fn.lower().endswith((".jpg", ".jpeg", ".png")):
                continue
            p = os.path.join(cls_dir, fn)
            rec = {"sid": sid, "path": p}
            if class_map is not None:
                rec["label"] = idx
            records.append(rec)
            sid += 1
    rng.shuffle(records)
    return records

def split_L_U(records, n_L=400, seed=42):
    rng = np.random.RandomState(seed)
    arr = records.copy()
    rng.shuffle(arr)
    L = []
    U = []
    for r in arr:
        if len(L) < n_L:
            L.append(r)
        else:
            U.append(r)
    return L, U


Instructions for updating:
Use `tf.data.Dataset.ignore_errors` instead.


### Augmentations batch-first

In [3]:
weak_aug_seq = keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomTranslation(0.05, 0.05),
], name="weak_aug")

strong_aug_seq = keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.15),
    layers.RandomZoom(0.15),
    layers.RandomContrast(0.15),
], name="strong_aug")


### Datasets (vers√µes √∫nicas, sem duplicidade de batch)

In [4]:
def make_L_ds(L_records, batch=BATCH, image_size=IMAGE_SIZE, drop_remainder=False):
    paths = [r["path"] for r in L_records]
    labels = [r["label"] for r in L_records]
    ids    = [r["sid"]   for r in L_records]
    ds = tf.data.Dataset.from_tensor_slices((paths, labels, ids))
    def _load(path, y, sid):
        img = _safe_decode_resize_jpeg(path, image_size)
        return img, y, sid
    ds = ds.shuffle(4096, reshuffle_each_iteration=True)
    ds = ds.map(_load, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
    ds = ds.apply(IGNORE_ERRORS)
    ds = ds.batch(batch, drop_remainder=drop_remainder)  # batch antes de aug
    # Sem aug no L por enquanto (poderia adicionar augment leve aqui tamb√©m)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

def make_U_ds(U_records, batch=BATCH, image_size=IMAGE_SIZE, drop_remainder=False):
    paths = [r["path"] for r in U_records]
    ids   = [r["sid"]  for r in U_records]
    ds = tf.data.Dataset.from_tensor_slices((paths, ids))
    def _loadU(path, sid):
        img = _safe_decode_resize_jpeg(path, image_size)
        return img, sid
    ds = ds.shuffle(4096, reshuffle_each_iteration=True)
    ds = ds.map(_loadU, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
    ds = ds.apply(IGNORE_ERRORS)
    ds = ds.batch(batch, drop_remainder=drop_remainder)  # batch
    def _make_pair(batch_img, batch_sid):
        x_w = weak_aug_seq(batch_img, training=True)
        x_s = strong_aug_seq(batch_img, training=True)
        return x_w, x_s, batch_sid
    ds = ds.map(_make_pair, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

def make_val_ds(val_records, batch=BATCH, image_size=IMAGE_SIZE):
    paths = [r["path"] for r in val_records]
    labels= [r["label"] for r in val_records]
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    def _load(path, y):
        img = _safe_decode_resize_jpeg(path, image_size)
        return img, y
    ds = ds.map(_load, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch).prefetch(tf.data.AUTOTUNE)
    return ds


### Modelo √∫nico com normaliza√ß√£o no modelo (evita duplicidade)

In [5]:
def build_model(num_classes=NUM_CLASSES, image_size=IMAGE_SIZE):
    inp = keras.Input(shape=image_size + (3,))
    x = layers.Rescaling(1./255)(inp)            # normaliza TUDO (L,U,Val) no mesmo lugar
    x = layers.Conv2D(32, 3, padding="same", activation="relu")(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(64, 3, padding="same", activation="relu")(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(128, 3, padding="same", activation="relu")(x)
    x = layers.Flatten()(x)
    out = layers.Dense(num_classes)(x)           # logits
    model = keras.Model(inp, out)
    return model

model = build_model()
optimizer = keras.optimizers.Adam(1e-3)
loss_obj  = keras.losses.SparseCategoricalCrossentropy(from_logits=True)


### M√©tricas por amostra, EMA/UCB

In [6]:
def softmax_logits(logits):
    return tf.nn.softmax(logits, axis=-1)

def pseudo_el2n(probs):  # incerteza (EL2N-like, proxy simples)
    # dist√¢ncia do one-hot do argmax
    yhat = tf.argmax(probs, axis=-1)
    oh = tf.one_hot(yhat, depth=tf.shape(probs)[-1])
    return tf.reduce_mean(tf.square(probs - oh), axis=-1)

def sym_kl(p, q, eps=1e-7):
    p = tf.clip_by_value(p, eps, 1.0)
    q = tf.clip_by_value(q, eps, 1.0)
    kl1 = tf.reduce_sum(p * tf.math.log(p/q), axis=-1)
    kl2 = tf.reduce_sum(q * tf.math.log(q/p), axis=-1)
    return 0.5*(kl1+kl2)

# tabelas EMA
u_mean, u_var = {}, {}
i_mean, i_var = {}, {}

def ema_update(mean_dict, var_dict, sid, val, alpha=0.9):
    m = mean_dict.get(sid, val)
    v = var_dict.get(sid, 0.0)
    new_m = alpha*m + (1.0-alpha)*val
    new_v = alpha*v + (1.0-alpha)*(val - new_m)**2
    mean_dict[sid] = float(new_m)
    var_dict[sid]  = float(new_v)

def ucb(mu, var, c=1.0):
    return mu + c * math.sqrt(max(var, 1e-12))


### Passos de treino (supervisionado e n√£o-rotulado) ‚Äî sem dupla normaliza√ß√£o

In [7]:
@tf.function
def train_step_supervised(batch):
    x_l, y_l, _ = batch
    with tf.GradientTape() as tape:
        logits_l = model(x_l, training=True)  # Rescaling acontece no modelo
        loss_sup = loss_obj(y_l, logits_l)
    grads = tape.gradient(loss_sup, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    acc = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(logits_l, -1), tf.cast(y_l, tf.int64)), tf.float32))
    return loss_sup, acc

def lambda_u_warmup(epoch, total=3, max_val=1.0):
    return max_val * min(1.0, (epoch+1)/total)

@tf.function
def train_step_unlabeled(batch, lambda_u=1.0):
    x_w, x_s, sids = batch
    with tf.GradientTape() as tape:
        logits_w = model(x_w, training=True)
        logits_s = model(x_s, training=True)
        probs_w  = softmax_logits(logits_w)
        probs_s  = softmax_logits(logits_s)

        conf = tf.reduce_max(probs_w, axis=-1)
        yhat = tf.argmax(probs_w, axis=-1)
        mask = conf >= tau

        loss_unsup = tf.constant(0., dtype=tf.float32)
        masked = tf.where(mask)
        if tf.shape(masked)[0] > 0:
            logits_s_mask = tf.gather(logits_s, masked[:,0])
            yhat_mask     = tf.gather(yhat, masked[:,0])
            loss_unsup = loss_obj(yhat_mask, logits_s_mask)

        loss = lambda_u * loss_unsup

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    u_batch = pseudo_el2n(probs_w)
    i_batch = sym_kl(probs_w, probs_s)
    return loss, u_batch, i_batch, sids


### Avalia√ß√£o e execu√ß√£o de uma rodada

In [8]:
def evaluate(model, val_ds):
    tot, ok, loss = 0, 0, 0.0
    for x,y in val_ds:
        logits = model(x, training=False)
        loss += float(loss_obj(y, logits))
        ok += int((tf.argmax(logits,-1).numpy() == y.numpy()).sum())
        tot += y.shape[0]
    return loss/max(len(list(val_ds)) or 1,1), ok/max(tot,1)

def run_one_round(L_ds, U_ds, epoch_idx, epochs=5):
    for ep in range(epochs):
        # supervisionado
        for batch in L_ds:
            _ = train_step_supervised(batch)
        # n√£o-rotulado
        lam = lambda_u_warmup(ep, total=3, max_val=lambda_u_max)
        for batch in U_ds:
            loss_unsup, u_batch, i_batch, sids = train_step_unlabeled(batch, lambda_u=lam)

            # atualiza EMA/UCB (use .numpy().item() p/ evitar erro de scalar)
            u_vals = u_batch.numpy()
            i_vals = i_batch.numpy()
            sid_vals = sids.numpy()
            for val_u, val_i, sid in zip(u_vals, i_vals, sid_vals):
                ema_update(u_mean, u_var, int(sid), float(np.asarray(val_u).item()), alpha)
                ema_update(i_mean, i_var, int(sid), float(np.asarray(val_i).item()), alpha)


### Sele√ß√£o, simula√ß√£o de anota√ß√£o e loop ASSL

In [9]:
import sys
from pathlib import Path

PROJECT_ROOT = Path("..").resolve() # Sobe um n√≠vel e resolve o caminho absoluto

# 2. Adiciona o caminho da raiz ao sys.path
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))
    print(f"‚úÖ Adicionado ao PYTHONPATH: {PROJECT_ROOT}")
else:
    print(f"üìÅ Pasta raiz j√° est√° no PYTHONPATH: {PROJECT_ROOT}")

‚úÖ Adicionado ao PYTHONPATH: C:\Users\felip\OneDrive\Documentos\GitHub\ActiveSemisupervisedLearningCNN-s


In [10]:
def acquire_topK_balanced(U_records, K, class_names=CLASS_NAMES):
    per_class = max(1, K // max(1, len(class_names)))
    buckets = {name: [] for name in class_names}

    for rec in U_records:
        sid = rec["sid"]
        p = rec["path"].replace("\\","/").lower()
        cname = None
        for name in class_names:
            if f"/{name.lower()}/" in p:
                cname = name; break
        if cname is None: 
            continue
        mu_u, vu_u = u_mean.get(sid, 0.0), u_var.get(sid, 0.0)
        mu_i, vi_i = i_mean.get(sid, 0.0), i_var.get(sid, 0.0)
        score = ucb(mu_u, vu_u, c_u) * ucb(mu_i, vi_i, c_i)
        buckets[cname].append((score, sid))

    selected = []
    for cname in class_names:
        cand = sorted(buckets[cname], key=lambda x: x[0], reverse=True)
        selected.extend([sid for _, sid in cand[:per_class]])

    if len(selected) < K:
        rest = []
        have = set(selected)
        for rec in U_records:
            sid = rec["sid"]
            if sid in have: continue
            mu_u, vu_u = u_mean.get(sid, 0.0), u_var.get(sid, 0.0)
            mu_i, vi_i = i_mean.get(sid, 0.0), i_var.get(sid, 0.0)
            rest.append((ucb(mu_u,vu_u,c_u)*ucb(mu_i,vi_i,c_i), sid))
        rest.sort(key=lambda x: x[0], reverse=True)
        selected.extend([sid for _, sid in rest[:K-len(selected)]])
    return selected[:K]

def simulate_sid2true_from_path(U_records, L_records, class_map=CLASS_MAP):
    sid2true = {}
    def infer(path):
        p = path.replace("\\","/").lower()
        for name, idx in class_map.items():
            if f"/{name.lower()}/" in p:
                return idx
        raise ValueError(f"N√£o infere classe de: {path}")
    for rec in U_records: sid2true[rec["sid"]] = infer(rec["path"])
    for rec in L_records: sid2true[rec["sid"]] = infer(rec["path"])
    return sid2true

def move_annotated_to_L(sid2label, U_records, L_records):
    by_id = {r["sid"]: r for r in U_records}
    moved = []
    keepU = []
    for r in U_records:
        sid = r["sid"]
        if sid in sid2label:
            nr = dict(r)
            nr["label"] = int(sid2label[sid])
            moved.append(nr)
        else:
            keepU.append(r)
    return keepU, (L_records + moved)

# ==== Dataset base ====
train_root = r"..\data\train"
val_root   = r"..\data\validation"

all_train = build_records_from_dir(train_root, class_map=CLASS_MAP)
L_records, U_records = split_L_U(all_train, n_L=400, seed=42)

val_records = build_records_from_dir(val_root, class_map=CLASS_MAP)
val_ds = make_val_ds(val_records, batch=BATCH, image_size=IMAGE_SIZE)

L_ds = make_L_ds(L_records, batch=BATCH, image_size=IMAGE_SIZE)
U_ds = make_U_ds(U_records, batch=BATCH, image_size=IMAGE_SIZE)

# simula√ß√£o de anota√ß√£o (sem arquivos)
sid2true = simulate_sid2true_from_path(U_records, L_records, class_map=CLASS_MAP)

ROUNDS = 10
EPOCHS_PER_ROUND = 5

for r in range(ROUNDS):
    print(f"== Rodada {r+1}/{ROUNDS} ==")
    run_one_round(L_ds, U_ds, epoch_idx=r, epochs=EPOCHS_PER_ROUND)

    # avalia√ß√£o
    vloss, vacc = evaluate(model, val_ds)
    print(f"Val loss: {vloss:.4f}  Val acc: {vacc:.4f}")

    # sele√ß√£o ativa balanceada
    selected_ids = acquire_topK_balanced(U_records, K=100, class_names=CLASS_NAMES)

    # ‚Äúanota√ß√£o‚Äù simulada apenas nos K escolhidos
    sid2label = {sid: sid2true[sid] for sid in selected_ids}
    # mover U->L
    U_records, L_records = move_annotated_to_L(sid2label, U_records, L_records)

    # limpar EMA/UCB dos sids movidos
    moved_set = set(sid2label.keys())
    for sid in list(u_mean.keys()):
        if sid in moved_set: u_mean.pop(sid, None); u_var.pop(sid, None)
    for sid in list(i_mean.keys()):
        if sid in moved_set: i_mean.pop(sid, None); i_var.pop(sid, None)

    # rebuild datasets
    L_ds = make_L_ds(L_records, batch=BATCH, image_size=IMAGE_SIZE)
    U_ds = make_U_ds(U_records, batch=BATCH, image_size=IMAGE_SIZE)


== Rodada 1/10 ==
Val loss: 14.1594  Val acc: 0.5002
== Rodada 2/10 ==


KeyboardInterrupt: 