# Data Streams in Python

### Not in SKLearn!

Most of the large machine learning packages focus on *static* data, rather than *streaming* data

For example, SKLearn models do not learn from one observation at a time, and evaluation is built around a static test set which can be shuffled

In [23]:
def hide_code_in_slideshow():
    """Hide the input area of the calling notebook cell in slideshow mode.

    Emits an empty ``<div>`` with a random id plus a jQuery snippet through
    ``IPython.display``. The snippet climbs from that div to the nearest
    ancestor carrying the ``cell`` class and adds the ``hide-in-slideshow``
    class to its ``.input`` element.
    """
    from IPython import display
    import binascii
    import os

    # Random id so each call finds the output div it emitted itself.
    uid = binascii.hexlify(os.urandom(8)).decode()
    # The upward walk must terminate even when no ".cell" ancestor exists:
    # tagName is compared case-insensitively (HTML documents report "BODY"),
    # and the loop also stops once parent() yields an empty selection.
    html = """<div id="%s"></div>
    <script type="text/javascript">
        $(function(){
            var p = $("#%s");
            if (p.length==0) return;
            while (!p.hasClass("cell")) {
                p=p.parent();
                if (p.length==0 || p.prop("tagName").toLowerCase()=="body") return;
            }
            var cell = p;
            cell.find(".input").addClass("hide-in-slideshow")
        });
    </script>""" % (uid, uid)
    display.display_html(html, raw=True)
    
%%html
<style>
 .container.slides .celltoolbar, .container.slides .hide-in-slideshow {
    display: None ! important;
}
</style>

SyntaxError: invalid syntax (<ipython-input-23-c6bc9e64b3b6>, line 22)

### River - Package for Streaming data

![](river.png)

### River - Let's start with an included data stream

In [3]:
from river import synth
# Build the Agrawal synthetic generator with its default configuration.
stream = synth.Agrawal()

# A bare expression on the last line of a cell makes the notebook display it.
stream

Synthetic data generator

    Name  Agrawal              
    Task  Binary classification
 Samples  ∞                    
Features  9                    
 Outputs  1                    
 Classes  2                    
  Sparse  False                

Configuration
-------------
classification_function  0    
                   seed  None 
        balance_classes  False
           perturbation  0.0  

### How would we train a model?

#### In SKLearn - 1) Test-Train split 2) Fit a static model to the train set

#### In River - Loop over the stream, training and predicting each observation individually

In [3]:
from sklearn.model_selection import train_test_split
from sklearn import tree
from sklearn.metrics import accuracy_score

# Seeded generator so the same observations are drawn on every run.
stream = synth.Agrawal(seed=1)
# Take the first 1000 (features, label) pairs and unzip them into two
# parallel tuples: X holds the feature dicts, y holds the labels.
X, y = list(zip(*stream.take(1000)))
# Flatten every feature dict into a plain list of its values.
X = [list(x.values()) for x in X]

# SKLearn training: split once, fit on the train part, score on the test part.
# NOTE(review): no random_state is passed to train_test_split, so the split
# (and the printed accuracy) presumably varies between runs - confirm.
X_train, X_test, y_train, y_test = train_test_split(X, y)
clf = tree.DecisionTreeClassifier()
clf = clf.fit(X_train, y_train)
y_preds = clf.predict(X_test)
print(accuracy_score(y_test, y_preds))



1.0


In [4]:
from river import evaluate
from river import metrics
from river import tree

# NOTE: `tree` now refers to river's module; the `from river import tree`
# in this cell rebinds the name that previously pointed at sklearn's `tree`.
stream = synth.Agrawal(seed=1)
clf = tree.HoeffdingTreeClassifier()
metric = metrics.Accuracy()

# Process the first 1000 observations one at a time.
for X, y in stream.take(1000):
    # Predict before the model has seen this observation's label ...
    p = clf.predict_one(X)
    # ... score that prediction ...
    metric.update(y, p)
    
    # ... and only then train on the observation.
    clf.learn_one(X, y)

print(metric)

Accuracy: 84.60%


Why does accuracy seem lower? Is it *really* lower?

### Let's look at accuracy over time

In [39]:
hide_code_in_slideshow()
import matplotlib.pyplot as plt
%matplotlib widget
from ipywidgets import interact, IntSlider
import ipywidgets as widgets

# Shared state read and mutated by get_accuracy_at_step / plot_acc_up_to.
# acc_cache[k] holds the metric value recorded at stream step k.
acc_cache = []
# Explicit iterator over the first 10000 observations so next() can be used.
stream = iter(synth.Agrawal(seed=1).take(10000))
clf = tree.HoeffdingTreeClassifier()
# Cumulative accuracy over every observation seen so far.
metric = metrics.Accuracy()

# Create the figure with interactive mode switched off, then hide the
# toolbar, header and footer of the widget canvas.
plt.ioff()
fig = plt.figure()
fig.canvas.toolbar_visible = False
fig.canvas.header_visible = False 
fig.canvas.footer_visible = False
# Empty line artist; plot_acc_up_to fills in its data on every slider move.
line, = plt.plot([], [])
plt.show()

def get_accuracy_at_step(i):
    """Return the metric value recorded at stream step ``i``.

    Values already computed are served from the global ``acc_cache``.
    Otherwise the global ``stream`` is advanced until the cache reaches
    index ``i``; each observation is predicted and scored before the
    global ``clf`` learns from it.
    """
    # Nothing to do when the cache already covers step i.
    while len(acc_cache) <= i:
        features, label = next(stream)
        prediction = clf.predict_one(features)
        metric.update(label, prediction)
        acc_cache.append(metric.get())
        clf.learn_one(features, label)
    return acc_cache[i]

def plot_acc_up_to(i):
    """Redraw the accuracy curve so that it shows the first ``i`` steps."""
    # Make sure the cache has been filled at least up to step i.
    get_accuracy_at_step(i)

    line.set_data(range(i), acc_cache[:i])

    # Recompute the data limits so the axes follow the new curve.
    axes = fig.gca()
    axes.relim()
    axes.autoscale_view()
    fig.canvas.draw_idle()


# Slider-driven widget: every change of i triggers a redraw.
interact(plot_acc_up_to, i=IntSlider(min=1, max=9999, value=10))



Canvas(footer_visible=False, header_visible=False, toolbar=Toolbar(toolitems=[('Home', 'Reset original view', …

interactive(children=(IntSlider(value=10, description='i', max=9999, min=1), Output()), _dom_classes=('widget-…

<function __main__.plot_acc_up_to(i)>

In [25]:
def get_accuracy_overtime(i):
    """Return the cumulative accuracy after each of the first ``i`` steps.

    A fresh ``HoeffdingTreeClassifier`` is run over a seeded Agrawal stream.
    Every observation is predicted first, the prediction is scored, and only
    then does the model learn from it.

    Args:
        i: Number of observations to process; values below 1 give ``[]``.

    Returns:
        A list with one ``metrics.Accuracy`` reading per processed
        observation, in stream order.
    """
    stream = synth.Agrawal(seed=1)
    clf = tree.HoeffdingTreeClassifier()
    metric = metrics.Accuracy()
    accuracy = []
    # Iterate through take(): the dataset object is iterable but is not an
    # iterator itself, so calling next() on it directly raises TypeError.
    for X, y in stream.take(max(i, 0)):
        p = clf.predict_one(X)
        metric.update(y, p)
        clf.learn_one(X, y)

        accuracy.append(metric.get())
    return accuracy

Overall accuracy always includes the performance at the start, when our model was barely trained.

### Sliding window accuracy - only captures *recent* performance

In [40]:
hide_code_in_slideshow()
import matplotlib.pyplot as plt
%matplotlib widget
from ipywidgets import interact, IntSlider
import ipywidgets as widgets

# Shared state read and mutated by get_accuracy_at_step / plot_acc_up_to.
# acc_cache[k] holds the metric value recorded at stream step k.
acc_cache = []
# Explicit iterator over the first 10000 observations so next() can be used.
stream = iter(synth.Agrawal(seed=1).take(10000))
clf = tree.HoeffdingTreeClassifier()
# Accuracy wrapped in a rolling window of size 500 instead of a running total.
metric = metrics.Rolling(metrics.Accuracy(), window_size=500)

# Create the figure with interactive mode switched off, then hide the
# toolbar, header and footer of the widget canvas.
plt.ioff()
fig = plt.figure()
fig.canvas.toolbar_visible = False
fig.canvas.header_visible = False 
fig.canvas.footer_visible = False
# Empty line artist; plot_acc_up_to fills in its data on every slider move.
line, = plt.plot([], [])
plt.show()

def get_accuracy_at_step(i):
    """Return the value of the global ``metric`` recorded at step ``i``.

    Cached readings come straight from ``acc_cache``. Missing ones are
    produced by pulling further observations from the global ``stream``:
    predict, score, store the reading, then let ``clf`` learn.
    """
    # The loop body is skipped entirely when step i is already cached.
    while len(acc_cache) <= i:
        features, label = next(stream)
        prediction = clf.predict_one(features)
        metric.update(label, prediction)
        acc_cache.append(metric.get())
        clf.learn_one(features, label)
    return acc_cache[i]

def plot_acc_up_to(i):
    """Show the first ``i`` cached metric readings on the figure."""
    # Extend the cache on demand before slicing it.
    get_accuracy_at_step(i)

    line.set_data(range(i), acc_cache[:i])

    # Rescale the axes to the data now attached to the line.
    axes = fig.gca()
    axes.relim()
    axes.autoscale_view()
    fig.canvas.draw_idle()


# Slider-driven widget: every change of i triggers a redraw.
interact(plot_acc_up_to, i=IntSlider(min=1, max=9999, value=10))



Canvas(footer_visible=False, header_visible=False, toolbar=Toolbar(toolitems=[('Home', 'Reset original view', …

interactive(children=(IntSlider(value=10, description='i', max=9999, min=1), Output()), _dom_classes=('widget-…

<function __main__.plot_acc_up_to(i)>

In [32]:
def get_sliding_accuracy_overtime(i):
    """Return the rolling-window accuracy after each of the first ``i`` steps.

    A fresh ``HoeffdingTreeClassifier`` is run over a seeded Agrawal stream
    and scored with ``metrics.Accuracy`` wrapped in a rolling window of size
    500. Every observation is predicted and scored before the model learns
    from it.

    Args:
        i: Number of observations to process; values below 1 give ``[]``.

    Returns:
        A list with one rolling-accuracy reading per processed observation,
        in stream order.
    """
    stream = synth.Agrawal(seed=1)
    clf = tree.HoeffdingTreeClassifier()
    metric = metrics.Rolling(metrics.Accuracy(), window_size=500)

    accuracy = []
    # Iterate through take(): the dataset object is iterable but is not an
    # iterator itself, so calling next() on it directly raises TypeError.
    for X, y in stream.take(max(i, 0)):
        p = clf.predict_one(X)
        metric.update(y, p)
        clf.learn_one(X, y)

        accuracy.append(metric.get())
    return accuracy

### Concept Drift - A change in the distribution of data

In [41]:
hide_code_in_slideshow()
import matplotlib.pyplot as plt
%matplotlib widget
from ipywidgets import interact, IntSlider
import ipywidgets as widgets
from itertools import chain

# Shared state read and mutated by get_accuracy_at_step / plot_acc_up_to.
# acc_cache[k] holds the metric value recorded at stream step k.
acc_cache = []
# Two 10000-observation segments that differ only in classification_function;
# chaining them switches the labelling rule after step 9999.
streamA = iter(synth.Agrawal(classification_function=0, seed=1).take(10000))
streamB = iter(synth.Agrawal(classification_function=3, seed=1).take(10000))
stream = chain(streamA, streamB)
clf = tree.HoeffdingTreeClassifier()
# Accuracy wrapped in a rolling window of size 500.
metric = metrics.Rolling(metrics.Accuracy(), window_size=500)

# Create the figure with interactive mode switched off, then hide the
# toolbar, header and footer of the widget canvas.
plt.ioff()
fig = plt.figure()
fig.canvas.toolbar_visible = False
fig.canvas.header_visible = False 
fig.canvas.footer_visible = False
# Empty line artist; plot_acc_up_to fills in its data on every slider move.
line, = plt.plot([], [])
plt.show()

def get_accuracy_at_step(i):
    """Return the value of the global ``metric`` recorded at step ``i``.

    Readings are memoised in ``acc_cache``. When step ``i`` has not been
    reached yet, the global ``stream`` is consumed until it has: each
    observation is predicted, scored and recorded before ``clf`` learns it.
    """
    # Skipped when the requested step is already in the cache.
    while len(acc_cache) <= i:
        features, label = next(stream)
        prediction = clf.predict_one(features)
        metric.update(label, prediction)
        acc_cache.append(metric.get())
        clf.learn_one(features, label)
    return acc_cache[i]

def plot_acc_up_to(i):
    """Show the first ``i`` cached metric readings on the figure."""
    # Extend the cache on demand before slicing it.
    get_accuracy_at_step(i)

    line.set_data(range(i), acc_cache[:i])

    # Rescale the axes to the data now attached to the line.
    axes = fig.gca()
    axes.relim()
    axes.autoscale_view()
    fig.canvas.draw_idle()


# The slider spans both chained stream segments (20000 steps in total).
interact(plot_acc_up_to, i=IntSlider(min=1, max=19999, value=10))



Canvas(footer_visible=False, header_visible=False, toolbar=Toolbar(toolitems=[('Home', 'Reset original view', …

interactive(children=(IntSlider(value=10, description='i', max=19999, min=1), Output()), _dom_classes=('widget…

<function __main__.plot_acc_up_to(i)>

In [None]:
from itertools import chain
def get_sliding_accuracy_overtime(i):
    """Return rolling accuracy readings for ``i`` steps of a two-part stream.

    The stream chains 10000 Agrawal observations generated with
    ``classification_function=0`` and 10000 generated with
    ``classification_function=3``. A new ``HoeffdingTreeClassifier`` predicts
    each observation, the prediction is scored with a rolling accuracy of
    window size 500, and the model then learns from the observation.

    Args:
        i: Number of observations to process.

    Returns:
        A list of ``i`` rolling-accuracy readings, one per observation.

    Raises:
        StopIteration: If ``i`` exceeds the 20000 observations available.
    """
    first_concept = iter(synth.Agrawal(classification_function=0, seed=1).take(10000))
    second_concept = iter(synth.Agrawal(classification_function=3, seed=1).take(10000))
    observations = chain(first_concept, second_concept)
    model = tree.HoeffdingTreeClassifier()
    rolling_acc = metrics.Rolling(metrics.Accuracy(), window_size=500)

    history = []
    for _ in range(i):
        features, label = next(observations)
        guess = model.predict_one(features)
        rolling_acc.update(label, guess)
        model.learn_one(features, label)
        history.append(rolling_acc.get())
    return history

### Let's try to detect the drift

In [66]:
hide_code_in_slideshow()
import matplotlib.pyplot as plt
%matplotlib widget
from ipywidgets import interact, IntSlider
import ipywidgets as widgets
from itertools import chain
from river.drift import ADWIN, DDM

# Two 10000-observation segments that differ only in classification_function;
# chaining them switches the labelling rule after step 9999.
streamA = iter(synth.Agrawal(classification_function=0, seed=1).take(10000))
streamB = iter(synth.Agrawal(classification_function=3, seed=1).take(10000))
stream = chain(streamA, streamB)
# Globals used by run_stream: the model, its rolling accuracy (window 500)
# and the ADWIN detector that is fed the per-step hit/miss signal.
clf = tree.HoeffdingTreeClassifier()
metric = metrics.Rolling(metrics.Accuracy(), window_size=500)
drift_detector = ADWIN(delta=0.005)

def run_stream(stream, n):
    """Run ``n`` predict-then-learn steps and record metric and drift flags.

    Uses the global ``clf``, ``metric`` and ``drift_detector``. After every
    step the detector receives 1 for a correct prediction and 0 otherwise;
    it is reset whenever it reports a drift.

    Args:
        stream: Iterator yielding ``(features, label)`` pairs.
        n: Number of observations to consume from ``stream``.

    Returns:
        Three lists of length ``n``: the metric reading, the warning flag
        (0/1) and the drift flag (0/1) for every step.
    """
    accuracy, warn, drift = [], [], []
    for _ in range(n):
        features, label = next(stream)
        prediction = clf.predict_one(features)
        metric.update(label, prediction)
        clf.learn_one(features, label)

        hit = 1 if prediction == label else 0
        in_drift, in_warning = drift_detector.update(hit)
        if in_drift:
            # Start the detector afresh once a drift has been reported.
            drift_detector.reset()

        accuracy.append(metric.get())
        warn.append(int(bool(in_warning)))
        drift.append(int(bool(in_drift)))

    return accuracy, warn, drift

# Consume the whole 20000-step stream up front; the slider callback only
# slices these precomputed per-step lists.
acc_cache, warn_cache, drift_cache = run_stream(stream, 20000)

# Create the figure with interactive mode switched off, then hide the
# toolbar, header and footer of the widget canvas.
plt.ioff()
fig = plt.figure()
fig.canvas.toolbar_visible = False
fig.canvas.header_visible = False 
fig.canvas.footer_visible = False
# Empty line artist; plot_acc_up_to fills in its data on every slider move.
line, = plt.plot([], [])
lines_warn = []
# Collapse the per-step warning flags into runs of consecutive steps:
# each entry of warn_periods is the list of step indices of one run.
warn_periods = []
# Index where the currently open warning run began, or None when no run is
# open. Compared against None explicitly because 0 is a valid start index.
warn_start = None
for i, w in enumerate(warn_cache):
    if w:
        if warn_start is None:
            warn_start = i
    elif warn_start is not None:
        warn_periods.append(list(range(warn_start, i)))
        warn_start = None
# Close a run that is still open when the flags end.
if warn_start is not None:
    warn_periods.append(list(range(warn_start, len(warn_cache))))
# Plotting of warning periods is currently disabled:
# for wp in warn_periods:
#     wl, = plt.plot([], [])
#     lines_warn.append(wl)
# One red vertical line per step at which a drift was flagged. Each line is
# created hidden and stored with its step index so that plot_acc_up_to can
# toggle its visibility.
lines_drift = []
for i,d in enumerate(drift_cache):
    if d:
        dl = plt.axvline(x=i, ymin=0, ymax=1, c='red')
        dl.set_visible(False)
        lines_drift.append((dl, i))
    
plt.show()



def plot_acc_up_to(i):
    """Show the first ``i`` readings and the drift markers reached so far."""
    # A marker is visible once the slider has reached the step it belongs to.
    for marker, drift_step in lines_drift:
        marker.set_visible(i >= drift_step)

    line.set_data(range(i), acc_cache[:i])

    # Rescale the axes to the data now attached to the line.
    axes = fig.gca()
    axes.relim()
    axes.autoscale_view()
    fig.canvas.draw_idle()


# The slider spans both chained stream segments (20000 steps in total).
interact(plot_acc_up_to, i=IntSlider(min=1, max=19999, value=10))



Canvas(footer_visible=False, header_visible=False, toolbar=Toolbar(toolitems=[('Home', 'Reset original view', …

interactive(children=(IntSlider(value=10, description='i', max=19999, min=1), Output()), _dom_classes=('widget…

<function __main__.plot_acc_up_to(i)>

In [55]:
from itertools import chain
from river.drift import ADWIN

def get_sliding_accuracy_overtime(i):
    """Return rolling accuracy readings for ``i`` steps of a two-part stream.

    The stream chains 10000 Agrawal observations generated with
    ``classification_function=0`` and 10000 generated with
    ``classification_function=3``. A new ``HoeffdingTreeClassifier`` predicts
    each observation, the prediction is scored with a rolling accuracy of
    window size 500, and the model then learns from the observation. An
    ADWIN detector is fed the per-step hit/miss signal, but its flags are not
    part of the return value.

    Args:
        i: Number of observations to process.

    Returns:
        A list of ``i`` rolling-accuracy readings, one per observation.

    Raises:
        StopIteration: If ``i`` exceeds the 20000 observations available.
    """
    streamA = iter(synth.Agrawal(classification_function=0, seed=1).take(10000))
    streamB = iter(synth.Agrawal(classification_function=3, seed=1).take(10000))
    stream = chain(streamA, streamB)
    clf = tree.HoeffdingTreeClassifier()
    metric = metrics.Rolling(metrics.Accuracy(), window_size=500)
    # Own detector per call: the function no longer needs a global
    # `drift_detector` to exist, and repeated calls cannot disturb a detector
    # shared with other cells.
    drift_detector = ADWIN(delta=0.005)

    accuracy = []
    for j in range(i):
        X, y = next(stream)
        p = clf.predict_one(X)
        metric.update(y, p)
        clf.learn_one(X, y)

        is_correct = 1 if p == y else 0
        drift_detector.update(is_correct)

        accuracy.append(metric.get())
    return accuracy