# 0. 依赖声明

运行以下代码安装依赖

```shell
conda install ipython ipykernel ipywidgets pyarrow pandas -y
pip install hanlp[full] cachier
```

必须用conda安装兼容的pyarrow和pandas的环境依赖  

然后pip安装hanlp以及持久缓存  

安装nbstrpout工具，在每次提交时删除此文件中的敏感数据
```
pip install nbstripout
nbstripout --install
nbstripout --status
```

In [None]:
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Any
from pprint import pprint as print

import pandas as pd
from decimal import Decimal
import hanlp
from hanlp.components.mtl.tasks.ner.tag_ner import TaggingNamedEntityRecognition
from hanlp.components.tokenizers.transformer import TransformerTaggingTokenizer
from tqdm.notebook import tqdm
from cachier import cachier

cachier = cachier(cache_dir='.py_cache')

tqdm.pandas()

#### 0.1. NLP配置
 ~~大炮打蚊子的第一步是把大炮架起来~~

这里使用HanLP作为辅助处理工具，文档可以参考：[Github文档](https://github.com/hankcs/HanLP/tree/doc-zh)   

In [None]:
# 加载世界最大中文语料库
_nlp_model = hanlp.load(hanlp.pretrained.mtl.CLOSE_TOK_POS_NER_SRL_DEP_SDP_CON_ERNIE_GRAM_ZH)  # type: ignore
# ner: TaggingNamedEntityRecognition = _nlp_model['ner/msra']
# ner.dict_whitelist = {'全账户': ""}

# 配置精分固定短语
_nlp_model['tok/fine'].dict_force = {"全账户"}


# ner的意思是`命名实体识别`，可有效识别出人名、组织名等
@cachier
def ner(value) -> list[tuple[str, str, int, int]]:
    result = _nlp_model(
        value,
        tasks="ner/msra",
    )["ner/msra"]
    return result


def ner_first_name(value: str) -> Optional[str]:
    for content, type, _start, _end in ner(value):
        if type == "PERSON":
            return content


def ner_path_rev(path: str) -> Optional[str]:
    path = str(path)
    # 将value按照分割符号切割，避免错误识别
    path_part = path.split(os.path.sep)

    # 从后往前迭代，更后的文件夹更有可能是文件的名字
    path_part.reverse()
    for i in path_part:
        result = ner_first_name(i)
        if result is None:
            pass
        else:
            return result

In [None]:
result = _nlp_model(
    ["全账户分析自动化脚本，作者韦若枫，mailto://i@ruofengx.cn"],
    tasks="ner/msra",
)

result.pretty_print()

# 1. 读取数据

读取由rust程序生成的parquet文件

**数据列说明**

- **_config_name**  
  
  `str`
  
  前期处理的配置名称

- **_datetime**  
  
  `datetime.datetime`
  
  转账的日期，实际精度到秒。
  
  存在部分银行整点执行的自动划转，实际精度为日，精度自动扩大至当日零时零分零秒。  

- **_amount**  

  `decimal.Decimal`
  
  保留两位小数，整数部分精度为36位

- **_from_id**  

  `str`
  
  转账发出人在该平台的唯一ID

  应为非空字符串，但实际上存在空字符串的可能，需进一步研究TODO

- **_from_bank_id**  

  `Optional[str]`
  
  转账发出人的银行卡号，如账单未定义则为None

  存在部分平台账单在此列数据填入非银行卡号，需要正则清洗
  为非空字符串，但实际上存在空字符串的可能，建议drop

  也有的银行账单会填入存折号，清洗时注意

- **_from_name**  

  `Optional[str]`
  
  转账发出人的名字，也可以是商家名字

- **_to_id**

  `str`
  
  转账接收在该平台的唯一ID

  应为非空字符串，但实际上存在空字符串的可能，需进一步研究TODO

- **_to_bank_id**

  `Optional[str]`类型，转账接收人的银行卡号，如账单未定义则为None

  存在部分平台账单在此列数据填入非银行卡号，需要正则清洗
  为非空字符串，但实际上存在空字符串的可能，建议drop

  也有的银行账单会填入存折号，清洗时注意


- **_to_name**

  `Optional[str]`类型，转账发出人的名字，也可以是商家名字

- **_file_path**

  `str`

  文件路径，可通过NLP分析出账单主体，并和**_from_name**或**_to_name**对比，确认查询主体


In [None]:
df = pd.read_parquet("./tmp/batch.parquet")
print(df.columns.tolist())
df.shape

## 1.1. 检查数据


### 1.1.1. 检查双侧ID均空的行

输出结果应为0

In [None]:
df[df['_from_id'].isnull() & df['_to_id'].isnull()].__len__()

# 2. 定义数据模型(暂时无用)


In [None]:
# 目前没啥用
@dataclass
class Entity:
    id: str
    bank_id: Optional[str]
    name: Optional[str]


@dataclass
class Record:
    config: str
    time: datetime
    amount: Decimal
    from_: Entity
    to: Entity
    path: str

    def __hash__(self) -> int:
        return (
            self.time.__hash__()
            + self.amount.__hash__()
            + self.from_.id.__hash__()
            + self.to.id.__hash__()
        )


def row_to_record(row: pd.Series) -> Record:
    from_entity = Entity(
        id=row["_from_id"], bank_id=row["_from_bank_id"], name=row["_from_name"]  # type: ignore
    )
    to_entity = Entity(
        id=row["_to_id"], bank_id=row["_to_bank_id"], name=row["_to_name"]  # type: ignore
    )
    return Record(
        config=row["_config_name"],  # type: ignore
        time=row["_datetime"],  # type: ignore
        amount=row["_amount"],  # type: ignore
        from_=from_entity,
        to=to_entity,
        path=row["_file_path"],  # type: ignore
    )

计算数据模型，并存储到`ent`列

In [None]:
# df["ent"] = df.progress_apply(row_to_record, axis=1)  # type:ignore

# 3. 数据清洗

1. 将该文件查询对象（查询的银行卡、财付通账号等等，总是_from_id或_to_id中的一个）关联至该主体，对于一个文件，查询对象总是至少占一半。这个规律不一定适用步骤2。
2. 确定一个文件的主体姓名，首先通过NLP分析文件路径_file_path确定人名，其次可以通过出现频率大于一半的 _*_name 字段值即为主体人名


## 3.1. 确定文件主体


### 3.1.1. 同文件级别最常出现的name

`file_common_name`

#### 3.1.1.1. 获取全部names

的from_names和to_names，并合并为names表

In [None]:
from_names = df[["_file_path", "_from_name"]].copy()
to_names = df[["_file_path", "_to_name"]].copy()

from_names.rename(columns={"_from_name": "file_common_name"}, inplace=True)
to_names.rename(columns={"_to_name": "file_common_name"}, inplace=True)

names = pd.concat([from_names, to_names])
names.set_index("_file_path", inplace=True)

names

#### 3.1.1.2. 统计name频数

同文件的name出现频数

In [None]:
_names_count = names.groupby(["_file_path", "file_common_name"]).value_counts().reset_index()
_names_count

#### 3.1.1.3. 递归过滤最常出现的name

迭代file_common_name，利用ner函数排除机构组织名字，直至频数最大的file_common_name的为人名

In [None]:
# 递归验证频率最高的name是人名，不是人名则从names_count删除，继续下一个
# 这么做可以避免全量计算ner
i = 0
while True:
    i += 1
    most_name_for_file = _names_count[["_file_path", "file_common_name"]].loc[
        _names_count.groupby("_file_path")["count"].idxmax()
    ]
    most_name_for_file["ner_name"] = most_name_for_file[
        "file_common_name"
    ].progress_apply(ner_first_name)
    bad_names = most_name_for_file[most_name_for_file["ner_name"].isnull()][
        "file_common_name"
    ].to_list()
    if bad_names.__len__() > 0:
        _names_count = _names_count[
            _names_count["file_common_name"].apply(lambda x: x not in bad_names)
        ]
        print(f"第{i}次迭代{_names_count.__len__()}")
        print(_names_count.shape)
    else:
        break

#### 3.1.1.4. 附加到df表中

In [None]:
most_name_for_file = most_name_for_file.set_index('_file_path').drop(columns="ner_name") # 格式化表
df = df.join(most_name_for_file, on="_file_path")
df.head()

### 3.1.2. 同文件夹最常出现的name

`folder_common_name`

file_common_name 会有部分为空，这种情况会出现在账单非常稀缺的情况，甚至干脆是空表。  
这个时候可以用同样的办法研判同文件夹下最常出现的name作为依据

类似file_common_name的步骤，不再赘述

In [None]:
# 获取文件夹路径
df["folder_path"] = df["_file_path"].apply(lambda x: os.path.dirname(x))
# 取文件夹出现的名字表names
from_names = df[["folder_path", "_from_name"]].copy()
to_names = df[["folder_path", "_to_name"]].copy()
from_names.rename(columns={"_from_name": "folder_common_name"}, inplace=True)
to_names.rename(columns={"_to_name": "folder_common_name"}, inplace=True)
names = pd.concat([from_names, to_names])
names.set_index("folder_path", inplace=True)


# 对names计数得到most_name_for_folder
_names_count = names.groupby(["folder_path", "folder_common_name"]).value_counts().reset_index()
most_name_for_folder = _names_count[["folder_path", "folder_common_name"]].loc[_names_count.groupby("folder_path")["count"].idxmax()]
most_name_for_folder.set_index("folder_path", inplace=True)

# 迭代至全是人名
i = 0
while True:
    i += 1
    most_name_for_folder = _names_count[["folder_path", "folder_common_name"]].loc[
        _names_count.groupby("folder_path")["count"].idxmax()
    ]
    most_name_for_folder["ner_name"] = most_name_for_folder[
        "folder_common_name"
    ].progress_apply(ner_first_name)
    bad_names = most_name_for_folder[most_name_for_folder["ner_name"].isnull()][
        "folder_common_name"
    ].to_list()
    if bad_names.__len__() > 0:
        _names_count = _names_count[
            _names_count["folder_common_name"].apply(lambda x: x not in bad_names)
        ]
        print(f"第{i}次迭代{_names_count.__len__()}")
        print(_names_count.shape)
    else:
        break

# 并入df
most_name_for_folder = most_name_for_folder.set_index('folder_path').drop(columns="ner_name") # 格式化表
df = df.join(most_name_for_folder, on="folder_path")
df.head()

### 3.1.3. 从路径推理name

`path_reason_name`

#### 3.1.3.1. 举个例子

可能不同的数据集的例子不一样，所以看看就行。  
数据已作脱敏处理。  

```python
example = df.iloc[277572]
example
```

输出:
```txt
_config_name                                                    国反-三方
_datetime                                         2022-03-10 17:41:32
_amount                                                        193.99
_from_id                                            134****055@qq.com
_from_bank_id                                                    None
_from_name                                                       None
_to_id                                                             花呗
_to_bank_id                                                      None
_to_name                                                         None
_file_path          ./tmp/batch/倪*全账户/134****055@qq.com(2021010100...
...
Name: 277572, dtype: object
```

这一行没有任何当事人的信息，只有一个_from_id，还是QQ邮箱地址，只有path中包含主体名字“倪*”。  
如果此时`file_common_name`和`folder_common_name`均无效的话，就需要通过NLP对文件路径进行提取。

使用函数`ner_path`
```python
ner_path(example["_file_path"])
```
得到想要的输出：

'倪*'

#### 3.1.3.2. 正式开始

首先，查看有多少文件路径

In [None]:
df["_file_path"].drop_duplicates().shape

因为涉及神经网络计算，速度会很慢，建议以下几点：
* 对开头定义的ner加上持久化缓存cachier，对同一个路径碎片可以直接调用缓存
* 安装`HanLP[full]`配套CUDA食用（未测试），或使用强力CPU（7950X3D大约50it/s）
* 使用本地jupyter环境，远程环境带宽可以是瓶颈
* 先去重，再计算，随后使用`df.join`方法插回去
* 添加tqdm进度条，需要安装依赖，可参考[ipywidgets文档](https://ipywidgets.readthedocs.io/en/stable/user_install.html)，在开头应该安装过了


In [None]:
path_reason_names = df.drop_duplicates("_file_path")[["_file_path"]]
path_reason_names["path_reason_name"]= path_reason_names["_file_path"].progress_apply(ner_path_rev)
path_reason_names.set_index("_file_path", inplace=True)

# 合并到df
df = df.join(path_reason_names, on="_file_path")
df.head()

### 3.1.4. 合并主体

`main_name`

目前有三个不同的主体类型，需要推断、清洗，得出最后的类型

一般来说，`file_common_name`作为文件中最常出现的名字，有较高可信度。  
但是部分账单这条不适用：

比如（数据已脱敏）：
```txt
_config_name                                                      国反-三方
_datetime                                           2024-03-25 15:51:58
_amount                                                            6.00
_from_id                                                    158******49
_from_bank_id                                                      None
_from_name                                                         None
_to_id                                                t*******3@t**u.cn
_to_bank_id                                                        None
_to_name                                                   厦门*******有限公司
_file_path            ./tmp/batch/8人全账户流水/刘*/158******49(20240101000...
file_common_name                                                    NaN
_folder_path                                     ./tmp/batch/8人全账户流水/刘*
folder_common_name                                                   刘*
path_reason_name                                                     刘*
Name: 16554, dtype: object
```
这条账单中，`file_common_name`指向了一个公司，这里就需要将这类例子进行排除，  
并先后以`path_reason_name`和`folder_common_name`取代

首先把"(个人)"从字符串中删除  
这个无效标记在国反三方账单中非常常见


In [None]:
OP_COL = ["file_common_name", "folder_common_name", "path_reason_name"]
for col in OP_COL:
    df[col] = df[col].str.replace("(个人)", "", regex=False)


当`path_reason_name`为空时,此时说明文件夹制作者没有对文件进行分类，而是直接混杂放置  
这个时候`folder_common_name`也不可信，直接选用`file_common_name`作为主体名字  

In [None]:
filter_row = df[df["path_reason_name"].isnull()].index
df.loc[filter_row, "main_name"] = df.loc[filter_row, "file_common_name"]

路径已成功解析名字的可以直接作为主体名字  
TODO: 可以额外手动验证下冲突项中是否有错误的

In [None]:
filter_row = df[df["path_reason_name"].notnull()].index
df.loc[filter_row, "main_name"] = df.loc[filter_row, "path_reason_name"]

还会剩余一些“三无”记录  
只能通过ID进行关联

In [None]:
# 未研判主体记录的数量
df[df["main_name"].isna()].__len__()

todo

# 99. debug

In [None]:
def is_conflict(row: pd.Series) -> bool:
    a = row["file_common_name"]
    b = row["folder_common_name"]
    c = row["path_reason_name"]
    if a is None or b is None or c is None:
        return True
    return (a != b) | (a != c) | (b != c)

def clean_data(df):
    df = df[df.apply(is_conflict, axis=1)]
    df = df[df["main_name"].isnull()]
    # 删除列: '_config_name'、'_datetime'和其他列8
    # df = df.drop(columns=['_config_name', 'folder_path', '_datetime', '_amount', '_from_id', '_from_bank_id', '_from_name', '_to_id', '_to_bank_id', '_to_name', 'ent'])
    # 删除列: '_file_path' 中的重复行
    # df = df.drop_duplicates(['_file_path'])
    return df

df_clean = clean_data(df.copy())
df_clean.head()

In [None]:
debug_row = df.loc[16554]
debug_row

In [None]:
ner_path_rev(debug_row["_file_path"])  # type:ignore