Skip to content

Commit

Permalink
MAINT: Improve simulation
Browse files Browse the repository at this point in the history
Make simulation restartable so that interruption is less costly
  • Loading branch information
bashtage committed Apr 16, 2020
1 parent 0b3a29c commit 6d70dd8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
@@ -1,9 +1,11 @@
import argparse
import datetime as dt
import gzip
import os
import pickle
from random import shuffle
import sys
from typing import Optional
from typing import Optional, Tuple

import colorama
from joblib import Parallel, delayed
Expand All @@ -29,6 +31,8 @@
EX_NUM = 100
# Maximum memory of the main simulated data
MAX_MEMORY = 2 ** 21
# Number of iterations between display
DISP_ITERATIONS = 25000

if sys.platform.lower() == "win32":
os.system("color")
Expand Down Expand Up @@ -223,6 +227,35 @@ def block(gen: np.random.Generator, statistic: str, num: int, trend: str) -> NDA
return results


def temp_file_name(full_path: str, gzip=True):
base, file_name = list(os.path.split(full_path))
extension = ".pkl" if not gzip else ".pkl.gz"
temp_file = "partial-" + file_name.replace(".hdf", extension)
return os.path.join(base, temp_file)


def save_partial(
gen: np.random.Generator, results: pd.DataFrame, remaining: int, full_path: str
) -> None:
temp_file = temp_file_name(full_path)
info = {"results": results, "remaining": remaining, "gen": gen}
with gzip.open(temp_file, "wb", 4) as pkl:
pickle.dump(info, pkl)


def load_partial(
gen: np.random.Generator, results: pd.DataFrame, remaining: int, full_path: str
) -> Tuple[np.random.Generator, pd.DataFrame, int]:
temp_file = temp_file_name(full_path)
if os.path.exists(temp_file):
with gzip.open(temp_file, "rb") as pkl:
info = pickle.load(pkl)
gen = info["gen"]
results = info["results"]
remaining = info["remaining"]
return gen, results, remaining


def worker(
gen: np.random.Generator, statistic: str, trend: str, idx: int, full_path: str
):
Expand All @@ -237,6 +270,7 @@ def worker(
results = pd.DataFrame(
index=pd.RangeIndex(EX_SIZE), columns=columns, dtype="double"
)
gen, results, remaining = load_partial(gen, results, remaining, full_path)
start = dt.datetime.now()
last_print_remaining = remaining
while remaining > 0:
Expand All @@ -250,12 +284,13 @@ def worker(
time_per_iter = elapsed.total_seconds() / (EX_SIZE - remaining)
remaining_time = int(time_per_iter * remaining)
rem = str(dt.timedelta(seconds=remaining_time))
if last_print_remaining - remaining >= 10000:
if last_print_remaining - remaining >= DISP_ITERATIONS:
print(
f"Index: {idx}, Statistic: {statistic}, Trend: {trend}, "
f"Remaining: {GREEN}{remaining}{RESET}"
)
print(f"Est. time remaining: {RED}{rem}{RESET}")
save_partial(gen, results, remaining, full_path)
last_print_remaining = remaining
results = results.quantile(QUANTILES)
results.to_hdf(full_path, "results")
Expand Down Expand Up @@ -309,6 +344,15 @@ def worker(
if args.z_only:
print(f"{BLUE}Note{RESET}: Only running Z-type tests")
jobs = [job for job in jobs if job[1] == "z"]
# Reorder jobs to prefer those with partial results first
first = []
remaining = []
for job in jobs:
if os.path.exists(temp_file_name(job[4])):
first.append(job)
else:
remaining.append(job)
jobs = first + remaining
nremconfig = len(jobs)
nconfig = len(children)
print(
Expand Down
4 changes: 3 additions & 1 deletion arch/utility/__init__.py
Expand Up @@ -7,7 +7,9 @@
PKG = os.path.dirname(os.path.dirname(__file__))


def test(extra_args: Optional[Union[str, Sequence[str]]] = None, exit=True) -> None:
def test(
extra_args: Optional[Union[str, Sequence[str]]] = None, exit: bool = True
) -> None:
"""
Test runner that allows testing of installed package.
Expand Down

0 comments on commit 6d70dd8

Please sign in to comment.