In [None]:
# -*- coding: utf-8 -*-
# Install dependencies
!pip install tensorflow==2.12.0 kagglehub opencv-python-headless==4.7.0.72 scikit-learn==1.2.2 transformers==4.30.2 streamlit==1.22.0

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Import libraries
import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertModel
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Download FashionGen dataset
try:
    import kagglehub
    path = kagglehub.dataset_download("bothin/fashiongen-validation")
    data_dir = "/content/bothin/fashiongen-validation"
    logger.info(f"Dataset downloaded to: {data_dir}")
except Exception as e:
    logger.error(f"Dataset download failed: {str(e)}")
    raise

# 1. Enhanced GAN Architecture
def build_generator(text_embed_dim=768, latent_dim=100):
    """Builds a DCGAN-based generator with text conditioning"""
    noise_input = tf.keras.Input(shape=(latent_dim,))
    text_input = tf.keras.Input(shape=(text_embed_dim,))

    # Concatenate and project to initial dense layer
    x = tf.keras.layers.concatenate([noise_input, text_input])
    x = tf.keras.layers.Dense(4*4*512, use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.Reshape((4, 4, 512))(x)

    # Upsampling blocks
    x = tf.keras.layers.Conv2DTranspose(256, (5,5), strides=(2,2), padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2DTranspose(128, (5,5), strides=(2,2), padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)

    x = tf.keras.layers.Conv2DTranspose(64, (5,5), strides=(2,2), padding='same', use_bias=False)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.ReLU()(x)

    # Final output layer
    x = tf.keras.layers.Conv2DTranspose(3, (5,5), strides=(2,2), padding='same', activation='tanh')(x)

    return tf.keras.Model(inputs=[noise_input, text_input], outputs=x)

def build_discriminator(text_embed_dim=768):
    """Builds a PatchGAN discriminator with text conditioning"""
    image_input = tf.keras.Input(shape=(128, 128, 3))
    text_input = tf.keras.Input(shape=(text_embed_dim,))

    # Text processing pathway
    text = tf.keras.layers.Dense(512)(text_input)
    text = tf.keras.layers.LeakyReLU(0.2)(text)
    text = tf.keras.layers.Dense(128 * 128)(text)
    text = tf.keras.layers.Reshape((128, 128, 1))(text)

    # Image processing pathway
    img = tf.keras.layers.Conv2D(64, (5,5), strides=(2,2), padding='same')(image_input)
    img = tf.keras.layers.LeakyReLU(0.2)(img)

    # Concatenate text and image features
    combined = tf.keras.layers.concatenate([img, text])

    # Downsampling blocks
    x = tf.keras.layers.Conv2D(128, (5,5), strides=(2,2), padding='same')(combined)
    x = tf.keras.layers.LeakyReLU(0.2)(x)
    x = tf.keras.layers.Dropout(0.3)(x)

    x = tf.keras.layers.Conv2D(256, (5,5), strides=(2,2), padding='same')(x)
    x = tf.keras.layers.LeakyReLU(0.2)(x)
    x = tf.keras.layers.Dropout(0.3)(x)

    # Output layer
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    return tf.keras.Model(inputs=[image_input, text_input], outputs=x)

# 2. Improved Text Processing
class TextProcessor:
    def __init__(self):
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = TFBertModel.from_pretrained('bert-base-uncased')

    def text_to_embedding(self, text):
        """Converts text to BERT embeddings with error handling"""
        try:
            inputs = self.tokenizer(text, return_tensors='tf', padding=True, truncation=True, max_length=128)
            outputs = self.model(inputs)
            return tf.reduce_mean(outputs.last_hidden_state, axis=1)
        except Exception as e:
            logger.error(f"Text processing failed: {str(e)}")
            return None

text_processor = TextProcessor()

# 3. Robust Data Pipeline
class DataLoader:
    def __init__(self, data_dir, img_size=(128, 128)):
        self.data_dir = data_dir
        self.img_size = img_size

    def _load_pair(self, img_path, txt_path):
        """Loads and validates a single image-text pair"""
        try:
            # Load image
            img = cv2.imread(img_path)
            if img is None:
                raise ValueError(f"Failed to read image: {img_path}")
            img = cv2.resize(img, self.img_size)
            img = (img.astype(np.float32) / 127.5) - 1.0  # Normalize to [-1, 1]

            # Load text
            with open(txt_path, 'r') as f:
                text = f.read().strip()
                if not text:
                    raise ValueError(f"Empty text file: {txt_path}")

            return img, text
        except Exception as e:
            logger.warning(f"Skipping invalid pair {img_path}: {str(e)}")
            return None, None

    def load_dataset(self):
        """Loads and validates the entire dataset"""
        images = []
        texts = []

        for root, _, files in os.walk(self.data_dir):
            for file in files:
                if file.endswith(".jpg"):
                    img_path = os.path.join(root, file)
                    txt_path = os.path.join(root, file.replace(".jpg", ".txt"))

                    if os.path.exists(txt_path):
                        img, text = self._load_pair(img_path, txt_path)
                        if img is not None and text is not None:
                            images.append(img)
                            texts.append(text)

        logger.info(f"Successfully loaded {len(images)} valid pairs")
        return np.array(images), np.array(texts)

# Load and prepare data
try:
    loader = DataLoader(data_dir)
    images, descriptions = loader.load_dataset()

    # Generate text embeddings
    text_embeddings = []
    valid_indices = []
    for idx, desc in enumerate(descriptions):
        embedding = text_processor.text_to_embedding(desc)
        if embedding is not None:
            text_embeddings.append(embedding.numpy()[0])
            valid_indices.append(idx)

    # Filter out failed embeddings
    images = images[valid_indices]
    text_embeddings = np.array(text_embeddings)
    logger.info(f"Final dataset shape: Images {images.shape}, Embeddings {text_embeddings.shape}")

    # Split dataset
    X_train, X_val, y_train, y_val = train_test_split(
        images, text_embeddings,
        test_size=0.2,
        random_state=42
    )
except Exception as e:
    logger.error(f"Data preparation failed: {str(e)}")
    raise

# 4. Enhanced Training System
class GANTrainer:
    def __init__(self, generator, discriminator, lr=0.0002, beta=0.5):
        self.generator = generator
        self.discriminator = discriminator
        self.gan = self._build_gan()
        self.lr = lr
        self.beta = beta

    def _build_gan(self):
        """Constructs combined GAN model"""
        self.discriminator.trainable = False
        noise_input = tf.keras.Input(shape=(100,))
        text_input = tf.keras.Input(shape=(768,))
        generated_image = self.generator([noise_input, text_input])
        validity = self.discriminator([generated_image, text_input])
        return tf.keras.Model([noise_input, text_input], validity)

    def compile_models(self):
        """Configures model optimizers and losses"""
        optimizer = tf.keras.optimizers.Adam(self.lr, self.beta)

        self.discriminator.compile(
            loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy']
        )

        self.gan.compile(
            loss='binary_crossentropy',
            optimizer=optimizer
        )

    def train(self, X_train, y_train, epochs=100, batch_size=32, save_interval=10):
        """Enhanced training loop with progress tracking"""
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):
            # Select random batch
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            real_images = X_train[idx]
            real_text = y_train[idx]

            # Generate fake images
            noise = np.random.normal(0, 1, (batch_size, 100))
            gen_images = self.generator.predict([noise, real_text])

            # Train discriminator
            d_loss_real = self.discriminator.train_on_batch([real_images, real_text], valid)
            d_loss_fake = self.discriminator.train_on_batch([gen_images, real_text], fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # Train generator
            noise = np.random.normal(0, 1, (batch_size, 100))
            g_loss = self.gan.train_on_batch([noise, real_text], valid)

            # Save progress
            if epoch % save_interval == 0:
                self._save_weights(epoch)
                self._log_progress(epoch, d_loss, g_loss)
                self._generate_sample(epoch)

        self._save_weights('final')

    def _save_weights(self, epoch):
        """Saves model weights with epoch information"""
        self.generator.save_weights(f'/content/drive/MyDrive/generator_{epoch}.h5')
        self.discriminator.save_weights(f'/content/drive/MyDrive/discriminator_{epoch}.h5')

    def _log_progress(self, epoch, d_loss, g_loss):
        """Logs training metrics"""
        logger.info(
            f"Epoch {epoch} [D loss: {d_loss[0]:.4f}, acc: {100*d_loss[1]:.2f}%] "
            f"[G loss: {g_loss:.4f}]"
        )

    def _generate_sample(self, epoch):
        """Generates and saves sample images"""
        noise = np.random.normal(0, 1, (1, 100))
        sample_text = y_train[np.random.randint(0, y_train.shape[0])]
        gen_image = self.generator.predict([noise, sample_text.reshape(1, -1)])
        gen_image = (gen_image[0] * 127.5 + 127.5).astype(np.uint8)
        cv2.imwrite(f'sample_epoch_{epoch}.png', gen_image)

# Initialize and train
try:
    generator = build_generator()
    discriminator = build_discriminator()

    trainer = GANTrainer(generator, discriminator)
    trainer.compile_models()

    logger.info("Starting training...")
    trainer.train(X_train, y_train, epochs=100, batch_size=32)
except Exception as e:
    logger.error(f"Training failed: {str(e)}")
    raise

# 5. Production-Ready Streamlit App
%%writefile app.py
import streamlit as st
import numpy as np
import tensorflow as tf
from PIL import Image

# Configuration
MODEL_PATH = '/content/drive/MyDrive/generator_final.h5'
IMG_SIZE = (128, 128)

# Load models
@st.cache_resource
def load_generator():
    generator = build_generator()
    generator.load_weights(MODEL_PATH)
    return generator

generator = load_generator()

# App interface
st.title("🛍️ FashionGen AI")
st.markdown("Generate fashion items from text descriptions using AI")

# Input section
text_input = st.text_input("Describe your fashion item:",
                         placeholder="e.g., 'A red floral summer dress with puff sleeves'")

# Generation controls
col1, col2 = st.columns(2)
with col1:
    num_images = st.selectbox("Number of images", [1, 2, 4], index=0)
with col2:
    noise_level = st.slider("Creativity level", 0.0, 1.0, 0.5)

# Generate button
if st.button("✨ Generate Fashion"):
    if text_input:
        with st.spinner("Generating your fashion items..."):
            try:
                # Process text
                text_embed = text_processor.text_to_embedding(text_input)
                if text_embed is None:
                    st.error("Failed to process text description")
                    raise

                # Generate images
                noise = np.random.normal(0, noise_level, (num_images, 100))
                generated = generator.predict([noise, text_embed.numpy()])

                # Display results
                cols = st.columns(num_images)
                for i in range(num_images):
                    img = (generated[i] * 127.5 + 127.5).astype(np.uint8)
                    cols[i].image(img, caption=f"Variation {i+1}", use_column_width=True)

            except Exception as e:
                st.error(f"Generation failed: {str(e)}")
    else:
        st.warning("Please enter a description")

# Run the app
!npm install localtunnel
!streamlit run app.py --server.port 8501 &>/content/logs.txt &
!npx localtunnel --port 8501



MessageError: Error: credential propagation was unsuccessful

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

MessageError: Error: credential propagation was unsuccessful