In [71]:
import numpy as np
import toolz as tz
from toolz import curried as c
%load_ext memory_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


## yieldを使ったストリーミング

In [11]:

def log_all_standard(input): 
    output = []
    for elem in input: 
        output.append(np.log(elem))
    return output

In [37]:
def add_standard(input):
    output = []
    for elem in input:
        output.append(elem+1)
    return output
        

In [38]:
arr = np.random.rand(1000000) + 0.5

In [39]:
%memit print(np.sum(add_standard(log_all_standard(arr))))

954472.8695796223
peak memory: 138.49 MiB, increment: 43.75 MiB


In [30]:
def log_all_streaming(input):
    for elem in input:
        yield elem

In [40]:
def add_streaming(input):
    for elem in input:
        yield elem+1

TODO 本来ならば書き込みまでさせて、メモリは1つのデータしか持たないことを示したい

In [56]:
%memit print(np.sum(add_streaming(log_all_streaming(arr))))

2009.093246973814
2009.093246973814
2009.093246973814
2009.093246973814
2009.093246973814
peak memory: 58.12 MiB, increment: 0.00 MiB


In [55]:
# toolzを使用してネストさせない
tz.pipe(arr,log_all_streaming,add_streaming,np.sum)

2009.093246973814

In [70]:
# functoolのreduceでもたたみこめる
from functools import  reduce
reduce(lambda x,func:func(x),[arr,log_all_streaming,add_streaming,np.sum])

peak memory: 58.16 MiB, increment: 0.00 MiB


##  curry化を応用してストリーミング用のパイプラインを作る

In [86]:
arr=np.random.randint(1,120,size=10000)

In [142]:
from time import  sleep
def sleep_(input_):
    sleep(1)
    return input_

In [166]:
counts=tz.pipe(
arr,
c.map(str),
c.filter(lambda x:len(x)<2),
    c.map(c.do(sleep_)),
    c.map(c.do(print)),
    tz.concat,
    c.map(''.join),
    tz.frequencies
)

9
3
5
7
3
7
7
8
1
9
1
7
8
8
6
3
9
1


KeyboardInterrupt: 

In [158]:
list(counts)

['9', '3', '5', '7', '8', '1', '6', '4', '2']

2