# Stochastic Gradient Descent (SGD) and Its Variants

---

## What is Gradient Descent?

**Gradient Descent** is an optimization algorithm used to minimize a cost function by iteratively moving in the direction of steepest descent (negative gradient).

### Key Concepts:
- **Cost Function (J)**: Measures how wrong our predictions are
- **Gradient (∇J)**: Direction of steepest increase in cost
- **Learning Rate (α)**: Step size in each iteration
- **Convergence**: When we reach the minimum (or close enough)

### Types of Gradient Descent:

1. **Batch Gradient Descent**: Uses entire dataset for each update
2. **Stochastic Gradient Descent (SGD)**: Uses one sample at a time
3. **Mini-batch Gradient Descent**: Uses small batches of samples

---

## Mathematical Foundation

### Linear Regression Cost Function (Mean Squared Error):
```
J(w, b) = (1/2m) × Σ(h(x_i) - y_i)²

where:
h(x_i) = w₁x₁ + w₂x₂ + ... + wₙxₙ + b  (prediction)
m = number of training examples
```

### Gradient Computation:
```
∂J/∂w_j = (1/m) × Σ(h(x_i) - y_i) × x_ij
∂J/∂b = (1/m) × Σ(h(x_i) - y_i)
```

### Parameter Updates:
```
w_j := w_j - α × ∂J/∂w_j
b := b - α × ∂J/∂b
```

### SGD vs Batch GD:
- **Batch GD**: Updates using all m examples
- **SGD**: Updates using 1 example at a time
- **Mini-batch**: Updates using k examples (1 < k < m)

---

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_regression, load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns

# Set random seed for reproducibility
np.random.seed(42)
plt.style.use('default')

## Complete GradientDescentRegressor Implementation

In [None]:
class GradientDescentRegressor:
    def __init__(self, learning_rate=0.01, max_iterations=1000, tolerance=1e-6):
        """
        Initialize the Gradient Descent Regressor
        
        Parameters:
        learning_rate: Step size for parameter updates
        max_iterations: Maximum number of iterations
        tolerance: Stopping criterion based on cost change
        """
        self.learning_rate = learning_rate
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self.weights = None
        self.bias = None
        self.cost_history = []
        self.weight_history = []
    
    def _add_intercept(self, X):
        """Add bias column to feature matrix"""
        return np.concatenate((intercept, X), axis=1)
    
    def _compute_cost(self, X, y, weights):
        """
        Compute Mean Squared Error cost
        
        Parameters:
        X: Feature matrix with intercept (n_samples, n_features + 1)
        y: Target values (n_samples,)
        weights: Current weights including bias (n_features + 1,)
        
        Returns:
        cost: Mean squared error
        """
        m = X.shape[0]  # number of samples
        predictions = X @ weights
        # cost = ?
        return cost
    
    def _compute_gradients(self, X, y, weights):
        """
        Compute gradients for all parameters
        
        Parameters:
        X: Feature matrix with intercept (n_samples, n_features + 1)
        y: Target values (n_samples,)
        weights: Current weights including bias (n_features + 1,)
        
        Returns:
        gradients: Gradient vector (n_features + 1,)
        """
        m = X.shape[0]  # number of samples
        predictions = X @ weights
        # errors = 
        # gradients for linear regression
        return gradients
    
    def fit_batch_gd(self, X, y):
        """
        Fit using Batch Gradient Descent
        """
        # Add intercept term
        X_with_intercept = self._add_intercept(X)
        n_features = X_with_intercept.shape[1]
        
        # Initialize weights
        weights = np.random.randn(n_features) * 0.01
        
        self.cost_history = []
        self.weight_history = []
        
        for iteration in range(self.max_iterations):
            # Compute cost and gradients
            # cost = 
            # gradients = 
            
            # weights = 
            
            # Store history
            self.cost_history.append(cost)
            self.weight_history.append(weights.copy())
            
            # Check for convergence
            if len(self.cost_history) > 1:
                if abs(self.cost_history[-2] - self.cost_history[-1]) < self.tolerance:
                    print(f"Batch GD converged after {iteration + 1} iterations")
                    break
        
        # Store final parameters
        self.bias = weights[0]
        self.weights = weights[1:]
        
        return self
    
    def fit_sgd(self, X, y):
        """
        Fit using Stochastic Gradient Descent
        """
        # Add intercept term
        X_with_intercept =
        n_samples, n_features = X_with_intercept.shape
        
        # Initialize weights
        weights = np.random.randn(n_features) * 0.01
        
        self.cost_history = []
        self.weight_history = []
        
        for iteration in range(self.max_iterations):
            # Shuffle the data
            indices = np.random.permutation(n_samples)
            
            for i in indices:
                # calculate the updated weights
                xi = X_with_intercept[i:i+1]  # Single sample (keep 2D)
                yi = y[i:i+1]
                # weights = weights - self.learning_rate * gradient.flatten()
            
            # Compute cost after each epoch
            # Update the cost and weight history
            
            # Check for convergence (less strict for SGD)
            if len(self.cost_history) > 10:
                recent_costs = self.cost_history[-10:]
                if np.std(recent_costs) < self.tolerance:
                    print(f"SGD converged after {iteration + 1} iterations")
                    break
        
        # Store final parameters
        self.bias = weights[0]
        self.weights = weights[1:]
        
        return self
    
    def fit_mini_batch_gd(self, X, y, batch_size=32):
        """
        Fit using Mini-batch Gradient Descent
        """
        # Add intercept term
        X_with_intercept = self._add_intercept(X)
        n_samples, n_features = X_with_intercept.shape
        
        # Initialize weights
        weights = np.random.randn(n_features) * 0.01
        
        self.cost_history = []
        self.weight_history = []
        
        for iteration in range(self.max_iterations):
            # Shuffle the data
            indices = np.random.permutation(n_samples)
            
            # Process mini-batches
            for start_idx in range(0, n_samples, batch_size):
                end_idx = min(start_idx + batch_size, n_samples)
                batch_indices = indices[start_idx:end_idx]

                X_batch = X_with_intercept[batch_indices]
                y_batch = y[batch_indices]
                
                weights = weights - self.learning_rate * gradients
            
            # Compute cost after each epoch and store cost and weight history
            cost = self._compute_cost(X_with_intercept, y, weights)
            
            # Check for convergence
            if len(self.cost_history) > 5:
                recent_costs = self.cost_history[-5:]
                if np.std(recent_costs) < self.tolerance:
                    print(f"Mini-batch GD converged after {iteration + 1} iterations")
                    break
        
        # Store final parameters
        self.bias = weights[0]
        self.weights = weights[1:]
        
        return self
    
    def fit_adagrad(self, X, y, batch_size=32, epsilon=1e-8):
        """
        AdaGrad - adapts learning rate based on historical gradients
        """
        X_with_intercept = self._add_intercept(X)
        n_samples, n_features = X_with_intercept.shape
        
        # Initialize weights and gradient accumulator
        weights = np.random.randn(n_features) * 0.01
        gradient_squared_sum = np.zeros_like(weights)  # Accumulates squared gradients
        
        self.cost_history = []
        
        for iteration in range(self.max_iterations):
            indices = np.random.permutation(n_samples)
            
            for start_idx in range(0, n_samples, batch_size):
                end_idx = min(start_idx + batch_size, n_samples)
                batch_indices = indices[start_idx:end_idx]
                
                X_batch = X_with_intercept[batch_indices]
                y_batch = y[batch_indices]

                # Implement Adaptive learning rate and gradients
                weights = weights - adaptive_lr * gradients
            
            cost = self._compute_cost(X_with_intercept, y, weights)
            self.cost_history.append(cost)
            
            if len(self.cost_history) > 5 and np.std(self.cost_history[-5:]) < 1e-6:
                break
        
        self.bias = weights[0]
        self.weights = weights[1:]
        return self
    
    def predict(self, X):
        """
        Make predictions on new data
        """
        return X @ self.weights + self.bias
    
    def score(self, X, y):
        """
        Calculate R² score
        """
        predictions = self.predict(X)
        ss_res = np.sum((y - predictions) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        return 1 - (ss_res / ss_tot)

## Advanced Optimizers Class

No need to solve, just read through

In [None]:
class AdvancedOptimizers(GradientDescentRegressor):
    """
    Advanced optimization algorithms that inherit from GradientDescentRegressor
    """
    
    def fit_momentum(self, X, y, batch_size=32, momentum=0.9):
        """
        Momentum SGD - accelerates SGD by adding a fraction of previous update
        """
        X_with_intercept = self._add_intercept(X)
        n_samples, n_features = X_with_intercept.shape
        
        # Initialize weights and velocity
        weights = np.random.randn(n_features) * 0.01
        velocity = np.zeros_like(weights)
        
        self.cost_history = []
        
        for iteration in range(self.max_iterations):
            indices = np.random.permutation(n_samples)
            
            for start_idx in range(0, n_samples, batch_size):
                end_idx = min(start_idx + batch_size, n_samples)
                batch_indices = indices[start_idx:end_idx]
                
                X_batch = X_with_intercept[batch_indices]
                y_batch = y[batch_indices]
                
                gradients = self._compute_gradients(X_batch, y_batch, weights)
                velocity = momentum * velocity - self.learning_rate * gradients
                weights = weights + velocity
            
            cost = self._compute_cost(X_with_intercept, y, weights)
            self.cost_history.append(cost)
            
            if len(self.cost_history) > 5 and np.std(self.cost_history[-5:]) < 1e-6:
                break
        
        self.bias = weights[0]
        self.weights = weights[1:]
        return self
    
    def fit_adam(self, X, y, batch_size=32, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """
        Adam - combines momentum with adaptive learning rates
        """
        X_with_intercept = self._add_intercept(X)
        n_samples, n_features = X_with_intercept.shape
        
        # Initialize weights and moments
        weights = np.random.randn(n_features) * 0.01
        m = np.zeros_like(weights)  # First moment (momentum)
        v = np.zeros_like(weights)  # Second moment (RMSprop)
        
        self.cost_history = []
        t = 0  # Time step
        
        for iteration in range(self.max_iterations):
            indices = np.random.permutation(n_samples)
            
            for start_idx in range(0, n_samples, batch_size):
                end_idx = min(start_idx + batch_size, n_samples)
                batch_indices = indices[start_idx:end_idx]
                
                X_batch = X_with_intercept[batch_indices]
                y_batch = y[batch_indices]
                
                t += 1
                gradients = self._compute_gradients(X_batch, y_batch, weights)
                
                # Update biased first moment estimate
                m = beta1 * m + (1 - beta1) * gradients
                # Update biased second raw moment estimate
                v = beta2 * v + (1 - beta2) * (gradients ** 2)
                
                # Compute bias-corrected first moment estimate
                m_hat = m / (1 - beta1 ** t)
                # Compute bias-corrected second raw moment estimate
                v_hat = v / (1 - beta2 ** t)
                
                # Update weights
                weights = weights - self.learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)
            
            cost = self._compute_cost(X_with_intercept, y, weights)
            self.cost_history.append(cost)
            
            if len(self.cost_history) > 5 and np.std(self.cost_history[-5:]) < 1e-6:
                break
        
        self.bias = weights[0]
        self.weights = weights[1:]
        return self

## Test the Complete Implementation

In [None]:
# Generate synthetic regression data
X, y = make_regression(n_samples=200, n_features=2, noise=10, random_state=42)

# Standardize features (important for gradient descent)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Target range: [{y.min():.2f}, {y.max():.2f}]")

In [None]:
# Test all implemented methods
methods = {
    'Batch GD': ('basic', 'fit_batch_gd'),
    'Stochastic GD': ('basic', 'fit_sgd'), 
    'Mini-batch GD': ('basic', 'fit_mini_batch_gd'),
    'AdaGrad': ('basic', 'fit_adagrad'),
    'Momentum': ('advanced', 'fit_momentum'),
    'Adam': ('advanced', 'fit_adam')
}

results = {}
learning_rate = 0.01
max_iter = 100

for name, (optimizer_type, method) in methods.items():
    print(f"\nTraining with {name}...")
    
    if optimizer_type == 'basic':
        model = GradientDescentRegressor(learning_rate=learning_rate, max_iterations=max_iter)
    else:
        model = AdvancedOptimizers(learning_rate=learning_rate, max_iterations=max_iter)
    
    if method == 'fit_mini_batch_gd':
        getattr(model, method)(X_train, y_train, batch_size=16)
    elif method in ['fit_adagrad', 'fit_momentum', 'fit_adam']:
        getattr(model, method)(X_train, y_train, batch_size=16)
    else:
        getattr(model, method)(X_train, y_train)
    
    # Evaluate
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    
    results[name] = {
        'model': model,
        'train_score': train_score,
        'test_score': test_score,
        'final_cost': model.cost_history[-1] if model.cost_history else float('inf')
    }
    
    print(f"Train R²: {train_score:.4f}")
    print(f"Test R²: {test_score:.4f}")
    print(f"Final cost: {results[name]['final_cost']:.4f}")
    print(f"Iterations: {len(model.cost_history)}")

In [None]:
# Visualize convergence comparison
plt.figure(figsize=(18, 12))

# Plot 1: Cost history comparison
plt.subplot(2, 3, 1)
colors = ['blue', 'red', 'green', 'purple', 'orange', 'brown']
for i, (name, result) in enumerate(results.items()):
    if result['model'].cost_history:
        plt.plot(result['model'].cost_history, color=colors[i], label=name, linewidth=2)

plt.xlabel('Iteration')
plt.ylabel('Cost (MSE)')
plt.title('Cost Function Convergence - All Methods')
plt.legend()
plt.grid(True, alpha=0.3)
plt.yscale('log')

# Plot 2: Performance comparison
plt.subplot(2, 3, 2)
methods_list = list(results.keys())
train_scores = [results[m]['train_score'] for m in methods_list]
test_scores = [results[m]['test_score'] for m in methods_list]

x = np.arange(len(methods_list))
width = 0.35

plt.bar(x - width/2, train_scores, width, label='Train R²', alpha=0.8)
plt.bar(x + width/2, test_scores, width, label='Test R²', alpha=0.8)

plt.xlabel('Method')
plt.ylabel('R² Score')
plt.title('Performance Comparison')
plt.xticks(x, methods_list, rotation=45, ha='right')
plt.legend()
plt.grid(True, alpha=0.3)

# Plot 3: Final cost comparison
plt.subplot(2, 3, 3)
final_costs = [results[m]['final_cost'] for m in methods_list]
bars = plt.bar(methods_list, final_costs, color=colors[:len(methods_list)], alpha=0.7)
plt.xlabel('Method')
plt.ylabel('Final Cost')
plt.title('Final Cost Comparison')
plt.xticks(rotation=45, ha='right')
plt.yscale('log')
plt.grid(True, alpha=0.3)

# Plot 4: Convergence speed
plt.subplot(2, 3, 4)
iterations = [len(results[m]['model'].cost_history) for m in methods_list]
bars = plt.bar(methods_list, iterations, color=colors[:len(methods_list)], alpha=0.7)
plt.xlabel('Method')
plt.ylabel('Iterations to Convergence')
plt.title('Convergence Speed')
plt.xticks(rotation=45, ha='right')
plt.grid(True, alpha=0.3)

# Plot 5: Test performance ranking
plt.subplot(2, 3, 5)
sorted_methods = sorted(results.items(), key=lambda x: x[1]['test_score'], reverse=True)
sorted_names = [name for name, _ in sorted_methods]
sorted_scores = [result['test_score'] for _, result in sorted_methods]

bars = plt.bar(range(len(sorted_names)), sorted_scores, 
              color=[colors[methods_list.index(name)] for name in sorted_names], alpha=0.7)
plt.xlabel('Method (Ranked)')
plt.ylabel('Test R² Score')
plt.title('Performance Ranking')
plt.xticks(range(len(sorted_names)), sorted_names, rotation=45, ha='right')
plt.grid(True, alpha=0.3)

# Add value labels on bars
for bar, score in zip(bars, sorted_scores):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, 
             f'{score:.3f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

print("\n" + "="*60)
print("SOLUTION SUMMARY")
print("="*60)
print("All TODO items have been successfully implemented:")
print("✅ TODO 1: MSE cost function")
print("✅ TODO 2: Gradient computation")
print("✅ TODO 3: Batch GD weight updates")
print("✅ TODO 4: SGD single sample updates")
print("✅ TODO 5: Mini-batch GD updates")
print("✅ TODO 7: AdaGrad optimizer")
print("✅ BONUS: Added complete AdvancedOptimizers class with Momentum and Adam")
print("\nBest performing method:", sorted_names[0], f"(R² = {sorted_scores[0]:.4f})")

---

## Real Dataset: Diabetes Dataset

Let's test our optimizers on a real regression dataset.

In [None]:
# Load diabetes dataset
diabetes = load_diabetes()
X_diabetes, y_diabetes = diabetes.data, diabetes.target

# Standardize features
scaler_diabetes = StandardScaler()
X_diabetes_scaled = scaler_diabetes.fit_transform(X_diabetes)

# Split the data
X_train_diab, X_test_diab, y_train_diab, y_test_diab = train_test_split(
    X_diabetes_scaled, y_diabetes, test_size=0.2, random_state=42
)

print(f"Diabetes dataset shape: {X_diabetes.shape}")
print(f"Features: {len(diabetes.feature_names)}")
print(f"Target range: [{y_diabetes.min():.2f}, {y_diabetes.max():.2f}]")
print(f"Feature names: {diabetes.feature_names}")

In [None]:
# Test Adam optimizer on diabetes dataset
print("Testing optimizers on Diabetes dataset...\n")

diabetes_results = {}

# Complete the Code
for name, (optimizer_type, method) in key_optimizers.items():
    if optimizer_type == 'basic':
        model = GradientDescentRegressor(learning_rate=0.01, max_iterations=200)
        getattr(model, method)(X_train_diab, y_train_diab)
    else:
        model = AdvancedOptimizers(learning_rate=0.01, max_iterations=200)
        getattr(model, method)(X_train_diab, y_train_diab)
    
    train_score = model.score(X_train_diab, y_train_diab)
    test_score = model.score(X_test_diab, y_test_diab)
    
    diabetes_results[name] = {
        'model': model,
        'train_score': train_score,
        'test_score': test_score
    }
    
    print(f"{name}:")
    print(f"  Train R²: {train_score:.4f}")
    print(f"  Test R²: {test_score:.4f}")
    print(f"  Iterations: {len(model.cost_history)}")
    print()

In [None]:
# Visualize results on diabetes dataset
plt.figure(figsize=(15, 5))

# Plot 1: Convergence curves
plt.subplot(1, 3, 1)
colors = ['blue', 'red', 'green']
for i, (name, result) in enumerate(diabetes_results.items()):
    plt.plot(result['model'].cost_history, color=colors[i], label=name, linewidth=2)

plt.xlabel('Iteration')
plt.ylabel('Cost (MSE)')
plt.title('Diabetes Dataset - Convergence')
plt.legend()
plt.grid(True, alpha=0.3)
plt.yscale('log')

# Plot 2: Performance comparison
plt.subplot(1, 3, 2)
methods = list(diabetes_results.keys())
test_scores = [diabetes_results[m]['test_score'] for m in methods]

bars = plt.bar(methods, test_scores, color=['blue', 'red', 'green'], alpha=0.7)
plt.xlabel('Method')
plt.ylabel('Test R² Score')
plt.title('Test Performance on Diabetes')
plt.grid(True, alpha=0.3)

for bar, score in zip(bars, test_scores):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, 
             f'{score:.3f}', ha='center', va='bottom')

# Plot 3: Predictions vs Actual (for best model)
plt.subplot(1, 3, 3)
best_model_name = max(diabetes_results.keys(), 
                     key=lambda x: diabetes_results[x]['test_score'])
best_model = diabetes_results[best_model_name]['model']

y_pred = best_model.predict(X_test_diab)
plt.scatter(y_test_diab, y_pred, alpha=0.6, color='blue')
plt.plot([y_test_diab.min(), y_test_diab.max()], 
         [y_test_diab.min(), y_test_diab.max()], 'r--', linewidth=2)
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.title(f'Predictions vs Actual ({best_model_name})')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Best performing model: {best_model_name} (R² = {diabetes_results[best_model_name]['test_score']:.4f})")

---

## Learning Rate Experiments

Explore how different learning rates affect convergence.

In [None]:
# Experiment with different learning rates
learning_rates = [0.001, 0.01, 0.1, 0.5, 1.0]
lr_results = {}

plt.figure(figsize=(15, 10))

for i, lr in enumerate(learning_rates, 1):
    try:
        # Use SGD for this experiment
        model = GradientDescentRegressor(learning_rate=lr, max_iterations=100)
        model.fit_sgd(X_train, y_train)
        
        test_score = model.score(X_test, y_test)
        lr_results[lr] = {
            'model': model,
            'test_score': test_score,
            'converged': len(model.cost_history) < 100
        }
        
        # Plot learning curve
        plt.subplot(2, 3, i)
        plt.plot(model.cost_history, linewidth=2, color='blue')
        plt.xlabel('Iteration')
        plt.ylabel('Cost')
        plt.title(f'LR = {lr}\nR² = {test_score:.3f}')
        plt.grid(True, alpha=0.3)
        plt.yscale('log')
        
        if not lr_results[lr]['converged']:
            plt.title(f'LR = {lr} (No Convergence)\nR² = {test_score:.3f}', color='red')
            
    except Exception as e:
        print(f"Learning rate {lr} failed: {str(e)}")
        lr_results[lr] = {'test_score': 0, 'converged': False}

# Summary plot
plt.subplot(2, 3, 6)
lrs = list(lr_results.keys())
scores = [lr_results[lr]['test_score'] for lr in lrs]
colors = ['red' if not lr_results[lr]['converged'] else 'blue' for lr in lrs]

bars = plt.bar(range(len(lrs)), scores, color=colors, alpha=0.7)
plt.xlabel('Learning Rate')
plt.ylabel('Test R² Score')
plt.title('Learning Rate vs Performance')
plt.xticks(range(len(lrs)), [str(lr) for lr in lrs])
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print summary
print("\nLearning Rate Analysis:")
print("-" * 50)
for lr, result in lr_results.items():
    status = "✓ Converged" if result['converged'] else "✗ No convergence"
    print(f"LR = {lr:5.3f}: R² = {result['test_score']:6.3f} ({status})")

In [None]:
---

## Discussion Questions

After completing the exercises, discuss the following:

### 1. **Gradient Descent Variants**
- What are the trade-offs between Batch, Stochastic, and Mini-batch GD?
- When would you choose each variant?
- How does batch size affect convergence and computational efficiency?

### 2. **Learning Rate Selection**
- What happens with learning rates that are too high or too low?
- How can you detect if your learning rate is inappropriate?
- What strategies exist for adaptive learning rate scheduling?

### 3. **Feature Scaling**
- Why is feature standardization crucial for gradient descent?
- What happens when features have very different scales?
- How does this relate to the condition number of the optimization problem?

### 4. **Practical Considerations**
- How do you choose hyperparameters in practice?
- What are the computational trade-offs of different optimizers?
- How does this connect to deep learning and neural networks?

---


## Key Takeaways

- **SGD** trades accuracy for speed and can handle large datasets
- **Adaptive methods** like Adam adjust learning rates automatically
- **Feature scaling** is critical for gradient descent performance
- **Learning rate** is the most important hyperparameter to tune
- **Mini-batch** often provides the best balance of speed and stability