In [None]:
# from __future__ import annotations
# from dataclasses import dataclass, field
# 
# import numpy as np
# import pandas as pd
# import typing as tp
# import jijmodeling as jm
# import pathlib
# import uuid
# 
# @dataclass
# class DataNode:
#     data: tp.Any
#     name: str | None = None
#     operator: FunctionNode | None = None
# 
# 
# class FunctionNode:
#     name: str | None = None
# 
#     def __init__(
#         self,
#     ) -> None:
#         self.inputs = []
# 
#     def apply(self, inputs: list[DataNode], **kwargs: tp.Any) -> DataNode:
#         self.inputs += inputs
#         return self.operate(inputs, **kwargs)
# 
#     def operate(self, inputs: list[DataNode], **kwargs: tp.Any) -> DataNode:
#         raise NotImplementedError


In [None]:
# DEFAULT_RESULT_DIR = "./.jb_results"
# 
# 
# @dataclass
# class ID(DataNode):
#     data: str | uuid.UUID = field(default_factory=uuid.uuid4)
# 
#     def __post_init__(self):
#         self.data = str(self.data)
# 
# 
# @dataclass
# class Date(DataNode):
#     data: str | pd.Timestamp = field(default_factory=pd.Timestamp.now)
#     name: str = "timestamp"
# 
#     def __post_init__(self):
#         if isinstance(self.data, str):
#             self.data = pd.Timestamp(self.data)
# 
# 
# class Min(FunctionNode):
#     name = "min"
# 
#     def operate(self, inputs: list[Array]) -> DataNode:
#         data = inputs[0].data.min()
#         name = inputs[0].name + f"_{self.name}" if inputs[0].name else None
#         node = DataNode(data=data, name=name, operator=self)
#         return node
# 
# 
# class Max(FunctionNode):
#     name = "max"
# 
#     def operate(self, inputs: list[Array]) -> DataNode:
#         data = inputs[0].data.max()
#         name = inputs[0].name + f"_{self.name}" if inputs[0].name else None
#         node = DataNode(data=data, name=name, operator=self)
#         return node
# 
# 
# class Mean(FunctionNode):
#     name = "mean"
# 
#     def operate(self, inputs: list[Array]) -> DataNode:
#         data = inputs[0].data.mean()
#         name = inputs[0].name + f"_{self.name}" if inputs[0].name else None
#         node = DataNode(data=data, name=name, operator=self)
#         return node
# 
# 
# class Std(FunctionNode):
#     name = "std"
# 
#     def operate(self, inputs: list[Array]) -> DataNode:
#         data = inputs[0].data.std()
#         name = inputs[0].name + f"_{self.name}" if inputs[0].name else None
#         node = DataNode(data=data, name=name, operator=self)
#         return node
# 
# 
# @dataclass
# class Array(DataNode):
#     data: np.ndarray
# 
#     def min(self) -> DataNode:
#         return Min().apply([self])
# 
#     def max(self) -> DataNode:
#         return Max().apply([self])
# 
#     def mean(self) -> DataNode:
#         return Mean().apply([self])
# 
#     def std(self) -> DataNode:
#         return Std().apply([self])
# 
# 
# @dataclass
# class Energy(Array):
#     name: str = "energy"
# 
# 
# @dataclass
# class Objective(Array):
#     name: str = "objective"
# 
# 
# @dataclass
# class ConstraintViolation(Array):
#     def __post_init__(self):
#         if self.name is None:
#             raise NameError("Attribute 'name' is None. Please set a name.")
# 
# 
# @dataclass
# class SampleSet(DataNode):
#     data: jm.SampleSet
# 
# 
# class RecordFactory(FunctionNode):
#     name = "record"
# 
#     def operate(self, inputs: list[DataNode], name: str | None = None) -> Record:
#         data = pd.Series({node.name: node.data for node in inputs})
#         return Record(data, name=name, operator=self)
# 
# 
# class TableFactory(FunctionNode):
#     name = "table"
# 
#     def operate(self, inputs: list[Record], name: str | None = None) -> Table:
#         data = pd.DataFrame({node.name: node.data for node in inputs})
#         return Table(data, name=name, operator=self)
# 
# 
# class ArtifactFactory(FunctionNode):
#     name = "artifact"
# 
#     def operate(self, inputs: list[Record], name: str | None = None) -> Artifact:
#         data = {node.name: node.data.to_dict() for node in inputs}
#         return Artifact(data, name=name, operator=self)
# 
# 
# @dataclass
# class Record(DataNode):
#     data: pd.Series = field(default_factory=lambda: pd.Series(dtype="object"))
# 
# 
# @dataclass
# class Table(DataNode):
#     data: pd.DataFrame = field(default_factory=pd.DataFrame)
# 
#     def append(self, record: Record, axis: tp.Literal[0, 1] = 0) -> Table:
#         table = TableFactory().apply([record])
#         return Concat().apply([self, table], axis=axis)
# 
# 
# @dataclass
# class Artifact(DataNode):
#     data: dict = field(default_factory=dict)
# 
# 
# class Concat(FunctionNode):
#     name = "concat"
# 
#     def operate(
#         self, inputs: list[Table] | list[Artifact], name=None, axis: tp.Literal[0, 1] = 0
#     ) -> Table | Artifact:
#         dtype = type(inputs[0])
#         if not all([isinstance(node, dtype) for node in inputs]):
#             raise TypeError(
#                 "Type of elements of 'inputs' must be unified with either 'Table' or 'Artifact'."
#             )
# 
#         if isinstance(inputs[0], Artifact):
#             data = {node.name: node.data for node in inputs}
#             return Artifact(data=data, name=name, operator=self)
#         elif isinstance(inputs[0], Table):
#             data = pd.concat(
#                 [node.data for node in inputs if isinstance(node, Table)], axis=axis
#             )
#             return Table(data=data, name=name, operator=self)
#         else:
#             raise TypeError(f"'{inputs[0].__class__.__name__}' type is not supported.")
# 
# 
# @dataclass
# class Experiment(DataNode):
#     data: tuple = ()
# 
#     def __post_init__(self):
#         if not self.data:
#             self.data = (Table(), Artifact())

In [None]:
from __future__ import annotations
from dataclasses import dataclass

import typing as tp

@dataclass(frozen=True)
class DN:
    data: tp.Any
    name: str | None = None

    @property
    def dtype(self) -> type:
        return type(self.data)


d = DN([1, 2])


In [1]:
import chainer.functions
import pandas as pd
from pandas.core import frame

In [7]:
a = [1, 2, 3]
a[::-1]

[3, 2, 1]

In [12]:
import sys

sys.path.append("../")

import jijbench as jb
import numpy as np

from jijbench.node.base import DataNode, FunctionNode
from jijbench.functions.math import Min




In [None]:
array = jb.Array(np.array([1, 2, 3]))
f: FunctionNode[jb.Array, jb.Array] = Min()
res = array.apply(f)
res.operator

In [32]:
factory = jb.functions.RecordFactory()

inputs = [jb.ID(name="id"), jb.Date(), jb.Array(np.arange(5))]
record = factory(inputs)

record = pd.concat([record.data, record.data])
record["id"]

id    ID(data='963fccd9-f449-463b-96dc-fc4de1474a7d'...
id    ID(data='963fccd9-f449-463b-96dc-fc4de1474a7d'...
dtype: object

In [None]:
class A:
    pass

a = A()
a.__class__.__name__

def func(x=[]):
    x += [1]
    return x

print(func())
(func())
print(a)
func()

In [None]:
import pandas as pd

pd.concat

def f():
    return 1

df = pd.DataFrame()
df.apply(f)

In [2]:
import abc

class A(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def f(self):
        pass

class B(A):
    def __init__(self, x) -> None:
        self.x = x
    
    def f(self):
        print(f"x: {self.x}")

In [14]:
b = B([1])

c = type(b)(b.x)
c.x += [2]
b.x

[1, 2]

In [30]:
import typing as tp

x = 1
x = tp.cast(float, x)

1

In [18]:
import numpy as np

x = np.array([1])
x.copy()
type(x)

np.ndarray
np._ArrayOrScalarCommon

numpy.ndarray

In [1]:
import pandas as pd

df = pd.DataFrame([1])
print(id(df))
print(id(df.copy()))

140064391932992
140064215154544


In [2]:
dir(df)

['T',
 '_AXIS_LEN',
 '_AXIS_ORDERS',
 '_AXIS_TO_AXIS_NUMBER',
 '_HANDLED_TYPES',
 '__abs__',
 '__add__',
 '__and__',
 '__annotations__',
 '__array__',
 '__array_priority__',
 '__array_ufunc__',
 '__array_wrap__',
 '__bool__',
 '__class__',
 '__contains__',
 '__copy__',
 '__dataframe__',
 '__deepcopy__',
 '__delattr__',
 '__delitem__',
 '__dict__',
 '__dir__',
 '__divmod__',
 '__doc__',
 '__eq__',
 '__finalize__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__iand__',
 '__ifloordiv__',
 '__imod__',
 '__imul__',
 '__init__',
 '__init_subclass__',
 '__invert__',
 '__ior__',
 '__ipow__',
 '__isub__',
 '__iter__',
 '__itruediv__',
 '__ixor__',
 '__le__',
 '__len__',
 '__lt__',
 '__matmul__',
 '__mod__',
 '__module__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__nonzero__',
 '__or__',
 '__pos__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__rdivmod__',
 '__reduce__',
 '__reduce_ex_

In [52]:
import typing as tp

T = tp.TypeVar("T")

def first(xs: list[T]) -> T:
    return xs[0]

x = first([1.0, 2.0, 3, "4"])


class A:
    _typ = "T"
    
    def f(self, x: T) -> T:
        return x
    
    
    
    def g(self, x: T) -> T:
        return x


In [58]:
a = A()

getattr(a, "a", "default")

'default'

In [49]:
import sys

sys.path.append("../../")

import jijbench as jb
from jijbench.node.base import DataNode

t = jb.Table(pd.DataFrame())
a = jb.Any(1, "a")

isinstance(a, DataNode)

True

In [40]:
type("ABCBase", (type, ), {})

__main__.ABCBase

In [3]:
def create_pandas_abc_type(name, attr, comp):
    def _check(inst):
        return getattr(inst, attr, "_typ") in comp

    # https://github.com/python/mypy/issues/1006
    # error: 'classmethod' used with a non-method
    @classmethod  # type: ignore[misc]
    def _instancecheck(cls, inst) -> bool:
        return _check(inst) and not isinstance(inst, type)

    @classmethod  # type: ignore[misc]
    def _subclasscheck(cls, inst) -> bool:
        # Raise instead of returning False
        # This is consistent with default __subclasscheck__ behavior
        if not isinstance(inst, type):
            raise TypeError("issubclass() arg 1 must be a class")

        return _check(inst)

    dct = {"__instancecheck__": _instancecheck, "__subclasscheck__": _subclasscheck}
    meta = type("ABCBase", (type,), dct)
    return meta(name, (), dct)


In [71]:
AT2

__main__.ArtifactBase

In [80]:
import sys

sys.path.append("../../")

import typing as tp
from typing import Type
import jijbench as jb

if tp.TYPE_CHECKING:
    from jijbench.data.mapping import Artifact

meta = type("Base", (type, ), {})
t = meta("Artifact", (), {})
AT = tp.cast("Artifact", t)

t2 = create_pandas_abc_type("ArtifactBase", "_typ", ("arifact", ))
AT2 = tp.cast("Type[Artifact]", t2)

obj = jb.Artifact({}, "a")
isinstance(obj, AT2)

False

In [88]:
meta = type(obj.__class_, (type, ), {})
meta

__main__.Artifact

In [74]:
AT2

__main__.ArtifactBase

In [53]:
a = A()
dir(a)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_typ',
 'f',
 'g']

In [57]:
dir(AT)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__']

__main__.Type[Artifact]

In [49]:
jb.Artifact

jijbench.data.mapping.Artifact

In [37]:
AT

__main__.Type[Artifact]

In [33]:
isinstance(t, meta)

True

In [28]:
t

__main__.Type[Artifact]

In [5]:
T = create_pandas_abc_type("T", "_typ", ("T",))

In [7]:
dir(T())

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__instancecheck__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasscheck__',
 '__subclasshook__',
 '__weakref__']

In [10]:
import pandas as pd

df = pd.DataFrame()

df._typ

'dataframe'

In [13]:
from pandas.core.dtypes.generic import ABCDataFrame

isinstance(df, ABCDataFrame)

True

In [14]:
meta

NameError: name 'meta' is not defined

In [19]:
def f(x=[]):
    return x

x = f()
x += [1]

# print(f(x=[1, 2]))
print(f())

class A:
    def __init__(self, x={}):
        self.x = x

a = A()
a.x.update({"x": 1})
print(a.x)
print()

a = A()
print(a.x)
a.x.update({"y": 2})
a.x

[1]
{'x': 1}
{'x': 1}


{'x': 1, 'y': 2}

In [47]:
from __future__ import annotations
import typing as tp
from typing_extensions import TypeAlias

T = tp.TypeVar("T", bound=A)
S = tp.TypeVar("S", bound=A)

ListType: TypeAlias = tp.Union[list[int],list[float]]

class A:
    def __init__(self, b: B):
        self.b = b

class AB(A):
    pass

class B(tp.Generic[T, S]):
    def f(self, x: T) -> S:
        a = x[0]
        return a


int == int
tp.Union[int, str] == int

False

In [None]:
from __future__ import annotations
from dataclasses import dataclass, field

import copy
import dill
import numpy as np
import pandas as pd
import typing as tp
import inspect
import itertools
import jijmodeling as jm
import pathlib
import uuid
import warnings

import sys
sys.path.append("../")
# from jijbench.exceptions import SolverFailedError
class SolverFailedError(Exception):
    pass


DNodeInType = tp.TypeVar("DNodeInType", bound="DataNode", covariant=True)
DNodeOutType = tp.TypeVar("DNodeOutType", bound="DataNode", covariant=True)

DEFAULT_RESULT_DIR = pathlib.Path("./.jb_results")


@dataclass
class DataNode:
    data: tp.Any
    name: str | None = None

    def __post_init__(self) -> None:
        self.operator: FunctionNode[DataNode, DataNode] | None = None



class FunctionNode(tp.Generic[DNodeInType, DNodeOutType]):
    def __init__(
        self,
        name: str | None = None
    ) -> None:
        self._name = name
        self.inputs: list[DataNode] = []

    def __call__(self, inputs: list[DNodeInType], **kwargs: tp.Any) -> DNodeOutType:
        raise NotImplementedError

    @property
    def name(self) -> str | None:
        return self._name

    def apply(self, inputs: list[DNodeInType], **kwargs: tp.Any) -> DNodeOutType:
        self.inputs += inputs
        node = self(inputs, **kwargs)
        node.operator = self
        return node


@dataclass
class ID(DataNode):
    data: str = field(default_factory=lambda: str(uuid.uuid4()))

    def __post_init__(self) -> None:
        self.data = str(self.data)


@dataclass
class Date(DataNode):
    data: str | pd.Timestamp = field(default_factory=pd.Timestamp.now)
    name: str = "timestamp"

    def __post_init__(self) -> None:
        if isinstance(self.data, str):
            self.data = pd.Timestamp(self.data)


@dataclass
class Value(DataNode):
    data: int | float


@dataclass
class Array(DataNode):
    data: np.ndarray
    name: str

    def min(self) -> Array:
        return Min().apply([self])

    def max(self) -> Array:
        return Max().apply([self])

    def mean(self) -> Array:
        return Mean().apply([self])

    def std(self) -> Array:
        return Std().apply([self])


class Min(FunctionNode["Array", "Array"]):
    def __call__(self, inputs: list[Array]) -> Array:
        data = inputs[0].data.min()
        name = inputs[0].name + f"_{self.name}"
        node = Array(data=data, name=name)
        return node

    @property
    def name(self) -> str:
        return "min"


class Max(FunctionNode["Array", "Array"]):
    def __call__(self, inputs: list[Array]) -> Array:
        data = inputs[0].data.max()
        name = inputs[0].name + f"_{self.name}"
        node = Array(data=data, name=name)
        return node

    @property
    def name(self) -> str:
        return "max"


class Mean(FunctionNode["Array", "Array"]):
    def __call__(self, inputs: list[Array]) -> Array:
        data = inputs[0].data.mean()
        name = inputs[0].name + f"_{self.name}"
        node = Array(data=data, name=name)
        return node

    @property
    def name(self) -> str:
        return "mean"


class Std(FunctionNode["Array", "Array"]):
    def __call__(self, inputs: list[Array]) -> Array:
        data = inputs[0].data.std()
        name = inputs[0].name + f"_{self.name}"
        node = Array(data=data, name=name)
        return node

    @property
    def name(self) -> str:
        return "std"


class RecordFactory(FunctionNode["DataNode", "Record"]):
    def __call__(self, inputs: list[DataNode], name: str | None = None, extract: bool = True) -> Record:
        data = {}
        for node in inputs:
            if isinstance(node.data, jm.SampleSet) and extract:
                data.update({n.name: n.data for n in self._to_nodes_from_sampleset(node.data)})
            else:
                data[node.name] = node.data
        data = pd.Series(data)
        return Record(data, name=name)

    @property
    def name(self) -> str:
        return "record"

    def _to_nodes_from_sampleset(self, sampleset: jm.SampleSet) -> list[DataNode]:
        data = []

        data.append(Array(np.array(sampleset.record.num_occurrences), "num_occurrences"))
        data.append(Array(np.array(sampleset.evaluation.energy), "energy"))
        data.append(Array(np.array(sampleset.evaluation.objective), "objective"))

        constraint_violations = sampleset.evaluation.constraint_violations
        if constraint_violations:
            for k, v in constraint_violations.items():
                data.append(Array(np.array(v), k))

        data.append(Value(sum(sampleset.record.num_occurrences), "num_samples"))
        data.append(Value(sum(sampleset.feasible().record.num_occurrences), "num_feasible"))

        # TODO スキーマが変わったら修正
        solving_time = sampleset.measuring_time.solve
        if solving_time is None:
            execution_time = np.nan
            warnings.warn(
                "'solve' of jijmodeling.SampleSet is None. Give it if you want to evaluate automatically."
            )
        else:
            if solving_time.solve is None:
                execution_time = np.nan
                warnings.warn(
                    "'solve' of jijmodeling.SampleSet is None. Give it if you want to evaluate automatically."
                )
            else:
                execution_time = solving_time.solve
        data.append(Value(execution_time, "execution_time"))
        return data


class TableFactory(FunctionNode["Record", "Table"]):
    def __call__(self, inputs: list[Record], name: str | None = None, index_name: str | None = None) -> Table:
        data = pd.DataFrame({node.name: node.data for node in inputs}).T
        data.index.name = index_name
        return Table(data, name=name)

    @property
    def name(self) -> str:
        return "table"


class ArtifactFactory(FunctionNode["Record", "Artifact"]):
    def __call__(self, inputs: list[Record], name: str | None = None) -> Artifact:
        data = {node.name: node.data.to_dict() for node in inputs}
        return Artifact(data, name=name)

    @property
    def name(self) -> str:
        return "artifact"


@dataclass
class Record(DataNode):
    data: pd.Series = field(default_factory=lambda: pd.Series(dtype="object"))


@dataclass
class DataBase(DataNode):
    def append(self, record: Record, **kwargs: tp.Any) -> None:
        raise NotImplementedError

    def _append(self, record: Record, factory: TableFactory | ArtifactFactory, **kwargs: tp.Any) -> None:
        node = factory.apply([record], name=self.name)
        node.operator = factory

        c = Concat()
        inputs = [copy.deepcopy(self), node]
        c.inputs = inputs
        self.data = c(inputs, **kwargs).data
        self.operator = c


@dataclass
class Table(DataBase):
    data: pd.DataFrame = field(default_factory=pd.DataFrame)

    def append(self, record: Record, axis: tp.Literal[0, 1] = 0, index_name: str | None = None, **kwargs: tp.Any) -> None:
        self._append(record, TableFactory(), axis=axis, index_name=index_name)


@dataclass
class Artifact(DataBase):
    data: dict = field(default_factory=dict)

    def append(self, record: Record, **kwargs: tp.Any) -> None:
        self._append(record, ArtifactFactory(), **kwargs)


class Concat(FunctionNode["DataBase", "DataBase"]):
    def __call__(
        self,
        inputs: list[DataBase],
        name: str | None = None,
        axis: tp.Literal[0, 1] = 0,
        index_name: str | None = None
    ) -> DataBase:
        dtype = type(inputs[0])
        if not all([isinstance(node, dtype) for node in inputs]):
            raise TypeError(
                "Type of elements of 'inputs' must be unified with either 'Table' or 'Artifact'."
            )

        if isinstance(inputs[0], Artifact):
            data = inputs[0].data.copy()
            for node in inputs[1:]:
                if node.name in data:
                    data[node.name].update(node.data.copy())
                else:
                    data[node.name] = node.data.copy()
            return Artifact(data=data, name=name)
        elif isinstance(inputs[0], Table):
            data = pd.concat(
                [node.data for node in inputs], axis=axis
            )
            data.index.name = index_name
            return Table(data=data, name=name)
        else:
            raise TypeError(f"'{inputs[0].__class__.__name__}' type is not supported.")

    @property
    def name(self) -> str:
        return "concat"


@dataclass
class Experiment(DataBase):
    def __init__(
            self,
            data: tuple[Artifact, Table] | None = None,
            name: str | None = None,
            autosave: bool = True,
            savedir: str | pathlib.Path = DEFAULT_RESULT_DIR
        ):
        if name is None:
            name = ID().data

        if data is None:
            data = (Artifact(), Table())

        if data[0].name is None:
            data[0].name = name

        if data[1].name is None:
            data[1].name = name

        self.data = data
        self.name = name
        self.autosave = autosave

        if isinstance(savedir, str):
            savedir = pathlib.Path(savedir)
        self.savedir = savedir

    @property
    def artifact(self) -> dict:
        return self.data[0].data

    @property
    def table(self) -> pd.DataFrame:
        t = self.data[1].data
        is_tuple_index = all([isinstance(i, tuple) for i in t.index])
        if is_tuple_index:
            names = t.index.names if len(t.index.names) >= 2 else None
            index = pd.MultiIndex.from_tuples(t.index, names=names)
            t.index = index
        return t

    def __enter__(self) -> Experiment:
        p = self.savedir / str(self.name)
        (p / "table").mkdir(parents=True, exist_ok=True)
        (p / "artifact").mkdir(parents=True, exist_ok=True)
        return self

    def __exit__(self, exception_type, exception_value, traceback) -> None:
        index = (self.name, self.table.index[-1])
        self.table.rename(index={self.table.index[-1]: index}, inplace=True)

        if self.autosave:
            self.save()

    def append(self, record: Record) -> None:
        for d in self.data:
            d.append(record, index_name=("experiment_id", "run_id"))

    def concat(self, experiment: Experiment) -> None:
        c = Concat()

        artifact = c([self.data[0], experiment.data[0]])
        table = c([self.data[1], experiment.data[1]])

        self.data = (artifact, table)
        self.operator = c

    def save(self):
        def is_dillable(obj: tp.Any):
            try:
                dill.dumps(obj)
                return True
            except Exception:
                return False

        p = self.savedir / str(self.name) / "table" / "table.csv"
        self.table.to_csv(p)

        p = self.savedir / str(self.name) / "artifact" / "artifact.dill"
        record_name = list(self.data[0].operator.inputs[1].data.keys())[0]
        if p.exists():
            with open(p, "rb") as f:
                artifact = dill.load(f)
                artifact[self.name][record_name] = {}
        else:
            artifact = {self.name: {record_name: {}}}

        record = {}
        for k, v in self.artifact[self.name][record_name].items():
            if is_dillable(v):
                record[k] = v
            else:
                record[k] = str(v)
        artifact[self.name][record_name].update(record)

        with open(p, "wb") as f:
            dill.dump(artifact, f)


class Solver(FunctionNode["DataNode", "Record"]):
    def __init__(self, function: tp.Callable) -> None:
        super().__init__()
        self.function = function

    def __call__(self, extract: bool = True, **kwargs: tp.Any) -> Record:
        parameters = inspect.signature(self.function).parameters
        is_kwargs = any([p.kind == 4 for p in parameters.values()])
        kwargs = (
            kwargs
            if is_kwargs
            else {k: v for k, v in kwargs.items() if k in parameters}
        )
        try:
            ret = self.function(**kwargs)
            if not isinstance(ret, tuple):
                ret = (ret, )
        except Exception as e:
            msg = f'An error occurred inside your solver. Please check implementation of "{self.name}". -> {e}'
            raise SolverFailedError(msg)

        solver_return_names = [f"{self.name}_return[{i}]" for i in range(len(ret))]
        nodes = [DataNode(data=data, name=name) for data, name in zip(ret, solver_return_names)]
        return RecordFactory().apply(nodes, extract=extract)

    @property
    def name(self) -> str:
        return self.function.__name__


class Benchmark(FunctionNode["DataNode", "Experiment"]):
    def __init__(
            self,
            params: dict[str, tp.Iterable[tp.Any]],
            solver: tp.Callable | list[tp.Callable],
            name: str | None = None,
            autosave: bool = True,
            savedir: str | pathlib.Path = DEFAULT_RESULT_DIR,
        ) -> None:
        super().__init__()
        self.params = params
        if isinstance(solver, tp.Callable):
            self.solver = [solver]
        else:
            self.solver = solver

        if name is None:
            name = ID().data
        self._name = name

        self.autosave = autosave
        self.savedir = savedir

    def __call__(self, concurrent=False, extract=True) -> Experiment:
        experiment = Experiment(name=ID().data, autosave=self.autosave, savedir=self.savedir)
        for f in self.solver:
            if concurrent:
                experiment = self._run_concurrently(experiment, Solver(f))
            else:
                experiment = self._run_sequentially(experiment, Solver(f))
        return experiment

    @property
    def name(self) -> str:
        return self._name

    def _run_concurrently(self, experiment: Experiment, solver: Solver) -> Experiment:
        raise NotImplementedError

    def _run_sequentially(self, experiment: Experiment, solver: Solver, extract=True) -> Experiment:
        # TODO 返り値名を変更できるようにする。
        # solver.rename_return(ret)
        for r in itertools.product(*self.params.values()):
            with experiment:
                solver_args = dict([(k, v) for k, v in zip(self.params.keys(), r)])
                name = ID().data
                record = solver(**solver_args, extract=extract)
                record.name = name
                experiment.append(record)

                # TODO 入力パラメータをtableで保持する
                # params = (dict([(k, v) for k, v in zip(self.params.keys(), r)]))
                # params = RecordFactory().apply(params)
                # params.name = name
                # experiment.append(record)
        return experiment


In [None]:
def sample_model(x, y):
    jm_sampleset_dict = {
        "record": {
            "solution": {
                "x": [
                    (([0, 1], [0, 1]), [1, 1], (2, 2)),
                    (([0, 1], [1, 0]), [1, 1], (2, 2)),
                    (([], []), [], (2, 2)),
                    (([0, 1], [0, 0]), [1, 1], (2, 2)),
                ]
            },
            "num_occurrences": [4, 3, 2, 1],
        },
        "evaluation": {
            "energy": [3.0, 24.0, 0.0, 20.0],
            "objective": [3.0, 24.0, 0.0, 17.0],
            "constraint_violations": {
                "onehot1": [0.0, 0.0, 2.0, 0.0],
                "onehot2": [0.0, 0.0, 2.0, 2.0],
            },
            "penalty": {},
        },
        "measuring_time": {"solve": None, "system": None, "total": None},
    }
    jm_sampleset = jm.SampleSet.from_serializable(jm_sampleset_dict)
    solving_time = jm.SolvingTime(
        **{"preprocess": 1.0, "solve": 1.0, "postprocess": 1.0}
    )
    jm_sampleset.measuring_time.solve = solving_time
    return jm_sampleset, x, y


bench = Benchmark(
    params={"x": range(3), "y": range(3)},
    solver=[sample_model]
)
result = bench()

In [None]:
result.table

In [None]:
result.artifact

In [None]:
import pandas 
tp.MutableMapping

In [None]:
result.data[0].operator.inputs[0].operator

In [None]:
import jijmodeling2 as jm2

In [None]:
result.table.shape

In [None]:
def f1():
    return 1

def f2():
    return 1, "a", 3.0

def f3(x, y):
    return x * 2, y / 2

s1 = Solver(f1)
ret = s1()

s2 = Solver(f2)
ret = s2()
ret.data

s3 = Solver(f3)
ret = s3(x=2, y=3)
print(ret.data)
print()

table = TableFactory()([ret])
table.data

In [None]:
x = (0, 1)
x[0] = 2

In [None]:
x = {"a": {"i": 0}}
y = {"a": {"sss": "ttt"}}

if "a" in x:
    x["a"].update(y["a"])
x

In [None]:
import jijmodeling as jm

def sample_model(x, y):
    jm_sampleset_dict = {
        "record": {
            "solution": {
                "x": [
                    (([0, 1], [0, 1]), [1, 1], (2, 2)),
                    (([0, 1], [1, 0]), [1, 1], (2, 2)),
                    (([], []), [], (2, 2)),
                    (([0, 1], [0, 0]), [1, 1], (2, 2)),
                ]
            },
            "num_occurrences": [4, 3, 2, 1],
        },
        "evaluation": {
            "energy": [3.0, 24.0, 0.0, 20.0],
            "objective": [3.0, 24.0, 0.0, 17.0],
            "constraint_violations": {
                "onehot1": [0.0, 0.0, 2.0, 0.0],
                "onehot2": [0.0, 0.0, 2.0, 2.0],
            },
            "penalty": {},
        },
        "measuring_time": {"solve": None, "system": None, "total": None},
    }
    jm_sampleset = jm.SampleSet.from_serializable(jm_sampleset_dict)
    solving_time = jm.SolvingTime(
        **{"preprocess": 1.0, "solve": 1.0, "postprocess": 1.0}
    )
    jm_sampleset.measuring_time.solve = solving_time
    return jm_sampleset, x, y

e = Experiment(name="test")
print("start")
print(e.data[0])
print(e.data[0].data)
print()

for x in range(3):
    for y in range(3):
        with e:
            solver = Solver(sample_model)
            record = solver(x=x, y=y)
            record.name = ID().data
            e.append(record)


In [None]:
e.data[1].data

In [None]:
pd.MultiIndex.from_tuples(e.table.index)

In [None]:
df = pd.DataFrame([1, 2, 3])
len(df.index.names)

In [None]:
e.artifact

In [None]:
index = (("x", 0), ("x", 1), ("y", 0), ("y", 1))
df = pd.DataFrame([0, 1, 2, 3])
df.index = pd.MultiIndex.from_tuples(index, names=None)
df2 = df.reset_index()
df2.index.names = ["a"]
df3 = df2.rename(index={3: (5,)})
df3

In [None]:
df

In [None]:
pd.concat([df, df2])

In [None]:
e1 = Experiment()
e1.data[1].data

In [None]:
e.data[0].operator.inputs

In [None]:
e.data[1].operator.inputs

In [None]:
import dill
import pathlib

def func():
    def inner():
        return 1

    return inner

p = pathlib.Path("./test.dill")

with open(p, "wb") as f:
    dill.dump(func, f)

dill.dumps(lambda x: x ** 2)

In [None]:
import dill
import pathlib

p = pathlib.Path("./test.dill")

with open(p, "rb") as f:
    obj = dill.load(f)

In [None]:
import pickle
import pathlib

def func():
    def inner():
        return 1

    return inner

p = pathlib.Path("./test.pkl")

with open(p, "wb") as f:
    pickle.dump(func, f)

try:
    f = lambda x: x ** 2
    pickle.dumps(f)
except Exception:
    print("False")
    print(f)
    print(str(f))


In [None]:
a = pickle.dumps("a")
b = pickle.dumps("b")


with open("test.pkl", "wb") as f:
    f.write(a + b)

In [None]:
pickle.dumps("a" + "b")

In [None]:
with open("test.pkl", "rb") as f:
    obj = pickle.load(f)
obj

In [None]:
import pickle
import pathlib

p = pathlib.Path("./test.pkl")

with open(p, "rb") as f:
    obj = pickle.load(f)

In [None]:
obj()()

In [None]:
! pip install dill

In [None]:
p = pathlib.Path(".")
df = pd.DataFrame([1])
df.to_csv(p / "table.csv")

In [None]:
p = e.savedir / "test" /"table"
p.exists()

In [None]:
p

In [None]:
e.table

In [None]:
e.table.to_csv(p)

In [None]:
p = pathlib.Path(".") / "table2.csv"
p.exists()

In [None]:
e.table

In [None]:
e.artifact

In [None]:
table = Table()
table.append(record, axis=1)
table.data

In [None]:
table = TableFactory().apply([record])
table.data

In [None]:
e.table.data

In [None]:
e.table.data.shape

In [None]:
e.artifact.data

In [None]:
T1 = tp.TypeVar("T1")
T2 = tp.TypeVar("T2", bound=float)

class A(tp.Generic[T1, T2]):
    def f(self, x: T1) -> T2:
        raise NotImplementedError
    

class B(A[int, float]):
    def f(self, x: int) -> float:
        return float(x)
    
b = B()
b.f(10)


In [None]:
array = Array(np.arange(10))
isinstance(array, DataNode)

In [None]:
from torch.utils.data import Dataset

In [None]:
i = ID()
print(i.data)
print(i.name)

In [None]:
e0 = Experiment()
e0.data[0].data["a"] = [0, 1, 2]
e0.data

e0 = Experiment()
e0.data

In [None]:
a = 1
a.__class__.__name__

In [None]:
pd.Series(dtype="object")

In [None]:
d0 = DataNode(0, name="d0")
d1 = DataNode(1, name="d1")
d2 = DataNode(2, name="d2")

r0 = RecordFactory()([d0, d1], name="new r0")
r1 = RecordFactory()([d0, d2], name="new r1")
r1.data

In [None]:
t0 = Table(pd.DataFrame([0, 1], index=["d0", "d1"], columns=["a"]))
t1 = Table(pd.DataFrame([2, 3], index=["d0", "d1"], columns=["b"]))
tc = Concat()([t0, t1], axis=1)

tc.data

In [None]:
t1.append(r1, axis=1)

In [None]:
t1.data

In [None]:
a = ArtifactFactory().apply([r0])
a

In [None]:
a0 = Artifact({"a": 0, "b": 1}, name=ID().data)
a1 = Artifact({"c": 0, "d": 1}, name=ID().data)

print("aaaaa")
print(a0.data)
print(id(a0))
print()

print("bbbbb")
a0.append(r1)
print(a0.data)
print(id(a0))
print()

print(a0.operator.inputs[0].data)
print(id(a0.operator.inputs[0].data))


In [None]:
a = {}
b = a.copy()
print(id(a))
print(id(b))

In [None]:
acopy = a0.copy()
print(acopy)
print(id(acopy))

In [None]:
print(a0.operator.inputs[0])
print(id(a0.operator.inputs[0]))

In [None]:
ac = Concat()([a0, a1])
print(ac.data)
print(a0.data)
print(a1.data)

In [None]:
df = pd.DataFrame()
df.copy()

n = np.array(1)
n.copy()

from numpy import array_api

In [None]:
x = ([1], )
print(x)
print(id(x[0]))

y = (x[0], )
print(y)
print(id(y[0]))

In [None]:
x = 1

x.copy()

In [None]:
dn = object.__new__(DataNode)
dn.__init__(1)
dn

In [None]:
class A:
    def __init__(self, x: int | None = None) -> None:
        self.x = x
        
    def f(self):
        obj = super().__new__(self.__class__)
        obj.__init__()
        return obj



a = A(1)
print(a.x)
obj1 = object.__new__(a.__class__)
obj2 = object.__new__(A)
print(obj1)
print(obj2)
# print(a.__class__)
# print(A)
a2 = a.f()
print(a2.x)
print(a.x)

In [None]:
data = Array(np.arange(10))
m = data.min()
print(m)
if m.operator is not None:
    print(m.operator.inputs)

In [None]:
a0.operator

In [None]:
@tp.overload
def func(x: int) -> int:
    ...

@tp.overload
def func(x: float) -> float:
    ...

def func(x: int | float) -> int | float:
    if isinstance(x, int):
        return x
    else:
        return x

func(1.0)

In [None]:
r = Record()
r.data["a"] = 1
r.data

r1 = Record()
r1.data

r.data

In [None]:
x = {}
x1 = {"a": 1}
# x.update(x1)
x.get("b", {}).update(x1)
# x = {}.update(x1)

In [None]:
if ():
    print("a")
else:
    print("b")

In [None]:
class A:
    pass

class B(A):
    pass

def func() -> A:
    return B()

In [None]:
import numpy as np

array = Array(np.arange(10))
print(array.data)
mi = array.min()
if mi.operator is not None:
    print(mi.operator.inputs[0])
    
print(array.min())
print(array.max())

In [None]:
d0 = DataNode(pd.Series([0, 1, 2]), name="a")
d1 = DataNode(pd.Series([2, 3, 4]), name="b")

x = {d0.name: d0.data, d1.name: d1.data}
pd.DataFrame(x)

In [None]:
@dataclass
class A:
    x: int
    
    def __post_init__(self):
        self.y = None
        
    def f(self):
        self.y += 1
        
a = A(1)
a.y




In [None]:
import jijzept as jz
import jijmodeling as jm

sampler = jz.JijSASampler(config="/home/d/.jijzept/config.toml")

problem = jm.Problem("sample")
x = jm.Binary("x", 5)
problem += x[:]
problem += jm.Constraint("onehot", x[:] == 1)

res = sampler.sample_model(problem, {})

In [None]:
res.feasible()

In [None]:
id0 = ID(name="benchmark_id")
energy = Energy(np.array(res.evaluation.energy))
objective = Objective(np.array(res.evaluation.objective))
if res.evaluation.constraint_violations is not None:
    const_name, const_values  = list(res.evaluation.constraint_violations.items())[0]
    constraint_violation = ConstraintViolation(np.array(const_values), const_name)
else:
    constraint_violation = ConstraintViolation()

print(energy)
print(objective)
print(constraint_violation)

In [None]:
record = Record(
    name=id0.data,
    children=[id0, energy, objective, constraint_violation]
)
record.to_dict()

In [None]:
record.to_series()

In [None]:
table = Table(children=[record])
table.to_dataframe()

In [None]:
table.append(record)
table.to_dataframe()

In [None]:
artifact = Artifact(children=[record])
artifact.to_dict()

In [None]:
id1 = ID(name="benchmark_id")
record2 = Record(
    name=id1.data,
    children=[id1, energy, objective, constraint_violation]
)
artifact.append(record2)
artifact.to_dict()

In [None]:
print(id0)
print(id1)

In [None]:
experiment = Experiment()

In [None]:
experiment.append(record)
experiment.append(record2)


In [None]:
df = experiment.table.to_dataframe()
df

In [None]:
experiment.table.children

In [None]:
Date()

In [None]:
ID()

In [None]:
experiment.table.children

In [None]:
ID().data

In [None]:
p = pathlib.Path("./a")
(p / "b").mkdir(parents=True)

In [None]:
pd.Timestamp("2022-01-01")

In [None]:
class ArrayVisitor:
    def min(self, array: Array):
        return float(array.data.min())
    

array = Array(np.arange(10))
array.accept_min(ArrayVisitor())


In [None]:
from chainer.variable import Variable, VariableNode
from chainer.function_node import FunctionNode
from chainer.function import FunctionAdapter

a = Variable(np.array(10.0), name="a")
x = Variable(np.array(1.0), name="x")
y = Variable(np.array(2.0), name="y")
z = a * x + y

z.backward(retain_grad=True)

print(z)
print(z.node)
print("=======")
print(z.node.creator.inputs)
print(z.node.creator.inputs[0].data)
print(z.node.creator.inputs[0].name)
print(z.node.creator.inputs[1].data)
print(z.node.creator.inputs[1].name)
print("@@@@@@@@@@")
print(z.node.creator.inputs[0].creator.inputs[0].data)
print(z.node.creator.inputs[0].creator.inputs[0].name)
print(z.node.creator.inputs[0].creator.inputs[1].data)
print(z.node.creator.inputs[0].creator.inputs[1].name)
print(z.node.creator.inputs[1].data)
print(z.node.creator.inputs[1].name)
print("++++++++++")
print(z.node.creator.inputs[0].creator.inputs[0].creator)
print(z.node.creator.inputs[0].creator.inputs[1].creator)
print()



In [None]:
from chainer.functions import add

In [None]:
s1 = pd.Series([1, 2, 3], index=["a", "b", "c"])
s2 = pd.Series([1, 2, 3], index=["a", "b", "c"])

In [None]:
pd.concat([s1], axis=1).T

In [None]:
# Tn = tp.TypeVar("Tn")

# class FunctionNode(tp.Generic[Tn]):
#     def __init__(
#         self, 
#         name: str | None = None, 
#         children: list[Tn] | None = None
#     ) -> None:
#         self._name = name``
#         self._children = children
#         
#     @property
#     def name(self) -> str | None:
#         return self._name
#     
#     @property
#     def children(self) -> list[Tn] | None:
#         return self._children
#     
#     def _narrow_children_type(self) -> list[Tn]:
#         if self.children is None:
#             raise ValueError(
#                 "Value of attribute 'children' is None, which means this node has no children. Therefore, it cannot operate node futher more."
#             )
#         return self.children
#     
#     def append(self, node: Tn) -> None:
#         children = self._narrow_children_type()
#         children.append(node)
#     
    
            
# Tn = tp.TypeVar("Tn", bound="BaseNode")
# 
# class BaseNode(tp.Generic[Tn]):
#     pass
# 
# @dataclass
# class DataNode(BaseNode[Tn]):
#     data: tp.Any = None
#     name: str | None = None
#     children: list[Tn] | None = None
#     
#     
# class FunctionNode(BaseNode[Tn]):
#     def __init__(
#         self, 
#         name: str | None = None, 
#         children: list[Tn] | None = None
#     ) -> None:
#         self.name = name
#         if children is None:
#             self.children = []
#         else:
#             self.children = children
# 