# label_converter.py

List of in-use functions:
1. yolo2bcc_newer(y_yolo, imgsz, silent = True)
2. qt2yolo_optimized(qt, G, Na, wh_yolo, torchMode=False, device=None)

In [1]:
import numpy as np
import torch

In [None]:
# Label value written into grid cells that carry no volunteer annotation
# (used as the fill value in convert_target_volunteers_yolo2bcc).
BACKGROUND_CLASS_ID = 2

In [None]:
def yolo2bcc_newer(y_yolo, imgsz, silent=True):
    """Convert YOLO predictions into BCC log-probabilities (classes + background).

    The last axis of ``y_yolo`` is read as ``(x, y, w, h, p, s_1, ..., s_k)``:
    columns 2:4 are the box size, column 4 is the confidence ``p`` and the
    remaining columns are per-class scores.  The class scores are normalised
    to sum to one, scaled by ``p``, and ``1 - p`` is appended as the
    probability of the background class (last position).

    Args:
        y_yolo: Prediction tensor; the prediction row lives on the last axis.
            Any number of leading axes is accepted (e.g. (images, boxes, 5+k)).
        imgsz: Divisor applied to the width/height columns.
        silent: When False, print the per-class minimum and maximum
            probabilities over all predictions.

    Returns:
        Tuple ``(bcc_logits, wh, conf)`` where ``bcc_logits`` has k+1 entries
        on its last axis, ``wh`` is the width/height divided by ``imgsz`` and
        ``conf`` is the confidence column.
    """
    wh = y_yolo[..., 2:4] / imgsz
    conf = y_yolo[..., 4]

    class_scores = y_yolo[..., 5:]
    # Normalise over the class axis (always the last one) rather than a
    # hard-coded axis index, so inputs of any rank are handled.
    class_share = class_scores / class_scores.sum(dim=-1, keepdim=True)
    class_prob = class_share * conf.unsqueeze(-1)
    bkgd_prob = (1 - conf).unsqueeze(-1)
    bcc_prob = torch.cat([class_prob, bkgd_prob], -1)

    if not silent:
        # Collapse every leading axis so the min/max is taken per class.
        flat = bcc_prob.detach().reshape(-1, bcc_prob.shape[-1]).cpu()
        mins = [round(v, 6) for v in flat.min(0).values.tolist()]
        maxs = [round(v, 6) for v in flat.max(0).values.tolist()]
        print('Minimum probs (c1, c2, bkgd):', mins)
        print('Maximum probs (c1, c2, bkgd):', maxs)

    # NOTE: a probability of exactly zero yields -inf here.
    bcc_logits = torch.log(bcc_prob)
    return bcc_logits, wh, conf

In [None]:
def qt2yolo_optimized(qt, G, Na, wh_yolo, torchMode=False, device=None):
    """Turn per-cell class posteriors into YOLO-style target rows.

    For every image the most probable class of each cell is combined with the
    cell-centre coordinates and the matching width/height from ``wh_yolo``.
    Cells are consumed in the order grid -> anchor -> cell (row-major, x
    varying fastest), which is the order assumed along axis 1 of ``qt`` and
    ``wh_yolo``.

    Args:
        qt: Tensor indexed as ``qt[image, cell, class]``.
        G: NumPy array of grid fractions; a fraction ``g`` yields
            ``ceil(1/g)`` cells per side.
        Na: Number of anchors per grid cell.
        wh_yolo: Tensor indexed as ``wh_yolo[image][cell]`` giving (w, h).
        torchMode: Unused; kept for interface compatibility.
        device: Device the intermediate tensors are moved to.

    Returns:
        Tensor with one row ``(image_index, class, x, y, w, h)`` per
        image/grid/anchor/cell.
    """
    # The cell-centre grid depends only on the grid fraction, so build it once
    # per grid instead of once per (image, grid, anchor).
    grids = []
    for g_frac in G:
        side = int(np.ceil(1 / g_frac))
        centres = torch.linspace(float(g_frac / 2), float(1 - g_frac / 2), side)
        # x cycles fastest, y is constant within a row of cells.
        xy = torch.stack((centres.repeat(side), centres.repeat_interleave(side)), 1)
        grids.append(xy.to(device))

    rows = []
    for i in range(qt.shape[0]):
        classes = torch.argmax(qt[i, :], 1).to(device)
        start = 0
        for xy in grids:
            n_cells = xy.shape[0]
            image_col = torch.full((n_cells, 1), float(i), device=device)
            for _ in range(Na):
                stop = start + n_cells
                rows.append(torch.cat((image_col,
                                       classes[start:stop].unsqueeze(-1),
                                       xy,
                                       wh_yolo[i][start:stop]), 1))
                start = stop
    return torch.cat(rows)

# train_with_bcc.py

List of in-use functions:
1. get_file_volunteers_dict(data_dict, mode='train', vol_id_map=VOL_ID_MAP)
2. compute_param_confusion_matrices(bcc_params, torchMode=False)
3. init_bcc_params(K=4)
4. init_metrics(n_epochs)
5. convert_target_volunteers_yolo2bcc(target_volunteers, Na=3, Nc=2, G=DEFAULT_G, batch_size=None, vol_id_map=VOL_ID_MAP)
6. xywhpc1ck_to_cxywh(y)
7. nn_predict(model, x, imgsz, offgrid_translate_flag=True, normalize_flag=True, transform_format_flag=True)

In [11]:
# Default grid fractions: a fraction g yields ceil(1/g) cells per image side
# (i.e. 32, 16 and 8 cells per side for the values below).
DEFAULT_G = np.array([1.0/32, 1.0/16, 1.0/8])
# Volunteer name -> integer volunteer id.
VOL_ID_MAP = {'Camellia': 0, 'Conghui': 1, 'HaoWen': 2, 'Xiongjie': 3}

In [None]:
def get_file_volunteers_dict(data_dict, mode='train', vol_id_map=VOL_ID_MAP):
    """Read the per-file volunteer sequences for one dataset split.

    Every non-hidden file in ``<data_dict['path']>/volunteers/<mode>`` is
    expected to hold one volunteer name per line.  Names are translated to
    integer ids through ``vol_id_map``.

    Args:
        data_dict: Mapping with a ``'path'`` entry pointing at the dataset root.
        mode: Sub-directory of ``volunteers`` to read (e.g. ``'train'``).
        vol_id_map: Mapping from volunteer name to integer id.

    Returns:
        Dict mapping each file name to an int tensor of volunteer ids, in the
        order the names appear in the file.

    Raises:
        KeyError: If a file contains a name that is not in ``vol_id_map``.
    """
    vol_mode_path = os.path.join(data_dict['path'], 'volunteers', mode)
    file_vols_dict = {}
    for fn in os.listdir(vol_mode_path):
        if fn.startswith('.'):  # skip hidden files such as .DS_Store
            continue
        vol_file_path = os.path.join(vol_mode_path, fn)
        with open(vol_file_path) as f:
            names = [line.strip() for line in f]

        vol_seq = []
        for line_no, name in enumerate(names, start=1):
            if not name:
                # Blank (typically trailing) lines carry no volunteer.
                continue
            if name not in vol_id_map:
                raise KeyError(
                    f"Unknown volunteer {name!r} in {vol_file_path} (line {line_no})"
                )
            vol_seq.append(vol_id_map[name])
        file_vols_dict[fn] = torch.tensor(vol_seq).int()
    return file_vols_dict

In [None]:
def compute_param_confusion_matrices(bcc_params, torchMode=False):
    """Build the prior and the initial variational confusion-matrix parameters.

    Args:
        bcc_params: Dict providing ``'n_classes'``, ``'n_crowd_members'`` and
            ``'confusion_matrix_diagonal_prior'``.
        torchMode: Forwarded to ``confusion_matrix.initialise_prior``; also
            selects whether the copy is made with torch or NumPy.

    Returns:
        Dict with the prior under ``'prior'`` and an independent copy of it
        under ``'variational'``.
    """
    prior = confusion_matrix.initialise_prior(
        n_classes=bcc_params['n_classes'],
        n_volunteers=bcc_params['n_crowd_members'],
        alpha_diag_prior=bcc_params['confusion_matrix_diagonal_prior'],
        torchMode=torchMode,
    )
    # The variational parameters start out as a separate copy of the prior.
    if torchMode:
        variational = prior.detach().clone()
    else:
        variational = np.copy(prior)
    return {'prior': prior, 'variational': variational}

In [None]:
def init_bcc_params(K=4):
    """Return the default BCC hyper-parameters for ``K`` crowd members.

    Args:
        K: Number of crowd members (volunteers).

    Returns:
        Dict with the number of classes, the number of crowd members, the
        diagonal prior of the confusion matrices and the convergence
        threshold.
    """
    return dict(
        n_classes=3,
        n_crowd_members=K,
        confusion_matrix_diagonal_prior=1e-1,
        convergence_threshold=1e-6,
    )

In [None]:
def init_metrics(n_epochs):
    """Create zero-initialised per-epoch metric storage for every split.

    Each split gets its own dict and its own array, so recording a value for
    one split never leaks into the others.

    Args:
        n_epochs: Number of epochs, i.e. the length of each metric array.

    Returns:
        Dict keyed by ``'train'``, ``'test'`` and ``'posterior_estimate'``;
        each value is ``{'accuracy': float64 array of length n_epochs}``.
    """
    return {
        split: {'accuracy': np.zeros((n_epochs,), dtype=np.float64)}
        for split in ('train', 'test', 'posterior_estimate')
    }

In [None]:
def convert_target_volunteers_yolo2bcc(target_volunteers, Na=3, Nc=2, G=DEFAULT_G, batch_size=None, vol_id_map=VOL_ID_MAP):
    """Rasterise volunteer box annotations into per-cell class labels.

    Rows of ``target_volunteers`` are read as
    ``(image_index, class, x, y, w, h, volunteer_id)``; width and height are
    ignored.  For every image, grid and volunteer, each annotation writes its
    class into the grid cell containing its (x, y) centre; cells without an
    annotation hold ``BACKGROUND_CLASS_ID``.  The per-grid result is repeated
    ``Na`` times (once per anchor) and the grids are concatenated.

    Args:
        target_volunteers: 2-D tensor of annotation rows as described above.
            x and y are expected to be normalised to [0, 1] -- TODO confirm
            against the caller.
        Na: Number of anchors per grid cell.
        Nc: Unused; kept for interface compatibility.
        G: NumPy array of grid fractions; a fraction ``g`` yields
            ``ceil(1/g)`` cells per side.
        batch_size: Number of images.  When None it is inferred as the largest
            image index present plus one.
        vol_id_map: Volunteer name -> id mapping; only its length is used.

    Returns:
        Tensor of shape ``(n_images, Na * sum(cells per grid), n_volunteers)``.
    """
    n_vols = len(vol_id_map)
    device = target_volunteers.device

    if batch_size is None:
        # Fall back to the image indices actually present in the targets.
        if target_volunteers.shape[0]:
            n_images = int(target_volunteers[:, 0].max().item()) + 1
        else:
            n_images = 0
    else:
        n_images = batch_size

    sides = [int(np.ceil(1 / g_frac)) for g_frac in G]
    if n_images == 0:
        total_cells = Na * sum(side * side for side in sides)
        return torch.empty((0, total_cells, n_vols), device=device)

    targets_per_i_bcc_list = []
    for i in range(n_images):
        target_vols_per_i = target_volunteers[target_volunteers[:, 0] == i][:, 1:]
        targets_per_ig_bcc_list = []
        for g_frac, side in zip(G, sides):  # per grid choice
            # No loop over anchors: the per-grid labels are repeated Na times below.
            targets_per_iv_bcc_list = []
            for v in range(n_vols):
                targets_per_iv = target_vols_per_i[target_vols_per_i[:, -1] == v][:, :-1]
                c, x, y, _, _ = targets_per_iv.T  # w and h are ignored

                # Clamp to the last valid cell: a coordinate of exactly 1 would
                # otherwise map to index `side`, i.e. the wrong row or past the
                # end of the grid.
                x_cell_ids = (x / float(g_frac)).int().clamp(0, side - 1)
                y_cell_ids = (y / float(g_frac)).int().clamp(0, side - 1)
                gc_ids = (y_cell_ids * side + x_cell_ids).long()

                targets_per_iv_bcc = torch.full((side * side,), float(BACKGROUND_CLASS_ID), device=device)
                # If several annotations share a cell, one of them wins.
                targets_per_iv_bcc[gc_ids] = c.to(targets_per_iv_bcc.dtype)
                targets_per_iv_bcc_list.append(targets_per_iv_bcc)
            targets_per_iga_bcc = torch.stack(targets_per_iv_bcc_list).T
            targets_per_ig_bcc_list.append(targets_per_iga_bcc.repeat((Na, 1)))
        targets_per_i_bcc_list.append(torch.cat(targets_per_ig_bcc_list))
    return torch.stack(targets_per_i_bcc_list)

In [None]:
def xywhpc1ck_to_cxywh(y):
    """Convert rows ``(x, y, w, h, p, c1, ..., ck)`` to ``(c, x, y, w, h)``.

    ``c`` is the index of the largest class score in each row.  The input
    tensor is left untouched; a new tensor is returned.

    Args:
        y: 2-D tensor with at least six columns.

    Returns:
        Tensor with the same number of rows and dtype as ``y`` and five
        columns: class index followed by x, y, w, h.
    """
    class_ids = y[:, 5:].max(dim=1).indices.to(y.dtype)
    # Build the result from fresh pieces instead of overwriting column 4 of
    # the caller's tensor.
    return torch.cat((class_ids.unsqueeze(1), y[:, :4]), dim=1)

In [None]:
def nn_predict(model, x, imgsz, offgrid_translate_flag=True, normalize_flag=True, transform_format_flag=True):
    """Run the network and post-process its first output.

    Args:
        model: Callable whose result is indexable; element 0 is used as the
            prediction tensor with rows ``(x, y, w, h, prob, c1, ..., ck)``.
        x: Input passed to ``model``.
        imgsz: Image size used for clamping and normalising the x/y columns.
        offgrid_translate_flag: Clamp x/y into ``[0, imgsz]``.
        normalize_flag: Divide x/y by ``imgsz``.
        transform_format_flag: Convert each image's rows to
            ``(c, x, y, w, h)`` via ``xywhpc1ck_to_cxywh``.

    Returns:
        The processed prediction tensor (a five-column view when
        ``transform_format_flag`` is set).
    """
    # Current predictions of the neural network.
    y = model(x)[0]
    centres = y[..., :2]  # view: edits below write through to y

    if offgrid_translate_flag:
        # TODO: quick fix -- points lying outside the grid are force-moved to
        # the nearest grid border.
        centres[centres < 0] = 0
        centres[centres > imgsz] = imgsz

    if normalize_flag:
        centres.copy_(centres / imgsz)

    if not transform_format_flag:
        return y

    # Transform (x, y, w, h, prob, c1, ..., ck) to (c, x, y, w, h), image by image.
    converted = y[..., :5]
    for idx in range(y.shape[0]):
        converted[idx] = xywhpc1ck_to_cxywh(y[idx])
    return converted