# VOLTTRON Observer Notebook

This Observer notebook sets up and executes a DataPuller that captures
data from another VOLTTRON instance. It uses a Historian to record the data.

The notebook also uses the Message Debugger agent to monitor messages
flowing across the VOLTTRON bus.

Most of the notebook's setup and execution is done with shell commands, called from Python.

# Setup: Prepare the VOLTTRON Environment

VOLTTRON must be installed before using this notebook. For detailed instructions on
installing and configuring a VOLTTRON/Jupyter server environment, see [Jupyter Notebooks](http://volttron.readthedocs.io/en/devguides/supporting/utilities/JupyterNotebooks.html) 
in VOLTTRON ReadTheDocs.

As described in that guide, the following environment variables should have been defined
before starting the Jupyter server:

````
$ export VOLTTRON_ROOT=~/repos/volttron
````
        (path of the VOLTTRON repository, installed prior to running bootstrap)

````
$ export VOLTTRON_HOME=~/.volttron
````
        (directory in which the VOLTTRON instance runs)

The first VOLTTRON instance on a server usually runs, by convention, in ~/.volttron.
If multiple VOLTTRON instances are to be run on a single host, each must have its own VOLTTRON_HOME.

Also before starting the Jupyter server, a VOLTTRON virtual environment should have been 
activated by executing the following in $VOLTTRON_ROOT:

````
$ source env/bin/activate
````
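
Before running the cells below, it can help to confirm that both environment variables are visible to the Python process. The following is a minimal sketch, not part of VOLTTRON; the helper name `missing_volttron_vars` is hypothetical.

```python
import os

# The two variables this notebook expects to find in the environment.
REQUIRED_VARS = ('VOLTTRON_ROOT', 'VOLTTRON_HOME')

def missing_volttron_vars(environ=None):
    """Return the names of required variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]

missing = missing_volttron_vars()
if missing:
    print('Not set: {}'.format(', '.join(missing)))
else:
    print('VOLTTRON environment variables are set')
```

If either name is reported as not set, stop the Jupyter server, export the variable, and restart the server.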

The Python code below does some initialization to prepare for the steps that follow.

In [None]:
import datetime
import json
import os
import pprint
import sqlite3
import subprocess
import sys
import time

# Define a "run this shell command" method, wrapping subprocess.check_output()
def _sh(shell_command, shell=True, stderr=None):
    try:
        return_value = subprocess.check_output(shell_command, shell=shell, stderr=stderr)
    except Exception as err:
        print('Shell command failed: {}'.format(shell_command))
        print(err)
        return_value = 'Error'
    return return_value

# Same as _sh(), except that this also prints the command output, preceded by an optional label.
def _print_sh(shell_command, label=None, **kwargs):
    # Print the label (if any) followed by a single colon, then the command output.
    print('{0}{1}\n'.format(label + ': ' if label else '', _sh(shell_command, **kwargs)))

# Set up local variables vhome and vroot.
# The environment variables VOLTTRON_ROOT and VOLTTRON_HOME should already be defined -- see above.
vroot = %env VOLTTRON_ROOT
vhome = %env VOLTTRON_HOME
print("VOLTTRON_ROOT={}".format(vroot))
print("VOLTTRON_HOME={}".format(vhome))

# Define a VIP_SOCKET environment variable for use while installing and running agents.
socket_name = 'ipc://' + vhome + '/run/vip.socket'
%env VIP_SOCKET=$socket_name

# Run from the VOLTTRON root directory.
os.chdir(vroot)

data_dir = vhome + '/data'

print("Initialization complete")

# Setup: Shut Down All Agents

This ensures a clean agent installation process by the notebook.

In [None]:
print('Wait for the list to be displayed, and confirm that no agents are listed as running...\n')

# Shut down all agents.
_sh('volttron-ctl shutdown')

# List agent status to verify that the status of each agent is 0 or blank.
_print_sh('volttron-ctl status', stderr=subprocess.STDOUT)

# Setup: Discover the Observer's Network Parameters

In order for this Observer to pull data from an Aggregator, the Aggregator
must know the Observer's network parameters for authentication purposes,
storing them in its known_hosts file.
Discover those parameters now.

Copy the vip-address's IP and port, and the serverkey,
to the Aggregator notebook under
'Setup: Configure the Aggregator's Network Parameters' and
'Setup: Add Each Observer to the known_hosts File',
and execute that notebook's code to add this Observer to known_hosts.

Also, make sure that this port is open for TCP access on the Observer's host.

In [None]:
# Obtain this server's IP address, volttron port number (usually 22916), and server key:
print('Obtaining network parameters and server key; please wait...\n')
_print_sh('curl ifconfig.me', label='Public IP address')
_print_sh('volttron-ctl auth serverkey', label='Serverkey')
_print_sh('cat {}/config'.format(vhome), label='Config file')
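
The IP address and port can also be pulled out of the config file text programmatically. This is a rough sketch that assumes the file contains a line of the form `vip-address = tcp://<ip>:<port>`; the sample text and the helper name `parse_vip_address` are illustrative assumptions, not output from a real instance.

```python
import re

def parse_vip_address(config_text):
    """Return (ip, port) from a 'vip-address = tcp://ip:port' line, or None if absent."""
    match = re.search(r'^\s*vip-address\s*=\s*tcp://([^:\s]+):(\d+)',
                      config_text, re.MULTILINE)
    if match is None:
        return None
    return match.group(1), int(match.group(2))

# Hypothetical config contents, used only to exercise the parser.
sample_config = "[volttron]\nvip-address = tcp://0.0.0.0:22916\n"
print(parse_vip_address(sample_config))
```

An address of 0.0.0.0 means the instance listens on all interfaces, so the Aggregator still needs the host's public IP address reported above.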

# Setup: Configure the Aggregator's Network Parameters

This Observer pulls data from an Aggregator, so it must be
configured with the Aggregator's IP address, port number and server key.

Define those parameters here. 

Obtain them from the Aggregator notebook,
'Setup: Discover the Aggregator's Network Parameters'.

In [None]:
aggregator_vip_address = '54.67.31.234'
aggregator_vip_port = '22916'
aggregator_server_key = 'A_WyNaTRQu3jkMeX6NgmchCCnPsYhZUjnt2zdAyf0HU'

aggregator_vip = "tcp://{0}:{1}".format(aggregator_vip_address, aggregator_vip_port)

print('vip = {0}'.format(aggregator_vip))
print('aggregator_server_key = {0}'.format(aggregator_server_key))

# Setup: Test the TCP Connection

The DataPuller will send requests to the VOLTTRON Aggregator instance
via TCP commands. Test that the Aggregator instance is capable of receiving
TCP requests on the designated IP address and port.

If this test fails, the port may not be open on the other server (for example, because of a firewall),
the request may be for the wrong IP address or port number,
or the other server's VOLTTRON instance may be down or incorrectly configured.

In [None]:
# Use an 'nc' (netcat) command to test the TCP connection
shell_command = 'nc -z -vv -w5 {0} {1}'.format(aggregator_vip_address, aggregator_vip_port)
_print_sh(shell_command, label='Network connection test result', stderr=subprocess.STDOUT)
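
If `nc` is not installed on the host, a similar reachability check can be sketched with Python's standard `socket` module. The helper name `tcp_port_open` is hypothetical, and the demonstration uses a throwaway listener on the loopback interface so that it runs without a remote VOLTTRON instance.

```python
import socket

def tcp_port_open(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        conn = socket.create_connection((host, int(port)), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    conn.close()
    return True

# Demonstration: open a local listener, probe it, close it, and probe again.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))      # port 0 lets the OS choose a free port
listener.listen(1)
demo_port = listener.getsockname()[1]
reachable_while_listening = tcp_port_open('127.0.0.1', demo_port, timeout=2.0)
listener.close()
reachable_after_close = tcp_port_open('127.0.0.1', demo_port, timeout=2.0)
print('while listening: {0}, after close: {1}'.format(
    reachable_while_listening, reachable_after_close))
```

In the notebook, the equivalent call would be `tcp_port_open(aggregator_vip_address, aggregator_vip_port)`. Like `nc -z`, this only confirms that something accepts TCP connections on the port; it does not confirm that the listener is a VOLTTRON instance.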

# Setup: Configure a DataPuller

Create a configuration file for this Observer's DataPuller.

The file specifies the Aggregator's IP address, port and server key,
and indicates which topics should be pulled.

In [None]:
config = """{{
    "agentid": "datapuller",
    "source-vip": "{0}",
    "source-serverkey": "{1}",
    "required_target_agents": [],
    "custom_topic_list": ["simstorage"],
    "services_topic_list": ["devices"],
    "topic_replace_list": [
        {{
            "from": "FromString", 
            "to": "ToString"
        }}
    ]
}}""".format(aggregator_vip, aggregator_server_key)
print("config = {}".format(config))
config_path = vhome + '/my_datapuller.config'
with open(config_path, 'w') as file:
    file.write(config)
print('Datapuller configuration written to {}'.format(config_path))
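
As an alternative to filling in a string template, the same configuration can be built as a dictionary and serialized with `json.dumps`, which removes the need for doubled braces and guarantees valid JSON. This is a sketch using the keys shown above; the function name and the example address and key are placeholders.

```python
import json

def build_datapuller_config(source_vip, source_serverkey):
    """Return the DataPuller configuration as a JSON string."""
    config = {
        "agentid": "datapuller",
        "source-vip": source_vip,
        "source-serverkey": source_serverkey,
        "required_target_agents": [],
        "custom_topic_list": ["simstorage"],
        "services_topic_list": ["devices"],
        "topic_replace_list": [{"from": "FromString", "to": "ToString"}],
    }
    return json.dumps(config, indent=4, sort_keys=True)

# Placeholder values; substitute the Aggregator's real address and server key.
example_config = build_datapuller_config('tcp://127.0.0.1:22916', 'EXAMPLE_SERVER_KEY')
print(example_config)
```

Parsing the result back with `json.loads` is a quick way to confirm that a configuration is well formed before installing the agent.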

# Setup: Configure a SQLHistorian

In [None]:
# Create a SQLHistorian configuration specifically for this project

# The historian's database will reside in $VOLTTRON_HOME/data. 
# Make sure that the directory exists.
if not os.path.exists(data_dir):
    _sh('mkdir {}'.format(data_dir))

config = '''{{
    "agentid": "sqlhistorian-sqlite",
    "connection": {{
        "type": "sqlite",
        "params": {{
            "database": "{0}/historian.sqlite"
        }}
    }},
    "tables_def": {{
        "table_prefix": "",
        "data_table": "data_table",
        "topics_table": "topics_table",
        "meta_table": "meta_table"
    }}
}}'''.format(data_dir)
print("config = {}".format(config))
config_path = vhome + '/my_observer_historian.config'
with open(config_path, 'w') as file:
    file.write(config)
print('Historian configuration written to {}'.format(config_path))

# Setup: Install Agents

Install each agent employed by the Observer: a DataPuller, a SQLHistorian, a Message Debugger, and two VOLTTRON Central agents (a VOLTTRON Central Platform agent and a VOLTTRON Central agent).

In [None]:
print('Wait for the list to be displayed, then confirm that all of these agents appear in it...')

def install_agent(dir=None, id=None, config=None, tag=None):
    script_install_command = 'python scripts/install-agent.py -s {0} -i {1} -c {2} -t {3} -f'
    _sh(script_install_command.format(dir, id, config, tag))
    print('Installed {}'.format(tag))

# Install a DataPuller agent that pulls metrics from another VOLTTRON instance
install_agent(dir=vroot+'/examples/DataPuller',
              id='datapuller',
              config=vhome+'/my_datapuller.config',
              tag='datapuller')
    
# Install a SQL Historian agent that captures metrics in a SQLite database
install_agent(dir=vroot+'/services/core/SQLHistorian',
              id='sqlite_historian',
              config=vhome+'/my_observer_historian.config',
              tag='sqlite_historian')

# Install a Message Debugger agent.
install_agent(dir=vroot+'/services/core/MessageDebuggerAgent',
              id='messagedebugger',
              config=vroot+'/services/core/MessageDebuggerAgent/messagedebugger.config',
              tag='messagedebugger')

# Install a Platform Agent
install_agent(dir=vroot+'/services/core/VolttronCentralPlatform',
              id='platform.agent',
              config=vroot+'/services/core/VolttronCentralPlatform/config', 
              tag='vcp')

# Install a Volttron Central Agent
install_agent(dir=vroot+'/services/core/VolttronCentral',
              id='volttron.central',
              config=vroot+'/services/core/VolttronCentral/config', 
              tag='vc')

# List agent status to verify that the agents were installed successfully.
_print_sh('volttron-ctl status', stderr=subprocess.STDOUT)
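
The install command above is assembled as a single shell string, so a path containing spaces would be split into separate arguments. One way around that, sketched here with the same flags that `install_agent` uses, is to build the command as an argument list. The helper name and the example paths are hypothetical.

```python
def build_install_command(agent_dir, vip_id, config_path, tag):
    """Return the install-agent command as a list of arguments (no shell quoting needed)."""
    return ['python', 'scripts/install-agent.py',
            '-s', agent_dir,
            '-i', vip_id,
            '-c', config_path,
            '-t', tag,
            '-f']

# Example paths are made up; note the space in the agent directory.
command = build_install_command('/opt/my volttron/examples/DataPuller',
                                'datapuller',
                                '/home/user/.volttron/my_datapuller.config',
                                'datapuller')
print(command)
```

A list like this can be passed to the notebook's helper as `_sh(command, shell=False)`, since `subprocess.check_output` accepts an argument list when `shell` is false.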

# Setup: Get the Observer's datapuller Credentials

The Observer's DataPuller agent needs to authenticate to the Aggregator. Authentication is facilitated by adding the agent's credentials to the Aggregator's auth.json file.

Copy the PUBLICKEY from the command output below. On the Aggregator, run `volttron-ctl auth add` from the command line. When prompted for credentials, paste the key.

In [None]:
_print_sh('volttron-ctl auth publickey --tag datapuller')

# Setup: Start All Agents

When ready to start monitoring messages and observing data pulled from the Aggregator, start the agents.

In [None]:
print('Wait for the list to be displayed, then confirm that each started agent is running...')

_sh('volttron-ctl start --tag datapuller')
_sh('volttron-ctl start --tag messagedebugger')
_sh('volttron-ctl start --tag sqlite_historian')
_sh('volttron-ctl start --tag vcp')
_sh('volttron-ctl start --tag vc')

# List agent status to verify that the started agents have status "running".
_print_sh('volttron-ctl status', stderr=subprocess.STDOUT)

# Execution: Refresh variables, stop agents, delete database

In [None]:
print('Make a fresh start - refresh variable definitions, shut down any running agents, refresh the database')

import datetime
import json
import os
import pprint
import sqlite3
import subprocess
import sys
import time

# Define a "run this shell command" method, wrapping subprocess.check_output()
def _sh(shell_command, shell=True, stderr=None):
    try:
        return_value = subprocess.check_output(shell_command, shell=shell, stderr=stderr)
    except Exception as err:
        print('Shell command failed: {}'.format(shell_command))
        print(err)
        return_value = 'Error'
    return return_value

# Same as _sh(), except that this also prints the command output, preceded by an optional label.
def _print_sh(shell_command, label=None, **kwargs):
    # Print the label (if any) followed by a single colon, then the command output.
    print('{0}{1}\n'.format(label + ': ' if label else '', _sh(shell_command, **kwargs)))

# Set up local variables vhome and vroot.
# The environment variables VOLTTRON_ROOT and VOLTTRON_HOME should already be defined -- see above.
vroot = %env VOLTTRON_ROOT
vhome = %env VOLTTRON_HOME
print("VOLTTRON_ROOT={}".format(vroot))
print("VOLTTRON_HOME={}".format(vhome))

# Define a VIP_SOCKET environment variable for use while installing and running agents.
socket_name = 'ipc://' + vhome + '/run/vip.socket'
%env VIP_SOCKET=$socket_name

# Run from the VOLTTRON root directory.
os.chdir(vroot)

data_dir = vhome + '/data'

# List agent status to verify that the status of each agent is 0 or blank.
_print_sh('volttron-ctl status', stderr=subprocess.STDOUT)

# Data Reporting: Prepare to Execute Sqlite Commands

In [None]:
# Define a function that executes SQLite commands on our Historian database
def run_sqlite_cmd(command_string):
    _print_sh('sqlite3 {0} {1}'.format(data_dir + '/historian.sqlite', command_string))

# Data Reporting: Describe the Historian's Database Schema

Start the data reporting process by displaying the schema of the Historian's SQLite database.

In [None]:
run_sqlite_cmd('".schema"')

# Data Reporting: List the Topics

List each topic in the database's topics_table. This is the list of each type of data that has been captured and stored.

In [None]:
run_sqlite_cmd('"SELECT * FROM topics_table;"')

# Data Reporting: List Values for a Single Topic

Select a single topic by name, and list each value in the database for it.

In [None]:
topic_name = 'simstorage/power_kw'
# topic_name = 'my_chargepoint/portLoad'
start_time = '2017-09-06T22:00:00.000000+00:00'
stop_time = '2017-09-13T22:30:00.000000+00:00'

display_variables = 'ts, value_string'
join_statement = 'INNER JOIN topics_table on (data_table.topic_id = topics_table.topic_id) '
sqlite_cmd = '''"SELECT {0} FROM data_table {1} WHERE topics_table.topic_name = '{2}' AND data_table.ts > '{3}' AND data_table.ts < '{4}';"'''.format(
    display_variables,
    join_statement,
    topic_name,
    start_time,
    stop_time)
print('sqlite command: \n{0}\n'.format(sqlite_cmd))

run_sqlite_cmd(sqlite_cmd)
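
The same query can be run through Python's `sqlite3` module with `?` placeholders, which avoids shell quoting and keeps topic names and timestamps out of the SQL text. The sketch below uses an in-memory database whose tables contain only the columns referenced in the query above; the real Historian schema may have more, and the sample rows are invented.

```python
import sqlite3

QUERY = '''SELECT data_table.ts, data_table.value_string
           FROM data_table
           INNER JOIN topics_table ON data_table.topic_id = topics_table.topic_id
           WHERE topics_table.topic_name = ?
             AND data_table.ts > ?
             AND data_table.ts < ?
           ORDER BY data_table.ts'''

def values_for_topic(conn, topic_name, start_time, stop_time):
    """Return (ts, value_string) rows for one topic within a time window."""
    return conn.execute(QUERY, (topic_name, start_time, stop_time)).fetchall()

# Build a tiny stand-in database (assumed, simplified schema; invented data).
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE topics_table (topic_id INTEGER PRIMARY KEY, topic_name TEXT)')
conn.execute('CREATE TABLE data_table (ts TEXT, topic_id INTEGER, value_string TEXT)')
conn.execute("INSERT INTO topics_table VALUES (1, 'simstorage/power_kw')")
conn.executemany('INSERT INTO data_table VALUES (?, ?, ?)', [
    ('2017-09-06T22:05:00.000000+00:00', 1, '1.5'),
    ('2017-09-06T22:10:00.000000+00:00', 1, '2.0'),
    ('2017-09-07T01:00:00.000000+00:00', 1, '3.0'),
])

rows = values_for_topic(conn, 'simstorage/power_kw',
                        '2017-09-06T22:00:00.000000+00:00',
                        '2017-09-06T22:30:00.000000+00:00')
for ts, value in rows:
    print('{0}  {1}'.format(ts, value))
```

Against the Historian's database, the connection would instead be opened with `sqlite3.connect(data_dir + '/historian.sqlite')`.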

# Data Reporting: Graph Values for a Single Topic

Use numpy and matplotlib to produce a graph of the values for a topic.

In [None]:
topic_name = 'simstorage/power_kw'
# topic_name = 'my_chargepoint/portLoad'
start_time = '2017-09-06T22:00:00.000000+00:00'
stop_time = '2017-09-06T22:30:00.000000+00:00'

display_variables = 'ts, value_string'
join_statement = 'INNER JOIN topics_table on (data_table.topic_id = topics_table.topic_id) '
sqlite_cmd = '''SELECT {0} FROM data_table {1} WHERE topics_table.topic_name = '{2}' AND data_table.ts > '{3}' AND data_table.ts < '{4}';'''.format(
    display_variables,
    join_statement,
    topic_name,
    start_time,
    stop_time)
print('sqlite command: \n{0}\n'.format(sqlite_cmd))

import numpy
import matplotlib.pyplot as plt
from matplotlib import dates

# Connect to the SQLite database
conn = sqlite3.connect(data_dir + '/historian.sqlite')
c = conn.cursor()

# Populate graphArray with the result of querying the database for the specified topic.
graphArray = []
for row in c.execute(sqlite_cmd):
    # Join the row's fields (timestamp, value) into one comma-separated string for numpy.loadtxt.
    # This avoids str.translate(), whose signature differs between Python 2 and Python 3.
    row_string_filtered = ','.join(str(field) for field in row)
    graphArray.append(row_string_filtered)

if graphArray:
    timestamps, values = numpy.loadtxt(graphArray,
                                       delimiter=',',
                                       unpack=True,
                                       converters={0: dates.strpdate2num('%Y-%m-%dT%H:%M:%S.%f+00:00')})
    fig = plt.figure()
    fig.add_subplot(1, 1, 1, facecolor='white')
    plt.plot_date(x=timestamps, y=values, fmt='b-')
    plt.gcf().autofmt_xdate()
    plt.show()
else:
    print('No data returned from query')
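
The timestamp conversion can also be done with the standard library, which may be useful if `dates.strpdate2num` is unavailable in the installed matplotlib version. This is a sketch under the assumption that every timestamp ends in `+00:00`, as in the format string above; the function name `rows_to_series` and the sample rows are hypothetical.

```python
import datetime

# Same layout as the format string passed to the converter above.
TS_FORMAT = '%Y-%m-%dT%H:%M:%S.%f+00:00'

def rows_to_series(rows):
    """Split (ts, value_string) rows into a list of datetimes and a list of floats."""
    timestamps = [datetime.datetime.strptime(ts, TS_FORMAT) for ts, _ in rows]
    values = [float(value) for _, value in rows]
    return timestamps, values

# Invented rows in the shape returned by the query.
sample_rows = [('2017-09-06T22:05:00.000000+00:00', '1.5'),
               ('2017-09-06T22:10:00.500000+00:00', '2.0')]
timestamps, values = rows_to_series(sample_rows)
print(timestamps)
print(values)
```

The resulting lists can be handed to `plt.plot(timestamps, values, 'b-')`, since matplotlib accepts `datetime` objects on the x axis.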

# Data Monitoring: Prepare to Use the Message Viewer

The VOLTTRON Message Debugger examines each message that comes across the message bus and stores it in a database.

It exposes RPC calls that can filter for interesting messages.

For more information, see the Message Debugger's 
[User Guide](http://volttron.readthedocs.io/en/develop/devguides/agent_development/Message-Debugging.html).

The Message Viewer uses RPC calls to request Message Debugger data.

There is a bug in Jupyter notebook logger setup, as documented in
https://github.com/jupyter/notebook/issues/1397. The following Python code
reloads the logger and calls basicConfig so that the Message Viewer's setupLogging call won't fail.

In [None]:
import logging
reload(logging)
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG, datefmt='%I:%M:%S')

# The Message Viewer must be run from its directory.
os.chdir(vroot + '/services/core/MessageDebuggerAgent/messageviewer')
from viewer import MessageViewer

print("MessageViewer loaded")

# Data Monitoring: Clear the Message Debugger Database

Delete the Message Debugger's database to get a fresh start.

In [None]:
MessageViewer.delete_debugging_db()

# Data Monitoring: Start a Session

A Message Debugger session is a time-bounded set of messages.

Issue an RPC call to start a new session (which also stops the previous session, if any).
The new session will be assigned a fresh session ID.

In [None]:
MessageViewer.enable_message_debugging()

# Data Monitoring: List Message Debugger Sessions

Issue an RPC call to get a list of Message Debugger sessions, and display the result.

In [None]:
# Create a PrettyPrinter instance that formats output from the Message Viewer.
pp = pprint.PrettyPrinter()

# Display a list of Message Debugger DebugSessions.
cmd_result = MessageViewer.display_db_objects('DebugSession')
if type(cmd_result) is str:
    # We seem to have gotten back an error message -- print it.
    pp.pprint(cmd_result)
else:
    printable_session = {sess['rowid']: sess for sess in cmd_result['results']}
    pp.pprint(printable_session)

# Data Monitoring: List Message Counts in a Debug Session by Agent

A given session (such as session 1) often has too many messages to display them unfiltered.

Get a list of message counts by sending/receiving agent, in preparation for filtering the stream by sender.

In [None]:
session_id = '1'

# Display the count of messages, by agent, for the debug session.
cmd_result = MessageViewer.session_details_by_agent(session_id)
pp.pprint(cmd_result)

# Data Monitoring: List Message Counts in a Debug Session by Topic

Another way to filter the message stream is by topic. Get a list of message counts by topic.

In [None]:
session_id = '1'

# Display the count of messages, by topic, for the debug session.
cmd_result = MessageViewer.session_details_by_topic(session_id)
pp.pprint(cmd_result)

# Data Monitoring: List Messages in a Session for a Sender and Topic

Issue an RPC request to get a list of messages in a specific DebugSession with a specific sending agent and topic.

In [None]:
session_id = '1'
agent_name = 'datapuller'
topic_name = 'devices/power_kw/simpv'

# Display each message in the chosen DebugSession for the specified topic.
cmd_result = MessageViewer.display_db_objects('DebugMessage', filters={'session_id': session_id,
                                                                       'sender': agent_name,
                                                                       'topic': topic_name})
if type(cmd_result) is str:
    # We may have received an error message -- print it.
    pp.pprint(cmd_result)
else:
    pp.pprint(cmd_result['results'])
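
The cells in this part of the notebook repeat the same check: a string result is treated as an error message, and anything else is expected to carry a `'results'` entry. That pattern could be factored into one helper, sketched below. The helper name is hypothetical, and the sample values only imitate the shapes handled above; they are not real Message Viewer output.

```python
def unpack_viewer_result(cmd_result):
    """Return (ok, payload): the 'results' list on success, else the raw value."""
    if isinstance(cmd_result, dict) and 'results' in cmd_result:
        return True, cmd_result['results']
    # Anything else (typically an error string) is passed back unchanged.
    return False, cmd_result

# Invented examples of the two shapes.
ok_case = unpack_viewer_result({'results': [{'rowid': 1}, {'rowid': 2}]})
error_case = unpack_viewer_result('No response from the Message Debugger')
print(ok_case)
print(error_case)
```

With such a helper, each cell reduces to calling it and pretty-printing the payload.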

# Data Monitoring: Filter Messages by Time

That might still have been a flood of information.
Reduce the data quantity by showing only messages that were
routed after a given time.

In [None]:
session_id = '1'
agent_name = 'datapuller'
topic_name = 'devices/power_kw/simpv'

# Substitute an appropriate timestamp here. You may need to add time to the wallclock 
# (e.g. perhaps 7 or 8 hours), putting it in UTC.
start_time_cutoff = '2017-08-03 22:24:00'

# Display each message in the chosen DebugSession that was routed after the indicated start time.
cmd_result = MessageViewer.display_db_objects('DebugMessage', filters={'session_id': session_id,
                                                                       'sender': agent_name,
                                                                       'topic': topic_name,
                                                                       'starttime': start_time_cutoff})
if type(cmd_result) is str:
    # We may have received an error message -- print it.
    pp.pprint(cmd_result)
else:
    pp.pprint(cmd_result['results'])
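
Rather than adding hours to the wall-clock time by hand, the cutoff can be computed directly in UTC. This is a minimal sketch that produces a string in the same `YYYY-MM-DD HH:MM:SS` layout as `start_time_cutoff`; the function name `utc_cutoff` and its parameters are hypothetical.

```python
import datetime
import time

def utc_cutoff(minutes_ago, now_utc=None):
    """Return a UTC timestamp string for a moment minutes_ago before now_utc."""
    if now_utc is None:
        # Build a naive datetime holding the current UTC time.
        now_utc = datetime.datetime(*time.gmtime()[:6])
    cutoff = now_utc - datetime.timedelta(minutes=minutes_ago)
    return cutoff.strftime('%Y-%m-%d %H:%M:%S')

# Show only messages routed during the last ten minutes.
start_time_cutoff = utc_cutoff(10)
print(start_time_cutoff)
```

Passing an explicit `now_utc` makes the result reproducible, for example `utc_cutoff(10, datetime.datetime(2017, 8, 3, 22, 34, 0))`.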

# Shutdown: Stop all agents

When finished, stop all VOLTTRON agents.

In [None]:
# Stop all agents.
_sh('volttron-ctl shutdown')

# Verify that all agents have been stopped.
_print_sh('volttron-ctl status', stderr=subprocess.STDOUT)

# Shutdown: Check Disk Space

In [None]:
# Report disk consumption on the server to determine whether files need to be deleted.
_print_sh("df | egrep '/dev/'", label='Disk usage')
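
For a single number instead of the full `df` listing, the standard library's `os.statvfs` (available on Unix-like systems) reports filesystem block counts. The sketch below computes the percentage of space in use on the filesystem that holds a given path; the function name is hypothetical.

```python
import os

def disk_usage_percent(path='/'):
    """Return the percentage of space in use on the filesystem containing path."""
    stats = os.statvfs(path)
    total = stats.f_blocks * stats.f_frsize
    available = stats.f_bavail * stats.f_frsize   # space available to unprivileged users
    if total == 0:
        return 0.0
    return 100.0 * (total - available) / total

print('Disk usage for /: {0:.1f}%'.format(disk_usage_percent('/')))
```

The figure can differ slightly from the `Use%` column of `df`, which handles reserved blocks differently.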

# Shutdown: Delete the Historian Database

In [None]:
# Delete the Historian's SQLite database to conserve disk and get a fresh start.
if os.path.exists(data_dir + '/historian.sqlite'):
    _sh('rm {0}'.format(data_dir + '/historian.sqlite'))

# Shutdown: Delete the VOLTTRON Log File

In [None]:
# If needed to conserve disk space, delete the VOLTTRON log file.
# This assumes the log file is named log1 and resides in the VOLTTRON root directory.
if os.path.exists(vroot + '/log1'):
    _sh('rm {0}'.format(vroot + '/log1'))