In [1]:
from abc import abstractmethod
from typing import Protocol, TypeVar

# Type variable for the element type that a group operation is parameterised over.
T = TypeVar("T")

class AbelianGroupOperation(Protocol[T]):
    """
    Interface for the operation of an Abelian (commutative) group over elements of type ``T``.

    Implementors supply ``add``, ``neg`` and ``identity``. The ``is_*`` / ``has_*`` helpers
    spot-check one group axiom on concrete elements using ``==``; when a check fails they
    print a diagnostic line and return the falsy result instead of raising.
    """

    @abstractmethod
    def add(self, a: T, b: T) -> T:
        """Returns the result of combining ``a`` and ``b`` with the group operation."""
        raise NotImplementedError

    @abstractmethod
    def neg(self, a: T) -> T:
        """Returns the inverse of ``a`` under the group operation."""
        raise NotImplementedError

    @abstractmethod
    def identity(self) -> T:
        """Returns the neutral element of the group."""
        raise NotImplementedError

    def is_commutative(self, a: T, b: T) -> bool:
        """Checks that ``add(a, b) == add(b, a)`` for the given elements."""
        # Each side is computed once and reused in the diagnostic, so a failing check
        # does not run the group operation a second time.
        left = self.add(a, b)
        right = self.add(b, a)
        test = left == right
        if not test:
            print(f"Failed commutativity assertion: {left} == {right}")

        return test

    def is_associative(self, a: T, b: T, c: T) -> bool:
        """Checks that ``add(add(a, b), c) == add(a, add(b, c))`` for the given elements."""
        left = self.add(self.add(a, b), c)
        right = self.add(a, self.add(b, c))
        test = left == right
        if not test:
            print(f"Failed associativity assertion: {left} == {right}")

        return test

    def has_identity(self, a: T) -> bool:
        """Checks that ``identity()`` is neutral for ``a`` on both the left and the right."""
        identity = self.identity()
        left = self.add(a, identity)
        right = self.add(identity, a)
        test = left == a and right == a
        if not test:
            # Both sides are compared against `a`, so the expected value is reported too.
            print(f"Failed identity assertion: a + e = {left}, e + a = {right}, expected {a}")

        return test

    def has_inverse(self, a: T) -> bool:
        """Checks that ``neg(a)`` combined with ``a``, in either order, yields ``identity()``."""
        identity = self.identity()
        inv_a = self.neg(a)
        left = self.add(a, inv_a)
        right = self.add(inv_a, a)
        test = left == identity and right == identity
        if not test:
            # Both sides are compared against the identity, so it is reported as the expected value.
            print(f"Failed inverse assertion: a + (-a) = {left}, (-a) + a = {right}, expected {identity}")

        return test

class IntegerAddition(AbelianGroupOperation[int]):
    """Group operation for Python integers under ordinary addition."""

    def add(self, a: int, b: int) -> int:
        """Returns the arithmetic sum of ``a`` and ``b``."""
        total = a + b
        return total

    def neg(self, a: int) -> int:
        """Returns the additive inverse of ``a``."""
        negated = -a
        return negated

    def identity(self) -> int:
        """Returns zero, the neutral element of integer addition."""
        zero = 0
        return zero

In [2]:
# Sample operands used to exercise the integer addition group.
a: int = 5
b: int = 6

integer_addition_group = IntegerAddition()

print(f"{a} + {b} = {integer_addition_group.add(a, b)}\n")

# Spot-check each group axiom on the sample operands; every helper returns True when its check holds.
print(integer_addition_group.is_commutative(a, b))
print(integer_addition_group.is_associative(a, b, integer_addition_group.add(a, b)))
print(integer_addition_group.has_identity(a))
print(integer_addition_group.has_inverse(a))

5 + 6 = 11

True
True
True
True


In [3]:
from typing import Callable, Generic, Iterator, List, Optional, cast
from types import NotImplementedType

from abc import abstractmethod
from types import NotImplementedType
from typing import Callable, Iterator, List, Optional, OrderedDict, Protocol, TypeVar, cast

class Stream[T]:
    """
    Represents a stream of elements from an Abelian group.
    """

    timestamp: int
    inner: OrderedDict[int, T]
    group_op: AbelianGroupOperation[T]
    identity: bool
    default: T
    default_changes: OrderedDict[int, T]

    def __init__(self, group_op: AbelianGroupOperation[T]) -> None:
        self.inner = OrderedDict()
        self.group_op = group_op
        self.timestamp = -1
        self.identity = True
        self.default = group_op.identity()
        self.default_changes = OrderedDict()
        self.default_changes[0] = group_op.identity()
        self.send(group_op.identity())

    def send(self, element: T) -> None:
        """Adds an element to the stream and increments the timestamp."""
        if element != self.default:
            self.inner[self.timestamp + 1] = element
            self.identity = False

        self.timestamp += 1

    def group(self) -> AbelianGroupOperation[T]:
        """Returns the Abelian group operation associated with this stream."""
        return self.group_op

    def current_time(self) -> int:
        """Returns the timestamp of the most recently arrived element."""
        return self.timestamp

    def __iter__(self) -> Iterator[T]:
        for t in range(self.current_time() + 1):
            yield self[t]

    def __repr__(self) -> str:
        return self.inner.__repr__()

    def set_default(self, new_default: T):
        """
        Warning! changing this can break causality. Stay clear of this function unless you REALLY know what you are
        doing.

        This function effectively "freezes" the stream to strictly return a not-identity value when a timestamp
        beyond its frontier is requested.

        This is used in very specific scenarios. See the `LiftedIntegrate` implementation.
        """
        self.default = new_default
        self.default_changes[self.timestamp] = new_default

    def __getitem__(self, timestamp: int) -> T:
        """Returns the element at the given timestamp."""
        if timestamp < 0:
            raise ValueError("Timestamp cannot be negative")

        if timestamp <= self.current_time():
            default_timestamp = max((t for t in self.default_changes if t < timestamp), default=0)
            return self.inner.get(timestamp, self.default_changes[default_timestamp])

        elif timestamp > self.current_time():
            while timestamp > self.current_time():
                self.send(self.default)

        return self.__getitem__(timestamp)

    def latest(self) -> T:
        """Returns the most recent element."""
        return self.__getitem__(self.current_time())

    def is_identity(self) -> bool:
        return self.identity

    def to_list(self) -> List[T]:
        return list(iter(self))

    def __eq__(self, other: object) -> bool | NotImplementedType:
        """
        Compares this stream with another, considering all timestamps up to the latest.
        """
        if not isinstance(other, Stream):
            return NotImplemented

        if self.is_identity() and other.is_identity():
            return True

        cast(Stream[T], other)

        self_timestamp = self.current_time()
        other_timestamp = other.current_time()

        if self_timestamp != other_timestamp:
            return False

        return self.inner == other.inner  # type: ignore

# Module-level type variable used by the generic alias and the Protocol bases declared below.
T = TypeVar("T")

# A zero-argument callable that yields a stream each time it is invoked.
StreamReference = Callable[[], Stream[T]]


class StreamHandle[T]:
    """A handle to a stream, allowing lazy access."""

    ref: StreamReference[T]

    def __init__(self, stream_reference: StreamReference[T]) -> None:
        self.ref = stream_reference

    def get(self) -> Stream[T]:
        """Returns the referenced stream."""
        return self.ref()


# Second type variable; used as the output element type of unary operators.
R = TypeVar("R")


class Operator(Protocol[T]):
    """Interface of a stream operator that is advanced one step at a time."""

    @abstractmethod
    def step(self) -> bool:
        """Advances the operator once; a True result tells the stepping helpers to stop."""
        raise NotImplementedError()

    @abstractmethod
    def output_handle(self) -> StreamHandle[T]:
        """Returns a handle through which the operator's output stream can be obtained."""
        raise NotImplementedError()


def step_until_fixpoint[T](operator: Operator[T]) -> None:
    while not operator.step():
        pass


def step_until_fixpoint_and_return[T](operator: Operator[T]) -> Stream[T]:
    step_until_fixpoint(operator)

    return operator.output_handle().get()


class UnaryOperator(Operator[R], Protocol[T, R]):
    """Shared plumbing for operators that read one input stream and write one output stream."""

    input_stream_handle: StreamHandle[T]
    output_stream_handle: StreamHandle[R]

    def __init__(
        self,
        stream_handle: Optional[StreamHandle[T]],
        output_stream_group: Optional[AbelianGroupOperation[R]],
    ) -> None:
        """Wires the input (and creates the output) when a handle is given; otherwise does nothing."""
        if stream_handle is None:
            return

        self.set_input(stream_handle, output_stream_group)

    def set_input(
        self,
        stream_handle: StreamHandle[T],
        output_stream_group: Optional[AbelianGroupOperation[R]],
    ) -> None:
        """
        Stores the input handle and creates a fresh output stream.

        The output stream uses ``output_stream_group`` when given; otherwise it reuses the
        group of the input stream.
        """
        self.input_stream_handle = stream_handle

        if output_stream_group is None:
            output_stream = cast(Stream[R], Stream(self.input_a().group()))
        else:
            output_stream = Stream(output_stream_group)

        self.output_stream_handle = StreamHandle(lambda: output_stream)

    def output(self) -> Stream[R]:
        """Returns the operator's output stream."""
        return self.output_stream_handle.get()

    def input_a(self) -> Stream[T]:
        """Returns the operator's input stream."""
        return self.input_stream_handle.get()

    def output_handle(self) -> StreamHandle[R]:
        """Returns a new handle that resolves to the output stream each time it is dereferenced."""
        return StreamHandle(lambda: self.output())


# Third type variable; used as the output element type of binary operators.
S = TypeVar("S")


class BinaryOperator(Operator[S], Protocol[T, R, S]):
    """Shared plumbing for operators that read two input streams and write one output stream."""

    input_stream_handle_a: StreamHandle[T]
    input_stream_handle_b: StreamHandle[R]
    output_stream_handle: StreamHandle[S]

    def __init__(
        self,
        stream_a: Optional[StreamHandle[T]],
        stream_b: Optional[StreamHandle[R]],
        output_stream_group: Optional[AbelianGroupOperation[S]],
    ) -> None:
        """
        Wires whichever inputs are provided. When ``output_stream_group`` is given, the output
        stream is (re)created over that group after the inputs have been set.
        """
        if stream_a is not None:
            self.set_input_a(stream_a)

        if stream_b is not None:
            self.set_input_b(stream_b)

        if output_stream_group is None:
            return

        explicit_output = Stream(output_stream_group)
        self.set_output_stream(StreamHandle(lambda: explicit_output))

    def set_input_a(self, stream_handle_a: StreamHandle[T]) -> None:
        """Stores the first input handle and creates an output stream over that input's group."""
        self.input_stream_handle_a = stream_handle_a

        derived_output = cast(Stream[S], Stream(self.input_a().group()))
        self.set_output_stream(StreamHandle(lambda: derived_output))

    def set_input_b(self, stream_handle_b: StreamHandle[R]) -> None:
        """Stores the second input handle; the output stream is left untouched."""
        self.input_stream_handle_b = stream_handle_b

    def set_output_stream(self, output_stream_handle: StreamHandle[S]) -> None:
        """Replaces the handle of the output stream."""
        self.output_stream_handle = output_stream_handle

    def output(self) -> Stream[S]:
        """Returns the operator's output stream."""
        return self.output_stream_handle.get()

    def input_a(self) -> Stream[T]:
        """Returns the first input stream."""
        return self.input_stream_handle_a.get()

    def input_b(self) -> Stream[R]:
        """Returns the second input stream."""
        return self.input_stream_handle_b.get()

    def output_handle(self) -> StreamHandle[S]:
        """Returns a new handle that resolves to the output stream each time it is dereferenced."""
        return StreamHandle(lambda: self.output())

In [4]:
from typing import Callable


class Delay(UnaryOperator[T, T]):
    """
    Shifts the input stream by one timestamp: the output at time ``t + 1`` is the input at time ``t``.
    """

    def __init__(self, stream: Optional[StreamHandle[T]]) -> None:
        """Creates the operator; the output stream reuses the input stream's group."""
        super().__init__(stream, None)

    def step(self) -> bool:
        """
        Copies one input element to the output.

        Returns True, without sending anything, once the output is already ahead of the
        input; otherwise sends one element and returns False.
        """
        source = self.input_a()
        sink = self.output()
        read_position = sink.current_time()

        if read_position > source.current_time():
            return True

        # The output frontier doubles as the index of the next input element to copy.
        sink.send(source[read_position])

        return False


# Signature of a one-argument function mapping an input element to an output element.
F1 = Callable[[T], R]


class Lift1(UnaryOperator[T, R]):
    """Applies a one-argument function to each element of the input stream, one element per step."""

    f1: F1[T, R]
    frontier: int

    def __init__(
        self,
        stream: Optional[StreamHandle[T]],
        f1: F1[T, R],
        output_stream_group: Optional[AbelianGroupOperation[R]],
    ):
        """Stores the function, starts the frontier at 0 and wires the input/output streams."""
        self.frontier = 0
        self.f1 = f1
        super().__init__(stream, output_stream_group)

    def step(self) -> bool:
        """
        Processes the input element just past the frontier.

        Returns True, without doing any work, when the input time, the output time and the
        frontier all coincide; otherwise sends one transformed element and returns False.
        """
        source = self.input_a()
        sink = self.output()

        if source.current_time() == sink.current_time() == self.frontier:
            return True

        upcoming = self.frontier + 1
        sink.send(self.f1(source[upcoming]))
        self.frontier = upcoming

        return False


# Signature of a two-argument function combining one element from each input into an output element.
F2 = Callable[[T, R], S]


class Lift2(BinaryOperator[T, R, S]):
    """Applies a two-argument function to the two input streams pairwise, one pair per step."""

    f2: F2[T, R, S]
    frontier_a: int
    frontier_b: int

    def __init__(
        self,
        stream_a: Optional[StreamHandle[T]],
        stream_b: Optional[StreamHandle[R]],
        f2: F2[T, R, S],
        output_stream_group: Optional[AbelianGroupOperation[S]],
    ) -> None:
        """Stores the function, starts both frontiers at 0 and wires the streams."""
        self.frontier_a = 0
        self.frontier_b = 0
        self.f2 = f2

        super().__init__(stream_a, stream_b, output_stream_group)

    def step(self) -> bool:
        """
        Processes the pair of input elements just past the two frontiers.

        Returns True, without doing any work, when both input times, the output time and both
        frontiers all coincide; otherwise sends one combined element and returns False.
        """
        stream_a = self.input_a()
        stream_b = self.input_b()
        sink = self.output()

        positions = (
            stream_a.current_time(),
            stream_b.current_time(),
            sink.current_time(),
            self.frontier_a,
            self.frontier_b,
        )
        if max(positions) == min(positions):
            return True

        upcoming_a = self.frontier_a + 1
        upcoming_b = self.frontier_b + 1
        # Read a before b: indexing can extend a stream, so the original order is kept.
        a = stream_a[upcoming_a]
        b = stream_b[upcoming_b]

        sink.send(self.f2(a, b))

        self.frontier_a, self.frontier_b = upcoming_a, upcoming_b

        return False


class LiftedGroupAdd(Lift2[T, T, T]):
    """Adds two streams element-wise using the group of the first input stream."""

    def __init__(self, stream_a: StreamHandle[T], stream_b: Optional[StreamHandle[T]]):
        """Wires both inputs; ``stream_b`` may be None and supplied later via ``set_input_b``."""

        def add_pointwise(x: T, y: T) -> T:
            # The group is looked up through the first input's handle on every call.
            return stream_a.get().group().add(x, y)

        super().__init__(stream_a, stream_b, add_pointwise, None)


class LiftedGroupNegate(Lift1[T, T]):
    """Negates a stream element-wise using the group of the input stream."""

    def __init__(self, stream: StreamHandle[T]):
        """Wires the input; the output stream reuses the input stream's group."""

        def negate_pointwise(x: T) -> T:
            # The group is looked up through the input's handle on every call.
            return stream.get().group().neg(x)

        super().__init__(stream, negate_pointwise, None)

class Differentiate(UnaryOperator[T, T]):
    """
    Emits, for each timestamp, the input element plus the negation of the previous input element.

    Built as a pipeline of three operators: Delay -> LiftedGroupNegate -> LiftedGroupAdd.
    """

    delayed_stream: Delay[T]
    delayed_negated_stream: LiftedGroupNegate[T]
    differentiation_stream: LiftedGroupAdd[T]

    def __init__(self, stream: StreamHandle[T]) -> None:
        """Builds the internal pipeline and exposes the adder's output as this operator's output."""
        self.input_stream_handle = stream

        delayed = Delay(stream)
        negated = LiftedGroupNegate(delayed.output_handle())
        difference = LiftedGroupAdd(stream, negated.output_handle())

        self.delayed_stream = delayed
        self.delayed_negated_stream = negated
        self.differentiation_stream = difference
        self.output_stream_handle = difference.output_handle()

    def step(self) -> bool:
        """
        Steps each stage of the pipeline once, in order.

        Returns True when the output has reached the same timestamp as the input.
        """
        pipeline = (
            self.delayed_stream,
            self.delayed_negated_stream,
            self.differentiation_stream,
        )
        for stage in pipeline:
            stage.step()

        caught_up = self.output().current_time() == self.input_a().current_time()
        return caught_up


class Integrate(UnaryOperator[T, T]):
    """
    Emits the running sum of the input stream.

    Built as a feedback loop: an adder whose second input is its own output, delayed by one step.
    """

    delayed_stream: Delay[T]
    integration_stream: LiftedGroupAdd[T]

    def __init__(self, stream: StreamHandle[T]) -> None:
        """Builds the adder/delay feedback loop and exposes the adder's output as the output."""
        self.input_stream_handle = stream

        adder = LiftedGroupAdd(stream, None)
        feedback = Delay(adder.output_handle())
        # Close the loop: the adder's second operand is its own delayed output.
        adder.set_input_b(feedback.output_handle())

        self.integration_stream = adder
        self.delayed_stream = feedback
        self.output_stream_handle = adder.output_handle()

    def step(self) -> bool:
        """
        Steps the delay and then the adder once.

        Returns True when the output has reached the same timestamp as the input.
        """
        for stage in (self.delayed_stream, self.integration_stream):
            stage.step()

        caught_up = self.output().current_time() == self.input_a().current_time()
        return caught_up

In [5]:
def create_integer_identity_stream(to: int) -> Stream[int]:
    """Builds an integer stream under addition and sends the values 0, 1, ..., ``to - 1`` in order."""
    integers = Stream(IntegerAddition())

    for value in range(to):
        integers.send(value)

    return integers

In [6]:
# Build the sample input stream reused by the following cells.
# The stream starts with its initial identity element, followed by the values 0 .. n - 1.
n = 10
integer_identity_stream = create_integer_identity_stream(n)
zero_to_ten = StreamHandle(lambda: integer_identity_stream)
print(f"Stream of integers from 0 to {n}: {zero_to_ten.get().to_list()}\n")

Stream of integers from 0 to 10: [0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]



In [7]:
# Delay the sample stream by one timestamp and show the result.
delayed_zero_to_ten = Delay(zero_to_ten)
step_until_fixpoint(delayed_zero_to_ten)
print(f"Delayed stream of integers: \n{delayed_zero_to_ten.output().to_list()}\n")

Delayed stream of integers: 
[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]



In [8]:
# Differentiate the sample stream: each output is the difference between consecutive inputs.
diff_zero_to_ten = Differentiate(zero_to_ten)
step_until_fixpoint(diff_zero_to_ten)
print(f"Diff stream of integers: \n{diff_zero_to_ten.output().to_list()}\n")

Diff stream of integers: 
[0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]



In [9]:
# Integrate the sample stream: each output is the running sum of the inputs so far.
int_zero_to_ten = Integrate(zero_to_ten)
step_until_fixpoint(int_zero_to_ten)
print(f"Integration stream of integers: \n{int_zero_to_ten.output().to_list()}\n")

Integration stream of integers: 
[0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45]



In [10]:
# Integrating the differentiated stream reproduces the original sample stream.
int_diff_zero_to_ten = Integrate(diff_zero_to_ten.output_handle())
step_until_fixpoint(int_diff_zero_to_ten)
print(f"Integrated diff stream of integers: \n{int_diff_zero_to_ten.output().to_list()}\n")

Integrated diff stream of integers: 
[0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]



In [11]:
from typing import Dict, Generic, Iterable, Tuple, TypeVar

# Type variable for the element type stored in a Z-set (re-declared in this cell).
T = TypeVar("T")


class ZSet(Generic[T]):
    """
    A Z-set: a generalization of multisets in which each element carries an integer weight.
    Weights may be positive, negative, or zero.

    A Z-set in which every weight is one can be read as a set; one in which every weight is
    strictly positive is a bag; one in which every weight is either 1 or -1 is a diff.
    """

    inner: Dict[T, int]

    def __init__(self, values: Dict[T, int]) -> None:
        """Wraps ``values`` (element -> weight); the dictionary is stored as-is, not copied."""
        self.inner = values

    def items(self) -> Iterable[Tuple[T, int]]:
        """Returns the (element, weight) pairs of the underlying dictionary."""
        return self.inner.items()

    def __repr__(self) -> str:
        """Returns the repr of the underlying dictionary."""
        return repr(self.inner)

    def __eq__(self, other: object) -> bool:
        """
        Two Z-sets are equal when their underlying dictionaries are equal; any object that
        is not a ZSet compares unequal.
        """
        return isinstance(other, ZSet) and self.inner == other.inner  # type: ignore

    def __contains__(self, item: T) -> bool:
        """Returns True when ``item`` is a key of the underlying dictionary, whatever its weight."""
        return item in self.inner

    def __getitem__(self, item: T) -> int:
        """Returns the weight stored for ``item``, or 0 when it is not present."""
        return self.inner[item] if item in self else 0

    def is_identity(self) -> bool:
        """Returns True when the Z-set has no entries."""
        return not self.inner

    def __setitem__(self, key: T, value: int) -> None:
        """Sets the weight of ``key`` to ``value`` in the underlying dictionary."""
        self.inner[key] = value


class ZSetAddition(Generic[T], AbelianGroupOperation[ZSet[T]]):
    """
    Group operation for Z-sets: weights are added per element and the empty Z-set is the identity.
    """

    def add(self, a: ZSet[T], b: ZSet[T]) -> ZSet[T]:
        """
        Returns a new Z-set holding, for every element of ``a`` or ``b``, the sum of its weights.
        Elements present in both operands whose weights sum to zero are left out of the result.
        """
        merged = dict(a.inner)

        for element, weight in b.inner.items():
            if element not in merged:
                # Present only in b: carried over unchanged.
                merged[element] = weight
                continue

            combined = merged[element] + weight
            if combined == 0:
                del merged[element]
            else:
                merged[element] = combined

        return ZSet(merged)

    def neg(self, a: ZSet[T]) -> ZSet[T]:
        """Returns a new Z-set with every weight of ``a`` negated."""
        return ZSet({element: -weight for element, weight in a.inner.items()})

    def identity(self) -> ZSet[T]:
        """Returns a new, empty Z-set."""
        return ZSet({})

In [12]:
# Two sample Z-sets that share the element "apple".
A: ZSet[str] = ZSet({"apple": 1, "orange": 3})
B: ZSet[str] = ZSet({"apple": 3, "banana": 2})

zset_addition_group = ZSetAddition()

# Addition sums the weights of shared elements and keeps the rest.
C = zset_addition_group.add(A, B)
print(f"{A} + {B} = {C}\n")

# Negation flips the sign of every weight.
D = zset_addition_group.neg(A)
print(f"neg({A}) = {D}\n")

# Spot-check the group axioms on the sample Z-sets.
print(zset_addition_group.is_commutative(A, B))
print(zset_addition_group.is_associative(A, B, zset_addition_group.add(A, B)))
print(zset_addition_group.has_identity(A))
print(zset_addition_group.has_inverse(A))

{'apple': 1, 'orange': 3} + {'apple': 3, 'banana': 2} = {'apple': 4, 'orange': 3, 'banana': 2}

neg({'apple': 1, 'orange': 3}) = {'apple': -1, 'orange': -3}

True
True
True
True


In [13]:
class StreamAddition(AbelianGroupOperation[Stream[T]]):
    """Group operation on streams obtained by applying the element group's operation pointwise."""

    group: AbelianGroupOperation[T]

    def __init__(self, group: AbelianGroupOperation[T]) -> None:
        """Stores the group of the stream elements."""
        self.group = group

    def add(self, a: Stream[T], b: Stream[T]) -> Stream[T]:
        """
        Returns a new stream holding the element-wise sum of ``a`` and ``b``.

        When one operand is an identity stream, the result takes over the other operand's
        default value.
        """
        adder = LiftedGroupAdd(StreamHandle(lambda: a), StreamHandle(lambda: b))
        result = step_until_fixpoint_and_return(adder)

        # The two checks run in this order, so when both operands are identity streams
        # the default of `a` is the one that sticks.
        # NOTE(review): the default is assigned directly rather than via set_default, so
        # default_changes is not updated here - confirm this is intended.
        if a.is_identity():
            result.default = b.default

        if b.is_identity():
            result.default = a.default

        return result

    def inner_group(self) -> AbelianGroupOperation[T]:
        """Returns the group of the stream elements."""
        return self.group

    def neg(self, a: Stream[T]) -> Stream[T]:
        """Returns a new stream holding the element-wise negation of ``a``."""
        negator = LiftedGroupNegate(StreamHandle(lambda: a))

        return step_until_fixpoint_and_return(negator)

    def identity(self) -> Stream[T]:
        """Returns a freshly created stream over the element group."""
        return Stream(self.group)

In [14]:
# Rebind A, B and C to the Z-sets that will be sent through a stream.
A = ZSet({"apple": 1, "orange": 3})
B = ZSet({"apple": -2, "orange": 3})
C = ZSet({"apple": 2, "bananas": -2})

zset_stream = Stream(ZSetAddition())
zset_stream.send(A)
zset_stream.send(B)
zset_stream.send(C)


# Lift Z-set addition to streams of Z-sets and add the stream to itself.
zset_stream_addition_group = StreamAddition(zset_addition_group)
zset_stream_sum = zset_stream_addition_group.add(zset_stream, zset_stream)

print(f"zset_stream: {zset_stream}\n")
print(f"zset_stream + zset_stream:\n{zset_stream_sum}\n")
# Spot-check the group axioms on streams.
print(zset_stream_addition_group.is_commutative(zset_stream, zset_stream))
print(
    zset_stream_addition_group.is_associative(zset_stream, zset_stream, zset_stream_sum)
)
print(zset_stream_addition_group.has_identity(zset_stream))
print(zset_stream_addition_group.has_inverse(zset_stream))

zset_stream: OrderedDict({1: {'apple': 1, 'orange': 3}, 2: {'apple': -2, 'orange': 3}, 3: {'apple': 2, 'bananas': -2}})

zset_stream + zset_stream:
OrderedDict({1: {'apple': 2, 'orange': 6}, 2: {'apple': -4, 'orange': 6}, 3: {'apple': 4, 'bananas': -4}})

True
True
True
True


In [15]:
# Predicate deciding whether a left element and a right element are joined.
JoinCmp = Callable[[T, R], bool]
# Function building the output element from a matched (left, right) pair.
PostJoinProjection = Callable[[T, R], S]

def join[T, R, S](
    left_zset: ZSet[T],
    right_zset: ZSet[R],
    p: JoinCmp[T, R],
    f: PostJoinProjection[T, R, S],
) -> ZSet[S]:
    output: Dict[S, int] = {}
    for left_value, left_weight in left_zset.items():
        for right_value, right_weight in right_zset.items():
            if p(left_value, right_value):
                projected_value = f(left_value, right_value)
                new_weight = left_weight * right_weight

                if projected_value in output:
                    output[projected_value] += new_weight
                else:
                    output[projected_value] = new_weight

    return ZSet(output)

In [16]:
import math

# Two small Z-sets in which every element has weight 1.
some_zset: ZSet[int] = ZSet({1: 1, 2: 1, 3: 1, 4: 1})
other_zset: ZSet[int] = ZSet({4: 1, 9: 1})

# Match a left element with a right element when it equals the right element's integer
# square root, and keep only the left element of each match.
some_join_cmp = lambda left, right: left == math.isqrt(right)
some_post_join_projection = lambda left, right: left

print(join(some_zset, other_zset, some_join_cmp, some_post_join_projection))

{2: 1, 3: 1}


In [17]:
class LiftedJoin(Lift2[ZSet[T], ZSet[R], ZSet[S]]):
    """Joins two streams of Z-sets element-wise using ``join`` with a fixed predicate and projection."""

    def __init__(
        self,
        stream_a: Optional[StreamHandle[ZSet[T]]],
        stream_b: Optional[StreamHandle[ZSet[R]]],
        p: JoinCmp[T, R],
        f: PostJoinProjection[T, R, S],
    ):
        """Wires the inputs; ``p`` and ``f`` are forwarded to ``join`` for every pair of Z-sets."""

        def join_pointwise(left: ZSet[T], right: ZSet[R]) -> ZSet[S]:
            return join(left, right, p, f)

        super().__init__(stream_a, stream_b, join_pointwise, None)

In [18]:
# Wrap each sample Z-set in its own single-element stream.
some_zset_stream = Stream(ZSetAddition())
some_zset_stream.send(some_zset)

other_zset_stream = Stream(ZSetAddition())
other_zset_stream.send(other_zset)

# Join the two streams element-wise with the predicate and projection defined earlier.
lifted_join_stream = LiftedJoin(
    StreamHandle(lambda: some_zset_stream),
    StreamHandle(lambda: other_zset_stream),
    some_join_cmp,
    some_post_join_projection,
)

print(step_until_fixpoint_and_return(lifted_join_stream).to_list())

[{}, {2: 1, 3: 1}]


In [19]:
class LiftedLiftedJoin(
    Lift2[
        Stream[ZSet[T]],
        Stream[ZSet[R]],
        Stream[ZSet[S]],
    ]
):
    """Joins two streams of Z-set streams by running a ``LiftedJoin`` on each pair of inner streams."""

    def __init__(
        self,
        stream_a: Optional[StreamHandle[Stream[ZSet[T]]]],
        stream_b: Optional[StreamHandle[Stream[ZSet[R]]]],
        p: JoinCmp[T, R],
        f: PostJoinProjection[T, R, S],
    ):
        """Wires the inputs; ``p`` and ``f`` are forwarded to every inner ``LiftedJoin``."""

        def join_inner_streams(x: Stream[ZSet[T]], y: Stream[ZSet[R]]) -> Stream[ZSet[S]]:
            # A fresh LiftedJoin is built per pair of inner streams and run to its fixpoint.
            inner_join = LiftedJoin(StreamHandle(lambda: x), StreamHandle(lambda: y), p, f)
            return step_until_fixpoint_and_return(inner_join)

        super().__init__(stream_a, stream_b, join_inner_streams, None)

In [20]:
# Nest each Z-set stream inside an outer stream whose elements are themselves streams.
some_stream_of_zset_streams = Stream(StreamAddition(ZSetAddition()))
some_stream_of_zset_streams.send(some_zset_stream)

other_stream_of_zset_streams = Stream(StreamAddition(ZSetAddition()))
other_stream_of_zset_streams.send(other_zset_stream)

# Join the two nested streams with the predicate and projection defined earlier.
lifted_lifted_join_stream = LiftedLiftedJoin(
    StreamHandle(lambda: some_stream_of_zset_streams),
    StreamHandle(lambda: other_stream_of_zset_streams),
    some_join_cmp,
    some_post_join_projection,
)

def from_stream_of_streams_into_list_of_lists[T](nested_stream: Stream[Stream[T]]):
    return [ stream.to_list() for stream in nested_stream ] 

# Run the doubly-lifted join to its fixpoint and print each inner stream as a list.
print(from_stream_of_streams_into_list_of_lists(step_until_fixpoint_and_return(lifted_lifted_join_stream)))

[[{}], [{}, {2: 1, 3: 1}]]


In [21]:
def step_until_fixpoint_set_new_default_then_return[T](
    operator: Integrate[T] | Delay[T],
) -> Stream[T]:
    step_until_fixpoint(operator)

    out = operator.output_handle().get()
    latest = out.latest()
    out.set_default(latest)

    return out


class LiftedDelay(Lift1[Stream[T], Stream[T]]):
    """
    Applies the Delay operator to every inner stream of a stream of streams.
    """

    def __init__(self, stream: StreamHandle[Stream[T]]):
        """Wires the input; each inner stream is delayed by a fresh ``Delay`` run to its fixpoint."""

        def delay_inner(s: Stream[T]) -> Stream[T]:
            # The delayed stream's latest value becomes its default (see the helper called here).
            return step_until_fixpoint_set_new_default_then_return(Delay(StreamHandle(lambda: s)))

        super().__init__(stream, delay_inner, None)


class LiftedIntegrate(Lift1[Stream[T], Stream[T]]):
    """
    Applies the Integrate operator to every inner stream of a stream of streams.
    """

    def __init__(self, stream: StreamHandle[Stream[T]]):
        """Wires the input; each inner stream is integrated by a fresh ``Integrate`` run to its fixpoint."""

        def integrate_inner(s: Stream[T]) -> Stream[T]:
            # The integrated stream's latest value becomes its default (see the helper called here).
            return step_until_fixpoint_set_new_default_then_return(Integrate(StreamHandle(lambda: s)))

        super().__init__(stream, integrate_inner, None)

class LiftedDifferentiate(Lift1[Stream[T], Stream[T]]):
    """Applies the Differentiate operator to every inner stream of a stream of streams."""

    def __init__(self, stream: StreamHandle[Stream[T]]):
        """Wires the input; each inner stream is differentiated by a fresh ``Differentiate`` run to its fixpoint."""

        def differentiate_inner(s: Stream[T]) -> Stream[T]:
            return step_until_fixpoint_and_return(Differentiate(StreamHandle(lambda: s)))

        super().__init__(stream, differentiate_inner, None)

In [22]:
class LiftedMod2(Lift1[int, int]):
    """Maps every element of an integer stream to its remainder modulo 2."""

    def __init__(self, stream: StreamHandle[int]):
        """Wires the input; the output stream reuses the input stream's group."""

        def parity(i: int) -> int:
            return i % 2

        super().__init__(stream, parity, None)


class LiftedLiftedMod2(
    Lift1[Stream[int], Stream[int]]
):
    """Applies ``LiftedMod2`` to every inner stream of a stream of integer streams."""

    def __init__(self, stream: StreamHandle[Stream[int]]):
        """Wires the input; each inner stream is processed by a fresh ``LiftedMod2`` run to its fixpoint."""

        def mod2_inner(s: Stream[int]) -> Stream[int]:
            inner_operator = LiftedMod2(StreamHandle(lambda: s))
            return step_until_fixpoint_and_return(inner_operator)

        super().__init__(stream, mod2_inner, None)

In [23]:
# Build a stream of four integer streams; inner stream t1 receives the values t0 + 2 * t1 for t0 in 0..3.
example = Stream(StreamAddition(IntegerAddition()))
for t1 in range(4):
    s = Stream(IntegerAddition())

    for t0 in range(4):
        s.send(t0 + (2 * t1))

    example.send(s)
print(f"example:\n{from_stream_of_streams_into_list_of_lists(example)}\n")
example_n_times = example.current_time()
example_handle = StreamHandle(lambda: example)

# Doubly-lifted mod 2: applied to every integer of every inner stream.
mod2 = LiftedLiftedMod2(example_handle)
step_until_fixpoint(mod2)
print(f"↑↑mod2(example):\n{from_stream_of_streams_into_list_of_lists(mod2.output())}\n")

# Integration over the outer stream: a running sum of whole inner streams.
int_example = Integrate(example_handle)
step_until_fixpoint(int_example)
print(f"∫(example):\n{from_stream_of_streams_into_list_of_lists(int_example.output())}\n")

# Lifted integration: a running sum inside each inner stream.
lift_int_example = LiftedIntegrate(example_handle)
step_until_fixpoint(lift_int_example)
print(f"↑∫(example):\n{from_stream_of_streams_into_list_of_lists(lift_int_example.output())}\n")

# Differentiation over the outer stream: difference between consecutive inner streams.
diff_example = Differentiate(example_handle)
step_until_fixpoint(diff_example)
print(f"D(example):\n{from_stream_of_streams_into_list_of_lists(diff_example.output())}\n")

# Lifted differentiation: difference between consecutive elements inside each inner stream.
lift_diff_example = LiftedDifferentiate(example_handle)
step_until_fixpoint(lift_diff_example)
print(f"↑D(example):\n{from_stream_of_streams_into_list_of_lists(lift_diff_example.output())}\n")

# Delay over the outer stream: inner streams are shifted by one outer timestamp.
delayed_example = Delay(example_handle)
step_until_fixpoint(delayed_example)
print(f"z^-1(example):\n{from_stream_of_streams_into_list_of_lists(delayed_example.output())}\n")

# Lifted delay: each inner stream is shifted by one inner timestamp.
lifted_delayed_example = LiftedDelay(example_handle)
step_until_fixpoint(lifted_delayed_example)
print(f"↑z^-1(example):\n{from_stream_of_streams_into_list_of_lists(lifted_delayed_example.output())}\n")

example:
[[0], [0, 0, 1, 2, 3], [0, 2, 3, 4, 5], [0, 4, 5, 6, 7], [0, 6, 7, 8, 9]]

↑↑mod2(example):
[[0], [0, 0, 1, 0, 1], [0, 0, 1, 0, 1], [0, 0, 1, 0, 1], [0, 0, 1, 0, 1]]

∫(example):
[[0], [0, 0, 1, 2, 3], [0, 2, 4, 6, 8], [0, 6, 9, 12, 15], [0, 12, 16, 20, 24]]

↑∫(example):
[[0], [0, 0, 1, 3, 6], [0, 2, 5, 9, 14], [0, 4, 9, 15, 22], [0, 6, 13, 21, 30]]

D(example):
[[0], [0, 0, 1, 2, 3], [0, 2, 2, 2, 2], [0, 2, 2, 2, 2], [0, 2, 2, 2, 2]]

↑D(example):
[[0], [0, 0, 1, 1, 1], [0, 2, 1, 1, 1], [0, 4, 1, 1, 1], [0, 6, 1, 1, 1]]

z^-1(example):
[[0], [0], [0, 0, 1, 2, 3], [0, 2, 3, 4, 5], [0, 4, 5, 6, 7], [0, 6, 7, 8, 9]]

↑z^-1(example):
[[0], [0, 0, 0, 1, 2, 3], [0, 0, 2, 3, 4, 5], [0, 0, 4, 5, 6, 7], [0, 0, 6, 7, 8, 9]]



In [24]:
class DeltaLiftedDeltaLiftedJoin(BinaryOperator[Stream[ZSet[T]], Stream[ZSet[R]], Stream[ZSet[S]]]):
    """
    Incrementally computes the Z-Set join between two streams element-wise. Equivalent to - but keeps less state - incrementalizing a doubly-lifted join. See :func:`~pydbsp.stream.operators.Incrementalize2` to grasp what it means to incrementalize a singly-lifted join.

    Each non-trivial step emits the group sum of four ``LiftedLiftedJoin`` outputs.
    Using the symbols this notebook prints for the operators (``∫`` = ``Integrate``,
    ``↑∫`` = ``LiftedIntegrate``, ``z^-1`` = ``Delay``, ``↑z^-1`` = ``LiftedDelay``),
    with ``a`` and ``b`` the two inputs, the joins are wired as:

    * ``join_1``: ``z^-1(∫ a)`` with ``↑z^-1(↑∫ b)``
    * ``join_2``: ``∫(↑∫ a)`` with ``b``
    * ``join_3``: ``↑∫ a`` with ``z^-1(∫ b)``
    * ``join_4``: ``a`` with ``↑z^-1(∫(↑∫ b))``

    Wiring order matters: ``set_input_b`` constructs the four joins and reads operators
    created by ``set_input_a``, so ``set_input_a`` has to be called first.
    """

    # Join predicate and post-join projection handed to every LiftedLiftedJoin.
    p: JoinCmp[T, R]
    f: PostJoinProjection[T, R, S]
    # Counters compared against the inputs' current_time() in step(); both start at 0.
    frontier_a: int
    frontier_b: int

    # Views derived from input a (created in set_input_a).
    integrated_stream_a: Integrate[Stream[ZSet[T]]]
    delayed_integrated_stream_a: Delay[Stream[ZSet[T]]]
    lift_integrated_stream_a: LiftedIntegrate[ZSet[T]]
    integrated_lift_integrated_stream_a: Integrate[Stream[ZSet[T]]]

    # Views derived from input b (created in set_input_b).
    integrated_stream_b: Integrate[Stream[ZSet[R]]]
    delayed_integrated_stream_b: Delay[Stream[ZSet[R]]]
    lift_integrated_stream_b: LiftedIntegrate[ZSet[R]]
    integrated_lift_integrated_stream_b: Integrate[Stream[ZSet[R]]]
    lift_delayed_integrated_lift_integrated_stream_b: LiftedDelay[ZSet[R]]
    lift_delayed_lift_integrated_stream_b: LiftedDelay[ZSet[R]]

    # The four partial joins whose outputs are summed in step().
    join_1: LiftedLiftedJoin[T, R, S]
    join_2: LiftedLiftedJoin[T, R, S]
    join_3: LiftedLiftedJoin[T, R, S]
    join_4: LiftedLiftedJoin[T, R, S]

    # NOTE(review): these three are declared but never assigned in this class;
    # step() adds the join outputs directly through the output stream's group.
    sum_one: LiftedGroupAdd[Stream[ZSet[S]]]
    sum_two: LiftedGroupAdd[Stream[ZSet[S]]]
    sum_three: LiftedGroupAdd[Stream[ZSet[S]]]

    output_stream: Stream[Stream[ZSet[S]]]

    def set_input_a(self, stream_handle_a: StreamHandle[Stream[ZSet[T]]]) -> None:
        """Store the handle for input a and build the operators derived from it."""
        self.input_stream_handle_a = stream_handle_a
        # z^-1(∫ a)
        self.integrated_stream_a = Integrate(self.input_stream_handle_a)
        self.delayed_integrated_stream_a = Delay(self.integrated_stream_a.output_handle())

        # ∫(↑∫ a)
        self.lift_integrated_stream_a = LiftedIntegrate(self.input_stream_handle_a)
        self.integrated_lift_integrated_stream_a = Integrate(self.lift_integrated_stream_a.output_handle())

    def set_input_b(self, stream_handle_b: StreamHandle[Stream[ZSet[R]]]) -> None:
        """
        Store the handle for input b, build the operators derived from it, and
        construct the four joins.

        The joins read attributes created by ``set_input_a``, so that method must
        have been called before this one.
        """
        self.input_stream_handle_b = stream_handle_b
        # z^-1(∫ b)
        self.integrated_stream_b = Integrate(self.input_stream_handle_b)
        self.delayed_integrated_stream_b = Delay(self.integrated_stream_b.output_handle())

        # ↑z^-1(∫(↑∫ b)) and ↑z^-1(↑∫ b)
        self.lift_integrated_stream_b = LiftedIntegrate(self.input_stream_handle_b)
        self.integrated_lift_integrated_stream_b = Integrate(self.lift_integrated_stream_b.output_handle())
        self.lift_delayed_integrated_lift_integrated_stream_b = LiftedDelay(
            self.integrated_lift_integrated_stream_b.output_handle()
        )
        self.lift_delayed_lift_integrated_stream_b = LiftedDelay(self.lift_integrated_stream_b.output_handle())

        # join_1: z^-1(∫ a) with ↑z^-1(↑∫ b)
        self.join_1 = LiftedLiftedJoin(
            self.delayed_integrated_stream_a.output_handle(),
            self.lift_delayed_lift_integrated_stream_b.output_handle(),
            self.p,
            self.f,
        )
        # join_2: ∫(↑∫ a) with the raw input b
        self.join_2 = LiftedLiftedJoin(
            self.integrated_lift_integrated_stream_a.output_handle(),
            self.input_stream_handle_b,
            self.p,
            self.f,
        )
        # join_3: ↑∫ a with z^-1(∫ b)
        self.join_3 = LiftedLiftedJoin(
            self.lift_integrated_stream_a.output_handle(),
            self.delayed_integrated_stream_b.output_handle(),
            self.p,
            self.f,
        )
        # join_4: the raw input a with ↑z^-1(∫(↑∫ b))
        self.join_4 = LiftedLiftedJoin(
            self.input_stream_handle_a,
            self.lift_delayed_integrated_lift_integrated_stream_b.output_handle(),
            self.p,
            self.f,
        )

    def __init__(
        self,
        diff_stream_a: Optional[StreamHandle[Stream[ZSet[T]]]],
        diff_stream_b: Optional[StreamHandle[Stream[ZSet[R]]]],
        p: JoinCmp[T, R],
        f: PostJoinProjection[T, R, S],
    ):
        """
        :param diff_stream_a: handle for the left input, or ``None`` to wire it later
            through ``set_input_a``.
        :param diff_stream_b: handle for the right input, or ``None`` to wire it later
            through ``set_input_b``.
        :param p: join predicate passed to each inner ``LiftedLiftedJoin``.
        :param f: post-join projection passed to each inner ``LiftedLiftedJoin``.
        """
        self.p = p
        self.f = f
        self.frontier_a = 0
        self.frontier_b = 0
        # The output is a stream of streams of Z-sets, so its group is stream
        # addition built on top of Z-set addition.
        inner_group: ZSetAddition[S] = ZSetAddition()
        group: StreamAddition[ZSet[S]] = StreamAddition(inner_group)  # type: ignore

        self.output_stream = Stream(group)
        self.output_stream_handle = StreamHandle(lambda: self.output_stream)

        # a is wired before b because set_input_b depends on set_input_a's operators.
        if diff_stream_a is not None:
            self.set_input_a(diff_stream_a)

        if diff_stream_b is not None:
            self.set_input_b(diff_stream_b)

    def output(self) -> Stream[Stream[ZSet[S]]]:
        """Return the stream this operator sends its results to."""
        return self.output_stream

    def step(self) -> bool:
        """
        Advance the operator by one step.

        Returns ``True`` without doing any work when both inputs' ``current_time()``
        equal the stored frontiers. Otherwise every internal operator is stepped once
        (their own return values are ignored), the sum of the four joins' latest
        outputs is sent to the output stream, both frontiers are incremented by one,
        and ``False`` is returned.
        """
        current_a_timestamp = self.input_a().current_time()
        current_b_timestamp = self.input_b().current_time()
        # Not sure about this.
        # NOTE(review): the fixpoint test looks only at the input timestamps, and both
        # frontiers advance together below even if only one input moved.
        if current_a_timestamp == self.frontier_a and current_b_timestamp == self.frontier_b:
            return True

        # Step upstream operators before the operators that consume their outputs.
        self.integrated_stream_a.step()
        self.delayed_integrated_stream_a.step()
        self.lift_integrated_stream_a.step()
        self.integrated_lift_integrated_stream_a.step()
        self.integrated_stream_b.step()
        self.delayed_integrated_stream_b.step()
        self.lift_integrated_stream_b.step()
        self.integrated_lift_integrated_stream_b.step()
        self.lift_delayed_integrated_lift_integrated_stream_b.step()
        self.lift_delayed_lift_integrated_stream_b.step()
        self.join_1.step()
        self.join_2.step()
        self.join_3.step()
        self.join_4.step()

        # Output element = (join_1 + join_2) + (join_3 + join_4), using the output group.
        group = self.output().group()
        sum_1 = group.add(self.join_1.output().latest(), self.join_2.output().latest())
        sum_2 = group.add(self.join_3.output().latest(), self.join_4.output().latest())
        self.output_stream.send(group.add(sum_1, sum_2))

        self.frontier_a += 1
        self.frontier_b += 1

        return False


In [25]:
# Build the incremental doubly-lifted join over the two example streams of Z-set
# streams, run it with `step_until_fixpoint_and_return`, and print the result as a
# list of lists.
lifted_lifted_delta_join_stream = DeltaLiftedDeltaLiftedJoin(
    StreamHandle(lambda: some_stream_of_zset_streams),
    StreamHandle(lambda: other_stream_of_zset_streams),
    some_join_cmp,
    some_post_join_projection,
)

print(from_stream_of_streams_into_list_of_lists(step_until_fixpoint_and_return(lifted_lifted_delta_join_stream)))

[[{}], [{}, {2: 1, 3: 1}]]


In [26]:
def H[T](diff: ZSet[T], integrated_state: ZSet[T]) -> ZSet[T]:
    distincted_diff: Dict[T, int] = {}
    for k, v in diff.items():
        current_k_latest_diff_weight = v

        if k in integrated_state:
            current_k_latest_delayed_state_weight = integrated_state[k]
            coalesced_weight = current_k_latest_diff_weight + current_k_latest_delayed_state_weight

            if current_k_latest_delayed_state_weight > 0 and coalesced_weight <= 0:
                distincted_diff[k] = -1

                continue

            if current_k_latest_delayed_state_weight <= 0 and coalesced_weight > 0:
                distincted_diff[k] = 1
        else:
            if v > 0:
                distincted_diff[k] = 1

    return ZSet(distincted_diff)

In [27]:
# A diff carrying weight -1 for the elements 1 and 6.
some_zset_update_one = ZSet({1: -1, 6: -1})

# H keeps only the elements whose membership flips relative to `some_zset`.
H(some_zset_update_one, some_zset)

{1: -1}

In [28]:
class LiftedH(Lift2[ZSet[T], ZSet[T], ZSet[T]]):
    """
    ``Lift2`` operator that uses :func:`H` as its binary function over a stream of
    Z-set diffs and a stream of integrated Z-set states.
    """

    def __init__(
        self,
        diff_stream_a: StreamHandle[ZSet[T]],
        integrated_stream_a: StreamHandle[ZSet[T]],
    ):
        # Handle order matches H's parameter order: (diff, integrated_state).
        # NOTE(review): the trailing None is presumably Lift2's output-group
        # argument, left for Lift2 to pick a default — confirm against Lift2.
        super().__init__(diff_stream_a, integrated_stream_a, H, None)

In [29]:
# Wrap the single diff in a stream of Z-sets.
some_zset_update_one_stream = Stream(ZSetAddition())
some_zset_update_one_stream.send(some_zset_update_one)

# Run LiftedH over (diff stream, state stream) and show its output as a list.
step_until_fixpoint_and_return(
    LiftedH(
        StreamHandle(lambda: some_zset_update_one_stream),
        StreamHandle(lambda: some_zset_stream),
    ),
).to_list()

[{}, {1: -1}, {}]

In [30]:
class LiftedLiftedH(Lift2[Stream[ZSet[T]], Stream[ZSet[T]], Stream[ZSet[T]]]):
    """
    ``Lift2`` operator over two streams of Z-set streams whose binary function runs
    a fresh :class:`LiftedH` on each pair of inner streams.
    """

    def __init__(
        self,
        integrated_diff_stream_a: StreamHandle[Stream[ZSet[T]]],
        lifted_delayed_lifted_integrated_stream_a: StreamHandle[Stream[ZSet[T]]],
    ):
        # For each pair of inner streams (x, y), build a LiftedH over them, run it
        # with step_until_fixpoint_and_return, and use the value that call returns
        # as the output element.
        # NOTE(review): the trailing None is presumably Lift2's output-group
        # argument — confirm against Lift2.
        super().__init__(
            integrated_diff_stream_a,
            lifted_delayed_lifted_integrated_stream_a,
            lambda x, y: step_until_fixpoint_and_return(
                LiftedH(StreamHandle(lambda: x), StreamHandle(lambda: y)),
            ),
            None,
        )


In [31]:
# Wrap the diff stream in an outer stream, giving a stream of Z-set streams.
some_zset_update_stream_stream = Stream(StreamAddition(ZSetAddition()))
some_zset_update_stream_stream.send(some_zset_update_one_stream)

# Run LiftedLiftedH over the nested diff stream and the nested state stream, and
# show the result as a list of lists.
from_stream_of_streams_into_list_of_lists(step_until_fixpoint_and_return(
    LiftedLiftedH(
        StreamHandle(lambda: some_zset_update_stream_stream),
        StreamHandle(lambda: some_stream_of_zset_streams),
    ),
))

[[{}], [{}, {1: -1}, {}]]

In [32]:
class DeltaLiftedDeltaLiftedDistinct(UnaryOperator[Stream[ZSet[T]], Stream[ZSet[T]]]):
    """
    Incrementally maintains the distinct elements of a stream of streams of Z-sets.

    Internally this is a chain of five operators: the input is integrated; that
    result is lifted-integrated and then lifted-delayed; ``LiftedLiftedH`` combines
    the integrated input with the lifted-delayed state; and the outcome is
    differentiated to produce this operator's output.
    """

    integrated_diff_stream_a: Integrate[Stream[ZSet[T]]]
    lift_integrated_diff_stream_a: LiftedIntegrate[ZSet[T]]
    lift_delay_lift_integrated_diff_stream_a: LiftedDelay[ZSet[T]]
    lift_lift_H: LiftedLiftedH[T]
    diff_lift_lift_H: Differentiate[Stream[ZSet[T]]]

    def __init__(self, diff_stream_a: Optional[StreamHandle[Stream[ZSet[T]]]]):
        """Create the operator; the input handle is forwarded to the base class."""
        super().__init__(diff_stream_a, None)

    def set_input(
        self,
        stream_handle: StreamHandle[Stream[ZSet[T]]],
        output_stream_group: Optional[AbelianGroupOperation[Stream[ZSet[T]]]],
    ) -> None:
        """
        Wire the internal operator chain to ``stream_handle``.

        ``output_stream_group`` is accepted for interface compatibility but not
        used: the output handle is taken from the final ``Differentiate``.
        """
        self.input_stream_handle = stream_handle

        # Build the chain front to back; each stage consumes the previous handle.
        integrated = Integrate(self.input_stream_handle)
        lift_integrated = LiftedIntegrate(integrated.output_handle())
        lift_delayed = LiftedDelay(lift_integrated.output_handle())
        lifted_h = LiftedLiftedH(
            integrated.output_handle(),
            lift_delayed.output_handle(),
        )
        differentiated = Differentiate(lifted_h.output_handle())  # type: ignore

        self.integrated_diff_stream_a = integrated
        self.lift_integrated_diff_stream_a = lift_integrated
        self.lift_delay_lift_integrated_diff_stream_a = lift_delayed
        self.lift_lift_H = lifted_h
        self.diff_lift_lift_H = differentiated
        self.output_stream_handle = differentiated.output_handle()

    def step(self) -> bool:
        """
        Step every internal operator once, in chain order, and report whether all
        of them reported a fixpoint.
        """
        chain = (
            self.integrated_diff_stream_a,
            self.lift_integrated_diff_stream_a,
            self.lift_delay_lift_integrated_diff_stream_a,
            self.lift_lift_H,
            self.diff_lift_lift_H,
        )
        # Collect every result first so that no stage is skipped by short-circuiting.
        fixpoint_flags = [stage.step() for stage in chain]

        return all(fixpoint_flags)

In [33]:
# Outer stream whose first element is `some_zset_stream`.
stream_of_zset_streams = Stream(StreamAddition(ZSetAddition()))
stream_of_zset_streams.send(some_zset_stream)

# One step of the incremental distinct after the first outer element...
delta_lift_delta_lift_distinct = DeltaLiftedDeltaLiftedDistinct(
    StreamHandle(lambda: stream_of_zset_streams)
)
delta_lift_delta_lift_distinct.step()
print(from_stream_of_streams_into_list_of_lists(delta_lift_delta_lift_distinct.output()))

# ...and one more step after sending the diff stream as the second outer element.
stream_of_zset_streams.send(some_zset_update_one_stream)
delta_lift_delta_lift_distinct.step()
print(from_stream_of_streams_into_list_of_lists(delta_lift_delta_lift_distinct.output()))

[[{}], [{}, {1: 1, 2: 1, 3: 1, 4: 1}, {}, {}]]
[[{}], [{}, {1: 1, 2: 1, 3: 1, 4: 1}, {}, {}], [{}, {1: -1}, {}, {}, {}]]


In [34]:
def stream_introduction[T](value: T, group: AbelianGroupOperation[T]) -> Stream[T]:
    output_stream = Stream(group)
    output_stream.send(value)

    return output_stream


def stream_elimination[T](stream: Stream[T]) -> T:
    output_value = stream.group().identity()
    for value in stream:
        output_value = stream.group().add(output_value, value)

    return output_value


class LiftedStreamIntroduction(Lift1[T, Stream[T]]):
    """
    ``Lift1`` operator that applies :func:`stream_introduction` to each element of
    the input stream, producing a stream of streams.
    """

    def __init__(self, stream: StreamHandle[T]) -> None:
        # Each element is wrapped using the input stream's own group; the third
        # argument is stream addition built on that same group.
        super().__init__(
            stream,
            lambda x: stream_introduction(x, stream.get().group()),
            StreamAddition(stream.get().group()),  # type: ignore
        )


class LiftedStreamElimination(Lift1[Stream[T], T]):
    """
    ``Lift1`` operator that applies :func:`stream_elimination` to each inner stream
    of the input, collapsing a stream of streams into a stream of values.
    """

    def __init__(self, stream: StreamHandle[Stream[T]]) -> None:
        # The third argument is the inner group of the input stream's group.
        super().__init__(stream, lambda x: stream_elimination(x), stream.get().group().inner_group())  # type: ignore

In [35]:
# The plain Z-set stream used as input.
print(zset_stream.to_list())

# Introduction: wrap the whole stream as a single element of an outer stream.
stream_of_zset_stream = stream_introduction(zset_stream, zset_stream_addition_group)
print(from_stream_of_streams_into_list_of_lists(stream_of_zset_stream))

# Elimination on the plain stream: the sum of its Z-sets.
print(stream_elimination(zset_stream))

# Elimination on the wrapped stream: the sum of its inner streams, shown as a list.
print(stream_elimination(stream_of_zset_stream).to_list())

[{}, {'apple': 1, 'orange': 3}, {'apple': -2, 'orange': 3}, {'apple': 2, 'bananas': -2}]
[[{}], [{}, {'apple': 1, 'orange': 3}, {'apple': -2, 'orange': 3}, {'apple': 2, 'bananas': -2}]]
{'apple': 1, 'orange': 6, 'bananas': -2}
[{}, {'apple': 1, 'orange': 3}, {'apple': -2, 'orange': 3}, {'apple': 2, 'bananas': -2}]


In [36]:
# An edge is a pair of integer node ids; a graph is a Z-set of edges.
Edge = Tuple[int, int]
GraphZSet = ZSet[Edge]

# Identity element of Z-set addition.
empty_graph = ZSetAddition().identity()

class IncrementalGraphReachability(UnaryOperator[GraphZSet, GraphZSet]):
    """
    Circuit that derives new edges by repeatedly composing edges of an input graph.

    The input stream of edge Z-sets is lifted into a stream of streams. Inside the
    circuit, the delayed output of ``distinct`` is joined with the lifted input: a
    pair of edges matches when the first one's end node equals the second one's
    start node, and yields the edge from the first's start to the second's end.
    The join output is added to the lifted input and passed through ``distinct``,
    whose output feeds both the delayed loop back into the join and the flattened
    result stream.
    """

    delta_input: LiftedStreamIntroduction[GraphZSet]
    join: DeltaLiftedDeltaLiftedJoin[Edge, Edge, Edge]
    delta_input_join_sum: LiftedGroupAdd[Stream[GraphZSet]]
    distinct: DeltaLiftedDeltaLiftedDistinct[Edge]
    lift_delayed_distinct: LiftedDelay[GraphZSet]
    flattened_output: LiftedStreamElimination[GraphZSet]

    def __init__(self, stream: StreamHandle[GraphZSet]):
        """Wire the circuit on top of ``stream``, a handle to a stream of edge Z-sets."""

        def edges_are_adjacent(left, right):
            # Two edges compose when the left one ends where the right one starts.
            return left[1] == right[0]

        def compose_edges(left, right):
            # The composed edge runs from the left edge's start to the right edge's end.
            return (left[0], right[1])

        self.input_stream_handle = stream

        self.delta_input = LiftedStreamIntroduction(self.input_stream_handle)

        # The join is created without inputs because one of them is the feedback
        # loop, which can only be built once `distinct` exists.
        self.join = DeltaLiftedDeltaLiftedJoin(None, None, edges_are_adjacent, compose_edges)
        self.delta_input_join_sum = LiftedGroupAdd(
            self.delta_input.output_handle(),
            self.join.output_handle(),
        )

        self.distinct = DeltaLiftedDeltaLiftedDistinct(self.delta_input_join_sum.output_handle())
        self.lift_delayed_distinct = LiftedDelay(self.distinct.output_handle())

        # Close the loop: input a first, then input b (which also builds the joins).
        self.join.set_input_a(self.lift_delayed_distinct.output_handle())
        self.join.set_input_b(self.delta_input.output_handle())

        self.flattened_output = LiftedStreamElimination(self.distinct.output_handle())
        self.output_stream_handle = self.flattened_output.output_handle()

    def step(self) -> bool:
        """
        Step every internal operator once and report whether the newest flattened
        output equals the identity of the output stream's group.
        """
        stages = (
            self.delta_input,
            self.lift_delayed_distinct,
            self.join,
            self.delta_input_join_sum,
            self.distinct,
            self.flattened_output,
        )
        for stage in stages:
            stage.step()

        newest_output = self.flattened_output.output().latest()
        group_identity = self.output().group().identity()

        return bool(newest_output == group_identity)

In [37]:
import itertools as it

# Chain graph over nodes 0..4: one edge (i, i + 1) with weight 1 for each i in 0..3.
initial_graph: GraphZSet = ZSet(
    {(fst, snd): 1 for fst, snd in it.product(range(5), range(5)) if snd - 1 == fst}
)

# Feed the graph as the only element of a Z-set stream and run the reachability
# circuit on it with `step_until_fixpoint`.
graph_stream = Stream(ZSetAddition())
graph_stream.send(initial_graph)
graph_stream_handle = StreamHandle(lambda: graph_stream)
reachability = IncrementalGraphReachability(graph_stream_handle)
print(f"Graph:\n{graph_stream}\n")
step_until_fixpoint(reachability)
print(f"Reachability Graph:\n{reachability.output().to_list()}\n")

Graph:
OrderedDict({1: {(0, 1): 1, (1, 2): 1, (2, 3): 1, (3, 4): 1}})

Reachability Graph:
[{}, {(0, 1): 1, (1, 2): 1, (2, 3): 1, (3, 4): 1}, {(0, 2): 1, (1, 3): 1, (2, 4): 1}, {(0, 3): 1, (1, 4): 1}, {(0, 4): 1}, {}]



In [38]:
import typing


def load_graph(file_path: str) -> GraphZSet:
    """
    Read a whitespace-separated edge list and return it as a Z-set of edges.

    Every non-blank line must start with two integer fields, which become one
    edge; any further fields on the line are ignored. Each distinct edge gets
    weight 1, so repeated lines collapse into a single entry. Blank and
    whitespace-only lines are skipped, so a trailing newline does not add a bogus
    empty edge.

    :param file_path: path of the edge-list file.
    :returns: a ``ZSet`` mapping each edge to weight 1, in first-seen order.
    :raises ValueError: if a non-blank line has fewer than two fields, or if one
        of its first two fields is not an integer.
    :raises OSError: if the file cannot be opened.
    """
    edges: dict[tuple[int, int], int] = {}

    with open(file_path, mode="r") as file:
        # Iterate the file lazily instead of materialising every line up front.
        for line_number, line in enumerate(file, start=1):
            fields = line.split()
            if not fields:
                continue
            if len(fields) < 2:
                raise ValueError(
                    f"{file_path}:{line_number}: expected two node ids, got {line.strip()!r}"
                )

            edges[(int(fields[0]), int(fields[1]))] = 1

    return typing.cast(GraphZSet, ZSet(edges))

In [39]:
# Load the edge list stored at data/graph1000.txt as a Z-set of edges.
dense_graph_zset = load_graph("data/graph1000.txt")

In [40]:
%%time

# Feed the loaded graph as the only element of a Z-set stream.
dense_graph_stream = Stream(ZSetAddition())
dense_graph_stream.send(dense_graph_zset)
dense_graph_stream_handle = StreamHandle(lambda: dense_graph_stream)

def step_until_first_fixpoint(op: IncrementalGraphReachability):
    """
    Step ``op`` repeatedly, stopping right after the first step whose latest
    output equals the identity of ``zset_addition_group``.

    At least one step is always performed.
    """
    reached_fixpoint = False
    while not reached_fixpoint:
        op.step()
        newest_output = op.output_handle().get().latest()
        reached_fixpoint = newest_output == zset_addition_group.identity()

# Build the reachability circuit over the graph1000 stream and run it until its
# latest output is the empty Z-set.
dense_graph_reachability = IncrementalGraphReachability(dense_graph_stream_handle)
step_until_first_fixpoint(dense_graph_reachability)

CPU times: user 1.16 s, sys: 41.8 ms, total: 1.2 s
Wall time: 1.23 s


In [41]:
# Sum every Z-set the circuit emitted into a single Z-set.
result = stream_elimination(dense_graph_reachability.output())

# DBSP Rust outputs 11532 tuples
len(result.inner)

11532

In [42]:
# Load the edge list stored at data/graph10000.txt as a Z-set of edges.
sparse_graph_zset = load_graph("data/graph10000.txt")

In [43]:
%%time

# Feed the loaded graph as the only element of a Z-set stream.
sparse_graph_stream = Stream(ZSetAddition())
sparse_graph_stream.send(sparse_graph_zset)
sparse_graph_stream_handle = StreamHandle(lambda: sparse_graph_stream)

# Build the reachability circuit over it and run it until its latest output is
# the empty Z-set.
sparse_graph_reachability = IncrementalGraphReachability(sparse_graph_stream_handle)
step_until_first_fixpoint(sparse_graph_reachability)

CPU times: user 1min 19s, sys: 421 ms, total: 1min 20s
Wall time: 1min 20s


In [44]:
# Sum every Z-set the circuit emitted into a single Z-set.
result = stream_elimination(sparse_graph_reachability.output())

# DBSP Rust outputs 262144 tuples
len(result.inner)

262144