In [1]:
import os
import json
import jsonlines
from collections import defaultdict
import time

In [2]:
# Wall-clock start for the whole notebook; the final cell subtracts it from a
# second time.time() reading to report the total run time in seconds.
all_start = time.time()

In [3]:
# Dataset: ACE05 (the data cells read the ACE05-*.jsonl files).
# Task A: c→(h,t) -- from the text alone, produce trigger/argument pairs,
# each argument tagged with its role.
out_name = 'c_to_ht_withtype'

# Instruction template, assembled one prompt line per item.
# Placeholders filled by str.format: {role_list}, {text}.
inp_prompt = '\n'.join((
    'You are currently a senior information extraction expert.',
    'Your task is to extract all possible trigger-argument pairs from the given text. First, identify potential event triggers. Then, based on the extracted triggers and the given text, extract the corresponding arguments. For each argument, identify its role type from the given list of argument role types.',
    'The given list of argument role types is: {role_list}.',
    'The output format of this task is: (event trigger|| trigger|| argument|| argument role).',
    'Given text: "{text}"',
    '',  # empty last item -> the template ends with a newline
))

# Answer-side template; none of the cells in this notebook formats it.
oup_prompt = '{answer_text}'

In [4]:
# Input JSONL file per ACE05 split and the matching output file name.
# Both lists are index-aligned (train, dev, test) because they are zipped
# together by the processing cell.
data_file_list = ['ACE05-{}.jsonl'.format(split) for split in ('train', 'dev', 'test')]
out_file_list = ['{}.json'.format(split) for split in ('train', 'dev', 'test')]

In [5]:
# Directory holding the raw ACE05 JSONL splits and labels.json.
read_dir = './ori/ACE05/'

# Each task writes into its own sub-directory named after out_name.
out_dir = os.path.join('./ACE05/pipeline拆解', out_name)
# exist_ok=True replaces the check-then-create pair: no race between the
# existence test and the creation, and re-running the cell is harmless.
os.makedirs(out_dir, exist_ok=True)

# labels.json is unpacked by the next cell into two comma-separated strings
# (event types, role types). JSON text is UTF-8, so decode it explicitly
# rather than relying on the platform's default encoding.
with open(os.path.join(read_dir, 'labels.json'), 'r', encoding='utf-8') as f:
    trip_types_list = json.load(f)

In [6]:
# labels.json yields two comma-separated strings: event types first, argument
# role types second. Split each one and trim whitespace around every name.
raw_event_types, raw_role_types = trip_types_list
event_types = [name.strip() for name in raw_event_types.split(',')]
role_types = [name.strip() for name in raw_role_types.split(',')]

In [7]:
def _labels_to_json(labels):
    """Render a list of label names as a JSON array string for the prompts.

    ``json.dumps`` always produces valid double-quoted items, whereas
    ``str(labels).replace("'", '"')`` corrupts any label that itself contains
    an apostrophe or a double quote. For plain labels both give the same text,
    e.g. ``["A", "B"]``.

    A value that is already a string is returned unchanged, so re-running this
    cell after the conversion stays a no-op (as it was before).
    """
    if isinstance(labels, str):
        return labels
    return json.dumps(labels, ensure_ascii=False)


# From here on both names hold prompt-ready strings, not lists.
event_types = _labels_to_json(event_types)
role_types = _labels_to_json(role_types)

In [8]:
# Task A writer: one output record per input sentence, listing every distinct
# (trigger, trigger type, argument, argument role) tuple of that sentence.
start = time.time()
print('out_name:{}'.format(out_name))
for data_file, out_file in zip(data_file_list, out_file_list):
    # Load the whole split before the output file is opened.
    with jsonlines.open(os.path.join(read_dir, data_file), 'r') as reader:
        records = list(reader)
    # NOTE: the output is written as JSON Lines even though it is named *.json.
    with jsonlines.open(os.path.join(out_dir, out_file), 'w') as writer:
        for record in records:
            prompt = inp_prompt.format(
                role_list=role_types,
                event_list=event_types,
                text=record['text'],
            )
            # dict.fromkeys acts as an ordered set: duplicates are dropped and
            # the first-seen order is kept, matching a manual "not in" scan.
            pairs = dict.fromkeys(
                (
                    spo['subject'],
                    spo['subject_type'],
                    spo['object']['@value'],
                    spo['object_type']['@value'],
                )
                for spo in record['spo_list']
            )
            answer = '\n'.join(
                '({}|| {}|| {}|| {})'.format(*pair) for pair in pairs
            )
            # The answer is wrapped in a fenced block.
            answer = '```\n' + answer.strip() + '\n```'
            writer.write({
                'instruction': prompt,
                'input': '',
                'output': answer,
                'text': record['text'],
                'spo_list': record['spo_list'],
            })
end = time.time()
print('cost:{}秒'.format(round(end - start, 2)))

out_name:c_to_ht_withtype
cost:0.87秒


In [9]:
# Dataset: ACE05 (the data cells read the ACE05-*.jsonl files).
# Task A (detection variant): c→(r) -- from the text alone, list the event
# types it expresses.
out_name = 'c_to_r_withtype'

# Instruction template, assembled one prompt line per item.
# Placeholders filled by str.format: {event_list}, {text}.
inp_prompt = '\n'.join((
    'You are currently a senior expert in event detection.',
    'Your task is to identify potential event types from the given list of event types based on the given text.',
    'The given list of event types: {event_list}.',
    'The output format of the task is: (event type).',
    'Given text: "{text}"',
    '',  # empty last item -> the template ends with a newline
))

# Answer-side template; none of the cells in this notebook formats it.
oup_prompt = '{answer_text}'

In [10]:
# Input JSONL file per ACE05 split and the matching output file name.
# Both lists are index-aligned (train, dev, test) because they are zipped
# together by the processing cell.
data_file_list = ['ACE05-{}.jsonl'.format(split) for split in ('train', 'dev', 'test')]
out_file_list = ['{}.json'.format(split) for split in ('train', 'dev', 'test')]

In [11]:
# Directory holding the raw ACE05 JSONL splits.
read_dir = './ori/ACE05/'

# Each task writes into its own sub-directory named after out_name.
out_dir = os.path.join('./ACE05/pipeline拆解', out_name)
# exist_ok=True replaces the check-then-create pair: no race between the
# existence test and the creation, and re-running the cell is harmless.
os.makedirs(out_dir, exist_ok=True)

In [12]:
# Event-detection writer: one output record per input sentence, listing every
# distinct event type (the 'predicate' field) found in that sentence.
start = time.time()
print('out_name:{}'.format(out_name))
for data_file, out_file in zip(data_file_list, out_file_list):
    # Load the whole split before the output file is opened.
    with jsonlines.open(os.path.join(read_dir, data_file), 'r') as reader:
        records = list(reader)
    # NOTE: the output is written as JSON Lines even though it is named *.json.
    with jsonlines.open(os.path.join(out_dir, out_file), 'w') as writer:
        for record in records:
            prompt = inp_prompt.format(
                role_list=role_types,
                event_list=event_types,
                text=record['text'],
            )
            # One ordered de-duplication pass is enough; the earlier code
            # de-duplicated the same list twice. dict.fromkeys keeps the
            # first-seen order.
            event_names = dict.fromkeys(
                spo['predicate'] for spo in record['spo_list']
            )
            answer = '\n'.join('({})'.format(name) for name in event_names)
            # The answer is wrapped in a fenced block.
            answer = '```\n' + answer.strip() + '\n```'
            writer.write({
                'instruction': prompt,
                'input': '',
                'output': answer,
                'text': record['text'],
                'spo_list': record['spo_list'],
            })
end = time.time()
print('cost:{}秒'.format(round(end - start, 2)))

out_name:c_to_r_withtype
cost:0.86秒


In [13]:
# Dataset: ACE05 (the data cells read the ACE05-*.jsonl files).
# Task B: r[s1] c→(h,t) -- given the text and one event type, produce the
# trigger/argument pairs, each argument tagged with its role.
out_name = 'rc_to_ht_withtype'

# Instruction template, assembled one prompt line per item.
# Placeholders filled by str.format: {role_list}, {text}, {event_type}.
inp_prompt = '\n'.join((
    'You are currently a senior information extraction expert.',
    'Your task is to extract all possible trigger-argument pairs from the given text and event type. First, identify potential event triggers. Then, based on the extracted triggers and the given text, extract the corresponding arguments. For each argument, identify its role type from the given list of argument role types.',
    'The given list of argument role types is: {role_list}.',
    'The output format of this task is: (event trigger|| trigger|| argument|| argument role).',
    'Given text: "{text}"',
    'Given event type: "{event_type}"',
    '',  # empty last item -> the template ends with a newline
))

# Answer-side template; none of the cells in this notebook formats it.
oup_prompt = '{answer_text}'

In [14]:
# Input JSONL file per ACE05 split and the matching output file name.
# Both lists are index-aligned (train, dev, test) because they are zipped
# together by the processing cell.
data_file_list = ['ACE05-{}.jsonl'.format(split) for split in ('train', 'dev', 'test')]
out_file_list = ['{}.json'.format(split) for split in ('train', 'dev', 'test')]

In [15]:
# Directory holding the raw ACE05 JSONL splits.
read_dir = './ori/ACE05/'

# Each task writes into its own sub-directory named after out_name.
out_dir = os.path.join('./ACE05/pipeline拆解', out_name)
# exist_ok=True replaces the check-then-create pair: no race between the
# existence test and the creation, and re-running the cell is harmless.
os.makedirs(out_dir, exist_ok=True)


In [16]:
# Task B writer: one output record per (sentence, event type) combination,
# listing the distinct (trigger, trigger type, argument, argument role) tuples
# recorded under that event type.
start = time.time()
print('out_name:{}'.format(out_name))
for data_file, out_file in zip(data_file_list, out_file_list):
    # Load the whole split before the output file is opened.
    with jsonlines.open(os.path.join(read_dir, data_file), 'r') as reader:
        records = list(reader)
    # NOTE: the output is written as JSON Lines even though it is named *.json.
    with jsonlines.open(os.path.join(out_dir, out_file), 'w') as writer:
        for record in records:
            # event type -> ordered set of tuples. The inner dict is used as an
            # ordered set so a tuple repeated under the same event type is
            # emitted once, consistent with the de-duplication the c→(h,t)
            # cell applies. First-seen order is preserved.
            pairs_by_event = defaultdict(dict)
            for spo in record['spo_list']:
                pair = (
                    spo['subject'],
                    spo['subject_type'],
                    spo['object']['@value'],
                    spo['object_type']['@value'],
                )
                pairs_by_event[spo['predicate']][pair] = None
            for event_type, pairs in pairs_by_event.items():
                prompt = inp_prompt.format(
                    role_list=role_types,
                    event_list=event_types,
                    text=record['text'],
                    event_type=event_type,
                )
                answer = '\n'.join(
                    '({}|| {}|| {}|| {})'.format(*pair) for pair in pairs
                )
                # The answer is wrapped in a fenced block.
                answer = '```\n' + answer.strip() + '\n```'
                writer.write({
                    'instruction': prompt,
                    'input': '',
                    'output': answer,
                    'text': record['text'],
                    'spo_list': record['spo_list'],
                })
end = time.time()
print('cost:{}秒'.format(round(end - start, 2)))

out_name:rc_to_ht_withtype
cost:0.88秒


In [17]:
# Dataset: ACE05 (the data cells read the ACE05-*.jsonl files).
# Task C: h[s1]t [s2]c→r -- given the text and one trigger/argument pair,
# name the event type(s).
out_name = 'htc_to_r_withtype'

# Instruction template, assembled one prompt line per item.
# Placeholders filled by str.format: {event_list}, {text}, {trigger},
# {arg_name}, {arg_role}.
# NOTE(review): the second line describes the pair format with commas, while
# the last line renders the pair with "||" separators and a hard-coded
# "trigger" type. Left unchanged because it alters the generated prompts --
# confirm which form is intended.
inp_prompt = '\n'.join((
    'You are currently a senior expert in event detection.',
    'Your task is to identify potential event types from the given list of event types based on the given text and trigger-argument pair. The input format of the trigger-argument pair is: (event trigger, trigger, argument, argument role).',
    'The given list of event types: {event_list}.',
    'The output format of the task is: (Event Type).',
    'Given text: "{text}"',
    'Given trigger-argument pair: ({trigger}|| trigger|| {arg_name}|| {arg_role})',
    '',  # empty last item -> the template ends with a newline
))

# Answer-side template; none of the cells in this notebook formats it.
oup_prompt = '{answer_text}'

In [18]:
# Input JSONL file per ACE05 split and the matching output file name.
# Both lists are index-aligned (train, dev, test) because they are zipped
# together by the processing cell.
data_file_list = ['ACE05-{}.jsonl'.format(split) for split in ('train', 'dev', 'test')]
out_file_list = ['{}.json'.format(split) for split in ('train', 'dev', 'test')]

In [19]:
# Directory holding the raw ACE05 JSONL splits.
read_dir = './ori/ACE05/'

# Each task writes into its own sub-directory named after out_name.
out_dir = os.path.join('./ACE05/pipeline拆解', out_name)
# exist_ok=True replaces the check-then-create pair: no race between the
# existence test and the creation, and re-running the cell is harmless.
os.makedirs(out_dir, exist_ok=True)

In [20]:
# Task C writer: one output record per (sentence, trigger/argument tuple)
# combination, listing the distinct event types recorded for that tuple.
start = time.time()
print('out_name:{}'.format(out_name))
for data_file, out_file in zip(data_file_list, out_file_list):
    # Load the whole split before the output file is opened.
    with jsonlines.open(os.path.join(read_dir, data_file), 'r') as reader:
        records = list(reader)
    # NOTE: the output is written as JSON Lines even though it is named *.json.
    with jsonlines.open(os.path.join(out_dir, out_file), 'w') as writer:
        for record in records:
            # (trigger, trigger type, argument, role) -> ordered set of event
            # types. The inner dict is used as an ordered set so an event type
            # repeated for the same tuple is emitted once, consistent with the
            # de-duplication the c→(r) cell applies. First-seen order is kept.
            events_by_pair = defaultdict(dict)
            for spo in record['spo_list']:
                pair = (
                    spo['subject'],
                    spo['subject_type'],
                    spo['object']['@value'],
                    spo['object_type']['@value'],
                )
                events_by_pair[pair][spo['predicate']] = None
            for pair, event_names in events_by_pair.items():
                # The template hard-codes the trigger type, so pair[1] is not
                # passed to it.
                prompt = inp_prompt.format(
                    role_list=role_types,
                    event_list=event_types,
                    text=record['text'],
                    trigger=pair[0],
                    arg_name=pair[2],
                    arg_role=pair[3],
                )
                answer = '\n'.join('({})'.format(name) for name in event_names)
                # The answer is wrapped in a fenced block.
                answer = '```\n' + answer.strip() + '\n```'
                writer.write({
                    'instruction': prompt,
                    'input': '',
                    'output': answer,
                    'text': record['text'],
                    'spo_list': record['spo_list'],
                })
end = time.time()
print('cost:{}秒'.format(round(end - start, 2)))

out_name:htc_to_r_withtype
cost:0.79秒


In [21]:
# Total wall-clock seconds elapsed since all_start was recorded; the bare
# expression on the last line is what the notebook displays as the cell output.
all_end = time.time()
all_end-all_start

9.844723224639893