# 初回のみ: Mecab と辞書を Google Drive に配置

In [None]:
!apt-get install mecab libmecab-dev mecab-ipadic-utf8 git make curl xz-utils file -y
!apt-get -q -y install sudo file mecab libmecab-dev mecab-ipadic-utf8 git curl python-mecab > /dev/null
!git clone --depth 1 https://github.com/neologd/mecab-ipadic-neologd.git > /dev/null
!echo yes | mecab-ipadic-neologd/bin/install-mecab-ipadic-neologd -n > /dev/null 2>&1

# シンボリックリンクによるエラー回避
!ln -s /etc/mecabrc /usr/local/etc/mecabrc

!echo `mecab-config --dicdir`"/mecab-ipadic-neologd"
# /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd

In [None]:
# Google Drive に NEologd を格納する場合
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

!echo `mecab-config --dicdir`"/mecab-ipadic-neologd"
# /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd

# 「Colab Notebooks」の間にある空白は\ (backslash)でエスケープ
!cp -r /usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd /content/drive/MyDrive/Colab\ Notebooks/Libs/pylibs
# /etc/mecabrcが辞書データ参照時に必要なのでetcフォルダごとコピー↓
!cp -r /etc /content/drive/MyDrive/Colab\ Notebooks/Libs/pylibs/etc

In [None]:
# ローカルランタイムの場合
!pip install mecab-python3 > /dev/null

!echo `mecab-config --dicdir`"/mecab-ipadic-neologd"
import MeCab
path2dic = "/usr/lib/x86_64-linux-gnu/mecab/dic/mecab-ipadic-neologd"
mecab = MeCab.Tagger("-d {0}".format(path2dic))

print(mecab.parse("これは自動で読点を挿入する研究です"))

# MeCab

In [None]:
# Google Drive に NEologd を入れている場合
!pip install mecab-python3 > /dev/null
from tqdm.notebook import tqdm
from google.colab import drive
drive.mount('/content/drive')

import os
os.environ["MECABRC"] = '/content/drive/MyDrive/Colab Notebooks/Libs/pylibs/etc/mecabrc'

import MeCab
path2dic = "/content/drive/MyDrive/Colab\ Notebooks/Libs/pylibs/mecab-ipadic-neologd"
mecab = MeCab.Tagger("-d {0}".format(path2dic))

In [None]:
def bunsetsu_wakachi(text):
    mecab_result = mecab.parse(text).splitlines()
    mecab_result = mecab_result[:-1] #最後の1行は不要な行なので除く
    break_pos = ['名詞','動詞','接頭詞','副詞','感動詞','形容詞','形容動詞','連体詞'] #文節の切れ目を検出するための品詞リスト
    wakachi = [''] #分かち書きのリスト
    prev_pos_detail = []
    afterPrepos = False #接頭詞の直後かどうかのフラグ
    afterSahenNoun = False #サ変接続名詞の直後かどうかのフラグ
    nextNoBreak = False

    for v in mecab_result:
        if '\t' not in v: continue
        surface = v.split('\t')[0] #表層系
        pos = v.split('\t')[1].split(',') #品詞など
        pos_detail = ','.join(pos[1:4]) #品詞細分類（各要素の内部がさらに'/'で区切られていることがあるので、','でjoinして、inで判定する)

        #この単語が文節の切れ目とならないかどうかの判定
        noBreak = False
        if nextNoBreak:
          noBreak = True

        nextNoBreak = False
        noBreak = noBreak or (pos[0] not in break_pos)
        noBreak = noBreak or ('数' in prev_pos_detail and '数' in pos_detail) # 数が連続する場合は文節の切れ目としない
        noBreak = noBreak or '接尾' in pos_detail
        noBreak = noBreak or (pos[0]=='動詞' and 'サ変接続' in pos_detail)
        noBreak = noBreak or '非自立' in pos_detail #非自立な名詞、動詞を文節の切れ目としたい場合はこの行をコメントアウトする
        noBreak = noBreak or afterPrepos
        noBreak = noBreak or (afterSahenNoun and pos[0]=='動詞' and pos[4]=='サ変・スル')

        if surface.endswith('-'):
          noBreak = True
          nextNoBreak = True

        if noBreak == False:
            wakachi.append("") # 要素を増やす（つまり次の文節に行く）

        if not nextNoBreak: # '-' 以外は加える
          wakachi[-1] += surface # 要素は増やさず最後のやつに加える（まだ文節であるとして繋げる）

        afterPrepos = pos[0]=='接頭詞'
        afterSahenNoun = 'サ変接続' in pos_detail
        prev_pos_detail = pos_detail

    if wakachi[0] == '': wakachi = wakachi[1:] #最初が空文字のとき削除する
    return wakachi

# BERT

In [None]:
!pip install -U accelerate
!pip install -U transformers
!pip install fugashi ipadic
!pip install datasets unidic-lite

In [None]:
from transformers import BertJapaneseTokenizer, BertForMaskedLM, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset

In [None]:
# モデルとトークナイザの設定
tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-v3')
model = BertForMaskedLM.from_pretrained('cl-tohoku/bert-base-japanese-v3')
model = model.to('cuda:0')

# トークンIDの設定
tokenizer.add_tokens('[NONE]') # 特殊トークン [NONE] をトークナイザーに追加
model.resize_token_embeddings(len(tokenizer)) # モデルにも新しいトークンを追加
none_token_id = tokenizer.convert_tokens_to_ids('[NONE]') # トークンIDを取得
comma_id = tokenizer.convert_tokens_to_ids('、')

# 各種関数

In [None]:
def get_texts(data_path):
  texts = []
  with open(data_path, encoding='cp932') as file:
    for line in file:
      if 511 < len(line):
        while 511 < len(line):
          first_256_chars = line[:256] # 先頭から256文字を取得
          last_period_index = first_256_chars.rfind("。") # 取得した部分から最後の "。" までの位置を検索
          # 最後の "。" までの部分を別の変数に代入
          if last_period_index != -1:
            extracted_part = first_256_chars[:last_period_index + 1]
            texts.append(extracted_part)
            line = line[len(extracted_part):] # もとのtextから抽出した部分を除いた部分を更新
          else:
            print("先頭から256文字までに 。 がない")
            break

      line = line.replace("\n", "")
      sentence_list = line.split("。") # この時点で、"。" はなくなる
      sentences = []
      for s in sentence_list:
        if len(s) > 0: sentences.append(s + "。") # "。"　を復元する
      if 0 < len(sentences): texts += sentences

  return texts

In [None]:
# 入力：読点を除去したテキスト / 出力：文節境界に [MASK] を挿入している
def insert_masks_between_bunsetsu(text):
  bunsetsus = bunsetsu_wakachi(text)
  masked_text = ''
  for bunsetsu in bunsetsus:
    masked_text += bunsetsu
    if not bunsetsu.endswith("、"):
      masked_text += '[MASK]'
  masked_text = masked_text[:-6] # 末尾の余分な '[MASK]' を削除
  return masked_text.strip()

In [None]:
def get_is_comma(original_text, masked_text):
    result = ""
    i = j = 0
    while i < len(original_text) and j < len(masked_text):
      if masked_text[j] == '[':
        if original_text[i] == '、':
          return True
        else:
          return False
      elif original_text[i] == '、': # 文字列 A に読点があり、文字列 B に読点がない場合
        i += 1
      else: # 両方の文字が一致する場合
        i += 1
        j += 1

    return False

def get_masked_texts_and_is_commas(original_text):
  removed_text = original_text.replace("、", "")
  masked_text = insert_masks_between_bunsetsu(removed_text)

  masked_texts = []
  is_commas = []

  while "[MASK]" in masked_text:
    # target_masked_text は、masked_text の最初の [MASK] だけを残したもの
    first_mask_index = masked_text.find("[MASK]")
    target_masked_text = masked_text[:first_mask_index + 6] + masked_text[first_mask_index + 6:].replace("[MASK]", "")
    is_comma = get_is_comma(original_text, target_masked_text)

    masked_texts.append(target_masked_text) # 追加
    is_commas.append(is_comma) # 追加
    masked_text = masked_text.replace("[MASK]", "", 1) # 最初の [MASK] を消す（その [MASK] は完了）

  return [masked_texts, is_commas]

# ファインチューニング

In [None]:
class JapaneseMLMDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx].clone().detach() for key, val in self.encodings.items()}
        label_ids = [-100] * len(item["input_ids"])
        mask_idx = torch.where(item["input_ids"] == tokenizer.mask_token_id)[0]
        label_ids[mask_idx] = comma_id if self.labels[idx] else none_token_id
        item['labels'] = torch.tensor(label_ids)
        return item

    def __len__(self):
        return len(self.labels)


def finetune(model, texts):
  masked_texts = []
  labels = []
  for i in tqdm(range(len(texts))):
    original_text = texts[i - 1]
    masked_texts_and_is_commas = get_masked_texts_and_is_commas(original_text)
    if len(masked_texts_and_is_commas) == 2:
      masked_texts += masked_texts_and_is_commas[0]
      labels += masked_texts_and_is_commas[1]

  # データセットのトークナイズ
  encodings = tokenizer(masked_texts, padding=True, truncation=True, return_tensors="pt")
  dataset = JapaneseMLMDataset(encodings, labels)

  # トレーニングの設定
  training_args = TrainingArguments(
      output_dir='./results',
      num_train_epochs=3,
      per_device_train_batch_size=16,
      warmup_steps=500,
      weight_decay=0.01,
      logging_dir='./logs',
  )

  # トレーナーの初期化
  trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=dataset,
  )

  # ファインチューニングの実行
  trainer.train()
  return model

# 読点挿入

In [None]:
def insert_commas(model, texts):
  masked_text = insert_masks_between_bunsetsu(text)

  prev_token = ""
  while "[MASK]" in masked_text:
    # target_masked_text は、masked_text の最初の [MASK] だけを残したもの
    first_mask_index = masked_text.find("[MASK]")
    target_masked_text = masked_text[:first_mask_index + 6] + masked_text[first_mask_index + 6:].replace("[MASK]", "")

    input_ids = tokenizer.encode(target_masked_text, return_tensors='pt')   # トークナイズしてテンソルに変換する
    input_ids = input_ids.to('cuda:0')  # GPU
    with torch.no_grad():
      output = model(input_ids) # モデルに入力して予測する
    predictions = output[0][0] # 予測結果をトークンに戻す

    input_tokens = tokenizer.convert_ids_to_tokens(input_ids[0])

    for i, prediction in enumerate(predictions): # 各トークンに対するインデックスと予測
      if input_ids[0][i] == tokenizer.mask_token_id: # [MASK] トークンの場所以外に興味なし
        score = 1
        comma_value = prediction[comma_id]
        none_token_value = prediction[none_token_id]

        if comma_value > none_token_value
          masked_text = masked_text.replace("[MASK]", "、", 1) # 最初の [MASK] を置き換える
        else:
          masked_text = masked_text.replace("[MASK]", "", 1) # 最初の [MASK] を置き換える

      prev_token = input_tokens[i]

  punctuated_text = masked_text
  punctuated_text = re.sub("#|\[CLS]|\[SEP]", "", punctuated_text)
  return punctuated_text

# 挿入結果の評価

In [None]:
# テキストに含まれる読点のインデックスのリストを返す
def get_comma_indexes(text):
  comma_indexes = []
  for i in range(len(text)):
      if text[i] == "、":
          comma_indexes.append(i)
  return comma_indexes

def calculate_result(original_text, output_text):
  # 読点のインデックスのリストを取得
  original_indexes = get_comma_indexes(original_text)
  output_indexes = get_comma_indexes(output_text)
  saigen = 0 # 再現できている読点の数 （saigen / 挿入した数 で 適合率 も出せる）
  num_original_indexes = len(original_indexes)
  num_output_indexes = len(output_indexes)

  # 原文または出力文のどちらかの読点数がゼロなら saigen == 0 なので終わる
  if num_original_indexes == 0 or num_output_indexes == 0:
    return [original_text, output_text, num_original_indexes, num_output_indexes, saigen]

  # 再現数のカウント
  output_i = 0
  index_diff = 0

  for i in range(num_original_indexes):
    original_index = original_indexes[i]+index_diff
    output_index = output_indexes[output_i]

    if original_index == output_index:
      # output_index で再現できている場合
      saigen += 1
      if output_i+1 < num_output_indexes:
        output_i += 1
      else:
        break

    elif original_index > output_index:
      # 原文の読点までに、余分に挿入した読点がある場合
      while original_indexes[i]+index_diff > output_indexes[output_i]:
        if output_i+1 < num_output_indexes:
          output_i += 1
          index_diff += 1
        else:
          break
      if original_indexes[i]+index_diff == output_indexes[output_i]:
        # output_index で再現できている場合
        saigen += 1
        if output_i+1 < num_output_indexes:
          output_i += 1
        else:
          break
      else:
        # i番目の読点は再現できていないことが確定
        index_diff -= 1
        continue

    else:
      # i番目の読点は再現できていないことが確定
      index_diff -= 1
      continue

  return [original_text, output_text, num_original_indexes, num_output_indexes, saigen]

def print_result(results):
  num_all_saigen = 0
  num_all_original_commas = 0
  num_all_output_commas = 0

  for result in results:
    # print("原文： " + result[0])
    # print("出力： " + result[1])
    num_saigen = result[4]
    num_original_commas = result[2]
    num_output_commas = result[3]

    # 統計に追加
    num_all_saigen += num_saigen
    num_all_original_commas += num_original_commas
    num_all_output_commas += num_output_commas
    # print("------------------------")

    # 指標を計算
    precision = 0
    if num_all_output_commas > 0:
      precision = num_all_saigen / num_all_output_commas # 適合率
    recall = 0
    if num_all_original_commas > 0:
      recall = num_all_saigen / num_all_original_commas # 再現率
    f_value = 0
    if (precision + recall) > 0:
      f_value = 2 * precision * recall / (precision + recall) # F値

  if num_all_original_commas != 0 and num_all_output_commas != 0:
    print(f"再現率: {recall: .3f}, 適合率: {precision: .3f}, F値： {f_value: .3f}")
  else:
    print(f"再現率: 0, 適合率: 0, F値： {f_value: .3f}")

# テスト

In [None]:
train_data_path = '/content/drive/My Drive/Colab Notebooks/TextData/TrainData.txt'
test_data_path = '/content/drive/My Drive/Colab Notebooks/TextData/TestData.txt'

# 読点位置の学習
texts = get_texts(train_data_path)
finetuned_model = finetune(model, texts)

# 読点挿入
texts = get_texts(test_data_path)
for i in tqdm(range(len(texts))):
  text = texts[i - 1]
  removed_text = re.sub("、", "", text)
  output_text = ""
  output_text = insert_commas(finetuned_model, texts)
  results.append(calculate_result(text, output_text))

print_result(results)