In [None]:
# Uncomment the below line to download the dataset (approx. 2gb)
# !curl -O https://storage.googleapis.com/nasbench/nasbench_full.tfrecord",
# Initialize the NASBench object which parses the raw data into memory
# This may take a few minutes, about 2 on my machine
import copy
import numpy as np
import random
from nasbench import api
from typing import List
from nasbench.api import ModelSpec
from dataclasses import dataclass
from functools import lru_cache
from tqdm.notebook import tqdm

In [None]:
nasbench = api.NASBench("nasbench_full.tfrecord")

In [None]:
INPUT = "input"
OUTPUT = "output"
CONV3X3 = "conv3x3-bn-relu"
CONV1X1 = "conv1x1-bn-relu"
MAXPOOL3X3 = "maxpool3x3"

NUM_VERTICES = 7
MAX_EDGES = 9

EDGE_SPOTS = NUM_VERTICES * (NUM_VERTICES - 1) / 2  # Upper triangular matrix
OP_SPOTS = NUM_VERTICES - 2  # Input/output vertices are fixed

ALLOWED_OPS = [CONV3X3, CONV1X1, MAXPOOL3X3]
ALLOWED_EDGES = [0, 1]  # Binary adjacency matrix

RNG_SEED = 42

In [None]:
@dataclass
class ModelData:
    """
    Wraps up resulting data from NASBench.query in a class for code cleanliness.
    """
    matrix: List[List[int]]
    operations: List[str]
    parameters: int
    train_time: float
    train_accuracy: float
    valid_accuracy: float
    test_accuracy: float


class SpecWrapper(ModelSpec):
    """
    Wraps a model to allow easier access to the resulting data of the model.
    """

    def __lt__(self, other: "SpecWrapper"):
        if not (isinstance(other, SpecWrapper)):
            return False

        return self.get_data().test_accuracy < other.get_data().test_accuracy

    @lru_cache(maxsize=1)
    def get_data(self):
        """
        Get resultant data from NASBench.
        The LRU cache ensures we don"t add time to the budget counters multiple times.
        :return:
        """
        data = nasbench.query(self)
        return ModelData(
            data["module_adjacency"],
            data["module_operations"],
            data["trainable_parameters"],
            data["training_time"],
            data["train_accuracy"],
            data["validation_accuracy"],
            data["test_accuracy"]
        )

    def __repr__(self):
        return self.hash_spec(nasbench.config["available_ops"])

In [None]:
n_specs = len(list(nasbench.hash_iterator()))
all_specs = tqdm(map(lambda x: nasbench.get_metrics_from_hash(x), nasbench.hash_iterator()), total=n_specs)
all_specs = [SpecWrapper(matrix=spec[0]["module_adjacency"], ops=spec[0]["module_operations"]) for spec in all_specs]

In [None]:
def random_spec() -> SpecWrapper:
    return random.choice(all_specs)

In [None]:
def sel_best(population: List[SpecWrapper], k: int) -> List[SpecWrapper]:
    """
    Select the top k candidates amongst the entire population.
    :param population: The population to select from.
    :param k: The number of top individuals to select.
    :return: The selected individuals.
    """
    population.sort()
    return population[-k:]


def sel_random(population: List[SpecWrapper], k: int) -> List[SpecWrapper]:
    """
    Select k individuals from the population at random.
    :param population: The population to select from.
    :param k: The number of individuals to select.
    :return: The selected individuals.
    """
    return random.sample(population, k)


def sel_tournament(population: List[SpecWrapper], k: int, n: int) -> List[SpecWrapper]:
    """
    Select the n best individuals from amongst k randomly selected candidates.
    :param population: The population to select from.
    :param k: The number of candidates to randomly select.
    :param n: The number of top candidates to be returned.
    :return: The selected individuals.
    """
    candidates = sel_random(population, k)
    return sel_best(candidates, n)

In [None]:
def run_random_search(k: int, max_time: float):
    """
    Runs a simulated random search for a fixed amount of time. Actual time taken will be
    relatively quick, only the simulated time is considered. Will return all evaluated models,
    along with the k best models found.
    :param k: Number of best models to return
    :param max_time_budget: The amount of simulated time to expend.
    :return: All models evaluated, along with the best k models.
    """

    # nasbench tracks time taken with each query, effectively simulating full
    # training, as we can query how long into the training we would be if we
    # were training ourselves. we reset the counters at the top of each experiment
    np.random.seed(RNG_SEED)
    random.seed(RNG_SEED)
    nasbench.reset_budget_counters()

    best: List[SpecWrapper] = []
    models: List[SpecWrapper] = []
    unique_hashes: List[str] = []
    collisions: int = 0

    time_spent, _ = nasbench.get_budget_counters()
    while time_spent < max_time:
        last_time = time_spent
        spec = random_spec()
        spec_hash = spec.hash_spec(nasbench.config["available_ops"])
        assert time_spent == last_time

        if spec_hash in unique_hashes:
            collisions += 1
            continue
        else:
            unique_hashes.append(spec_hash)

        models.append(spec)
        best = sel_best(models, k)
        time_spent, _ = nasbench.get_budget_counters()

    print(f"{len(unique_hashes)} unique models during random search.")
    print(f"{collisions} collisions during random search.")

    return models, best

In [None]:
all, best = run_random_search(10, 5e8)