# SEV-CV: Self-Evolutionary Generative Transformers (TensorFlow, Kaggle)

This Kaggle notebook trains a minimal SEV-CV model with an evolutionary controller using TFDS data downloaded from a Google Drive folder.

### How to run on Kaggle (T4 GPU)
1) In Settings: enable GPU (T4) and turn Internet ON.
2) Step 1: Run setup and GPU check.
3) Step 2: Download TFDS data from your Google Drive folder link (uses gdown) into `/kaggle/working/tfds-cache`.
4) Step 3: Confirm TFDS_DIR points to that cache.
5) Steps 4–6: Build the models, data loaders, and training utilities.
6) Step 7: Sanity check.
7) Step 8: Train. Set `dataset = 'coco2017'` or `'cifar10'` to use TFDS, or `'folder'` if you downloaded raw images into `DATA_DIR/custom_images`.

Notes:
- The provided Drive folder should contain TFDS-structured data (e.g., `coco/2017/...`, `cifar10/...`).
- For raw images, download a folder and use `dataset='folder'`.

## Step 1: Setup (TensorFlow + TFDS)

## Step 2: Download TFDS data from Google Drive (Kaggle)

In [None]:
# Kaggle-only: download TFDS datasets from shared Google Drive folder
# Provided link (must be public/shared):
DRIVE_FOLDER_URL = 'https://drive.google.com/drive/folders/1ztC0uxfQLzc627sWihuuzkLo-d5P4kgy'

import os, sys, subprocess

# Paths: DATA_DIR is for raw/custom datasets, TFDS_DIR receives the Drive download.
DATA_DIR = '/kaggle/working/SEV-CV/datasets'
TFDS_DIR = '/kaggle/working/tfds-cache'
LOCAL_TFDS_DIR = TFDS_DIR
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(TFDS_DIR, exist_ok=True)

# Install gdown on demand. Only a missing package triggers the install; any
# other import-time failure is left to surface instead of being masked.
try:
    import gdown  # type: ignore
except ImportError:
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'gdown>=4.7.1'])
    import gdown  # type: ignore

print('Downloading TFDS folder from Drive into', TFDS_DIR)
downloaded = gdown.download_folder(DRIVE_FOLDER_URL, output=TFDS_DIR, quiet=False, use_cookies=False)
if not downloaded:
    # NOTE(review): gdown is expected to return the list of downloaded files,
    # or None when the download failed - confirm for the installed version.
    print('WARNING: gdown reported no downloaded files; '
          'check that the Drive folder is shared publicly and Internet is ON.')

# Show top-level TFDS contents (one line per entry directly under TFDS_DIR)
print('TFDS_DIR =', TFDS_DIR)
for entry in sorted(os.listdir(TFDS_DIR)):
    print('Sample:', os.path.join(TFDS_DIR, entry))

In [None]:
# Setup: TensorFlow + TFDS (Kaggle)
import sys, subprocess, pkgutil
import tensorflow as tf
import tensorflow_datasets as tfds

# Enable GPU memory growth to avoid OOM spikes
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except (RuntimeError, ValueError) as err:
        # Best-effort: memory growth can only be changed before the GPU runtime
        # is initialized (e.g. this fails when the cell is re-run). Report the
        # reason instead of hiding it, and keep going.
        print('Could not enable memory growth for', gpu, '-', err)

print('TF version:', tf.__version__)
print('TFDS version:', tfds.__version__)
print('GPUs:', gpus)

In [None]:
# Quick GPU check
import tensorflow as tf
print('GPUs:', tf.config.list_physical_devices('GPU'))
try:
    # device_lib lives under tensorflow.python (a private namespace), so this
    # extra listing is best-effort only.
    from tensorflow.python.client import device_lib
    print('Devices:', [d.name for d in device_lib.list_local_devices()])
except Exception as err:
    # Keep the check non-fatal, but say why the device list is missing.
    print('Device listing unavailable:', err)

## Step 4: Define models (SEV-G and SEV-D)

In [None]:
# Models: SEVGenerator and SEVDiscriminator (minimal)
import tensorflow as tf
import keras
from keras import layers

class MLPBlock(layers.Layer):
    """Feed-forward block: Dense -> GELU -> Dropout -> Dense -> Dropout.

    Args:
        dim: Width of the output projection.
        mlp_ratio: Multiplier giving the hidden width, ``int(dim * mlp_ratio)``.
        drop: Dropout rate; the same Dropout layer is applied after both
            Dense layers.
    """

    def __init__(self, dim, mlp_ratio=4.0, drop=0.0):
        super().__init__()
        self.fc1 = layers.Dense(int(dim * mlp_ratio))
        self.act = layers.Activation(tf.nn.gelu)
        self.fc2 = layers.Dense(dim)
        self.drop = layers.Dropout(drop)

    def call(self, x, training=False):
        """Apply the block to ``x``; dropout is only active when ``training``."""
        expanded = self.act(self.fc1(x))
        expanded = self.drop(expanded, training=training)
        projected = self.fc2(expanded)
        return self.drop(projected, training=training)

class WindowAttention(layers.Layer):
    """Multi-head self-attention with separately layer-normalized inputs.

    No window partitioning happens inside this layer: attention is computed
    over whatever token sequence is passed in.

    Args:
        dim: Token feature width; each head gets ``dim // heads`` key features.
        heads: Number of attention heads.
    """

    def __init__(self, dim, heads):
        super().__init__()
        per_head = dim // heads
        self.attn = layers.MultiHeadAttention(num_heads=heads, key_dim=per_head)
        self.nq = layers.LayerNormalization(epsilon=1e-6)
        self.nk = layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, training=False):
        """Attend from ``nq(x)`` to ``nk(x)``."""
        query = self.nq(x)
        # Passed as the second positional argument of MultiHeadAttention
        # (its `value` input per the Keras API; no separate key is supplied).
        context = self.nk(x)
        return self.attn(query, context, training=training)

class TransformerBlock(layers.Layer):
    """Pre-norm transformer block: attention and MLP, each on a residual path.

    Args:
        dim: Token feature width.
        heads: Number of attention heads.
        mlp_ratio: Hidden-width multiplier forwarded to ``MLPBlock``.
        drop: Dropout rate for the residual branches and for ``MLPBlock``.
    """

    def __init__(self, dim, heads, mlp_ratio=4.0, drop=0.0):
        super().__init__()
        self.n1 = layers.LayerNormalization(epsilon=1e-6)
        self.attn = WindowAttention(dim, heads)
        self.drop = layers.Dropout(drop)
        self.n2 = layers.LayerNormalization(epsilon=1e-6)
        self.mlp = MLPBlock(dim, mlp_ratio, drop)

    def call(self, x, training=False):
        """Return ``x`` after the attention and MLP residual updates."""
        attended = self.attn(self.n1(x), training=training)
        x = x + self.drop(attended, training=training)
        fed_forward = self.mlp(self.n2(x), training=training)
        return x + self.drop(fed_forward, training=training)

class PatchUpsample(layers.Layer):
    """Transposed convolution (kernel 4, stride 2, same padding) followed by GELU.

    Args:
        out_ch: Number of output channels of the transposed convolution.
    """

    def __init__(self, out_ch):
        super().__init__()
        self.conv = layers.Conv2DTranspose(out_ch, 4, 2, padding="same")
        self.act = layers.Activation(tf.nn.gelu)

    def call(self, x, training=False):
        """Upsample ``x``; ``training`` is accepted but not used here."""
        upsampled = self.conv(x)
        return self.act(upsampled)

class SEVGenerator(keras.Model):
    """Generator: latent vector -> upsampled feature map -> transformer -> image.

    Pipeline: Dense to a ``(img_size/4, img_size/4, base_dim)`` map, two
    ``PatchUpsample`` stages (``base_dim//2`` then ``base_dim//4`` channels),
    flatten to tokens, ``depth`` transformer blocks, reshape back to
    ``(img_size, img_size, C)`` and a 1x1 convolution with ``tanh``.

    Args:
        latent_dim: Accepted for interface symmetry; not read inside this
            class (the first Dense layer takes its input width from ``z``).
        base_dim: Channel count of the initial feature map.
        img_size: Output side length; must be divisible by 4.
        channels: Number of output image channels.
        depth: Number of transformer blocks.
        heads: Attention heads per transformer block.

    Raises:
        ValueError: If ``img_size`` is not divisible by 4.
    """

    def __init__(self, latent_dim=128, base_dim=256, img_size=32, channels=3, depth=4, heads=4):
        super().__init__()
        if img_size % 4 != 0:
            raise ValueError("img_size must be divisible by 4")
        seed_side = img_size // 4
        token_dim = base_dim // 4
        self.img_size = img_size
        self.fc = layers.Dense(seed_side * seed_side * base_dim)
        self.reshape = layers.Reshape((seed_side, seed_side, base_dim))
        self.up1 = PatchUpsample(base_dim // 2)
        self.up2 = PatchUpsample(token_dim)
        # (B, H, W, C) -> (B, H*W, C)
        self.to_tokens = layers.Lambda(
            lambda feat: tf.reshape(feat, [tf.shape(feat)[0], -1, tf.shape(feat)[-1]])
        )
        self.blocks = [TransformerBlock(token_dim, heads) for _ in range(depth)]
        # (B, H*W, C) -> (B, img_size, img_size, C)
        self.to_feat = layers.Lambda(
            lambda tok: tf.reshape(tok, [tf.shape(tok)[0], img_size, img_size, tf.shape(tok)[-1]])
        )
        self.out = layers.Conv2D(channels, 1, padding="same", activation="tanh")

    def call(self, z, training=False):
        """Map a batch of latent vectors ``z`` to images (tanh output)."""
        feat = self.reshape(self.fc(z))
        for upsample in (self.up1, self.up2):
            feat = upsample(feat, training=training)
        tokens = self.to_tokens(feat)
        for block in self.blocks:
            tokens = block(tokens, training=training)
        return self.out(self.to_feat(tokens))

class SEVDiscriminator(keras.Model):
    """Discriminator: conv stem -> transformer blocks -> pooled scalar logit.

    Args:
        img_size: Accepted for interface symmetry; not read inside this class.
        channels: Accepted for interface symmetry; not read inside this class.
        base: Channels of the first stem convolution (the second uses ``2*base``).
        heads: Attention heads per transformer block.
        depth: Number of transformer blocks.
    """

    def __init__(self, img_size=32, channels=3, base=64, heads=4, depth=2):
        super().__init__()
        # Two stride-2 convolutions, each followed by LeakyReLU(0.2).
        stem_layers = [
            layers.Conv2D(base, 4, 2, padding="same"),
            layers.LeakyReLU(0.2),
            layers.Conv2D(base*2, 4, 2, padding="same"),
            layers.LeakyReLU(0.2),
        ]
        self.stem = keras.Sequential(stem_layers)
        # (B, H, W, C) -> (B, H*W, C)
        self.flatten_hw = layers.Lambda(
            lambda feat: tf.reshape(feat, [tf.shape(feat)[0], -1, tf.shape(feat)[-1]])
        )
        token_dim = base*2
        self.blocks = [TransformerBlock(token_dim, heads) for _ in range(depth)]
        self.pool = layers.GlobalAveragePooling1D()
        self.out = layers.Dense(1)

    def call(self, x, training=False):
        """Return one unnormalized logit per input image."""
        tokens = self.flatten_hw(self.stem(x, training=training))
        for block in self.blocks:
            tokens = block(tokens, training=training)
        pooled = self.pool(tokens)
        return self.out(pooled)

def build_g(latent_dim=128, img=32, ch=3):
    """Build a SEVGenerator for ``img`` x ``img`` images with ``ch`` channels.

    All other generator hyperparameters keep their class defaults.
    """
    generator = SEVGenerator(latent_dim=latent_dim, img_size=img, channels=ch)
    return generator

def build_d(img=32, ch=3):
    """Build a SEVDiscriminator for ``img`` x ``img`` images with ``ch`` channels.

    All other discriminator hyperparameters keep their class defaults.
    """
    discriminator = SEVDiscriminator(img_size=img, channels=ch)
    return discriminator

## Step 5: Define data loaders (TFDS or image folder)

In [None]:
# Config: choose dataset source on Kaggle
# Set one of these and run the cell:
# - Set SHARED_URL to a Google Drive file (zip/tfrecords) to fetch via gdown
# - Or set EXISTING_DATA_PATH to a path already present (e.g., from a Kaggle Dataset)
# DATA_DIR must already be defined (it is set in the Step 2 download cell).
import os
import zipfile
SHARED_URL = ''  # e.g., 'https://drive.google.com/uc?id=FILE_ID'
EXISTING_DATA_PATH = ''  # e.g., '/kaggle/input/your-dataset'

if SHARED_URL:
    import gdown
    os.makedirs(DATA_DIR, exist_ok=True)
    out_zip = os.path.join(DATA_DIR, 'dataset.zip')
    print('Downloading from Drive via gdown...')
    fetched = gdown.download(SHARED_URL, out_zip, quiet=False)
    if fetched is None or not os.path.exists(out_zip):
        # NOTE(review): gdown.download is expected to return None on failure -
        # confirm for the installed version. Without this guard the unzip step
        # below would fail with an unrelated FileNotFoundError.
        print('Download failed; nothing was saved at', out_zip)
    else:
        try:
            with zipfile.ZipFile(out_zip, 'r') as zf:
                zf.extractall(DATA_DIR)
            print('Extracted to', DATA_DIR)
        except zipfile.BadZipFile:
            # Non-zip payloads (e.g. raw tfrecords) are kept under the .zip name.
            print('Downloaded file is not a zip; leaving as-is at', out_zip)
elif EXISTING_DATA_PATH and os.path.exists(EXISTING_DATA_PATH):
    # If TFDS-like structure exists, we can point TFDS there
    print('Using EXISTING_DATA_PATH =', EXISTING_DATA_PATH)
else:
    if EXISTING_DATA_PATH:
        # A path was configured but is not present; make the fallback visible.
        print('EXISTING_DATA_PATH not found:', EXISTING_DATA_PATH)
    print('Using local DATA_DIR =', DATA_DIR)

In [None]:
# Data: CIFAR-10 and COCO2017 (subset)
import tensorflow_datasets as tfds
import tensorflow as tf
import os

def _preprocess(ex, img_size):
    """Turn one example into a square ``img_size`` x ``img_size`` float image.

    Steps: convert ``ex['image']`` to float32, center crop to the shorter
    side, bilinear resize, then map values with ``x * 2 - 1``.

    Args:
        ex: Mapping with an ``'image'`` entry (height, width, channels).
        img_size: Target side length in pixels.

    Returns:
        The processed image tensor. NOTE(review): values land in [-1, 1]
        provided the converted image is in [0, 1] - presumably true for
        integer-typed inputs; confirm for float inputs.
    """
    image = tf.image.convert_image_dtype(ex['image'], tf.float32)
    height = tf.shape(image)[0]
    width = tf.shape(image)[1]
    short_side = tf.minimum(height, width)
    # Target equals the shorter side, so this only crops (never pads).
    square = tf.image.resize_with_crop_or_pad(image, short_side, short_side)
    resized = tf.image.resize(square, [img_size, img_size], method='bilinear')
    return resized * 2.0 - 1.0

def make_cifar10(batch=64, img=32, split='train', data_dir=None,
                 shuffle_buf=5000, parallel_calls=2, prefetch=2):
    """Build a batched image pipeline from the TFDS ``cifar10`` dataset.

    Args:
        batch: Batch size; incomplete final batches are dropped.
        img: Side length passed to ``_preprocess``.
        split: TFDS split string.
        data_dir: TFDS data directory (``None`` uses the TFDS default).
        shuffle_buf: Shuffle buffer size, applied to single examples.
        parallel_calls: Parallelism of the preprocessing map.
        prefetch: Number of batches to prefetch.

    Returns:
        A ``tf.data.Dataset`` yielding image batches (labels are discarded).
    """
    def to_image(example):
        return _preprocess(example, img)

    raw = tfds.load('cifar10', split=split, as_supervised=False, shuffle_files=True, data_dir=data_dir)
    images = raw.map(to_image, num_parallel_calls=parallel_calls)
    batched = images.shuffle(shuffle_buf).batch(batch, drop_remainder=True)
    return batched.prefetch(prefetch)

def make_coco2017(batch=32, img=128, split='train[0%:20%]', data_dir=None,
                  shuffle_buf=2000, parallel_calls=2, prefetch=2):
    """Build a batched image pipeline from the TFDS ``coco/2017`` dataset.

    The default split reads only the first 20% of the training set.

    Args:
        batch: Batch size; incomplete final batches are dropped.
        img: Side length passed to ``_preprocess``.
        split: TFDS split string.
        data_dir: TFDS data directory (``None`` uses the TFDS default).
        shuffle_buf: Shuffle buffer size, applied to single examples.
        parallel_calls: Parallelism of the preprocessing map.
        prefetch: Number of batches to prefetch.

    Returns:
        A ``tf.data.Dataset`` yielding image batches only.
    """
    examples = tfds.load(
        'coco/2017',
        split=split,
        as_supervised=False,
        shuffle_files=True,
        data_dir=data_dir,
    )
    pipeline = examples.map(lambda record: _preprocess(record, img), num_parallel_calls=parallel_calls)
    pipeline = pipeline.shuffle(shuffle_buf)
    pipeline = pipeline.batch(batch, drop_remainder=True)
    return pipeline.prefetch(prefetch)

# Optional: load from a local folder of images (e.g., files downloaded from Drive)
def _load_image_file(path, img_size):
    """Read one image file and preprocess it exactly like a TFDS example.

    Args:
        path: Path of the image file (string tensor or str).
        img_size: Target side length in pixels.

    Returns:
        The image after ``_preprocess``: float32, center-cropped to a square,
        resized to ``img_size`` x ``img_size`` and mapped with ``x * 2 - 1``.
    """
    raw = tf.io.read_file(path)
    # channels=3 forces RGB; expand_animations=False is passed so animated
    # inputs are not expanded into a 4-D frame stack.
    img = tf.io.decode_image(raw, channels=3, expand_animations=False)
    # Reuse the shared preprocessing instead of duplicating it, so folder
    # images and TFDS images can never drift apart.
    return _preprocess({'image': img}, img_size)

def make_folder_dataset(folder, batch=32, img=128, shuffle_buf=2000, parallel_calls=2, prefetch=2):
    """Build a batched image pipeline from image files under ``folder``.

    Files directly inside ``folder`` and files one sub-directory below it are
    picked up. Deeper nesting is not searched.

    Args:
        folder: Root directory holding the images.
        batch: Batch size; incomplete final batches are dropped.
        img: Side length passed to ``_load_image_file``.
        shuffle_buf: Shuffle buffer size, applied to single images.
        parallel_calls: Parallelism of the decoding map.
        prefetch: Number of batches to prefetch.

    Returns:
        A ``tf.data.Dataset`` yielding image batches.
    """
    # TensorFlow's glob does not treat '**' recursively (it behaves like '*'),
    # so the former single pattern folder/**/*.* only matched files exactly one
    # level down and missed images placed directly in `folder`. Both levels
    # are listed explicitly instead.
    patterns = [
        os.path.join(folder, '*.*'),
        os.path.join(folder, '*', '*.*'),
    ]
    files = tf.data.Dataset.list_files(patterns, shuffle=True)
    ds = files.map(lambda p: _load_image_file(p, img), num_parallel_calls=parallel_calls)
    ds = ds.shuffle(shuffle_buf).batch(batch, drop_remainder=True)
    return ds.prefetch(prefetch)

In [None]:
# Evolution controller + training utils
import dataclasses, random
import tensorflow as tf
from typing import List

@dataclasses.dataclass
class Individual:
    """One candidate hyperparameter set tracked by the evolution controller."""
    lr: float           # optimizer learning rate
    heads: int          # attention heads
    depth: int          # number of transformer blocks
    score: float = 0.0  # fitness assigned by the training loop; 0.0 until scored


class EvolutionController:
    """Keeps a population of ``Individual`` configs and evolves it by elitism.

    Args:
        population: Number of individuals to create.
        seed: Seed of the controller's private random generator. The same
            seed always yields the same initial population and mutations.
    """

    def __init__(self, population=4, seed=0):
        # A private generator gives reproducibility without reseeding the
        # process-wide `random` module as a side effect.
        self._rng = random.Random(seed)
        self.population: List[Individual] = [
            self._random_individual() for _ in range(population)
        ]

    def _random_individual(self) -> Individual:
        """Draw lr log-uniformly around 1e-4 and heads/depth from {2, 4, 6}."""
        rng = self._rng
        return Individual(
            lr=1e-4 * 10 ** rng.uniform(-0.5, 0.5),
            heads=rng.choice([2, 4, 6]),
            depth=rng.choice([2, 4, 6]),
        )

    def mutate(self, ind: Individual) -> Individual:
        """Return a perturbed copy of ``ind`` with ``score`` reset to 0.0.

        lr is scaled by up to +/-30% and clamped to [1e-5, 5e-4]; heads and
        depth each move by -2, 0 or +2 and are clamped to [2, 8].
        """
        rng = self._rng
        return Individual(
            lr=max(1e-5, min(5e-4, ind.lr * (1.0 + rng.uniform(-0.3, 0.3)))),
            heads=max(2, min(8, ind.heads + rng.choice([-2, 0, 2]))),
            depth=max(2, min(8, ind.depth + rng.choice([-2, 0, 2]))),
        )

    def select(self, k=2):
        """Keep the ``k`` best-scoring individuals and refill with mutants.

        After the call the population is ordered best-first: the ``k``
        survivors (unchanged, scores kept) followed by mutations of the
        survivors, chosen round-robin. The population size is preserved; if
        ``k`` is at least the population size, the population is only sorted.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        if k < 1:
            # k == 0 used to fail with ZeroDivisionError and negative k
            # silently corrupted the population.
            raise ValueError("k must be at least 1")
        self.population.sort(key=lambda ind: ind.score, reverse=True)
        ranked = self.population
        survivors = ranked[:k]
        offspring = [self.mutate(ranked[i % k]) for i in range(k, len(ranked))]
        self.population = survivors + offspring

# Losses
@tf.function
def d_loss_fn(real_logits, fake_logits):
    """Hinge discriminator loss.

    Returns ``mean(relu(1 - real_logits)) + mean(relu(1 + fake_logits))``.
    """
    real_term = tf.reduce_mean(tf.nn.relu(1.0 - real_logits))
    fake_term = tf.reduce_mean(tf.nn.relu(1.0 + fake_logits))
    return real_term + fake_term

@tf.function
def g_loss_fn(fake_logits):
    """Hinge generator loss: the negated mean of the logits on generated images."""
    mean_logit = tf.reduce_mean(fake_logits)
    return -mean_logit

class HingeGAN:
    """Generator/discriminator pair trained with the hinge GAN losses.

    Args:
        img: Image side length for both networks.
        z: Latent vector size.
        heads: Attention heads for both networks.
        depth: Transformer depth of the generator (the discriminator built
            here always uses depth 2).
        lr: Learning rate shared by both Adam optimizers
            (``beta_1=0.0``, ``beta_2=0.99``).
    """

    def __init__(self, img=32, z=128, heads=4, depth=4, lr=2e-4):
        self.G = SEVGenerator(latent_dim=z, img_size=img, channels=3, depth=depth, heads=heads)
        self.D = SEVDiscriminator(img_size=img, channels=3, heads=heads, depth=2)
        self.opt_g = keras.optimizers.Adam(lr, beta_1=0.0, beta_2=0.99)
        self.opt_d = keras.optimizers.Adam(lr, beta_1=0.0, beta_2=0.99)
        self.z = z

    @tf.function
    def step(self, real):
        """Run one discriminator update, then one generator update.

        Args:
            real: Batch of real images.

        Returns:
            ``(d_loss, g_loss)`` for this step.
        """
        batch_size = tf.shape(real)[0]

        # Discriminator update on a fresh batch of generated images.
        latent = tf.random.normal([batch_size, self.z])
        with tf.GradientTape() as disc_tape:
            generated = self.G(latent, training=True)
            real_logits = self.D(real, training=True)
            fake_logits = self.D(generated, training=True)
            disc_loss = d_loss_fn(real_logits, fake_logits)
        disc_vars = self.D.trainable_variables
        disc_grads = disc_tape.gradient(disc_loss, disc_vars)
        self.opt_d.apply_gradients(zip(disc_grads, disc_vars))

        # Generator update with newly sampled noise.
        latent = tf.random.normal([batch_size, self.z])
        with tf.GradientTape() as gen_tape:
            generated = self.G(latent, training=True)
            fake_logits = self.D(generated, training=True)
            gen_loss = g_loss_fn(fake_logits)
        gen_vars = self.G.trainable_variables
        gen_grads = gen_tape.gradient(gen_loss, gen_vars)
        self.opt_g.apply_gradients(zip(gen_grads, gen_vars))
        return disc_loss, gen_loss

    def sample_fitness(self, n=4):
        """Score the generator by the standard deviation of ``n`` samples.

        Returns:
            A Python float; larger means more pixel variance in the samples.
        """
        latent = tf.random.normal([n, self.z])
        samples = self.G(latent, training=False)
        # simple proxy: encourage variance
        spread = tf.math.reduce_std(samples)
        return float(spread.numpy())

## Step 6: Evolution + training utilities

In [None]:
# Train: Kaggle-only evolution-guided loop using TFDS from downloaded Drive cache
import time, os, tensorflow as tf

# Options: 'cifar10', 'coco2017', or 'folder'
dataset = 'coco2017'  # default
low_mem = True

# Defaults
img = 128 if dataset=='coco2017' else 32
batch = 32 if dataset=='coco2017' else 64
steps = 200

# Low-memory overrides
if low_mem:
    if dataset=='coco2017':
        img = 64
        batch = 8
    else:
        batch = 32
    steps = 100

# Source dirs (set by Step 2)
DATA_DIR = '/kaggle/working/SEV-CV/datasets'
TFDS_DIR = '/kaggle/working/tfds-cache'
LOCAL_TFDS_DIR = TFDS_DIR

# Folder dataset path (if using raw images)
custom_folder = os.path.join(DATA_DIR, 'custom_images')
os.makedirs(custom_folder, exist_ok=True)

# Data constructors with conservative buffers
make_kwargs = dict(shuffle_buf=1000, parallel_calls=1, prefetch=1) if low_mem else {}

def make_ds():
    """Build the dataset selected by `dataset` at the configured img/batch size."""
    if dataset=='cifar10':
        return make_cifar10(batch=batch, img=img, split='train', data_dir=TFDS_DIR, **make_kwargs)
    elif dataset=='coco2017':
        split = 'train[0%:5%]' if low_mem else 'train[0%:10%]'
        return make_coco2017(batch=batch, img=img, split=split, data_dir=TFDS_DIR, **make_kwargs)
    else:
        return make_folder_dataset(custom_folder, batch=batch, img=img, **make_kwargs)

# Build initial models
# repeat() lets the loop run for `steps` batches even when the dataset holds
# fewer batches than that (e.g. a small image folder); without it next(it)
# would raise StopIteration mid-training.
ds = make_ds().repeat()
it = iter(ds)
evo = EvolutionController(population=2 if low_mem else 4, seed=42)
elite_k = 1 if low_mem else 2   # survivors kept at each evolution step
evolve_every = 25               # training steps between evolution steps

def build_g_cfg(heads, depth):
    """Build a generator for an individual (smaller variant in low-memory mode)."""
    if low_mem:
        return SEVGenerator(latent_dim=96, img_size=img, channels=3, depth=max(2, depth//2), heads=max(2, heads//2), base_dim=192)
    return SEVGenerator(latent_dim=128, img_size=img, channels=3, depth=depth, heads=heads)

def build_d_cfg(heads):
    """Build a discriminator for an individual (smaller variant in low-memory mode)."""
    if low_mem:
        return SEVDiscriminator(img_size=img, channels=3, heads=max(2, heads//2), depth=1, base=48)
    return SEVDiscriminator(img_size=img, channels=3, heads=heads, depth=2)

def make_gan(ind):
    """Create a fresh, untrained HingeGAN configured from individual `ind`."""
    gan = HingeGAN(img=img, z=96 if low_mem else 128, heads=ind.heads, depth=ind.depth, lr=ind.lr)
    # Swap in the memory-aware networks before the first training step.
    gan.G = build_g_cfg(ind.heads, ind.depth)
    gan.D = build_d_cfg(ind.heads)
    return gan

models = [make_gan(ind) for ind in evo.population]

for step in range(1, steps+1):
    real = next(it)
    for gan in models:
        dl, gl = gan.step(real)
    if step % evolve_every == 0:
        for gan, ind in zip(models, evo.population):
            ind.score = gan.sample_fitness(n=2 if low_mem else 4)
        # Rank the trained models the same way evo.select() ranks individuals
        # (stable sort, best score first) so that survivors keep their trained
        # weights. Previously every model, survivors included, was rebuilt from
        # scratch here, which threw away all training done so far.
        ranked = sorted(zip(models, evo.population), key=lambda pair: pair[1].score, reverse=True)
        survivors = [gan for gan, _ in ranked[:elite_k]]
        evo.select(k=elite_k)
        # Only the mutated individuals need new, untrained models.
        models = survivors + [make_gan(ind) for ind in evo.population[elite_k:]]
        print(f"[step {step}] evolved heads/depth/lr:", [(ind.heads, ind.depth, round(ind.lr,6)) for ind in evo.population])

print('Done.')

## Step 8: Train
Pick the dataset, adjust img/batch/steps, and run the loop. Models evolve every 25 steps.

In [None]:
# Sanity check: forward generator at chosen resolution
# The generator attends over img_test * img_test tokens. At 128 that is 16384
# tokens, and one float32 attention-score tensor for batch 2 x 4 heads is about
# 8.6 GB, which is likely to exhaust a T4. 64 matches the low-memory training
# resolution and keeps the check cheap; raise it only if memory allows.
img_test = 64
G_test = SEVGenerator(latent_dim=128, img_size=img_test, channels=3, depth=2, heads=4)
y = G_test(tf.random.normal([2,128]), training=False)
print('gen out shape:', y.shape)

## Step 7: Sanity check

In [None]:
# Confirm TFDS_DIR for Kaggle-only workflow (no download here)
import os
TFDS_DIR = '/kaggle/working/tfds-cache'
LOCAL_TFDS_DIR = TFDS_DIR
print('TFDS dir:', TFDS_DIR)
if os.path.isdir(TFDS_DIR):
    print('Contents:')
    for p in sorted(os.listdir(TFDS_DIR)):
        print('-', p)
else:
    # The directory is created by the Step 2 download cell; explain the fix
    # instead of failing with a bare FileNotFoundError.
    print('TFDS dir does not exist yet; run Step 2 (Drive download) first.')

## Step 3: Confirm TFDS_DIR (Kaggle)