In [125]:
#hide
#default_exp utils.convert_legacy
from nbdev.showdoc import *
from dsblocks.utils.nbdev_utils import nbdev_setup, TestRunner

# Notebook set-up helper imported from dsblocks.utils.nbdev_utils.
nbdev_setup ()
# Runner used by the `tst.run (...)` cells of this notebook, which all pass tag='dummy'.
# NOTE(review): presumably only tests whose tag appears in `targets` are executed --
# confirm against dsblocks' TestRunner.
tst = TestRunner (targets=['dummy'])
# Alternative configuration with an empty target list, kept for reference:
#tst = TestRunner (targets=[])

# Convert legacy data format

> Utilities for converting data to/from legacy format

In [126]:
#export
import pandas as pd
import numpy as np
import pickle
import os
import sys
import time
import warnings
warnings.filterwarnings('ignore')

from hpsearch.config import hp_defaults as dflt
from hpsearch.utils.experiment_utils import read_df, write_df

In [127]:
#for tests
import pytest
import pandas as pd
from IPython.display import display
import datetime

from dsblocks.utils.nbdev_utils import md
from dsblocks.utils.utils import remove_previous_results

from hpsearch.utils.experiment_utils import read_df, write_df
from hpsearch.config import hp_defaults as dflt

## Convert `experiment_data` to and from an older version

## legacy data format

### get_parameters_columns

In [128]:
#export
def get_parameters_columns (experiment_data, only_not_null=False):
    """
    Determine the columns of `experiment_data` that hold parameters.

    A column is considered a parameter unless its name starts with a digit 
    (scores and `finished` flags), with `time_` (run times) or with `date`.

    These markers are matched as prefixes only, so that parameters whose names 
    merely contain them (e.g. `update_rate` or `max_time_limit`) are not discarded.

    Parameters
    ----------
    experiment_data : pd.DataFrame
        Experiment table in legacy format, with string column names.
    only_not_null : bool
        If True, parameters whose column is null in every row are left out.

    Returns
    -------
    list of str
        Parameter column names, in the order they appear in `experiment_data`.
    """
    parameters = [par for par in experiment_data.columns 
                  if not (par[0].isdigit() or par.startswith(('time_', 'date')))]
    if only_not_null:
        # keep a parameter as soon as one row has a value for it
        has_value = experiment_data.loc[:, parameters].notnull().any(axis=0)
        parameters = [par for par, keep in zip(parameters, has_value) if keep]
    return parameters

### get_scores_columns

In [129]:
#export
def get_scores_columns (experiment_data=None, suffix_results='', class_ids = None):
    """
    Determine the columns that provide evaluation scores. 
    
    We assume that they start with the class number, and that the other columns 
    do not start with a digit.

    If `class_ids` is given, the column names are built as `<class id><suffix_results>` 
    and `experiment_data` is not used. Otherwise the names are taken from 
    `experiment_data.columns`: all the columns starting with a digit when 
    `suffix_results` is empty, or only those of the form `<digits><suffix_results>` 
    when it is not.

    Raises ValueError if both `experiment_data` and `class_ids` are None.
    """
    if class_ids is not None:
        return ['%d%s' %(class_id,suffix_results) for class_id in class_ids]

    if experiment_data is None:
        raise ValueError ('Either experiment_data or class_ids should be different than None')

    digit_columns = [name for name in experiment_data.columns if name[0].isdigit()]
    if len(suffix_results) == 0:
        # No suffix requested: every column starting with a digit is returned.
        return digit_columns

    # For some experiments, we have multiple scores per class (e.g., due to different 
    # evaluation criteria). suffix_results selects the appropriate one: the name must be 
    # made of digits followed by the suffix, and nothing else.
    selected = []
    for name in digit_columns:
        pieces = name.split(suffix_results)
        if len(pieces) == 2 and len(pieces[1]) == 0 and pieces[0].isdigit():
            selected.append(name)
    return selected

### get_scores_names

In [130]:
#export
def get_scores_names (experiment_data=None, run_number=None, experiment=None, only_valid=True, 
                      sort=False):
    """ 
    Determine the names of the scores included in experiment data. 
    
    We assume that the score columns are named `<run number>_<score name>`, and that the 
    other columns do not start with a digit. The special name `finished` is never returned.

    If run_number is None, we provide the sorted unique score names found across all runs.

    If run_number is provided, we provide the scores stored for that run number, in column 
    order (sorted if `sort` is True). Only the columns whose run prefix is exactly 
    `run_number` are considered, so asking for run 1 does not pick up runs 10, 11, ..., and 
    score names that contain the run digits (e.g. `1_f1_score`) are returned whole.
    If, in addition to this, experiment is provided and only_valid=True, we provide only 
    the scores that are not null for the given experiment number.

    Raises ValueError if experiment_data is None.
    """
    if experiment_data is None:
        raise ValueError ('experiment_data should be different than None')

    def _score_name (column):
        # '<run>_<score name>' -> '<score name>'; a column without underscore has an empty name
        return column.partition('_')[2]

    if run_number is None:
        digit_columns = [col for col in experiment_data.columns if col[0].isdigit()]
        scores_names = sorted({_score_name(col) for col in digit_columns 
                               if _score_name(col) != 'finished'})
    else:
        run_prefix = str(run_number)
        # `finished` is dropped before the validity check: it is not a score and may hold 
        # non-numeric values.
        run_columns = [col for col in experiment_data.columns 
                       if col.partition('_')[0] == run_prefix and _score_name(col) != 'finished']
        if (experiment is not None) and only_valid:
            # the actual column name is used for the lookup, instead of rebuilding it 
            # from the score name
            run_columns = [col for col in run_columns 
                           if not pd.isnull(experiment_data.loc[experiment, col])]
        scores_names = [_score_name(col) for col in run_columns]
        if sort:
            scores_names = sorted(scores_names)
    return scores_names

## `update_data_format`

In [131]:
#export
def update_data_format (df):
    """
    Convert an experiment table from the legacy flat-column format to 3-level columns.

    Each legacy column is given a `(group, name, run)` label:

    - parameters (see `get_parameters_columns`): `(dflt.parameters_col, <parameter>, '')`
    - scores `<run>_<score>`: `(dflt.scores_col, <score>, <run>)`
    - `<run>_finished`: `(dflt.run_info_col, 'finished', <run>)`
    - `time_<run>`: `(dflt.run_info_col, 'time', <run>)`
    - the date column, replicated once per run: `(dflt.run_info_col, 'date', <run>)`

    The run and the score name are parsed from each column's own name. They are not 
    paired by position after sorting, since string order (`'10_x' < '1_x'`) and run order 
    (`'1' < '10'`) disagree as soon as there are more than ten runs. For the same reason, 
    runs are not required to have all the same scores.

    Parameters
    ----------
    df : pd.DataFrame
        Table in legacy format, with string column names. It is not modified.

    Returns
    -------
    pd.DataFrame
        New table with the same rows and sorted 3-level columns. Run identifiers are 
        kept as strings.
    """
    def _relabel (src_cols, dst_labels):
        # Copy the columns `src_cols` and attach `dst_labels` to them, one to one. 
        # from_arrays is used so that an empty selection still has three levels.
        relabeled = df[src_cols].copy()
        relabeled.columns = pd.MultiIndex.from_arrays ([[label[level] for label in dst_labels] 
                                                        for level in range(3)])
        return relabeled

    # parameters
    par_cols_src = get_parameters_columns (df)
    par_df = _relabel (par_cols_src, [(dflt.parameters_col, par, '') for par in par_cols_src])

    # scores and `finished` flags: both are stored in columns named `<run>_<name>`
    run_cols_src = get_scores_columns (df)
    score_cols_src = [c for c in run_cols_src if not c.endswith('finished')]
    finished_cols_src = [c for c in run_cols_src if c.endswith('finished')]
    score_df = _relabel (score_cols_src, 
                         [(dflt.scores_col, c.partition('_')[2], c.partition('_')[0]) 
                          for c in score_cols_src])
    finished_df = _relabel (finished_cols_src, 
                            [(dflt.run_info_col, 'finished', c.partition('_')[0]) 
                             for c in finished_cols_src])

    # run times: `time_<run>`
    time_prefix = 'time_'
    time_cols_src = [c for c in df.columns if c.startswith(time_prefix)]
    time_df = _relabel (time_cols_src, 
                        [(dflt.run_info_col, 'time', c[len(time_prefix):]) for c in time_cols_src])

    # date: the legacy table has a single date per experiment, which is copied to every 
    # run that has scores. Nothing is produced if the table has no date column.
    run_number = np.unique([c.partition('_')[0] for c in score_cols_src]).tolist()
    date_cols_src = [c for c in df.columns if c.startswith('date')]
    date_labels = [(dflt.run_info_col, 'date', run) for run in run_number] if date_cols_src else []
    date_df = _relabel (date_cols_src*len(date_labels), date_labels)

    converted = pd.concat ([par_df, score_df, finished_df, time_df, date_df], axis=1)
    converted = converted[converted.columns.sort_values()]
    
    return converted

### Usage

In [132]:
#exports tests.utils.test_convert_legacy
def generate_data ():
    """
    Build a small experiment table in legacy format, used as input by the tests.

    It has two parameters (`offset`, `rate`) and two experiments (rows): the first one 
    has scores, time and `finished` flag for runs 0, 1 and 2; the second one only for run 0, 
    the remaining entries being None.
    """
    column_names = ['offset', 'rate', '0_validation_accuracy', '0_test_accuracy', 'time_0',
                    'date', '0_finished', '1_validation_accuracy', '1_test_accuracy',
                    'time_1', '1_finished', '2_validation_accuracy', '2_test_accuracy',
                    'time_2', '2_finished']

    # experiment 0: three runs
    three_runs = [0.1, 0.05, 0.6, 0.5, 0.0034384727478027344,
                  datetime.time(10, 42, 26, 630428), True, 
                  0.61, 0.51, 0.002204418182373047, True, 
                  0.62, 0.52, 0.002073526382446289, True]

    # experiment 1: only run 0, nothing stored for runs 1 and 2
    single_run = [0.2, 0.05, 0.7000000000000001, 0.6000000000000001,
                  0.0020360946655273438, datetime.time(10, 42, 26, 669600), True]
    single_run = single_run + [None] * (len(column_names) - len(single_run))

    return pd.DataFrame ([three_runs, single_run], columns=column_names)

def test_update_data_format ():
    """Check that `update_data_format` gathers the scores of each run under ('scores', <score name>)."""
    # legacy table used as input
    legacy_df = generate_data ()
    display (legacy_df)
    
    # conversion under test
    converted_df = update_data_format (legacy_df)

    # one column per run, one row per experiment; runs that were not performed are NaN
    expected_scores = {
        'validation_accuracy': [[0.6, 0.61, 0.62], [0.7000000000000001, np.nan, np.nan]],
        'test_accuracy': [[0.5, 0.51, 0.52], [0.6000000000000001, np.nan, np.nan]],
    }
    for score_name, expected in expected_scores.items():
        np.testing.assert_array_equal (converted_df[('scores', score_name)].values, 
                                       np.array(expected))

In [133]:
# Run test_update_data_format through the notebook's TestRunner, under the 'dummy' tag.
tst.run (test_update_data_format, tag='dummy')

running test_update_data_format


Unnamed: 0,offset,rate,0_validation_accuracy,0_test_accuracy,time_0,date,0_finished,1_validation_accuracy,1_test_accuracy,time_1,1_finished,2_validation_accuracy,2_test_accuracy,time_2,2_finished
0,0.1,0.05,0.6,0.5,0.003438,10:42:26.630428,True,0.61,0.51,0.002204,True,0.62,0.52,0.002074,True
1,0.2,0.05,0.7,0.6,0.002036,10:42:26.669600,True,,,,,,,,


## `update_and_replace_experiment_data`

In [134]:
#export
def update_and_replace_experiment_data (path_experiments):
    """
    Convert the experiment data found in `path_experiments` from the legacy format.

    The table is loaded with `read_df`, converted with `update_data_format`, and the 
    result is written with `write_df` to the same `path_experiments`.
    """
    legacy_df = read_df (path_experiments)
    write_df (update_data_format (legacy_df), path_experiments)

### Usage

In [141]:
#exports tests.utils.test_convert_legacy
def test_update_and_replace_experiment_data ():
    """
    Check that `update_and_replace_experiment_data` converts the data stored on disk.

    A legacy table is written to a scratch folder, converted in place, and read back. 
    The scratch folder is removed at the end, also when one of the checks fails.
    """
    path_experiments = 'test_update_and_replace_experiment_data'
    os.makedirs (path_experiments, exist_ok=True)
    
    try:
        # get and write data
        df = generate_data ()
        write_df (df, path_experiments)
        display (df)
        
        # run function
        update_and_replace_experiment_data (path_experiments)
        print ('\nfiles written: ', os.listdir (path_experiments))
        
        # check results
        df = read_df (path_experiments)
        np.testing.assert_array_equal (df[('scores','validation_accuracy')].values, 
                                   np.array([[0.6, 0.61, 0.62], [0.7000000000000001, np.nan, np.nan]]))
        np.testing.assert_array_equal (df[('scores','test_accuracy')].values, 
                                   np.array([[0.5, 0.51, 0.52], [0.6000000000000001, np.nan, np.nan]]))
    finally:
        # do not leave the scratch folder behind if a step above raises
        remove_previous_results (path_experiments)

In [142]:
# Run test_update_and_replace_experiment_data through the notebook's TestRunner, under the 'dummy' tag.
tst.run (test_update_and_replace_experiment_data, tag='dummy')

running test_update_and_replace_experiment_data


Unnamed: 0,offset,rate,0_validation_accuracy,0_test_accuracy,time_0,date,0_finished,1_validation_accuracy,1_test_accuracy,time_1,1_finished,2_validation_accuracy,2_test_accuracy,time_2,2_finished
0,0.1,0.05,0.6,0.5,0.003438,10:42:26.630428,True,0.61,0.51,0.002204,True,0.62,0.52,0.002074,True
1,0.2,0.05,0.7,0.6,0.002036,10:42:26.669600,True,,,,,,,,



files written:  ['experiments_data.csv', 'experiments_data.pk']
