# Initialization

In [2]:
import logging, rx
from rx import operators
from rx.subject import Subject
from threading import current_thread, Lock, Condition
from datetime import datetime, timedelta
from typing import Callable, Any, List

logger = logging.getLogger()

def ensureStreamHandler(target):
    """Attach this notebook's stream handler to ``target`` at most once.

    Re-running the cell used to add one more StreamHandler each time, so
    every message was printed once per re-run. The handler is tagged with an
    attribute so a later call can recognise it and reuse it.

    Args:
        target: the logger that should own the handler.

    Returns:
        The tagged handler attached to ``target`` (new or pre-existing).
    """
    for existing in target.handlers:
        if getattr(existing, 'isNotebookHandler', False):
            return existing
    handler = logging.StreamHandler()
    handler.isNotebookHandler = True
    target.addHandler(handler)
    return handler

ensureStreamHandler(logger)
logger.setLevel(logging.DEBUG)

def logWithTime(msg):
    """Log ``msg`` at INFO level, prefixed with wall-clock time and thread id.

    The prefix is ``HH:MM:SS.ffffff [<thread ident, zero-padded to 5 digits>]``,
    which makes it visible which thread delivered each callback.
    """
    logger.info(f'{datetime.now().strftime("%H:%M:%S.%f")} [{current_thread().ident:05d}] {msg}')

# Create observable functions

In [18]:
def sequenceInTime(period, elapsed) -> rx.Observable:
    """Build a stream of timestamped ticks.

    Args:
        period: interval passed to ``rx.interval`` (time between ticks).
        elapsed: duration passed to ``take_with_time``; the stream stops
            taking ticks once it has run for this long.

    Returns:
        An observable whose items are dicts with the tick counter under
        'idx' and the wall-clock time under 'ts'.
    """
    def stampTick(counter):
        # 'ts' is read when the tick passes through the map operator.
        return {
            'idx': counter,
            'ts': datetime.now()
            }

    ticks = rx.interval(period)
    return ticks.pipe(
        operators.map(stampTick),
        operators.take_with_time(elapsed),
    )

# Subscribe and operate

In [75]:
# Timing parameters for this cell.
period = timedelta(0)               # delay between ticks (zero: as fast as possible)
elapsed = timedelta(seconds=10)     # how long the tick stream runs
frequency = timedelta(seconds=1)    # width of each time window

# Windows are aligned to whole seconds: start from "now" truncated to the second.
now = datetime.now()
startAt = now.replace(microsecond=0)
boundary = startAt + frequency      # exclusive upper bound of the window being filled

# Shared state mutated by the stream callbacks; guarded by `cv`.
lastTickTime = None                 # not referenced by the code visible in this notebook
windows, buffer = [], []            # closed windows / ticks of the window being filled
cv = Condition()

def onWindow(x):
    """Log the size of a list of ticks plus the idx/ts of its first and last tick.

    Args:
        x: a sequence of tick dicts carrying 'idx' and 'ts'; may be empty, in
            which case only the count is logged.

    The condition's lock is taken with ``with`` so it is released even when
    formatting raises (for example a tick without a 'ts' key); the previous
    acquire()/release() pair would have left the lock held and stalled every
    other callback.

    NOTE(review): nothing visible in this cell calls this function, and a
    later cell defines another ``onWindow`` that replaces it.
    """
    with cv:
        if len(x) > 0:
            s = f' ({x[0]["idx"]}){x[0]["ts"]:%H:%M:%S.%f} .. ({x[-1]["idx"]}){x[-1]["ts"]:%H:%M:%S.%f}'
        else:
            s = ''
        logWithTime(f'[{len(x)}]{s}')

def onNext(x):
    """Place one tick into the time window it belongs to.

    Ticks whose 'ts' is before ``boundary`` go into ``buffer``. When a tick
    reaches or passes the boundary, the current buffer is logged, appended to
    ``windows``, and the boundary moves forward by ``frequency``.

    Args:
        x: a tick dict carrying 'idx' and 'ts'.

    The boundary is advanced in a loop until it lies after the tick. The
    previous single step left the boundary behind whenever a tick arrived
    more than one ``frequency`` late, so following ticks were logged under
    the wrong window label and split into one-element windows. Intervals
    without ticks are recorded as empty windows, as an empty buffer already
    was before.

    ``frequency`` must be a positive timedelta, otherwise the loop cannot end.
    """
    global windows, buffer, boundary
    ts = x['ts']
    # `with` releases the lock even if logging or formatting raises.
    with cv:
        while ts >= boundary:
            s = f' ({buffer[0]["idx"]:5d}){buffer[0]["ts"]:%H:%M:%S.%f} .. ({buffer[-1]["idx"]:5d}){buffer[-1]["ts"]:%H:%M:%S.%f}' if len(buffer) > 0 else ''
            logWithTime(f'[{boundary:%H:%M:%S}] [{len(buffer):5d}]{s}')
            windows.append(buffer)
            buffer = []
            boundary += frequency
        buffer.append(x)

def onCompleted():
    """Close the last window, log completion and wake the waiting thread.

    ``onNext`` only moves ``buffer`` into ``windows`` when a later tick crosses
    the boundary, so the ticks of the final interval used to stay in
    ``buffer`` and were missing from any total computed over ``windows``.
    They are flushed here, using the same log format as ``onNext``.

    NOTE(review): a later cell defines another ``onCompleted`` that replaces
    this one once that cell has run.
    """
    global windows, buffer
    # `with` releases the lock even if logging raises.
    with cv:
        if buffer:
            s = f' ({buffer[0]["idx"]:5d}){buffer[0]["ts"]:%H:%M:%S.%f} .. ({buffer[-1]["idx"]:5d}){buffer[-1]["ts"]:%H:%M:%S.%f}'
            logWithTime(f'[{boundary:%H:%M:%S}] [{len(buffer):5d}]{s}')
            windows.append(buffer)
            buffer = []
        logWithTime('stream completed')
        cv.notify()

# Take the condition's lock before anything is subscribed: the callbacks also
# acquire `cv`, so none of them can run until this thread calls cv.wait().
cv.acquire()
logWithTime(f'receive ticks from {startAt} and every {frequency} apart')

# Deliver items on a single worker thread, then split the ticks into one
# sub-stream per wall-clock second of their timestamp.
# (Alternatives tried for observe_on: CurrentThreadScheduler, NewThreadScheduler.)
source = sequenceInTime(period, elapsed).pipe(
    operators.observe_on(rx.scheduler.ThreadPoolScheduler(1)),
    operators.group_by(lambda tick: tick['ts'].second),
)

# Ticks collected per group key; written by groupFunc under `cv`.
groupedTicks = {}

def groupFunc(key, x):
    """Append tick ``x`` to the list stored under ``key`` in ``groupedTicks``.

    Args:
        key: the group key (must be hashable).
        x: the tick to record.

    The lock is taken with ``with`` so it is released if the dictionary
    update raises (for instance on an unhashable key); the previous
    acquire()/release() pair would have left it held.
    """
    with cv:
        groupedTicks.setdefault(key, []).append(x)

def onGroup(group):
    """Subscribe to one grouped sub-stream and file its ticks under the group's key."""
    group.subscribe(on_next=lambda tick: groupFunc(group.key, tick))

# `source` ends with group_by, so each item it emits is a grouped observable,
# not a tick dict. Passing those to onNext (which indexes x['ts']) fails on
# every item; each group has to be subscribed to instead.
try:
    with source.subscribe(
        on_next=onGroup,
        on_completed=onCompleted,
    ) as subscriber:
        # cv.wait releases the lock so the callbacks can run, and returns
        # False if the stream has not completed within elapsed + 1 second.
        if not cv.wait((elapsed + timedelta(seconds=1)).total_seconds()):
            logWithTime('timeout')
        for k in groupedTicks:
            g = groupedTicks[k]
            logWithTime(f'group \'{k}\': [{len(g):5d}] ({g[0]["idx"]:5d}){g[0]["ts"]:%H:%M:%S.%f} .. ({g[-1]["idx"]:5d}){g[-1]["ts"]:%H:%M:%S.%f}')
        logWithTime(f'received {sum(len(g) for g in groupedTicks.values())} ticks totally, in {len(groupedTicks)} windows')

    logWithTime(f'{datetime.now() - now} elapsed')
finally:
    # Release the lock taken before the pipeline was built, even on error.
    cv.release()

21:41:30.583874 [12428] receive ticks from 2022-03-12 21:41:30 and every 0:00:01 apart
21:41:40.602453 [09508] stream completed
21:41:40.606453 [12428] group '30': [  235] (    0)21:41:30.591873 .. (  234)21:41:30.998868
21:41:40.607469 [12428] group '31': [  595] (  235)21:41:31.001872 .. (  829)21:41:31.998950
21:41:40.607469 [12428] group '32': [  606] (  830)21:41:32.000950 .. ( 1435)21:41:32.998247
21:41:40.609469 [12428] group '33': [  598] ( 1436)21:41:33.000247 .. ( 2033)21:41:33.998324
21:41:40.610455 [12428] group '34': [  602] ( 2034)21:41:34.001322 .. ( 2635)21:41:34.998390
21:41:40.611455 [12428] group '35': [  620] ( 2636)21:41:35.000391 .. ( 3255)21:41:35.999390
21:41:40.612470 [12428] group '36': [  621] ( 3256)21:41:36.001392 .. ( 3876)21:41:36.999371
21:41:40.613465 [12428] group '37': [  608] ( 3877)21:41:37.000372 .. ( 4484)21:41:37.999419
21:41:40.615453 [12428] group '38': [  641] ( 4485)21:41:38.001420 .. ( 5125)21:41:38.998419
21:41:40.616454 [12428] group '39':

In [4]:
# State for the window-operator experiment. These rebind `cv` and `windows`
# from the previous cell.
cv = Condition()             # signals the main thread that the stream completed
windowLock = Lock()          # guards windowCount / windows / per-window item lists
windowCount, windows = 0, [] # number of closed windows / all received ticks, flattened

def onWindow(observable):
    """Collect the items of one window and record them when the window closes.

    Args:
        observable: the stream of items belonging to a single window.

    Items are gathered in a list private to this window. When the window
    completes, a summary is logged, the items are appended to the global
    ``windows`` list and ``windowCount`` is incremented.

    ``windowLock`` is taken with ``with`` in both callbacks so it is released
    even if logging or the list update raises; the previous
    acquire()/release() pairs would have left it held.

    Note: this definition replaces the ``onWindow`` from the earlier cell.
    """
    windowItems = []

    def onItem(i):
        with windowLock:
            windowItems.append(i)

    def onItemCompleted():
        global windowCount, windows
        with windowLock:
            span = f' ({windowItems[0]} .. {windowItems[-1]})' if len(windowItems) > 0 else ''
            logWithTime(f'window#{windowCount}: [{len(windowItems)}]{span}')
            windows += windowItems
            windowCount += 1

    observable.subscribe(
        on_next=onItem,
        on_completed=onItemCompleted,
    )

def onCompleted():
    """Log that the outer stream finished and wake the thread waiting on ``cv``.

    Note: this definition replaces the ``onCompleted`` from the earlier cell.
    """
    logWithTime('... stream completed')
    # notify() requires the condition's lock to be held.
    with cv:
        cv.notify()

# Timing parameters, in seconds. These rebind period / elapsed / frequency
# (timedelta values in the earlier cell) as plain numbers.
period = .01
elapsed = 5
frequency = 1

# Hold the condition's lock for the whole run. `with` releases it even if
# building the pipeline or subscribing raises; the previous acquire()/release()
# pair would have left it held.
with cv:
    # Open a new window each time the boundary interval ticks.
    observable = sequenceInTime(period, elapsed).pipe(
        operators.window(rx.interval(frequency)),
    )
    with observable.subscribe(
        on_next=onWindow,
        on_completed=onCompleted,
    ) as subscriber:
        startAt = datetime.now()
        logWithTime(f'start receiving from stream ...')
        # cv.wait releases the lock while waiting and returns False when the
        # stream has not completed within elapsed + 1 seconds.
        if not cv.wait(elapsed + 1):
            logWithTime(f'timeout')
        logWithTime(f'total received {len(windows)} ticks')
        logWithTime(f'exit, {datetime.now() - startAt} elapsed')

17:57:07.192292 [16144] start receiving from stream ...
17:57:08.197290 [06660] window#0: [63] ({'idx': 0, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 7, 214288)} .. {'idx': 62, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 8, 181290)})
17:57:09.206632 [12524] window#1: [62] ({'idx': 64, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 8, 213288)} .. {'idx': 125, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 9, 206632)})
17:57:10.214631 [12428] window#2: [64] ({'idx': 126, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 9, 221629)} .. {'idx': 189, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 10, 198638)})
17:57:11.223624 [07236] window#3: [57] ({'idx': 191, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 10, 230633)} .. {'idx': 247, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 11, 208626)})
17:57:12.194632 [17132] window#4: [61] ({'idx': 249, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 11, 238628)} .. {'idx': 309, 'ts': datetime.datetime(2022, 3, 12, 17, 57, 12, 178658)})
17:57:12.197622 [17132]