In [None]:
import os
import pandas as pd
from utils import find_inner_segments, load_json, Interval, time_to_sample
from itertools import combinations
import numpy as np

In [None]:
# define data paths
butqdb_path = './data/but-qdb/brno-university-of-technology-ecg-quality-database-but-qdb-1.0.0'

# known priority rules
# NOTE: process_annotations currently only has its 'consensus' branch enabled;
# the 'highest' and 'majority_vote' branches are commented out there
priority_rules = ['consensus', 'highest', 'majority_vote']
# set the priority rule
priority_rule = 'consensus'
# fail fast on a typo instead of erroring deep inside the processing loop
if priority_rule not in priority_rules:
    raise ValueError(f"Unknown priority rule {priority_rule!r}; expected one of {priority_rules}")

# load lookup table with the annotation segments data
lookup_table = load_json("segment-class_lookup.json")

# get all record names from their folder names
# Only the first os.walk() entry (the dataset root) is used, so nested folders
# are not mistaken for records; sorted() makes the order reproducible across
# file systems. A missing dataset folder still yields an empty list.
record_names = sorted(next(os.walk(butqdb_path), (butqdb_path, [], []))[1])

# dataset sampling frequency is 1000Hz
sampling_frequency = 1000

# set window size for sampling (seconds)
window_size = 2.5

# color palette
color_palette = ['lightgray', '#28a745', '#ffc107', '#dc3545']

In [None]:
# Fastest method for processing annotations
# Only using custom Interval class for current annotator timeline and keep find_inner_segments for comparison with other 2 classes
def process_annotations(butqdb_path, record_id, priority_rule):
    """Return the labeled segments of one record according to ``priority_rule``.

    Reads ``<butqdb_path>/<record_id>/<record_id>_ANN.csv`` (no header row) as
    twelve columns: a ``(start, end, score)`` triple for each of the three
    annotators, followed by the consensus triple.

    Parameters
    ----------
    butqdb_path : str
        Directory that contains one sub-folder per record.
    record_id : str
        Record folder name; also the prefix of its ``_ANN.csv`` file.
    priority_rule : str
        Rule used to select the segments. Only ``'consensus'`` is enabled;
        the ``'majority_vote'`` and ``'highest'`` implementations are kept
        commented out near the end of this function.

    Returns
    -------
    list of tuple of int
        One ``(start, end, score)`` tuple per consensus row whose start and
        end are both present, in file order.

    Raises
    ------
    ValueError
        If ``priority_rule`` is anything other than ``'consensus'``.
        Previously such a call fell through to ``return segments`` with
        ``segments`` never assigned and died with ``UnboundLocalError``.
    """
    # validate before touching the disk so an unsupported rule fails fast
    if priority_rule != 'consensus':
        raise ValueError(
            f"Unsupported priority rule {priority_rule!r}: only 'consensus' is "
            "currently enabled ('majority_vote' and 'highest' are commented out "
            "in process_annotations)."
        )

    # load the annotation file
    file_path = os.path.join(butqdb_path, record_id, f"{record_id}_ANN.csv")
    annotation_columns = ['start1', 'end1', 'score1', 'start2', 'end2', 'score2', 'start3', 'end3', 'score3', 'start_cons', 'end_cons', 'score_cons']
    df = pd.read_csv(file_path, header=None, names=annotation_columns)

    # use consensus annotations and segments
    # rows without a consensus start/end are skipped (presumably padding rows
    # that exist because the annotators' columns have different lengths)
    segments = [
        (int(start), int(end), int(c)) for (start, end, c) in zip(df['start_cons'], df['end_cons'], df['score_cons'])
        if pd.notna(start) and pd.notna(end)
    ]

    # ------------------------------------------------------------------
    # Disabled rules, kept for reference. To re-enable one, restore the
    # if/elif dispatch on priority_rule and relax the guard at the top.
    # ------------------------------------------------------------------
    # elif priority_rule == 'majority_vote':
    #     # start with annotations and segments of annotator 1 and check other 2 for their class
    #     segments = [
    #         (int(start), int(end), int(c)) for (start, end, c) in zip(df['start1'], df['end1'], df['score1'])
    #         if pd.notna(start) and pd.notna(end)
    #     ]

    #     # add segments to Interval timeline class for convenient updating of subsegments
    #     base_interval = Interval(segments)

    #     # per segment of initial annotator 1
    #     for (start, end, current_class) in segments:
    #         # check if other annotators [2 & 3] have annotated any subsegment of current with a higher class

    #         # get subsegments from the second annotator
    #         ann2_inner_segments = find_inner_segments(lookup_table, record_id, 'ann2', start, end)
    #         for (s2, e2, c2) in ann2_inner_segments:
    #             # ann1 & ann2 form majority, no update
    #             if current_class == c2:
    #                 pass
    #             # if not equal to current class check 3rd ann within that subsegment to find which class they assigned
    #             elif current_class != c2:
    #                 ann3_inner_segments = find_inner_segments(lookup_table, record_id, 'ann3', s2, e2)
    #                 for (s3, e3, c3) in ann3_inner_segments:
    #                     # ann2 and ann3 form majority, replace subsegment
    #                     if c2 == c3:
    #                         new_seg = (s3, e3, c3)
    #                         base_interval.replace_range(new_seg)
    #                     elif c2 != c3:
    #                         # ann1 & ann3 form majority, no need to change
    #                         if current_class == c3:
    #                             pass
    #                         # tiebreaker: if ann1 != ann2 != ann3: assign highest of 3 classes
    #                         elif current_class != c2 != c3:
    #                             max_class = max(current_class, c2, c3)
    #                             # update if the current class is not the highest class for this subsegment
    #                             if max_class != current_class:
    #                                 new_seg = (s3, e3, max_class)
    #                                 base_interval.replace_range(new_seg)
    #         # debugging code
    #         # updated_segment = base_interval.get_subsegments(start, end)
    #         # cons_seg = find_inner_segments(lookup_table, record_id, 'cons', start, end)
    #         # if updated_segment != cons_seg:
    #         #     print(updated_segment)
    #         #     print(cons_seg)
    #         #     print(set(updated_segment) ^ set(cons_seg))

    #     segments = [(int(s), int(e), int(c)) for (s, e, c) in base_interval]

    # elif priority_rule == 'highest':
    #     # start with annotations and segments of annotator 1 and check other 2 for higher classes
    #     segments = [
    #         (int(start), int(end), int(c)) for (start, end, c) in zip(df['start1'], df['end1'], df['score1'])
    #         if pd.notna(start) and pd.notna(end)
    #     ]
    #     # add segments to Interval timeline class for convenient updating of subsegments
    #     base_interval = Interval(segments)

    #     # per segment of initial annotator 1
    #     for (start, end, current_class) in segments:
    #         # check if other annotators [2 & 3] have annotated any subsegment of current with a higher class
    #         for annnotator_id in [3, 2]:
    #             # get subsegments from the other annotators
    #             inner_segments = find_inner_segments(lookup_table, record_id, f'ann{annnotator_id}', start, end)
    #             # reduce all inner segments to only segments with higher class
    #             higher_subsegments = [(s, e, c) for s, e, c in inner_segments if c > current_class]
    #             # replace current annotations with higher annotations on interval timeline
    #             for s in higher_subsegments:
    #                 base_interval.replace_range(s)
    #     segments = [(int(s), int(e), int(c)) for (s, e, c) in base_interval]
    return segments

### Store the newly created segments and the annotated class

In [None]:
# create a dictionary storing the segments according to the chosen rule
def annotation_store(butqdb_path, record_list, priority_rule):
    """Build a ``{record: segments}`` dictionary for the given records.

    Each record in ``record_list`` is passed, together with ``butqdb_path``
    and ``priority_rule``, to ``process_annotations``; whatever that call
    returns is stored under the record's name. Records are handled in the
    order they appear in ``record_list``.
    """
    return {
        record_id: process_annotations(butqdb_path, record_id, priority_rule)
        for record_id in record_list
    }

# report the active rule first so the printed output documents how the dictionary was built
print("Selected priority rule is:", priority_rule)
# segments of every record, keyed by record name
annotation_dict = annotation_store(
    butqdb_path=butqdb_path,
    record_list=record_names,
    priority_rule=priority_rule,
)