In [61]:
import copy
import pathlib
import time

from determined.common import yaml
from determined.experimental import client

In [62]:
import threading
import queue

In [68]:
# Model definition directory; it is also passed as `model_dir` when experiments are created.
model_dir = pathlib.Path("./cifar10")
# Base experiment configuration, parsed once and reused for every experiment in the search.
exp_conf_path = model_dir.joinpath("const.yaml")
exp_conf = yaml.safe_load(exp_conf_path.read_text())

In [69]:
# Name of the validation metric to monitor and rank trials by (smaller is better).
val_metric_name = "validation_error"

# Hyperparameter name -> candidate values; one experiment is launched per value.
hp_search = dict(learning_rate=[1e-4, 1e-3, 1e-2, 1e-1])

# Trial state names after which a trial no longer needs to be polled.
terminal_trial_states = ["CANCELED", "COMPLETED", "ERROR"]

In [75]:
def monitor_trial(trial, interval):
    """
    Poll a trial until it reaches a terminal state, killing it if its validation metric stalls.

    The best ("min") value of `val_metric_name` and the validation count at which that value
    was first observed are remembered between polls. If the best value is unchanged after
    `steps_threshold` further validations, the trial is killed once and polling continues
    until the trial reports a terminal state.

    Args:
        trial: Trial handle exposing `id`, `state.name`, `summary_metrics`, `reload()` and `kill()`.
        interval: Seconds to sleep between polls.
    """
    steps_threshold = 5
    # Early-stopping baseline: best value seen so far and the validation count when it was set.
    prev_val_steps = None
    prev_best_val = None
    # Set once kill() has been issued so the trial is not killed again on every later poll.
    kill_requested = False

    while trial.state.name not in terminal_trial_states:
        trial.reload()
        print(f"Trial {trial.id}: {trial.state.name}")
        if trial.state.name in terminal_trial_states:
            # The trial finished during the last sleep; nothing left to evaluate or wait for.
            break

        summary_metrics = trial.summary_metrics or {}
        val_summary = (summary_metrics.get("validation_metrics") or {}).get(val_metric_name)
        if kill_requested or not val_summary:
            # Either still waiting for the first validation, or waiting for the kill to land.
            time.sleep(interval)
            continue

        current_val = val_summary["min"]
        # NOTE(review): "count" is treated as the number of validations reported so far - confirm.
        current_steps = val_summary["count"]

        if prev_best_val is None or current_val != prev_best_val:
            # First observation or a new best value: restart the patience window from here.
            prev_best_val = current_val
            prev_val_steps = current_steps
        elif should_early_stop(prev_best_val, prev_val_steps, current_val, current_steps, steps_threshold):
            print(f"Early stopping trial {trial.id} due to no improvement for {val_metric_name} for {steps_threshold} steps.")
            trial.kill()
            kill_requested = True

        time.sleep(interval)

def create_experiment_with_hparams(hp_name, hp_val, val_metric_name, trial_queue):
    """
    Create an experiment with one hyperparameter overridden, then monitor its first trial.

    Intended to run as a thread target: the first trial's id is put on `trial_queue` so the
    launching code can collect results after joining the threads.

    Args:
        hp_name: Key under the config's "hyperparameters" section to override.
        hp_val: Value to assign to that hyperparameter.
        val_metric_name: Accepted for interface compatibility; not read by this function.
        trial_queue: Queue-like object that receives the id of the experiment's first trial.
    """
    print(f"Starting experiment with {hp_name}={hp_val}")
    # Work on a private copy: this function runs concurrently on several threads, and
    # writing into the shared `exp_conf` would let one thread overwrite another thread's
    # value before its experiment has been submitted.
    trial_conf = copy.deepcopy(exp_conf)
    trial_conf["hyperparameters"][hp_name] = hp_val

    exp = client.create_experiment(config=trial_conf, model_dir=model_dir)

    trial = exp.await_first_trial()
    trial_queue.put(trial.id)

    # Poll every 5 seconds until the trial is terminal.
    monitor_trial(trial, 5)
    
def should_early_stop(prev_best_val, prev_val_steps, current_best_val, current_val_steps, stop_threshold):
    """
    Primitive early-stopping predicate.

    Returns True only when at least `stop_threshold` steps have elapsed since `prev_val_steps`
    and the best validation value is still equal to `prev_best_val`; otherwise returns False.
    """
    patience_deadline = prev_val_steps + stop_threshold
    if not (patience_deadline <= current_val_steps):
        # Not enough steps have elapsed yet to judge the trial.
        return False
    if current_best_val == prev_best_val:
        return True
    return False


In [74]:
# Launch one experiment per hyperparameter value. Each experiment is created and monitored
# on its own thread, and reports its first trial's id through `trial_queue`.
trial_queue = queue.Queue()
exp_threads = []

for hp_name, hp_vals in hp_search.items():
    for hp_val in hp_vals:
        exp_thread = threading.Thread(target=create_experiment_with_hparams, args=(hp_name, hp_val, val_metric_name, trial_queue))
        exp_threads.append(exp_thread)
        exp_thread.start()

# Block until every monitor thread has returned.
for thread in exp_threads:
    thread.join()

print("All trials completed. Generating summary report.")
# Every producer thread has been joined, so the queue contents can no longer change.
trial_ids = list(trial_queue.queue)
trial_vals = []
for trial_id in trial_ids:
    trial = client.get_trial(trial_id=trial_id)

    # A trial that ended before its first validation has no summary for the metric;
    # leave it out of the ranking instead of failing the whole report.
    val_summary = ((trial.summary_metrics or {}).get("validation_metrics") or {}).get(val_metric_name)
    if not val_summary:
        print(f"Trial {trial.id}: no {val_metric_name} reported, excluding from summary.")
        continue

    # Smaller is better
    trial_best_val = val_summary["min"]

    for hparam in hp_search.keys():
        trial_vals.append({
            "trial_id": trial.id,
            "hparam_name": hparam,
            "hparam_val": trial.hparams[hparam],
            "val_metric_name": val_metric_name,
            "best_val_metric": trial_best_val,
        })

# Ascending sort puts the lowest (best) validation value first.
trial_vals.sort(key=lambda x: x["best_val_metric"])

print("=" * 100)
print(f"Hyperparameter space: {hp_search}")
# Count trials rather than records: one record is appended per searched hyperparameter.
print(f"Trials completed: {len(trial_ids)}")
if trial_vals:
    print(f"Best validation: {trial_vals[0]}")
else:
    print("Best validation: no trial reported a validation metric.")

Starting experiment with learning_rate=0.0001
Starting experiment with learning_rate=0.001d 0 files
Preparing files to send to master... 6.5KB and 6 files                                                        
Preparing files to send to master... 6.5KB and 6 files
Trial 61 QUEUED
Trial 62 QUEUED
Trial 61 RUNNING
Trial 62 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 62 RUNNING
Trial 62 RUNNING
Trial 61 RUNNING
Trial 61 COMPLETEDTrial 62 STOPPING_COMPLETED

Trial 62 COMPLETED
All trials completed. Generating summary report.
Hyperparameter space: {'learning_rate': [0.

In [119]:
from determined.common.experimental.trial import LogLevel

# Filtering logs -> print this trial's log lines that contain "a658e1d8", requesting DEBUG as the
# minimum level. No timestamp filter is passed to this call.
# NOTE(review): `trial` is whatever the summary loop in the earlier cell left bound - confirm it
# is the trial you intend to inspect.
matching_logs = trial.logs(search_text="a658e1d8", min_level=LogLevel.DEBUG)
for log in matching_logs:
    print(log)

[2023-07-19T21:04:50.912994Z] a658e1d8 || INFO: [29] root: New trial runner in (container a658e1d8-5fed-439e-ab53-7f26e89bdba8) on agent i-0610c08d001e1c06d: {"bind_mounts": [], "checkpoint_policy": "best", "checkpoint_storage": {"access_key": "********", "bucket": "det-python-sdk-demo-us-west-2-573932760021", "endpoint_url": null, "prefix": null, "save_experiment_best": 0, "save_trial_best": 1, "save_trial_latest": 1, "secret_key": "********", "type": "s3"}, "data": {}, "debug": false, "description": null, "entrypoint": "model_def:CIFARTrial", "environment": {"image": {"cpu": "determinedai/environments:py-3.8-pytorch-1.12-tf-2.11-cpu-14cb565", "cuda": "determinedai/environments:cuda-11.3-pytorch-1.12-tf-2.11-gpu-14cb565", "rocm": "determinedai/environments:rocm-5.0-pytorch-1.10-tf-2.7-rocm-14cb565"}, "environment_variables": {"cpu": [], "cuda": [], "rocm": []}, "proxy_ports": [], "ports": {"trial": 1734}, "registry_auth": null, "force_pull_image": false, "pod_spec": null, "add_capabil