In [1]:
import math
from collections import Counter
import random

In [3]:
import math
import random
from collections import Counter

def load_and_split_data(file_path, train_ratio=0.7, val_ratio=0.15):
    """
    Loads text data from a file and splits it into training, validation, and test sets.
    Args:
        file_path (str): Path to the data file
        train_ratio (float): Proportion of data for training
        val_ratio (float): Proportion of data for validation
    Returns:
        tuple: Contains train, validation, and test data and labels
    """
    print(f"Reading data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    print(f"Total number of samples loaded: {len(lines)}")
    
    # Count label distribution
    label_counts = Counter(line.split('\t')[0] for line in lines)
    print("\nLabel Distribution:")
    for label, count in label_counts.items():
        print(f"{label}: {count} ({count/len(lines)*100:.2f}%)")
    
    # Set random seed for reproducibility
    random.seed(42)
    random.shuffle(lines)
    
    # Calculate split points
    train_end = int(len(lines) * train_ratio)
    val_end = int(len(lines) * (train_ratio + val_ratio))
    
    # Split data
    train_data = lines[:train_end]
    val_data = lines[train_end:val_end]
    test_data = lines[val_end:]
    
    def split_data(data):
        sentences = []
        labels = []
        for line in data:
            if '\t' in line:
                label, text = line.strip().split('\t', 1)
                # Convert 'spam' to 1 and 'ham' to 0
                label_int = 1 if label.lower() == 'spam' else 0
                labels.append(label_int)
                sentences.append(text)
        return sentences, labels
    
    return split_data(train_data) + split_data(val_data) + split_data(test_data)

def preprocess_text(text):
    """
    Cleans text by removing special characters and converting to lowercase.
    Args:
        text (str): Input text to clean
    Returns:
        str: Cleaned text
    """
    # Convert to lowercase
    text = text.lower()
    
    # Remove URLs
    text = ' '.join(word for word in text.split() 
                   if not word.startswith(('http:', 'https:', 'www.')))
    
    # Keep only letters and spaces
    cleaned_text = ""
    for char in text:
        if char.isalpha() or char.isspace():
            cleaned_text += char
    
    return cleaned_text

def tokenize(text):
    """
    Converts text into tokens, removes stopwords, and applies stemming.
    Args:
        text (str): Input text to tokenize
    Returns:
        list: List of processed tokens
    """
    stopwords = {
        "a", "an", "and", "are", "as", "at", "be", "by", "for", "from", 
        "has", "he", "in", "is", "it", "its", "of", "on", "that", "the", 
        "to", "was", "were", "will", "with", "the", "this", "but", "they",
        "have", "had", "what", "when", "where", "who", "which", "why", "how"
    }
    
    def simple_stem(word):
        """Simple word stemming rules"""
        if len(word) < 4:
            return word
        if word.endswith('ing'):
            return word[:-3]
        elif word.endswith('ed'):
            return word[:-2]
        elif word.endswith('s'):
            return word[:-1]
        return word
    
    words = text.split()
    return [simple_stem(word) for word in words 
            if word not in stopwords and len(word) > 1]

class TfidfVectorizer:
    """
    Converts text documents to TF-IDF feature vectors.
    """
    def __init__(self):
        self.vocabulary = set()
        self.idf = {}
        self.vocab_index = {}
    
    def fit(self, documents):
        """
        Builds vocabulary and computes IDF scores from documents
        Args:
            documents (list): List of tokenized documents
        """
        # Build vocabulary
        for doc in documents:
            self.vocabulary.update(doc)
        
        self.vocab_index = {word: idx for idx, word in enumerate(sorted(self.vocabulary))}
        
        # Compute document frequencies
        doc_freq = Counter()
        for doc in documents:
            doc_words = set(doc)
            for word in doc_words:
                doc_freq[word] += 1
        
        # Calculate IDF scores
        num_docs = len(documents)
        self.idf = {word: math.log((num_docs + 1)/(doc_freq[word] + 1)) + 1 
                   for word in self.vocabulary}
    
    def transform(self, documents):
        """
        Transforms documents into TF-IDF feature vectors
        Args:
            documents (list): List of tokenized documents
        Returns:
            list: List of TF-IDF feature vectors
        """
        X = []
        for doc in documents:
            tf = Counter(doc)
            doc_len = len(doc) if len(doc) > 0 else 1
            
            features = [0.0] * len(self.vocabulary)
            for word in set(doc):
                if word in self.vocab_index:
                    idx = self.vocab_index[word]
                    tf_val = tf[word] / doc_len
                    features[idx] = tf_val * self.idf.get(word, 0)
            
            X.append(features)
        return X

class LogisticRegression:
    def __init__(self, learning_rate=0.01, lambda_reg=0.1, num_epochs=100, 
                 early_stop_threshold=1e-4, batch_size=32):
        self.learning_rate = learning_rate
        self.lambda_reg = lambda_reg
        self.num_epochs = num_epochs
        self.early_stop_threshold = early_stop_threshold
        self.batch_size = batch_size
        self.weights = None
        self.bias = 0
        self.training_history = []
    
    def sigmoid(self, z):
        """Compute sigmoid function"""
        # Clip z to prevent overflow
        z = min(max(z, -100), 100)
        return 1 / (1 + math.exp(-z))
    
    def compute_loss(self, X, y, indices):
        """Compute binary cross-entropy loss with L2 regularization"""
        loss = 0
        for i in indices:
            z = sum(X[i][j] * self.weights[j] for j in range(len(self.weights))) + self.bias
            pred = self.sigmoid(z)
            # Add small epsilon to avoid log(0)
            loss -= (y[i] * math.log(pred + 1e-15) + (1 - y[i]) * math.log(1 - pred + 1e-15))
        
        # Add L2 regularization term
        l2_term = self.lambda_reg * sum(w * w for w in self.weights)
        return (loss / len(indices)) + l2_term
    
    def compute_class_weights(self, y):
        """
        Compute class weights to handle class imbalance
        Args:
            y (list): Labels
        Returns:
            dict: Class weights
        """
        counts = Counter(y)
        total = len(y)
        weights = {
            0: total / (2 * counts[0]) if counts[0] > 0 else 1,
            1: total / (2 * counts[1]) if counts[1] > 0 else 1
        }
        print(f"Class weights: {weights}")
        return weights
    
    def train(self, X, y, method='sgd'):
        """
        Train the model using either SGD or mini-batch gradient descent
        Args:
            X (list): Feature vectors
            y (list): Labels
            method (str): 'sgd' or 'minibatch'
        """
        if not self.weights:
            # Initialize weights with small random values
            self.weights = [random.uniform(-0.1, 0.1) for _ in range(len(X[0]))]
        
        # Compute class weights for balanced training
        class_weights = self.compute_class_weights(y)
        
        print(f"\nTraining with {method.upper()}:")
        print("Epoch\tLoss\t\tΔLoss")
        
        n_samples = len(y)
        prev_loss = float('inf')
        no_improvement_count = 0
        
        for epoch in range(self.num_epochs):
            indices = list(range(n_samples))
            random.shuffle(indices)
            
            if method == 'sgd':
                batch_indices = [[i] for i in indices]
            else:
                batch_indices = [indices[i:i + self.batch_size] 
                               for i in range(0, len(indices), self.batch_size)]
            
            epoch_loss = 0
            for batch in batch_indices:
                weight_gradients = [0] * len(self.weights)
                bias_gradient = 0
                
                for i in batch:
                    # Forward pass
                    z = sum(X[i][j] * self.weights[j] for j in range(len(self.weights))) + self.bias
                    pred = self.sigmoid(z)
                    
                    # Apply class weights to error
                    sample_weight = class_weights[y[i]]
                    error = sample_weight * (pred - y[i])
                    
                    # Accumulate gradients
                    for j in range(len(self.weights)):
                        weight_gradients[j] += error * X[i][j]
                    bias_gradient += error
                
                # Apply updates with regularization
                batch_size = len(batch)
                for j in range(len(self.weights)):
                    reg_gradient = 2 * self.lambda_reg * self.weights[j]
                    self.weights[j] -= self.learning_rate * (weight_gradients[j]/batch_size + reg_gradient)
                self.bias -= self.learning_rate * (bias_gradient/batch_size)
                
                epoch_loss += self.compute_loss(X, y, batch)
            
            avg_loss = epoch_loss / len(batch_indices)
            loss_change = prev_loss - avg_loss
            self.training_history.append(avg_loss)
            
            if epoch % 5 == 0:
                print(f"{epoch}\t{avg_loss:.6f}\t{loss_change:.6f}")
            
            # Early stopping with patience
            if abs(loss_change) < self.early_stop_threshold:
                no_improvement_count += 1
                if no_improvement_count >= 3:  # Wait for 3 epochs of no improvement
                    print(f"\nEarly stopping at epoch {epoch}")
                    break
            else:
                no_improvement_count = 0
            
            prev_loss = avg_loss
    
    def predict(self, X):
        """Make predictions on new data"""
        predictions = []
        for x in X:
            z = sum(x[j] * self.weights[j] for j in range(len(self.weights))) + self.bias
            prob = self.sigmoid(z)
            predictions.append(1 if prob >= 0.5 else 0)
        return predictions

def evaluate(y_true, y_pred):
    """
    Compute evaluation metrics
    Returns:
        tuple: (accuracy, precision, recall, f1)
    """
    tp = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 1 and yp == 1)
    tn = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 0 and yp == 0)
    fp = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 0 and yp == 1)
    fn = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 1 and yp == 0)
    
    accuracy = (tp + tn) / len(y_true)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return accuracy, precision, recall, f1

def perform_cross_validation(X, y, lambda_values, k=5):
    """
    Perform k-fold cross-validation to find the best regularization parameter
    Args:
        X (list): Feature vectors
        y (list): Labels
        lambda_values (list): List of lambda values to try
        k (int): Number of folds
    Returns:
        float: Best lambda value
    """
    print("\nPerforming cross-validation...")
    n_samples = len(y)
    fold_size = n_samples // k
    best_lambda = None
    best_score = -float('inf')
    
    for lambda_val in lambda_values:
        print(f"\nTrying lambda = {lambda_val}")
        scores = []
        
        for fold in range(k):
            # Split data into training and validation
            val_start = fold * fold_size
            val_end = val_start + fold_size
            
            X_val = X[val_start:val_end]
            y_val = y[val_start:val_end]
            X_train = X[:val_start] + X[val_end:]
            y_train = y[:val_start] + y[val_end:]
            
            # Train model
            model = LogisticRegression(lambda_reg=lambda_val, num_epochs=50)
            model.train(X_train, y_train, method='minibatch')
            
            # Evaluate
            y_pred = model.predict(X_val)
            accuracy, _, _, f1 = evaluate(y_val, y_pred)
            scores.append(f1)
        
        avg_score = sum(scores) / len(scores)
        print(f"Average F1-score: {avg_score:.4f}")
        
        if avg_score > best_score:
            best_score = avg_score
            best_lambda = lambda_val
    
    print(f"\nBest lambda value: {best_lambda} (F1-score: {best_score:.4f})")
    return best_lambda

def main():
    # Example usage with a sample input file
    # Assuming input file format: label\ttext
    file_path = "SMSSpamCollection"  # Replace with your data file path
    
    # Step 1: Load and split data
    train_texts, train_labels, val_texts, val_labels, test_texts, test_labels = load_and_split_data(file_path)
    
    # Step 2: Preprocess and tokenize texts
    print("\nPreprocessing and tokenizing texts...")
    train_tokens = [tokenize(preprocess_text(text)) for text in train_texts]
    val_tokens = [tokenize(preprocess_text(text)) for text in val_texts]
    test_tokens = [tokenize(preprocess_text(text)) for text in test_texts]
    
    # Step 3: Create TF-IDF features
    vectorizer = TfidfVectorizer()
    vectorizer.fit(train_tokens)
    
    X_train = vectorizer.transform(train_tokens)
    X_val = vectorizer.transform(val_tokens)
    X_test = vectorizer.transform(test_tokens)
    
    # Step 4: Train and evaluate with both SGD and mini-batch
    for method in ['sgd', 'minibatch']:
        print(f"\nStep 4: Training with {method.upper()}...")
        model = LogisticRegression(
            learning_rate=0.1,  # Increased learning rate
            lambda_reg=0.01,    # Reduced regularization
            num_epochs=35,
            early_stop_threshold=1e-4,
            batch_size=32
        )
        model.train(X_train, train_labels, method=method)
        
        # Evaluate on both validation and test sets
        print(f"\nStep 5: Evaluating {method.upper()} model...")
        
        # Validation set evaluation
        val_pred = model.predict(X_val)
        val_accuracy, val_precision, val_recall, val_f1 = evaluate(val_labels, val_pred)
        print(f"\n{method.upper()} Validation Results:")
        print(f"Accuracy:  {val_accuracy:.4f}")
        print(f"Precision: {val_precision:.4f}")
        print(f"Recall:    {val_recall:.4f}")
        print(f"F1-score:  {val_f1:.4f}")
        
        # Test set evaluation
        test_pred = model.predict(X_test)
        test_accuracy, test_precision, test_recall, test_f1 = evaluate(test_labels, test_pred)
        print(f"\n{method.upper()} Test Results:")
        print(f"Accuracy:  {test_accuracy:.4f}")
        print(f"Precision: {test_precision:.4f}")
        print(f"Recall:    {test_recall:.4f}")
        print(f"F1-score:  {test_f1:.4f}")

if __name__ == "__main__":
    main()

Reading data from SMSSpamCollection...
Total number of samples loaded: 5574

Label Distribution:
ham: 4827 (86.60%)
spam: 747 (13.40%)

Preprocessing and tokenizing texts...

Step 4: Training with SGD...
Class weights: {0: 0.5758783584292885, 1: 3.794747081712062}

Training with SGD:
Epoch	Loss		ΔLoss
0	0.565894	inf
5	0.548030	0.004362
10	0.551706	0.003164
15	0.549920	0.003600
20	0.551242	0.002120
25	0.552730	-0.001959
30	0.554134	-0.004919

Step 5: Evaluating SGD model...

SGD Validation Results:
Accuracy:  0.9187
Precision: 0.6524
Recall:    0.9068
F1-score:  0.7589

SGD Test Results:
Accuracy:  0.9283
Precision: 0.6821
Recall:    0.8957
F1-score:  0.7744

Step 4: Training with MINIBATCH...
Class weights: {0: 0.5758783584292885, 1: 3.794747081712062}

Training with MINIBATCH:
Epoch	Loss		ΔLoss
0	0.826326	inf
5	0.604667	0.011946
10	0.589657	0.001560
15	0.588511	-0.005011
20	0.580150	0.010970
25	0.592079	-0.009504
30	0.588796	-0.002602

Step 5: Evaluating MINIBATCH model...

MINIBATCH 

In [2]:
# Import required libraries
import math
import random
from collections import Counter

def load_and_split_data(file_path, train_ratio=0.7, val_ratio=0.15):
    """
    Loads text data from a file and splits it into training, validation, and test sets.
    Args:
        file_path (str): Path to the data file
        train_ratio (float): Proportion of data for training
        val_ratio (float): Proportion of data for validation
    Returns:
        tuple: Contains train, validation, and test data and labels
    """
    print(f"Reading data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    print(f"Total number of samples loaded: {len(lines)}")
    
    # Count label distribution
    label_counts = Counter(line.split('\t')[0] for line in lines)
    print("\nLabel Distribution:")
    for label, count in label_counts.items():
        print(f"{label}: {count} ({count/len(lines)*100:.2f}%)")
    
    # Set random seed for reproducibility
    random.seed(42)
    random.shuffle(lines)
    
    # Calculate split points
    train_end = int(len(lines) * train_ratio)
    val_end = int(len(lines) * (train_ratio + val_ratio))
    
    # Split data
    train_data = lines[:train_end]
    val_data = lines[train_end:val_end]
    test_data = lines[val_end:]
    
    def split_data(data):
        sentences = []
        labels = []
        for line in data:
            if '\t' in line:
                label, text = line.strip().split('\t', 1)
                # Convert 'spam' to 1 and 'ham' to 0
                label_int = 1 if label.lower() == 'spam' else 0
                labels.append(label_int)
                sentences.append(text)
        return sentences, labels
    
    return split_data(train_data) + split_data(val_data) + split_data(test_data)

def load_books_data(file_path, train_ratio=0.7, val_ratio=0.15):
    """
    Loads book text data and splits it into training, validation, and test sets.
    """
    print(f"\nReading books data from {file_path}...")
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    print(f"Total number of book samples loaded: {len(lines)}")
    
    # Count author distribution
    label_counts = Counter(line.split('\t')[0] for line in lines)
    print("\nAuthor Distribution:")
    for author, count in label_counts.items():
        print(f"{author}: {count} ({count/len(lines)*100:.2f}%)")
    
    # Create author to index mapping
    authors = sorted(list(label_counts.keys()))
    author_to_idx = {author: idx for idx, author in enumerate(authors)}
    
    # Shuffle data
    random.seed(42)
    random.shuffle(lines)
    
    # Calculate split points
    train_end = int(len(lines) * train_ratio)
    val_end = int(len(lines) * (train_ratio + val_ratio))
    
    # Split data
    train_data = lines[:train_end]
    val_data = lines[train_end:val_end]
    test_data = lines[val_end:]
    
    def split_books_data(data):
        sentences = []
        labels = []
        for line in data:
            if '\t' in line:
                author, text = line.strip().split('\t', 1)
                labels.append(author_to_idx[author])
                sentences.append(text)
        return sentences, labels
    
    return (split_books_data(train_data) + split_books_data(val_data) + 
            split_books_data(test_data) + (authors,))

def preprocess_text(text):
    """
    Cleans text by removing special characters and converting to lowercase.
    Args:
        text (str): Input text to clean
    Returns:
        str: Cleaned text
    """
    # Convert to lowercase
    text = text.lower()
    
    # Remove URLs
    text = ' '.join(word for word in text.split() 
                   if not word.startswith(('http:', 'https:', 'www.')))
    
    # Keep only letters and spaces
    cleaned_text = ""
    for char in text:
        if char.isalpha() or char.isspace():
            cleaned_text += char
    
    return cleaned_text

def tokenize(text):
    """
    Converts text into tokens, removes stopwords, and applies stemming.
    Args:
        text (str): Input text to tokenize
    Returns:
        list: List of processed tokens
    """
    stopwords = {
        "a", "an", "and", "are", "as", "at", "be", "by", "for", "from", 
        "has", "he", "in", "is", "it", "its", "of", "on", "that", "the", 
        "to", "was", "were", "will", "with", "the", "this", "but", "they",
        "have", "had", "what", "when", "where", "who", "which", "why", "how"
    }
    
    def simple_stem(word):
        """Simple word stemming rules"""
        if len(word) < 4:
            return word
        if word.endswith('ing'):
            return word[:-3]
        elif word.endswith('ed'):
            return word[:-2]
        elif word.endswith('s'):
            return word[:-1]
        return word
    
    words = text.split()
    return [simple_stem(word) for word in words 
            if word not in stopwords and len(word) > 1]

class TfidfVectorizer:
    """
    Converts text documents to TF-IDF feature vectors.
    """
    def __init__(self):
        self.vocabulary = set()
        self.idf = {}
        self.vocab_index = {}
    
    def fit(self, documents):
        """
        Builds vocabulary and computes IDF scores from documents
        Args:
            documents (list): List of tokenized documents
        """
        # Build vocabulary
        for doc in documents:
            self.vocabulary.update(doc)
        
        self.vocab_index = {word: idx for idx, word in enumerate(sorted(self.vocabulary))}
        
        # Compute document frequencies
        doc_freq = Counter()
        for doc in documents:
            doc_words = set(doc)
            for word in doc_words:
                doc_freq[word] += 1
        
        # Calculate IDF scores
        num_docs = len(documents)
        self.idf = {word: math.log((num_docs + 1)/(doc_freq[word] + 1)) + 1 
                   for word in self.vocabulary}
    
    def transform(self, documents):
        """
        Transforms documents into TF-IDF feature vectors
        Args:
            documents (list): List of tokenized documents
        Returns:
            list: List of TF-IDF feature vectors
        """
        X = []
        for doc in documents:
            tf = Counter(doc)
            doc_len = len(doc) if len(doc) > 0 else 1
            
            features = [0.0] * len(self.vocabulary)
            for word in set(doc):
                if word in self.vocab_index:
                    idx = self.vocab_index[word]
                    tf_val = tf[word] / doc_len
                    features[idx] = tf_val * self.idf.get(word, 0)
            
            X.append(features)
        return X

class LogisticRegression:
    def __init__(self, learning_rate=0.01, lambda_reg=0.1, num_epochs=100, 
                 early_stop_threshold=1e-4, batch_size=32):
        self.learning_rate = learning_rate
        self.lambda_reg = lambda_reg
        self.num_epochs = num_epochs
        self.early_stop_threshold = early_stop_threshold
        self.batch_size = batch_size
        self.weights = None
        self.bias = 0
        self.training_history = []
    
    def sigmoid(self, z):
        """Compute sigmoid function"""
        # Clip z to prevent overflow
        z = min(max(z, -100), 100)
        return 1 / (1 + math.exp(-z))
    
    def compute_loss(self, X, y, indices):
        """Compute binary cross-entropy loss with L2 regularization"""
        loss = 0
        for i in indices:
            z = sum(X[i][j] * self.weights[j] for j in range(len(self.weights))) + self.bias
            pred = self.sigmoid(z)
            # Add small epsilon to avoid log(0)
            loss -= (y[i] * math.log(pred + 1e-15) + (1 - y[i]) * math.log(1 - pred + 1e-15))
        
        # Add L2 regularization term
        l2_term = self.lambda_reg * sum(w * w for w in self.weights)
        return (loss / len(indices)) + l2_term
    
    def compute_class_weights(self, y):
        """
        Compute class weights to handle class imbalance
        Args:
            y (list): Labels
        Returns:
            dict: Class weights
        """
        counts = Counter(y)
        total = len(y)
        weights = {
            0: total / (2 * counts[0]) if counts[0] > 0 else 1,
            1: total / (2 * counts[1]) if counts[1] > 0 else 1
        }
        print(f"Class weights: {weights}")
        return weights
    
    def train(self, X, y, method='sgd'):
        """
        Train the model using either SGD or mini-batch gradient descent
        Args:
            X (list): Feature vectors
            y (list): Labels
            method (str): 'sgd' or 'minibatch'
        """
        if not self.weights:
            # Initialize weights with small random values
            self.weights = [random.uniform(-0.1, 0.1) for _ in range(len(X[0]))]
        
        # Compute class weights for balanced training
        class_weights = self.compute_class_weights(y)
        
        print(f"\nTraining with {method.upper()}:")
        print("Epoch\tLoss\t\tΔLoss")
        
        n_samples = len(y)
        prev_loss = float('inf')
        no_improvement_count = 0
        
        for epoch in range(self.num_epochs):
            indices = list(range(n_samples))
            random.shuffle(indices)
            
            if method == 'sgd':
                batch_indices = [[i] for i in indices]
            else:
                batch_indices = [indices[i:i + self.batch_size] 
                               for i in range(0, len(indices), self.batch_size)]
            
            epoch_loss = 0
            for batch in batch_indices:
                weight_gradients = [0] * len(self.weights)
                bias_gradient = 0
                
                for i in batch:
                    # Forward pass
                    z = sum(X[i][j] * self.weights[j] for j in range(len(self.weights))) + self.bias
                    pred = self.sigmoid(z)
                    
                    # Apply class weights to error
                    sample_weight = class_weights[y[i]]
                    error = sample_weight * (pred - y[i])
                    
                    # Accumulate gradients
                    for j in range(len(self.weights)):
                        weight_gradients[j] += error * X[i][j]
                    bias_gradient += error
                
                # Apply updates with regularization
                batch_size = len(batch)
                for j in range(len(self.weights)):
                    reg_gradient = 2 * self.lambda_reg * self.weights[j]
                    self.weights[j] -= self.learning_rate * (weight_gradients[j]/batch_size + reg_gradient)
                self.bias -= self.learning_rate * (bias_gradient/batch_size)
                
                epoch_loss += self.compute_loss(X, y, batch)
            
            avg_loss = epoch_loss / len(batch_indices)
            loss_change = prev_loss - avg_loss
            self.training_history.append(avg_loss)
            
            if epoch % 5 == 0:
                print(f"{epoch}\t{avg_loss:.6f}\t{loss_change:.6f}")
            
            # Early stopping with patience
            if abs(loss_change) < self.early_stop_threshold:
                no_improvement_count += 1
                if no_improvement_count >= 3:  # Wait for 3 epochs of no improvement
                    print(f"\nEarly stopping at epoch {epoch}")
                    break
            else:
                no_improvement_count = 0
            
            prev_loss = avg_loss
    
    def predict(self, X):
        """Make predictions on new data"""
        predictions = []
        for x in X:
            z = sum(x[j] * self.weights[j] for j in range(len(self.weights))) + self.bias
            prob = self.sigmoid(z)
            predictions.append(1 if prob >= 0.5 else 0)
        return predictions
class MultiClassLogisticRegression:
    def __init__(self, n_classes, n_features, learning_rate=0.01, lambda_reg=0.1, 
                 num_epochs=100, early_stop_threshold=1e-4, batch_size=32):
        self.n_classes = n_classes
        self.learning_rate = learning_rate
        self.lambda_reg = lambda_reg
        self.num_epochs = num_epochs
        self.early_stop_threshold = early_stop_threshold
        self.batch_size = batch_size
        # Initialize weights for each class
        self.weights = [[random.uniform(-0.1, 0.1) for _ in range(n_features)] 
                       for _ in range(n_classes)]
        self.biases = [0] * n_classes
        self.training_history = []
    
    def softmax(self, scores):
        """Compute softmax probabilities"""
        exp_scores = [math.exp(min(max(s, -100), 100)) for s in scores]
        total = sum(exp_scores)
        return [s / total for s in exp_scores]
    
    def compute_loss(self, X, y, indices):
        """Compute categorical cross-entropy loss with L2 regularization"""
        loss = 0
        for i in indices:
            # Compute scores for each class
            scores = [sum(X[i][j] * self.weights[c][j] for j in range(len(X[i]))) + self.biases[c]
                     for c in range(self.n_classes)]
            probs = self.softmax(scores)
            # Add small epsilon to avoid log(0)
            loss -= math.log(probs[y[i]] + 1e-15)
        
        # Add L2 regularization term
        l2_term = self.lambda_reg * sum(sum(w * w for w in class_weights) 
                                      for class_weights in self.weights)
        return (loss / len(indices)) + l2_term
    
    def train(self, X, y, method='minibatch'):
        """Train the model using mini-batch gradient descent"""
        print(f"\nTraining multi-class model with {method.upper()}:")
        print("Epoch\tLoss\t\tΔLoss")
        
        n_samples = len(y)
        prev_loss = float('inf')
        no_improvement_count = 0
        
        for epoch in range(self.num_epochs):
            indices = list(range(n_samples))
            random.shuffle(indices)
            
            if method == 'sgd':
                batch_indices = [[i] for i in indices]
            else:
                batch_indices = [indices[i:i + self.batch_size] 
                               for i in range(0, len(indices), self.batch_size)]
            
            epoch_loss = 0
            for batch in batch_indices:
                # Initialize gradients
                weight_gradients = [[0] * len(self.weights[0]) for _ in range(self.n_classes)]
                bias_gradients = [0] * self.n_classes
                
                for i in batch:
                    # Forward pass
                    scores = [sum(X[i][j] * self.weights[c][j] for j in range(len(X[i]))) + self.biases[c]
                            for c in range(self.n_classes)]
                    probs = self.softmax(scores)
                    
                    # Compute gradients
                    for c in range(self.n_classes):
                        error = probs[c]
                        if c == y[i]:
                            error -= 1
                            
                        for j in range(len(X[i])):
                            weight_gradients[c][j] += error * X[i][j]
                        bias_gradients[c] += error
                
                # Apply updates with regularization
                batch_size = len(batch)
                for c in range(self.n_classes):
                    for j in range(len(self.weights[c])):
                        reg_gradient = 2 * self.lambda_reg * self.weights[c][j]
                        self.weights[c][j] -= self.learning_rate * (weight_gradients[c][j]/batch_size + reg_gradient)
                    self.biases[c] -= self.learning_rate * (bias_gradients[c]/batch_size)
                
                epoch_loss += self.compute_loss(X, y, batch)
            
            avg_loss = epoch_loss / len(batch_indices)
            loss_change = prev_loss - avg_loss
            self.training_history.append(avg_loss)
            
            if epoch % 5 == 0:
                print(f"{epoch}\t{avg_loss:.6f}\t{loss_change:.6f}")
            
            if abs(loss_change) < self.early_stop_threshold:
                no_improvement_count += 1
                if no_improvement_count >= 3:
                    print(f"\nEarly stopping at epoch {epoch}")
                    break
            else:
                no_improvement_count = 0
            
            prev_loss = avg_loss
    
    def predict(self, X):
        """Make predictions on new data"""
        predictions = []
        for x in X:
            scores = [sum(x[j] * self.weights[c][j] for j in range(len(x))) + self.biases[c]
                     for c in range(self.n_classes)]
            probs = self.softmax(scores)
            predictions.append(max(range(len(probs)), key=lambda i: probs[i]))
        return predictions

def evaluate(y_true, y_pred):
    """
    Compute evaluation metrics
    Returns:
        tuple: (accuracy, precision, recall, f1)
    """
    tp = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 1 and yp == 1)
    tn = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 0 and yp == 0)
    fp = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 0 and yp == 1)
    fn = sum(1 for yt, yp in zip(y_true, y_pred) if yt == 1 and yp == 0)
    
    accuracy = (tp + tn) / len(y_true)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return accuracy, precision, recall, f1

def evaluate_multiclass(y_true, y_pred, n_classes):
    """
    Compute evaluation metrics for multi-class classification
    Returns:
        tuple: (accuracy, macro_precision, macro_recall, macro_f1)
    """
    # Compute per-class metrics
    class_metrics = []
    for c in range(n_classes):
        tp = sum(1 for yt, yp in zip(y_true, y_pred) if yt == c and yp == c)
        fp = sum(1 for yt, yp in zip(y_true, y_pred) if yt != c and yp == c)
        fn = sum(1 for yt, yp in zip(y_true, y_pred) if yt == c and yp != c)
        
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        class_metrics.append((precision, recall, f1))
    
    # Calculate accuracy
    accuracy = sum(1 for yt, yp in zip(y_true, y_pred) if yt == yp) / len(y_true)
    
    # Calculate macro-averaged metrics
    macro_precision = sum(m[0] for m in class_metrics) / n_classes
    macro_recall = sum(m[1] for m in class_metrics) / n_classes
    macro_f1 = sum(m[2] for m in class_metrics) / n_classes
    
    return accuracy, macro_precision, macro_recall, macro_f1

def main():
    # Part A: Binary Classification (SMS Spam)
    print("Part A: SMS Spam Classification")
    file_path = "SMSSpamCollection"
    
    # Step 1: Load and split data
    train_texts, train_labels, val_texts, val_labels, test_texts, test_labels = load_and_split_data(file_path)
    
    # Step 2: Preprocess and tokenize texts
    print("\nPreprocessing and tokenizing texts...")
    train_tokens = [tokenize(preprocess_text(text)) for text in train_texts]
    val_tokens = [tokenize(preprocess_text(text)) for text in val_texts]
    test_tokens = [tokenize(preprocess_text(text)) for text in test_texts]
    
    # Step 3: Create TF-IDF features
    vectorizer = TfidfVectorizer()
    vectorizer.fit(train_tokens)
    
    X_train = vectorizer.transform(train_tokens)
    X_val = vectorizer.transform(val_tokens)
    X_test = vectorizer.transform(test_tokens)
    
    # Step 4: Train and evaluate with both SGD and mini-batch
    for method in ['sgd', 'minibatch']:
        print(f"\nStep 4: Training with {method.upper()}...")
        model = LogisticRegression(
            learning_rate=0.1,
            lambda_reg=0.01,
            num_epochs=35,
            early_stop_threshold=1e-4,
            batch_size=32
        )
        model.train(X_train, train_labels, method=method)
        
        # Evaluate on both validation and test sets
        print(f"\nStep 5: Evaluating {method.upper()} model...")
        
        # Validation set evaluation
        val_pred = model.predict(X_val)
        val_accuracy, val_precision, val_recall, val_f1 = evaluate(val_labels, val_pred)
        print(f"\n{method.upper()} Validation Results:")
        print(f"Accuracy:  {val_accuracy:.4f}")
        print(f"Precision: {val_precision:.4f}")
        print(f"Recall:    {val_recall:.4f}")
        print(f"F1-score:  {val_f1:.4f}")
        
        # Test set evaluation
        test_pred = model.predict(X_test)
        test_accuracy, test_precision, test_recall, test_f1 = evaluate(test_labels, test_pred)
        print(f"\n{method.upper()} Test Results:")
        print(f"Accuracy:  {test_accuracy:.4f}")
        print(f"Precision: {test_precision:.4f}")
        print(f"Recall:    {test_recall:.4f}")
        print(f"F1-score:  {test_f1:.4f}")
    
    # Part B: Multi-class Classification (Author Attribution)
    print("\nPart B: Author Attribution")
    books_file = "books.txt"
    
    # Load and split books data
    (book_train_texts, book_train_labels, 
     book_val_texts, book_val_labels,
     book_test_texts, book_test_labels,
     authors) = load_books_data(books_file)
    
    # Preprocess and tokenize book texts
    print("\nPreprocessing and tokenizing book texts...")
    book_train_tokens = [tokenize(preprocess_text(text)) for text in book_train_texts]
    book_val_tokens = [tokenize(preprocess_text(text)) for text in book_val_texts]
    book_test_tokens = [tokenize(preprocess_text(text)) for text in book_test_texts]
    
    # Create TF-IDF features for books
    book_vectorizer = TfidfVectorizer()
    book_vectorizer.fit(book_train_tokens)
    
    X_book_train = book_vectorizer.transform(book_train_tokens)
    X_book_val = book_vectorizer.transform(book_val_tokens)
    X_book_test = book_vectorizer.transform(book_test_tokens)
    
    # Train multi-class model
    n_classes = len(authors)
    n_features = len(book_vectorizer.vocabulary)
    
    print(f"\nTraining multi-class model for {n_classes} authors...")
    multi_model = MultiClassLogisticRegression(
        n_classes=n_classes,
        n_features=n_features,
        learning_rate=0.1,
        lambda_reg=0.01,
        num_epochs=35,
        early_stop_threshold=1e-4,
        batch_size=32
    )
    
    multi_model.train(X_book_train, book_train_labels)
    
    # Evaluate multi-class model
    print("\nEvaluating multi-class model...")
    
    # Validation set evaluation
    val_pred = multi_model.predict(X_book_val)
    val_metrics = evaluate_multiclass(book_val_labels, val_pred, n_classes)
    print("\nValidation Results:")
    print(f"Accuracy:        {val_metrics[0]:.4f}")
    print(f"Macro Precision: {val_metrics[1]:.4f}")
    print(f"Macro Recall:    {val_metrics[2]:.4f}")
    print(f"Macro F1-score:  {val_metrics[3]:.4f}")
    
    # Test set evaluation
    test_pred = multi_model.predict(X_book_test)
    test_metrics = evaluate_multiclass(book_test_labels, test_pred, n_classes)
    print("\nTest Results:")
    print(f"Accuracy:        {test_metrics[0]:.4f}")
    print(f"Macro Precision: {test_metrics[1]:.4f}")
    print(f"Macro Recall:    {test_metrics[2]:.4f}")
    print(f"Macro F1-score:  {test_metrics[3]:.4f}")

if __name__ == "__main__":
    main()

Part A: SMS Spam Classification
Reading data from SMSSpamCollection...
Total number of samples loaded: 5574

Label Distribution:
ham: 4827 (86.60%)
spam: 747 (13.40%)

Preprocessing and tokenizing texts...

Step 4: Training with SGD...
Class weights: {0: 0.5758783584292885, 1: 3.794747081712062}

Training with SGD:
Epoch	Loss		ΔLoss
0	0.565894	inf
5	0.548030	0.004362
10	0.551706	0.003164
15	0.549920	0.003600
20	0.551242	0.002120
25	0.552730	-0.001959
30	0.554134	-0.004919

Step 5: Evaluating SGD model...

SGD Validation Results:
Accuracy:  0.9187
Precision: 0.6524
Recall:    0.9068
F1-score:  0.7589

SGD Test Results:
Accuracy:  0.9283
Precision: 0.6821
Recall:    0.8957
F1-score:  0.7744

Step 4: Training with MINIBATCH...
Class weights: {0: 0.5758783584292885, 1: 3.794747081712062}

Training with MINIBATCH:
Epoch	Loss		ΔLoss
0	0.826326	inf
5	0.604667	0.011946
10	0.589657	0.001560
15	0.588511	-0.005011
20	0.580150	0.010970
25	0.592079	-0.009504
30	0.588796	-0.002602

Step 5: Evaluatin

KeyboardInterrupt: 