# Get OpenML data

In [1]:
import os
import pickle
import sys
import urllib

import numpy as np
import openml
import scipy.io.arff

In [2]:
# Directory in which downloaded run files and processed results are cached.
TEMP_DIR = 'run_cache'
# OpenML API key; when left as None the OPENML_APIKEY environment variable
# is consulted instead.
API_KEY = None

In [3]:
# Make sure the cache directory exists. ``exist_ok=True`` replaces the
# separate ``os.path.exists`` test, which is racy and lets a plain file
# sitting at that path go unnoticed.
os.makedirs(TEMP_DIR, exist_ok=True)

# Resolve the OpenML API key: the notebook-level API_KEY constant wins,
# otherwise fall back to the OPENML_APIKEY environment variable.
apikey = API_KEY if API_KEY is not None else os.environ.get('OPENML_APIKEY')
if apikey is None:
    raise RuntimeError('OpenML API key was not provided.')
openml.config.apikey = apikey

In [4]:
################################################################################
# Select the OpenML task and flow to process.
# Exactly one ``task_id, flow_id = ...`` line should be active; setting
# ``flow_id`` to None selects the runs of every flow for the task.

# Testcase
# task_id, flow_id = 31, 5889  # 261, None
task_id, flow_id = 31, 66

##########     ##########     ##########     ##########     ##########

# Top-performing pipeline for each data set (i.e., task)
## Supervised Classification on credit-approval # Gradient Boosting
# task_id, flow_id = 29, 12736

## Supervised Classification on credit-g # Ranger Classifier
# task_id, flow_id = 31, 6794

## Supervised Classification on adult # Boosting with Decision Trees
# task_id, flow_id = 7592, 6970

##########     ##########     ##########     ##########     ##########

# Compare the influence of the shape of decision boundaries
# task_id, flow_id = 31, 1720  # weka.J48 (decision tree) <- 22
# task_id, flow_id = 31, 66    # weka.IBk (k-nearest neighbours) <- 13
# task_id, flow_id = 31, 1820  # weka.MultilayerPerceptron <- 12
## task_id, flow_id = 31, 5920  # weka.MultilayerPerceptron <- xx
# task_id, flow_id = 31, 70    # weka.SMO_PolyKernel (SVM) <- 15
# task_id, flow_id = 31, 72    # weka.SMO_RBFKernel (SVM) <- 33
# task_id, flow_id = 31, 1726  # weka.RandomForest <- 20

################################################################################

temp_dir = TEMP_DIR

# def get_task_predictions(task_id, temp_dir, flow_id=None):
# Every task gets its own sub-directory of the cache. ``exist_ok=True``
# creates it only when missing, without a separate existence check.
task_path = os.path.join(temp_dir, f'{task_id}')
os.makedirs(task_path, exist_ok=True)

In [5]:
# List every run recorded for the task; the ids are narrowed down to a single
# flow below when ``flow_id`` is set.
# (for task 31 / flow 66 the listing takes roughly 5 minutes)
runs = openml.runs.list_runs(task=[task_id])

if flow_id is not None:
    # keep only the runs produced by the requested flow
    runs_id = [
        run_id for run_id, run in runs.items() if run['flow_id'] == flow_id
    ]
else:
    # no flow requested: keep every run of the task
    runs_id = list(runs)

# Task description and the row indices of its test split
task = openml.tasks.get_task(task_id)
# task_data, task_labels = task.get_X_and_y()
# NOTE(review): called with default arguments -- presumably this yields the
# split of the first repeat/fold only; confirm against the openml docs.
_, task_test_indices = task.get_train_test_split_indices()  # task_train_indices
task_test_indices_sorted = np.sort(task_test_indices)

# Data set behind the task and the name of its target column
dataset_object = openml.datasets.get_dataset(task.dataset_id)
dataset_target_label = dataset_object.default_target_attribute
dataset = dataset_object.get_data()[0]

# Ground-truth labels of the test rows, ordered by row index
task_labels_test = dataset[dataset_target_label][task_test_indices_sorted].to_numpy()

In [6]:
# Report how many run ids were selected (the length of ``runs_id``).
print(f'Number of runs in this flow: {len(runs_id)}')

Number of runs in this flow: 13


In [8]:
# Switch for fetching the run objects through a process pool.
PARALLELISE = False

# Number of worker processes used when the switch above is enabled.
parallelise_pool = 8

In [9]:
if PARALLELISE:
    import multiprocessing

    # The results of the parallel map are discarded and ``runs_list`` is NOT
    # assigned in this branch.
    # NOTE(review): presumably this branch only warms openml's local cache so
    # that a later serial pass is fast -- confirm.
    # The context manager shuts the worker processes down once the (blocking)
    # map has finished, instead of leaving the pool open.
    with multiprocessing.Pool(parallelise_pool) as pool:
        pool.map(openml.runs.get_run, runs_id)
else:
    # Get the run object of every selected run of this task
    runs_list = openml.runs.get_runs(runs_id)

In [10]:
_msg = ('The previous step must be (re-)run without parallelisation to '
        'proceed.')
# ``runs_list`` is looked up through globals() so that a session in which it
# was never assigned reports the message above instead of a NameError.
# An explicit raise (rather than ``assert``) keeps the guard active even
# when Python runs with assertions disabled.
if PARALLELISE or not globals().get('runs_list'):
    raise AssertionError(_msg)

In [11]:
# The following two steps can either be parallelised or not
PARALLELISE = False

In [12]:
if PARALLELISE:
    # The downloader is written out as a stand-alone module (``download.py``)
    # so that it can be imported and handed to a process pool.
    # https://code.activestate.com/recipes/82234-importing-a-dynamically-generated-module/
    #
    # * ``urllib.request`` is imported explicitly: a bare ``import urllib``
    #   does not guarantee that the submodule is loaded.
    # * The task directory is embedded via ``repr`` of the real ``task_path``
    #   so it follows the configured cache directory on every platform.
    prl_str = f'''\
import os
import urllib.request

task_path = {task_path!r}


def download(run):
    """Fetch the predictions file of ``run`` unless it is already cached."""
    url = run.predictions_url
    id_ = run.run_id

    save_path = os.path.join(task_path, f'{{id_}}.arff')

    if not os.path.exists(save_path):
        # Download under a temporary name and rename afterwards, so an
        # interrupted transfer never leaves a truncated file at save_path
        # (which would be skipped on the next attempt).
        part_path = save_path + '.part'
        urllib.request.urlretrieve(url, part_path)
        os.replace(part_path, save_path)
'''
    with open('download.py', 'w') as f:
        f.write(prl_str)
else:
    # A bare ``import urllib`` does not guarantee ``urllib.request`` exists.
    import urllib.request

    def download(run):
        """Fetch the predictions file of ``run`` unless it is already cached.

        The file is stored as ``<task_path>/<run_id>.arff``; nothing is
        downloaded when that file already exists.

        Parameters
        ----------
        run
            Object exposing ``predictions_url`` and ``run_id`` attributes.
        """
        url = run.predictions_url
        id_ = run.run_id

        save_path = os.path.join(task_path, f'{id_}.arff')

        if not os.path.exists(save_path):
            # Download under a temporary name and rename afterwards, so an
            # interrupted transfer never leaves a truncated file at
            # ``save_path`` (which would be skipped on the next attempt).
            part_path = save_path + '.part'
            urllib.request.urlretrieve(url, part_path)
            os.replace(part_path, save_path)

In [13]:
if PARALLELISE:
    import importlib
    import multiprocessing

    # ``download.py`` is generated at run time: refresh the import system's
    # directory caches so the new file is found, and reload the module so
    # that a copy imported earlier in this session (with another task
    # directory baked in) is not reused.
    importlib.invalidate_caches()
    import download as dwnd
    dwnd = importlib.reload(dwnd)

    # Use the configured pool size rather than a hard-coded 8; the context
    # manager shuts the workers down once the (blocking) map has finished.
    with multiprocessing.Pool(parallelise_pool) as pool:
        pool.map(dwnd.download, runs_list)
else:
    # Serial download, one run after another
    for run in runs_list:
        download(run)

In [14]:
# Manual clean-up of the worker pool; disabled by default -- flip the
# condition to shut down a pool left over from a parallel step.
if False:
    pool.close()
    pool.terminate()

In [15]:
def progress_bar(prog, tot):
    """Redraw a one-line text progress bar on standard output.

    The line starts with a carriage return so that successive calls
    overwrite each other. The bar is always 100 characters wide, so the
    closing bracket stays in the same column up to and including 100%.

    Parameters
    ----------
    prog : int
        Number of ``=`` characters to draw; clamped to the range 0..100.
    tot : int
        Percentage printed after the bar.
    """
    width = 100
    # Clamp so that out-of-range values cannot stretch or break the bar.
    filled = max(0, min(prog, width))
    sys.stdout.write('\r[%-*s] %d%%' % (width, '=' * filled, tot))
    sys.stdout.flush()

In [16]:
# Load the cached predictions file of every selected run, validate it and
# regroup its content per flow and per fold.
predictions_per_flow = dict()  # flow id -> fold -> list of prediction arrays
labels_per_fold = dict()       # fold -> ground-truth labels ordered by row id


def _sorted_by_row_id(records, column):
    """Return ``records[column]`` ordered by the ``row_id`` field."""
    return np.sort(records[['row_id', column]], order=['row_id'])[column]


max_bar = len(runs_id)
for i_bar, id_ in enumerate(runs_id):
    # A dedicated name is used here on purpose: re-assigning ``flow_id``
    # would clobber the flow selected at the top of the notebook, which is
    # still needed afterwards (e.g. to name the output file).
    run_flow_id = runs[id_]['flow_id']

    load_path = os.path.join(task_path, f'{id_}.arff')

    prog_ = int(100*i_bar/max_bar)  # 100
    progress_bar(prog_, prog_)

    arff = scipy.io.arff.loadarff(load_path)
    predictions = arff[0]

    # Only single-repeat runs are supported
    if 'repeat' not in predictions.dtype.fields:
        raise RuntimeError('repeat column missing')
    if not np.all(predictions['repeat'] == 0):
        raise RuntimeError('Not executed in a single repeat')

    if 'fold' not in predictions.dtype.fields:
        raise RuntimeError('fold column missing')
    folds = np.unique(predictions['fold'])

    for f in folds:
        fold_predictions = predictions[predictions['fold'] == f]
        columns = fold_predictions.dtype.fields

        if 'row_id' not in columns:
            raise RuntimeError('row_id column missing')
        test_row_id_ = np.sort(fold_predictions['row_id'].astype(np.int64))
        # NOTE(review): ``task_test_indices_sorted`` is computed outside this
        # cell and is only compared for fold 0 -- presumably it describes
        # that fold alone; confirm.
        # ``np.array_equal`` also copes with arrays of different lengths.
        if f == 0 and not np.array_equal(test_row_id_,
                                         task_test_indices_sorted):
            raise RuntimeError('Test row id mismatch!')

        # The ground truth is stored under either of two column names
        if 'correct' in columns:
            truth_column = 'correct'
        elif 'truth' in columns:
            truth_column = 'truth'
        else:
            raise RuntimeError('correct/truth column missing')
        correct = _sorted_by_row_id(fold_predictions, truth_column)

        # Cross-check the stored ground truth against the data set itself
        if f == 0:
            comp = task_labels_test
        else:
            comp = dataset[dataset_target_label][test_row_id_].to_numpy()
        if not np.array_equal(correct, comp.astype(correct.dtype)):
            raise RuntimeError('Test labels mismatch!')

        # Every run must agree on the labels of a given fold
        if f not in labels_per_fold:
            labels_per_fold[f] = correct
        elif not np.array_equal(labels_per_fold[f], correct):
            raise RuntimeError('Fold labels differ between runs!')

        if 'prediction' not in columns:
            raise RuntimeError('prediction column missing')
        predicted = _sorted_by_row_id(fold_predictions, 'prediction')

        flow_predictions = predictions_per_flow.setdefault(run_flow_id, dict())
        flow_predictions.setdefault(f, []).append(predicted)
progress_bar(100, 100)



In [17]:
# Store the regrouped predictions ('runs') and the per-fold labels ('labels')
# as a single pickle file inside the task directory.
with open(os.path.join(task_path, f'{task_id}_{flow_id}.pickle'), 'wb') as f:
    pickle.dump(
        {'runs': predictions_per_flow, 'labels': labels_per_fold},
        f)