In [1]:
# Get credentials
from IPython.utils import io
with io.capture_output() as captured:
    %run ../Introduction.ipynb
    
# Instantiate client
from sentenai import Client, Event
sentenai = Client(host, auth)

from datetime import datetime, timedelta

# Event Streams

Within the event stream API, the collection of event streams is represented as a dictionary-like Streams object with each stream name as the key.

In [2]:
dict(sentenai.streams)

{'foo': Stream(name='foo'),
 'ostream': Stream(name='ostream'),
 'stream1': Stream(name='stream1'),
 'test-logger': Stream(name='test-logger'),
 'weather': Stream(name='weather')}

---
### Streams can be accessed by their unique name
You may perform `getitem` on the streams API to instantiate a new Stream object. The stream may or may not exist. Referencing a new stream does not create a new event stream.

#### Getting a `Stream` object with `getitem`

In [3]:
weather = sentenai.streams['weather']
imaginary = sentenai.streams['imaginary']

#### Streams may or may not exist

A Stream that does not exist is one that has not been created server-side, even if you have a client object.

In [4]:
print("weather exists? {}".format(bool(weather)))
print("imaginary exists? {}".format(bool(imaginary)))

weather exists? True
imaginary exists? False


How do we instantiate a new stream? By adding events to it.

#### Streams may be filtered to create sub-streams

In [5]:
weather = sentenai.streams['weather']
cloudy_weather = weather.where('icon = "cloudy"')

In [16]:
len(cloudy_weather.events)

4

---
### Deleting streams

Deleting a stream eliminates all information about a stream along with all events that are a member of that stream.

This operation may take a second.

In [4]:
try:
  del sentenai.streams['imaginary']
except:
    pass

## Stream Metadata

Event Streams may have user-set metadata, which can be accessed via the `.metadata` sub-object.


---
### Setting metadata

In [6]:
weather.metadata['flag'] = True
weather.metadata['str'] = "hello world"
weather.metadata['count'] = 4

---
### Getting metadata

In [11]:
dict(weather.metadata)

{}

In [8]:
weather.metadata['flag']

True

---
### Deleting metadata

In [9]:
del weather.metadata['flag']

#### Clear metadata

In [13]:
del weather.metadata
dict(weather.metadata)

{}

## Core Stream methods

Stream objects have a number of built in properties and methods.

---
### Adding events to a stream

The `Stream` class provides three methods for managing events: `insert`, `update`, and `remove`.

#### Inserting a new event

In [46]:
test = sentenai.streams['test-stream']

my_evt = test.insert(Event(ts=datetime.utcnow(), data={'temperatureMax': -500.0}))
print(my_evt)

Event(id='MEuzy4Ma4GnfWxgQlet-KDv4', ts=numpy.datetime64('2019-07-23T02:52:27.904421'), data={'temperatureMax': -500.0})


#### Updating an event

In [47]:
my_evt.data = {'temperatureMax': -50.0}
test.update(my_evt)
test[my_evt.id]

Stream(name='test-stream')["MEuzy4Ma4GnfWxgQlet-KDv4"]

#### Deleting an event

In [48]:
del test.events[my_evt.id] # or weather.remove(evt.id)

try:
    test.events[my_evt.id]
except KeyError:
    print("Event successfully deleted")

Event successfully deleted


## The `Stream.events` collection

All events belonging to a stream are stored in the stream's events collection.

---
### A stream's events collection is list-like and time-ordered.

Every stream contains a collection of events. The event sub-API is located at `Stream.events`.


In [53]:
ostream = sentenai.streams['ostream']
for i in range(20):
    ostream.insert(Event(ts=datetime(2018,1,1)+timedelta(seconds=60*i), data={'a': i / 2, 'b': i * 3}))

#### List operations

In [54]:
len(ostream.events)

20

In [55]:
ostream.events[0]

Event(id='X-p8KBLtT_ZUwS-Q_xYi8grp', ts=numpy.datetime64('2018-01-01T00:00:00'), data={'a': 0.0, 'b': 0})

In [56]:
ostream.events[5]

Event(id='29CX9dBBcSeYzTspCmJMik3H', ts=numpy.datetime64('2018-01-01T00:05:00'), data={'a': 2.5, 'b': 15})

In [57]:
ostream.events[-1]

Event(id='5MC8urMgvwlEbbfrqXu8-I99', ts=numpy.datetime64('2018-01-01T00:19:00'), data={'a': 9.5, 'b': 57})

In [35]:
for evt in ostream.events[3:7]:
    print(evt.id, evt.ts)

_opeyjlFiM6R8Ksj3paGLPQs 2018-01-01T00:03:00
y8DefihIXu9yr1SGpCpUBS1N 2018-01-01T00:04:00
29CX9dBBcSeYzTspCmJMik3H 2018-01-01T00:05:00
7-O4g5mxcFOYvKyLj3jnYpxx 2018-01-01T00:06:00


#### A Stream's events collection can be sliced by time.

In [37]:
for evt in ostream.events[datetime(2018,1,1,0,5):datetime(2018,1,1,0,10)]:
    print(evt.id, evt.ts)

29CX9dBBcSeYzTspCmJMik3H 2018-01-01T00:05:00
7-O4g5mxcFOYvKyLj3jnYpxx 2018-01-01T00:06:00
w7fXH1bGqP6uev4trCYNAX4k 2018-01-01T00:07:00
VBIdcLq7xnHSKl6aLdRyPD1G 2018-01-01T00:08:00
p8cMTMALJJDiVVYrl3HNwITA 2018-01-01T00:09:00


#### Time slices can use a limit to prevent returning too many events

In [38]:
for evt in ostream.events[datetime(2018,1,1,0,5):datetime(2018,1,1,0,10):2]:
    print(evt.id, evt.ts)

29CX9dBBcSeYzTspCmJMik3H 2018-01-01T00:05:00
7-O4g5mxcFOYvKyLj3jnYpxx 2018-01-01T00:06:00


---
### A stream's events collection is dict-like

In [41]:
ostream.events['ABCD'] = Event(data={'foo': False})

In [42]:
ostream.events['ABCD']

Event(id='ABCD', ts=numpy.datetime64('2019-07-23T02:48:21.860785970'), data={'foo': False})

In [43]:
del ostream.events['ABCD']

## Stream schemas

Since events streams are a time-ordered collection of events, and events are represented as a tree of values and subtrees (think JSON-like objects), we can represent a `Stream`'s schema as a dictionary of paths to each value, also known as a `Field`.

---
### Stream Fields

In [18]:
stream1 = sentenai.streams['stream1']

deep_evt = Event(
    ts=datetime(2015,1,1,1,1),
    data={
        'a': {
            'aa': 1,
            'bb': {
                'cc': 8,
                'dd': "yes"
            }
        },
        'b': False
    }
)

stream1.events.insert(deep_evt)

for field in stream1:
    print(field.path)

('a', 'aa')
('a', 'bb', 'cc')
('a', 'bb', 'dd')
('b',)


##### To access a field, use dictionary-like notation:

In [19]:
stream1['a', 'bb', 'cc']

Stream(name='stream1')["a", "bb", "cc"]

---
### Fields statistics

#### Global statistics

In [32]:
weather['humidity'].count, weather['humidity'].missing

(345, 3)

In [20]:
weather['humidity'].mean

0.6473913043478261

In [57]:
weather['humidity'].min, weather['humidity'].max

(0.28, 0.98)

In [60]:
weather['humidity'].std

0.15552099400186561

In [23]:
weather['icon'].unique

{'partly-cloudy-day': 163,
 'rain': 94,
 'partly-cloudy-night': 48,
 'clear-day': 21,
 'snow': 6,
 'fog': 5,
 'cloudy': 4,
 'wind': 4}

In [24]:
weather['icon'].top

'partly-cloudy-day'

#### Local Statistics
Numerical statistics like `min`, `max`, `mean`,  and `std` can also be retrieved for a slice of time. Note that these stats treat all values with equal weight, they are not time-weighted at this time. That means they work best with fixed interval events, e.g. meteorological data.

In [29]:
weather['humidity'][datetime(2010,6,1):datetime(2010,7,1)]._stats

{'categorical': {},
 'numerical': {'max': 0.93,
  'mean': 0.6963333333333331,
  'count': 30,
  'missing': 0,
  'min': 0.4,
  'std': 0.13241739997482319}}

## Values

Streams aren't just collections of events; they also have encapsulate the temporal concept of having *values* at a point in time, where the collection of values at a point in time is the set of all paths paired with the value of that path when it was last set.


---
### Stream Values
The `stream.values(at=)` method returns the latest values at a point in time as a dictionary of paths to values. Values set in the past will be forward-filled to this point in time.

In [21]:
stream1 @ datetime(2015,1,1,1,1)      # or use stream1.values(at=...)

{('a', 'aa'): 1, 'b': False, ('a', 'bb', 'dd'): 'yes', ('a', 'bb', 'cc'): 8}

---
### Field Values
A field's value at a point in time can be accessed via the `@` operator.

In [22]:
weather['temperatureMax'] @ datetime(2010,6,1)     # or use weather['temperatureMax'].value(at=...)

83.42

----
### Time values (bounds)
The left bound of a stream is the timestamp of the oldest event in the stream. The right bound of a stream is the timestamp of the newest event in the stream.

In [59]:
weather.bounds

(numpy.datetime64('2010-01-01T00:00:00'),
 numpy.datetime64('2010-12-11T00:00:00'))