# **Clasificación de Cámaras Mediante el Análisis del Ruido del Sensor en Imágenes Digitales**

### *Librerías*

In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt

### *Procesamiento de las imagenes*

In [None]:
DATA_DIR = "Dresden_Exp"

In [None]:
camera_dirs = sorted([d for d in os.listdir(DATA_DIR) if os.path.isdir(os.path.join(DATA_DIR, d))])

print(f"{'Carpeta':40} | Imágenes")
print("-" * 55)

for folder in camera_dirs:
    folder_path = os.path.join(DATA_DIR, folder)
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    print(f"{folder:40} | {len(files)}")


In [None]:
folder_names = []
image_counts = []

for folder in camera_dirs:
    folder_path = os.path.join(DATA_DIR, folder)
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    folder_names.append(folder)
    image_counts.append(len(files))

plt.figure(figsize=(12, 6))
plt.barh(folder_names, image_counts)
plt.xlabel("Cantidad de imágenes")
plt.title("Imágenes por carpeta (modelo de cámara)")
plt.tight_layout()
plt.show()

### *Extracción de parches*

In [None]:
OUTPUT_DIR = "patches_by_class_batch"
PATCH_SIZE = 48
PATCHES_PER_IMAGE = 25
BATCH_SIZE = 500

os.makedirs(OUTPUT_DIR, exist_ok=True)

def extract_random_patches(img, num_patches=10, patch_size=48):
    patches = []
    h, w, _ = img.shape
    for _ in range(num_patches):
        y = np.random.randint(0, h - patch_size)
        x = np.random.randint(0, w - patch_size)
        patch = img[y:y+patch_size, x:x+patch_size]
        patches.append(patch)
    return patches

for camera_model in tqdm(os.listdir(DATA_DIR)):
    model_dir = os.path.join(DATA_DIR, camera_model)
    if not os.path.isdir(model_dir):
        continue

    image_files = os.listdir(model_dir)
    num_batches = (len(image_files) + BATCH_SIZE - 1) // BATCH_SIZE

    for batch_idx in range(num_batches):
        start = batch_idx * BATCH_SIZE
        end = min((batch_idx + 1) * BATCH_SIZE, len(image_files))
        batch_files = image_files[start:end]

        patches = []
        labels = []

        for image_file in batch_files:
            img_path = os.path.join(model_dir, image_file)
            img = cv2.imread(img_path)
            if img is None or img.shape[0] < PATCH_SIZE or img.shape[1] < PATCH_SIZE:
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            extracted = extract_random_patches(img, PATCHES_PER_IMAGE, PATCH_SIZE)
            patches.extend(extracted)
            labels.extend([camera_model] * len(extracted))

        if patches:
            patches_array = np.array(patches)
            labels_array = np.array(labels)
            np.save(os.path.join(OUTPUT_DIR, f"{camera_model}_patches_batch{batch_idx}.npy"), patches_array)
            np.save(os.path.join(OUTPUT_DIR, f"{camera_model}_labels_batch{batch_idx}.npy"), labels_array)
            print(f"✅ Guardado: {camera_model} - batch {batch_idx}")

In [None]:
# Aqui se juntan todos los parches por batches para poder hacer la division del dataset 70-30, toco asi porque crear los parches para todo
# el conjunto de fotos de un solo sensor no se podia (Se quedaba sin memoria RAM)

PATCHES_DIR = "patches_by_class_batch"

all_patches = []
all_labels = []

# Solo buscar archivos que son batches de parches
for fname in sorted(os.listdir(PATCHES_DIR)):
    if "_patches_batch" in fname and fname.endswith(".npy"):
        base_name = fname.replace("_patches_batch", "_labels_batch")
        patches_path = os.path.join(PATCHES_DIR, fname)
        labels_path = os.path.join(PATCHES_DIR, base_name)

        # Verificar que existan ambos archivos
        if not os.path.exists(labels_path):
            print(f"Falta el archivo de etiquetas para: {fname}")
            continue

        # Cargar parches y etiquetas
        patches = np.load(patches_path)
        labels = np.load(labels_path)

        all_patches.append(patches)
        all_labels.append(labels)

# Concatenar todos los arrays
X = np.concatenate(all_patches, axis=0)
y = np.concatenate(all_labels, axis=0)

print(f"Parches totales: {X.shape}")
print(f"Etiquetas totales: {y.shape}")

In [None]:
# Asignacion de clases, le asigna una clase a cada sensor, en la siguiente celda puse el print para ver como queda

classes = sorted(np.unique(y))
class_to_idx = {cls: idx for idx, cls in enumerate(classes)}
y_idx = np.array([class_to_idx[label] for label in y])

In [None]:
print(class_to_idx)

In [None]:
#Split 70-30

X_train, X_test, y_train, y_test = train_test_split(
    X, y_idx, test_size=0.3, stratify=y_idx, random_state=42
)

In [None]:
#Aqui se guardan los splits para no hacer lo mismo cada vez que entremos al notebook

os.makedirs("final_dataset", exist_ok=True)
np.save("final_dataset/X_train.npy", X_train)
np.save("final_dataset/X_test.npy", X_test)
np.save("final_dataset/y_train.npy", y_train)
np.save("final_dataset/y_test.npy", y_test)
np.save("final_dataset/class_to_idx.npy", class_to_idx)

In [None]:
#Carga los splits y los convierte en tensores de pytorch para crear los Dataloaders

DATA_DIR="splits"

X_train = np.load(os.path.join(DATA_DIR, "X_train.npy"))
X_test = np.load(os.path.join(DATA_DIR, "X_test.npy"))
y_train = np.load(os.path.join(DATA_DIR, "y_train.npy"))
y_test = np.load(os.path.join(DATA_DIR, "y_test.npy"))
class_to_idx = np.load(os.path.join(DATA_DIR, "class_to_idx.npy"), allow_pickle=True).item()

# Convertir a tensores PyTorch
X_train_tensor = torch.tensor(X_train).permute(0, 3, 1, 2).float()  # [N, C, H, W]
X_test_tensor = torch.tensor(X_test).permute(0, 3, 1, 2).float()
y_train_tensor = torch.tensor(y_train).long()
y_test_tensor = torch.tensor(y_test).long()

# Crear Datasets y DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


### Arquitectura paper original

In [4]:
class CameraConvNet(nn.Module):
    def __init__(self, num_classes):
        super(CameraConvNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=7),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(128, 512, kernel_size=7),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(512, 2048, kernel_size=6),
            nn.ReLU()
        )
        self.classifier = nn.Sequential(
            nn.Linear(2048, 2048),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(2048, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.classifier(x)
        return x

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = len(class_to_idx)
model = CameraConvNet(num_classes).to(device)

# Ellos usan Softmax y LogLoss pero chat dice que es lo mismo
# que usar CrossEntropy, mire y creo que tiene sentido 
criterion = nn.CrossEntropyLoss()

# Estoy mirando el paper pero no dice nada del learning rate 
# solo dice: tasa de aprendizaje decreciente 

# el optimizador como tal no dicen cual usan pero en la seccion 2. BACKGROUND ON CONVOLUTIONAL NETWORKS
# dicen que es normal usar "gradient descent" entonces SGD?

optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

#optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 200 #El paper dice que son 200 

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    print(f"Epoch [{epoch+1}/{epochs}] - Loss: {running_loss/len(train_loader):.4f} - Accuracy: {correct/total:.4f}")
