In [1]:
import numpy as np

class RBM:
    def __init__(self, n_visible, n_hidden, learning_rate=0.01, k=1):
        """
        Initialize a Restricted Boltzmann Machine.
        
        Parameters:
        - n_visible: Number of visible units
        - n_hidden: Number of hidden units
        - learning_rate: Learning rate for weight updates
        - k: Number of Gibbs sampling steps for Contrastive Divergence (CD-k)
        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        self.k = k
        
        # Initialize weights and biases
        self.W = np.random.normal(0, 0.01, (n_visible, n_hidden))  # Weight matrix
        self.b = np.zeros(n_visible)  # Visible biases
        self.c = np.zeros(n_hidden)   # Hidden biases

    def sigmoid(self, x):
        """Compute sigmoid activation."""
        return 1 / (1 + np.exp(-x))

    def sample_hidden(self, v):
        """
        Sample hidden units given visible units.
        Returns: Probabilities and binary states of hidden units.
        """
        activation = np.dot(v, self.W) + self.c
        p_h = self.sigmoid(activation)
        h = (p_h > np.random.random(p_h.shape)).astype(float)
        return p_h, h

    def sample_visible(self, h):
        """
        Sample visible units given hidden units.
        Returns: Probabilities and binary states of visible units.
        """
        activation = np.dot(h, self.W.T) + self.b
        p_v = self.sigmoid(activation)
        v = (p_v > np.random.random(p_v.shape)).astype(float)
        return p_v, v

    def contrastive_divergence(self, v):
        """
        Perform one step of Contrastive Divergence (CD-k).
        Returns: Positive and negative gradients for weights and biases.
        """
        # Positive phase
        pos_h_prob, pos_h = self.sample_hidden(v)
        pos_assoc = np.dot(v.T, pos_h_prob)

        # Negative phase (k steps of Gibbs sampling)
        neg_v = v.copy()
        for _ in range(self.k):
            neg_h_prob, neg_h = self.sample_hidden(neg_v)
            neg_v_prob, neg_v = self.sample_visible(neg_h)
        
        neg_assoc = np.dot(neg_v.T, neg_h_prob)

        # Gradients
        grad_W = pos_assoc - neg_assoc
        grad_b = np.mean(v - neg_v, axis=0)
        grad_c = np.mean(pos_h_prob - neg_h_prob, axis=0)

        return grad_W, grad_b, grad_c

    def train(self, data, epochs=100, batch_size=100):
        """
        Train the RBM using Contrastive Divergence.
        
        Parameters:
        - data: Training data (binary, shape: [n_samples, n_visible])
        - epochs: Number of training epochs
        - batch_size: Size of mini-batches
        """
        n_samples = data.shape[0]
        for epoch in range(epochs):
            error = 0
            for start in range(0, n_samples, batch_size):
                end = min(start + batch_size, n_samples)
                batch = data[start:end]

                # Compute gradients
                grad_W, grad_b, grad_c = self.contrastive_divergence(batch)

                # Update weights and biases
                self.W += self.learning_rate * grad_W / batch_size
                self.b += self.learning_rate * grad_b
                self.c += self.learning_rate * grad_c

                # Compute reconstruction error
                error += np.mean((batch - self.sample_visible(self.sample_hidden(batch)[1])[1])**2)

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Reconstruction Error: {error / (n_samples // batch_size):.4f}")

    def reconstruct(self, v):
        """
        Reconstruct input data by sampling hidden and then visible units.
        Returns: Reconstructed visible units.
        """
        _, h = self.sample_hidden(v)
        _, v_reconstructed = self.sample_visible(h)
        return v_reconstructed

# Example usage with synthetic binary data
if __name__ == "__main__":
    # Generate synthetic binary data (e.g., 100 samples with 10 visible units)
    np.random.seed(42)
    data = (np.random.random((100, 10)) > 0.5).astype(float)
    
    # Initialize and train RBM
    rbm = RBM(n_visible=10, n_hidden=5, learning_rate=0.1, k=1)
    rbm.train(data, epochs=100, batch_size=10)
    
    # Reconstruct a sample
    sample = data[0:1]
    reconstructed = rbm.reconstruct(sample)
    print("Original:", sample)
    print("Reconstructed:", reconstructed)

Epoch 0, Reconstruction Error: 0.4940
Epoch 10, Reconstruction Error: 0.5020
Epoch 20, Reconstruction Error: 0.4890
Epoch 30, Reconstruction Error: 0.4600
Epoch 40, Reconstruction Error: 0.4750
Epoch 50, Reconstruction Error: 0.4310
Epoch 60, Reconstruction Error: 0.4270
Epoch 70, Reconstruction Error: 0.4150
Epoch 80, Reconstruction Error: 0.3760
Epoch 90, Reconstruction Error: 0.4080
Original: [[0. 1. 1. 1. 0. 0. 0. 1. 1. 1.]]
Reconstructed: [[0. 1. 0. 0. 0. 0. 0. 1. 1. 1.]]
