## Generate sample table for Terra

In [12]:
import os
import operator
from typing import Any
from copy import deepcopy

## Configuration

In [13]:
# Values that are identical for every generated sample row.
sample_set_id = 'second_wave'
experiment_prefix = 'sa_bac'

# inputs staged in the workspace bucket
base_yaml_file = 'gs://fc-secure-edbbad6f-85d8-45a4-8e64-3f4a0ac501ff/base_yaml_files/sa_bac_template_params_second_wave.yaml'
mighty_codes_tar_gz = 'gs://fc-secure-edbbad6f-85d8-45a4-8e64-3f4a0ac501ff/mighty_codes_tarballs/MightyCodes-e8303bd.tar.gz'

# run-control settings copied verbatim into each row
final_state = 'N/A'
checkpoint_interval_seconds = 1000
eval_split_size = 128
convergence_abs_tol = 1e-6
convergence_countdown = 10

# Destinations of the three .tsv files; they share one stem and differ only
# in their suffix.
# NOTE(review): the stem contains the literal 'sa_bac_' and also
# experiment_prefix (currently 'sa_bac') -- confirm the repetition is intended.
_tsv_stem = f'../ws/sa_bac_{sample_set_id}_{experiment_prefix}'
sample_output_path = f'{_tsv_stem}_sample.tsv'
sample_set_entity_output_path = f'{_tsv_stem}_sample_set_entity.tsv'
sample_set_membership_output_path = f'{_tsv_stem}_sample_set_membership.tsv'

# Batch settings: each list below contributes one axis to the set of
# generated rows; the two dicts supply values that depend on code_length.

# code lengths to generate rows for
code_length_list = [6, 7, 8, 9, 10]

# code_length -> list of [min_hamming_weight, max_hamming_weight] pairs
min_max_hamming_weight_dict = {
    6: [[1, 5], [2, 4]],
    7: [[1, 6], [2, 5]],
    8: [[1, 7], [2, 6]],
    9: [[1, 8], [2, 7]],
    10: [[1, 9], [2, 8]],
}

# code_length -> list of n_types values to try for that length
n_types_dict = {
    6: [16],
    7: [32, 16],
    8: [64, 32, 16],
    9: [128, 64, 32, 16],
    10: [256, 128, 64, 32, 16],
}

source_nonuniformity_list = [10., 100., 1000.]

# currently disabled alternative: 'channel_bsc_10'
channel_model_list = ['channel_bac_merfish']

quality_factor_list = [10]

# currently disabled alternatives: 'tpr', 'fdr[0.25]'
metric_type_list = ['fdr']

# Header of the sample table. The save cell writes the columns of every row
# in exactly this order, so every name listed here must be a key of each
# generated row.
columns = (
    # row identifier
    ['entity:sample_id']
    # shared inputs
    + ['base_yaml_file', 'mighty_codes_tar_gz', 'experiment_prefix']
    # channel and convergence settings
    + ['channel_model', 'quality_factor', 'convergence_abs_tol', 'convergence_countdown']
    # code geometry
    + ['code_length', 'min_hamming_weight', 'max_hamming_weight', 'n_types']
    # source model and metric
    + ['source_nonuniformity', 'metric_type']
    # run control
    + ['checkpoint_interval_seconds', 'eval_split_size', 'final_state']
)

def identity(x: Any) -> Any:
    """Return ``x`` unchanged.

    Serves as the child-value extractor in ``conditional_opts`` when a value
    bundle already is the value to store and needs no unpacking.
    """
    return x


# Unconditional options, as (column name, list of values) pairs.
# The generate cell combines them into every possible combination; options
# with a single value simply stamp that value onto every row.
primitive_opts = [
    ('base_yaml_file', [base_yaml_file]),
    ('mighty_codes_tar_gz', [mighty_codes_tar_gz]),
    ('experiment_prefix', [experiment_prefix]),
    ('channel_model', channel_model_list),
    ('quality_factor', quality_factor_list),
    ('convergence_abs_tol', [convergence_abs_tol]),
    ('convergence_countdown', [convergence_countdown]),
    ('code_length', code_length_list),
    ('source_nonuniformity', source_nonuniformity_list),
    ('metric_type', metric_type_list),
    ('checkpoint_interval_seconds', [checkpoint_interval_seconds]),
    ('eval_split_size', [eval_split_size]),
    ('final_state', [final_state])
]

# Options whose values depend on a column that is already assigned.
# Each entry is a 3-tuple:
#   (parent column,
#    {parent value: [value bundle, ...]},
#    [(child column, extractor applied to a bundle), ...])
# Every bundle listed for a row's parent value yields one output row.
conditional_opts = [
    # a bundle is a [min, max] pair; split it into two columns
    ('code_length', min_max_hamming_weight_dict,
     [('min_hamming_weight', operator.itemgetter(0)),
      ('max_hamming_weight', operator.itemgetter(1))]),
    # a bundle is the n_types value itself
    ('code_length', n_types_dict,
     [('n_types', identity)])
]

## Generate

In [14]:
# evaluation list: one dict per output row, mapping column name -> value
evals = []

# process primitive options
# Builds every combination of the listed values. Options that appear earlier
# in primitive_opts vary fastest in the resulting row order.
seen_primitive_keys = set()
for opt in primitive_opts:
    assert isinstance(opt, tuple)
    assert len(opt) == 2
    assert isinstance(opt[0], str)
    assert isinstance(opt[1], list)
    key, values = opt
    # A repeated key would overwrite the earlier value and duplicate rows.
    assert key not in seen_primitive_keys, f"duplicate primitive option: {key}"
    # An empty value list would empty `evals`; the next option would then
    # start over from scratch and silently drop every key set so far.
    assert len(values) > 0, f"no values given for primitive option: {key}"
    seen_primitive_keys.add(key)
    if evals:
        next_evals = []
        for value in values:
            for prev in evals:
                new = deepcopy(prev)
                new[key] = value
                next_evals.append(new)
    else:
        # first option: seed the list with one single-key row per value
        next_evals = [{key: value} for value in values]
    evals = next_evals

# process conditional options
# Each row is replaced by one copy per value bundle registered for the row's
# parent value; the extractors turn a bundle into the child column values.
assert len(evals) > 0
for conditional_opt in conditional_opts:
    assert isinstance(conditional_opt, tuple)
    assert len(conditional_opt) == 3
    parent_key, bundles_by_parent_value, child_manifest = conditional_opt
    assert isinstance(parent_key, str)
    assert isinstance(bundles_by_parent_value, dict)
    assert isinstance(child_manifest, list)
    expanded = []
    for base in evals:
        assert parent_key in base
        parent_value = base[parent_key]
        assert parent_value in bundles_by_parent_value
        bundles = bundles_by_parent_value[parent_value]
        assert isinstance(bundles, list)
        for bundle in bundles:
            row = deepcopy(base)
            for child_key, extract in child_manifest:
                # a child column must not clobber an existing column
                assert child_key not in row, f"{child_key}, {row}"
                row[child_key] = extract(bundle)
            expanded.append(row)
    evals = expanded
    
# generate names
# The sample id is assembled from a subset of the columns, and
# source_nonuniformity is truncated to an int, so two different rows could
# end up with the same id. Fail loudly instead of writing duplicate ids.
seen_sample_ids = set()
for e in evals:
    name = (f"{experiment_prefix}__"
            f"{sample_set_id}__"
            f"{e['channel_model']}__"
            f"{e['code_length']}__{e['min_hamming_weight']}__{e['max_hamming_weight']}__"
            f"{e['n_types']}__"
            f"{int(e['source_nonuniformity'])}__"
            f"{e['quality_factor']}__"
            f"{e['metric_type']}")
    assert name not in seen_sample_ids, f"duplicate sample id: {name}"
    seen_sample_ids.add(name)
    e['entity:sample_id'] = name
    e['entity:sample_set_id'] = sample_set_id

## Save

In [15]:
# make sure the output directories exist; open(..., 'w') does not create them
for output_path in (sample_output_path,
                    sample_set_entity_output_path,
                    sample_set_membership_output_path):
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

# generate .tsv file
# sample table: header row followed by one tab-separated line per row
with open(sample_output_path, 'w') as f:
    f.write('\t'.join(columns) + '\n')
    for e in evals:
        values = [str(e[column]) for column in columns]
        f.write('\t'.join(values) + '\n')

# sample set table: the overall set plus one set per code length
with open(sample_set_entity_output_path, 'w') as f:
    f.write('entity:sample_set_id\n')
    f.write(f'{sample_set_id}\n')
    for code_length in code_length_list:
        f.write(f'{sample_set_id}_{code_length}\n')

# membership table: every sample belongs to the overall set and to the set
# of its own code length
with open(sample_set_membership_output_path, 'w') as f:
    f.write('membership:sample_set_id\tsample\n')
    for e in evals:
        f.write(f"{sample_set_id}\t{e['entity:sample_id']}\n")
    for code_length in code_length_list:
        for e in evals:
            if int(e['code_length']) == code_length:
                f.write(f"{sample_set_id}_{code_length}\t{e['entity:sample_id']}\n")