# 默认代码
>- `filePath` 需要带文件后缀 `.txt` 或 `.xlsx` 或 `.json`
>- 除了读取操作，其他任何操作都不要在打开文件的情况下进行

In [None]:
# 导入形式

# %%capture
# %run "file_Tool.ipynb"

In [None]:
# 只要导入了 file_Tool.ipynb ，则不再需要重复导入这些包

import warnings
warnings.filterwarnings('ignore')

from IPython.display import display

import os
import numpy as np
import pandas as pd

# Text
>- `DataFrame` $\Leftrightarrow$ `.txt` 文件

## 写入
>- 将 `List` 元素一行一行地写入 `.txt` 文件

In [None]:
# .txt 文件不存在：自动创建，并写入
# .txt 文件存在时：直接覆盖

def cover_txt(lines , filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    with open(filePath , 'w' , encoding = 'utf-8') as file:
        for line in lines:
            file.write(str(line))
            file.write('\n')
    
    if E:
        print(f"文件存在，直接覆盖 '{filePath}'")
    else:
        print(f"文件不存在，自动创建，并写入 '{filePath}'")

In [None]:
# .txt 文件不存在：自动创建，并写入
# .txt 文件存在时：末尾追加

def append_txt(lines , filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    with open(filePath , 'a' , encoding = 'utf-8') as file:
        
        # 存在
        if E:
            file.write('\n-----追加-----\n')
            
        for line in lines:
            file.write(str(line))
            file.write('\n')
    
    if E:
        print(f"文件存在，末尾追加 '{filePath}'")
    else:
        print(f"文件不存在，自动创建，并写入 '{filePath}'")

## 读取
>- `.txt` 文件必须已经存在

In [None]:
# 返回 lines == ['第一行' , '第二行' , '第三行' , ...]

def read_txt(filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    if E:
        with open(filePath , 'r' , encoding = 'utf-8') as file:
            lines = file.readlines()
            lines = [line.strip() for line in lines]                       # 去除每一行首尾的：空白 + \n + \r + \t
            lines = [line for line in lines if (len(line) != 0)]           # 去除空字符串
            lines = [line for line in lines if (line.isspace() != True)]   # 去除空白行
            return lines
    else:
        print(f"指定的文件 '{filePath}' 不存在！！！")

# Excel
>- `DataFrame` $\Leftrightarrow$ `.xlsx` 文件

In [None]:
import openpyxl

## 单 sheet
>- 注意：`List`  ==>  存储  ==>  Excel  ==>  读取  ==> `String`（详见 `file_Tool_Test.ipynb`）

In [None]:
# .xlsx 文件不存在：自动创建，并写入（sheetName 默认命名为 Sheet1）
# .xlsx 文件存在时：直接覆盖（多个原先的 sheet 会被全部删除，仅有一个覆盖后的 Sheet1）

def save_ex(data , filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    # 修改 data 的行索引名称，方便下面保存为一列数据
    data.index.name = 'Index'
    
    # index = True 表明将 data 的行索引也作为一列数据保存，方便到时候 data = data.set_index('Index') 重置行索引
    data.to_excel(filePath , index = True)
    
    if E:
        print(f"文件存在，直接覆盖 '{filePath}'")
    else:
        print(f"文件不存在，自动创建，并写入 '{filePath}'")

In [None]:
# 也可用于读取多 sheet 的 .xlsx 文件，默认仅读取第一个 sheet

def read_ex(filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    if E:
        res = pd.read_excel(filePath)
        return res
    else:
        print(f"指定的文件 '{filePath}' 不存在！！！")

## 多 sheet
>- 仅读取

In [None]:
# 返回所有 sheetName 组成的 List

def read_all_sheets(filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    if E:
        return openpyxl.load_workbook(filePath).sheetnames
    else:
        print(f"指定的文件 '{filePath}' 不存在！！！")

In [None]:
# 读取指定的 sheetName ，单 sheetName 也适用

def read_ex_sheet(filePath , sheetName):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    if E:
        res = pd.read_excel(filePath , sheetName)
        return res
    else:
        print(f"指定的文件 '{filePath}' 不存在！！！")

# Json
>- `DataFrame` $\Leftrightarrow$ `.json` 文件
>  - 最外面是 `[]`
>  - 里面一个一个是 `{}, ...`
>  - 再里面一个一个是 `"key" : "value", ...`
>  - 若 `value` 是数值或 `List` ，则没有双引号 `"` 括起来
>- `orient = 'records'`
>  - `DataFrame` 的一行  ==  一个 `{}`
>  - 不包括行索引 index
>- 注意一：`List`  ==>  存储  ==>  JSON  ==>  读取  ==> `List`（区别于上述的 Eecel）
>- 注意二：存储的字符串带有 `/` 时，会在 `.json` 文件中显示为 `\/`

In [None]:
# 直接覆盖

def save_json(data , filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    # 设置参数 force_ascii = False 确保中文不乱码
    data.to_json(filePath , orient = 'records' , force_ascii = False)
    
    if E:
        print(f"文件存在，直接覆盖 '{filePath}'")
    else:
        print(f"文件不存在，自动创建，并写入 '{filePath}'")

In [None]:
def read_json(filePath):
    
    # 判断文件是否存在
    E = os.path.exists(filePath)
    
    if E:
        res = pd.read_json(filePath , orient = 'records')
        return res
    else:
        print(f"指定的文件 '{filePath}' 不存在！！！")

# 各类统计

## 每列的数据类型
>- 比单纯的 `data.dtypes` 要更加详细
>- 详见 `README_Soft` 的 `WPS`

In [None]:
def allCols(data):
    
    print('----------------------')
    
    # (行 , 列)
    print(data.shape)
    
    print('-------------------------------------------------------------------')
    
    cols = data.columns.values
    # print(f'{str(type(cols)):<25s}     {cols}')
    # 所有列名组成的 numpy.ndarray
    
    for c in cols:
        print(f'{str(type(data.iloc[0][c])):>25s}     {c:<10s}')
    
    print('-------------------------------------------------------------------')

In [None]:
# 所有列的数据类型作为 List 返回，方便一些判断
# 收集的结果可以直接用 == 或 in 作判断，例如下面的搜索列值
# 同时注意 np. 和 pd. 缩写前缀即可，具体见 file_Tool_Test.ipynb 的测试数据类型

def allCols_simple(data):
    
    cols = data.columns.values
    
    res = []
    
    for c in cols:
        res.append(type(data.iloc[0][c]))
    
    return res

## 每列的 `NaN` 值分布

In [None]:
def allNaN(data):
    
    nullData = pd.DataFrame()
    nullData['列'] = data.columns.values
    nullData['NaN 个数'] = data.isnull().sum().values
    nullData['NaN 占比'] = (data.isnull().sum() / len(data)).values
    
    display(nullData)

## 列值统计
>- 忽略 `NaN` 值，即不会统计 `NaN`

In [None]:
# colName 为 String

def val_cnt(data , colName):
    
    res = data[colName].value_counts().reset_index()
    res.columns = [colName , 'cnt']
    
    # 按 cnt 降序排列
    res = res.sort_values(by = 'cnt' , ascending = False)
    
    # 占比
    res['proportion'] = res.apply(lambda row : f"{(row['cnt'] / sum(res['cnt'])):6.2%}" , axis = 1)
    
    # 汇总
    # res = res.append(pd.Series({colName : '' , 'cnt' : sum(res['cnt']) , 'proportion' : ''} , name = 'ALL'))
    
    return res

## `int` 列绘统计
>- 制条形图

In [None]:
import plotly.express as px

In [None]:
# col 为 DataFrame 的一列 Series 且为 int 数值类型

def val_cnt_int(data , colName):
    
    res = val_cnt(data , colName)
    
    fig = px.bar(res , 
                 x = f'{colName}', 
                 y = 'cnt' , 
                 #title = f'{colName} 值分布'
                 )
    fig.show()
    
    return res

## 比较两列
>- 适用于 `int` 列或 `String` 列
>- [px 的使用]https://www.cnblogs.com/Mangnolia/p/14308471.html
>- [px 的使用]https://blog.csdn.net/qq_25443541/article/details/115603480
>- [px 的使用]https://blog.csdn.net/lemonbit/article/details/119012201

In [None]:
# int == int
# String == String

def isSame(a, b):
    if a == b:
        return True
    else:
        return False

In [None]:
# col1 为 DataFrame 的一列 Series ，即 data['列名']
# col2 为 DataFrame 的一列 Series ，即 data['列名']

def diff_Pie(col1 , col2):
    
    # 两列拼凑在一起，比较 True 或 False
    tab = pd.DataFrame()
    tab[col1.name] = col1.values
    tab[col2.name] = col2.values
    tab['same'] = tab.apply(lambda row : isSame(row[col1.name] , row[col2.name]) , axis = 1)
    
    # 统计 True 和 False
    res = tab['same'].value_counts().reset_index()
    res.columns = ['same' , 'cnt']
    display(res)
    
    fig = px.pie(
        res ,
        names = 'same',        # 划分块，即每个扇形的意义
        values = 'cnt',        # 算占比，即每个扇形的占比
        # title = f'Are the [{col1.name}] as same as the [{col2.name}] ?' , 
    )
    
    # 文本信息水平显示
    fig.update_traces(textposition = 'inside', 
                      textinfo = 'percent+label' , 
                      insidetextorientation = 'horizontal')
    
    fig.show()
    
    # return tab , res

# 搜索列值
>- 对于完整的列值进行 `==` 或 `<` 等，可见 `base_Pandas.ipynb` 的筛选访问
>- 对于列值的子部分进行搜索，需要用到如下函数，适用于 `String` 列或 `List` 列

In [None]:
# 在 data 的 trgC 列中搜索 sub 内容

# 传入参数一：（推荐）
# 在传入 data 时，可以少传入几列，仅传入想看到的列
# 例如 data[['想看到的列' , '想看到的列' , ...]] ，而且此时为副本传入，见 base_Pandas.ipynb 的切片视图测试

# 传入参数二：
# 若传入整个 data ，则是在原 data 上添加了 exist 列
# 除非传入时使用 data.copy() ，避免在原 data 上添加 exist 列

def find_sub(data , trgC , sub):
    
    data['exist'] = data.apply(lambda row : sub in row[trgC] , axis = 1)
    
    return data[data['exist'] == True]

In [None]:
# subs = List = [sub , sub , ...]
# 注意，若列值中出现多个 sub ，则该行会重复拼接到 res 中，可依据行索引观察到

def find_subs(data , trgC , subs):
    
    res = pd.DataFrame()
    
    for sub in subs:
        res = pd.concat([res , find_sub(data , trgC , sub)] , axis = 0)
    
    return res

In [None]:
# 上述 find_subs 的重置行索引，且去重
# 注意

def find_subs_nodul(data , trgC , subs):
    
    res = pd.DataFrame()
    
    for sub in subs:
        res = pd.concat([res , find_sub(data , trgC , sub)] , axis = 0)
    
    # 去重
    if list in allCols_simple(res):
        print(f'含有 List 列，无法去重，仅能重置行索引！！！')
    else:
        res = res.drop_duplicates()
    
    # 重置行索引为从 0 开始的整数
    res = res.reset_index(drop = True)
    
    return res

# 标准化
>- 不同列的量级差距太大

## `min-max`
>- $x \rightarrow \frac{x - min}{max - min} \in [0 , 1]$

In [None]:
from sklearn.preprocessing import MinMaxScaler

In [None]:
# 利用 max 和 min ，将数据转换到 [0 , 1] 范围内
# 数据中原 min 转换为 0
# 数据中原 max 转换为 1

# 参数必须得是 DataFrame

def standardize_min_max(data):
    
    # 二维 ndarray
    # return MinMaxScaler().fit_transform(data)

    return pd.DataFrame(MinMaxScaler().fit_transform(data) , columns = data.columns)

## `z-score`
>- $x \rightarrow \frac{x - \mu}{\sigma}$
>  - 均值为 0
>  - 方差为 1
>- 注意
>  - 不会改变原始数据的分布
>  - 原始数据是正态分布就依旧是正态分布
>  - 原始数据不是就不是
>  - 所以那些说这个操作使得数据变为 0-1 正态分布的说法是错误的

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
# 参数必须得是 DataFrame

def standardize_mean_var(data):
    
    # 二维 ndarray
    # return StandardScaler().fit_transform(data)

    return pd.DataFrame(StandardScaler().fit_transform(data) , columns = data.columns)

# 相似性
>- 见 `数学.md`

## 欧氏距离
>- `np.linalg.norm(vec)` 默认即求 `L2` 范数

In [None]:
# 可以传入 DataFrame 的两行 Series（.iloc[]）
# 也可以传入两个 ndarray

def similarity_o(vec1 , vec2):
    
    return np.linalg.norm(vec1 - vec2)

In [None]:
# 求 DataFrame 每一行之间的欧氏距离
# 也可以传入二维 ndarray

def similarity_o_all(data):
    
    n = len(data)
    
    arr = np.zeros((n , n))
    for i in range(0 , n):
        for j in range(0 , n):
            if type(data) == pd.core.frame.DataFrame:
                arr[i , j] = similarity_o(data.iloc[i] , data.iloc[j])
            elif type(data) == np.ndarray:
                arr[i , j] = similarity_o(data[i] , data[j])
    
    res = pd.DataFrame(arr)
    
    # 对称矩阵
    if type(data) == pd.core.frame.DataFrame:
        res.columns = data.index
        res.index = data.index
    elif type(data) == np.ndarray:
        res.columns = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
        res.index = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
    
    return res

## 余弦相似度

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# 求 DataFrame 每一行之间的余弦相似度
# 也可以传入二维 ndarray

def similarity_cos_all(data):
    
    # 计算
    res = pd.DataFrame(cosine_similarity(data))
    
    # 对称矩阵
    if type(data) == pd.core.frame.DataFrame:
        res.columns = data.index
        res.index = data.index
    elif type(data) == np.ndarray:
        res.columns = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
        res.index = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
    
    return res

## 余弦距离
> 余弦距离 `==` 1 - 余弦相似度

In [None]:
from sklearn.metrics.pairwise import pairwise_distances

In [None]:
# 求 DataFrame 每一行之间的余弦距离
# 也可以传入二维 ndarray

def similarity_cos_dist_all(data):
    
    # 计算
    res = pd.DataFrame(pairwise_distances(data , metric = 'cosine'))
    
    # 对称矩阵
    if type(data) == pd.core.frame.DataFrame:
        res.columns = data.index
        res.index = data.index
    elif type(data) == np.ndarray:
        res.columns = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
        res.index = [f'vec{i}' for i in range(1 , data.shape[0] + 1)]
    
    return res

## 皮尔逊相关系数
>- $r \in [-1 , 1]$
>  - 越接近 `-1` 负相关性越强
>  - 越接近 `+1` 正相关性越强
>  - 绝对值越大，线性相关性越强

In [None]:
from scipy.stats import pearsonr

In [None]:
# 可以传入两个 List
# 可以传入两个 ndarray
# 可以传入 DataFrame 的两行 Series（.iloc[]）

def similarity_r(vec1 , vec2):
    
    corr = pearsonr(vec1 , vec2)
    
    print(f'相关系数 r = {corr[0]}')
    
    print(f'显著水平 P = {corr[1]}' , end = '   ')
    
    # P < 0.05 表示显著相关，相关性真实存在，非偶然情况
    if (corr[1] < 0.05):
        print(f'（显著相关）')
    else:
        print(f'（非显著相关）')
    
    # return corr

In [None]:
# DataFrame 自带的函数，由于默认计算的是列与列之间的皮尔逊相关系数，所以需要先转置一下 DataFrame

# 计算某一行与其他行的皮尔逊相关系数
# trgI 为行索引，为 String ，

def similarity_r_one(data , trgI):
    
    # 转置
    data = data.copy().T
    
    # 取出一列 Series
    trg = data[trgI]
    
    # 计算
    corr = data.corrwith(trg)
    
    res = pd.DataFrame(corr , columns = [trgI])
    
    return res

In [None]:
# DataFrame 自带的函数，由于默认计算的是列与列之间的皮尔逊相关系数，所以需要先转置一下 DataFrame

# 计算每一行之间的皮尔逊相关系数

def similarity_r_all(data):
    
    # 转置
    data = data.copy().T
    
    return data.corr()