# Custom Observers

Observers are at the heart of PyBN, but unfortunately it is not possible to define a recipe for everyones needs, but we built the system flexible enough that anybody can design its own observer. For simplicity of reading of this section we will recurr to a Jupyter Notebook trick to define a class along multiple code blocks. All observers derive from the class Observer.

In [1]:
from pybn.observers import Observer
import numpy as np

For this tutorial we will create an observer that just adds the state of each node in the network at each timestep and the observation is the average of such quantity. The first step is thus making our custom observer to derive from it and define the __init__ method. We will add the tag REQUIRED when the observer must have an implementation of that function.

In [2]:
class MyCustomObserver(Observer):
    #REQUIRED.
    def __init__(self, nodes=1, runs=1, base=2):
        # The parameter runs is important to preallocate the necessary memory, 
        # preventing unnecesary memory re-allocations each time.
        self.runs = runs
        # The current_run counter. It is initialized with -1 by convetion.
        self.current_run = -1
        # Base refers to the base used for fuzzy networks, in some cases it 
        # is necessary to have this parameter to do some calculations.
        self.base = base
        # The size of the network.
        self.nodes = nodes
        # The counter is used to compute the final average of a single run.
        self.counter = 0
        # Data is where the observer will be storing all the data.
        self.data = np.zeros((self.runs, self.nodes))
        # Aditional variables may be declared if needed.
        self.table = np.zeros((self.nodes))
        self.table_requires_update = False

We also need to declare a custom method to build this observer from a configuration file.

In [3]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    @classmethod
    def from_configuration(cls, configuration):
        return cls(
            nodes=configuration['parameters']['nodes'],  
            runs=configuration['execution']['samples'],
            base=configuration['parameters']['base'])

We need to declare methods to clear after each run and to reset the observer to is default value. process_table is an auxiliary function that the observer does not require but it will help us later to do some final calculations at the end of the run.

In [4]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    def clear(self):
        # This function is called each time the networks starts a new run.
        if (self.table_requires_update):
            self.process_table()
        self.table = np.zeros((self.nodes))
        self.current_run += 1
        self.counter = 0
        
        # The next three lines allows us to expand the storage of the observer if needed.
        # This lines prevents some errors when running out of space to store new values.
        if (self.current_run == self.runs):
            self.runs += 1
            self.data = np.append(self.data, np.zeros((1,self.nodes)), axis=0)

    #REQUIRED.
    def reset(self):
        # This function is called by the user when it wants to reset the observer to default.
        # Most of the time will be called from network.reset_observers().
        self.counter = 0
        self.current_run = -1
        self.table_requires_update = False
        self.data = np.zeros((self.runs, self.nodes))
        self.table = np.zeros((self.nodes))

The most important method is update.

In [5]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    def update(self, state):
        # Iterate through all nodes at time step t.
        for i in range(len(state)):
            # Add the value of that state.
            self.table[i] += state[i]
        # Increase the timestep counter.
        self.counter += 1
        # Set the table_requires_update to true.
        self.table_requires_update = True

We will also add a few methods that will help the calculations.

In [6]:
class MyCustomObserver(MyCustomObserver):
    #AUXILIAR.
    def process_table(self):
        # Average all the values and store it in self.data.
        self.data[self.current_run] = self.table / self.counter 
        
    #AUXILIAR.
    def compute_data(self):
        if (self.table_requires_update):
            self.process_table()
        mean_sum = np.mean(self.data)
        std_sum = np.std(self.data)
        mean_sum_per_node = np.mean(self.data, axis=0)
        std_sum_per_node = np.std(self.data, axis=0)
        return mean_sum, std_sum, mean_sum_per_node, std_sum_per_node

Finally we just need to define the method summary to recover the data the observer is calculating. This method just needs to print the information back to the you in a readble format. You can also use multiple prints if you prefer.

In [7]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    def summary(self, precision=3):
        # Final computation of the data.
        mean_sum, std_sum, mean_sum_per_node, std_sum_per_node = self.compute_data()
        # Write the summary.
        summary = 'Network average sum:\t' + f"{mean_sum:.{precision}f}" + ' ± ' + f"{std_sum:.{precision}f}" + '\n' + 'Nodes average sum:\n'
        for i in range(self.nodes):
            summary += f"{mean_sum_per_node[i]:.{precision}f}" + ' ± ' + f"{std_sum_per_node[i]:.{precision}f}" + ',\t'
            # The next to lines just add a new row of values each 5 nodes. Just to make the summary more readable.
            if ((i+1)%5 == 0):
                summary += '\n'
        # Print the summary.
        print(summary)

The summary writer has not been introduced and will not be since most of the time it will do the work if the observer is appropriately defined. The important thing to say about it is that it is in charge of writing the results of the experiments to files. We will need to declare two functions. The first one is just a practically the same that the summary function with a better format for data storage. The program expects two different outputs: one when we are only interested in the general behaviour and one when we are interested in each node behaviour. By convention we store the data [mean, std] for the former case and [mean_node_0, std_node_0, ..., mean_node_k, std_node_k] for the later case. Feel free to ignore this convention but a custom data reader may be required in such case.

In [8]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    def file_summary(self, per_node=False, precision=6):
        # Final computation of the data.
        mean_sum, std_sum, mean_sum_per_node, std_sum_per_node = self.compute_data()
        # Write the summary.
        
        if (per_node):
            summary = []
            for i in range(self.nodes):
                summary = f'{mean_sum_per_node[i]:.{precision}f},{std_sum_per_node[i]:.{precision}f},'
            summary = ''.join(summary)
        else:
            summary = f'{mean_sum:.{precision}f},{std_sum:.{precision}f},'

        # A list of tuples for each data the observer will compute.
        # The first entry is for the name of the observation.
        # This name is important since it will be used for the execution module to name the files.
        return [('average_sum', summary)]

The reason behind the second one is rather obscure but it is required when we do parallel calculations and all it does is to ensures that all data is passed correctly to the summary writer. All that this function needs to do is to ensure you have all the variables pre computed when you call file_summary. If your observer you dont need to precompute anything feel free to declare a dummy function.

In [9]:
class MyCustomObserver(MyCustomObserver):
    #REQUIRED.
    def pre_summary_writer(self):
        self.process_table()
        self.table_requires_update = False

That's it, our custom observer is properly defined.

# Testing the custom observer

In [10]:
from pybn.graphs import uniform_graph
from pybn.networks import BooleanNetwork

Lets define a small network that only perform a few steps in order to visually compare the result of the observer with the state evolution of the network.

In [11]:
nodes = 8
steps = 5
average_connectivity = 3.1

graph = uniform_graph(nodes, average_connectivity)
network = BooleanNetwork(nodes, graph)

In [12]:
MyCustomObserver(nodes=nodes)

<__main__.MyCustomObserver at 0x7f9ae5ef17f0>

Instantiate and attach the observer.

In [13]:
observers = [MyCustomObserver(nodes=nodes)] 
network.attach_observers(observers)

Perform one small execution and print the states.

In [14]:
# Set a random initial state.
network.set_initial_state(observe=True)
print(network.state)
# Perform several steps.
for _ in range(steps):
    network.step(observe=True)
    print(network.state)

[1 0 1 0 1 0 1 1]
[0 1 0 1 0 1 0 1]
[0 0 1 1 0 0 1 1]
[0 0 0 1 0 1 1 0]
[0 1 1 1 0 1 1 0]
[1 0 1 1 0 0 1 1]


Print the observers summary.

In [15]:
# Get observer's summary.    
network.observers_summary()

Network average sum:	0.542 ± 0.232
Nodes average sum:
0.333 ± 0.000,	0.333 ± 0.000,	0.667 ± 0.000,	0.833 ± 0.000,	0.167 ± 0.000,	
0.500 ± 0.000,	0.833 ± 0.000,	0.667 ± 0.000,	
