<a href="https://colab.research.google.com/github/HalgasAdrian/CS5230-Coursework/blob/main/HW5B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Problem 1: Simple Sampling**

You are not allowed to use sampling libraries/functions, but you can call rand() to generate a pseudo-uniform value in [0,1]; you can also use a library that computes the pdf(x|params). Make sure to first recap Rejection Sampling and Inverse Transform Sampling.

A. Implement simple sampling from continuous distributions: uniform (min, max, sample_size) and gaussian (mu, sigma, sample_size)

B. Implement sampling from a 2-dim Gaussian Distribution (2d mu, 2d sigma, sample_size)

C. Implement without-replacement sampling from a discrete non-uniform distribution (given as input) following Stevens' method described in class (paper). Test it on desired sample sizes N significantly smaller than the population size M (for example N=20, M=300).

In [1]:
import random

def sample_uniform(a, b, sample_size):
    """
    Draw values from a Uniform(a, b) distribution by rescaling rand().

    Every draw takes a pseudo-uniform u in [0, 1) and maps it onto the
    target interval with the affine transform a + (b - a) * u.

    Args:
        a (float): Lower bound of the interval.
        b (float): Upper bound of the interval.
        sample_size (int): How many values to draw.

    Returns:
        list of float: The generated values, in the order they were drawn.
    """
    width = b - a
    return [a + width * random.random() for _ in range(sample_size)]

# Quick demonstration: five draws from Uniform(0, 10).
print("Uniform samples:", sample_uniform(a=0, b=10, sample_size=5))

Uniform samples: [6.987538055619814, 2.6119309767933196, 9.453894979763586, 9.007527242424382, 2.542066158250824]


In [2]:
import math

def sample_gaussian(mu, sigma, sample_size):
    """
    Generates samples from a Gaussian distribution N(mu, sigma^2)
    using the Box-Muller transform.

    Each pair of uniforms (u1, u2) yields two independent standard normals,
    radius * cos(angle) and radius * sin(angle); both are used, and the sine
    value of the last pair is dropped when sample_size is odd.

    Args:
        mu (float): Mean.
        sigma (float): Standard deviation.
        sample_size (int): Number of samples to generate.

    Returns:
        list of float: Exactly sample_size Gaussian samples; an empty list
        when sample_size is zero or negative.
    """
    samples = []
    # Guard first: for a negative odd count the old "size % 2" test was
    # truthy in Python and a stray sample was returned.
    if sample_size <= 0:
        return samples

    two_pi = 2 * math.pi
    while len(samples) < sample_size:
        # 1 - random() lies in (0, 1], so log() never receives zero and no
        # ad-hoc clamp value is needed.
        u1 = 1.0 - random.random()
        u2 = random.random()
        radius = math.sqrt(-2 * math.log(u1))  # shared by both outputs
        angle = two_pi * u2
        samples.append(mu + sigma * radius * math.cos(angle))
        if len(samples) < sample_size:
            samples.append(mu + sigma * radius * math.sin(angle))
    return samples

# Quick demonstration: five draws from the standard normal N(0, 1).
print("Gaussian samples:", sample_gaussian(mu=0, sigma=1, sample_size=5))

Gaussian samples: [-1.2823551358522043, -0.3916149557147224, 2.0250496247006513, -1.0520974229953688, -0.5949501315390834]


In [3]:
def _cholesky_factor_2d(sigma):
    """Return (l11, l21, l22), the lower-triangular factor of the 2D spread."""
    if not hasattr(sigma[0], "__len__"):
        # Plain [sigma_x, sigma_y]: diagonal factor, i.e. independent axes.
        return sigma[0], 0.0, sigma[1]

    var_x, cov_xy = sigma[0][0], sigma[0][1]
    cov_yx, var_y = sigma[1][0], sigma[1][1]
    if not math.isclose(cov_xy, cov_yx, rel_tol=1e-9, abs_tol=1e-12):
        raise ValueError("covariance matrix must be symmetric")
    if var_x < 0 or var_y < 0:
        raise ValueError("covariance matrix must have non-negative variances")

    l11 = math.sqrt(var_x)
    if l11 == 0:
        # A constant X cannot co-vary with Y.
        if cov_xy != 0:
            raise ValueError("covariance matrix is not positive semi-definite")
        l21 = 0.0
    else:
        l21 = cov_xy / l11

    residual = var_y - l21 * l21
    if residual < -1e-12 * max(1.0, abs(var_y)):
        raise ValueError("covariance matrix is not positive semi-definite")
    # Clamp tiny negative round-off so sqrt() stays in its domain.
    l22 = math.sqrt(max(residual, 0.0))
    return l11, l21, l22


def sample_gaussian_2d(mu_vector, sigma_vector, sample_size):
    """
    Generates samples from a 2D Gaussian distribution.

    sigma_vector may be given in either of two forms:
      * [sigma_x, sigma_y]: per-axis standard deviations, components
        independent (the original calling convention).
      * [[var_x, cov_xy], [cov_xy, var_y]]: a full 2x2 covariance matrix,
        which allows correlated components.

    Two independent standard normals (z1, z2) come from a single Box-Muller
    pair and are mapped through the lower-triangular factor L of the
    covariance: [x, y] = mu + L @ [z1, z2].

    Args:
        mu_vector (list or tuple of floats): [mu_x, mu_y].
        sigma_vector (list or tuple): Standard deviations or 2x2 covariance
            matrix, as described above.
        sample_size (int): Number of 2D samples to generate.

    Returns:
        list of [float, float]: List of 2D sample points.

    Raises:
        ValueError: If a covariance matrix is given that is not symmetric or
            not positive semi-definite.
    """
    l11, l21, l22 = _cholesky_factor_2d(sigma_vector)
    mu_x, mu_y = mu_vector[0], mu_vector[1]
    two_pi = 2 * math.pi

    samples = []
    for _ in range(sample_size):
        # 1 - random() lies in (0, 1], so log() never receives zero.
        u1 = 1.0 - random.random()
        u2 = random.random()
        radius = math.sqrt(-2 * math.log(u1))
        angle = two_pi * u2
        # The cosine and sine branches of one Box-Muller pair are independent
        # standard normals, so a second pair of uniforms is unnecessary.
        z1 = radius * math.cos(angle)
        z2 = radius * math.sin(angle)
        samples.append([mu_x + l11 * z1, mu_y + l21 * z1 + l22 * z2])

    return samples

# Quick demonstration: five points with zero mean and unit spread on each axis.
mu_2d, sigma_2d = [0, 0], [1, 1]
print("2D Gaussian samples:", sample_gaussian_2d(mu_2d, sigma_2d, sample_size=5))

2D Gaussian samples: [[-0.962700359874964, -0.6487905857268136], [-0.8560819786230847, -0.34007792571958795], [0.9278813267525409, 1.4630738821801952], [-0.5644163262626185, -0.8470550936827791], [0.5868602180560467, 1.3332663475644468]]


In [4]:
def weighted_sample_without_replacement(weights, sample_size):
    """
    Samples indices without replacement from a list of weights using
    exponential-variate keys.

    Every positive-weight item i draws key_i = -ln(u_i) / w_i with u_i
    uniform on (0, 1]; the sample_size items with the smallest keys form
    the sample, so heavier items tend to be picked first.

    NOTE(review): the assignment text asks for Stevens' method; the key-based
    scheme implemented here is the one usually attributed to Efraimidis and
    Spirakis. Confirm that this substitution is acceptable.

    Args:
        weights (list of floats): Non-negative weights for the population
            (length M); they need not sum to 1. Items with weight <= 0 are
            never selected.
        sample_size (int): Number of items to sample (N), N << M.

    Returns:
        list of int: Distinct selected indices, ordered by increasing key.
        Fewer than sample_size indices are returned when fewer items have a
        positive weight; an empty list is returned when sample_size <= 0.
    """
    import heapq  # local import: only this function needs it

    # A negative size would otherwise slice from the end and return almost
    # the whole population.
    if sample_size <= 0:
        return []

    keyed = []
    for index, w in enumerate(weights):
        if w <= 0:
            # Drop the item entirely; giving it an infinite key would still
            # let it through when sample_size exceeds the positive count.
            continue
        # 1 - random() lies in (0, 1], so log() never receives zero.
        u = 1.0 - random.random()
        keyed.append((-math.log(u) / w, index))

    # nsmallest avoids sorting all M keys when only N << M are needed.
    return [index for _, index in heapq.nsmallest(sample_size, keyed)]

# Demonstration with a population of M = 300 items and a sample of N = 20.
M, N = 300, 20
# Synthetic weights: random positive values (they need not sum to 1),
# offset by 0.1 so that no weight is zero.
population_weights = [0.1 + random.random() for _ in range(M)]

selected = weighted_sample_without_replacement(population_weights, sample_size=N)
print("Selected indices (without replacement):", selected)

Selected indices (without replacement): [8, 176, 62, 224, 77, 177, 18, 13, 58, 136, 74, 86, 0, 111, 126, 263, 46, 145, 44, 188]


**Problem 2: Conditional Sampling**

Implement Gibbs Sampling for a multidimensional Gaussian generative joint by using the conditionals, which are also Gaussian distributions. The minimum requirement is for the joint to have D=2 variables and for Gibbs to alternate between the two.

In [5]:
import random
import math

def sample_gaussian_value(mu, sigma):
    """
    Draw one value from N(mu, sigma^2) via the cosine branch of the
    Box–Muller transform.
    """
    # A draw of exactly zero (falsy) is swapped for 1e-10 so the logarithm
    # below stays finite.
    first = random.random() or 1e-10
    second = random.random()
    radius = math.sqrt(-2 * math.log(first))
    angle = 2 * math.pi * second
    return mu + sigma * (radius * math.cos(angle))

def gibbs_sampling_2d(mu, sigma, rho, num_samples, burn_in=100):
    """
    Perform Gibbs sampling for a 2-dimensional Gaussian distribution.

    The joint distribution:
         (X, Y) ~ N( [mu_x, mu_y], Σ )
    where Σ is defined as:
         [sigma_x^2        rho*sigma_x*sigma_y]
         [rho*sigma_x*sigma_y   sigma_y^2     ]

    The conditionals are:
      X|Y=y ~ N(mu_x + rho*(sigma_x/sigma_y)*(y-mu_y), (1-rho^2)*sigma_x^2)
      Y|X=x ~ N(mu_y + rho*(sigma_y/sigma_x)*(x-mu_x), (1-rho^2)*sigma_y^2)

    Each sweep resamples X from its conditional given the current Y, then Y
    given the new X. The chain starts at the mean.

    Args:
        mu: list or tuple of two floats [mu_x, mu_y]
        sigma: list or tuple of two floats [sigma_x, sigma_y]; both non-zero
        rho: float, correlation coefficient between X and Y, intended to be
            in (-1, 1). At |rho| == 1 the conditional variance is zero, so
            the chain never leaves its starting point.
        num_samples: int, number of samples to collect (after burn-in)
        burn_in: int, number of burn-in iterations (default is 100);
            negative values are treated as 0

    Returns:
        List of [x, y] samples from the joint distribution.

    Raises:
        ValueError: If |rho| > 1 (the conditional variance would be negative).
        ZeroDivisionError: If either standard deviation is zero.
    """
    if abs(rho) > 1:
        # Previously this surfaced as an opaque "math domain error" from sqrt.
        raise ValueError(f"rho must satisfy |rho| <= 1, got {rho}")

    # A negative burn-in used to shorten the run and return too few samples.
    burn_in = max(0, burn_in)

    mu_x, mu_y = mu[0], mu[1]
    sigma_x, sigma_y = sigma[0], sigma[1]

    # Everything below is constant across sweeps, so compute it once.
    slope_x = rho * (sigma_x / sigma_y)
    slope_y = rho * (sigma_y / sigma_x)
    std_x_cond = math.sqrt((1 - rho**2) * sigma_x**2)
    std_y_cond = math.sqrt((1 - rho**2) * sigma_y**2)

    # Initialize the chain at the mean.
    x, y = mu_x, mu_y
    samples = []

    for iteration in range(burn_in + num_samples):
        # Sample X given current Y:
        x = sample_gaussian_value(mu_x + slope_x * (y - mu_y), std_x_cond)
        # Sample Y given updated X:
        y = sample_gaussian_value(mu_y + slope_y * (x - mu_x), std_y_cond)

        # After burn_in, collect the sample.
        if iteration >= burn_in:
            samples.append([x, y])

    return samples

# --- Example Usage ---

# Joint Gaussian under test: zero means, unit standard deviations,
# correlation coefficient 0.8.
mu = [0.0, 0.0]
sigma = [1.0, 1.0]
rho = 0.8

# Keep 1000 draws after discarding the first 200 sweeps as burn-in.
num_samples, burn_in = 1000, 200

# Run the sampler.
samples = gibbs_sampling_2d(mu, sigma, rho, num_samples, burn_in=burn_in)

# Show the first ten retained draws as a sanity check.
for i, sample in enumerate(samples[:10]):
    print(f"Sample {i+1}: X = {sample[0]:.4f}, Y = {sample[1]:.4f}")

Sample 1: X = 0.6713, Y = 0.5571
Sample 2: X = 0.1021, Y = 1.5488
Sample 3: X = 0.8961, Y = 0.3792
Sample 4: X = -0.2160, Y = 0.2510
Sample 5: X = -0.9930, Y = 0.7896
Sample 6: X = 1.0555, Y = 0.2000
Sample 7: X = -0.0766, Y = 0.5402
Sample 8: X = 0.0592, Y = 0.6796
Sample 9: X = 0.4553, Y = 0.7211
Sample 10: X = 0.1736, Y = -0.7669


**Problem 3: Implement your own baby-LDA**

Implement your own LDA using Gibbs Sampling, following this paper and this easy-to-read book. Gibbs Sampling is a lot slower than EM alternatives, so this can take some time; use a smaller sample of docs/words at first.

20NG train dataset 11280 docs x 53000 words
Small sonnet dataset (one per line) 154 docs x 3092 words

In [7]:
import os
import re
import random
import math
from collections import Counter

# --- For 20NG: Import from scikit-learn
from sklearn.datasets import fetch_20newsgroups

# ------------------------------------------------------------------
# Utility functions for cleaning and processing
def clean_text(text):
    """
    Clean raw text: lowercase, remove digits and punctuation, and split into words.
    Only used for 20NG (which is raw).

    Underscores count as punctuation here. They belong to the regex class
    \\w, so a plain "non-word character" filter would let separator runs such
    as "___" through as tokens.

    Args:
      text: raw document string.

    Returns:
      List of lowercase tokens longer than two characters.
    """
    # One pass drops every character that is neither a word character nor
    # whitespace, plus digits and underscores (which \w would otherwise keep).
    stripped = re.sub(r'[^\w\s]|[\d_]', '', text.lower())
    # Filter very short words.
    return [word for word in stripped.split() if len(word) > 2]

def build_vocab(docs, min_freq=5):
    """
    Derive the vocabulary of a tokenized corpus.

    Tokens are counted across all documents; those seen at least min_freq
    times are kept and numbered in sorted order.

    Args:
      docs: list of documents (each document is a list of tokens)
      min_freq: minimum corpus-wide count a token needs to be kept

    Returns:
      vocab: sorted list of the kept words
      word_to_index: dictionary mapping each kept word to its position in vocab
    """
    frequencies = Counter(token for doc in docs for token in doc)
    vocab = sorted(token for token, seen in frequencies.items() if seen >= min_freq)
    word_to_index = dict(zip(vocab, range(len(vocab))))
    return vocab, word_to_index

def docs_to_indices(docs, word_to_index):
    """
    Map tokenized documents onto vocabulary indices.

    Tokens missing from word_to_index are skipped, and a document left with
    no tokens at all is omitted from the result.

    Args:
      docs: list of documents (each document is a list of tokens)
      word_to_index: dictionary mapping word to index

    Returns:
      List of non-empty documents, each a list of word indices.
    """
    encoded = (
        [word_to_index[token] for token in doc if token in word_to_index]
        for doc in docs
    )
    return [ids for ids in encoded if ids]

# ------------------------------------------------------------------
# Data-loading functions

def load_20ng_dataset():
    """
    Fetch the 20NG training split through scikit-learn and tokenize it.

    The texts arrive raw, so each one is passed through clean_text; documents
    that end up with no tokens are left out.

    Returns:
      List of documents, where each document is a list of words.
    """
    # Headers, footers, and quotes are removed to get cleaner documents.
    corpus = fetch_20newsgroups(subset='train', remove=('headers', 'footers', 'quotes'))
    tokenized = (clean_text(raw) for raw in corpus.data)
    return [words for words in tokenized if words]

def load_sonnets_preprocessed(filename):
    """
    Load a preprocessed sonnets dataset where each line is a document
    and tokens are already pre-tokenized (separated by whitespace).

    Reading is best-effort: if the file cannot be opened or decoded, the
    problem is printed and whatever was read so far (possibly nothing) is
    returned. Other exceptions, such as a non-path argument, propagate so
    that programming errors are not hidden.

    Args:
      filename: Path to the preprocessed sonnets file.

    Returns:
      List of documents, each a list of tokens. Blank lines are skipped.
    """
    docs = []
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                # split() with no argument already ignores leading/trailing
                # whitespace and the newline.
                tokens = line.split()
                if tokens:
                    docs.append(tokens)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: undecodable bytes
        # (UnicodeDecodeError) or an invalid path string.
        print(f"Error reading {filename}: {e}")
    return docs

# ------------------------------------------------------------------
# Baby LDA implementation using Gibbs Sampling

def discrete_sample(probabilities):
    """
    Sample an index from a discrete distribution with unnormalized probabilities.

    A threshold r is drawn uniformly from [0, total) and the first index whose
    running sum strictly exceeds r is returned, so entries with zero weight
    are never chosen.

    Args:
      probabilities: sequence of non-negative weights (need not sum to 1).

    Returns:
      int: the sampled index.

    Raises:
      ValueError: if the sequence is empty or its total weight is not
        positive (no distribution is defined in that case).
    """
    total = sum(probabilities)
    if total <= 0:
        raise ValueError("Total of probabilities must be greater than zero")

    r = random.random() * total
    cumulative = 0.0
    last_positive = -1
    for i, p in enumerate(probabilities):
        if p <= 0:
            continue
        cumulative += p
        last_positive = i
        # Strict comparison: with ">=" a draw of r == 0 could select a
        # leading zero-weight entry.
        if cumulative > r:
            return i
    # Only reachable through floating-point round-off in the running sum;
    # fall back to the last entry that actually carries weight.
    return last_positive

def baby_lda(DOCS, Vocab, K, alpha, beta, iterations):
    """
    Fit a small LDA model by Gibbs sampling over per-word topic assignments.

    The count tables start out filled with the prior pseudo-counts (alpha,
    beta), so the returned A and B hold prior plus observed counts. On the
    first sweep every word is still unassigned (-1) and simply receives a
    topic; on later sweeps its previous assignment is removed from the
    tables before a new topic is drawn.

    Args:
      DOCS: List of documents (each a list of word indices).
      Vocab: List of strings; the vocabulary.
      K: int; number of topics.
      alpha: float; Dirichlet prior for doc-topic distributions.
      beta: float; Dirichlet prior for topic-word distributions.
      iterations: int; number of Gibbs sampling iterations.

    Returns:
      Z: Topic assignments for each word (list of lists).
      A: Document-topic table (N x K), alpha plus counts.
      B: Topic-word table (K x W), beta plus counts.
    """
    n_docs = len(DOCS)
    vocab_size = len(Vocab)

    # Tables pre-loaded with the priors; BSUM caches each row sum of B.
    A = [[alpha] * K for _ in range(n_docs)]
    B = [[beta] * vocab_size for _ in range(K)]
    BSUM = [beta * vocab_size] * K
    # -1 marks a word that has not been given a topic yet.
    Z = [[-1] * len(doc) for doc in DOCS]

    for it in range(iterations):
        for d, doc in enumerate(DOCS):
            doc_topics = A[d]
            assignments = Z[d]
            for pos, w in enumerate(doc):
                previous = assignments[pos]
                if previous != -1:
                    # Take this word out of the tables before resampling it.
                    doc_topics[previous] -= 1
                    B[previous][w] -= 1
                    BSUM[previous] -= 1

                # Unnormalized weight of each topic for this word.
                weights = [doc_topics[k] * (B[k][w] / BSUM[k]) for k in range(K)]
                chosen = discrete_sample(weights)

                # Record the draw and put the word back into the tables.
                assignments[pos] = chosen
                doc_topics[chosen] += 1
                B[chosen][w] += 1
                BSUM[chosen] += 1

        # Progress report on the first sweep and every tenth one.
        if it == 0 or (it + 1) % 10 == 0:
            print(f"Iteration {it+1} of {iterations} completed.")
    return Z, A, B

def print_top_words_per_topic(B, Vocab, top_n=10):
    """
    Print, for every topic row of B, the top_n words with the largest values.

    Topics are numbered from 1; words with equal values keep their
    vocabulary order.
    """
    for k, row in enumerate(B):
        ranked = sorted(
            ((Vocab[w], value) for w, value in enumerate(row)),
            key=lambda pair: pair[1],
            reverse=True,
        )
        top_words = [word for word, _ in ranked[:top_n]]
        print(f"Topic {k+1}: {', '.join(top_words)}")

# ------------------------------------------------------------------
# Main execution: Run baby LDA on 20NG train data and sonnetsPreprocessed.txt

if __name__ == '__main__':
    # Driver: fit baby LDA with the same settings on 20NG and on the sonnets,
    # printing the top words of every topic for each corpus.
    # NOTE(review): the recorded run of this cell stops with a
    # KeyboardInterrupt right after "Loading 20NG train dataset..."; consider
    # starting with a subset of documents or fewer iterations.

    # --- LDA Parameters
    K = 6                 # Number of topics
    alpha = 5.0           # Prior for document-topic distributions
    beta = 2.0            # Prior for topic-word distributions
    iterations = 100      # Number of Gibbs sampling iterations

    # --- 20NG Dataset
    print("Loading 20NG train dataset...")
    docs_20ng = load_20ng_dataset()
    print(f"Loaded {len(docs_20ng)} documents from 20NG.")
    # Build vocabulary; adjust min_freq as needed (e.g., 10)
    vocab_20ng, word_to_index_20ng = build_vocab(docs_20ng, min_freq=10)
    print(f"20NG Vocabulary size: {len(vocab_20ng)}")
    docs_20ng_indices = docs_to_indices(docs_20ng, word_to_index_20ng)

    print("\nRunning baby LDA on 20NG dataset...")
    Z_20ng, A_20ng, B_20ng = baby_lda(docs_20ng_indices, vocab_20ng, K, alpha, beta, iterations)
    print("\nTop words per topic for 20NG:")
    print_top_words_per_topic(B_20ng, vocab_20ng)

    # --- Sonnets Dataset
    print("\nLoading preprocessed sonnets dataset (sonnetsPreprocessed.txt)...")
    sonnet_file = "sonnetsPreprocessed.txt"   # Ensure this file path is correct
    docs_sonnet = load_sonnets_preprocessed(sonnet_file)
    print(f"Loaded {len(docs_sonnet)} sonnet documents.")
    if not docs_sonnet:
        # The loader prints read errors and returns an empty list; without
        # this guard the sampler would still run every sweep on an empty
        # corpus and print topics with no words.
        print(f"No sonnet documents available from {sonnet_file}; skipping sonnets LDA.")
    else:
        # For sonnets the file is already preprocessed, so no additional cleaning is done.
        # Use a lower min_freq since the dataset is small.
        vocab_sonnet, word_to_index_sonnet = build_vocab(docs_sonnet, min_freq=1)
        print(f"Sonnets Vocabulary size: {len(vocab_sonnet)}")
        docs_sonnet_indices = docs_to_indices(docs_sonnet, word_to_index_sonnet)

        print("\nRunning baby LDA on sonnets dataset...")
        Z_sonnet, A_sonnet, B_sonnet = baby_lda(docs_sonnet_indices, vocab_sonnet, K, alpha, beta, iterations)
        print("\nTop words per topic for sonnets:")
        print_top_words_per_topic(B_sonnet, vocab_sonnet)


Loading 20NG train dataset...


KeyboardInterrupt: 