In [5]:
import itertools,json
from collections import defaultdict

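### Select sentences from the train dataset to serve as the example sentences for later few-shot prompting. The output contains each continuous text passage together with all of its named entities.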

In [6]:
def get_entities(seq):
    """
    Extract entity chunks from a sequence of prefix-style labels.

    Each label is read as '<tag>-<type>' (for example 'B-PER'), where the
    tag is the first character of the label; a bare 'O' has tag 'O'.

    Args:
        seq (list): sequence of labels. A list of label lists is also
            accepted; it is flattened first, so the returned offsets index
            into the concatenated sequence.
    Returns:
        list: list of (chunk_type, chunk_start, chunk_end) tuples sorted by
            chunk_start. chunk_end is inclusive.
    Example:
        seq = ['B-PER', 'I-PER', 'O', 'B-LOC', 'I-PER']
        get_entities(seq)
        #output
        [('PER', 0, 1), ('LOC', 3, 3)]
        # The trailing 'I-PER' closes the LOC chunk but opens nothing,
        # because start_of_chunk only opens a chunk on a 'B' tag.
    """
    if any(isinstance(s, list) for s in seq):
        seq = list(itertools.chain.from_iterable(seq))

    prev_tag = 'O'
    prev_type = ''
    begin_offset = 0
    chunks = []
    in_chunk = False
    for i, label in enumerate(seq):
        tag = label[0]
        # Split on the first hyphen only, so hyphenated entity types such as
        # 'creative-work' are kept whole instead of being cut down to 'work'.
        type_ = label.split('-', 1)[-1]
        if in_chunk and end_of_chunk(prev_tag, tag, prev_type, type_):
            chunks.append((prev_type, begin_offset, i - 1))
            in_chunk = False
        if start_of_chunk(prev_tag, tag, prev_type, type_):
            in_chunk = True
            begin_offset = i

        prev_tag = tag
        prev_type = type_

    # No sentinel 'O' is appended to seq, so a chunk that is still open here
    # runs through the final label: its inclusive end is the last index, not
    # the one before it.
    if in_chunk:
        chunks.append((prev_type, begin_offset, len(seq) - 1))
    return sorted(chunks, key=lambda x: x[1])

def end_of_chunk(prev_tag, tag, prev_type, type_):
    """
    Decide whether a chunk closes between the previous and the current word.

    Args:
        prev_tag: tag of the previous label (e.g. 'B', 'I', 'E', 'S', 'O').
        tag: tag of the current label.
        prev_type: entity type of the previous label.
        type_: entity type of the current label.

    Returns:
        bool: True when the previous word is the last word of its chunk.
    """
    # An 'E' (end) or 'S' (single) tag always finishes its chunk.
    if prev_tag in ('E', 'S'):
        return True

    # A 'B'/'I' word is the last of its chunk when the next word begins a
    # new chunk ('B'), is a single-word chunk ('S'), or is outside ('O').
    if prev_tag in ('B', 'I') and tag in ('B', 'S', 'O'):
        return True

    # Otherwise the chunk only closes on a change of entity type, provided
    # the previous word was inside a chunk at all ('O' and '.' are not).
    return prev_tag not in ('O', '.') and prev_type != type_

def start_of_chunk(prev_tag, tag, prev_type, type_):
    """
    Decide whether a chunk opens at the current word.

    Only an explicit 'B' tag opens a chunk. The previous tag and both entity
    types are accepted to mirror the signature of end_of_chunk but are not
    consulted: an 'S', 'E' or 'I' tag never opens a chunk here, and neither
    does a change of entity type.

    Args:
        prev_tag: tag of the previous label (unused).
        tag: tag of the current label.
        prev_type: entity type of the previous label (unused).
        type_: entity type of the current label (unused).

    Returns:
        bool: True when the current tag is 'B'.
    """
    return tag == 'B'

def load_file(filepath):
    """
    Read a file with one 'word label' pair per line into parallel lists.

    Each non-empty line must hold exactly one word and one label separated
    by a single space. Empty lines separate sentences; consecutive empty
    lines do not produce empty sentences.

    Args:
        filepath: path of the UTF-8 encoded file to read.

    Returns:
        list: [words, labels], where words[k] and labels[k] are the word
            list and the label list of the k-th sentence.

    Raises:
        ValueError: if a non-empty line does not split into exactly two
            space-separated fields.
    """
    with open(filepath, encoding='utf-8') as handle:
        raw_lines = handle.read().splitlines()

    sentences, tag_sequences = [], []
    current_words, current_tags = [], []

    for raw in raw_lines:
        if raw:
            token, tag = raw.split(' ')
            current_words.append(token)
            current_tags.append(tag)
        elif current_words:
            # An empty line closes the sentence collected so far.
            sentences.append(current_words)
            tag_sequences.append(current_tags)
            current_words, current_tags = [], []

    # The file may end without a trailing empty line.
    if current_words:
        sentences.append(current_words)
        tag_sequences.append(current_tags)
    return [sentences, tag_sequences]

def get_NE_word(words,labels):
    """
    Return the surface form and type of every entity found in labels.

    The result is a list of (<named entity>, <type>) tuples in the order
    returned by get_entities. Nothing is de-duplicated: a phrase that occurs
    several times is listed several times, and the same phrase under two
    different labels yields two distinct tuples. Entities whose joined text
    is empty are dropped.

    Args:
        words: list of words, or a list of word lists (flattened first).
        labels: label sequence passed unchanged to get_entities.

    Returns:
        list: list of (entity_text, entity_type) tuples.
    """
    nested = any(isinstance(item, list) for item in words)
    flat_words = list(itertools.chain(*words)) if nested else words

    pairs = []
    for chunk_type, first, last in get_entities(labels):
        # Chunk offsets are inclusive, hence the +1 on the slice end.
        phrase = ' '.join(flat_words[first:last + 1])
        if phrase:
            pairs.append((phrase, chunk_type))
    return pairs

def reshape_data_new(word_list,label_list,temp_filepath):
    """
    Build up to 100 {'TEXT', 'NEs'} samples and dump them to a JSON file.

    Sentences are visited in order. A sentence whose space-joined text is 30
    characters or shorter is skipped; collection stops once 100 samples have
    been gathered. For each kept sentence the entities returned by
    get_NE_word are grouped by entity type.

    Args:
        word_list: list of sentences, each a list of words.
        label_list: list of label sequences, indexed in parallel with
            word_list.
        temp_filepath: path of the JSON file to write (UTF-8, indent 4,
            followed by a trailing newline).

    Returns:
        list: the samples written to the file; each is a dict with the
            sentence text under 'TEXT' and a defaultdict(list) mapping
            entity type to entity phrases under 'NEs'.
    """
    sentences = [' '.join(tokens) for tokens in word_list]

    samples = []
    for idx, sentence in enumerate(sentences):
        if len(samples) >= 100:
            break
        if len(sentence) <= 30:
            continue
        grouped = defaultdict(list)
        found = get_NE_word(words=word_list[idx], labels=label_list[idx])
        for phrase, entity_type in found:
            grouped[entity_type].append(phrase)
        samples.append({'TEXT': sentence, 'NEs': grouped})

    with open(temp_filepath, 'w', encoding='utf-8') as out:
        json.dump(samples, out, indent=4, ensure_ascii=False)
        out.write('\n')
    return samples

In [4]:
# Driver cell: pick one dataset and build two JSON files under its output
# directory. Alternative dataset names are kept below, commented out.
# dataset_name = 'WNUT2017'
# dataset_name = 'Twitter'
dataset_name = 'Bio-NER'
output_directory = f'./output/{dataset_name}'

# Query set: read the dataset's test.txt and write the reshaped samples to
# query.json in the output directory.
file_path = f'./data/{dataset_name}/test.txt'
temp_filepath = output_directory+'/query.json'
test_words,test_labels = load_file(file_path)
query_data = reshape_data_new(test_words,test_labels,temp_filepath)

# Shot set: read shot.txt from the output directory and write the reshaped
# samples to 5shot.json. file_path, test_words and test_labels are reused
# here and therefore no longer refer to the test split after this point.
file_path = f'./output/{dataset_name}/shot.txt'
shot_prompt_filepath = output_directory+'/5shot.json'
test_words,test_labels = load_file(file_path)
# The return value is not assigned; as the last expression of the cell it
# is what the notebook displays.
reshape_data_new(test_words,test_labels,shot_prompt_filepath)

[{'TEXT': 'IL-2 gene expression and NF-kappa B activation through CD28 requires reactive oxygen production by 5-lipoxygenase .',
  'NEs': defaultdict(list,
              {'DNA': ['IL-2 gene'],
               'protein': ['NF-kappa B', 'CD28', '5-lipoxygenase']})},
 {'TEXT': 'This differential effect of E1A expression on the cytolytic phenotypes of infected and stably transfected human cells suggests that human NK cells provide an effective immunologic barrier against the in vivo survival and neoplastic progression of E1A-immortalized cells that may emerge from the reservoir of persistently infected cells in the human host .',
  'NEs': defaultdict(list,
              {'protein': ['E1A'],
               'cell_type': ['human cells',
                'human NK cells',
                'persistently infected cells'],
               'cell_line': ['E1A-immortalized cells']})},
 {'TEXT': 'In addition , when used together IL-2 and IL-12 synergized in the induction of IFN-gamma and GM-CSF and this 