In [34]:
from datetime import timedelta, datetime, timezone
from bytewax.operators.windowing import EventClock, TumblingWindower

# Event-time clock: each item's timestamp is read from its "at" field.
# wait_for_system_duration is zero, i.e. no extra system-time grace period
# (presumably for late items, per bytewax EventClock semantics — confirm).
clock = EventClock(
    wait_for_system_duration=timedelta(0),
    ts_getter=lambda item: item["at"],
)

# One-hour tumbling windows aligned to midnight UTC on 2023-01-01. Window IDs
# in the join output count whole hours since that instant, e.g. an item at
# 2023-12-14 00:00 UTC lands in window 8328 (347 days * 24 hours).
windower = TumblingWindower(
    align_to=datetime(2023, 1, 1, tzinfo=timezone.utc),
    length=timedelta(hours=1),
)

In [35]:
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
import bytewax.operators.windowing as win

# Build a dataflow that joins each user's name record with their email record,
# matching on user_id within the same event-time window.
flow = Dataflow("join_eg")

names_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Bee",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 0, 0, tzinfo=timezone.utc),
        "name": "Hive",
    },
]
names = op.input("names", flow, TestingSource(names_l))

# User 456's email is timestamped 01:15, an hour after the matching name record
# (00:00). With one-hour windows the two records land in different windows, so
# the join row for 456 in window 8328 has None on the email side.
emails_l = [
    {
        "user_id": 123,
        "at": datetime(2023, 12, 14, 0, 15, tzinfo=timezone.utc),
        "email": "bee@bytewax.io",
    },
    {
        "user_id": 456,
        "at": datetime(2023, 12, 14, 1, 15, tzinfo=timezone.utc),
        "email": "hive@bytewax.io",
    },
]
emails = op.input("emails", flow, TestingSource(emails_l))


def user_key(event):
    """Return the join key for a name or email record: its ``user_id`` as a str.

    Both input streams use this one function, so they are always keyed
    identically. The id is stringified because bytewax keyed streams carry
    ``str`` keys.
    """
    return str(event["user_id"])


# key_on emits (key, event) pairs, e.g. ('123', {...}).
keyed_names = op.key_on("key_names", names, user_key)
keyed_emails = op.key_on("key_emails", emails, user_key)
op.inspect("check_names", keyed_names)
op.inspect("check_emails", keyed_emails)

# Windowed join of the two keyed streams. Items on `.down` have the shape
# (key, (window_id, (name_record_or_None, email_record_or_None))).
joined_out = win.join_window("join", clock, windower, keyed_names, keyed_emails)

op.inspect("check_join", joined_out.down)

Stream(stream_id='join_eg.check_join.inspect_debug.down', _scope=_Scope(parent_id='join_eg'))

In [36]:
from bytewax import run

# Run the dataflow `flow` built in the previous cell. Each `op.inspect` step
# prints the items it observes, which produces the lines in the output below.
# NOTE(review): bytewax documents `bytewax.testing.run_main(flow)` for running a
# flow inside a notebook/test; `cli_main` backs `python -m bytewax.run`.
# Consider switching (confirm against the installed bytewax version).
run.cli_main(flow)

join_eg.check_names: ('123', {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'})
join_eg.check_emails: ('123', {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 15, tzinfo=datetime.timezone.utc), 'email': 'bee@bytewax.io'})
join_eg.check_names: ('456', {'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Hive'})
join_eg.check_emails: ('456', {'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 1, 15, tzinfo=datetime.timezone.utc), 'email': 'hive@bytewax.io'})
join_eg.check_join: ('456', (8328, ({'user_id': 456, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Hive'}, None)))
join_eg.check_join: ('123', (8328, ({'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'Bee'}, {'user_id': 123, 'at': datetime.datetime(2023, 12, 14, 0, 15, tzinfo=datetime.timezone.utc), 'email': 'bee@bytewax.io'})))
jo