In [None]:
from sklearn.preprocessing import LabelEncoder

target_authors = [
    "太宰治", "海野十三", "芥川龍之介", "宮沢賢治",
    "岡本かの子", "江戸川乱歩", "夏目漱石", "紫式部",
]

le = LabelEncoder()
le.fit(target_authors)

In [None]:
import json

X, y = [], []
with open("stories.jsonl", "r") as stories:
    for raw_story in stories:
        story = json.loads(raw_story)
        if story["author"] in target_authors:
            X.append(story["body"])
            y.append(story["author"])
y = le.transform(y)


In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline


story_clf = Pipeline([
    ("tokenize", MeCabTokenize(pos_keep_filters=["名詞", "形容詞", "動詞"])),
    ("tfidf", TfidfVectorizer()),
    ("clf", LinearSVC())
])

In [None]:
import MeCab
from sklearn.base import BaseEstimator, TransformerMixin


class Token:
    """MeCabのトークンを保持するクラス"""
    def __init__(self, node):
        # 表層形
        self.surface = node.surface

        features = node.feature.split(",")
        # 品詞
        self.part_of_speech = features[0]
        # 基本形
        self.base_form = features[6]

    def __str__(self):
        return "{}\t{}".format(self.surface, self.part_of_speech)


class MeCabTokenize(BaseEstimator, TransformerMixin):
    """MeCabを利用して形態素解析を行うクラス"""
    def __init__(self, pos_keep_filters=[]):
        # MeCabインスタンスの生成
        self.tokenizer = MeCab.Tagger("-b 100000")
        # メモリの初期化周りでバグがあるため、一度解析することで回避
        self.tokenizer.parse("init")

        # 前処理を手軽に行えるように、品詞フィルタを作る
        self.pos_keep_filters = pos_keep_filters

    def fit(self, X, y=None):
        # scikit-learn互換のインターフェイス
        return self

    def transform(self, X):
        # scikit-learn互換のインターフェイス
        docs = []
        # 1文書ずつ処理する
        for text in X:
            words = []
            # 文書内のテキストを改行でさらに文に分ける
            for sentence in self.split_text_to_sentences(text):
                # 対象の文を分かち書き
                words.extend(self.wakati(sentence))
            # 文書はスペース区切りで追加する
            docs.append(" ".join(words))
        return docs

    def split_text_to_sentences(self, text, delimiter="。"):
        return [t for t in text.replace(delimiter, delimiter + "\n").splitlines() if t]

    def wakati(self, text):
        return [t.base_form for t in self.tokenize(text)]

    def tokenize(self, text):
        tokens = []

        node = self.tokenizer.parseToNode(text)
        node = node.next
        while node.next:
            token = Token(node)
            # 品詞フィルターを適用する
            if token.part_of_speech in self.pos_keep_filters:
                tokens.append(Token(node))
            node = node.next

        return tokens


In [None]:
from sklearn.model_selection import train_test_split


X_train, X_test, y_train, y_test = train_test_split(
                                      X,  # 入力
                                      y, # 正解ラベル
                                      test_size=0.3, # テストデータのサイズ（全体の何割か）
                                      random_state=42, # シャッフルしたときの乱数の種を固定
                                      shuffle=True
)

In [None]:
story_clf.fit(X_train, y_train)

In [None]:
from sklearn.metrics import classification_report

pred = story_clf.predict(X_test)
# 出力は数値なので、ラベルへと逆変換するために、LabelEncoderのclasses_を入力する
print(classification_report(y_test, pred, target_names=le.classes_))

In [None]:
# 学習データのテキストごとに文字集合を作成
train_set = [set(train_x) for train_x in X_train]

for test_idx, test in tqdm(enumerate(X_test)):
    test = set(test)
    for train_idx, train in enumerate(train_set):
	    # 学習データとテストデータで同じ文字集合か確認
        if test == train:
            print(test_idx, train_idx)

In [None]:
import pandas as pd

# pandasにより重複を排除したindexを取得
X_set = [set(x) for x in X]
df = pd.DataFrame(X_set)
df_index = df.drop_duplicates().index
target_indices = set(df_index.tolist())

# 重複を排除した学習用データを取得
new_X = [nx for i, nx in enumerate(X) if i in target_indices]
new_y = [ny for i, ny in enumerate(y) if i in target_indices]

# サイズの確認
print(len(new_X), len(X))
print(len(new_y), len(y))
X_train, X_test, y_train, y_test = train_test_split(
                                      new_X,  # 入力
                                      new_y, # 正解ラベル
                                      test_size=0.3, # テストデータのサイズ（全体の何割か）
                                      random_state=42, # シャッフルしたときの乱数の種を固定
                                      shuffle=True
)

story_clf.fit(X_train, y_train)

pred = story_clf.predict(X_test)
# 出力は数値なので、ラベルへと逆変換するために、LabelEncoderのclasses_を入力する
print(classification_report(y_test, pred, target_names=le.classes_))

In [None]:
import numpy as np


for i in np.where(le.inverse_transform(new_y) == "宮沢賢治")[0]:
    print(new_X[i][:100])
    print("...")
    print(new_X[i][-100:])
    print("-------------------")

In [None]:
print(confusion_matrix(y_test, pred, labels=le.transform(le.classes_)))

In [None]:
import itertools
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm

# フォントキャッシュの削除
matplotlib.font_manager._rebuild()

# 日本語フォントの設定
matplotlib.rcParams['font.family'] = "IPAGothic"

cm = confusion_matrix(y_test, pred, labels=le.transform(le.classes_))

# 値の正規化
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

# 混同行列を画像として描画する。カラーマップは青系統を利用
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
# タイトルを追加
plt.title("著者分類")
# カラーバーを追加
plt.colorbar()

# x軸とy軸のそれぞれの要素のラベルを追加
tick_marks = np.arange(len(le.classes_))
plt.xticks(tick_marks, le.classes_, rotation=90)
plt.yticks(tick_marks, le.classes_)

# 各要素に数値を描画する。閾値は色の変更のために利用する
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, format(cm[i, j], ".2f"),
                  horizontalalignment="center",
                  color="white" if cm[i, j] > thresh else "black")

# 軸ラベルの追加
plt.ylabel('正解ラベル')
plt.xlabel('推定ラベル')

# 保存したい場合
# plt.savefig("confusion_matrix.png", dpi=300, bbox_inches="tight")