In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Set random seed for reproducibility
np.random.seed(42)

class LSTM:
    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """
        Initialize an LSTM network from scratch

        Parameters:
        -----------
        input_size : int
            Size of the input features
        hidden_size : int
            Size of the hidden state
        output_size : int
            Size of the output
        learning_rate : float
            Learning rate for gradient descent
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate

        # Initialize weights for the forget gate
        self.Wf = np.random.randn(hidden_size, hidden_size + input_size) * 0.01
        self.bf = np.zeros((hidden_size, 1))

        # Initialize weights for the input gate
        self.Wi = np.random.randn(hidden_size, hidden_size + input_size) * 0.01
        self.bi = np.zeros((hidden_size, 1))

        # Initialize weights for the cell gate (candidate values)
        self.Wc = np.random.randn(hidden_size, hidden_size + input_size) * 0.01
        self.bc = np.zeros((hidden_size, 1))

        # Initialize weights for the output gate
        self.Wo = np.random.randn(hidden_size, hidden_size + input_size) * 0.01
        self.bo = np.zeros((hidden_size, 1))

        # Initialize weights for the output layer
        self.Wy = np.random.randn(output_size, hidden_size) * 0.01
        self.by = np.zeros((output_size, 1))

    def sigmoid(self, x):
        """Sigmoid activation function"""
        return 1 / (1 + np.exp(-np.clip(x, -15, 15)))  # Clip to avoid overflow

    def forward(self, inputs):
        """
        Forward pass through the LSTM

        Parameters:
        -----------
        inputs : list of numpy arrays
            List of input vectors for each time step

        Returns:
        --------
        h_states : list of numpy arrays
            List of hidden states for each time step
        outputs : list of numpy arrays
            List of output vectors for each time step
        """
        # Store inputs for backpropagation
        self.inputs = inputs
        T = len(inputs)  # Number of time steps

        # Initialize lists to store states and gates
        self.f_gates = []  # Forget gates
        self.i_gates = []  # Input gates
        self.c_tildes = []  # Candidate cell states
        self.o_gates = []  # Output gates
        self.c_states = []  # Cell states
        self.h_states = []  # Hidden states
        outputs = []  # Outputs

        # Initialize the first hidden state and cell state with zeros
        h_prev = np.zeros((self.hidden_size, 1))
        c_prev = np.zeros((self.hidden_size, 1))

        # Forward pass for each time step
        for t in range(T):
            # Ensure the input is a column vector
            x_t = np.array(inputs[t]).reshape(-1, 1)

            # Concatenate previous hidden state with current input
            concat = np.vstack((h_prev, x_t))

            # Forget gate: determines what to forget from the cell state
            # f_t = sigmoid(W_f · [h_{t-1}, x_t] + b_f)
            f_t = self.sigmoid(np.dot(self.Wf, concat) + self.bf)
            self.f_gates.append(f_t)

            # Input gate: determines what new information to store
            # i_t = sigmoid(W_i · [h_{t-1}, x_t] + b_i)
            i_t = self.sigmoid(np.dot(self.Wi, concat) + self.bi)
            self.i_gates.append(i_t)

            # Candidate cell state: creates vector of new candidate values
            # c̃_t = tanh(W_c · [h_{t-1}, x_t] + b_c)
            c_tilde = np.tanh(np.dot(self.Wc, concat) + self.bc)
            self.c_tildes.append(c_tilde)

            # Output gate: determines what to output from the cell state
            # o_t = sigmoid(W_o · [h_{t-1}, x_t] + b_o)
            o_t = self.sigmoid(np.dot(self.Wo, concat) + self.bo)
            self.o_gates.append(o_t)

            # New cell state: forget old information and add new information
            # c_t = f_t * c_{t-1} + i_t * c̃_t
            c_t = f_t * c_prev + i_t * c_tilde
            self.c_states.append(c_t)

            # New hidden state: filtered cell state through output gate
            # h_t = o_t * tanh(c_t)
            h_t = o_t * np.tanh(c_t)
            self.h_states.append(h_t)

            # Calculate output
            # y_t = W_y · h_t + b_y
            y_t = np.dot(self.Wy, h_t) + self.by
            outputs.append(y_t)

            # Update for next time step
            h_prev = h_t
            c_prev = c_t

        return self.h_states, outputs

    def backward(self, targets):
        """
        Backpropagation through time (BPTT) for LSTM

        Parameters:
        -----------
        targets : list of numpy arrays
            List of target vectors for each time step

        Returns:
        --------
        loss : float
            Mean squared error loss
        """
        T = len(targets)  # Number of time steps

        # Initialize gradient accumulators
        dWf, dWi, dWc, dWo, dWy = np.zeros_like(self.Wf), np.zeros_like(self.Wi), np.zeros_like(self.Wc), np.zeros_like(self.Wo), np.zeros_like(self.Wy)
        dbf, dbi, dbc, dbo, dby = np.zeros_like(self.bf), np.zeros_like(self.bi), np.zeros_like(self.bc), np.zeros_like(self.bo), np.zeros_like(self.by)

        # Initialize loss
        loss = 0

        # Initialize gradients for the next time step
        dh_next = np.zeros_like(self.h_states[0])
        dc_next = np.zeros_like(self.c_states[0])

        # Backpropagate through time
        for t in reversed(range(T)):
            # Convert target to correct shape
            target_t = np.array(targets[t]).reshape(-1, 1)

            # Calculate output for current time step
            y_t = np.dot(self.Wy, self.h_states[t]) + self.by

            # Calculate error and loss
            error = y_t - target_t
            loss += np.sum(error ** 2) / 2

            # Gradient of output weights
            dWy += np.dot(error, self.h_states[t].T)
            dby += error

            # Gradient of hidden state
            dh = np.dot(self.Wy.T, error) + dh_next

            # Get the current gates and states
            o_t = self.o_gates[t]
            c_t = self.c_states[t]
            c_tilde = self.c_tildes[t]
            i_t = self.i_gates[t]
            f_t = self.f_gates[t]

            # Get previous cell state and hidden state
            c_prev = np.zeros_like(self.c_states[0]) if t == 0 else self.c_states[t-1]
            h_prev = np.zeros_like(self.h_states[0]) if t == 0 else self.h_states[t-1]

            # Gradient of output gate
            do = dh * np.tanh(c_t)
            do_input = do * o_t * (1 - o_t)  # Derivative of sigmoid

            # Gradient of cell state
            dc = dc_next + dh * o_t * (1 - np.tanh(c_t)**2)

            # Gradient of candidate cell state
            dc_tilde = dc * i_t
            dc_tilde_input = dc_tilde * (1 - c_tilde**2)  # Derivative of tanh

            # Gradient of input gate
            di = dc * c_tilde
            di_input = di * i_t * (1 - i_t)  # Derivative of sigmoid

            # Gradient of forget gate
            df = dc * c_prev
            df_input = df * f_t * (1 - f_t)  # Derivative of sigmoid

            # Prepare next cell state gradient
            dc_next = dc * f_t

            # Concatenate previous hidden state with current input
            x_t = np.array(self.inputs[t]).reshape(-1, 1)
            concat = np.vstack((h_prev, x_t))

            # Accumulate gradients for weights
            dWf += np.dot(df_input, concat.T)
            dWi += np.dot(di_input, concat.T)
            dWc += np.dot(dc_tilde_input, concat.T)
            dWo += np.dot(do_input, concat.T)

            # Accumulate gradients for biases
            dbf += df_input
            dbi += di_input
            dbc += dc_tilde_input
            dbo += do_input

            # Gradient for next hidden state
            dconcat = np.dot(self.Wf.T, df_input) + np.dot(self.Wi.T, di_input) + \
                     np.dot(self.Wc.T, dc_tilde_input) + np.dot(self.Wo.T, do_input)

            dh_next = dconcat[:self.hidden_size]

        # Clip gradients to prevent exploding gradients
        for dparam in [dWf, dWi, dWc, dWo, dWy, dbf, dbi, dbc, dbo, dby]:
            np.clip(dparam, -5, 5, out=dparam)

        # Update parameters
        self.Wf -= self.learning_rate * dWf
        self.Wi -= self.learning_rate * dWi
        self.Wc -= self.learning_rate * dWc
        self.Wo -= self.learning_rate * dWo
        self.Wy -= self.learning_rate * dWy

        self.bf -= self.learning_rate * dbf
        self.bi -= self.learning_rate * dbi
        self.bc -= self.learning_rate * dbc
        self.bo -= self.learning_rate * dbo
        self.by -= self.learning_rate * dby

        return loss / T

    def train(self, X_train, y_train, epochs=100):
        """
        Train the LSTM on the given data

        Parameters:
        -----------
        X_train : list of lists
            List of input sequences
        y_train : list of lists
            List of target sequences
        epochs : int
            Number of training epochs

        Returns:
        --------
        losses : list
            List of losses for each epoch
        """
        losses = []

        for epoch in range(epochs):
            total_loss = 0

            for i in range(len(X_train)):
                # Forward pass
                _, _ = self.forward(X_train[i])

                # Backward pass
                loss = self.backward(y_train[i])
                total_loss += loss

            avg_loss = total_loss / len(X_train)
            losses.append(avg_loss)

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {avg_loss:.6f}")

        return losses

# Demonstration with a specific problem: Learning a simple pattern
def generate_pattern_data(samples, pattern_length=3):
    """Generate a repeating pattern with some noise"""
    pattern = np.sin(np.linspace(0, 2 * np.pi, pattern_length))

    # Repeat the pattern
    repetitions = int(np.ceil(samples / pattern_length))
    extended_pattern = np.tile(pattern, repetitions)[:samples]

    # Add some noise
    noise = np.random.normal(0, 0.05, samples)
    noisy_pattern = extended_pattern + noise

    return noisy_pattern

def prepare_sequences(data, seq_length):
    """Prepare input-output sequences for time series prediction"""
    X, y = [], []
    for i in range(len(data) - seq_length):
        # Input sequence
        X.append(data[i:i+seq_length])
        # Target sequence (next values)
        y.append(data[i+1:i+seq_length+1])
    return X, y

def run_pattern_example():
    print("LSTM from Scratch: Learning a Repeating Pattern")
    print("=" * 50)

    # Generate data with a repeating pattern
    pattern_length = 3
    total_samples = 200
    data = generate_pattern_data(total_samples, pattern_length)

    # Prepare sequences
    seq_length = 10
    X, y = prepare_sequences(data, seq_length)

    # Split into train and test
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # Display information
    print(f"Pattern length: {pattern_length}")
    print(f"Sequence length: {seq_length}")
    print(f"Training sequences: {len(X_train)}")
    print(f"Testing sequences: {len(X_test)}")
    print("-" * 50)

    # Create and train LSTM
    lstm = LSTM(input_size=1, hidden_size=10, output_size=1, learning_rate=0.01)
    losses = lstm.train(X_train, y_train, epochs=200)

    # Plot training loss
    plt.figure(figsize=(10, 5))
    plt.plot(losses)
    plt.title('LSTM Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Mean Squared Error')
    plt.grid(True)

    # Test with a sequence and predict future values
    test_seq = X_test[0]
    actual_future = y_test[0]

    # Initial prediction
    _, outputs = lstm.forward(test_seq)
    predictions = [out[0][0] for out in outputs]

    # Continue predicting beyond known data
    extended_predictions = predictions.copy()
    current_seq = test_seq.copy()

    for _ in range(30):  # Predict 30 steps into the future
        # Update sequence with the last prediction
        current_seq = np.append(current_seq[1:], extended_predictions[-1])

        # Get new prediction
        _, new_outputs = lstm.forward(current_seq)
        next_pred = new_outputs[-1][0][0]
        extended_predictions.append(next_pred)

    # Plot the results
    plt.figure(figsize=(12, 6))

    # Plot original full pattern for reference
    time_steps = range(total_samples)
    plt.plot(time_steps, data, 'g-', alpha=0.3, label='Full Pattern')

    # Plot the test sequence
    start_idx = train_size + seq_length - 1
    test_idxs = range(start_idx, start_idx + len(test_seq))
    plt.plot(test_idxs, test_seq, 'b-', linewidth=2, label='Test Sequence')

    # Plot actual future values
    future_idxs = range(start_idx + 1, start_idx + 1 + len(actual_future))
    plt.plot(future_idxs, actual_future, 'k-', linewidth=2, label='Actual Future')

    # Plot one-step predictions
    pred_idxs = range(start_idx + 1, start_idx + 1 + len(predictions))
    plt.plot(pred_idxs, predictions, 'r--', linewidth=2, label='One-step Predictions')

    # Plot extended predictions
    ext_pred_idxs = range(start_idx + 1, start_idx + 1 + len(extended_predictions))
    plt.plot(ext_pred_idxs, extended_predictions, 'm:', linewidth=2, label='Extended Predictions')

    plt.title('LSTM Pattern Prediction')
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True)

    # Highlight the pattern repeats
    for i in range(0, total_samples, pattern_length):
        plt.axvline(x=i, color='g', alpha=0.2, linestyle=':')

    plt.show()

    print("Pattern prediction demonstration complete!")
    print("-" * 50)
    print("Observations:")
    print("1. The LSTM learns to predict the repeating pattern")
    print("2. The model captures both the pattern and the slight noise in the data")
    print("3. For extended predictions, the LSTM maintains the pattern structure")
    print("   but may drift over time without correction")

if __name__ == "__main__":
    run_pattern_example()