In [2]:
import numpy as np
import math
import torch
import torch.nn as nn

In [4]:
"""
    Generate a sequence of outputs and corresponding states based on provided probability matrices

    Parameters:
    states (list): list of state names as strings, same order as probability matrices
                   (ex: ["A", "B", "C"])
    transition_matrix: see above image
    n (int): length of sequence to output

    Returns:
    two lists, one with the outputs and the other with the corresponding states
"""
def e_machine(states, transition_matrix, n):
    output_states = []
    outputs = []

    # pick a random state to start with
    state_index = np.random.choice(len(states))
    output_states.append(states[state_index])

    for i in range(n):
        # get column of probabilities corresponding to state
        state_prob = transition_matrix[state_index]

        # select transition
        result = np.random.choice(len(state_prob), p=state_prob)

        # figure out output and state corresponding to transition
        output = math.floor(result/len(states))
        state_index = result % len(states)

        output_states.append(states[state_index])
        outputs.append(output)
    
    return output_states, outputs

In [6]:
# even process
test_array_2 = [[0.5,0,0,0.5], [0,0,1,0]]
#[np.array([[0.5,0], [0,0]]),np.array([[0,1],[0.5,0]])]
states, outputs = e_machine(["A", "B"], test_array_2, 100)
print("states:", states)
print("outputs:", outputs)

states: ['B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A']
outputs: [1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0]


- frequencies -> transition matrix
- np.random.direchlet
- implement simple reservoir


#### Calculating the transition matrices from the outputs and their corresponding states

Generating a long sequence of outputs for the even process

In [10]:
np.random.seed(5)
states, outputs = e_machine(["A", "B"], test_array_2, 10000)
print("states:", states)
print("outputs:", outputs)

states: ['B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', '

Calculating frequency of each transition/emission

In [32]:
transitions = {"(A,0 | A)" : 0,
               "(B,0 | A)" : 0,
               "(A,0 | B)" : 0,
               "(B,0 | B)" : 0,
               "(A,1 | A)" : 0,
               "(B,1 | A)" : 0,
               "(A,1 | B)" : 0,
               "(B,1 | B)" : 0}
for i, output in enumerate(outputs):
    key = f"({states[i+1]},{output} | {states[i]})"
    transitions[key] += 1

transitions

{'(A,0 | A)': 3349,
 '(B,0 | A)': 0,
 '(A,0 | B)': 0,
 '(B,0 | B)': 0,
 '(A,1 | A)': 0,
 '(B,1 | A)': 3325,
 '(A,1 | B)': 3326,
 '(B,1 | B)': 0}

Calculating probabilities

In [34]:
# P(A,0 | A)
transitions["(A,0 | A)"] /(transitions["(A,0 | A)"] + transitions['(B,1 | A)'])

0.5017980221756069

In [35]:
# P(B,1 | A)
transitions["(B,1 | A)"] /(transitions["(A,0 | A)"] + transitions['(B,1 | A)'])

0.4982019778243932

Generalizing the above into a function

In [35]:
def probability_rederivation(output_states, outputs):
    transitions = {}
    for i, output in enumerate(outputs):
        key = f"({output_states[i+1]}, {output} | {output_states[i]})"
        transitions[key] = transitions.get(key, 0) + 1

    # to store lists of transitions from state n
    states = {}
    for key in transitions.keys():
        state = key[-2]
        states[state] = states.get(state, []) + [[key, transitions[key]]]

    probabilities = {}
    for key in states.keys():
        total = sum(n for _, n in states[key])
        for transition, frequency in states[key]:
            probabilities[f"P{transition}"] = frequency/total
    
    return probabilities

In [44]:
states, outputs = e_machine(["A", "B"], test_array_2, 10000)
print("states:", states)
print("outputs:", outputs)
probability_rederivation(states, outputs)

states: ['A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'B', 'A', '

{'P(A, 0 | A)': 0.5007496251874063,
 'P(B, 1 | A)': 0.4992503748125937,
 'P(A, 1 | B)': 1.0}

Running with randomly generated distributions

In [31]:
def distribution_generator(num_states, num_outputs):
    alpha = [1] * num_states * num_outputs
    return np.random.dirichlet(alpha, num_states)

In [38]:
test_array_3 = distribution_generator(2,2)
print(test_array_3)
states, outputs = e_machine(["A", "B"], test_array_3, 10000)
print("states:", states)
print("outputs:", outputs)
probability_rederivation(states, outputs)

[[0.43523182 0.11295301 0.2337796  0.21803556]
 [0.09685626 0.2478058  0.04404797 0.61128997]]
states: ['B', 'B', 'A', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'B', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'B', 'B', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A', 'A', 'B', 'B', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'A', 'A', 'A', 'B', 'B', 'A', 'A', 'A', 'A', 'B', 'A', 'A', 'A', 'A', 'B', 'B', 'A', 'A', 'A', 'B', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', 'B', '

{'P(B, 1 | B)': 0.6101262557502973,
 'P(A, 0 | B)': 0.09656200289485375,
 'P(B, 0 | B)': 0.24739534817065306,
 'P(A, 1 | B)': 0.04591639318419582,
 'P(B, 1 | A)': 0.21799411005592137,
 'P(A, 0 | A)': 0.4389662817246286,
 'P(A, 1 | A)': 0.2320571787829655,
 'P(B, 0 | A)': 0.11098242943648456}

#### Reservoir/ESN

RNN code

In [None]:
# code from https://medium.com/@VersuS_/coding-a-recurrent-neural-network-rnn-from-scratch-using-pytorch-a6c9fc8ed4a7
batch_size = 20

class RNN(nn.Module):
    """
    Basic RNN block. This represents a single layer of RNN
    """
    def __init__(self, input_size: int, hidden_size: int, output_size: int) -> None:
        """
        input_size: Number of features of your input vector
        hidden_size: Number of hidden neurons
        output_size: Number of features of your output vector
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = batch_size
        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden_state) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Returns computed output and tanh(i2h + h2h)
        Inputs
        ------
        x: Input vector
        hidden_state: Previous hidden state
        Outputs
        -------
        out: Linear output (without activation because of how pytorch works)
        hidden_state: New hidden state matrix
        """
        x = self.i2h(x)
        hidden_state = self.h2h(hidden_state)
        hidden_state = torch.tanh(x + hidden_state)
	    out = self.h2o(hidden_state)
        return out, hidden_state
        
    def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
        """
				Helper function.
        Returns a hidden state with specified batch size. Defaults to 1
        """
        return torch.zeros(batch_size, self.hidden_size, requires_grad=False)

Initializing hyperparameters, dataloader, model, loss function, optimizer

In [None]:
batch_size = 5
input_size = 5
hidden_size = 5
output_size = 5

training_set = []
validation_set = []

# loading dataloaders
training_loader = torch.utils.data.DataLoader(training_set, batch_size=batch_size, shuffle=True)
validation_loader = torch.utils.data.DataLoader(validation_set, batch_size=batch_size, shuffle=False)

# initializing model
simple_esn = RNN(input_size=input_size, hidden_size=hidden_size, output_size=output_size)

# Freeze all layers
for param in simple_esn.parameters():
    param.requires_grad = False

# Unfreeze last layer
for param in simple_esn.h2o.parameters():
    param.requires_grad = True

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(simple_esn.parameters(), lr=0.001, momentum=0.9)

model = simple_esn

In [None]:
def train_one_epoch(epoch_index):
    running_loss = 0.
    last_loss = 0.

    # Here, we use enumerate(training_loader) instead of
    # iter(training_loader) so that we can track the batch
    # index and do some intra-epoch reporting
    for i, data in enumerate(training_loader):
        # Every data instance is an input + label pair
        inputs, labels = data

        # Zero your gradients for every batch!
        optimizer.zero_grad()

        # Make predictions for this batch
        outputs = model(inputs)

        # Compute the loss and its gradients
        loss = loss_fn(outputs, labels)
        loss.backward()

        # Adjust learning weights
        optimizer.step()

        # Gather data and report
        running_loss += loss.item()
        if i % 1000 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            running_loss = 0.

    return last_loss

In [None]:
EPOCHS = 0
epoch_number = 0

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))

    # Make sure gradient tracking is on, and do a pass over the data
    model.train(True)
    avg_loss = train_one_epoch(epoch_number)


    running_vloss = 0.0
    # Set the model to evaluation mode, disabling dropout and using population
    # statistics for batch normalization.
    model.eval()

    # Disable gradient computation and reduce memory consumption.
    with torch.no_grad():
        for i, vdata in enumerate(validation_loader):
            vinputs, vlabels = vdata
            voutputs = model(vinputs)
            vloss = loss_fn(voutputs, vlabels)
            running_vloss += vloss

    avg_vloss = running_vloss / (i + 1)
    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

    epoch_number += 1