# 🔥 TensorFlow & Keras Deep Learning

This notebook covers advanced TensorFlow and Keras implementations that come up frequently in deep learning engineering interviews.

## 📋 Table of Contents
1. [Custom Layers and Models](#custom-layers-models)
2. [Advanced CNN Architectures](#cnn-architectures)
3. [Transfer Learning and Fine-tuning](#transfer-learning)
4. [Custom Training Loops](#custom-training)
5. [Model Optimization and Deployment](#optimization-deployment)
6. [Practice Problems](#practice-problems)
7. [Interview Tips](#interview-tips)

In [None]:
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# TensorFlow and Keras (with a NumPy fallback for environments where TensorFlow is not installed)
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers, models, optimizers, callbacks
    from tensorflow.keras.applications import VGG16, ResNet50, MobileNetV2
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    print(f"✅ TensorFlow version: {tf.__version__}")
    TF_AVAILABLE = True
except ImportError:
    print("⚠️ TensorFlow not available. Using NumPy implementations.")
    TF_AVAILABLE = False

# Standard libraries
from sklearn.datasets import load_digits, fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler, LabelEncoder
import time
import warnings
warnings.filterwarnings('ignore')

# Set up plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
np.random.seed(42)

if TF_AVAILABLE:
    tf.random.set_seed(42)
    # Detect available GPUs (detection only; no device configuration is changed)
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        print(f"🚀 GPU available: {len(gpus)} GPU(s)")
    else:
        print("🖥️ Using CPU")

print("📊 All libraries imported successfully!")
print("🔥 Ready for TensorFlow/Keras implementations!")

## 🧱 Problem 1: Custom Layers and Models

**Problem Statement**: Implement custom Keras layers and models with advanced functionality.

**Requirements**:
- Custom layer with trainable parameters
- Model subclassing with custom forward pass
- Custom loss functions and metrics
- Advanced regularization techniques
- Model serialization and loading

**Key Concepts**: Layer subclassing, model subclassing, custom training, regularization
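
The `MultiHeadAttention` layer defined in the next cell implements scaled dot-product attention: queries, keys and values are projected, split into `num_heads` slices of size `depth = key_dim // num_heads`, scored with softmax(QK^T / sqrt(depth)), and recombined. The NumPy sketch below traces those shapes step by step. It is illustrative only: the weights are random and bias terms are omitted (an assumption for brevity, whereas `layers.Dense` adds biases), so it checks shapes and softmax normalization rather than matching the Keras layer numerically. The function name `multi_head_attention_np` is hypothetical.

```python
import numpy as np

def softmax(x, axis=-1):
    # Subtract the max for numerical stability before exponentiating
    x = x - x.max(axis=axis, keepdims=True)
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)

def multi_head_attention_np(x, wq, wk, wv, wo, num_heads):
    """Self-attention forward pass (no biases).

    x: (batch, seq, d_model); wq/wk/wv: (d_model, key_dim); wo: (key_dim, d_model).
    """
    batch, seq, _ = x.shape
    key_dim = wq.shape[1]
    assert key_dim % num_heads == 0, "key_dim must be divisible by num_heads"
    depth = key_dim // num_heads

    def split_heads(t):
        # (batch, seq, key_dim) -> (batch, heads, seq, depth)
        return t.reshape(batch, seq, num_heads, depth).transpose(0, 2, 1, 3)

    q, k, v = split_heads(x @ wq), split_heads(x @ wk), split_heads(x @ wv)
    logits = q @ k.transpose(0, 1, 3, 2) / np.sqrt(depth)  # (batch, heads, seq, seq)
    weights = softmax(logits, axis=-1)
    # Merge heads back: (batch, heads, seq, depth) -> (batch, seq, key_dim)
    out = (weights @ v).transpose(0, 2, 1, 3).reshape(batch, seq, key_dim)
    return out @ wo, weights

rng = np.random.default_rng(0)
d_model, key_dim, heads = 8, 16, 4
x = rng.normal(size=(2, 5, d_model))
wq, wk, wv = (rng.normal(size=(d_model, key_dim)) for _ in range(3))
wo = rng.normal(size=(key_dim, d_model))
out, attn = multi_head_attention_np(x, wq, wk, wv, wo, heads)
print(out.shape, attn.shape)  # (2, 5, 8) (2, 4, 5, 5)
```

Tracing shapes like this is a quick way to catch the most common bug in a hand-written attention layer: transposing or reshaping along the wrong axis when merging heads.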

In [None]:
if TF_AVAILABLE:
    # Custom Dense Layer with additional features
    class CustomDenseLayer(layers.Layer):
        """Custom dense layer with advanced features."""
        
        def __init__(self, units, activation=None, use_bias=True, 
                     dropout_rate=0.0, l2_reg=0.0, **kwargs):
            super().__init__(**kwargs)
            self.units = units
            self.activation = keras.activations.get(activation)
            self.use_bias = use_bias
            self.dropout_rate = dropout_rate
            self.l2_reg = l2_reg
        
        def build(self, input_shape):
            """Create the layer's parameters."""
            # Weight matrix
            self.kernel = self.add_weight(
                name='kernel',
                shape=(input_shape[-1], self.units),
                initializer='glorot_uniform',
                regularizer=keras.regularizers.l2(self.l2_reg) if self.l2_reg > 0 else None,
                trainable=True
            )
            
            # Bias vector
            if self.use_bias:
                self.bias = self.add_weight(
                    name='bias',
                    shape=(self.units,),
                    initializer='zeros',
                    trainable=True
                )
            
            # Dropout layer
            if self.dropout_rate > 0:
                self.dropout = layers.Dropout(self.dropout_rate)
            
            super().build(input_shape)
        
        def call(self, inputs, training=None):
            """Forward pass through the layer."""
            # Linear transformation
            outputs = tf.matmul(inputs, self.kernel)
            
            if self.use_bias:
                outputs = tf.nn.bias_add(outputs, self.bias)
            
            # Apply activation
            if self.activation is not None:
                outputs = self.activation(outputs)
            
            # Apply dropout during training
            if self.dropout_rate > 0 and training:
                outputs = self.dropout(outputs, training=training)
            
            return outputs
        
        def get_config(self):
            """Return layer configuration for serialization."""
            config = super().get_config()
            config.update({
                'units': self.units,
                'activation': keras.activations.serialize(self.activation),
                'use_bias': self.use_bias,
                'dropout_rate': self.dropout_rate,
                'l2_reg': self.l2_reg
            })
            return config
    
    # Custom Attention Layer
    class MultiHeadAttention(layers.Layer):
        """Multi-head attention mechanism."""
        
        def __init__(self, num_heads, key_dim, **kwargs):
            super().__init__(**kwargs)
            self.num_heads = num_heads
            self.key_dim = key_dim
            if key_dim % num_heads != 0:
                raise ValueError("key_dim must be divisible by num_heads")
            self.depth = key_dim // num_heads
        
        def build(self, input_shape):
            self.wq = layers.Dense(self.key_dim)
            self.wk = layers.Dense(self.key_dim)
            self.wv = layers.Dense(self.key_dim)
            self.dense = layers.Dense(input_shape[-1])
            super().build(input_shape)
        
        def split_heads(self, x, batch_size):
            x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
            return tf.transpose(x, perm=[0, 2, 1, 3])
        
        def call(self, inputs):
            batch_size = tf.shape(inputs)[0]
            
            q = self.split_heads(self.wq(inputs), batch_size)
            k = self.split_heads(self.wk(inputs), batch_size)
            v = self.split_heads(self.wv(inputs), batch_size)
            
            # Scaled dot-product attention
            matmul_qk = tf.matmul(q, k, transpose_b=True)
            dk = tf.cast(tf.shape(k)[-1], tf.float32)
            scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
            attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
            
            output = tf.matmul(attention_weights, v)
            output = tf.transpose(output, perm=[0, 2, 1, 3])
            concat_attention = tf.reshape(output, (batch_size, -1, self.key_dim))
            
            return self.dense(concat_attention)
    
    # Custom Model with advanced architecture
    class AdvancedClassifier(keras.Model):
        """Custom model with advanced architecture."""
        
        def __init__(self, num_classes, use_attention=False, **kwargs):
            super().__init__(**kwargs)
            self.num_classes = num_classes
            self.use_attention = use_attention
            
            # Input processing
            self.input_norm = layers.LayerNormalization()
            
            # Feature extraction layers
            self.dense1 = CustomDenseLayer(128, activation='relu', dropout_rate=0.3, l2_reg=0.01)
            self.dense2 = CustomDenseLayer(64, activation='relu', dropout_rate=0.2, l2_reg=0.01)
            
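            # Optional attention mechanism. dense2 outputs a flat (batch, 64) vector, so the
            # reshape yields a length-1 sequence and attention here is a structural demo only.
            if use_attention:
                self.reshape_for_attention = layers.Reshape((-1, 64))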
                self.attention = MultiHeadAttention(num_heads=4, key_dim=64)
                self.global_pool = layers.GlobalAveragePooling1D()
            
            # Output layers
            self.dropout = layers.Dropout(0.5)
            self.classifier = layers.Dense(num_classes, activation='softmax')
        
        def call(self, inputs, training=None):
            # Normalize inputs
            x = self.input_norm(inputs)
            
            # Feature extraction
            x = self.dense1(x, training=training)
            x = self.dense2(x, training=training)
            
            # Optional attention mechanism
            if self.use_attention:
                x = self.reshape_for_attention(x)
                x = self.attention(x)
                x = self.global_pool(x)
            
            # Final classification
            x = self.dropout(x, training=training)
            return self.classifier(x)
        
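        def model_summary(self, input_dim):
            """Custom summary method that wraps the subclassed model in a functional Model.

            input_dim is passed explicitly because subclassed models do not define input_spec by default.
            """
            x = keras.Input(shape=(input_dim,))
            model = keras.Model(inputs=[x], outputs=self.call(x))
            return model.summary()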
    
    # Custom Loss Functions
    class FocalLoss(keras.losses.Loss):
        """Focal loss for handling class imbalance."""
        
        def __init__(self, alpha=0.25, gamma=2.0, **kwargs):
            super().__init__(**kwargs)
            self.alpha = alpha
            self.gamma = gamma
        
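        def call(self, y_true, y_pred):
            # Keras may pass sparse labels as (batch, 1); flatten to (batch,) so every term
            # below has the same shape and nothing broadcasts to (batch, batch)
            y_true = tf.reshape(tf.cast(y_true, tf.int32), [-1])
            
            # Compute cross entropy
            ce_loss = tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=False)
            
            # Compute p_t: predicted probability of the true class for each sample
            p_t = tf.gather(y_pred, y_true, batch_dims=1)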
            
            # Compute focal weight
            alpha_t = self.alpha
            focal_weight = alpha_t * tf.pow(1 - p_t, self.gamma)
            
            return focal_weight * ce_loss
    
    # Custom Metrics
    class TopKAccuracy(keras.metrics.Metric):
        """Top-K accuracy metric."""
        
        def __init__(self, k=3, name='top_k_accuracy', **kwargs):
            super().__init__(name=name, **kwargs)
            self.k = k
            self.total = self.add_weight(name='total', initializer='zeros')
            self.count = self.add_weight(name='count', initializer='zeros')
        
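        def update_state(self, y_true, y_pred, sample_weight=None):
            # Flatten (batch, 1) labels to (batch,) so the comparison stays per-sample;
            # sample_weight is accepted for API compatibility but ignored
            y_true = tf.reshape(tf.cast(y_true, tf.int32), [-1])
            top_k_pred = tf.nn.top_k(y_pred, k=self.k).indices
            matches = tf.reduce_any(tf.equal(tf.expand_dims(y_true, -1), top_k_pred), axis=-1)
            
            self.total.assign_add(tf.reduce_sum(tf.cast(matches, tf.float32)))
            self.count.assign_add(tf.cast(tf.shape(y_true)[0], tf.float32))
        
        def result(self):
            # divide_no_nan returns 0 instead of NaN before any batch has been seen
            return tf.math.divide_no_nan(self.total, self.count)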
        
        def reset_state(self):
            self.total.assign(0.)
            self.count.assign(0.)
    
    print("🧱 Custom layers and models defined successfully!")
    
else:
    print("⚠️ TensorFlow not available - skipping custom layer implementations")
    print("💡 In interview: Explain the concepts and show NumPy equivalents")
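
The `FocalLoss` class above computes FL(p_t) = -alpha * (1 - p_t)^gamma * log(p_t) per sample, where p_t is the predicted probability of the true class. A minimal NumPy reference follows (the helper name `focal_loss_np` is hypothetical). It assumes sparse integer labels and probability inputs, matching `from_logits=False`, and a single scalar `alpha` as in the class. With `alpha=1` and `gamma=0` it reduces to plain cross-entropy, which makes a handy sanity check.

```python
import numpy as np

def focal_loss_np(y_true, y_prob, alpha=0.25, gamma=2.0, eps=1e-7):
    """Per-sample focal loss for sparse integer labels.

    y_true: (batch,) or (batch, 1) integer class ids.
    y_prob: (batch, num_classes) predicted probabilities.
    """
    y_true = np.asarray(y_true).reshape(-1)
    # Clip to avoid log(0), as Keras does internally for probability inputs
    y_prob = np.clip(np.asarray(y_prob, dtype=np.float64), eps, 1.0 - eps)
    # p_t: probability assigned to the true class of each sample
    p_t = y_prob[np.arange(len(y_true)), y_true]
    ce = -np.log(p_t)
    # (1 - p_t)^gamma shrinks the loss of confident, correct predictions
    return alpha * (1.0 - p_t) ** gamma * ce

probs = np.array([[0.9, 0.05, 0.05],   # easy example: true class 0 predicted with 0.9
                  [0.2, 0.7, 0.1],
                  [0.6, 0.3, 0.1]])    # hard example: true class 2 gets only 0.1
labels = np.array([0, 1, 2])
losses = focal_loss_np(labels, probs)
print(losses)
```

The easy first sample contributes almost nothing, while the hard third sample dominates; that re-weighting is the whole point of focal loss for imbalanced data.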

In [None]:
if TF_AVAILABLE:
    # Test Custom Layers and Models
    print("🧪 Testing Custom Layers and Models:")
    
    # Load and prepare data
    (X_train, y_train), (X_test, y_test) = keras.datasets.cifar10.load_data()
    
    # Use subset for faster training in demo
    X_train = X_train[:5000].astype('float32') / 255.0
    X_test = X_test[:1000].astype('float32') / 255.0
    y_train = y_train[:5000].flatten()
    y_test = y_test[:1000].flatten()
    
    # Flatten for fully connected network
    X_train_flat = X_train.reshape(X_train.shape[0], -1)
    X_test_flat = X_test.reshape(X_test.shape[0], -1)
    
    print(f"Training data shape: {X_train_flat.shape}")
    print(f"Test data shape: {X_test_flat.shape}")
    print(f"Number of classes: {len(np.unique(y_train))}")
    
    # Create models with different configurations
    models_config = [
        {'name': 'Standard Model', 'use_attention': False},
        {'name': 'Attention Model', 'use_attention': True}
    ]
    
    model_results = []
    
    for config in models_config:
        print(f"\n=== Testing {config['name']} ===")
        
        # Create model
        model = AdvancedClassifier(
            num_classes=10,
            use_attention=config['use_attention']
        )
        
        # Build model by calling it once
        _ = model(X_train_flat[:1])
        
        # Compile with custom loss and metrics
        model.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss=FocalLoss(alpha=0.25, gamma=2.0),
            metrics=[
                'accuracy',
                TopKAccuracy(k=3),
                keras.metrics.SparseCategoricalAccuracy(name='sparse_acc')
            ]
        )
        
        print(f"Model parameters: {model.count_params():,}")
        
        # Define callbacks
        early_stopping = callbacks.EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )
        
        reduce_lr = callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-7
        )
        
        # Train model
        start_time = time.time()
        history = model.fit(
            X_train_flat, y_train,
            epochs=20,
            batch_size=64,
            validation_data=(X_test_flat, y_test),
            callbacks=[early_stopping, reduce_lr],
            verbose=1
        )
        training_time = time.time() - start_time
        
        # Evaluate model
        test_results = model.evaluate(X_test_flat, y_test, verbose=0)
        
        # Make predictions
        y_pred_proba = model.predict(X_test_flat, verbose=0)
        y_pred = np.argmax(y_pred_proba, axis=1)
        
        model_results.append({
            'name': config['name'],
            'model': model,
            'history': history,
            'test_results': dict(zip(model.metrics_names, test_results)),
            'training_time': training_time,
            'predictions': y_pred,
            'probabilities': y_pred_proba,
            'epochs_trained': len(history.history['loss'])
        })
        
        print(f"Test Results: {dict(zip(model.metrics_names, test_results))}")
        print(f"Training time: {training_time:.2f}s")
        print(f"Epochs trained: {len(history.history['loss'])}")
    
    # Test model serialization
    print("\n🔄 Testing Model Serialization:")
    # Round-trip the first (standard, non-attention) model; new_model below must use the same architecture
    reference_model = model_results[0]['model']
    
    # Save weights only (Keras 3 requires the '.weights.h5' suffix; Keras 2 also accepts it)
    reference_model.save_weights('custom_model.weights.h5')
    print("✅ Model weights saved")
    
    # Create new model and load weights
    new_model = AdvancedClassifier(num_classes=10, use_attention=False)
    _ = new_model(X_test_flat[:1])  # Build model so its variables exist before loading
    new_model.load_weights('custom_model.weights.h5')
    print("✅ Model weights loaded")
    
    # Verify predictions match
    original_pred = reference_model.predict(X_test_flat[:5], verbose=0)
    
    if np.allclose(original_pred, loaded_pred, rtol=1e-6):
        print("✅ Model serialization successful - predictions match")
    else:
        print("❌ Model serialization failed - predictions don't match")
    
    print("\n✅ Custom models testing completed!")

else:
    print("⚠️ Skipping model testing - TensorFlow not available")
    model_results = []
    print("💡 In interview: Discuss model architecture and training strategies")
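
The `TopKAccuracy` metric used in the test above counts a sample as correct when its true label is among the `k` highest-scoring classes. A NumPy sketch of the same computation is below (the helper name `top_k_accuracy_np` is hypothetical). It flattens `(batch, 1)` labels first, and it may break ties between equal scores differently from `tf.nn.top_k`.

```python
import numpy as np

def top_k_accuracy_np(y_true, y_score, k=3):
    """Fraction of samples whose true label is among the k highest scores."""
    y_true = np.asarray(y_true).reshape(-1)       # (batch, 1) -> (batch,)
    top_k = np.argsort(y_score, axis=1)[:, -k:]   # indices of the k largest scores per row
    hits = (top_k == y_true[:, None]).any(axis=1)  # is the label among them?
    return hits.mean()

scores = np.array([[0.1, 0.5, 0.3, 0.05],
                   [0.6, 0.15, 0.2, 0.05],
                   [0.3, 0.2, 0.1, 0.4]])
labels = np.array([[2], [3], [0]])
print(top_k_accuracy_np(labels, scores, k=1))
print(top_k_accuracy_np(labels, scores, k=2))
```

Top-1 misses every row here, while top-2 recovers the first and third rows, which shows why top-k is often reported alongside plain accuracy when classes are easily confused.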

## 🏗️ Problem 2: Advanced CNN Architectures

**Problem Statement**: Implement modern CNN architectures with advanced techniques.

**Requirements**:
- ResNet-style skip connections
- Depthwise separable convolutions
- Squeeze-and-excitation blocks
- Progressive resizing and mixed precision
- Model ensemble techniques

**Key Concepts**: Skip connections, efficient convolutions, attention mechanisms, model optimization
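
The efficiency argument for depthwise separable convolutions is plain arithmetic. A standard k x k `Conv2D` from C_in to C_out channels has k^2 * C_in * C_out weights. A depthwise k x k convolution followed by a 1x1 pointwise convolution has k^2 * C_in + C_in * C_out. The helper functions below (hypothetical names) count parameters including biases, which `Conv2D` and `DepthwiseConv2D` add by default:

```python
def conv_params(k, c_in, c_out, bias=True):
    """Parameters of a standard k x k Conv2D."""
    return k * k * c_in * c_out + (c_out if bias else 0)

def separable_params(k, c_in, c_out, bias=True):
    """Depthwise k x k conv (one filter per input channel) plus a 1x1 pointwise conv."""
    depthwise = k * k * c_in + (c_in if bias else 0)
    pointwise = c_in * c_out + (c_out if bias else 0)
    return depthwise + pointwise

# Example: a 3x3 block mapping 128 -> 256 channels
std = conv_params(3, 128, 256)        # 3*3*128*256 + 256 = 295,168
sep = separable_params(3, 128, 256)   # (3*3*128 + 128) + (128*256 + 256) = 34,304
print(f"standard: {std:,}  separable: {sep:,}  ratio: {std / sep:.1f}x")
```

Ignoring biases, the separable/standard ratio is 1/C_out + 1/k^2, so for 3x3 kernels the saving approaches 9x as the channel count grows.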

In [None]:
if TF_AVAILABLE:
    # Residual Block
    class ResidualBlock(layers.Layer):
        """Residual block with skip connections."""
        
        def __init__(self, filters, kernel_size=3, strides=1, **kwargs):
            super().__init__(**kwargs)
            self.filters = filters
            self.kernel_size = kernel_size
            self.strides = strides
        
        def build(self, input_shape):
            # Main path
            self.conv1 = layers.Conv2D(self.filters, self.kernel_size, 
                                     strides=self.strides, padding='same')
            self.bn1 = layers.BatchNormalization()
            self.conv2 = layers.Conv2D(self.filters, self.kernel_size, padding='same')
            self.bn2 = layers.BatchNormalization()
            
            # Skip connection
            if self.strides != 1 or input_shape[-1] != self.filters:
                self.shortcut_conv = layers.Conv2D(self.filters, 1, strides=self.strides)
                self.shortcut_bn = layers.BatchNormalization()
            else:
                self.shortcut_conv = None
            
            super().build(input_shape)
        
        def call(self, inputs, training=None):
            # Main path
            x = self.conv1(inputs)
            x = self.bn1(x, training=training)
            x = tf.nn.relu(x)
            x = self.conv2(x)
            x = self.bn2(x, training=training)
            
            # Skip connection
            if self.shortcut_conv is not None:
                shortcut = self.shortcut_conv(inputs)
                shortcut = self.shortcut_bn(shortcut, training=training)
            else:
                shortcut = inputs
            
            # Add skip connection and apply ReLU
            output = tf.nn.relu(x + shortcut)
            return output
    
    # Squeeze-and-Excitation Block
    class SEBlock(layers.Layer):
        """Squeeze-and-Excitation block for channel attention."""
        
        def __init__(self, ratio=16, **kwargs):
            super().__init__(**kwargs)
            self.ratio = ratio
        
        def build(self, input_shape):
            channels = input_shape[-1]
            self.global_pool = layers.GlobalAveragePooling2D()
            self.dense1 = layers.Dense(channels // self.ratio, activation='relu')
            self.dense2 = layers.Dense(channels, activation='sigmoid')
            self.reshape = layers.Reshape((1, 1, channels))
            super().build(input_shape)
        
        def call(self, inputs):
            # Squeeze
            se = self.global_pool(inputs)
            
            # Excitation
            se = self.dense1(se)
            se = self.dense2(se)
            se = self.reshape(se)
            
            # Scale
            return inputs * se
    
    # Depthwise Separable Convolution Block
    class DepthwiseConvBlock(layers.Layer):
        """Depthwise separable convolution block."""
        
        def __init__(self, filters, kernel_size=3, strides=1, **kwargs):
            super().__init__(**kwargs)
            self.filters = filters
            self.kernel_size = kernel_size
            self.strides = strides
        
        def build(self, input_shape):
            # Depthwise convolution
            self.depthwise = layers.DepthwiseConv2D(
                self.kernel_size, strides=self.strides, padding='same'
            )
            self.bn1 = layers.BatchNormalization()
            
            # Pointwise convolution
            self.pointwise = layers.Conv2D(self.filters, 1)
            self.bn2 = layers.BatchNormalization()
            
            super().build(input_shape)
        
        def call(self, inputs, training=None):
            # Depthwise convolution
            x = self.depthwise(inputs)
            x = self.bn1(x, training=training)
            x = tf.nn.relu(x)
            
            # Pointwise convolution
            x = self.pointwise(x)
            x = self.bn2(x, training=training)
            x = tf.nn.relu(x)
            
            return x
    
    # Advanced CNN Model
    class AdvancedCNN(keras.Model):
        """Advanced CNN with multiple architecture components."""
        
        def __init__(self, num_classes, architecture='resnet', **kwargs):
            super().__init__(**kwargs)
            self.num_classes = num_classes
            self.architecture = architecture
            
            # Input processing
            self.input_conv = layers.Conv2D(32, 3, padding='same')
            self.input_bn = layers.BatchNormalization()
            
            # Architecture-specific layers
            if architecture == 'resnet':
                self.block1 = ResidualBlock(64, strides=2)
                self.block2 = ResidualBlock(128, strides=2)
                self.block3 = ResidualBlock(256, strides=2)
                
            elif architecture == 'mobilenet':
                self.block1 = DepthwiseConvBlock(64, strides=2)
                self.block2 = DepthwiseConvBlock(128, strides=2)
                self.block3 = DepthwiseConvBlock(256, strides=2)
                
            elif architecture == 'se_resnet':
                self.block1 = ResidualBlock(64, strides=2)
                self.se1 = SEBlock()
                self.block2 = ResidualBlock(128, strides=2)
                self.se2 = SEBlock()
                self.block3 = ResidualBlock(256, strides=2)
                self.se3 = SEBlock()
            else:
                raise ValueError(f"Unknown architecture: {architecture}")
            
            # Output layers
            self.global_pool = layers.GlobalAveragePooling2D()
            self.dropout = layers.Dropout(0.5)
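            # Keep the softmax output in float32 for numerical stability under mixed precision
            self.classifier = layers.Dense(num_classes, activation='softmax', dtype='float32')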
        
        def call(self, inputs, training=None):
            # Input processing
            x = self.input_conv(inputs)
            x = self.input_bn(x, training=training)
            x = tf.nn.relu(x)
            
            # Architecture-specific forward pass
            if self.architecture == 'resnet':
                x = self.block1(x, training=training)
                x = self.block2(x, training=training)
                x = self.block3(x, training=training)
                
            elif self.architecture == 'mobilenet':
                x = self.block1(x, training=training)
                x = self.block2(x, training=training)
                x = self.block3(x, training=training)
                
            elif self.architecture == 'se_resnet':
                x = self.block1(x, training=training)
                x = self.se1(x)
                x = self.block2(x, training=training)
                x = self.se2(x)
                x = self.block3(x, training=training)
                x = self.se3(x)
            
            # Output
            x = self.global_pool(x)
            x = self.dropout(x, training=training)
            return self.classifier(x)
    
    # Model Ensemble
    class ModelEnsemble(keras.Model):
        """Ensemble of multiple models."""
        
        def __init__(self, models, weights=None, **kwargs):
            super().__init__(**kwargs)
            self.models = models
            # Named ensemble_weights because keras.Model already has a read-only `weights` property
            self.ensemble_weights = weights or [1.0 / len(models)] * len(models)
        
        def call(self, inputs, training=None):
            predictions = []
            for model in self.models:
                pred = model(inputs, training=training)
                predictions.append(pred)
            
            # Weighted average of predictions (weights are expected to sum to 1)
            weighted_preds = [w * pred for w, pred in zip(self.ensemble_weights, predictions)]
            return tf.add_n(weighted_preds)
    
    print("🏗️ Advanced CNN architectures defined successfully!")

else:
    print("⚠️ TensorFlow not available - showing architecture concepts")
    print("💡 Key concepts: ResNet skip connections, SE blocks, depthwise convolutions")
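
The `SEBlock` above reduces to three array operations: squeeze (global average pooling over height and width), excitation (a bottleneck MLP with ReLU, then sigmoid), and scale (per-channel multiplication). The NumPy sketch below assumes NHWC layout, as in the Keras block, and uses random weights, so it only demonstrates shapes and the gating behavior; the function name `se_block_np` is hypothetical.

```python
import numpy as np

def relu(x):
    return np.maximum(x, 0.0)

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def se_block_np(x, w1, b1, w2, b2):
    """Squeeze-and-Excitation on an NHWC feature map x of shape (batch, H, W, C)."""
    s = x.mean(axis=(1, 2))        # squeeze: global average pool -> (batch, C)
    z = relu(s @ w1 + b1)          # bottleneck -> (batch, C // ratio)
    gates = sigmoid(z @ w2 + b2)   # excitation: one weight in (0, 1) per channel
    # scale: broadcast the (batch, C) gates over H and W
    return x * gates[:, None, None, :], gates

rng = np.random.default_rng(0)
C, ratio = 64, 16
x = rng.normal(size=(2, 8, 8, C))
w1, b1 = rng.normal(size=(C, C // ratio)), np.zeros(C // ratio)
w2, b2 = rng.normal(size=(C // ratio, C)), np.zeros(C)
y, gates = se_block_np(x, w1, b1, w2, b2)
print(y.shape, gates.shape)
```

Because the gates lie between 0 and 1, an SE block re-weights channels relative to each other rather than amplifying them, and it adds only about 2 * C^2 / ratio weights per block.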

In [None]:
if TF_AVAILABLE:
    # Test Advanced CNN Architectures
    print("🧪 Testing Advanced CNN Architectures:")
    
    # Use smaller subset for faster training
    X_train_cnn = X_train[:1000]
    y_train_cnn = y_train[:1000]
    X_test_cnn = X_test[:200]
    y_test_cnn = y_test[:200]
    
    print(f"Training samples: {len(X_train_cnn)}")
    print(f"Test samples: {len(X_test_cnn)}")
    print(f"Image shape: {X_train_cnn.shape[1:]}")
    
    # Test different architectures
    architectures = ['resnet', 'mobilenet', 'se_resnet']
    cnn_results = []
    
    for arch in architectures:
        print(f"\n=== Testing {arch.upper()} Architecture ===")
        
        # Create model
        model_cnn = AdvancedCNN(num_classes=10, architecture=arch)
        
        # Build model
        _ = model_cnn(X_train_cnn[:1])
        
        # Compile model
        model_cnn.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        print(f"Model parameters: {model_cnn.count_params():,}")
        
        # Train model
        start_time = time.time()
        history_cnn = model_cnn.fit(
            X_train_cnn, y_train_cnn,
            epochs=10,
            batch_size=32,
            validation_data=(X_test_cnn, y_test_cnn),
            verbose=1
        )
        training_time = time.time() - start_time
        
        # Evaluate model
        test_loss, test_acc = model_cnn.evaluate(X_test_cnn, y_test_cnn, verbose=0)
        
        # Get predictions
        y_pred_cnn = model_cnn.predict(X_test_cnn, verbose=0)
        y_pred_classes = np.argmax(y_pred_cnn, axis=1)
        
        cnn_results.append({
            'architecture': arch,
            'model': model_cnn,
            'history': history_cnn,
            'test_accuracy': test_acc,
            'test_loss': test_loss,
            'training_time': training_time,
            'parameters': model_cnn.count_params(),
            'predictions': y_pred_classes
        })
        
        print(f"Test Accuracy: {test_acc:.4f}")
        print(f"Test Loss: {test_loss:.4f}")
        print(f"Training Time: {training_time:.2f}s")
    
    # Create and test ensemble
    print("\n=== Testing Model Ensemble ===")
    
    # Use top 2 models for ensemble
    sorted_results = sorted(cnn_results, key=lambda x: x['test_accuracy'], reverse=True)
    ensemble_models = [result['model'] for result in sorted_results[:2]]
    
    # Create ensemble
    ensemble = ModelEnsemble(ensemble_models, weights=[0.6, 0.4])
    
    # Test ensemble
    ensemble_pred = ensemble.predict(X_test_cnn, verbose=0)
    ensemble_classes = np.argmax(ensemble_pred, axis=1)
    ensemble_accuracy = accuracy_score(y_test_cnn, ensemble_classes)
    
    print(f"Ensemble Accuracy: {ensemble_accuracy:.4f}")
    print(f"Best Single Model: {sorted_results[0]['test_accuracy']:.4f}")
    print(f"Ensemble Improvement: {ensemble_accuracy - sorted_results[0]['test_accuracy']:.4f}")
    
    # Test mixed precision training (if supported)
    try:
        print("\n=== Testing Mixed Precision Training ===")
        
        # Enable mixed precision
        tf.keras.mixed_precision.set_global_policy('mixed_float16')
        
        # Create model for mixed precision
        model_mp = AdvancedCNN(num_classes=10, architecture='resnet')
        _ = model_mp(X_train_cnn[:1])
        
        # Under mixed_float16, Model.compile wraps the optimizer in a LossScaleOptimizer automatically
        model_mp.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        # Train for a few epochs
        start_time = time.time()
        history_mp = model_mp.fit(
            X_train_cnn[:500], y_train_cnn[:500],
            epochs=3,
            batch_size=32,
            verbose=1
        )
        mp_training_time = time.time() - start_time
        
        # Reset to default policy
        tf.keras.mixed_precision.set_global_policy('float32')
        
        print(f"Mixed precision training completed in {mp_training_time:.2f}s")
        print("✅ Mixed precision training successful")
        
    except Exception as e:
        print(f"⚠️ Mixed precision not supported: {e}")
        tf.keras.mixed_precision.set_global_policy('float32')
    
    print("\n✅ Advanced CNN architecture testing completed!")

else:
    print("⚠️ Skipping CNN testing - TensorFlow not available")
    cnn_results = []
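
The mixed precision test relies on loss scaling. float16 cannot represent magnitudes below about 6e-8, so small gradients silently underflow to zero. A `LossScaleOptimizer` multiplies the loss by a scale factor before backpropagation and divides the gradients by the same factor before the weight update. The NumPy sketch below shows the effect with a fixed scale of 1024, an arbitrary value chosen for illustration (Keras adjusts its scale dynamically during training).

```python
import numpy as np

# A small gradient value that float32 represents without trouble...
grad = np.float32(1e-8)
print(np.float16(grad))        # ...underflows to 0.0 in float16

# Scaling the loss scales every gradient by the same factor, lifting it into float16 range
scale = np.float32(1024.0)
scaled_fp16 = np.float16(grad * scale)
print(scaled_fp16)             # non-zero in float16

# Unscale in float32 before applying the update
recovered = np.float32(scaled_fp16) / scale
print(recovered)
```

The recovered value differs from the original by a fraction of a percent (float16 rounding), which is harmless, whereas the unscaled gradient was lost entirely.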

## 🔄 Problem 3: Transfer Learning and Fine-tuning

**Problem Statement**: Implement transfer learning strategies with pre-trained models.

**Requirements**:
- Load and modify pre-trained models
- Feature extraction vs fine-tuning
- Layer freezing strategies
- Progressive unfreezing
- Domain adaptation techniques

**Key Concepts**: Pre-trained models, feature extraction, gradual unfreezing, learning rate scheduling
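
The `WarmupCosineDecay` schedule in the next cell ramps the learning rate linearly from 0 over `warmup_steps`, then follows a half cosine down to a floor by `decay_steps`. The plain-Python sketch below computes the same curve so values can be checked by hand. It assumes `alpha` is the floor expressed as a fraction of the initial rate, the convention of `tf.keras.optimizers.schedules.CosineDecay`, and it clips progress so the rate stays at the floor after `total_steps`; the helper name `warmup_cosine_lr` is hypothetical.

```python
import math

def warmup_cosine_lr(step, initial_lr, total_steps, warmup_steps, alpha=0.0):
    """Learning rate at `step`: linear warmup, then cosine decay to alpha * initial_lr."""
    if step < warmup_steps:
        return initial_lr * step / warmup_steps
    # Fraction of the decay phase completed, clipped to 1 after total_steps
    progress = min((step - warmup_steps) / max(total_steps - warmup_steps, 1), 1.0)
    cosine = 0.5 * (1.0 + math.cos(math.pi * progress))
    return initial_lr * ((1.0 - alpha) * cosine + alpha)

for s in [0, 50, 100, 550, 1000, 1500]:
    print(s, round(warmup_cosine_lr(s, 1e-3, total_steps=1000, warmup_steps=100), 6))
```

The rate peaks at 1e-3 when warmup ends (step 100), is back to half that midway through the decay phase (step 550), and stays at the floor afterwards.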

In [None]:
if TF_AVAILABLE:
    # Transfer Learning Helper Class
    class TransferLearningModel:
        """Helper class for transfer learning experiments."""
        
        def __init__(self, base_model_name='resnet50', num_classes=10, input_shape=(32, 32, 3)):
            self.base_model_name = base_model_name
            self.num_classes = num_classes
            self.input_shape = input_shape
            self.model = None
            self.base_model = None
        
        def create_model(self, strategy='feature_extraction', trainable_layers=0):
            """Create transfer learning model."""
            # Load pre-trained base model
            if self.base_model_name == 'resnet50':
                self.base_model = ResNet50(
                    weights='imagenet',
                    include_top=False,
                    input_shape=self.input_shape
                )
            elif self.base_model_name == 'vgg16':
                self.base_model = VGG16(
                    weights='imagenet',
                    include_top=False,
                    input_shape=self.input_shape
                )
            elif self.base_model_name == 'mobilenet':
                self.base_model = MobileNetV2(
                    weights='imagenet',
                    include_top=False,
                    input_shape=self.input_shape
                )
            else:
                raise ValueError(f"Unknown base model: {self.base_model_name}")
            
            # Set trainable layers based on strategy
            if strategy == 'feature_extraction':
                self.base_model.trainable = False
            elif strategy == 'fine_tuning':
                self.base_model.trainable = True
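            elif strategy == 'progressive':
                # Freeze everything except the last `trainable_layers` layers
                # (slicing with [:-0] would freeze nothing, so compute the cut index explicitly)
                self.base_model.trainable = True
                num_frozen = len(self.base_model.layers) - trainable_layers
                for layer in self.base_model.layers[:num_frozen]:
                    layer.trainable = False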
            
            # Add custom head
            inputs = keras.Input(shape=self.input_shape)
            
            # Data augmentation (only during training)
            x = layers.RandomFlip('horizontal')(inputs)
            x = layers.RandomRotation(0.1)(x)
            
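            # Base model. Keep it in inference mode (training=False) for every strategy so
            # BatchNormalization statistics are not updated during fine-tuning.
            # Note: each keras.applications model expects its own preprocess_input; feeding
            # [0, 1]-scaled images directly, as here, is a simplification.
            x = self.base_model(x, training=False)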
            
            # Custom head
            x = layers.GlobalAveragePooling2D()(x)
            x = layers.Dropout(0.3)(x)
            x = layers.Dense(128, activation='relu')(x)
            x = layers.Dropout(0.5)(x)
            outputs = layers.Dense(self.num_classes, activation='softmax')(x)
            
            self.model = keras.Model(inputs, outputs)
            return self.model
        
        def progressive_unfreeze(self, step):
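            """Progressively unfreeze layers from the top of the base model.

            Call model.compile() again afterwards so the new trainable flags take effect.
            """
            if self.base_model is None:
                return
            
            # The container must be trainable, otherwise its sub-layers contribute no trainable weights
            self.base_model.trainable = True
            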
            total_layers = len(self.base_model.layers)
            layers_to_unfreeze = min(step * 5, total_layers)  # Unfreeze 5 layers at a time
            
            for i, layer in enumerate(self.base_model.layers):
                if i >= total_layers - layers_to_unfreeze:
                    layer.trainable = True
                else:
                    layer.trainable = False
            
            print(f"Unfrozen last {layers_to_unfreeze} layers out of {total_layers}")
        
        def get_trainable_params(self):
            """Get number of trainable parameters."""
            if self.model is None:
                return 0
            return sum([tf.size(w).numpy() for w in self.model.trainable_weights])
    
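    # Custom learning-rate schedule: linear warmup, then cosine decay
    class WarmupCosineDecay(keras.optimizers.schedules.LearningRateSchedule):
        """Linear warmup followed by cosine decay (safe to call inside tf.function)."""
        
        def __init__(self, initial_learning_rate, decay_steps, warmup_steps=1000, alpha=0.0):
            super().__init__()
            self.initial_learning_rate = initial_learning_rate
            self.decay_steps = decay_steps    # total schedule length, including warmup
            self.warmup_steps = warmup_steps
            self.alpha = alpha                # final LR as a fraction of initial_learning_rate
        
        def __call__(self, step):
            # The optimizer passes an integer step tensor; cast so the arithmetic is float
            step = tf.cast(step, tf.float32)
            warmup_steps = tf.cast(self.warmup_steps, tf.float32)
            decay_steps = tf.cast(max(self.decay_steps - self.warmup_steps, 1), tf.float32)
            
            # Linear warmup from 0 to initial_learning_rate
            warmup_lr = self.initial_learning_rate * step / tf.maximum(warmup_steps, 1.0)
            
            # Cosine decay, clamped so the LR stays at its floor after decay_steps
            progress = tf.clip_by_value((step - warmup_steps) / decay_steps, 0.0, 1.0)
            cosine_decay = 0.5 * (1.0 + tf.cos(np.pi * progress))
            decayed_lr = self.initial_learning_rate * ((1.0 - self.alpha) * cosine_decay + self.alpha)
            
            # tf.where keeps this graph-friendly (no Python `if` on a tensor)
            return tf.where(step < warmup_steps, warmup_lr, decayed_lr)
        
        def get_config(self):
            # Needed to serialize an optimizer that uses this schedule
            return {
                'initial_learning_rate': self.initial_learning_rate,
                'decay_steps': self.decay_steps,
                'warmup_steps': self.warmup_steps,
                'alpha': self.alpha,
            }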
    
    print("🔄 Transfer learning components defined successfully!")

else:
    print("⚠️ TensorFlow not available - showing transfer learning concepts")
    print("💡 Key concepts: Feature extraction, fine-tuning, progressive unfreezing")
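
A warmup-plus-cosine schedule is easy to get subtly wrong: integer step tensors, or a learning rate that climbs back up once `step` passes `decay_steps`. The NumPy sketch below reproduces the curve so the values can be checked by hand. It assumes the decay is clamped at `decay_steps` and that `alpha` is a fraction of the peak LR (the convention used by Keras's built-in `CosineDecay`); `warmup_cosine_lr` is a hypothetical helper, not part of the notebook.

```python
import numpy as np

def warmup_cosine_lr(step, initial_lr, decay_steps, warmup_steps, alpha=0.0):
    """LR at `step`: linear warmup, then cosine decay down to alpha * initial_lr.

    decay_steps counts the whole schedule, including the warmup phase.
    """
    step = np.asarray(step, dtype=np.float64)
    warmup_lr = initial_lr * step / max(warmup_steps, 1)
    # Fraction of the decay phase completed, clamped to [0, 1]
    span = max(decay_steps - warmup_steps, 1)
    progress = np.clip((step - warmup_steps) / span, 0.0, 1.0)
    cosine = 0.5 * (1.0 + np.cos(np.pi * progress))
    decayed_lr = initial_lr * ((1.0 - alpha) * cosine + alpha)
    return np.where(step < warmup_steps, warmup_lr, decayed_lr)

# Same settings as the feature-extraction demo: peak 1e-3, 100 warmup steps, 1000 total
steps = np.array([0, 50, 100, 550, 1000, 1500])
lrs = warmup_cosine_lr(steps, initial_lr=1e-3, decay_steps=1000, warmup_steps=100)
for s, lr in zip(steps, lrs):
    print(f"step {s:>4}: lr = {lr:.6f}")
```

Without the clamp, step 1500 would land past the cosine minimum and the learning rate would start rising again.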

In [None]:
if TF_AVAILABLE:
    # Test Transfer Learning
    print("🧪 Testing Transfer Learning Strategies:")
    
    # Use even smaller subset for transfer learning demo
    X_train_tl = X_train[:500]
    y_train_tl = y_train[:500]
    X_test_tl = X_test[:100]
    y_test_tl = y_test[:100]
    
    print(f"Transfer learning - Training samples: {len(X_train_tl)}")
    print(f"Transfer learning - Test samples: {len(X_test_tl)}")
    
    # Test different transfer learning strategies
    tl_strategies = [
        {'name': 'Feature Extraction', 'strategy': 'feature_extraction'},
        {'name': 'Fine Tuning', 'strategy': 'fine_tuning'},
        {'name': 'Progressive', 'strategy': 'progressive', 'trainable_layers': 10}
    ]
    
    tl_results = []
    
    for config in tl_strategies:
        try:
            print(f"\n=== Testing {config['name']} Strategy ===")
            
            # Create transfer learning model
            tl_model = TransferLearningModel(
                base_model_name='mobilenet',  # Use MobileNet for faster training
                num_classes=10,
                input_shape=(32, 32, 3)
            )
            
            model = tl_model.create_model(
                strategy=config['strategy'],
                trainable_layers=config.get('trainable_layers', 0)
            )
            
            trainable_params = tl_model.get_trainable_params()
            total_params = model.count_params()
            
            print(f"Total parameters: {total_params:,}")
            print(f"Trainable parameters: {trainable_params:,}")
            print(f"Frozen parameters: {total_params - trainable_params:,}")
            
            # Set learning rate based on strategy
            if config['strategy'] == 'feature_extraction':
                learning_rate = 0.001
            else:
                learning_rate = 0.0001  # Lower LR for fine-tuning
            
            # Use custom learning rate schedule
            lr_schedule = WarmupCosineDecay(
                initial_learning_rate=learning_rate,
                decay_steps=1000,
                warmup_steps=100
            )
            
            # Compile model
            model.compile(
                optimizer=optimizers.Adam(learning_rate=lr_schedule),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
            
            # Train model
            start_time = time.time()
            
            if config['strategy'] == 'progressive':
                # Progressive unfreezing training
                histories = []
                for step in range(1, 3):  # 2 steps for demo
                    print(f"\nProgressive step {step}:")
                    tl_model.progressive_unfreeze(step)
                    
                    # Recompile so the new trainable flags take effect; halve the LR at each step
                    current_lr = learning_rate / (2 ** (step - 1))
                    model.compile(
                        optimizer=optimizers.Adam(learning_rate=current_lr),
                        loss='sparse_categorical_crossentropy',
                        metrics=['accuracy']
                    )
                    
                    history = model.fit(
                        X_train_tl, y_train_tl,
                        epochs=3,
                        batch_size=16,
                        validation_data=(X_test_tl, y_test_tl),
                        verbose=1
                    )
                    histories.append(history)
                
                # Combine histories
                combined_history = {
                    'loss': [],
                    'accuracy': [],
                    'val_loss': [],
                    'val_accuracy': []
                }
                for h in histories:
                    for key in combined_history.keys():
                        combined_history[key].extend(h.history[key])
                
                # Create mock history object
                class MockHistory:
                    def __init__(self, history_dict):
                        self.history = history_dict
                
                history_tl = MockHistory(combined_history)
                
            else:
                # Standard training
                history_tl = model.fit(
                    X_train_tl, y_train_tl,
                    epochs=5,
                    batch_size=16,
                    validation_data=(X_test_tl, y_test_tl),
                    verbose=1
                )
            
            training_time = time.time() - start_time
            
            # Evaluate model
            test_loss, test_acc = model.evaluate(X_test_tl, y_test_tl, verbose=0)
            
            tl_results.append({
                'name': config['name'],
                'strategy': config['strategy'],
                'model': model,
                'history': history_tl,
                'test_accuracy': test_acc,
                'test_loss': test_loss,
                'training_time': training_time,
                'trainable_params': trainable_params,
                'total_params': total_params
            })
            
            print(f"Test Accuracy: {test_acc:.4f}")
            print(f"Training Time: {training_time:.2f}s")
            
        except Exception as e:
            print(f"❌ Error with {config['name']}: {str(e)}")
            print("💡 In a real scenario, check that the pretrained weights loaded and that GPU memory is sufficient")
    
    # Compare with training from scratch
    try:
        print("\n=== Training from Scratch (Baseline) ===")
        
        # Simple CNN from scratch
        scratch_model = keras.Sequential([
            layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
            layers.MaxPooling2D(),
            layers.Conv2D(64, 3, activation='relu'),
            layers.MaxPooling2D(),
            layers.Conv2D(128, 3, activation='relu'),
            layers.GlobalAveragePooling2D(),
            layers.Dropout(0.5),
            layers.Dense(10, activation='softmax')
        ])
        
        scratch_model.compile(
            optimizer=optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        print(f"Scratch model parameters: {scratch_model.count_params():,}")
        
        # Train scratch model
        start_time = time.time()
        scratch_history = scratch_model.fit(
            X_train_tl, y_train_tl,
            epochs=10,
            batch_size=16,
            validation_data=(X_test_tl, y_test_tl),
            verbose=1
        )
        scratch_time = time.time() - start_time
        
        scratch_loss, scratch_acc = scratch_model.evaluate(X_test_tl, y_test_tl, verbose=0)
        
        print(f"Scratch model accuracy: {scratch_acc:.4f}")
        print(f"Training time: {scratch_time:.2f}s")
        
    except Exception as e:
        print(f"❌ Error with scratch model: {str(e)}")
        scratch_acc = 0.0
    
    print("\n✅ Transfer learning testing completed!")

else:
    print("⚠️ Skipping transfer learning tests - TensorFlow not available")
    tl_results = []
    scratch_acc = 0.0
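
It helps to picture the progressive strategy as a boolean mask over the base model's layers. The plain-Python sketch below mirrors the `progressive_unfreeze` rule (5 more layers per step) on a hypothetical 12-layer base. It also shows why the `layers[:-trainable_layers]` slice pattern needs care: with `trainable_layers=0`, `[:-0]` is the same as `[:0]`, so nothing gets frozen. Note also that in the demo above, `progressive_unfreeze(1)` re-freezes the base down to its last 5 layers, fewer than the 10 chosen at `create_model` time. `trainable_mask` is an illustrative helper, not notebook code.

```python
def trainable_mask(total_layers, step, layers_per_step=5):
    """Which base-model layers are trainable after `step` rounds of unfreezing.

    Mirrors progressive_unfreeze: the last step * layers_per_step layers are
    trainable, and every layer before them stays frozen.
    """
    n_unfrozen = min(step * layers_per_step, total_layers)
    return [i >= total_layers - n_unfrozen for i in range(total_layers)]

# A toy 12-layer base model: 'T' = trainable, '.' = frozen
for step in range(4):
    mask = trainable_mask(12, step)
    print(step, ''.join('T' if m else '.' for m in mask))

# Slicing pitfall: a negative stop of 0 yields an empty slice
layer_ids = list(range(12))
print(layer_ids[:-0])
print(layer_ids[:len(layer_ids) - 0])
```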

## ⚙️ Problem 4: Custom Training Loops

**Problem Statement**: Implement custom training loops with advanced techniques.

**Requirements**:
- GradientTape for custom gradients
- Custom training step with multiple losses
- Gradient clipping and accumulation
- Advanced regularization during training
- Distributed training setup

**Key Concepts**: GradientTape, custom training loops, gradient manipulation, distributed strategies
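
Why does `train_step` divide the loss by `gradient_accumulation_steps`? For a mean-reduced loss, averaging the gradients of k equal-sized micro-batches reproduces the full-batch gradient exactly. The NumPy check below uses a toy linear-regression MSE; the data and model are illustrative, not taken from the notebook. The identity holds only for equal-sized micro-batches, and layers such as BatchNorm still compute statistics per micro-batch rather than over the full batch.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(16, 3))
y = rng.normal(size=16)
w = rng.normal(size=3)

def mse_grad(w, X, y):
    """Gradient of 0.5 * mean((X @ w - y) ** 2) with respect to w."""
    residual = X @ w - y
    return X.T @ residual / len(y)

# Gradient over the whole batch of 16
full = mse_grad(w, X, y)

# Accumulate over k equal micro-batches, scaling each by 1/k
k = 2
accumulated = np.zeros_like(w)
for X_mb, y_mb in zip(np.array_split(X, k), np.array_split(y, k)):
    accumulated += mse_grad(w, X_mb, y_mb) / k

print(np.allclose(full, accumulated))
```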

In [None]:
if TF_AVAILABLE:
    # Custom Training Loop Implementation
    class CustomTrainer:
        """Custom training loop with advanced features."""
        
        def __init__(self, model, optimizer, loss_fn, metrics=None):
            self.model = model
            self.optimizer = optimizer
            self.loss_fn = loss_fn
            self.metrics = metrics or []
            
            # Training state
            self.train_loss = keras.metrics.Mean(name='train_loss')
            self.train_metrics = [keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')]
            
            self.val_loss = keras.metrics.Mean(name='val_loss')
            self.val_metrics = [keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')]
            
            # History tracking
            self.history = {
                'loss': [],
                'accuracy': [],
                'val_loss': [],
                'val_accuracy': []
            }
        
        @tf.function
        def train_step(self, x, y, gradient_accumulation_steps=1, clip_norm=None):
            """Split the batch into micro-batches, accumulate their gradients, then apply one update."""
            
            accumulated_gradients = []
            
            for i in range(gradient_accumulation_steps):
                # Split the incoming batch into equal micro-batches
                # (if the batch size is not divisible, the remainder samples are skipped)
                batch_size = tf.shape(x)[0] // gradient_accumulation_steps
                start_idx = i * batch_size
                end_idx = start_idx + batch_size
                
                x_batch = x[start_idx:end_idx]
                y_batch = y[start_idx:end_idx]
                
                with tf.GradientTape() as tape:
                    # Forward pass
                    predictions = self.model(x_batch, training=True)
                    
                    # Primary loss
                    primary_loss = self.loss_fn(y_batch, predictions)
                    
                    # Add regularization losses
                    reg_losses = self.model.losses
                    total_loss = (primary_loss + tf.add_n(reg_losses)) if reg_losses else primary_loss
                    
                    # Scale loss for gradient accumulation
                    scaled_loss = total_loss / gradient_accumulation_steps
                
                # Compute gradients
                gradients = tape.gradient(scaled_loss, self.model.trainable_variables)
                
                # Accumulate gradients
                if i == 0:
                    accumulated_gradients = gradients
                else:
                    accumulated_gradients = [
                        acc_grad + grad for acc_grad, grad in zip(accumulated_gradients, gradients)
                    ]
                
                # Update metrics for this micro-batch
                self.train_loss.update_state(primary_loss)
                for metric in self.train_metrics:
                    metric.update_state(y_batch, predictions)
            
            # Gradient clipping if specified
            if clip_norm is not None:
                accumulated_gradients, _ = tf.clip_by_global_norm(accumulated_gradients, clip_norm)
            
            # Apply accumulated gradients
            self.optimizer.apply_gradients(zip(accumulated_gradients, self.model.trainable_variables))
        
        @tf.function
        def val_step(self, x, y):
            """Validation step."""
            predictions = self.model(x, training=False)
            loss = self.loss_fn(y, predictions)
            
            self.val_loss.update_state(loss)
            for metric in self.val_metrics:
                metric.update_state(y, predictions)
        
        def train(self, train_dataset, val_dataset=None, epochs=10, 
                 gradient_accumulation_steps=1, clip_norm=None, verbose=True):
            """Custom training loop."""
            
            for epoch in range(epochs):
                if verbose:
                    print(f"\nEpoch {epoch + 1}/{epochs}")
                
                # Reset metrics
                self.train_loss.reset_state()
                for metric in self.train_metrics:
                    metric.reset_state()
                
                # Training loop
                for x_batch, y_batch in train_dataset:
                    self.train_step(x_batch, y_batch, gradient_accumulation_steps, clip_norm)
                
                # Validation loop
                if val_dataset is not None:
                    self.val_loss.reset_state()
                    for metric in self.val_metrics:
                        metric.reset_state()
                    
                    for x_batch, y_batch in val_dataset:
                        self.val_step(x_batch, y_batch)
                
                # Update history
                self.history['loss'].append(float(self.train_loss.result()))
                self.history['accuracy'].append(float(self.train_metrics[0].result()))
                
                if val_dataset is not None:
                    self.history['val_loss'].append(float(self.val_loss.result()))
                    self.history['val_accuracy'].append(float(self.val_metrics[0].result()))
                
                if verbose:
                    print(f"Loss: {self.train_loss.result():.4f} - "
                          f"Accuracy: {self.train_metrics[0].result():.4f}", end="")
                    
                    if val_dataset is not None:
                        print(f" - Val Loss: {self.val_loss.result():.4f} - "
                              f"Val Accuracy: {self.val_metrics[0].result():.4f}")
                    else:
                        print()
    
    # Adversarial Training
    class AdversarialTrainer(CustomTrainer):
        """Training with adversarial examples."""
        
        def __init__(self, model, optimizer, loss_fn, epsilon=0.01, metrics=None):
            super().__init__(model, optimizer, loss_fn, metrics)
            self.epsilon = epsilon
        
        def generate_adversarial_examples(self, x, y):
            """Generate adversarial examples using FGSM."""
            with tf.GradientTape() as tape:
                tape.watch(x)
                predictions = self.model(x, training=False)
                loss = self.loss_fn(y, predictions)
            
            # Get gradient of loss with respect to input
            gradients = tape.gradient(loss, x)
            
            # Create adversarial examples
            signed_grad = tf.sign(gradients)
            adversarial_x = x + self.epsilon * signed_grad
            # Keep perturbed inputs in the valid range (assumes inputs are scaled to [0, 1])
            adversarial_x = tf.clip_by_value(adversarial_x, 0.0, 1.0)
            
            return adversarial_x
        
        @tf.function
        def adversarial_train_step(self, x, y, gradient_accumulation_steps=1, clip_norm=None):
            """Training step on clean + FGSM adversarial examples.
            
            Takes the same arguments as CustomTrainer.train_step so it can replace it
            inside train(); gradient_accumulation_steps is accepted but ignored here.
            """
            # GradientTape.watch needs a floating-point input
            x = tf.cast(x, tf.float32)
            
            # Generate adversarial examples; treat them as constants for the weight update
            x_adv = tf.stop_gradient(self.generate_adversarial_examples(x, y))
            
            with tf.GradientTape() as tape:
                # Forward pass on clean examples
                clean_predictions = self.model(x, training=True)
                clean_loss = self.loss_fn(y, clean_predictions)
                
                # Forward pass on adversarial examples
                adv_predictions = self.model(x_adv, training=True)
                adv_loss = self.loss_fn(y, adv_predictions)
                
                # Combined loss
                total_loss = 0.5 * (clean_loss + adv_loss)
                
                # Add regularization
                reg_losses = self.model.losses
                if reg_losses:
                    total_loss += tf.add_n(reg_losses)
            
            # Compute, optionally clip, and apply gradients
            gradients = tape.gradient(total_loss, self.model.trainable_variables)
            if clip_norm is not None:
                gradients, _ = tf.clip_by_global_norm(gradients, clip_norm)
            self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
            
            # Update metrics
            self.train_loss.update_state(total_loss)
            for metric in self.train_metrics:
                metric.update_state(y, clean_predictions)
    
    print("⚙️ Custom training loops defined successfully!")

else:
    print("⚠️ TensorFlow not available - showing custom training concepts")
    print("💡 Key concepts: GradientTape, custom gradients, adversarial training")
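
FGSM itself is a single update: x + ε·sign(∇ₓL). For a hypothetical logistic-regression model, ∇ₓL has the closed form (p − y)·w, so the attack can be sketched in NumPy without autodiff. The sketch shows the two properties `generate_adversarial_examples` relies on: every feature moves by at most ε, and the loss goes up. As in the TensorFlow version, the [0, 1] clipping assumes inputs are scaled like normalized pixels.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def bce_loss(w, b, x, y):
    """Binary cross-entropy of a logistic-regression model on one example."""
    p = sigmoid(x @ w + b)
    return -(y * np.log(p) + (1 - y) * np.log(1 - p))

def fgsm(w, b, x, y, epsilon):
    """One FGSM step: move each feature by epsilon in the sign of dLoss/dx."""
    p = sigmoid(x @ w + b)
    grad_x = (p - y) * w                 # analytic gradient of BCE w.r.t. the input
    x_adv = x + epsilon * np.sign(grad_x)
    return np.clip(x_adv, 0.0, 1.0)      # assumed valid input range [0, 1]

rng = np.random.default_rng(42)
w = rng.normal(size=8)
b = 0.0
x = rng.uniform(0.2, 0.8, size=8)
y = 1.0

x_adv = fgsm(w, b, x, y, epsilon=0.05)
print(f"clean loss: {bce_loss(w, b, x, y):.4f}")
print(f"adv loss:   {bce_loss(w, b, x_adv, y):.4f}")
print(f"max |x_adv - x|: {np.max(np.abs(x_adv - x)):.3f}")
```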

In [None]:
if TF_AVAILABLE:
    # Test Custom Training Loops
    print("🧪 Testing Custom Training Loops:")
    
    # Use small subset for custom training demo
    X_train_custom = X_train[:200]
    y_train_custom = y_train[:200]
    X_val_custom = X_test[:50]
    y_val_custom = y_test[:50]
    
    # Create datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train_custom, y_train_custom))
    train_dataset = train_dataset.batch(16).prefetch(tf.data.AUTOTUNE)
    
    val_dataset = tf.data.Dataset.from_tensor_slices((X_val_custom, y_val_custom))
    val_dataset = val_dataset.batch(16).prefetch(tf.data.AUTOTUNE)
    
    print(f"Custom training - Training samples: {len(X_train_custom)}")
    print(f"Custom training - Validation samples: {len(X_val_custom)}")
    
    # Test standard custom training
    print("\n=== Testing Standard Custom Training ===")
    
    # Create simple model for testing
    custom_model = keras.Sequential([
        layers.Flatten(input_shape=(32, 32, 3)),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(10, activation='softmax')
    ])
    
    # Create custom trainer
    optimizer = optimizers.Adam(learning_rate=0.001)
    loss_fn = keras.losses.SparseCategoricalCrossentropy()
    
    trainer = CustomTrainer(custom_model, optimizer, loss_fn)
    
    print(f"Model parameters: {custom_model.count_params():,}")
    
    # Train with custom loop
    start_time = time.time()
    trainer.train(
        train_dataset=train_dataset,
        val_dataset=val_dataset,
        epochs=5,
        gradient_accumulation_steps=2,  # Split each batch of 16 into 2 micro-batches of 8 (lower peak memory, same effective batch)
        clip_norm=1.0,  # Gradient clipping
        verbose=True
    )
    custom_training_time = time.time() - start_time
    
    print(f"Custom training completed in {custom_training_time:.2f}s")
    print(f"Final training accuracy: {trainer.history['accuracy'][-1]:.4f}")
    print(f"Final validation accuracy: {trainer.history['val_accuracy'][-1]:.4f}")
    
    # Test adversarial training (if time permits)
    try:
        print("\n=== Testing Adversarial Training ===")
        
        # Create model for adversarial training
        adv_model = keras.Sequential([
            layers.Flatten(input_shape=(32, 32, 3)),
            layers.Dense(64, activation='relu'),
            layers.Dropout(0.3),
            layers.Dense(10, activation='softmax')
        ])
        
        # Create adversarial trainer
        adv_optimizer = optimizers.Adam(learning_rate=0.001)
        adv_trainer = AdversarialTrainer(
            adv_model, adv_optimizer, loss_fn, epsilon=0.01
        )
        
        print(f"Adversarial model parameters: {adv_model.count_params():,}")
        
        # Override train_step to use adversarial training
        adv_trainer.train_step = adv_trainer.adversarial_train_step
        
        # Train with adversarial examples
        start_time = time.time()
        adv_trainer.train(
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            epochs=3,
            verbose=True
        )
        adv_training_time = time.time() - start_time
        
        print(f"Adversarial training completed in {adv_training_time:.2f}s")
        print(f"Final adversarial training accuracy: {adv_trainer.history['accuracy'][-1]:.4f}")
        
    except Exception as e:
        print(f"⚠️ Adversarial training skipped: {str(e)}")
        print("💡 In production, ensure proper tensor handling and memory management")
    
    # Compare with standard Keras training
    print("\n=== Comparing with Standard Keras Training ===")
    
    # Standard model
    standard_model = keras.Sequential([
        layers.Flatten(input_shape=(32, 32, 3)),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(10, activation='softmax')
    ])
    
    standard_model.compile(
        optimizer=optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Train with standard Keras
    start_time = time.time()
    standard_history = standard_model.fit(
        X_train_custom, y_train_custom,
        epochs=5,
        batch_size=16,
        validation_data=(X_val_custom, y_val_custom),
        verbose=1
    )
    standard_training_time = time.time() - start_time
    
    print(f"Standard training completed in {standard_training_time:.2f}s")
    print(f"Final standard training accuracy: {standard_history.history['accuracy'][-1]:.4f}")
    print(f"Final standard validation accuracy: {standard_history.history['val_accuracy'][-1]:.4f}")
    
    # Performance comparison
    print("\n📊 Training Method Comparison:")
    print(f"Custom Training - Time: {custom_training_time:.2f}s, Val Acc: {trainer.history['val_accuracy'][-1]:.4f}")
    print(f"Standard Training - Time: {standard_training_time:.2f}s, Val Acc: {standard_history.history['val_accuracy'][-1]:.4f}")
    
    custom_results = {
        'custom_history': trainer.history,
        'standard_history': standard_history.history,
        'custom_time': custom_training_time,
        'standard_time': standard_training_time
    }
    
    print("\n✅ Custom training loop testing completed!")

else:
    print("⚠️ Skipping custom training tests - TensorFlow not available")
    custom_results = {}
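
The `clip_norm=1.0` used above goes through `tf.clip_by_global_norm`, which rescales all gradients together instead of clipping each tensor on its own, so the update direction is preserved. The NumPy sketch below implements the documented rule (multiply every tensor by `clip_norm / max(global_norm, clip_norm)`) and applies it to toy gradients chosen to form a 3-4-5 triangle.

```python
import numpy as np

def clip_by_global_norm(grads, clip_norm):
    """Scale a list of gradient arrays so their joint L2 norm is at most clip_norm.

    Returns the scaled gradients and the global norm measured before clipping.
    """
    global_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    scale = clip_norm / max(global_norm, clip_norm)
    return [g * scale for g in grads], global_norm

# Two "variables" whose joint gradient norm is 5
grads = [np.array([3.0]), np.array([[4.0]])]
clipped, norm_before = clip_by_global_norm(grads, clip_norm=1.0)
norm_after = np.sqrt(sum(np.sum(g ** 2) for g in clipped))
print(f"global norm before: {norm_before:.3f}, after: {norm_after:.3f}")
```

Gradients whose global norm is already below `clip_norm` come back unchanged, because the scale factor is then exactly 1.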

## 🏃‍♂️ Practice Problems

Let's practice some additional TensorFlow/Keras concepts commonly asked in interviews.
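
Model-size questions usually start with counting parameters by hand. A Dense layer with n_in inputs and n_out units holds n_in·n_out weights plus n_out biases. Summing over the layers and multiplying by 4 bytes gives the float32 estimate that `get_model_size` computes. The sketch below applies this to the 784 → 64 → 32 → 10 network built in the next cell; `dense_param_count` and `float32_size_mb` are hypothetical helpers.

```python
def dense_param_count(layer_sizes):
    """Weights + biases of a stack of Dense layers: sum of n_in * n_out + n_out."""
    return sum(n_in * n_out + n_out
               for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]))

def float32_size_mb(param_count):
    """Size in MB, assuming 4 bytes (float32) per parameter."""
    return param_count * 4 / (1024 * 1024)

# Dropout layers add no parameters, so only the Dense widths matter
n_params = dense_param_count([784, 64, 32, 10])
print(f"{n_params:,} parameters, about {float32_size_mb(n_params):.2f} MB in float32")
```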

In [None]:
if TF_AVAILABLE:
    # Problem 5: Model Quantization and Optimization
    print("🧪 Testing Model Quantization and Optimization:")
    
    # Create a simple model for optimization
    opt_model = keras.Sequential([
        layers.Dense(64, activation='relu', input_shape=(784,)),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dense(10, activation='softmax')
    ])
    
    # Compile and train briefly
    opt_model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Create some dummy data
    X_dummy = np.random.random((1000, 784))
    y_dummy = np.random.randint(0, 10, (1000,))
    
    opt_model.fit(X_dummy, y_dummy, epochs=2, verbose=0)
    
    # Model size analysis
    def get_model_size(model):
        """Estimate model size in MB."""
        param_count = model.count_params()
        # Assume float32 (4 bytes per parameter)
        size_mb = (param_count * 4) / (1024 * 1024)
        return size_mb
    
    original_size = get_model_size(opt_model)
    print(f"Original model size: {original_size:.2f} MB")
    print(f"Original model parameters: {opt_model.count_params():,}")
    
    # Post-training (dynamic-range) quantization with TensorFlow Lite
    try:
        # Convert to TensorFlow Lite (quantized)
        converter = tf.lite.TFLiteConverter.from_keras_model(opt_model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        
        quantized_model = converter.convert()
        
        # Size of the serialized .tflite flatbuffer
        quantized_size_mb = len(quantized_model) / (1024 * 1024)
        
        print(f"Quantized model size: {quantized_size_mb:.2f} MB")
        print(f"Size reduction: {((original_size - quantized_size_mb) / original_size * 100):.1f}%")
        
        print("✅ Model quantization successful")
        
    except Exception as e:
        print(f"⚠️ Quantization not available: {str(e)}")
    
    # Problem 6: Distributed Training Setup (conceptual)
    print("\n🧪 Distributed Training Concepts:")
    
    try:
        # Check available devices
        physical_devices = tf.config.list_physical_devices()
        print(f"Available devices: {len(physical_devices)}")
        
        for device in physical_devices:
            print(f"  {device.device_type}: {device.name}")
        
        # Multi-GPU strategy (if available)
        gpus = tf.config.list_physical_devices('GPU')
        if len(gpus) > 1:
            strategy = tf.distribute.MirroredStrategy()
            print(f"\nUsing MirroredStrategy with {strategy.num_replicas_in_sync} replicas")
            
            with strategy.scope():
                distributed_model = keras.Sequential([
                    layers.Dense(32, activation='relu', input_shape=(10,)),
                    layers.Dense(1, activation='sigmoid')
                ])
                distributed_model.compile(
                    optimizer='adam',
                    loss='binary_crossentropy',
                    metrics=['accuracy']
                )
            
            print("✅ Distributed model created successfully")
            
        else:
            print("Single device - using default strategy")
            print("💡 In production: Use tf.distribute.MirroredStrategy for multi-GPU")
            print("💡 Use tf.distribute.MultiWorkerMirroredStrategy for multi-machine")
    
    except Exception as e:
        print(f"Device enumeration error: {str(e)}")
    
    # Problem 7: Model Checkpointing and Callbacks
    print("\n🧪 Advanced Callbacks and Checkpointing:")
    
    # Custom callback
    class CustomCallback(keras.callbacks.Callback):
        """Custom callback for advanced monitoring."""
        
        def __init__(self, patience=5, min_delta=0.001):
            super().__init__()
            self.patience = patience
            self.min_delta = min_delta
            self.wait = 0
            self.best_loss = np.inf
        
        def on_epoch_end(self, epoch, logs=None):
            logs = logs or {}
            current_loss = logs.get('val_loss', logs.get('loss'))
            if current_loss is None:
                return
            
            if current_loss < self.best_loss - self.min_delta:
                self.best_loss = current_loss
                self.wait = 0
                print(f"\n  New best loss: {current_loss:.4f}")
            else:
                self.wait += 1
                if self.wait >= self.patience:
                    print(f"\n  Early stopping triggered after {epoch + 1} epochs")
                    self.model.stop_training = True
        
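        def on_train_begin(self, logs=None):
            # Reset state so the callback can be reused across fit() calls
            self.wait = 0
            self.best_loss = np.inf
            print("🚀 Training started with custom monitoring")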
        
        def on_train_end(self, logs=None):
            print("🏁 Training completed")
    
    # Test callbacks
    callback_model = keras.Sequential([
        layers.Dense(16, activation='relu', input_shape=(10,)),
        layers.Dense(1, activation='sigmoid')
    ])
    
    callback_model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    # Create dummy binary classification data
    X_cb = np.random.random((200, 10))
    y_cb = np.random.randint(0, 2, (200,))
    
    # Define callbacks
    callbacks_list = [
        CustomCallback(patience=3),
        keras.callbacks.ModelCheckpoint(
            'best_model.h5',
            save_best_only=True,
            monitor='loss',
            verbose=1
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor='loss',
            factor=0.5,
            patience=2,
            min_lr=1e-7,
            verbose=1
        )
    ]
    
    # Train with callbacks
    print("Training with advanced callbacks:")
    callback_history = callback_model.fit(
        X_cb, y_cb,
        epochs=10,
        batch_size=32,
        callbacks=callbacks_list,
        verbose=1
    )
    
    print(f"\nTraining completed after {len(callback_history.history['loss'])} epochs")
    print("✅ Advanced callbacks testing completed")
    
    print("\n✅ All practice problems completed!")

else:
    print("⚠️ Practice problems skipped - TensorFlow not available")
    print("💡 Key concepts: Model optimization, quantization, distributed training, callbacks")
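
Setting `tf.lite.Optimize.DEFAULT` without a representative dataset applies dynamic-range quantization, which stores weights as 8-bit integers. The NumPy sketch below shows the core idea with per-tensor symmetric int8 quantization, which is an assumption for illustration; TFLite's actual scheme (for example per-channel scales) may differ. It shows weight storage shrinking 4× while the rounding error stays within half a quantization step.

```python
import numpy as np

def quantize_int8_symmetric(w):
    """Per-tensor symmetric int8 quantization: w is approximated by scale * q."""
    scale = np.max(np.abs(w)) / 127.0
    q = np.clip(np.round(w / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize(q, scale):
    """Map int8 codes back to approximate float values."""
    return q.astype(np.float32) * scale

rng = np.random.default_rng(0)
w = rng.normal(scale=0.1, size=(784, 64)).astype(np.float32)

q, scale = quantize_int8_symmetric(w)
w_hat = dequantize(q, scale)

print(f"float32 bytes: {w.nbytes:,}, int8 bytes: {q.nbytes:,}")
print(f"max abs error: {np.max(np.abs(w - w_hat)):.6f} (bound: scale / 2 = {scale / 2:.6f})")
```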

In [None]:
# Visualize all TensorFlow/Keras results
if TF_AVAILABLE and (model_results or cnn_results or tl_results or custom_results):
    plt.figure(figsize=(20, 15))
    
    # Plot 1: Custom model comparison (if available)
    if model_results:
        plt.subplot(3, 5, 1)
        model_names = [r['name'] for r in model_results]
        model_accs = [r['test_results']['accuracy'] for r in model_results]
        bars = plt.bar(model_names, model_accs, alpha=0.7)
        plt.ylabel('Test Accuracy')
        plt.title('Custom Models Comparison')
        plt.xticks(rotation=45)
        
        for bar, acc in zip(bars, model_accs):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                     f'{acc:.3f}', ha='center', va='bottom')
        plt.grid(True, alpha=0.3)
    
    # Plot 2: CNN architectures comparison (if available)
    if cnn_results:
        plt.subplot(3, 5, 2)
        arch_names = [r['architecture'] for r in cnn_results]
        arch_accs = [r['test_accuracy'] for r in cnn_results]
        bars = plt.bar(arch_names, arch_accs, alpha=0.7, color='green')
        plt.ylabel('Test Accuracy')
        plt.title('CNN Architectures')
        plt.xticks(rotation=45)
        
        for bar, acc in zip(bars, arch_accs):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                     f'{acc:.3f}', ha='center', va='bottom')
        plt.grid(True, alpha=0.3)
    
    # Plot 3: Transfer learning comparison (if available)
    if tl_results:
        plt.subplot(3, 5, 3)
        tl_names = [r['name'] for r in tl_results]
        tl_accs = [r['test_accuracy'] for r in tl_results]
        bars = plt.bar(tl_names, tl_accs, alpha=0.7, color='orange')
        plt.ylabel('Test Accuracy')
        plt.title('Transfer Learning Strategies')
        plt.xticks(rotation=45)
        
        for bar, acc in zip(bars, tl_accs):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                     f'{acc:.3f}', ha='center', va='bottom')
        plt.grid(True, alpha=0.3)
    
    # Plot 4: Training time comparison
    plt.subplot(3, 5, 4)
    methods = []
    times = []
    
    if model_results:
        methods.extend([r['name'][:8] for r in model_results])
        times.extend([r['training_time'] for r in model_results])
    
    if cnn_results:
        methods.extend([r['architecture'][:8] for r in cnn_results])
        times.extend([r['training_time'] for r in cnn_results])
    
    if methods:
        bars = plt.bar(methods, times, alpha=0.7, color='red')
        plt.ylabel('Training Time (s)')
        plt.title('Training Time Comparison')
        plt.xticks(rotation=45)
        
        for bar, time_val in zip(bars, times):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                     f'{time_val:.1f}s', ha='center', va='bottom', fontsize=8)
        plt.grid(True, alpha=0.3)
    
    # Plot 5: Model parameters comparison
    plt.subplot(3, 5, 5)
    param_methods = []
    param_counts = []
    
    if cnn_results:
        param_methods.extend([r['architecture'] for r in cnn_results])
        param_counts.extend([r['parameters'] for r in cnn_results])
    
    if tl_results:
        param_methods.extend([r['name'][:8] for r in tl_results])
        param_counts.extend([r['total_params'] for r in tl_results])
    
    if param_methods:
        bars = plt.bar(param_methods, param_counts, alpha=0.7, color='purple')
        plt.ylabel('Parameters (log scale)')
        plt.title('Model Size Comparison')
        plt.yscale('log')
        plt.xticks(rotation=45)
        plt.grid(True, alpha=0.3)
    
    # Plots 6-10: Training curves for different methods
    curve_data = []
    
    if model_results:
        for result in model_results[:2]:  # Show first 2
            curve_data.append({
                'name': result['name'],
                'history': result['history'].history
            })
    
    if cnn_results:
        for result in cnn_results[:2]:  # Show first 2
            curve_data.append({
                'name': result['architecture'],
                'history': result['history'].history
            })
    
    if custom_results:
        curve_data.append({
            'name': 'Custom Loop',
            'history': custom_results['custom_history']
        })
    
    for i, data in enumerate(curve_data[:5]):
        plt.subplot(3, 5, 6 + i)
        history = data['history']
        epochs = range(1, len(history['loss']) + 1)
        
        plt.plot(epochs, history['loss'], 'b-', label='Training Loss', alpha=0.8)
        if 'val_loss' in history:
            plt.plot(epochs, history['val_loss'], 'r-', label='Validation Loss', alpha=0.8)
        
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title(f'{data["name"]} Training')
        plt.legend()
        plt.grid(True, alpha=0.3)
    
    # Plots 11-15: Accuracy curves
    for i, data in enumerate(curve_data[:5]):
        plt.subplot(3, 5, 11 + i)
        history = data['history']
        epochs = range(1, len(history['accuracy']) + 1)
        
        plt.plot(epochs, history['accuracy'], 'g-', label='Training Acc', alpha=0.8)
        if 'val_accuracy' in history:
            plt.plot(epochs, history['val_accuracy'], color='orange', label='Validation Acc', alpha=0.8)
        
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.title(f'{data["name"]} Accuracy')
        plt.legend()
        plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Print comprehensive summary
    print("\n📊 Complete TensorFlow/Keras Analysis Summary:")
    print("=" * 80)
    
    if model_results:
        print("\n🧱 Custom Models:")
        for result in model_results:
            print(f"  {result['name']:<20}: {result['test_results']['accuracy']:.4f} accuracy, "
                  f"{result['training_time']:.1f}s training")
    
    if cnn_results:
        print("\n🏗️ CNN Architectures:")
        for result in cnn_results:
            print(f"  {result['architecture'].upper():<20}: {result['test_accuracy']:.4f} accuracy, "
                  f"{result['parameters']:,} parameters")
    
    if tl_results:
        print("\n🔄 Transfer Learning:")
        for result in tl_results:
            trainable_pct = (result['trainable_params'] / result['total_params']) * 100
            print(f"  {result['name']:<20}: {result['test_accuracy']:.4f} accuracy, "
                  f"{trainable_pct:.1f}% trainable")
    
    if custom_results:
        print("\n⚙️ Custom Training:")
        print(f"  Custom Loop Time: {custom_results['custom_time']:.2f}s")
        print(f"  Standard Time: {custom_results['standard_time']:.2f}s")
        print(f"  Custom Final Val Acc: {custom_results['custom_history']['val_accuracy'][-1]:.4f}")
        print(f"  Standard Final Val Acc: {custom_results['standard_history']['val_accuracy'][-1]:.4f}")

else:
    print("📊 Visualization Summary:")
    print("⚠️ TensorFlow not available or no results to visualize")
    print("💡 In interviews, discuss the concepts and show understanding of:")
    print("  - Custom layer implementation")
    print("  - Advanced CNN architectures (ResNet, SE blocks)")
    print("  - Transfer learning strategies")
    print("  - Custom training loops with GradientTape")
    print("  - Model optimization and deployment")

## 💡 Interview Tips

### 🔥 TensorFlow/Keras Expertise
1. **Understand the ecosystem** - TF, Keras, TF Lite, TF Serving, TF.js
2. **Know when to use each approach** - Sequential, Functional, Subclassing APIs
3. **Master custom implementations** - Layers, losses, metrics, training loops
4. **Optimization techniques** - Mixed precision, quantization, pruning
5. **Production deployment** - SavedModel format, TF Serving, edge deployment
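
Item 2 is easier to discuss with code in hand. The minimal sketch below builds the same small classifier with each of the three Keras APIs. The layer sizes and the names `build_sequential`, `build_functional`, and `SubclassedMLP` are hypothetical and chosen only for illustration. All three variants end up with identical parameter counts.

```python
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

# Sequential: a linear stack with one input and one output.
def build_sequential():
    return keras.Sequential([
        keras.Input(shape=(16,)),
        layers.Dense(32, activation="relu"),
        layers.Dense(3, activation="softmax"),
    ])

# Functional: layers wired as an explicit graph; also supports branches,
# shared layers, and multiple inputs/outputs.
def build_functional():
    inputs = keras.Input(shape=(16,))
    x = layers.Dense(32, activation="relu")(inputs)
    outputs = layers.Dense(3, activation="softmax")(x)
    return keras.Model(inputs, outputs)

# Subclassing: arbitrary Python logic in call(), at the cost of a model whose
# internal wiring Keras cannot inspect (e.g. keras.utils.plot_model cannot draw it).
class SubclassedMLP(keras.Model):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.dense_hidden = layers.Dense(32, activation="relu")
        self.dense_out = layers.Dense(3, activation="softmax")

    def call(self, inputs):
        return self.dense_out(self.dense_hidden(inputs))

x = np.random.rand(4, 16).astype("float32")
for model in (build_sequential(), build_functional(), SubclassedMLP()):
    y = model(x)  # the first call also creates the subclassed model's weights
    print(type(model).__name__, tuple(y.shape), model.count_params())
```

As a rule of thumb, reach for Sequential for plain stacks and Functional for most non-linear topologies. Keep Subclassing for cases that genuinely need Python control flow in the forward pass.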

### ⚡ Common Interview Questions

**1. "Implement a custom layer in Keras"**
- Subclass `tf.keras.layers.Layer`
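- Create weights in `build()` (called once the input shape is known) and put the forward computation in `call()`
- Accept a `training` argument in `call()` so behavior such as dropout or noise differs between training and inference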
- Add proper serialization with `get_config()`
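
Here is a minimal sketch that covers every point in that checklist: lazy weight creation in `build()`, a `training`-dependent `call()`, and `get_config()` for serialization. `NoisyDense` is a hypothetical layer made up for illustration. It is a dense layer that adds Gaussian input noise during training only.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

class NoisyDense(layers.Layer):
    """Hypothetical dense layer that adds Gaussian input noise only when training."""

    def __init__(self, units, noise_stddev=0.1, **kwargs):
        super().__init__(**kwargs)  # forwards name, dtype, trainable
        self.units = units
        self.noise_stddev = noise_stddev

    def build(self, input_shape):
        # Weights are created lazily, once the input feature size is known.
        self.kernel = self.add_weight(name="kernel", shape=(input_shape[-1], self.units),
                                      initializer="glorot_uniform", trainable=True)
        self.bias = self.add_weight(name="bias", shape=(self.units,),
                                    initializer="zeros", trainable=True)
        super().build(input_shape)

    def call(self, inputs, training=None):
        if training:
            # Noise acts as a regularizer and is skipped at inference time.
            inputs = inputs + tf.random.normal(tf.shape(inputs), stddev=self.noise_stddev)
        return tf.matmul(inputs, self.kernel) + self.bias

    def get_config(self):
        # Everything needed to re-create the layer with from_config().
        config = super().get_config()
        config.update({"units": self.units, "noise_stddev": self.noise_stddev})
        return config

layer = NoisyDense(4, noise_stddev=0.5)
x = tf.ones((2, 3))
y_eval = layer(x, training=False)   # deterministic
y_train = layer(x, training=True)   # noisy
restored = NoisyDense.from_config(layer.get_config())
print(tuple(y_eval.shape), np.allclose(y_eval, y_train), restored.units)
```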

**2. "Explain transfer learning strategies"**
- Feature extraction: Freeze base model, train new head
- Fine-tuning: Unfreeze some or all base layers, recompile, and train with a much lower learning rate
- Progressive unfreezing: Gradually unfreeze layers
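
The sketch below shows feature extraction followed by fine-tuning. To keep it self-contained and offline, a tiny randomly initialized `Sequential` stands in for a real pretrained backbone. In practice you would use something like `keras.applications.MobileNetV2(include_top=False, weights="imagenet", pooling="avg")`. The random images and labels are placeholders, so the accuracy means nothing here; only the freezing mechanics matter.

```python
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

def make_backbone():
    # Hypothetical stand-in for a pretrained feature extractor.
    return keras.Sequential([
        keras.Input(shape=(32, 32, 3)),
        layers.Conv2D(8, 3, activation="relu"),
        layers.Conv2D(16, 3, activation="relu"),
        layers.Conv2D(16, 3, activation="relu"),
        layers.GlobalAveragePooling2D(),
    ], name="backbone")

x = np.random.rand(64, 32, 32, 3).astype("float32")  # placeholder images
y = np.random.randint(0, 5, size=(64,))              # placeholder labels, 5 classes
backbone = make_backbone()

# Stage 1, feature extraction: freeze the backbone and train only the new head.
backbone.trainable = False
inputs = keras.Input(shape=(32, 32, 3))
features = backbone(inputs, training=False)  # keeps any BatchNorm layers in inference mode
outputs = layers.Dense(5, activation="softmax")(features)
model = keras.Model(inputs, outputs)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),
              loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(x, y, epochs=1, batch_size=16, verbose=0)
stage1_trainable = len(model.trainable_weights)  # head kernel + bias

# Stage 2, fine-tuning: unfreeze the top of the backbone but keep early layers frozen.
# Recompile so the trainable changes take effect, using a much lower learning rate.
backbone.trainable = True
for layer in backbone.layers[:-2]:
    layer.trainable = False
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5),
              loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(x, y, epochs=1, batch_size=16, verbose=0)
stage2_trainable = len(model.trainable_weights)  # + last conv kernel + bias

print(stage1_trainable, stage2_trainable)  # 2 4
```

Progressive unfreezing repeats stage 2 several times and shrinks the frozen slice (`backbone.layers[:-k]` with growing `k`) on each pass.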

**3. "How would you implement custom training logic?"**
- Use `tf.GradientTape()` to record the forward pass and compute gradients
- Handle the forward pass, loss calculation, and backpropagation explicitly, then apply updates with `optimizer.apply_gradients()`
- Add gradient clipping or gradient accumulation as needed
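
The next sketch puts those steps together in a manual training loop. It uses `tf.GradientTape`, global-norm gradient clipping, and gradient accumulation, which averages several micro-batches before each optimizer update. The toy linear-regression data, the constants `ACCUM_STEPS` and `CLIP_NORM`, and the helper names are assumptions made for illustration.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Synthetic regression data: y = 3x + 2 + noise.
rng = np.random.default_rng(0)
x_all = rng.normal(size=(256, 1)).astype("float32")
y_all = (3.0 * x_all + 2.0 + 0.1 * rng.normal(size=(256, 1))).astype("float32")

model = keras.Sequential([keras.Input(shape=(1,)), layers.Dense(1)])
optimizer = keras.optimizers.SGD(learning_rate=0.1)
loss_fn = keras.losses.MeanSquaredError()

ACCUM_STEPS = 4  # micro-batches whose gradients are averaged per update
CLIP_NORM = 1.0  # maximum global gradient norm per update

@tf.function
def compute_gradients(xb, yb):
    with tf.GradientTape() as tape:
        preds = model(xb, training=True)  # forward pass, recorded on the tape
        loss = loss_fn(yb, preds)
    grads = tape.gradient(loss, model.trainable_variables)  # backpropagation
    return loss, grads

def train_epoch(batch_size=16):
    accumulated, losses = None, []
    for step, start in enumerate(range(0, len(x_all), batch_size), start=1):
        xb = tf.constant(x_all[start:start + batch_size])
        yb = tf.constant(y_all[start:start + batch_size])
        loss, grads = compute_gradients(xb, yb)
        losses.append(float(loss))
        accumulated = list(grads) if accumulated is None else [
            a + g for a, g in zip(accumulated, grads)]
        if step % ACCUM_STEPS == 0:
            averaged = [a / ACCUM_STEPS for a in accumulated]
            clipped, _ = tf.clip_by_global_norm(averaged, CLIP_NORM)
            optimizer.apply_gradients(zip(clipped, model.trainable_variables))
            accumulated = None  # start a fresh accumulation window
    return float(np.mean(losses))

epoch_losses = [train_epoch() for _ in range(30)]
print(f"first epoch loss {epoch_losses[0]:.3f}, last epoch loss {epoch_losses[-1]:.3f}")
```

Accumulation gives the update the gradient of a larger effective batch (here 4 x 16) without holding that batch in memory at once. Clipping bounds the size of each step.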

**4. "Optimize a model for mobile deployment"**
- Use TensorFlow Lite converter
- Apply quantization (int8, float16)
- Consider model architecture (MobileNet, EfficientNet)
- Profile memory and compute requirements
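
The following sketch applies post-training quantization with the TensorFlow Lite converter. It compares the serialized size of a float32 baseline against float16 and dynamic-range (int8 weight) quantization. To sidestep differences between Keras versions in how models are handed to the converter, the model is a plain `tf.Module` with a single hypothetical 256x256 dense layer. For a Keras model you would typically start from `TFLiteConverter.from_keras_model` or an exported SavedModel instead. Full integer quantization, which also quantizes activations to int8, additionally needs a `representative_dataset` and is not shown.

```python
import tensorflow as tf

class TinyDenseModel(tf.Module):
    """Hypothetical model: one 256x256 dense layer, large enough for quantization to matter."""

    def __init__(self):
        super().__init__()
        self.w = tf.Variable(tf.random.normal([256, 256], stddev=0.05))
        self.b = tf.Variable(tf.zeros([256]))

    @tf.function(input_signature=[tf.TensorSpec([1, 256], tf.float32)])
    def __call__(self, x):
        return tf.nn.relu(tf.matmul(x, self.w) + self.b)

def convert(module, mode="float32"):
    """Convert `module` to a TFLite flatbuffer (bytes) with the given quantization mode."""
    concrete_fn = module.__call__.get_concrete_function()
    converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete_fn], module)
    if mode in ("float16", "dynamic_int8"):
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if mode == "float16":
        # Store weights as float16, roughly half the float32 size.
        converter.target_spec.supported_types = [tf.float16]
    # For "dynamic_int8", Optimize.DEFAULT alone stores eligible weights as int8.
    return converter.convert()

model = TinyDenseModel()
sizes = {mode: len(convert(model, mode)) for mode in ("float32", "float16", "dynamic_int8")}
for mode, n_bytes in sizes.items():
    print(f"{mode:>13}: {n_bytes / 1024:.1f} KiB")
```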

### 🏗️ Architecture Patterns
- **Residual connections**: Skip connections for deep networks
- **Attention mechanisms**: Multi-head attention, self-attention
- **Efficient architectures**: Depthwise separable convolutions
- **Regularization**: Dropout, batch normalization, data augmentation
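
Here is a minimal sketch that combines two of these patterns: a residual block whose convolutions are depthwise separable. It also compares the parameter cost of a standard 3x3 convolution with its separable counterpart. The block layout and the name `residual_separable_block` are illustrative assumptions, not a specific published architecture.

```python
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

def residual_separable_block(x, filters):
    """Residual block built from depthwise separable convolutions (illustrative sketch)."""
    shortcut = x
    y = layers.SeparableConv2D(filters, 3, padding="same", use_bias=False)(x)
    y = layers.BatchNormalization()(y)
    y = layers.ReLU()(y)
    y = layers.SeparableConv2D(filters, 3, padding="same", use_bias=False)(y)
    y = layers.BatchNormalization()(y)
    if shortcut.shape[-1] != filters:
        # 1x1 projection so the skip connection matches the new channel count.
        shortcut = layers.Conv2D(filters, 1, use_bias=False)(shortcut)
    y = layers.Add()([y, shortcut])  # the residual (skip) connection
    return layers.ReLU()(y)

inputs = keras.Input(shape=(32, 32, 32))
model = keras.Model(inputs, residual_separable_block(inputs, 64))

# Parameter cost of one 3x3 layer mapping 32 -> 64 channels (with bias).
standard = layers.Conv2D(64, 3)
separable = layers.SeparableConv2D(64, 3)
probe = tf.zeros((1, 8, 8, 32))
standard(probe)   # calling a layer builds its weights
separable(probe)
print(standard.count_params(), separable.count_params())  # 18496 2400
```

The counts follow from the shapes. A standard convolution needs 3·3·32·64 + 64 parameters. The separable version needs 3·3·32 (depthwise) + 32·64 (pointwise) + 64, which is about 7.7x fewer.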

### 📊 Performance Optimization
- **Data pipeline**: `tf.data` API, prefetching, parallel processing
- **Mixed precision**: Use `tf.keras.mixed_precision` (float16/bfloat16 computation with float32 variables) for faster training on GPUs/TPUs that support it
- **Distributed training**: MirroredStrategy, MultiWorkerMirroredStrategy
- **Model optimization**: TF Lite, TensorRT, graph optimization
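
For the data-pipeline point, here is a minimal `tf.data` sketch. It covers parallel preprocessing with `map`, caching, shuffling, batching, and `prefetch`, which lets input preparation overlap with training. The random uint8 arrays stand in for a real image dataset, and the `preprocess` function is a hypothetical example.

```python
import numpy as np
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

# Placeholder data: 100 grayscale 28x28 images with integer labels.
images = np.random.randint(0, 256, size=(100, 28, 28, 1), dtype=np.uint8)
labels = np.random.randint(0, 10, size=(100,))

def preprocess(image, label):
    image = tf.cast(image, tf.float32) / 255.0  # scale pixels to [0, 1]
    return image, label

dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(preprocess, num_parallel_calls=AUTOTUNE)  # preprocess elements in parallel
    .cache()                                       # reuse preprocessed elements after epoch 1
    .shuffle(buffer_size=100, seed=0)              # reshuffle each epoch
    .batch(32)
    .prefetch(AUTOTUNE)                            # prepare the next batch during the current step
)

batch_sizes = [int(batch_images.shape[0]) for batch_images, _ in dataset]
print(batch_sizes)  # [32, 32, 32, 4]
```

The resulting `dataset` can be passed directly to `model.fit`.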

### 🔧 Debugging and Monitoring
- **TensorBoard**: Visualize training, profiling, debugging
- **tf.debugging**: Assertion ops (e.g. `assert_shapes`) and numeric checks (`check_numerics`)
- **Callbacks**: Custom monitoring, early stopping, LR scheduling
- **Model analysis**: Layer-wise analysis, gradient flow
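
The sketch below shows a few of these tools together. A loss wrapped in `tf.debugging` checks fails fast on bad shapes or NaN/Inf values. A per-variable gradient-norm report helps spot vanishing or exploding gradients. The list shows built-in Keras callbacks for monitoring. The helper names `checked_mse` and `gradient_norms` are hypothetical. TensorBoard logging (`keras.callbacks.TensorBoard`) would slot into the same callback list.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

def checked_mse(targets, preds):
    """MSE that fails fast on mismatched shapes or NaN/Inf predictions."""
    tf.debugging.assert_shapes([(targets, ("N", "C")), (preds, ("N", "C"))])
    preds = tf.debugging.check_numerics(preds, message="predictions contain NaN/Inf")
    return tf.reduce_mean(tf.square(preds - targets))

def gradient_norms(model, x, y, loss_fn):
    """Per-variable gradient norms, listed in model.trainable_variables order."""
    with tf.GradientTape() as tape:
        loss = loss_fn(y, model(x, training=True))
    grads = tape.gradient(loss, model.trainable_variables)
    return [(v.name, float(tf.norm(g))) for v, g in zip(model.trainable_variables, grads)]

# Built-in callbacks for monitoring and control during model.fit().
monitoring_callbacks = [
    keras.callbacks.TerminateOnNaN(),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2),
]

model = keras.Sequential([keras.Input(shape=(3,)), layers.Dense(4, activation="relu"), layers.Dense(1)])
for name, norm in gradient_norms(model, tf.ones((8, 3)), tf.zeros((8, 1)), checked_mse):
    print(f"{name:<12} grad norm = {norm:.4f}")

try:
    checked_mse(tf.zeros((1, 1)), tf.constant([[float("nan")]]))
except tf.errors.InvalidArgumentError:
    print("check_numerics caught a NaN")
```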

### 🚀 Production Considerations
- **Model versioning**: Track model lineage and experiments
- **Serving**: TF Serving for REST/gRPC APIs
- **Monitoring**: Model drift, performance degradation
- **A/B testing**: Compare model versions in production
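
For versioning and serving, here is a minimal sketch that exports a SavedModel with an explicit `serving_default` signature. It writes the model into a numbered version subdirectory, which is the layout TF Serving expects. By default, TF Serving serves the highest version found under the base directory. `LinearScorer`, its fixed weights, and the `scorer` model name are hypothetical stand-ins for a trained network.

```python
import os
import tempfile
import tensorflow as tf

class LinearScorer(tf.Module):
    """Hypothetical model: a fixed linear scorer standing in for a trained network."""

    def __init__(self):
        super().__init__()
        self.w = tf.Variable([[0.5], [-1.0], [2.0]])

    @tf.function(input_signature=[tf.TensorSpec([None, 3], tf.float32, name="features")])
    def __call__(self, features):
        return {"score": tf.matmul(features, self.w)}

# Base directory with numbered version subdirectories: .../scorer/1, .../scorer/2, ...
export_base = os.path.join(tempfile.mkdtemp(), "scorer")
export_path = os.path.join(export_base, "1")  # a retrained model would go in "2"

scorer = LinearScorer()
tf.saved_model.save(scorer, export_path,
                    signatures={"serving_default": scorer.__call__.get_concrete_function()})

# Reload and call the serving signature, as a server would.
restored = tf.saved_model.load(export_path)
serving_fn = restored.signatures["serving_default"]
result = serving_fn(features=tf.constant([[1.0, 2.0, 3.0]]))
print(result["score"].numpy())  # [[4.5]]
```

Suppose a server is started against this directory, for example with `tensorflow_model_server --rest_api_port=8501 --model_name=scorer --model_base_path=<export_base>`. The same signature would then be reachable over REST at `/v1/models/scorer:predict`. That server setup is assumed and not shown here.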

### 🧠 Advanced Topics
- **Custom training loops**: Beyond `model.fit()`
- **Graph optimization**: `tf.function`, XLA compilation
- **Checkpointing**: Save/restore training state
- **Multi-task learning**: Shared representations, multiple outputs
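
For the checkpointing point, here is a minimal sketch that uses `tf.train.Checkpoint` with `tf.train.CheckpointManager` to save and restore training state. It also runs the step function through `tf.function`. The "training step" just increments a counter and some weights; it is a placeholder for a real optimizer update.

```python
import tempfile
import tensorflow as tf

# Training state to persist: a step counter and some weights.
step = tf.Variable(0, dtype=tf.int64)
weights = tf.Variable(tf.zeros([3]))

checkpoint = tf.train.Checkpoint(step=step, weights=weights)
manager = tf.train.CheckpointManager(checkpoint, directory=tempfile.mkdtemp(), max_to_keep=2)

@tf.function  # traced into a graph on the first call, then reused
def train_step():
    weights.assign_add(tf.ones([3]))  # placeholder for a real optimizer update
    step.assign_add(1)

for _ in range(5):
    train_step()
    manager.save(checkpoint_number=int(step.numpy()))

print(len(manager.checkpoints))  # 2: older checkpoints are deleted

# Simulate losing in-memory state, then resume from the newest checkpoint.
weights.assign(tf.zeros([3]))
step.assign(0)
checkpoint.restore(manager.latest_checkpoint)
print(int(step.numpy()), weights.numpy())  # 5 [5. 5. 5.]
```

In a real loop, the model and optimizer objects would also be passed to `tf.train.Checkpoint` so that their variables are saved alongside the step counter.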

## 🎓 Summary

In this notebook, we covered:

✅ **Custom Layers and Models** - Layer subclassing, custom losses, advanced metrics  
✅ **Advanced CNN Architectures** - ResNet, SE blocks, depthwise convolutions  
✅ **Transfer Learning** - Feature extraction, fine-tuning, progressive unfreezing  
✅ **Custom Training Loops** - GradientTape, adversarial training, gradient manipulation  
✅ **Model Optimization** - Quantization, distributed training, callbacks  
✅ **Production Techniques** - Model serialization, TF Lite conversion, monitoring  

### 🚀 Next Steps
1. Practice implementing custom components from scratch
2. Experiment with different architectures and datasets
3. Move on to LeetCode-style algorithm problems
4. Study deployment and MLOps practices

### 📚 Additional Practice
- Implement Transformer architectures
- Create multi-modal models (vision + text)
- Build recommendation systems
- Implement GAN architectures

### 🔑 Key Interview Points
- **Framework Mastery**: Deep understanding of TensorFlow/Keras APIs
- **Custom Implementation**: Can build components from scratch when needed
- **Optimization Skills**: Know how to make models faster and smaller
- **Production Ready**: Understand deployment and serving considerations
- **Debugging Ability**: Can troubleshoot training and performance issues

### 🏗️ Architecture Expertise
- **Modern CNN**: ResNet, DenseNet, EfficientNet principles
- **Attention Mechanisms**: Multi-head attention, Transformer blocks
- **Transfer Learning**: When and how to adapt pre-trained models
- **Optimization**: Mixed precision, quantization, pruning techniques
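
The following is a minimal Transformer encoder block built on `keras.layers.MultiHeadAttention`. It pairs self-attention with a position-wise feed-forward network, and wraps each in a residual connection followed by layer normalization (the post-norm arrangement). Positional encodings and masking are omitted. The sizes (`d_model=64`, 4 heads, `ff_dim=128`) are arbitrary assumptions.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoderBlock(layers.Layer):
    """Post-norm Transformer encoder block (illustrative sketch)."""

    def __init__(self, d_model=64, num_heads=4, ff_dim=128, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model // num_heads)
        self.ffn = keras.Sequential([layers.Dense(ff_dim, activation="relu"), layers.Dense(d_model)])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.drop1 = layers.Dropout(dropout)
        self.drop2 = layers.Dropout(dropout)

    def call(self, x, training=None):
        # Self-attention: queries, keys, and values all come from x.
        attn_out = self.attention(x, x, training=training)
        x = self.norm1(x + self.drop1(attn_out, training=training))
        # Position-wise feed-forward network, applied at every time step.
        ffn_out = self.ffn(x, training=training)
        return self.norm2(x + self.drop2(ffn_out, training=training))

block = TransformerEncoderBlock()
tokens = tf.random.normal((2, 10, 64))  # (batch, sequence length, d_model)
encoded = block(tokens, training=False)
print(tuple(encoded.shape))  # (2, 10, 64)
```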

### 📈 Performance & Scalability
- **Training Efficiency**: Custom loops, gradient accumulation, distributed training
- **Model Optimization**: TF Lite, quantization, model compression
- **Serving**: TF Serving, REST APIs, batch inference
- **Monitoring**: Model performance, drift detection, A/B testing
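
For distributed training, here is a minimal data-parallel sketch with `tf.distribute.MirroredStrategy`. Variables created inside `strategy.scope()` are mirrored on every replica, and each global batch is split across replicas. On a machine without GPUs the strategy falls back to a single CPU replica, so the same code runs anywhere. The model and the random data are placeholders. `MultiWorkerMirroredStrategy` follows the same pattern across machines but also needs a `TF_CONFIG` cluster specification, which is not shown.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

strategy = tf.distribute.MirroredStrategy()  # all visible GPUs, or the CPU if there are none
replicas = strategy.num_replicas_in_sync

PER_REPLICA_BATCH = 16
global_batch = PER_REPLICA_BATCH * replicas  # scale the global batch with the replica count

with strategy.scope():
    # Model and optimizer variables must be created inside the scope.
    model = keras.Sequential([
        keras.Input(shape=(8,)),
        layers.Dense(16, activation="relu"),
        layers.Dense(1),
    ])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3), loss="mse")

x = np.random.rand(128, 8).astype("float32")
y = x.sum(axis=1, keepdims=True)  # placeholder regression target
history = model.fit(x, y, batch_size=global_batch, epochs=2, verbose=0)
print(f"replicas={replicas}, final loss={history.history['loss'][-1]:.4f}")
```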

**Ready for algorithm and system design challenges! 🧩**