In [17]:
from collections import namedtuple
import random

Record = namedtuple('Record', 'easy medium nightmare')

def data_stream(length=10000000, seed=42):
    """Yield a reproducible sequence of synthetic ``Record`` tuples.

    Each record carries three integer fields:

    * ``easy``      - running sum of draws from {0, 1, 2}; never decreases.
    * ``medium``    - independent draw from 0..255.
    * ``nightmare`` - independent draw from 0..999999999.

    Parameters
    ----------
    length : int, optional
        Number of records to produce. Defaults to 10000000.
    seed : optional
        Seed for the private ``random.Random`` instance. Defaults to 42,
        so calling ``data_stream()`` with no arguments always yields the
        same sequence.

    Yields
    ------
    Record
    """
    # A private generator keeps the stream independent of the global
    # ``random`` state, so every call replays the same sequence.
    random_generator = random.Random(seed)
    easy = 0
    for _ in range(length):
        # The draw order (easy, medium, nightmare) determines every value
        # produced from the seed; do not reorder these three lines.
        easy += random_generator.randint(0, 2)
        medium = random_generator.randint(0, 256 - 1)
        nightmare = random_generator.randint(0, 1000000000 - 1)

        yield Record(
            easy=easy,
            medium=medium,
            nightmare=nightmare
        )
        
def easy_stream():
    """Stream only the ``easy`` field of each record from ``data_stream()``.

    ``easy`` is a running sum of non-negative increments, so the values
    produced here never decrease.
    """
    yield from (item.easy for item in data_stream())
        
def medium_stream():
    """Stream only the ``medium`` field of each record from ``data_stream()``.

    Values are independent integers in the range 0..255.
    """
    yield from (item.medium for item in data_stream())
        
def nightmare_stream():
    """Stream only the ``nightmare`` field of each record from ``data_stream()``.

    Values are independent integers in the range 0..999999999.
    """
    yield from (item.nightmare for item in data_stream())

In [18]:
import numpy as np

def get_tuple_stream_mean(stream, number_of_values):
    """Average a stream of equally sized tuples component by component.

    Parameters
    ----------
    stream : iterable of tuples
        Every item must have ``number_of_values`` numeric components.
    number_of_values : int
        Length of each tuple in ``stream``.

    Returns
    -------
    list of str
        One entry per component: its mean formatted with two decimals.
    """
    # dtype='object' stores Python numbers in the accumulator, so integer
    # totals are not limited to a fixed-width numpy integer type.
    totals = np.zeros(number_of_values, dtype='object')
    seen = 0.
    for item in stream:
        totals += item
        seen += 1
    averages = totals / seen
    return ['{:0.2f}'.format(component) for component in averages]

In [None]:
# %%time
# def example(stream):
#     for value in stream:
#         yield (value, value + 10)
# print(get_tuple_stream_mean(example(easy_stream()), 2))

In [19]:
from collections import deque
def mean_dispersion(stream, window=1000):
    """Yield ``(mean, variance)`` of a sliding window over ``stream``.

    The first tuple describes the first ``window`` values; each later value
    slides the window forward by one and produces one more tuple. The
    variance is the population variance, ``E[x^2] - E[x]^2``.

    Running sums of the values and of their squares are maintained instead
    of running float means. For integer input the sums are exact Python
    integers, so results neither drift over long streams nor lose precision
    to cancellation when the values are large.

    Parameters
    ----------
    stream : iterable of numbers
        May be a one-shot iterator or a re-iterable such as a list.
    window : int, optional
        Window length; defaults to 1000.

    Yields
    ------
    tuple
        ``(mean, variance)`` as floats. A stream shorter than ``window``
        yields a single tuple computed over the values that were available;
        an empty stream yields a single ``(nan, nan)``.

    Raises
    ------
    ValueError
        If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError('window must be a positive integer')

    # Both loops below must share one cursor. Without iter(), a list (or any
    # other re-iterable) would be restarted from its first element by the
    # second loop.
    values = iter(stream)

    recent = deque()
    for value in values:
        recent.append(value)
        if len(recent) == window:
            break

    count = len(recent)
    if count == 0:
        yield (float('nan'), float('nan'))
        return

    total = sum(recent)
    total_sq = sum(item * item for item in recent)
    # variance = total_sq/count - (total/count)^2, written over a common
    # denominator so that the subtraction happens before any rounding.
    yield (total / count, (count * total_sq - total * total) / (count * count))

    for value in values:
        oldest = recent.popleft()
        recent.append(value)
        total += value - oldest
        total_sq += value * value - oldest * oldest
        yield (total / count, (count * total_sq - total * total) / (count * count))

In [20]:
from collections import deque
def min_max_easy(stream, window=1000):
    """Yield ``(min, max, median)`` of a sliding window over a sorted stream.

    The statistics are read by position (first, last and middle element of
    the window), so the result is only correct when ``stream`` never
    decreases. The median is the element at index ``len(window) // 2``,
    i.e. the upper median for an even window length.

    Parameters
    ----------
    stream : iterable
        Non-decreasing values; may be a one-shot iterator or a re-iterable
        such as a list.
    window : int, optional
        Window length; defaults to 1000.

    Yields
    ------
    tuple
        ``(min, max, median)``. A stream shorter than ``window`` yields a
        single tuple computed over the values that were available; an empty
        stream yields nothing.

    Raises
    ------
    ValueError
        If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError('window must be a positive integer')

    # Both loops below must share one cursor. Without iter(), a list (or any
    # other re-iterable) would be restarted from its first element by the
    # second loop.
    values = iter(stream)

    # maxlen makes append() drop the oldest element once the window is full.
    recent = deque(maxlen=window)
    for value in values:
        recent.append(value)
        if len(recent) == window:
            break

    if not recent:
        return
    yield (recent[0], recent[-1], recent[len(recent) // 2])

    # Only reached with values left when the window was filled completely.
    middle = window // 2
    for value in values:
        recent.append(value)
        yield (recent[0], value, recent[middle])

In [25]:
# yields min, max, median for the window
from collections import OrderedDict
from sortedcontainers import SortedList
def min_max_noneasy(stream, window=1000):
    """Yield ``(min, max, median)`` of a sliding window over any stream.

    The window is kept twice: in arrival order (to know which value leaves
    next) and in a ``SortedList`` (to read min, max and median by
    position). The median is the element at index ``len(window) // 2`` of
    the sorted window, i.e. the upper median for an even window length.

    Parameters
    ----------
    stream : iterable
        Mutually comparable values in any order; may be a one-shot iterator
        or a re-iterable such as a list.
    window : int, optional
        Window length; defaults to 1000.

    Yields
    ------
    tuple
        ``(min, max, median)``. A stream shorter than ``window`` yields a
        single tuple computed over the values that were available; an empty
        stream yields nothing.

    Raises
    ------
    ValueError
        If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError('window must be a positive integer')

    # Both loops below must share one cursor. Without iter(), a list (or any
    # other re-iterable) would be restarted from its first element by the
    # second loop.
    values = iter(stream)

    arrival_order = deque()
    ordered = SortedList()
    for value in values:
        arrival_order.append(value)
        ordered.add(value)
        if len(arrival_order) == window:
            break

    if not arrival_order:
        return
    yield (ordered[0], ordered[-1], ordered[len(ordered) // 2])

    # Only reached with values left when the window was filled completely.
    middle = window // 2
    for value in values:
        # Evict the oldest value before inserting the new one, so the sorted
        # view never holds more than `window` elements.
        ordered.remove(arrival_order.popleft())
        arrival_order.append(value)
        ordered.add(value)
        yield (ordered[0], ordered[-1], ordered[middle])

In [None]:
%%time
# Average of the per-window (mean, variance) pairs over the easy stream;
# 2 is the number of components in each yielded tuple.
print(get_tuple_stream_mean(mean_dispersion(easy_stream()), 2))

In [None]:
%%time
# Average of the per-window (mean, variance) pairs over the medium stream;
# 2 is the number of components in each yielded tuple.
print(get_tuple_stream_mean(mean_dispersion(medium_stream()), 2))

In [None]:
%%time
# Average of the per-window (mean, variance) pairs over the nightmare stream;
# 2 is the number of components in each yielded tuple.
print(get_tuple_stream_mean(mean_dispersion(nightmare_stream()), 2))

In [None]:
%%time
# Average of the per-window (min, max, median) triples over the easy stream.
# min_max_easy reads the statistics by position in the window, which is
# valid here because easy values never decrease.
print(get_tuple_stream_mean(min_max_easy(easy_stream()), 3))

In [26]:
%%time
# Same (min, max, median) averages for the easy stream, computed with the
# SortedList-based variant that does not assume sorted input.
print(get_tuple_stream_mean(min_max_noneasy(easy_stream()), 3))

['4999175.79', '5000174.76', '4999675.78']
Wall time: 3min 38s


In [27]:
%%time
# Average of the per-window (min, max, median) triples over the medium
# stream; 3 is the number of components in each yielded tuple.
print(get_tuple_stream_mean(min_max_noneasy(medium_stream()), 3))

['0.02', '254.98', '127.60']
Wall time: 3min 43s


In [28]:
%%time
# Average of the per-window (min, max, median) triples over the nightmare
# stream; 3 is the number of components in each yielded tuple.
print(get_tuple_stream_mean(min_max_noneasy(nightmare_stream()), 3))

['1017512.29', '999017359.97', '500438415.64']
Wall time: 3min 49s


In [10]:

from sortedcontainers import SortedList
# Scratch check of the SortedList operations used by min_max_noneasy:
# insertion in arbitrary order, access to the largest element, removal.
s = SortedList()
for item in (3, 1, 2):
    s.add(item)
print(s)
# Index -1 addresses the largest element.
print(s[-1])
s.remove(2)
print(s)

ValueError: 1 not in sort order at index 1