Evaluating machine learning algorithms typically requires
1. comparing one's own algorithm to the state of the art, and
2. repeating an experiment multiple times to obtain reliable results.

One way to do this is to loop over all algorithms, data sets, and experiment repetitions or folds:

In [1]:
import copy

import pandas as pd
from sklearn.model_selection import KFold
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import fetch_openml

def load_dataset_with_id(id):
    ds = fetch_openml(data_id=id, parser='auto')
    data = ds.data
    target = ds.target
    return data, target

iris_data, iris_targets = load_dataset_with_id(61)

algs = {"Decision Tree": DecisionTreeClassifier(),
        "Random Forest": RandomForestClassifier()}
# OpenML data set 61 is iris, so label it accordingly.
datasets = {"Iris": (iris_data, iris_targets)}
n_folds = 5

result = []
for alg_name, alg_blueprint in algs.items():
    # Work on a copy so the blueprint in `algs` stays unfitted.
    alg = copy.deepcopy(alg_blueprint)
    for ds_name, (X, y) in datasets.items():
        cv = KFold(n_splits=n_folds, shuffle=True, random_state=0)
        for fold, (train_indices, test_indices) in enumerate(cv.split(X, y)):
            X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
            X_test, y_test = X.iloc[test_indices], y.iloc[test_indices]
            alg.fit(X_train, y_train)
            accuracy = alg.score(X_test, y_test)
            result.append([alg_name, ds_name, fold, accuracy])

df = pd.DataFrame(result, columns=["Algorithm", "Dataset", "Fold", "Accuracy"])
# Average the accuracy over the folds of each dataset-algorithm pair.
df.drop("Fold", axis=1).groupby(["Dataset", "Algorithm"]).mean()

                         Accuracy
Dataset Algorithm
Iris    Decision Tree    0.946667
        Random Forest    0.953333


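This runs every algorithm-dataset-fold combination one after another, which gets slow as the number of combinations grows. So let's move the work out of the nested loops and run the combinations in parallel with multiprocessing instead.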

Let's define a function that wraps the code in the inner loop:

In [2]:
def evaluate_alg(alg, X, y, train_indices, test_indices):
    X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
    X_test, y_test = X.iloc[test_indices], y.iloc[test_indices]
    alg.fit(X_train, y_train)
    accuracy = alg.score(X_test, y_test)
    return accuracy
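
As a quick sanity check, the function can be called on a single split before any parallelization is involved. The snippet below is only a sketch: it assumes scikit-learn's bundled `load_iris` loader (so no download is needed) instead of `fetch_openml`, repeats `evaluate_alg` so that it runs on its own, and the name `single_fold_accuracy` is made up for illustration.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import KFold
from sklearn.tree import DecisionTreeClassifier


def evaluate_alg(alg, X, y, train_indices, test_indices):
    # Same function as above, repeated so this snippet is self-contained.
    X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
    X_test, y_test = X.iloc[test_indices], y.iloc[test_indices]
    alg.fit(X_train, y_train)
    return alg.score(X_test, y_test)


# Assumption: the bundled iris data stands in for the OpenML download.
X, y = load_iris(return_X_y=True, as_frame=True)

# Take only the first of five folds.
cv = KFold(n_splits=5, shuffle=True, random_state=0)
train_indices, test_indices = next(cv.split(X, y))

single_fold_accuracy = evaluate_alg(
    DecisionTreeClassifier(random_state=0), X, y, train_indices, test_indices
)
print(f"Accuracy on the first fold: {single_fold_accuracy:.3f}")
```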

Now let's create two lists; this is where it gets interesting:
1. `id_list`: a list of lists that allows us to map the results of our experiment to algorithm-dataset-fold combinations.
2. `arguments_list`: a list of lists where each entry corresponds to a list of parameters that we can pass to `evaluate_alg`

Later on, we will join `id_list` with the results to obtain the final dataframe.

We create the lists with the code below. Essentially, we loop over all algorithms, datasets and folds and store their combination for later.

In [3]:
id_list = []
arguments_list = []
for alg_name, alg_blueprint in algs.items():
    alg = copy.deepcopy(alg_blueprint)
    for ds_name, (X, y) in datasets.items():
        cv = KFold(n_splits=n_folds, shuffle=True, random_state=0)
        for fold, (train_indices, test_indices) in enumerate(cv.split(X, y)):
            id_list.append([
                alg_name, ds_name, fold
            ])
            arguments_list.append([
                alg, X, y, train_indices, test_indices
            ])
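
Because both lists are appended to in the same loop iteration, entry `i` of `id_list` always describes entry `i` of `arguments_list`. The toy sketch below illustrates this alignment with placeholder strings; all `toy_*` names are hypothetical and only stand in for the real algorithms, data sets and arguments.

```python
toy_algs = ["alg_a", "alg_b"]   # hypothetical algorithm names
toy_datasets = ["ds_x"]         # hypothetical data set name
toy_n_folds = 3

toy_id_list = []
toy_arguments_list = []
for alg_name in toy_algs:
    for ds_name in toy_datasets:
        for fold in range(toy_n_folds):
            # Both lists grow together, so their indices stay aligned.
            toy_id_list.append([alg_name, ds_name, fold])
            toy_arguments_list.append([f"{alg_name} on {ds_name}, fold {fold}"])

# One entry per algorithm-dataset-fold combination: 2 * 1 * 3 = 6.
print(len(toy_id_list), len(toy_arguments_list))
print(toy_id_list[4], toy_arguments_list[4])
```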

Now, let us define a function to parallelize our experiment. `run_async` takes
- a function (the one we want to run in parallel),
- a list of lists of arguments for that function,
- the number of jobs to use during parallelization,
- and a configurable sleep time that controls how frequently we check if our function executions are finished.

Note that `run_async` returns the results in the same order as the entries of `args_list`, regardless of which task finishes first. This allows mapping the results to the configurations stored in `id_list`.

In [7]:
from time import sleep
from multiprocessing import Pool


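def run_async(function, args_list, njobs, sleep_time_s=0.1):
    # The context manager shuts the pool down even if a task raises.
    with Pool(njobs) as pool:
        # Submit all tasks up front; each call returns an AsyncResult immediately.
        futures = [pool.apply_async(function, args=args) for args in args_list]
        # Poll until every task has finished.
        while not all(future.ready() for future in futures):
            sleep(sleep_time_s)
        # Collect in submission order so the results line up with args_list.
        return [future.get() for future in futures]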
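
The ordering guarantee can be checked with a toy task whose earlier submissions finish last. The sketch below is an illustration, not the function above: it swaps in `ThreadPool` from `multiprocessing.pool`, which offers the same `apply_async` interface as `Pool`, so that it runs in any environment without pickling the task function. The names `slow_square` and `run_async_threads` are made up for this example.

```python
from multiprocessing.pool import ThreadPool
from time import sleep


def slow_square(x, delay_s):
    # Hypothetical task: wait, then return the square of x.
    sleep(delay_s)
    return x * x


def run_async_threads(function, args_list, njobs, sleep_time_s=0.01):
    # Thread-based stand-in that follows the same pattern as run_async.
    with ThreadPool(njobs) as pool:
        futures = [pool.apply_async(function, args=args) for args in args_list]
        while not all(future.ready() for future in futures):
            sleep(sleep_time_s)
        # Results are read by submission index, not by completion time.
        return [future.get() for future in futures]


# The first task sleeps longest, so it finishes last.
toy_args = [[1, 0.03], [2, 0.02], [3, 0.01]]
toy_results = run_async_threads(slow_square, toy_args, njobs=3)
print(toy_results)  # [1, 4, 9]
```

Even though the third task completes first, the results come back in the order of `toy_args`, which is what makes the later join with `id_list` safe.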

Finally, let us run our experiment in parallel:

In [None]:
from multiprocessing import cpu_count

njobs = cpu_count()
results = run_async(evaluate_alg, arguments_list, njobs=njobs)

We can now join the `results` with the previously created `id_list`:

In [None]:
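# Pair each [algorithm, dataset, fold] entry with its accuracy.
# list.append returns None, so build new rows by concatenation instead.
final_data = [
    id_list_item + [result] for id_list_item, result in zip(id_list, results)
]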
final_df = pd.DataFrame(final_data, columns=["Algorithm", "Dataset", "Fold", "Accuracy"])
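
The joined dataframe can then be aggregated in the same way as the sequential results. The sketch below shows the join and the aggregation on toy data; the `toy_*` names are hypothetical and the accuracy values are placeholders chosen for easy arithmetic, not measured results.

```python
import pandas as pd

toy_id_list = [
    ["alg_a", "ds_x", 0],
    ["alg_a", "ds_x", 1],
    ["alg_b", "ds_x", 0],
    ["alg_b", "ds_x", 1],
]
# Placeholder values, not measured accuracies.
toy_results = [0.5, 1.0, 0.25, 0.75]

# Join identifiers and results row by row.
toy_final_data = [ids + [res] for ids, res in zip(toy_id_list, toy_results)]
toy_final_df = pd.DataFrame(
    toy_final_data, columns=["Algorithm", "Dataset", "Fold", "Accuracy"]
)

# Mean accuracy over the folds of each dataset-algorithm pair.
toy_summary = (
    toy_final_df.drop("Fold", axis=1).groupby(["Dataset", "Algorithm"]).mean()
)
print(toy_summary)
```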