In [None]:
!pip install tensorflow
!pip install keras
!pip install numpy
!pip install pandas
!pip install matplotlib
!pip install opencv-python
!pip install codecarbon


In [None]:
import os, time, math, random
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D, MaxPooling2D, Dropout, Flatten
from keras.utils import to_categorical
from codecarbon import EmissionsTracker

# Repro & CPU-only
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)
os.environ['CUDA_VISIBLE_DEVICES'] = ''
print('TensorFlow version:', tf.__version__)


In [None]:
# CodeCarbon helpers
LOGDIR = Path('./codecarbon_logs').resolve()
LOGDIR.mkdir(parents=True, exist_ok=True)
print('[CodeCarbon] logs ->', LOGDIR)

CC_KWARGS = dict(
    measure_power_secs=1,
    save_to_file=True,
    output_dir=str(LOGDIR),
    log_level='warning',
    tracking_mode='process',  
)

@dataclass
class PhaseReport:
    secs: float
    emissions_kg: float

def run_phase_with_tracker(phase_name, fn):
    tracker = EmissionsTracker(project_name=phase_name, **CC_KWARGS)
    tracker.start()
    t0 = time.time()
    try:
        out = fn()
    finally:
        emissions = tracker.stop() or 0.0
    secs = time.time() - t0
    print(f'[{phase_name}] time={secs:.2f}s, emissions={emissions:.6f} kg')
    return out, PhaseReport(secs=secs, emissions_kg=emissions)


In [None]:
# Cleaning helpers (blur/black)
def is_blurry(image, threshold=100.0):
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)  # CIFAR is RGB
    variance = cv2.Laplacian(gray, cv2.CV_64F).var()
    return variance < threshold

def check_blur(images, threshold=100.0):
    idx = []
    for i, im in enumerate(images):
        if is_blurry(im, threshold):
            idx.append(i)
    return idx

def is_black(image, threshold=10.0):
    return float(np.mean(image)) < threshold

def check_black(images, threshold=10.0):
    idx = []
    for i, im in enumerate(images):
        if is_black(im, threshold):
            idx.append(i)
    return idx

# Offline augmentation (keeps arrays workflow)
def augment_numpy_images(images, labels, mult=2,
                         flip=True, rot_max_deg=10,
                         brightness_delta=0.08,
                         contrast_range=(0.9, 1.1)):
    if mult is None or mult <= 1:
        return images, labels
    # ensure uint8 for OpenCV ops
    if images.dtype != np.uint8:
        imgs_uint8 = np.clip(images * 255.0, 0, 255).astype(np.uint8)
    else:
        imgs_uint8 = images
    H, W = imgs_uint8.shape[1], imgs_uint8.shape[2]
    out_imgs = [imgs_uint8]
    out_lbls = [labels]
    for _ in range(mult - 1):
        aug = np.empty_like(imgs_uint8)
        for i in range(len(imgs_uint8)):
            im = imgs_uint8[i]
            if flip and np.random.rand() < 0.5:
                im = cv2.flip(im, 1)
            if rot_max_deg and rot_max_deg > 0:
                deg = float(np.random.uniform(-rot_max_deg, rot_max_deg))
                M = cv2.getRotationMatrix2D((W/2, H/2), deg, 1.0)
                im = cv2.warpAffine(im, M, (W, H), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)
            cmin, cmax = contrast_range
            c = float(np.random.uniform(cmin, cmax))
            b = float(np.random.uniform(-brightness_delta, brightness_delta) * 255.0)
            im = im.astype(np.float32) * c + b
            im = np.clip(im, 0, 255).astype(np.uint8)
            aug[i] = im
        out_imgs.append(aug)
        out_lbls.append(labels.copy())
    X = np.concatenate(out_imgs, axis=0)
    Y = np.concatenate(out_lbls, axis=0)
    X = X.astype(np.float32) / 255.0
    return X, Y


In [None]:
# Phase 1: PREPROCESS
BLUR_THR = 100.0
BLACK_THR = 10.0
AUG_MULT = 2 
CLEAN_TEST = False

def preprocess_phase():
    (xtr, ytr), (xte, yte) = keras.datasets.cifar10.load_data()
    nclasses = int(len(np.unique(ytr)))
    print('Loaded CIFAR-10:', xtr.shape, ytr.shape, '| Test:', xte.shape, yte.shape)
    # Clean train
    bidx = check_blur(xtr, threshold=BLUR_THR)
    kidx = check_black(xtr, threshold=BLACK_THR)
    to_remove = sorted(set(bidx) | set(kidx))
    if to_remove:
        keep = np.ones(len(xtr), dtype=bool); keep[to_remove] = False
        xtr = xtr[keep]; ytr = ytr[keep]
        print(f'[clean train] removed {len(to_remove)} ->', xtr.shape)
    else:
        print('[clean train] no removals')

    # Scale to [0,1]
    xtr = xtr.astype('float32') / 255.0
    xte = xte.astype('float32') / 255.0
    # Offline augmentation
    if AUG_MULT and AUG_MULT > 1:
        xtr, ytr = augment_numpy_images(xtr, ytr, mult=AUG_MULT,
                                        flip=True, rot_max_deg=10,
                                        brightness_delta=0.08,
                                        contrast_range=(0.9, 1.1))
        print('[augment] after:', xtr.shape, ytr.shape)
    return xtr, ytr, xte, yte, nclasses

(prep_out, prep_report) = run_phase_with_tracker('preprocess', preprocess_phase)
Xtr, ytr, Xte, yte, nClasses = prep_out


In [None]:
def training_phase():
    ytr_oh = to_categorical(ytr, nClasses)
    yte_oh = to_categorical(yte, nClasses)
    model = Sequential([
        Conv2D(32, (3,3), activation='relu', input_shape=(32,32,3), padding='same'),
        MaxPooling2D((2,2)), Dropout(0.25),
        Conv2D(64, (3,3), activation='relu', padding='same'),
        MaxPooling2D((2,2)), Dropout(0.25),
        Conv2D(128, (3,3), activation='relu', padding='same'),
        MaxPooling2D((2,2)), Dropout(0.25),
        Flatten(),
        Dense(256, activation='relu'), Dropout(0.5),
        Dense(nClasses, activation='softmax')
    ])
    model.compile(optimizer=keras.optimizers.Adam(1e-3),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(Xtr, ytr_oh, validation_data=(Xte, yte_oh),
                        epochs=50, batch_size=64, verbose=1)
    params = model.count_params()
    return model, params

((model, params), train_report) = run_phase_with_tracker('train', training_phase)


In [None]:
def eval_phase():
    yte_oh = to_categorical(yte, nClasses)
    loss, acc = model.evaluate(Xte, yte_oh, verbose=0)
    print(f'Test accuracy: {acc:.4f}')
    return float(acc)

(test_acc, eval_report) = run_phase_with_tracker('evaluate', eval_phase)


In [None]:
row = {
    'model': 'Keras-Sequential-CNN(clean+offline-aug)',
    'params': params,
    'test_acc': round(test_acc, 4),
    'preprocess_secs': round(prep_report.secs, 2),
    'train_secs': round(train_report.secs, 2),
    'eval_secs': round(eval_report.secs, 2),
    'preprocess_emissions_kg': prep_report.emissions_kg,
    'train_emissions_kg': train_report.emissions_kg,
    'eval_emissions_kg': eval_report.emissions_kg,
    'total_secs': round(prep_report.secs + train_report.secs + eval_report.secs, 2),
    'total_emissions_kg': prep_report.emissions_kg + train_report.emissions_kg + eval_report.emissions_kg,
}
RESULTS_CSV = Path('results.csv')
df_new = pd.DataFrame([row])
if RESULTS_CSV.exists():
    try:
        df_out = pd.concat([pd.read_csv(RESULTS_CSV), df_new], ignore_index=True)
    except Exception:
        df_out = df_new
else:
    df_out = df_new
df_out.to_csv(RESULTS_CSV, index=False)
print(' Appended per-phase results →', RESULTS_CSV.resolve())
df_new
