# 青空文庫データクレンジングv2

In [1]:
import os
import glob
import shutil
import numpy as np
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt

## 定数定義

In [None]:
import sys

print(type(sys.modules))
moduleList = sys.modules
ENV_COLAB = False

if 'google.colab' in moduleList:
    print("google_colab")
    ENV_COLAB = True
else:
    print("Not google_colab")

In [3]:
if ENV_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')

In [4]:
if ENV_COLAB:
    TARGET_PATH = 'drive/MyDrive/data/target.csv'
else:
    TARGET_PATH = 'data/target.csv'

In [None]:
import pathlib

abs_path = pathlib.Path(os.getcwd())
rel_path = abs_path.relative_to(os.getcwd())
abs_path, rel_path

In [None]:
# CARD_DIR = f'{os.getcwd()}/bunko/cards'
# CARD_DIR
CARD_DIR = f'{rel_path}/bunko/cards'
CARD_DIR

In [9]:
DEBUG = False

## 関数定義

In [10]:
def debug(message, flg=True):
    if flg:
        print(message)

In [11]:
def get_parameters(data):
    url = data['テキストファイルURL'].split('/')
    file_name = url[-1].split('.')[0] + '.txt'
    author_id = url[4]
    return author_id, file_name

In [12]:
def check_directory(data, card_dir):
    url = data['テキストファイルURL'].split('/')
    try:
        target_dir = f"{card_dir}/{'/'.join(url[4:-1])}"
        edit_dir = f"{target_dir}/edit"
        org_dir = f"{target_dir}/org"
        score_dir = f"{target_dir}/score"
        # files directory
        if os.path.isdir(target_dir):
            debug(f'dirs:{target_dir} already exists', DEBUG)
        else:
            debug(f'make dirs:{target_dir}', DEBUG)
            os.makedirs(target_dir)
        # files/edit directory
        if os.path.isdir(edit_dir):
            debug(f'dirs:{edit_dir} already exists', DEBUG)
        else:
            debug(f'make dirs:{edit_dir}', DEBUG)
            os.makedirs(edit_dir)
        # files/org directory
        if os.path.isdir(org_dir):
            debug(f'dirs:{org_dir} already exists', DEBUG)
        else:
            debug(f'make dirs:{org_dir}', DEBUG)
            os.makedirs(org_dir)
        # files/score directory
        if os.path.isdir(score_dir):
            debug(f'dirs:{score_dir} already exists', DEBUG)
        else:
            debug(f'make dirs:{score_dir}', DEBUG)
            os.makedirs(score_dir)

        return target_dir, edit_dir, org_dir, score_dir
    except Exception as e:
        print(e)
        return None

In [13]:
import urllib.request

# zipファイルを保存する
def save_zip(target_file, zip_file):
    try:
        file_path = ''
        if os.path.isfile(zip_file):
            file_path = zip_file
        else:
            file_path = zip_file
            urllib.request.urlretrieve(target_file, zip_file)
    except Exception as e:
        file_path = target_file
        print(e)
    finally:
        return file_path

In [14]:
# クレンジングされたデータを保存する
def save_text(target_file, org_dir, file_name):
    try:
        save_file = f'{org_dir}/{file_name}'
        if os.path.isfile(save_file):
            # Txtファイルの読み込み
            debug(f'read {save_file}', DEBUG)
            df_org = pd.read_csv(save_file)
            pass
        else:
            # Zipファイルの読み込み
            debug(f'read {target_file}', DEBUG)
            df_org = pd.read_csv(target_file, encoding='cp932', names=['text'])
            df_org.to_csv(save_file, index=False)
        return df_org
    except Exception as e:
        print(f'ERROR: {target_file}, {str(e)}')
        return None

In [15]:
def get_text_body(target_df, author_name):
    # 本文の先頭を探す（'---…'区切りの直後から本文が始まる前提）
    head_txt = list(target_df[target_df['text'].str.contains(
        '-------------------------------------------------------')].index)
    # 本文の末尾を探す（'底本：'の直前に本文が終わる前提）
    atx = list(target_df[target_df['text'].str.contains('底本：')].index)
    if head_txt == []:
        # もし'---…'区切りが無い場合は、作家名の直後に本文が始まる前提
        head_txt = list(target_df[target_df['text'].str.contains(author_name)].index)
        head_txt_num = head_txt[0]+1
    else:
        # 2個目の'---…'区切り直後から本文が始まる
        head_txt_num = head_txt[1]+1
    return target_df[head_txt_num:atx[0]]

In [16]:
def split_kuten(target_df, split_kuten=True):
    df = target_df.copy()
    # 句点で分割
    if split_kuten:
        df = target_df.assign(text=target_df['text'].str.split(r'(?<=。)(?=..)')).explode('text')
    return df

In [17]:
def reset_index(target_df):
    df = target_df.copy()
    # インデックスがずれるので振りなおす
    df = df.reset_index().drop(['index'], axis=1)

    # 空白行を削除する（念のため）
    df = df[~(df['text'] == '')]

    # インデックスがずれるので振り直し、文字の長さの列を削除する
    df = df.reset_index().drop(['index'], axis=1)
    return df

In [18]:
def remove_aozora_format(target_df):
    df = target_df.copy()
    # 青空文庫の書式削除
    df = df.replace({'text': {'《.*?》': ''}}, regex=True)
    df = df.replace({'text': {'［.*?］': ''}}, regex=True)
    df = df.replace({'text': {'｜': ''}}, regex=True)
    # 字下げ（行頭の全角スペース）を削除
    df = df.replace({'text': {'　': ''}}, regex=True)
    # 節区切りを削除
    # df = df.replace({'text': {'^.$': ''}}, regex=True) # 1文字
    # df = df.replace({'text': {'^―――.*$': ''}}, regex=True) # 先頭が"―――"
    df = df.replace({'text': {'^＊＊＊.*$': ''}}, regex=True)
    df = df.replace({'text': {'^×××.*$': ''}}, regex=True)
    # # 記号、および記号削除によって残ったカッコを削除
    # df = df.replace({'text': {'―': ''}}, regex=True)
    # df = df.replace({'text': {'…': ''}}, regex=True)
    # df = df.replace({'text': {'※': ''}}, regex=True)
    df = df.replace({'text': {'「」': ''}}, regex=True)
    # 一文字以下で構成されている行を削除 -> しない
    # df['length'] = df['text'].map(lambda x: len(x))
    # df = df[df['length'] > 1]
    return df

In [19]:
def add_type_column(target_df):
    df = target_df.copy()
    df.insert(loc=0, column='type', value='本文')
    df.loc[df['text'].str.contains('字下げ.*見出し*'), 'type'] = '見出し'
    return df

## 前処理

In [None]:
target_list = pd.read_csv(TARGET_PATH)
target_list.shape

In [None]:
print('----- 以下は対象外とする -----')
print(target_list.loc[3256])
target_list.loc[3256, '対象'] = False
target_list.loc[3256, '備考'] = '例外起きる&行数少ない'
print('-----')
print(target_list.loc[3259])
target_list.loc[3259, '対象'] = False
target_list.loc[3259, '備考'] = '例外起きる&行数少ない'

In [None]:
# エラーが起きるもの等は対象をFalseにして対象外とする
target_list[target_list['対象']==False]

In [None]:
target_list.shape

In [None]:
import time

try:
    total = target_list[target_list['対象']==True].shape[0]
    for i, target_data in target_list[target_list['対象']==True].iterrows():
        # 必要な情報の取得
        author_id, file_name = get_parameters(target_data)
        target_dir, edit_dir, org_dir, score_dir = check_directory(target_data, CARD_DIR)
        target_file = target_data['テキストファイルURL']
        zip_file = f"{target_dir}/{target_file.split('/')[-1]}" # add
        author_name = target_data['氏名'].replace(' ', '')
        novel_name = target_data['作品名']
        edit_file = f'{edit_dir}/{file_name}'
        debug(f'--- {str(i).zfill(4)}/{str(total).zfill(4)} {author_id} {author_name} {novel_name} ---', True)
        # データクレンジング実施
        file_path = save_zip(target_file, zip_file)
        target_df = save_text(file_path, org_dir, file_name) # データ取得＆保存
        if target_df is None:
            debug(f'--> skip {novel_name} <--', DEBUG)
        elif os.path.isfile(edit_file):
            debug(f'{novel_name} has been already cleansed', DEBUG)
        else:
            body_df = get_text_body(target_df, author_name) # 本文取得
            body_df2 = split_kuten(body_df) # 句点「。」で分割
            body_df3 = add_type_column(body_df2) # type列追加
            body_df4 = remove_aozora_format(body_df3) # 青空文庫の書式を削除
            body_df5 = reset_index(body_df4) # インデックスをリセット
            # 結果の保存
            body_df5.to_csv(edit_file, index=False)
            target_list.loc[i,'テキストファイルパス'] = edit_file
            target_list.loc[i,'length'] = body_df5.shape[0]
            debug(f'length: {body_df5.shape[0]}', DEBUG)
        time.sleep(1.0)
        if i > 10:
            break
except Exception as e:
    print(target_data)
    print(e)


In [31]:
# リストをCSVに出力する
target_list.to_csv('target_list2.csv')

In [None]:
target_list[target_list['length']>500]

## Step by Step

In [None]:
import pathlib

abs_path = pathlib.Path(os.getcwd())
rel_path = abs_path.relative_to(os.getcwd())
abs_path, rel_path

In [None]:
CARD_DIR = f'{rel_path}/bunko/cards'
CARD_DIR

In [None]:
target_data = target_list.loc[3441]
target_data

In [None]:
author_id, file_name = get_parameters(target_data)
print(f'author_id:{author_id}')
print(f'file_name:{file_name}')

In [None]:
target_dir, edit_dir, org_dir, score_dir = check_directory(target_data, CARD_DIR)

print(f'target_dir:{target_dir}')
print(f'edit_dir:{edit_dir}')
print(f'org_dir:{org_dir}')
print(f'score_dir:{score_dir}')

In [None]:
target_file = target_data['テキストファイルURL']
author_name = target_data['氏名'].replace(' ', '')
zip_file = f"{target_dir}/{target_file.split('/')[-1]}" # add
print(f'target_file: {target_file}')
print(f'author_name: {author_name}')
print(f'zip_file: {zip_file}')

In [None]:
file_path = save_zip(target_file, zip_file)
file_path

In [None]:
# データ取得＆保存
target_df = save_text(file_path, org_dir, file_name)
target_df.head()

In [None]:
# 本文取得
body_df = get_text_body(target_df, author_name)
body_df.head()

In [None]:
# 句点「。」で分割
body_df2 = split_kuten(body_df)
body_df2.head()

In [None]:
# type列追加
body_df3 = add_type_column(body_df2)
body_df3.head()

In [None]:
# 青空文庫の書式を削除
body_df4 = remove_aozora_format(body_df3)
body_df4.head()

In [None]:
# インデックスをリセット
body_df5 = reset_index(body_df4)
body_df5.head()

In [236]:
body_df5.to_csv(f'{edit_dir}/{file_name}', index=False)