In [104]:
from itertools import chain
import pycrfsuite
import sklearn
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import LeaveOneOut

In [105]:
# コーパス読み込み
import codecs
class CorpusReader(object):
    
    def __init__(self, path):
        with codecs.open(path, encoding='utf-8') as f:
            sent = []
            sents = []
            for line in f:
                if line == '\n':
                    sents.append(sent)
                    sent = []
                    continue
                morph_info = line.strip().split('\t')
                sent.append(morph_info) # 形態素の保存 
        train_num = int(len(sents) * 0.9) # 9割を学習に、1割をテストに
        self.__train_sents = sents[:train_num]
        self.__test_sents = sents[train_num:]
        self.__all_sents = sents
        
    def iob_sents(self, name):
        if name == 'train':
            return self.__train_sents
        elif name == 'test':
            return self.__test_sents
        elif name == 'all':
            return self.__all_sents
        else:
            return None
    

In [106]:
# 文字種取得
def is_hiragana(ch):
    return 0x3040 <= ord(ch) <= 0x309F 
    # ひらがな：True or False

def is_katakana(ch):
    return 0x30A0 <= ord(ch) <= 0x30FF
    # カタカタ：True or False

def get_character_type(ch): # 文字種を取得する
    if ch.isspace(): # 空白の場合
        return 'ZSPACE'
    elif ch.isdigit(): # 数字の場合
        return 'ZDIGIT'
    elif ch.islower(): # 小文字の場合
        return 'ZLLET'
    elif ch.isupper(): # 大文字の場合
        return 'ZULET'
    elif is_hiragana(ch): # ひらがなの場合
        return 'HIRAG'
    elif is_katakana(ch): # カタカナの場合
        return 'KATAK'
    else: # それ以外
        return 'OTHER'

def get_character_types(string): # 文字列の文字種を変換する
    character_types = map(get_character_type, string)
    character_types_str = '-'.join(sorted(set(character_types)))

    return character_types_str

In [107]:
# 品詞細分類の取得
def extract_pos_with_subtype(morph):
    idx = morph.index('*')
    return '-'.join(morph[1:idx])

In [108]:
# 単語を特徴量に変換する
def word2features(sent, i):
    word = sent[i][0]
    chtype = get_character_types(sent[i][0]) # 文字種取得
    postag = extract_pos_with_subtype(sent[i]) # 品詞分類取得
    
    # 該当単語の前後2文字の単語の特徴を用意
    features = [ 
        'bias',
        'word=' + word,
        'type=' + chtype,
        'postag=' + postag,
    ]
    
    if i >= 2: # 現在の単語の前に、2単語以上あるとき
        word2 = sent[i-2][0]
        chtype2 = get_character_types(sent[i-2][0])
        postag2 = extract_pos_with_subtype(sent[i-2])
        iobtag2 = sent[i-2][-1]
        features.extend([
            '-2:word=' + word2,
            '-2:type=' + chtype2,
            '-2:postag=' + postag2,
        ])
    else: # それ以外は、BOS
        features.append('BOS')

    if i >= 1: # 現在の単語の前に、1単語以上あるとき
        word1 = sent[i-1][0]
        chtype1 = get_character_types(sent[i-1][0])
        postag1 = extract_pos_with_subtype(sent[i-1])
        iobtag1 = sent[i-1][-1]
        features.extend([
            '-1:word=' + word1,
            '-1:type=' + chtype1,
            '-1:postag=' + postag1,
        ])
    else: # それ以外は、BOS
        features.append('BOS')

    if i < len(sent)-1: # 現在の単語の後ろに、1単語以上あるとき
        word1 = sent[i+1][0]
        chtype1 = get_character_types(sent[i+1][0])
        postag1 = extract_pos_with_subtype(sent[i+1])
        features.extend([
            '+1:word=' + word1,
            '+1:type=' + chtype1,
            '+1:postag=' + postag1,
        ])
    else: # それ以外は、EOS
        features.append('EOS')

    if i < len(sent)-2: # 現在の単語の後ろに、2単語以上あるとき
        word2 = sent[i+2][0]
        chtype2 = get_character_types(sent[i+2][0])
        postag2 = extract_pos_with_subtype(sent[i+2])
        features.extend([
            '+2:word=' + word2,
            '+2:type=' + chtype2,
            '+2:postag=' + postag2,
        ])
    else: # それ以外は、EOS
        features.append('EOS')

    return features    

def sent2features(sent): # 情報系列から特徴を取得
    # 単語ごとに特徴変換していく
    return [word2features(sent, i) for i in range(len(sent))]

def sent2labels(sent): # 情報系列からラベル[B、I、O]を取得
    return [morph[-1] for morph in sent]

def sent2tokens(sent): # 情報系列から単語原文を取得
    return [morph[0] for morph in sent]


In [109]:
# ラベル評価
def bio_classification_report(y_true, y_pred):
    lb = LabelBinarizer()
    # 正解ラベルの二値化の保存 (1文字ずつのラベルを二値化に変換)
    y_true_combined = lb.fit_transform(list(chain.from_iterable(y_true)))
    # 予測結果の二値化の保存 (1文字ずつのラベルを二値化に変換)
    y_pred_combined = lb.transform(list(chain.from_iterable(y_pred)))  

    tagset = set(lb.classes_) - {'O'} # O以外のタグセットの保存

    # B-NAME、I-NAME、B-THEME..等の順番に並び替える
    tagset = sorted(tagset, key=lambda tag: tag.split('-', 1)[::-1])
    # タグのクラスのid化？
    class_indices = {cls: idx for idx, cls in enumerate(lb.classes_)}
    
    # 正解ラベルと予想ラベルを引数として、評価表の作成をする
    return classification_report(
        y_true_combined,
        y_pred_combined,
        labels = [class_indices[cls] for cls in tagset],
        target_names = tagset,
    )


In [110]:
# ラベルごとに単語を保存する
def label_report(text,label_data):
    wh_dic = {"WHERE":[], "WHEN":[], "WHO":[], "WHAT":[], "HOW":[], "WHY":[],"SERIF":[],"O":[]}
    
    set_label = ""
    result_word = ""
    wc = -1
    wc_l = []
    
    for i,word in enumerate(text):
        
        ld = label_data[i]
        
        # B,Iタグをとりあえず、無視する
        if "B-" in ld:
            ld = ld.replace("B-", "")
        elif "I-" in ld:
            ld = ld.replace("I-", "")
            
        # 最後の要素のとき
        if i == (len(text)-1):
            wh_dic[set_label].append([result_word,wc_l])
            break
        
        # ラベルごとに辞書へ保存する。
        if set_label != "" and set_label != ld: 
            # 辞書に保存            
            wh_dic[set_label].append([result_word,wc_l])
            
            # 初期化
            result_word = ""
            wc_l = []

        # 同一ラベルの情報を結合
        for wi in range(len(word)):
            wc += 1
            wc_l.append(wc)
        
        # 同一ラベルの情報を結合
        set_label = ld
        result_word += word   
            
    return wh_dic
    

In [111]:
"""
評価用
入力：正解データ辞書、予測データ辞書
出力： 5W1Hごとの失敗パターン辞書

===
完全： 正解データ
一部一致： 正解データ/予測データ
該当なし： 正解データ
誤予測： 予測データ

"""
def eval_report(true_wh_dic,pred_wh_dic):
    report_data = {}
    
    
    for k,v in true_wh_dic.items():
        report_v = {}
        result_type1 = []
        result_type2 = []
        result_type3 = []
        result_type4 = []
        
        check_pred = [0 for i in pred_wh_dic[k]]
        for true_text in v:    
            true_id = true_text[1] 

            if len(pred_wh_dic[k]) == 0: # 予想辞書にデータが存在しないとき
                result_type3.append(true_text[0])
                
            else: # 予想辞書にデータが存在するとき
                check_true = 0 # 正解データが予想辞書データとマッチングしたか確認
                for num, pred_text in enumerate(pred_wh_dic[k]):
                    pred_id = pred_text[1]                
                    match_n = list(set(true_id) & set(pred_id))
                    if len(match_n) > 0:
                        if len(true_id) == len(pred_id):
                            check_pred[num] = 1
                            check_true = 1
                            result_type1.append(true_text[0]) # 完全
                        else:
                            check_pred[num] = 1
                            check_true = 1
                            result_type2.append([true_text[0],pred_text[0]]) #一部一致
                            
                    if num == (len(pred_wh_dic[k])-1) and check_true == 0: # 正解データが、予想データの最後の要素まで一致しなかったとき
                        result_type3.append(true_text[0]) # 該当なし
                        
        miss_pred = [i for i,c in enumerate(check_pred) if c == 0]
        if len(miss_pred) > 0:
            for i in miss_pred:
                result_type4.append(pred_wh_dic[k][i][0]) # 誤予測
            
        report_v["完全"] = result_type1
        report_v["一部一致"] = result_type2
        report_v["該当なし"] = result_type3
        report_v["誤予測"] = result_type4            
        report_data[k] = report_v
        
    return report_data

In [112]:
# 4パターンの数を数える
def cal_report(file_id,result,wh_label):
    wh_num = [0 for i in range(5)]
    wh_num[0] = file_id
    for wh in wh_label:
        for k,v in result[wh].items():
            if k == "完全":
                wh_num[1] += len(v)
            elif k == "一部一致":
                wh_num[2] += len(v)
            elif k == "該当なし":
                wh_num[3] += len(v)
            elif k == "誤予測":
                wh_num[4] += len(v)
                    
    return wh_num  

In [116]:
# サンプルソースコード
# 1.データ読み込み
#c = CorpusReader('corpus.txt') # ファイル指定
c = CorpusReader('corpus_5w1hs.txt') # ファイル指定
train_sents = c.iob_sents('train') # データの読み込み
test_sents = c.iob_sents('test') # データの読み込み

X_train = [sent2features(s) for s in train_sents] # 学習データの特徴量
y_train = [sent2labels(s) for s in train_sents] # 学習データのラベル

X_test = [sent2features(s) for s in test_sents] # テストデータの特徴量
y_test = [sent2labels(s) for s in test_sents] # テストデータのラベル

# 2.学習
trainer = pycrfsuite.Trainer(verbose=False) # モデルの定義

for xseq, yseq in zip(X_train, y_train):
    trainer.append(xseq, yseq) # 学習データの追加
    
trainer.set_params({
    'c1': 1.0,   # coefficient for L1 penalty
    'c2': 1e-3,  # coefficient for L2 penalty
    'max_iterations': 50,  # stop earlier

    # include transitions that are possible, but not observed
    'feature.possible_transitions': True
}) # パラメータの設定

trainer.train('model.crfsuite') # モデル学習

# 3.ラベル予測・評価
tagger = pycrfsuite.Tagger() #  pycrfsuiteのモデルを用意
tagger.open('model.crfsuite') # モデルを開く
tagger.info()

# 4.テストの用意
example_sent = test_sents[0]
text = sent2tokens(example_sent)
p_data = tagger.tag(sent2features(example_sent))
c_data = sent2labels(example_sent)

wh_label = ["WHERE","WHO","WHEN","WHAT"]
true_wh_dic = label_report(text,c_data)
pred_wh_dic = label_report(text,p_data)


result = eval_report(true_wh_dic,pred_wh_dic)
c_r = cal_report(1,result,wh_label)
output_result = []
output_result.append(c_r)

print(''.join(text))
for wh_l in wh_label:
    print("==== {0} ====".format(wh_l))
    print("正解：{0}".format([t for t in true_wh_dic[wh_l]]))
    print("予想：{0}".format([t for t in pred_wh_dic[wh_l]]))

#print("----------------------")
#y_pred = [tagger.tag(xseq) for xseq in X_test] # テストデータにタグ付け
#print(bio_classification_report(y_test, y_pred))

【宜野湾】磁石の性質を利用して方位を測定する「磁気コンパス」の誤差修正を担う「コンパスアジャスタ」という資格がある。沖縄県内で唯一、その資格を取得しているのが、宜野湾市愛知に住む１級海技士の上原伸浩さん（６６）だ。
==== WHERE ====
正解：[['宜野湾', [1, 2, 3]], ['沖縄県内', [58, 59, 60, 61]], ['宜野湾市愛知', [80, 81, 82, 83, 84, 85]]]
予想：[['宜野湾', [1, 2, 3]], ['沖縄県内', [58, 59, 60, 61]], ['宜野湾市愛知に住む１級海技士', [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93]]]
==== WHO ====
正解：[['１級海技士の上原伸浩', [89, 90, 91, 92, 93, 94, 95, 96, 97, 98]]]
予想：[['上原伸浩', [95, 96, 97, 98]]]
==== WHEN ====
正解：[]
予想：[]
==== WHAT ====
正解：[['磁石の性質を利用して方位を測定する「磁気コンパス」の誤差修正を担う「コンパスアジャスタ」という資格', [5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53]]]
予想：[['磁石の性質', [5, 6, 7, 8, 9]], ['方位を測定する「磁気コンパス」の誤差修正を担う「コンパスアジャスタ」', [15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48]]]


In [117]:
# ファイル書き込み
output_file = "result_5w1hs.csv"

# 評価結果
with open(output_file, 'w') as f:
    writer = csv.writer(f, lineterminator='\n') # 改行コード（\n）を指定しておく
    writer.writerow(["記事id","完全","一部一致","該当なし","誤予測"])
    for o_data in output_result:
        writer.writerow(o_data)
        

In [51]:
# ラベル評価 Leave one out

import csv
f = open('loo_result_5w1hs.csv', 'w')
writer = csv.writer(f, lineterminator='\n')

c = CorpusReader('corpus_5w1hs.txt') # ファイル指定
all_sents = c.iob_sents('all') # データの読み込み

lb = LabelBinarizer() # one-hot値に変換する
loo = LeaveOneOut()

for train_index, test_index in loo.split(all_sents):

    X_train = [sent2features(all_sents[i]) for i in train_index] # 学習データの特徴量
    y_train = [sent2labels(all_sents[i]) for i in train_index] # 学習データのラベル

    X_test = [sent2features(all_sents[i]) for i in test_index] # テストデータの特徴量
    y_test = [sent2labels(all_sents[i]) for i in test_index] # テストデータのラベル

    # 2.学習
    trainer = pycrfsuite.Trainer(verbose=False) # モデルの定義

    for xseq, yseq in zip(X_train, y_train):
        trainer.append(xseq, yseq) # 学習データの追加
    
    trainer.set_params({
        'c1': 1.0,   # coefficient for L1 penalty
        'c2': 1e-3,  # coefficient for L2 penalty
        'max_iterations': 50,  # stop earlier
        # include transitions that are possible, but not observed
        'feature.possible_transitions': True
    }) # パラメータの設定

    trainer.train('model.crfsuite') # モデル学習

    # 3.ラベル予測・評価
    tagger = pycrfsuite.Tagger() #  pycrfsuiteのモデルを用意
    tagger.open('model.crfsuite') # モデルを開く
    tagger.info()
    
    
    # 4.テストの用意
    example_sent = all_sents[test_index[0]]
    
    #print("Predicted:", ' '.join(tagger.tag(sent2features(example_sent))))
    #print("Correct:  ", ' '.join(sent2labels(example_sent)))
    predicted = tagger.tag(sent2features(example_sent))
    correct = sent2labels(example_sent)
    count = 0
    for i,p_label in enumerate(predicted):
        if p_label == correct[i]:
            count += 1    
    print(test_index[0])
    writer.writerow([test_index[0],count/len(predicted),' '.join(predicted),' '.join(correct),' '.join(sent2tokens(example_sent))])

f.close()

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
