# Linguistic measure collection

著者の態度や心理的傾向との関係が示唆されている、著者検出のための文体測定法を集めたライブラリです。
本ライブラリは、Asaishi(2017)で整理された日本語テキストメトリクスをベースに、12種類のスタイロメトリクスを算出することができる。

対応する自然言語

- 日本語
- 英語

## インストール

> NOTE: `pip` によるインストール可能なパッケージは、現在開発中

### 要件

- Python 3.6 以上
  - f-string (PEP 536):
  - Type Hints (PEP 484):

### 依存関係

`pipenv` を使用している場合は、`pipenv install` でこのライブラリの仮想環境が作成されます。
それ以外の場合は、`pip install -r requirements.txt` で十分です。

また，本ライブラリは，日本語処理のために以下の外部コマンドにも依存しています．

- MeCab (日本語テキスト処理用):
    - [mecab-ipadic-neologd](https://github.com/neologd/mecab-ipadic-neologd) (日本語のNERを向上させる):
- CaboCha (`analyse_parseddoc.py` 用):

### リソース

さらに、日本語メトリクスの場合は、以下の言語資源が必要です。

- [AWD-J EX](http://sociocom.jp/~data/2019-AWD-J/)をファイル名`AWD-J_EX.txt`とする。
- JIWCをファイル名 `2017-11-JIWC.csv`として
- 日本語のストップワードをファイル名 `stopwords_jp.txt` として保存

以上を`data/`フォルダに入れます。

> 今後はオプションにしていきます。
> また、リソースの置き場所も設定できるようにする予定です。

## 使い方

**重要：テキストの正規化はユーザーの責任です**。

### `measure_lang.py`

実行することで、テキストファイルからテーブルデータを生成することができます。

```
python measure_lang.py [csv/excel ファイルパス] [ターゲットカラム名]
```

`your_data.csv` を入力すると、入力ファイルと同じ場所に `your_data.measure.csv` が作成されます。

適切な文数を計算するために、入力テキストは1行に1文のような形式にすることをお勧めします。

また、以下のようにインポートして各メジャーを使用することもできます。

```python
import measure_lang as ml

def your_fancy_func(string):
    # ...
    res = ml.calc_ttrs(string)
    # ...
```

すべての関数は文字列を読みます（改行された長い文章でもOKです）。
文字列のリストに適用したい場合には、リストの上で反復する必要があります。

各メジャーの詳細な使用方法は、そのメジャーのdocstring（ワークインプログレス）に記載されています。

### `analyse_parseddoc.py`

(ここのドキュメントは作業中です)

準備します。
- 1行に1つの文、空白行なし
- ドキュメントは1つの空白行で分割されます

例を挙げると
```python
import re
import pandas as pd

import split_sentence  # find a great library to split sentences

col = "column_name"

with open('formatted_text.txt', 'w') as f:
    f.writelines(
        [
            re.sub("^$", "", re.sub("\n+", "\n", re.sub("\s", " ", t))) + "\n\n"
            for t in df[col].to_list()
        ]
    )
```
そして，`sentence-splitter formatted_text.txt > formatted_text-sentsplit.txt ` などと実行すると，`sentence-splitter` が入力されたテキストを，空白行を保持したまま一行一文の形式に分割します（良い本物のツールを見つけなければなりませんが，英語では `nltk` が提供しています）．

実行してみてください。
```shell
cabocha -f1 your_text.txt > your_text.cabocha.txt
python analyse_parseddoc.py your_text.cabocha.txt [your_csv.measured.csv]
```

第2引数に `measure_lang.py` で生成された測定済みの csv ファイルを追加すると、`analyse_parseddoc.py` は結果を列ごとに連結します (GNU `paste(1)` のように、ただし csv 内のエスケープされた複数行のテキストを考慮します)。


## Measures

```python
import measure_lang as ml
import analyse_parseddoc as ap
```


- **Percentages of character types** (`ml.count_charcat`):  
テキスト中の文字に対する、ひらがな、カタカナ、漢字のそれぞれの文字種の割合を表します。

In [None]:
def count_charcat(text: str) -> np.ndarray:
    text = re.sub(r"\s", " ", text)
    c = Counter([ud.name(char).split()[0] for char in text])
    counts = np.array([c["HIRAGANA"], c["KATAKANA"], c["CJK"]])
    return np.divide(counts, len(text))

- **Type Token Ratio (TTR)** (`ml.calc_ttrs`):  
テキスト中の総単語数に対する異種単語の比率。

In [None]:
def calc_ttrs(tokens: List[Tuple[str, List[str]]]) -> np.ndarray:
    cnt = Counter([token[1][6] for token in tokens])
    Vn = len(cnt)
    logVn = np.log(Vn)
    N = np.sum(list(cnt.values()))
    logN = np.log(N)
    # TODO: implement frequency-wise TTR variants
    return np.array(
        [
            np.divide(Vn, N),  # original plain TTR: not robust to the length
            np.divide(Vn, np.sqrt(N)),  # Guiraud's R
            np.divide(logVn, logN),  # Herdan's C_H
            np.divide(logVn, np.log(logN)),  # Rubet's k
            np.divide((logN - logVn), (logN ** 2)),  # Maas's a^2
            np.divide((1 - (Vn ** 2)), ((Vn ** 2) * logN)),  # Tuldava's LN
            np.float_power(N, np.float_power(Vn, 0.172)),  # Brunet's W
            np.divide((logN ** 2), (logN - logVn)),  # Dugast's U
        ]
    )


- **Percentage of content words** (`ml.measure_pos`):  
テキストの総単語数に対する内容語(名詞、動詞、形容詞、副詞)の割合です。

- **Modifying words and Verb Ratio (MVR)** (`ml.measure_pos`):  
文章中の単語に対して、動詞と形容詞・副詞・接続詞の割合を表したもの。作者推定の指標の一つとして用いられている。

- **Percentage of proper nouns** (`ml.measure_pos`):  
テキスト中の全単語に対する固有名詞(名前付き実体)の割合です。

In [None]:
def measure_pos(text: str) -> np.ndarray:
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NM.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    # print(tokens)

    # VERB RELATED MEASURES
    verbs = [token for token in tokens if token[1][0] == "動詞"]
    # TODO: 助動詞との連語も含める？
    # lens_verb = [len(verb) for verb in verbs]

    # CONTENT WORDS RATIO
    nouns = [token for token in tokens if token[1][0] == "名詞"]
    adjcs = [token for token in tokens if token[1][0] == "形容詞"]
    content_words = verbs + nouns + adjcs
    cwr_simple = np.divide(len(content_words), len(tokens))
    cwr_advance = np.divide(
        len(
            [
                token
                for token in content_words
                if (token[1][1] not in STOPPOS_JP) and (token[0] not in STOPWORDS_JP)
            ]
        ),
        len(tokens),
    )

    # NOTE: skip FUNCTION WORDS RATIO since it's equiv to 1 - CWR

    # Modifying words and verb ratio (MVR)
    advbs = [token for token in tokens if token[1][0] == "副詞"]
    padjs = [token for token in tokens if token[1][0] == "連体詞"]
    mvr = np.divide(len(adjcs + advbs + padjs), len(verbs))

    # NER
    ners = [token for token in tokens if token[1][1] == "固有名詞"]
    nerr = np.divide(len(ners), len(tokens))

    # TTR
    ttrs = calc_ttrs(tokens)

    return np.concatenate(
        (
            np.array(
                [
                    # np.mean(lens_verb),
                    # np.std(lens_verb),
                    # np.min(lens_verb),
                    # np.quantile(lens_verb, 0.25),
                    # np.median(lens_verb),
                    # np.quantile(lens_verb, 0.75),
                    # np.max(lens_verb),
                    cwr_simple,
                    cwr_advance,
                    mvr,
                    nerr,
                ]
            ),
            ttrs,
        )
    )


- **Word abstraction** (`ml.measure_abst`):  
テキストに含まれる単語の抽象度。具体的には、最も抽象度の高い単語の最大値と、抽象度の高い上位5つの単語の平均値を用いました。抽象度は、日本語の単語抽象度辞書である [AWD-J EX](http://sociocom.jp/~data/2019-AWD-J/)から取得した。

In [None]:
def measure_abst(text: str) -> np.ndarray:
    # AWD uses neologd
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NMN.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    # print(tokens)
    scores = [
        float(AWD.get(token[0] if token[1][6] == "*" else token[1][6], 0))
        for token in tokens
    ]
    # print(scores)

    # top k=5 mean
    return np.array([np.mean(sorted(scores, reverse=True)[:5]), max(scores)])


- **Ratios of emotional words** (`ml.calc_jiwc`):  
「悲しみ」、「不安」、「怒り」、「嫌悪」、「信頼」、「驚き」、「喜び」の7つの感情に関連する単語の、テキスト中の全単語に対する比率を表しています。7つの値は、確率の性質を満たすように変換されています（各値は0から1の間にあり、すべての値の合計は1になります）。感情との関連度は、日本語の感情語辞典JIWCに基づいて決定した。

In [None]:
def calc_jiwc(text: str) -> np.ndarray:
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NM.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    jiwc_words = set(
        [token[0] if token[1][6] == "*" else token[1][6] for token in tokens]
    ) & set(DF_jiwc.index)
    jiwc_vals = DF_jiwc.loc[jiwc_words].sum()
    return np.divide(jiwc_vals, jiwc_vals.sum())
    # Sad Anx Anger Hate Trustful S Happy


- **Number of sentences** (`ml.measure_sents`):  
テキストを構成する文の総数。

- **Length of sentences** (`ml.measure_sents`):  
文章を構成する各文の文字数の記述統計（平均値、標準偏差、四分位値、最小値、最大値）。  
特に、平均文量は、書き手の創造的な態度や性格との関連が示唆されている。

In [None]:
def measure_sents(text: str) -> np.ndarray:
    """Show descriptive stats of sentence length.

    input text should be one sentence per line.
    """
    # sents = DELIM_SENT.split(text)
    if "\r" in text:
        sents = text.split("\r\n")
    else:
        sents = text.split("\n")
    lens_char = np.array([len(sent) for sent in sents])
    return np.array(
        [
            len(lens_char),
            np.mean(lens_char),
            np.std(lens_char, ddof=1),
            np.min(lens_char),
            np.quantile(lens_char, 0.25),
            np.median(lens_char),
            np.quantile(lens_char, 0.75),
            np.max(lens_char),
        ]
    )

- **Percentage of conversational sentences** (`ml.count_conversations`):  
テキストに含まれる会話文の総数の割合。

In [None]:
def count_conversations(text: str) -> float:
    # 会話文の割合
    text = re.sub(r"\s", " ", text)
    singles = re.findall(r"「.+?」", text)
    doubles = re.findall(r"『.+?』", text)
    lens_single = [len(single) for single in singles]
    lens_double = [len(double) for double in doubles]
    return np.divide(sum(lens_single) + sum(lens_double), len(text))

- **Depth of syntax tree** (`ap.analyse_dep`):  
テキストの各文の依存関係ツリーの深さを算出した記述統計。


- **Mean of the number of chunks per sentence** (`ap.analyse_dep`):  
文章ごとのチャンク数の平均値について計算された記述統計量。

- **Mean of the words per chunk** (`ap.analyse_dep`):  
チャンクあたりの単語数の平均値を計算した記述統計量。


In [None]:
def analyse_dep(cfname: str, fname: str = None) -> None:
    """Apply dependency tree analyses and concat the original data"""
    with open(cfname, "r") as f:
        chunk_sents = read_chunks(f)

    docs = []
    doc = []
    for chunk_sent in chunk_sents:
        if chunk_sent:
            doc.append(chunk_sent)
        else:
            docs.append(doc)
            doc = []

    sr_depths = []
    sr_leaves = []
    sr_chunklen = []
    for doc in docs:
        depths = [count_sent_depth(sentchunk) for sentchunk in doc]
        sr_depths.append(pd.Series(depths).describe().to_frame().T)

        n_leaves = [len(sentchunk) for sentchunk in doc]
        sr_leaves.append(pd.Series(n_leaves).describe().to_frame().T)

        chunklen = [len(chunk.morphs) for sentchunk in doc for chunk in sentchunk]
        sr_chunklen.append(pd.Series(chunklen).describe().to_frame().T)

    # 構文木の深さ
    df_sdep = (
        pd.concat(sr_depths)
        .reset_index(drop=True)
        .rename(columns=lambda x: f"sdep_{x}")
    )
    # 構文木の葉の数（文節数）
    df_nleaf = (
        pd.concat(sr_leaves)
        .reset_index(drop=True)
        .rename(columns=lambda x: f"nleaf_{x}")
    )
    # 文節の長さ（形態素数）
    df_chklen = (
        pd.concat(sr_chunklen)
        .reset_index(drop=True)
        .rename(columns=lambda x: f"chklen_{x}")
    )

    if fname:
        df = pd.read_csv(fname)
        assert len(df) == len(df_sdep)
        pd.concat([df, df_sdep, df_nleaf, df_chklen], axis=1).to_csv(
            f"{fname}.parsed.csv", index=False
        )
    pd.concat([df_sdep, df_nleaf, df_chklen], axis=1).to_csv(
        f"{cfname}.parsed.csv", index=False
    )

### 要約表

| Stylometric                               | Sub-measures (value format)                                                 |
| :---------------------------------------- | :-------------------------------------------------------------------------- |
| Percentages of character types            | Hiragana, katakana, and kanji (Chinese characters) (%)                      |
| Type Token Ration (TTR)                   | (%)                                                                         |
| Percentages of content words              | (%)                                                                         |
| Modifying words and Verb Ratio (MVR)      | (%)                                                                         |
| Percentage of proper nouns                | (%)                                                                         |
| Word abstraction                          | The maximum, and the average of the top five abstract words (real number)   |
| Ratios of emotional words                 | sadness, anxiety, anger, disgust, trust, surprise, and happiness (%)        |
| Number of sentences                       | (integer)                                                                   |
| Length of sentences                       | mean, standard deviation, interquartile, minimum, and maximum (real number) |
| Percentage of conversational sentences    | (%)                                                                         |
| Depth of syntax tree                     | mean, standard deviation, interquartile, minimum, and maximum (real number) |
| Mean of the number of chunks per sentence | mean, standard deviation, interquartile, minimum, and maximum (real number) |
| Mean of the words per chunk               | mean, standard deviation, interquartile, minimum, and maximum (real number) |

---
## リファレンス

- [Asaishi, 2007]: 浅石卓真. (2017). テキストの特徴を計量する指標の概観. 日本図書館情報学会誌, 63(3), 159–169. https://doi.org/10.20651/jslis.63.3_159

In [None]:
from collections import Counter
import os
import re
from typing import List, Tuple, Dict, Union
import unicodedata as ud

import fire
from natto import MeCab
import pandas as pd
import numpy as np

Num = Union[int, float]

# TODO: 効率化
# 前処理としてmecabのトークン化を行い、ライブラリでの利用を可能にする

# TODO: これらのリソースをユーザーが割り当てられるようにする
# TODO: これらのリソースをオプションにする
# DELIM_SENT = re.compile(r"(?:[。？！\?\!]+|[。？！\?\!]?[」』])")  # FIXME: 文じゃないカギカッコが取れちゃう

#この辺を引数にする Fire絡み　Apply file
NM = MeCab()  # NOTE: assume IPADIC
NMN = MeCab("-d /usr/local/lib/mecab/dic/mecab-ipadic-neologd")

with open(os.path.join(os.path.dirname(__file__), "./data/stopwords_jp.txt"), "r") as f:
    STOPWORDS_JP = [line.strip() for line in f]
STOPPOS_JP = ["形容動詞語幹", "副詞可能", "代名詞", "ナイ形容詞語幹", "特殊", "数", "接尾", "非自立"]

with open(os.path.expanduser("./data/AWD-J_EX.txt"), "r") as f:
    rows = [line.strip().split("\t") for line in f]
    AWD = {word: score for word, score, _, _ in rows}
DF_jiwc = pd.read_csv(os.path.expanduser("./data/2017-11-JIWC.csv"), index_col=1).drop(
    columns="Unnamed: 0"
)


def measure_sents(text: str) -> np.ndarray:
    """Show descriptive stats of sentence length.

    input text should be one sentence per line.
    """
    # sents = DELIM_SENT.split(text)
    if "\r" in text:
        sents = text.split("\r\n")
    else:
        sents = text.split("\n")
    lens_char = np.array([len(sent) for sent in sents])
    return np.array(
        [
            len(lens_char),
            np.mean(lens_char),
            np.std(lens_char, ddof=1),
            np.min(lens_char),
            np.quantile(lens_char, 0.25),
            np.median(lens_char),
            np.quantile(lens_char, 0.75),
            np.max(lens_char),
        ]
    )


def count_conversations(text: str) -> float:
    # 会話文の割合
    text = re.sub(r"\s", " ", text)
    singles = re.findall(r"「.+?」", text)
    doubles = re.findall(r"『.+?』", text)
    lens_single = [len(single) for single in singles]
    lens_double = [len(double) for double in doubles]
    return np.divide(sum(lens_single) + sum(lens_double), len(text))


def count_charcat(text: str) -> np.ndarray:
    text = re.sub(r"\s", " ", text)
    c = Counter([ud.name(char).split()[0] for char in text])
    counts = np.array([c["HIRAGANA"], c["KATAKANA"], c["CJK"]])
    return np.divide(counts, len(text))


def measure_pos(text: str) -> np.ndarray:
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NM.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    #この辺
    # print(tokens)

    # VERB RELATED MEASURES
    verbs = [token for token in tokens if token[1][0] == "動詞"]
    # TODO: 助動詞との連語も含める？
    # lens_verb = [len(verb) for verb in verbs]

    # CONTENT WORDS RATIO
    nouns = [token for token in tokens if token[1][0] == "名詞"]
    adjcs = [token for token in tokens if token[1][0] == "形容詞"]
    content_words = verbs + nouns + adjcs
    cwr_simple = np.divide(len(content_words), len(tokens))
    cwr_advance = np.divide(
        len(
            [
                token
                for token in content_words
                if (token[1][1] not in STOPPOS_JP) and (token[0] not in STOPWORDS_JP)
            ]
        ),
        len(tokens),
    )

    # NOTE: skip FUNCTION WORDS RATIO since it's equiv to 1 - CWR

    # Modifying words and verb ratio (MVR)
    advbs = [token for token in tokens if token[1][0] == "副詞"]
    padjs = [token for token in tokens if token[1][0] == "連体詞"]
    mvr = np.divide(len(adjcs + advbs + padjs), len(verbs))

    # NER
    ners = [token for token in tokens if token[1][1] == "固有名詞"]
    nerr = np.divide(len(ners), len(tokens))

    # TTR
    ttrs = calc_ttrs(tokens)

    return np.concatenate(
        (
            np.array(
                [
                    # np.mean(lens_verb),
                    # np.std(lens_verb),
                    # np.min(lens_verb),
                    # np.quantile(lens_verb, 0.25),
                    # np.median(lens_verb),
                    # np.quantile(lens_verb, 0.75),
                    # np.max(lens_verb),
                    cwr_simple,
                    cwr_advance,
                    mvr,
                    nerr,
                ]
            ),
            ttrs,
        )
    )


def measure_abst(text: str) -> np.ndarray:
    # AWD uses neologd
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NMN.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    # print(tokens)
    scores = [
        float(AWD.get(token[0] if token[1][6] == "*" else token[1][6], 0))
        for token in tokens
    ]
    # print(scores)

    # top k=5 mean
    return np.array([np.mean(sorted(scores, reverse=True)[:5]), max(scores)])


def detect_bunmatsu(text: str) -> float:
    if "\r" in text:
        sents = text.split("\r\n")
    else:
        sents = text.split("\n")

    # 体言止め
    taigen = 0
    for sent in sents:
        tokens = [
            (n.surface, n.feature.split(","))
            for n in NM.parse(text, as_nodes=True)
            if not n.is_eos()
        ]
        taigen += 1 if tokens[-2][1] == "名詞" else 0
    ratio_taigen = np.divide(taigen, len(sents))

    # TODO: what else?

    return ratio_taigen


def calc_ttrs(tokens: List[Tuple[str, List[str]]]) -> np.ndarray:
    cnt = Counter([token[1][6] for token in tokens])
    Vn = len(cnt)
    logVn = np.log(Vn)
    N = np.sum(list(cnt.values()))
    logN = np.log(N)
    # TODO: implement frequency-wise TTR variants
    return np.array(
        [
            np.divide(Vn, N),  # original plain TTR: not robust to the length
            np.divide(Vn, np.sqrt(N)),  # Guiraud's R
            np.divide(logVn, logN),  # Herdan's C_H
            np.divide(logVn, np.log(logN)),  # Rubet's k
            np.divide((logN - logVn), (logN ** 2)),  # Maas's a^2
            np.divide((1 - (Vn ** 2)), ((Vn ** 2) * logN)),  # Tuldava's LN
            np.float_power(N, np.float_power(Vn, 0.172)),  # Brunet's W
            np.divide((logN ** 2), (logN - logVn)),  # Dugast's U
        ]
    )


def calc_potentialvocab(text: str) -> float:
    # 荒牧先生の潜在語彙量も
    raise NotImplementedError


def calc_jiwc(text: str) -> np.ndarray:
    tokens = [
        (n.surface, n.feature.split(","))
        for n in NM.parse(text, as_nodes=True)
        if not n.is_eos()
    ]
    jiwc_words = set(
        [token[0] if token[1][6] == "*" else token[1][6] for token in tokens]
    ) & set(DF_jiwc.index)
    jiwc_vals = DF_jiwc.loc[jiwc_words].sum()
    return np.divide(jiwc_vals, jiwc_vals.sum())
    # Sad Anx Anger Hate Trustful S Happy


def apply_all(text: str) -> Dict[str, Num]:
    try:
        all_res = np.concatenate(
            (
                measure_sents(text),
                [count_conversations(text)],
                count_charcat(text),
                measure_pos(text),
                measure_abst(text),
                [detect_bunmatsu(text)],
                calc_jiwc(text),
            )
        )
    except:
        print(text)
        raise
    headers = [
        "num_sent",
        "mean_sent_len",
        "std_sent_len",
        "min_sent_len",
        "q1_sent_len",
        "median_sent_len",
        "q3_sent_len",
        "max_sent_len",
        "num_conv",
        "pct_hiragana",
        "pct_katakana",
        "pct_kanji",
        "cwr_simple",
        "cwr_advance",
        "mvr",
        "pct_ner",
        "ttr_plain",
        "guiraud_r",
        "herdan_ch",
        "rubet_k",
        "maas_a2",
        "tuldava_ln",
        "brunet_w",
        "dugast_u",
        "top5_mean_abst",
        "max_abst",
        "ratio_taigendome",
        "jiwc_sadness",
        "jiwc_anxiety",
        "jiwc_anger",
        "jiwc_hatrid",
        "jiwc_trust",
        "jiwc_surprise",
        "jiwc_happiness",
    ]
    return dict(zip(headers, all_res))


def apply_file(fname, col):
    if fname.endswith(".csv"):
        df = pd.read_csv(fname)
    elif fname.endswith(".xls") or fname.endswith(".xlsx"):
        df = pd.read_excel(fname)
    else:
        raise ValueError("Unsupported input format: please use CSV or Excel data")

    assert col in df.columns, f"{col} is not found in the input data"

    pd.concat(
        [df, df.apply(lambda row: apply_all(row[col]), result_type="expand", axis=1)],
        axis=1,
    ).to_csv(f"{fname}.measured.csv", index=False)
#この辺

if __name__ == "__main__":
    fire.Fire(apply_file)
