Skip to content

Commit

Permalink
Merge pull request #263 from jakobj/enh/faster-caching
Browse files Browse the repository at this point in the history
Improved caching decorator
  • Loading branch information
jakobj committed Jan 5, 2021
2 parents 8a159fc + 2f50e50 commit 20ee3cb
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 40 deletions.
113 changes: 77 additions & 36 deletions cgp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
import hashlib
import os
import pickle
from typing import Any, Callable, Dict, List, Tuple, Type, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, Type, Union

import numpy as np

from .individual import IndividualMultiGenome, IndividualSingleGenome
from .node import Node, primitives_dict

if TYPE_CHECKING:
import multiprocessing as mp # noqa: F401


def __check_cache_consistency(fn: str, func: Callable[..., float]) -> None:
"""Retrieve an entry from the cache, execute the callable with the
Expand All @@ -19,7 +22,9 @@ def __check_cache_consistency(fn: str, func: Callable[..., float]) -> None:
can be found in the cache.
"""
cached_item: Union[Dict[str, Any], None] = __find_item_with_finite_return_value(fn)
cached_item: Union[Dict[str, Any], None] = __find_args_and_return_value_for_consistency_check(
fn
)
if cached_item is None:
return

Expand All @@ -33,21 +38,21 @@ def __check_cache_consistency(fn: str, func: Callable[..., float]) -> None:
)


def __find_item_with_finite_return_value(fn: str) -> Union[Dict[str, Any], None]:
"""Find a cache entry which has a finite return value."""
def __find_args_and_return_value_for_consistency_check(fn: str) -> Union[Dict[str, Any], None]:
"""Try to retrieve argument and return value for consistency check."""
if os.path.isfile(fn):
with open(fn, "rb") as f:
while True:
try:
cursor: Dict[str, Any] = pickle.load(f)
except EOFError:
return None # no entry yet, so not possible to check

cached_item: Dict[str, Any] = list(cursor.values())[0]
if np.isfinite(cached_item["return_value"]):
return cached_item
else:
return None
try:
res: Dict[str, Any] = pickle.load(f)
except EOFError:
return None

if "args_return_value_consistency_check" in res and np.all(
np.isfinite(res["args_return_value_consistency_check"]["return_value"])
):
return res["args_return_value_consistency_check"]

return None


def __compute_key_from_args(*args: Any, **kwargs: Any) -> str:
Expand Down Expand Up @@ -97,26 +102,44 @@ def __compute_key_from_evaluation(
def __find_result_in_cache_file(fn: str, key: str) -> Union[float, None]:
if os.path.isfile(fn):
with open(fn, "rb") as f:
while True:
try:
cursor: Dict[str, Any] = pickle.load(f)
except EOFError:
break

if key in cursor:
return cursor[key]["return_value"]
try:
res = pickle.load(f)
except EOFError:
return None

if key in res:
return res[key]
else:
return None

return None


def __store_new_cache_entry(
fn: str, key: str, return_value: float, args: Tuple, kwargs: Dict[str, Any]
) -> None:
with open(fn, "ab") as f:

result = {"args": args, "kwargs": kwargs, "return_value": return_value}
res: Dict[str, Any]
try:
with open(fn, "rb") as f:
res = pickle.load(f)
except (EOFError, FileNotFoundError):
res = {}

res[key] = return_value

pickle.dump({key: result}, f)
if "args_return_value_consistency_check" not in res or not np.all(
np.isfinite(res["args_return_value_consistency_check"]["return_value"])
):
res["args_return_value_consistency_check"] = {
"args": args,
"kwargs": kwargs,
"return_value": return_value,
}

with open(fn, "wb") as f:
pickle.dump(res, f)


def disk_cache(
Expand All @@ -126,7 +149,8 @@ def disk_cache(
fec_seed: int = 0,
fec_min_value: float = -100.0,
fec_max_value: float = 100.0,
fec_batch_size: int = 10
fec_batch_size: int = 10,
file_lock: Union[None, "mp.synchronize.Lock"] = None,
) -> Callable[[Callable[..., float]], Callable[..., float]]:
"""Cache function return values on disk.
Expand Down Expand Up @@ -169,16 +193,19 @@ def disk_cache(
----------
fn : str
Name of the cache file.
use_fec : bool
Whether to use functional equivalance checking.
fec_seed : int
Seed value for fec.
fec_min_value : float
Minimal value for fec input samples.
fec_max_value : float
Maximal value for fec input samples.
fec_batch_size : int
Number of fec input samples.
use_fec : bool, optional
Whether to use functional equivalance checking. Defaults to False.
fec_seed : int, optional
Seed value for fec. Defaults to 0.
fec_min_value : float, optional
Minimal value for fec input samples. Defaults to -100.0.
fec_max_value : float, optional
Maximal value for fec input samples. Defaults to 100.0.
fec_batch_size : int, optional
Number of fec input samples. Defaults to 10.
file_lock : multiprocessing.synchronize.Lock, optional
Lock to make sure only a single process reads from/write to
cache file. Defaults to None.
Returns
-------
Expand All @@ -201,13 +228,27 @@ def wrapper(*args: Any, **kwargs: Any) -> Union[float, None]:
else:
key = __compute_key_from_args(*args, **kwargs)

if file_lock is not None:
file_lock.acquire()

result_value_cached: Union[float, None] = __find_result_in_cache_file(fn, key)

if file_lock is not None:
file_lock.release()

if result_value_cached is not None:
return result_value_cached

return_value: float = func(*args, **kwargs)

if file_lock is not None:
file_lock.acquire()

__store_new_cache_entry(fn, key, return_value, args, kwargs)

if file_lock is not None:
file_lock.release()

return return_value

return wrapper
Expand Down
4 changes: 3 additions & 1 deletion examples/example_fec_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import multiprocessing as mp
import time

import numpy as np
Expand Down Expand Up @@ -42,6 +43,7 @@ def f_target(x):
fec_min_value=-10.0,
fec_max_value=10.0,
fec_batch_size=5,
file_lock=mp.Lock(),
)
def inner_objective(ind):
"""The caching decorator uses the return values generated from
Expand Down Expand Up @@ -80,7 +82,7 @@ def objective(individual):

params = {
"population_params": {"n_parents": 10, "mutation_rate": 0.05, "seed": 8188211},
"ea_params": {"n_offsprings": 10, "tournament_size": 1, "n_processes": 1},
"ea_params": {"n_offsprings": 10, "tournament_size": 1, "n_processes": 2},
"genome_params": {
"n_inputs": 1,
"n_outputs": 1,
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# encoding: utf8
import re
from setuptools import setup
from collections import defaultdict

from setuptools import setup


def _cut_version_number_from_requirement(req):
return req.split()[0]
Expand Down
77 changes: 75 additions & 2 deletions test/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import concurrent.futures
import functools
import multiprocessing as mp
import tempfile
import time

import numpy as np
import pytest

import cgp
from cgp.genome import ID_INPUT_NODE, ID_NON_CODING_GENE, ID_OUTPUT_NODE


@pytest.mark.parametrize("individual_type", ["SingleGenome", "MultiGenome"])
Expand Down Expand Up @@ -136,13 +138,84 @@ def recording_callback(pop):
assert fitness == pytest.approx(fitness_decorated)


@cgp.utils.disk_cache(tempfile.mkstemp()[1], use_fec=True)
def _fec_cache_decorator_with_multiple_inputs_multiple_outputs_objective(ind):
f = ind.to_numpy()
x = np.array([[1.0, 2.0], [3.0, 4.0]])
y = f(x)
return y


def test_fec_cache_decorator_with_multiple_inputs_multiple_outputs(genome_params):

genome_params = {
"n_inputs": 2,
"n_outputs": 2,
"n_columns": 1,
"n_rows": 1,
"levels_back": None,
"primitives": (cgp.Add, cgp.Sub),
}

genome0 = cgp.Genome(**genome_params)
# [f0(x), f1(x)] = [x_0, x_0 + x_1]
genome0.dna = [
ID_INPUT_NODE,
ID_NON_CODING_GENE,
ID_NON_CODING_GENE,
ID_INPUT_NODE,
ID_NON_CODING_GENE,
ID_NON_CODING_GENE,
0,
0,
1,
ID_OUTPUT_NODE,
0,
ID_NON_CODING_GENE,
ID_OUTPUT_NODE,
2,
ID_NON_CODING_GENE,
]
ind0 = cgp.IndividualSingleGenome(genome0)

genome1 = cgp.Genome(**genome_params)
# [f0(x), f1(x)] = [x_0, x_0 - x_1]
genome1.dna = [
ID_INPUT_NODE,
ID_NON_CODING_GENE,
ID_NON_CODING_GENE,
ID_INPUT_NODE,
ID_NON_CODING_GENE,
ID_NON_CODING_GENE,
1,
0,
1,
ID_OUTPUT_NODE,
0,
ID_NON_CODING_GENE,
ID_OUTPUT_NODE,
2,
ID_NON_CODING_GENE,
]
ind1 = cgp.IndividualSingleGenome(genome1)

y0 = _fec_cache_decorator_with_multiple_inputs_multiple_outputs_objective(ind0)

# next call should *not* use the cached value despite the first
# dimension being identical
y1 = _fec_cache_decorator_with_multiple_inputs_multiple_outputs_objective(ind1)

assert y0[:, 0] == pytest.approx(y1[:, 0])
assert y0[:, 1] != pytest.approx(y1[:, 1])


@cgp.utils.disk_cache(tempfile.mkstemp()[1])
def _cache_decorator_objective_single_process(s, sleep_time):
time.sleep(sleep_time) # simulate long execution time
return s


@cgp.utils.disk_cache(tempfile.mkstemp()[1])
@cgp.utils.disk_cache(tempfile.mkstemp()[1], file_lock=mp.Lock())
def _cache_decorator_objective_two_processes(s, sleep_time):
time.sleep(sleep_time) # simulate long execution time
return s
Expand All @@ -164,7 +237,7 @@ def evaluate_objective_on_list(x):
return list(executor.map(objective, x))

sleep_time = 1.0
x = ["test0", "test1"]
x = [0, 1]

# WARNING: below the number of processes is *not* taken into
# account in the timing; one would expect a two-fold speedup when
Expand Down

0 comments on commit 20ee3cb

Please sign in to comment.