In [1]:
import re
import os
import hashlib
import glob
import cleaner
from typing import List, Optional
from collections import defaultdict
from webanno_tsv import webanno_tsv_read_file, Token
import utils
from predictor import LABELS

In [2]:
def extract_annotation_labels_if_possible(predicted_text, labels=None):
    """
    Find inline ``<LABEL>...</LABEL>`` annotations and locate them in the untagged text.

    Each label is searched case-insensitively with a non-greedy pattern, so an
    opening tag is paired with the nearest following closing tag of the same
    label. Reported offsets refer to ``predicted_text`` with every *matched*
    tag removed. Tags that never form a pair are not treated as markup and are
    therefore not compensated for.

    Nested or crossing annotations of different labels (for example
    ``<A>foo <B>bar</B></A>``) are supported: only tags that really precede a
    position are subtracted from it.

    Args:
        predicted_text (str): Text that may contain inline label tags.
        labels (Optional[Iterable[str]]): Label names to look for, taken
            literally. Defaults to the module-level ``LABELS``.

    Returns:
        defaultdict(list): Maps every label that matched to a list of dicts
        with ``'text'`` (span content without matched tags), ``'start'``
        (inclusive) and ``'end'`` (exclusive) offsets into the untagged text.
        Entries of one label are ordered by where their match starts.
    """
    if labels is None:
        labels = LABELS

    label_to_text_list = defaultdict(list)
    if not predicted_text:
        return label_to_text_list

    # Collect the matches of every label, then order them by position so the
    # per-label lists come out in reading order.
    all_matches = []
    for label in labels:
        tag = re.escape(label)  # labels are literals, not regex fragments
        pattern = re.compile(f'<{tag}>(.*?)</{tag}>', flags=re.IGNORECASE | re.DOTALL)
        all_matches.extend((label, m) for m in pattern.finditer(predicted_text))
    all_matches.sort(key=lambda item: item[1].start())

    # (start, end) of every matched opening and closing tag in the tagged text.
    # A set guards against counting a tag twice if `labels` has duplicates.
    tag_spans = sorted({
        span
        for _, m in all_matches
        for span in ((m.start(), m.start(1)), (m.end(1), m.end()))
    })

    def untagged_offset(position):
        # Shift a tagged-text position left by the tags that end at or before it.
        removed = sum(end - start for start, end in tag_spans if end <= position)
        return position - removed

    def untagged_slice(content_start, content_end):
        # Content of a match with any matched tags inside it dropped, so that
        # len(text) == end - start also holds for nested annotations.
        pieces = []
        cursor = content_start
        for tag_start, tag_end in tag_spans:
            if content_start <= tag_start and tag_end <= content_end:
                pieces.append(predicted_text[cursor:tag_start])
                cursor = tag_end
        pieces.append(predicted_text[cursor:content_end])
        return ''.join(pieces)

    for label, m in all_matches:
        label_to_text_list[label].append({
            'text': untagged_slice(m.start(1), m.end(1)),
            'start': untagged_offset(m.start(1)),
            'end': untagged_offset(m.end(1)),
        })

    return label_to_text_list


def post_process(predicted_text, tokens):
    """
    Clean a raw prediction and pull out its labelled spans.

    The text is passed through ``cleaner.Cleaner(...).clean()`` and the result
    is handed to ``extract_annotation_labels_if_possible``.

    Args:
        predicted_text: Raw predicted text for one sentence.
        tokens: Accepted so existing call sites keep working; not used here.

    Returns:
        Whatever ``extract_annotation_labels_if_possible`` returns for the
        cleaned text.
    """
    return extract_annotation_labels_if_possible(cleaner.Cleaner(predicted_text).clean())



def make_span_tokens(tokens: List['Token'], start_char: int, end_char: int) -> Optional[List['Token']]:
    """
    Select the tokens covered by the character range ``[start_char, end_char)``.

    Tokens lying entirely inside the range are returned unchanged. A token that
    only partly overlaps the range is replaced by a new ``Token`` restricted to
    the overlapping characters, whose ``idx`` is the original one followed by
    ``".partial"``.

    Args:
        tokens (List[Token]): Tokens to scan; iteration stops at the first token
            starting at or after ``end_char``, so they are expected to be
            ordered by position.
        start_char (int): Start character position (inclusive)
        end_char (int): End character position (exclusive)

    Returns:
        Optional[List[Token]]: The selected tokens, or None when nothing overlaps.
    """
    selected = []

    for tok in tokens:
        if tok.end <= start_char:
            # Token ends before the range begins.
            continue
        if tok.start >= end_char:
            # Token starts after the range; later tokens are assumed to as well.
            break

        lies_inside = start_char <= tok.start and tok.end <= end_char
        if lies_inside:
            selected.append(tok)
        else:
            # Clip the token to the part that falls inside the range.
            clip_from = max(tok.start, start_char)
            clip_to = min(tok.end, end_char)
            fragment = tok.text[clip_from - tok.start:clip_to - tok.start]
            if fragment:
                selected.append(Token(
                    sentence_idx=tok.sentence_idx,
                    idx=f"{tok.idx}.partial",
                    start=clip_from,
                    end=clip_to,
                    text=fragment,
                ))

    return selected or None

In [3]:

# # Load and build full text
# org_path = f'../data/test_labeled/ARM-software_keyword-transformer_master_README.md.tsv'
# doc = webanno_tsv_read_file(org_path)
# full_text = doc.text  # This is key: get the original document full text
# annotations = []


# cursor = 0

# for sentence in doc.sentences:
#     tokens = doc.sentence_tokens(sentence)
#     original_text = sentence.text
#     sid = hashlib.sha256(original_text.encode()).hexdigest()[:8]
#     path = f'../results/deepseek-chat/prompt-0/zzz_ARM-software_keyword-transformer_master_README.md.tsv'

#     with open(f'{path}/{sid}.txt', 'r') as fd:
#         predicted_text = fd.read()

#     label_to_text_list = post_process(predicted_text, tokens)
    
#     sentence_offset = full_text.find(sentence.text, cursor)
#     cursor = sentence_offset + len(sentence.text)
    
#     span_tokens_to_label_list = []
#     for label, text_list in label_to_text_list.items():
#         for text in text_list:
#             span_tokens = make_span_tokens(
#                 tokens,
#                 start_char=text['start'] + sentence_offset,
#                 end_char=text['end'] + sentence_offset
#             )
#             span_tokens_to_label_list.append({
#                 'span_tokens': span_tokens,
#                 'label': label
#             })

#     print(span_tokens_to_label_list)
#     for span_tokens_to_label in span_tokens_to_label_list:
#         span_tokens = span_tokens_to_label['span_tokens']
#         label = span_tokens_to_label['label']
#         if span_tokens is None:
#             continue
#         annotation = utils.make_annotation(tokens=span_tokens, label=label)
#         annotations.append(annotation)
# predicted_doc = utils.replace_webanno_annotations(doc, annotations=annotations)
# # Verify
# if doc.text != predicted_doc.text:
#     #logging.warning('content changed')
#     pass
# if len(doc.sentences) == len(predicted_doc.sentences):
#     #logging.warning('sentences changed')
#     pass
# if len(doc.tokens) == len(predicted_doc.tokens):
#     #logging.warning('tokens changed')
#     pass
# for s1, s2 in zip(doc.sentences, predicted_doc.sentences):
#     if s1 == s2:
#         #logging.warning(f'sentence changed, \n{s1}\n{s2}')
#         pass

# for t1, t2 in zip(doc.tokens, predicted_doc.tokens):
#     if t1 == t2:
#         #logging.warning(f'token changed: \n{t1}\n{t2}')
#         pass

# with open("ARM-software_keyword-transformer_master_README.md.tsv", 'w') as fd:
#     fd.write(predicted_doc.tsv())


In [4]:
# Rebuild every labelled test document with annotations taken from the stored
# per-sentence predictions, and write the result as a new WebAnno TSV file.
ref_dir = '../data/test_labeled'
# Sorted so that documents are processed in a reproducible order.
tsv_files = sorted(glob.glob(os.path.join(ref_dir, '*.tsv')))

for org_path in tsv_files:
    print(f"Processing file: {org_path}")
    doc = webanno_tsv_read_file(org_path)
    full_text = doc.text
    annotations = []
    cursor = 0  # search position in full_text; moves forward sentence by sentence

    file_base = os.path.splitext(os.path.basename(org_path))[0]
    pred_base_path = f'../results/deepseek-chat/prompt-0/zzz_{file_base}.tsv'

    for sentence in doc.sentences:
        tokens = doc.sentence_tokens(sentence)
        original_text = sentence.text

        # Locate the sentence BEFORE deciding whether to skip it: the cursor has
        # to advance past every sentence, otherwise a later sentence could be
        # found inside the text of one that was skipped.
        offset = full_text.find(original_text, cursor)
        if offset == -1:
            print(f"Error: sentence not found in full_text: {original_text[:30]}...")
            continue
        cursor = offset + len(original_text)

        # Prediction files are named after a short hash of the sentence text.
        sid = hashlib.sha256(original_text.encode()).hexdigest()[:8]
        pred_path = os.path.join(pred_base_path, f'{sid}.txt')
        if not os.path.exists(pred_path):
            print(f"Prediction not found: {pred_path}")
            continue

        # Decode explicitly instead of relying on the platform default (not
        # UTF-8 on Windows), which would shift character offsets.
        # errors='replace' keeps a file with stray bytes from aborting the run.
        with open(pred_path, 'r', encoding='utf-8', errors='replace') as fd:
            predicted_text = fd.read()

        label_to_text_list = post_process(predicted_text, tokens)

        for label, text_list in label_to_text_list.items():
            for span in text_list:
                # Span offsets are sentence-relative; shift them by the
                # sentence's position in the document text.
                start_abs = span['start'] + offset
                end_abs = span['end'] + offset
                span_tokens = make_span_tokens(tokens, start_abs, end_abs)
                if span_tokens is None:
                    # The predicted span does not cover any token of this sentence.
                    continue
                annotations.append(utils.make_annotation(tokens=span_tokens, label=label))

    predicted_doc = utils.replace_webanno_annotations(doc, annotations=annotations)

    output_path = f"../results/deepseek-chat/test_unlabeled_up/{file_base}.tsv"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    # Write UTF-8 explicitly so the TSV does not depend on the platform default encoding.
    with open(output_path, 'w', encoding='utf-8') as fd:
        fd.write(predicted_doc.tsv())
    print(f"Saved predicted file to: {output_path}")


Processing file: ../data/test_labeled\231sm_Low_Resource_KBP_master_README.md.tsv
Saved predicted file to: ../results/deepseek-chat/test_unlabeled_up/231sm_Low_Resource_KBP_master_README.md.tsv
Processing file: ../data/test_labeled\allenai_aspire_main_README.md.tsv
Saved predicted file to: ../results/deepseek-chat/test_unlabeled_up/allenai_aspire_main_README.md.tsv
Processing file: ../data/test_labeled\alpiges_LinConGauss_master_README.md.tsv
Saved predicted file to: ../results/deepseek-chat/test_unlabeled_up/alpiges_LinConGauss_master_README.md.tsv
Processing file: ../data/test_labeled\anonymous-submission-22_dejavu_master_README.md.tsv
Saved predicted file to: ../results/deepseek-chat/test_unlabeled_up/anonymous-submission-22_dejavu_master_README.md.tsv
Processing file: ../data/test_labeled\ARM-software_keyword-transformer_master_README.md.tsv
Saved predicted file to: ../results/deepseek-chat/test_unlabeled_up/ARM-software_keyword-transformer_master_README.md.tsv
Processing file: ../