In [91]:
from typing import *
import xai
import numpy as np
import dill
import pickle
import inspect
import multiprocessing as mp

In [96]:
X = TypeVar("X")
Y = TypeVar("Y")

class Stream(Generic[X]):

    def __init__(self, source: Iterable[X]) -> None:
        super().__init__()
        self._parent: Stream[Any]|None = None
        self._source = source

    def map(self, f: Callable[[X],Y]) -> "Stream[Y]":
        return self._appended(f(x) for x in self)
    
    def fork(self, processes: int) -> "Stream[Y]":
        assert processes > 0
        if processes == 0:
            return self
        else:
            with mp.Pool(processes) as pool:
                pool.join()

    
    def foreach(self, f: Callable[[X],Y]) -> None:
        for x in self:
            f(x)

    def _appended(self, source: Iterable[Y]) -> "Stream[Y]":
        stream = Stream(source)
        stream._parent = self
        return stream 
    
    def __iter__(self) -> Iterator[X]:
        return iter(self._source)
    

Stream([1,2,3]).foreach(print)

1
2
3


In [None]:
x.__next__()

In [None]:

dir(type("".join))

In [None]:
type("".join).__call__()

In [None]:
gen = iter(i for i in range(10))
next(gen)

In [2]:
import multiprocessing as mp

In [77]:
import time
import os

queue = mp.Queue()

foo = mp.Process(target=lambda n,queue: queue.put(n), args=(2,queue))
bar = mp.Process(target=lambda n,queue: [queue.put(i) for i in range(n)], args=(30,queue))

In [78]:
foo.start()
bar.start()

In [88]:
queue.get(), queue.qsize()

(8, 21)

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def add(n):
    return n+2

def fetch():
    for i in range(100000):
        print("fetch")
        yield i

with ThreadPoolExecutor(10) as tp:
    for res in tp.map(add, fetch(), chunksize=2):
        print(res)

In [None]:
(xai.Stream.sample(range(100), with_replacement=True)
 .take(1000000)
 .map(lambda n: f"int:{n}"*10)
 .monitor("Random sampling")
 .save("numbers"))

In [None]:
np.stack(xai.Stream.load("numbers", str).drop(1).monitor().list())

In [None]:
xai.Stream.load("asteroids-l32.pt", xai.AutoEncoder).item()

In [None]:
next(iter(xai.Stream.load("asteroids-l32.pt", xai.AutoEncoder)))

In [None]:
xai.Stream([]).fork(lambda s: (
    s.map(lambda n: 2).fork(),
    s.map()
))

In [None]:
import functools

In [None]:

import pickle
import sys
f = lambda n: 2

f.__setstate__

In [None]:
f = lambda n: 2
f.__getstate__ = lambda self: dill.dumps(self)
f.__setstate__ = lambda self: dill.dumps(self)

In [None]:

dump = pickle.dumps(type("Foo", tuple(), {
    "__call__": lambda self: 2
})())
dump

In [None]:
load = dill.loads(dump)
load()

In [None]:
from typing import *
from IPython.display import DisplayObject
from dataclasses import dataclass
from plotly.graph_objects import Figure # type: ignore

import plotly.express as px # type: ignore

class TrainRecord(NamedTuple):
    batch_size: int
    train_loss: float
    val_loss:   float|None
    accuracy:   float|None
    info:       str|None

class TrainHistory(list[TrainRecord]):

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        for key,value in vars(self.figure()).items():
            setattr(self, key, value)

    def append_epoch(self,
                     batch_size: int,
                     train_loss: float,
                     val_loss:   float|None = None,
                     accuracy:   float|None = None,
                     info:       str|None = None) -> None:
        self.append(TrainRecord(
            batch_size=batch_size,
            train_loss=train_loss,
            val_loss=val_loss,
            accuracy=accuracy,
            info=info
        ))

    def figure(self) -> Figure:
        if not self:
            return px.line({})
        
        last_info = self[0].info
        
        milestones: list[tuple[str,dict[str,int]]] = [(last_info, {})]
        
        for epoch,record in enumerate(self):
            if last_info != record.info:
                milestones.append((record.info,{}))

            for stat in record:
                pass


            

        return px.line({"x": []}, x="x")
    
    def _ipython_display_(self):
        return self.figure()._ipython_display_()
    
TrainHistory([TrainRecord(54,34,23,1,""),TrainRecord(54,34,23,1,"")])