In [1]:
import random
from collections import Counter

In [2]:
# Thresholds compared against a `random.random()` draw by the helper functions below.
PROBABILITY_OF_HAVING_STOCK = 0.8  # used by has_stock()
PROBABILITY_OF_USER_WAITING = 0.4  # used by user_wants_to_wait()

In [3]:
def has_stock() -> bool:
    """Randomly decide whether the ordered item is in stock.

    Returns:
        True when a fresh ``random.random()`` draw is below
        ``PROBABILITY_OF_HAVING_STOCK``, False otherwise.
    """
    # The comparison yields a bool, so the return annotation is `bool`
    # (matching `user_wants_to_wait`), not `int`.
    return random.random() < PROBABILITY_OF_HAVING_STOCK

def user_wants_to_wait() -> bool:
    """Randomly decide whether the user is willing to wait.

    Returns:
        True when a fresh ``random.random()`` draw is below
        ``PROBABILITY_OF_USER_WAITING``, False otherwise.
    """
    draw = random.random()
    return draw < PROBABILITY_OF_USER_WAITING

In [4]:
from enum import Enum, auto

class State(Enum):
    """States an order can be in during the simulations of this notebook."""

    # Values are spelled out; they are the ones `auto()` assigns (1, 2, 3, ...).
    WAITING = 1    # initial state of every order
    PENDING = 2    # stock is available, or the user agreed to wait
    SUSPENDED = 3  # no stock available
    CANCELLED = 4  # no stock and the user did not want to wait
    ARCHIVED = 5   # final state of a cancelled order
    DELIVERED = 6  # final state of a pending order

### Naive, Imperative Implementation

In [5]:
def game() -> State:
    """Play one order through the workflow, step by step.

    The intermediate assignments are kept on purpose: they spell out every
    state the order passes through, even though only the last one is returned.

    Returns:
        ``State.DELIVERED`` when there is stock or the user agrees to wait,
        ``State.ARCHIVED`` otherwise.
    """
    state = State.WAITING

    in_stock = has_stock()
    if not in_stock:
        state = State.SUSPENDED

        # The user is only asked when there is no stock.
        if not user_wants_to_wait():
            state = State.CANCELLED
            state = State.ARCHIVED
            return state

    # Either stock was available or the user chose to wait.
    state = State.PENDING
    state = State.DELIVERED
    return state

In [6]:
# Monte Carlo check: play 100,000 independent orders and tally the final states.
# NOTE(review): no `random.seed(...)` precedes this cell, so the exact counts vary between runs.
results = [game() for _ in range(100_000)]
Counter(results)

Counter({<State.DELIVERED: 6>: 87957, <State.ARCHIVED: 5>: 12043})

In [7]:
# Fix the seed so this single run of `game()` is repeatable.
random.seed(9)
value = game()

### Naive, Functional Implementation

In [8]:
def create_order() -> State:
    """Return the initial state of a new order (``State.WAITING``)."""
    return State.WAITING

def check_stock() -> State:
    """Return ``State.PENDING`` when ``has_stock()`` is true, else ``State.SUSPENDED``."""
    if has_stock():
        return State.PENDING
    return State.SUSPENDED

def wait_for_confirmation() -> State:
    """Return ``State.PENDING`` when ``user_wants_to_wait()`` is true, else ``State.CANCELLED``."""
    if user_wants_to_wait():
        return State.PENDING
    return State.CANCELLED

def deliver() -> State:
    """Return the state of a delivered order (``State.DELIVERED``)."""
    return State.DELIVERED

def archive() -> State:
    """Return the state of an archived order (``State.ARCHIVED``)."""
    return State.ARCHIVED

def game_() -> State:
    """Play one order using the small step functions instead of inline assignments.

    Returns:
        The final state of the order after all applicable steps have run.
    """
    order = create_order()
    order = check_stock()

    # Each step runs only when the order is currently in its trigger state;
    # the steps are tried in this fixed sequence.
    steps = (
        (State.SUSPENDED, wait_for_confirmation),
        (State.PENDING, deliver),
        (State.CANCELLED, archive),
    )
    for trigger, step in steps:
        if order is trigger:
            order = step()

    return order

In [9]:
# Equivalence check: replay both implementations under the same seed so they
# see the same random draws; they must end in the same state.
for i in range(1_000):
    random.seed(i)
    value = game()
    
    random.seed(i)
    value_ = game_()
    # The assertion message is the seed that produced the mismatch.
    assert value is value_, i

### Pure Python Implementation

In [10]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, ParamSpec, Union, TypeVar, Generic

# Type variable for the payload carried by `Success` / `Failure`; declared covariant.
T = TypeVar("T", covariant=True)

@dataclass
class Success(Generic[T]):
    """Wrapper marking ``value`` as being on the success track of a result."""

    # The wrapped payload.
    value: T

@dataclass
class Failure(Generic[T]):
    """Wrapper marking ``value`` as being on the failure track of a result."""

    # The wrapped payload.
    value: T


# Generic placeholders used in the signatures of the result helpers below.
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")


# A result is either a `Success` carrying a T1 or a `Failure` carrying a T2.
Result = Union[Success[T1], Failure[T2]]


# Placeholders for `compose` and `PipeLazyWithStart`: P captures a callable's
# parameters, U and W stand for return values.
U = TypeVar("U")
W = TypeVar("W")
P = ParamSpec("P")

def compose(f: Callable[P, U], g: Callable[[U], W]) -> Callable[P, W]:
    def inner(*args: P.args, **kwargs: P.kwargs) -> W:
        return g(f(*args, **kwargs))
    return inner

def invert(value: Result[T1, T2]) -> Result[T2, T1]:
    """Move a result to the opposite track, keeping its payload.

    A ``Success`` becomes a ``Failure`` with the same ``value``; anything else
    becomes a ``Success`` with the same ``value``.
    """
    if isinstance(value, Success):
        return Failure(value.value)
    return Success(value.value)

def identity(x: T1) -> T1:
    """Return ``x`` unchanged.

    Annotated with the invariant ``T1`` rather than ``T``: ``T`` is declared
    covariant, and static type checkers reject a covariant type variable in a
    parameter position.
    """
    return x


def bind(f: Callable[[T1], Result[T3, T2]]) -> Callable[[Result[T1, T2]], Result[Union[T1, T3], T2]]:
    """Lift ``f`` so that it only runs on the success track.

    Args:
        f: Receives the payload of a ``Success`` and returns a new result.

    Returns:
        A callable that returns ``f(payload)`` for a ``Success`` and hands any
        other input back unchanged.
    """
    def adapt(double_track: Result[T1, T2]) -> Result[Union[T1, T3], T2]:
        if not isinstance(double_track, Success):
            # Not on the success track: pass the input through untouched.
            return double_track
        return f(double_track.value)

    return adapt

def map_(f: Callable[[T1], Success[T3]]) -> Callable[[Result[T1, T2]], Result[T3, T2]]:
    """Lift a success-only step ``f`` so that it only runs on the success track.

    The value returned by ``f`` is handed back as-is (it is not wrapped here),
    which is why ``f`` is annotated as returning a ``Success`` itself.

    Args:
        f: Receives the payload of a ``Success``.

    Returns:
        A callable that returns ``f(payload)`` for a ``Success`` and hands any
        other input back unchanged.
    """
    def adapt(double_track: Result[T1, T2]) -> Result[T3, T2]:
        if not isinstance(double_track, Success):
            # Not on the success track: pass the input through untouched.
            return double_track
        return f(double_track.value)

    return adapt


# Direct (non-composed) equivalent of `lash`, kept for reference:
# def lash(f: Callable[..., Result[T1, T3]]) -> Callable[[Result[T1, T2]], Result[T1, Union[T2, T3]]]:
#     def adapt(double_track: Result[T1, T2]) -> Result[T1, Union[T2, T3]]:
#         return f(double_track.value) if isinstance(double_track, Failure) else  double_track
#     return adapt

def lash(f: Callable[[T2], Result[T1, T3]]) -> Callable[[Result[T1, T2]], Result[T1, Union[T2, T3]]]:
    """Lift ``f`` so that it only runs on the failure track.

    Built from ``bind`` by inverting the tracks before and after: a failure
    payload is passed to ``f`` and ``f``'s result is returned; a ``Success``
    comes back as an equal ``Success`` (rebuilt by the two inversions).

    Args:
        f: Receives the payload of a failure (hence the ``T2`` parameter type)
            and returns a new result.

    Returns:
        The lifted callable operating on whole results.
    """
    # `f` is inverted too, so that after the final inversion its result is on
    # the track it originally chose.
    on_inverted = bind(compose(f, invert))
    return compose(compose(invert, on_inverted), invert)

# Direct (non-composed) equivalent of `alt`, kept for reference:
# def alt(f: Callable[..., Failure[T3]]) -> Callable[[Result[T1, T2]], Result[T1, T3]]:
#     def adapt(double_track: Result[T1, T2]) -> Result[T1, T3]:
#         return f(double_track.value) if isinstance(double_track, Failure) else  double_track
#     return adapt

def alt(f: Callable[[T2], Failure[T3]]) -> Callable[[Result[T1, T2]], Result[T1, T3]]:
    """Lift a failure-only step ``f`` so that it only runs on the failure track.

    Built from ``map_`` by inverting the tracks before and after: a failure
    payload is passed to ``f`` and ``f``'s result is returned; a ``Success``
    comes back as an equal ``Success`` (rebuilt by the two inversions).

    Args:
        f: Receives the payload of a failure (hence the ``T2`` parameter type)
            and returns a ``Failure``.

    Returns:
        The lifted callable operating on whole results.
    """
    # `f` is inverted too, so that after the final inversion its result is
    # back on the failure track.
    on_inverted = map_(compose(f, invert))
    return compose(compose(invert, on_inverted), invert)


@dataclass
class PipeLazyWithStart(Generic[P, W]):
    """A pipeline that is assembled step by step and only runs when called."""

    # The callable composed so far; `__call__` delegates to it.
    f: Callable[P, W]

    def then(self, g: Callable[[W], U]) -> PipeLazyWithStart[P, U]:
        """Return a new pipeline that feeds this pipeline's result into ``g``."""
        extended = compose(self.f, g)
        return PipeLazyWithStart(extended)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> W:
        """Run the pipeline with the given arguments and return its result."""
        pipeline = self.f
        return pipeline(*args, **kwargs)


In [11]:
def create_order(*_) -> Success[State]:
    """Return a ``Success`` holding the initial state; positional arguments are ignored."""
    return Success(State.WAITING)

def check_stock(*_) -> Result[State, State]:
    """Return ``Success(PENDING)`` when ``has_stock()`` is true, else ``Failure(SUSPENDED)``.

    Positional arguments are accepted and ignored.
    """
    if has_stock():
        return Success(State.PENDING)
    return Failure(State.SUSPENDED)

def wait_for_confirmation(*_) -> Result[State, State]:
    """Return ``Success(PENDING)`` when ``user_wants_to_wait()`` is true, else ``Failure(CANCELLED)``.

    Positional arguments are accepted and ignored.
    """
    if user_wants_to_wait():
        return Success(State.PENDING)
    return Failure(State.CANCELLED)

def deliver(*_) -> Success[State]:
    """Return a ``Success`` holding ``State.DELIVERED``; positional arguments are ignored."""
    return Success(State.DELIVERED)

def archive(*_) -> Failure[State]:
    """Return a ``Failure`` holding ``State.ARCHIVED``; positional arguments are ignored."""
    return Failure(State.ARCHIVED)

def extract_value(order: Result[State, State]) -> State:
    """Return the state stored in ``order``'s ``value`` field, whichever track it is on."""
    return order.value

# Seed the RNG so the single run at the end of this cell is repeatable.
random.seed(9)
# Assemble the pipeline; no step function runs until `game_2` is called.
game_2 = (
    PipeLazyWithStart(create_order)
    .then(bind(check_stock))  # success track: check the stock
    .then(lash(wait_for_confirmation))  # failure track: ask the user to wait
    .then(map_(deliver))  # success track: deliver
    .then(alt(archive))  # failure track: archive
    .then(extract_value)  # drop the wrapper, keep the State
)
game_2()

<State.DELIVERED: 6>

In [12]:
# Equivalence check: replay `game` and `game_2` under the same seed; they must
# end in the same state.
for i in range(1_000):
    random.seed(i)
    value = game()
    
    random.seed(i)
    value_ = game_2()
    # print(value, value_)
    # The assertion message is the seed that produced the mismatch.
    assert value is value_, i

### Implementing using Returns

In [13]:
%pip install returns mypy

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\elcg\.pyenv\pyenv-win\versions\3.11.0a7\python.exe -m pip install --upgrade pip' command.


In [14]:
from returns.result import Result, Success, Failure

In [15]:
def create_order(*_) -> Success[State]:
    """Return a ``Success`` holding the initial state; positional arguments are ignored."""
    return Success(State.WAITING)

def check_stock(*_) -> Result[State, State]:
    """Return ``Success(PENDING)`` when ``has_stock()`` is true, else ``Failure(SUSPENDED)``.

    Positional arguments are accepted and ignored.
    """
    if has_stock():
        return Success(State.PENDING)
    return Failure(State.SUSPENDED)

def wait_for_confirmation(*_) -> Result[State, State]:
    """Return ``Success(PENDING)`` when ``user_wants_to_wait()`` is true, else ``Failure(CANCELLED)``.

    Positional arguments are accepted and ignored.
    """
    if user_wants_to_wait():
        return Success(State.PENDING)
    return Failure(State.CANCELLED)

def deliver(*_) -> State:
    """Return the bare ``State.DELIVERED`` (not a container); positional arguments are ignored."""
    return State.DELIVERED

def archive(*_) -> State:
    """Return the bare ``State.ARCHIVED`` (not a container); positional arguments are ignored."""
    return State.ARCHIVED

def extract_value(order: Result[State, State]) -> State:
    """Return the state carried by ``order``, whichever track it is on.

    Uses the container's public accessors instead of reading the private
    ``_inner_value`` attribute.

    Args:
        order: A ``returns`` result whose success and failure payloads are
            both ``State`` values.

    Returns:
        The wrapped ``State``.
    """
    if isinstance(order, Success):
        return order.unwrap()
    # Not a Success, so the payload sits on the failure track.
    return order.failure()


In [16]:
from returns.pipeline import pipe
from returns.pointfree import bind, lash, map_, alt

# Seed the RNG so the single run below is reproducible, as the earlier
# single-run cells do with the same seed.
random.seed(9)

# Same workflow as before, assembled with the pipeline helpers imported above.
game_3 = pipe(
    bind(check_stock),
    lash(wait_for_confirmation),
    map_(deliver),
    alt(archive),
    extract_value
)
game_3(create_order())

<State.DELIVERED: 6>

In [17]:
# Equivalence check: replay `game` and `game_3` under the same seed; they must
# end in the same state.
for i in range(1_000):
    random.seed(i)
    value = game()
    
    random.seed(i)
    value_ = game_3(create_order())

    # The assertion message is the seed that produced the mismatch.
    assert value is value_, i

### Implementing using Result

In [18]:
%pip install result

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\elcg\.pyenv\pyenv-win\versions\3.11.0a7\python.exe -m pip install --upgrade pip' command.


In [19]:
from result import Ok, Err, Result

In [20]:
# Generic placeholders for the helper signatures in this cell; the names were
# already bound in an earlier cell and are rebound to fresh TypeVars here.
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")

def invert(value: Result[T1, T2]) -> Result[T2, T1]:
    """Move a result to the opposite track, keeping its payload.

    An ``Ok`` becomes an ``Err`` with the unwrapped value; anything else
    becomes an ``Ok`` with the unwrapped error value.
    """
    if isinstance(value, Ok):
        return Err(value.unwrap())
    return Ok(value.unwrap_err())

def bind(f: Callable[..., Result[T3, T2]]) -> Callable[[Result[T1, T2]], Result[Union[T1, T3], T2]]:
    """Lift ``f`` so that it only runs on the ok track.

    Args:
        f: Receives the unwrapped value of an ok result and returns a new result.

    Returns:
        A callable that returns ``f(value)`` when ``is_ok()`` is true and hands
        the input back unchanged otherwise.
    """
    def adapt(double_track: Result[T1, T2]) -> Result[Union[T1, T3], T2]:
        if double_track.is_ok():
            return f(double_track.unwrap())
        # Error track: pass the input through untouched.
        return double_track

    return adapt

def map_(f: Callable[..., Ok[T3]]) -> Callable[[Result[T1, T2]], Result[T3, T2]]:
    """Lift an ok-only step ``f`` so that it only runs on the ok track.

    The value returned by ``f`` is handed back as-is (it is not wrapped here),
    which is why ``f`` is annotated as returning an ``Ok`` itself.

    Args:
        f: Receives the unwrapped value of an ``Ok``.

    Returns:
        A callable that returns ``f(value)`` for an ``Ok`` and hands any other
        input back unchanged.
    """
    def adapt(double_track: Result[T1, T2]) -> Result[T3, T2]:
        if isinstance(double_track, Ok):
            return f(double_track.unwrap())
        # Not an Ok: pass the input through untouched.
        return double_track

    return adapt

def lash(f: Callable[..., Result[T1, T3]]) -> Callable[[Result[T1, T2]], Result[T1, Union[T2, T3]]]:
    """Lift ``f`` so that it only runs on the error track.

    Built from ``bind`` by inverting the tracks before and after, with ``f``
    itself inverted in between.
    """
    # Step 1: a bound version of `f` that works on inverted results.
    on_inverted = bind(compose(f, invert))
    # Step 2: invert the input, apply it, then invert the outcome back.
    invert_then_apply = compose(invert, on_inverted)
    return compose(invert_then_apply, invert)


def alt(f: Callable[..., Err[T3]]) -> Callable[[Result[T1, T2]], Result[T1, T3]]:
    """Lift an error-only step ``f`` so that it only runs on the error track.

    Built from ``map_`` by inverting the tracks before and after, with ``f``
    itself inverted in between.
    """
    # Step 1: a mapped version of `f` that works on inverted results.
    on_inverted = map_(compose(f, invert))
    # Step 2: invert the input, apply it, then invert the outcome back.
    invert_then_apply = compose(invert, on_inverted)
    return compose(invert_then_apply, invert)

In [21]:
def create_order(*_) -> Ok[State]:
    """Return an ``Ok`` holding the initial state; positional arguments are ignored."""
    return Ok(State.WAITING)

def check_stock(*_) -> Result[State, State]:
    """Return ``Ok(PENDING)`` when ``has_stock()`` is true, else ``Err(SUSPENDED)``.

    Positional arguments are accepted and ignored.
    """
    if has_stock():
        return Ok(State.PENDING)
    return Err(State.SUSPENDED)

def wait_for_confirmation(*_) -> Result[State, State]:
    """Return ``Ok(PENDING)`` when ``user_wants_to_wait()`` is true, else ``Err(CANCELLED)``.

    Positional arguments are accepted and ignored.
    """
    if user_wants_to_wait():
        return Ok(State.PENDING)
    return Err(State.CANCELLED)

def deliver(*_) -> Ok[State]:
    """Return an ``Ok`` holding ``State.DELIVERED``; positional arguments are ignored."""
    return Ok(State.DELIVERED)

def archive(*_) -> Err[State]:
    """Return an ``Err`` holding ``State.ARCHIVED``; positional arguments are ignored."""
    return Err(State.ARCHIVED)

def extract_value(order: Result[State, State]) -> State:
    """Return the state carried by ``order``, whichever track it is on.

    Uses the public ``unwrap`` / ``unwrap_err`` accessors (the same ones the
    ``invert`` helper relies on) instead of reading the private ``_value``
    attribute.

    Args:
        order: A result whose ok and error payloads are both ``State`` values.

    Returns:
        The wrapped ``State``.
    """
    if order.is_ok():
        return order.unwrap()
    return order.unwrap_err()


In [22]:
# Seed the RNG so the single run below is reproducible, as the earlier
# single-run cells do with the same seed.
random.seed(9)

# Same workflow again, assembled from the Ok/Err-based helpers.
game_4 = (
    PipeLazyWithStart(create_order)
    .then(bind(check_stock))
    .then(lash(wait_for_confirmation))
    .then(map_(deliver))
    .then(alt(archive))
    .then(extract_value)
)

game_4()

<State.DELIVERED: 6>

In [23]:
# Equivalence check: replay `game` and `game_4` under the same seed; they must
# end in the same state.
for i in range(1_000):
    random.seed(i)
    value = game()
    
    random.seed(i)
    value_ = game_4()
    # print(value, value_)
    # The assertion message is the seed that produced the mismatch.
    assert value is value_, i

### Probability of Delivery

In [24]:
%pip install sympy

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\elcg\.pyenv\pyenv-win\versions\3.11.0a7\python.exe -m pip install --upgrade pip' command.


In [25]:
from sympy.stats import P, E, variance, Uniform, Normal

In [26]:
from sympy import simplify

In [27]:
# Two uniform variables on [0, 1], one per random decision in the workflow.
X1 = Uniform("X1", 0, 1)
X2 = Uniform("X2", 0, 1)

# Probability that each draw falls below its threshold, mirroring the
# comparisons in has_stock() and user_wants_to_wait().
probability_of_having_stock = P(X1 < PROBABILITY_OF_HAVING_STOCK)
probability_of_user_waiting = P(X2 < PROBABILITY_OF_USER_WAITING)


In [28]:
# An order is delivered when there is stock, or when there is no stock and the
# user agrees to wait.
delivered_probability = (
    probability_of_having_stock
    + (1 - probability_of_having_stock) * probability_of_user_waiting
)
delivered_probability

0.880000000000000