<img style="float: right;" src="images/hyperstream.svg">

# HyperStream Tutorial 5: Workflows

Workflows define a graph of streams. Usually, the first stream will be a special "raw" stream that pulls in data from a custom data source. Workflows can have multiple time ranges, which will cause the streams to be computed on all of the ranges given.

## Introduction

In this tutorial, we will be using a time-series dataset about the temperature in different countries and cities. The dataset is available at [Census at School New Zealand][1]. The necessary files for this tutorial are already included in the folder **data/TimeSeriesDatasets_130207**.

In particular, there are four files with the minimum and maximum temperatures in different cities of Asia, Australia, NZ and USA from 2000 to 2012. The dataset also includes the rainfall levels of New Zealand.

![workflows](images/workflow_world_temp.svg)

[1]: http://new.censusatschool.org.nz/resource/time-series-data-sets-2013/

In [1]:
try:
    %load_ext watermark
    watermark = True
except ImportError:
    watermark = False

import sys
sys.path.append("../") # Add the parent directory to the path

from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
import hyperstream

from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
from dateutil.parser import parse

if watermark:
    %watermark -v -m -p hyperstream -g

hs = HyperStream(loglevel=30)
print(hs)
print([p.channel_id_prefix for p in hs.config.plugins])



HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>
[u'example', u'data_importers', u'data_generators']


## Reading the data

In the data folder there are four CSV files named **TempAsia.csv**, **TempAustralia.csv**, **TempNZ.csv** and **TempUSA.csv**. The first row of each CSV file is a header with the column names. The first column is the date, and the remaining columns are the maximum and minimum temperatures in different cities, named **cityMax** and **cityMin**.

Here is an abbreviated excerpt of the **TempAsia.csv** file (the header and the first data row, showing only two of the cities):

```
Date,TokyoMax,TokyoMin,BangkokMax,BangkokMin
2000M01,11.2,4.2,32.8,24
```

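The date has the format **YYYYMmm**, where **YYYY** is the year and **mm** is the month. Because this format is not recognized by the default parser of the **csv_reader** tool, we need to specify our own parser that first replaces the **M** with a hyphen **-** and then applies **dateutil.parser.parse**. Because the strings contain no day, **dateutil** takes the day of the month from the current date, so the day in the resulting timestamps depends on when the notebook is run.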
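
A rough standard-library sketch of the same idea is shown below, assuming the dates always have the form **YYYYMmm**. The function name `parse_year_month` is hypothetical and not part of HyperStream. Unlike the **dateutil** approach, it always pins the day to the first of the month, and it uses Python 3's `timezone.utc` in place of HyperStream's `UTC` object.

```python
from datetime import datetime, timezone


def parse_year_month(text):
    """Parse a 'YYYYMmm' string (e.g. '2000M01') into a UTC datetime.

    Hypothetical helper for illustration only. The literal 'M' in the
    format string matches the 'M' separator used in the CSV files.
    """
    return datetime.strptime(text, '%YM%m').replace(tzinfo=timezone.utc)


first = parse_year_month('2000M01')
print(first)
```

A function like this could be passed as the date parser in place of the **dateutil** version if fixed, reproducible timestamps are preferred.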

Then, we will use a tool to read each CSV file, and a Stream to store the results of applying the tool. When we tell the tool that the CSV file has a header row, the value of each Stream instance is a dictionary that maps each column name to its corresponding value. For example, a Stream instance with the four columns shown above will look like:

```
[2000-01-19 00:00:00+00:00]: {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}
```
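
To make the header-to-dictionary step concrete, here is a minimal sketch using only Python's `csv` module on the two lines from the excerpt above. It is not the **csv_reader** tool itself, and `rows_as_dicts` is a hypothetical helper name; it only illustrates how a header row turns each data row into a dictionary of column names and float values.

```python
import csv
import io

# The header and first data row from the TempAsia.csv excerpt.
SAMPLE = (
    "Date,TokyoMax,TokyoMin,BangkokMax,BangkokMin\n"
    "2000M01,11.2,4.2,32.8,24\n"
)


def rows_as_dicts(text):
    """Yield (date_string, {column: float}) for each data row.

    Hypothetical helper: the 'Date' column is separated out and every
    remaining column is converted to a float.
    """
    reader = csv.DictReader(io.StringIO(text))
    for row in reader:
        date = row.pop('Date')
        yield date, {name: float(value) for name, value in row.items()}


rows = list(rows_as_dicts(SAMPLE))
print(rows[0])
```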

In [4]:
def dateparser(dt):
    return parse(dt.replace('M', '-')).replace(tzinfo=UTC)

ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),
                      datetime(2013, 1, 1).replace(tzinfo=UTC))
ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),
                         datetime(2007, 3, 1).replace(tzinfo=UTC))

# M will be the Memory Channel
M = hs.channel_manager.memory

countries = ['Asia', 'Australia', 'NZ', 'USA']

temp_tools_csv = {}
temp_streams = {}
for country in countries:
    temp_tools_csv[country] = hs.plugins.example.tools.csv_reader(
            'data/TimeSeriesDatasets_130207/Temp{}.csv'.format(country),
            header=True, dateparser=dateparser)
    temp_streams[country] = M.get_or_create_stream(country)
    temp_tools_csv[country].execute(sources=[], sink=temp_streams[country],
                                    interval=ti_all)

In [9]:
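# Create the workflow used in this part of the tutorial
try:
    w = hs.create_workflow(
        workflow_id='tutorial_05', name='tutorial_05', owner='tutorials',
        description='Tutorial 5 workflow', online=False)
except KeyError:
    # NOTE: `w` is left unassigned in this case, so the cells below
    # fail unless `w` was already defined in this session
    pass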


# USA: First Stream Instance
# [2000-01-21 00:00:00+00:00]: {

countries = {
    'Asia': ['BangkokMin', 'BangkokMax', 'HongKongMax', 'HongKongMin', 'KualaLumpurMax', 'KualaLumpurMin', 'NewDelhiMax', 'NewDelhiMin', 'TokyoMax', 'TokyoMin'], 
    'Australia': ['BrisbaneMax', 'BrisbaneMin', 'CanberraMax', 'CanberraMin', 'GoldCoastMax', 'GoldCoastMin', 'MelbourneMin', 'MelbourneMax', 'SydneyMax', 'SydneyMin'],
    'NZ': ['AucklandMax', 'AucklandMin', 'ChristchurchMax', 'ChristchurchMin', 'DunedinMax', 'DunedinMin', 'HamiltonMax', 'HamiltonMin','WellingtonMax', 'WellingtonMin'], 
    'USA': ['ChicagoMin', 'ChicagoMax', 'HoustonMax', 'HoustonMin', 'LosAngelesMax', 'LosAngelesMin', 'NYMax', 'NYMin', 'SeattleMax', 'SeattleMin']
}

for country in countries:
    id_country = 'country_' + country
    if not hs.plate_manager.meta_data_manager.contains(identifier=id_country):
        print("Adding " + country)
        hs.plate_manager.meta_data_manager.insert(
            parent='root', data=country, tag='country', identifier=id_country)

    for city in countries[country]:
        id_city = id_country + '.' + 'city_' + city
        if not hs.plate_manager.meta_data_manager.contains(identifier=id_city):
            print("Adding {} for country {}".format(city, country))
            hs.plate_manager.meta_data_manager.insert(
                parent=id_country, data=city, tag='city', identifier=id_city)


C = hs.plate_manager.create_plate(
    plate_id="C", description="Countries", values=[], complement=True,
    parent_plate=None, meta_data_id="country")
hs.plate_manager.create_plate(
    plate_id="C.C", description="Cities", values=[], complement=True,
    parent_plate="C", meta_data_id="city")

csv_reader = hs.plugins.data_importers.tools.csv_multi_reader(
    filename_template='data/TimeSeriesDatasets_130207/Temp{}.csv',
    datetime_parser=dateparser, skip_rows=1)

n = w.create_node(stream_name='raw_data', channel=M, plate_ids=["C"])
w.create_multi_output_factor(source=None, sink=n, splitting_node=None, tool=csv_reader)

w.execute(ti_all)

# Print the results
for stream in n.streams:
    print(stream)
    print(n.streams[stream].window().items())

((u'country', 'NZ'),)
[StreamInstance(timestamp=datetime.datetime(2000, 1, 21, 0, 0, tzinfo=<UTC>), value=[23.4, 15.5, 20.2, 10.8, 18.2, 8.8, 23.8, 12.4, 20.0, 14.2]), StreamInstance(timestamp=datetime.datetime(2000, 2, 21, 0, 0, tzinfo=<UTC>), value=[23.4, 16.4, 21.7, 11.0, 20.9, 9.8, 24.9, 12.2, 20.5, 14.7]), StreamInstance(timestamp=datetime.datetime(2000, 3, 21, 0, 0, tzinfo=<UTC>), value=[22.6, 14.8, 19.7, 9.1, 18.9, 7.3, 24.3, 10.1, 18.7, 12.7]), StreamInstance(timestamp=datetime.datetime(2000, 4, 21, 0, 0, tzinfo=<UTC>), value=[20.2, 13.5, 17.1, 6.7, 16.6, 5.1, 20.3, 10.3, 17.7, 12.6]), StreamInstance(timestamp=datetime.datetime(2000, 5, 21, 0, 0, tzinfo=<UTC>), value=[18.0, 11.8, 15.0, 4.2, 15.1, 2.0, 17.4, 8.3, 15.9, 10.6]), StreamInstance(timestamp=datetime.datetime(2000, 6, 21, 0, 0, tzinfo=<UTC>), value=[15.6, 9.2, 13.0, 2.1, 12.2, 2.3, 14.5, 5.7, 13.8, 9.0]), StreamInstance(timestamp=datetime.datetime(2000, 7, 21, 0, 0, tzinfo=<UTC>), value=[15.8, 10.2, 12.2, 2.9, 12.0, -0

## Print one Stream Instance per Stream

Now that we have generated one Stream per country, we can inspect the first Stream Instance of each Stream.

In [None]:
for country in countries:
    # Print the first Stream Instance of each stream
    print('\n{}: First Stream Instance'.format(country))
    key, value = temp_streams[country].window().first()
    print('[%s]: %s' % (key, value))

## Visualize the temperatures in one Country

Now, we can visualize the temperatures of all the cities in one country. First, we will create a list of all the cities in one of the Streams by looking at the first Stream Instance. Then, we will create a list of lists containing the temperature value of each city, together with their corresponding time. Then, we can use the function **plot_multiple_stock** created for this tutorial.
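
The reshaping described above can be sketched without HyperStream. In the snippet below, `instances` is a hypothetical stand-in for the items of a Stream window: a list of (timestamp, dictionary) pairs. The first row uses the Tokyo values from the CSV excerpt; the second row contains made-up values for illustration only. The helper name `to_series` is also an assumption.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for the (timestamp, value) items of a Stream window.
# The second row holds made-up illustrative values.
instances = [
    (datetime(2000, 1, 1, tzinfo=timezone.utc), {'TokyoMax': 11.2, 'TokyoMin': 4.2}),
    (datetime(2000, 2, 1, tzinfo=timezone.utc), {'TokyoMax': 12.0, 'TokyoMin': 5.0}),
]


def to_series(instances):
    """Turn (timestamp, dict) pairs into a time axis plus one list per city."""
    # Take the column names from the first instance, in a fixed order.
    names = sorted(instances[0][1])
    series = {name: [] for name in names}
    times = []
    for timestamp, values in instances:
        times.append(str(timestamp))
        for name in names:
            series[name].append(values[name])
    # Return the per-city lists in the same order as `names`.
    return times, names, [series[name] for name in names]


times, names, data = to_series(instances)
print(names, data)
```

Keeping `names` and `data` in the same explicit order avoids relying on dictionary iteration order when the lists are passed to a plotting function.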

In [None]:
# `countries` is redefined as a dict in an earlier cell, so it can no longer be indexed with [0]
country = 'Asia'
this_cities_list = list(temp_streams[country].window().items()[0].value.keys())

data = {city: [] for city in this_cities_list}
time = []
for key, values in temp_streams[country].window().items():
    time.append(str(key))
    for city, temperature in values.items():
        data[city].append(temperature)

names = list(data.keys())
data = list(data.values())

plot_multiple_stock(data, time=time, names=names, title='Temperatures in ' + country, ylabel='ºC')

In [None]:
from hyperstream import StreamInstance
from hyperstream import StreamId

one_country_stream = temp_streams[country]

# A is the Assets channel, which is similar to a database channel
A = hs.channel_manager.assets
this_cities_stream = A.get_or_create_stream('cities_{}'.format(country))

mapping = {}
for city in this_cities_list:
    mapping[city] = city

A.write_to_stream(stream_id=this_cities_stream.stream_id, data=StreamInstance(ti_all.end, mapping))

print(this_cities_stream.window(TimeInterval.up_to_now()).items())

In [None]:
for city in this_cities_list:
    if not hs.plate_manager.meta_data_manager.contains(identifier='city_'+city):
        print("Adding " + city)
        hs.plate_manager.meta_data_manager.insert(parent='root', data=city,
                                                  tag='city', identifier='city_'+city)

In [None]:
cities_plate = hs.plate_manager.create_plate(plate_id='C', meta_data_id='city', parent_plate=None, 
                                             values=[], complement=True, description='Cities')
this_country_temps = []
for city in this_cities_list:
    print("Adding " + city)
    this_country_temps.append(M.get_or_create_stream(stream_id=StreamId(name='temperature',
                                                                         meta_data=(('city', city),))))

It is possible to create all the Streams by passing a list to the splitter tool **splitter_from_list**. However, this approach cannot be automated in a workflow.

```Python
# TODO Using this new tool, it is not necessary to create a new stream. However, if the list
# were stored in a Stream, the splitting could be automated for any other country
splitter_tool = hs.plugins.example.tools.splitter_from_list(element=None)

# TODO try to change the parameter name of MultiOutputTool splitting_stream to splitting_parameter
# or something that does not force you to think that it is a stream
splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_list, output_plate=cities_plate, 
                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)
```

**Question:** With the **splitter_from_stream** version we still need to create a list with the mapping. What, then, is the difference between the two methods?

**Answer:** The list stored in the Stream is allowed to change over time, which makes future Streams more robust to change (e.g. the number of houses in the SPHERE project). There is also a tool that uses a dictionary as the splitting criterion, **splitter_of_dict**, which expects the splitting stream to be None and takes a mapping parameter containing the static mapping.
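
As a conceptual sketch only (this is not how the HyperStream splitter tools are implemented), splitting with a mapping can be pictured as routing each entry of a dictionary-valued Stream instance to its own output list. The function `split_by_mapping` and the sample data are hypothetical; the second row contains made-up values.

```python
from datetime import datetime, timezone

# Hypothetical (timestamp, value) items; the second row is made up.
t0 = datetime(2000, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2000, 2, 1, tzinfo=timezone.utc)
instances = [
    (t0, {'TokyoMax': 11.2, 'TokyoMin': 4.2}),
    (t1, {'TokyoMax': 12.0, 'TokyoMin': 5.0}),
]

# Identity mapping, like the `mapping` dictionary written to the assets stream.
mapping = {'TokyoMax': 'TokyoMax', 'TokyoMin': 'TokyoMin'}


def split_by_mapping(instances, mapping):
    """Route each dictionary entry to the output named by the mapping."""
    outputs = {target: [] for target in mapping.values()}
    for timestamp, values in instances:
        for key, target in mapping.items():
            # Keys missing from an instance are simply skipped.
            if key in values:
                outputs[target].append((timestamp, values[key]))
    return outputs


outputs = split_by_mapping(instances, mapping)
print(outputs['TokyoMax'])
```

If the mapping were itself read from a time-stamped source, new keys could be added later without changing the splitting code, which is the robustness argument made in the answer above.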

In [None]:
splitter_tool = hs.tools.splitter_from_stream(element=None, use_mapping_keys_only=False)

splitter_tool.execute(source=one_country_stream, splitting_stream=this_cities_stream, output_plate=cities_plate, 
                      interval=ti_all, input_plate_value=None, sinks=this_country_temps)

In [None]:
one_city = this_country_temps[0]
city_name = one_city.stream_id.meta_data[0][1]
my_time, my_data = zip(*[(str(key), value) for key, value in one_city.window(ti_all).items()])

plot_high_chart(my_time, my_data, type="high_stock", title='Temperature in {}'.format(city_name), yax='ºC')

# Put this all together as a workflow

In [None]:
w = hs.create_workflow(
    workflow_id="world_climate",
    name="World climate data and statistics", 
    description="Climate data statistics of several cities from Asia, Australia, New Zealand and USA",
    owner="Miquel",
    online=False,
    monitor=False
)

## First create the nodes

Nodes correspond to the Streams that we used above.

In [None]:
from collections import namedtuple
# The `verbose` argument is omitted because it was removed from namedtuple in Python 3.7
NodeDef = namedtuple('NodeDef', ['channel', 'stream_name', 'plate_ids'])

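# NOTE: the channel `S`, the plate "H" and the stream names below are not defined
# anywhere in this notebook and still need to be adapted to the climate data
nodes = (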
    NodeDef(S, "wearable",     ["H"]),
    NodeDef(M, "wearable_xl1", ["H"]),
    NodeDef(M, "window_5",     ["H"]),
    NodeDef(M, "window_300",   ["H"]),
    NodeDef(M, "arm_angle",    ["H"]),
    NodeDef(M, "inactivity",   ["H"])
)

# Simple object to hold nodes
class NodeCollection(object): 
    pass

N = NodeCollection()

for n in nodes:
    setattr(N, n.stream_name, w.create_node(channel=n.channel, stream_name=n.stream_name, plate_ids=n.plate_ids))