# datasets
加载数据集的基本模块，支持不同格式，包括网络数据集，不用思考，直接用就对了。

## 基础知识
### 安装
```shell
pip install datasets
```

### 支持的格式：
| Data format	        | Loading script | 	Example                                               |
|---------------------|----------------|--------------------------------------------------------|
| CSV & TSV	          | csv	           | load_dataset("csv", data_files="my_file.csv")          |
| Text files	         | text	          | load_dataset("text", data_files="my_file.txt")         |
| JSON & JSON Lines	  | json           | 	load_dataset("json", data_files="my_file.jsonl")      |
| Pickled DataFrames	 | pandas         | 	load_dataset("pandas", data_files="my_dataframe.pkl") |

### load_dataset参数列表
参考：
- [Datasets中文介绍](https://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/huggingface_transformer/chapters/2_datasets.html)
- [官方文档](https://huggingface.co/docs/datasets/index)

```python
def load_dataset(
    path: str,
    name: Optional[str] = None,
    data_dir: Optional[str] = None,
    data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None,
    split: Optional[Union[str, Split]] = None,
    cache_dir: Optional[str] = None,
    features: Optional[Features] = None,
    download_config: Optional[DownloadConfig] = None,
    download_mode: Optional[DownloadMode] = None,
    ignore_verifications: bool = False,
    keep_in_memory: Optional[bool] = None,
    save_infos: bool = False,
    revision: Optional[Union[str, Version]] = None,
    use_auth_token: Optional[Union[bool, str]] = None,
    task: Optional[Union[str, TaskTemplate]] = None,
    streaming: bool = False,
    num_proc: int = None,
    **config_kwargs,
) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]
```

- path：一个字符串，指定数据集的路径或名字。
    - 对于本地数据集：
        - 如果 path 是一个本地目录（仅包含数据文件），那么 Datasets 基于目录的内容加载一个通用的 dataset builder （如，csv, json, text）。
        - 如果 path 是一个本地的 dataset script 或包含一个本地的 dataset script ，那么从这个 dataset script 加载 dataset builder 。
    - 对于 Hugging Face Hub 数据集：
        - 如果 path 是一个 dataset repository（仅包含数据文件），那么 Datasets 基于 repository 的内容加载一个通用的 dataset builder （如，csv, json, text）。
        - 如果 path 是一个带有 dataset script 的 dataset repository，那么从这个 dataset script 加载 dataset builder 。
- name：一个字符串，指定数据集配置的名字。
- data_dir：一个字符串，指定数据集配置的 data_dir 。
- data_files：一个字符串或字符串序列或字符串映射，指定源数据文件的路径。
- split：一个字符串或 Split，指定加载数据集的哪个部分。如果为 None，则返回一个字典，该字典包含所有的 split 。
- cache_dir：一个字符串，指定读写数据的缓存位置，默认为 "~/.cache/huggingface/datasets" 。
- features：一个 Features，指定数据集的特征类型。
- download_config：一个 DownloadConfig，指定下载配置。
- download_mode：一个 DownloadMode，指定下载模式。
- ignore_verifications：一个布尔值，指定是否验证下载/处理的数据集。
- keep_in_memory：一个布尔值，指定是否拷贝数据集到内存中。
- save_infos：一个布尔值，指定是否保存数据集信息（如 checksums/size/splits/... ）。
- revision：一个字符串或 Version，指定加载 dataser script 的哪个版本。默认为 "main" 分支。
- use_auth_token：一个字符串或布尔值，参考 DatasetBuilder 构造方法。
- task：一个字符串，指定该数据集需要为哪个任务进行 prepare 从而训练和评估。将会对数据集的 Features 强制类型转换，从而匹配该任务的标准列名和列类型。
- streaming：一个布尔值。如果为 True，则不会下载数据文件，而是流式地迭代该数据集。仅 txt, csv, jsonl 文件支持流式下载，而 Json 文件需要完整地下载。
  - 如果 streaming = False，返回一个 Dataset 或 DatasetDict。
    - 如果 split 不是 None，则返回 Dataset。
    - 如果 split = None，则返回 DatasetDict，它包含每个 split 。
  - 如果 streaming = True，则返回一个 IterableDataset 或 IterableDatasetDict 。
- num_proc：一个整数，指定下载和生成数据集的进程数。默认不使用多进程。
- config_kwargs：关键字参数，被传递给 BuilderConfig 和 DatasetBuilder 。
    - 比如json格式下有field字段，从一个json文件中提取对应field的value，比如是一个数组类型    

几个结论：
- 如果path直接是远程的话，那么不用指定构造器，会自动根据文件后缀来识别对应的构造器

### Dataset
Dataset：数据集的基类，基于 Apache Arrow table 来实现。

#### 方法：
- add_column 增加列
- add_item 增加行
- 创建：
    - from_file 从文件加载
    - from_buffer 从Arrow buffer 中初始化一个数据集。
    - from_pandas 从pandas加载
    - from_dict 从dict字典创建
    - from_list 从List[dict]创建
    - from_csv 从 CSV 文件中创建数据集。
    - from_generator 从迭代器中创建一个数据集。
    - from_json()：从 JSON 文件中创建数据集。
    - from_parquet()：从 Parquet 文件中创建数据集。
    - from_text()：从文本文件中创建数据集。
    - from_sql()：从 SQL query 或 database table 中创建数据集。
- unique( column: str ) -> list：返回指定列的 unique element 的列表。
- flatten(new_fingerprint: Optional[str] = None, max_depth=16) -> Dataset：对数据集的所有列进行展平，返回当前数据集的、列被展平了的 copy 。每个 struct type 列被展平为：每个 struct filed 一个列。非 struct type 列被保留。
- cast()：对数据集的列进行类型强制转换，返回当前数据集的被类型转换的 copy 。
- cast_column(column: str, feature: FeatureType, new_fingerprint: Optional[str] = None) -> Dataset：强制类型转换指定的列。
- remove_columns(column_names: Union[str, List[str]], new_fingerprint: Optional[str] = None) -> Dataset：返回数据集的一个 copy 版本，该版本移除数据集中的某些列及其关联的内容。
- rename_column( original_column_name: str, new_column_name: str, new_fingerprint: Optional[str] = None) -> Dataset：返回数据集的一个 copy 版本，该版本重命名了数据集的指定列。
- rename_columns(column_mapping: Dict[str, str], new_fingerprint: Optional[str] = None) -> Dataset：返回数据集的一个 copy 版本，该版本重命名了数据集的一些列。
- class_encode_column(column: str, include_nulls: bool = False) -> Dataset：将指定的列强制类型转换为 datasets.features.ClassLabel ，并更新数据集。
- formatted_as()：用于在 with 表达式中使用，它设置了 __getitem__ 所返回的格式。
- set_format()：设置 getitem 所返回的格式。数据格式化是 on-the-fly 应用的。
- set_transform()：利用 transform 来转换 getitem 所返回的内容。 transform 是 on-the-fly 应用到 batch 上的。
- reset_format()：恢复 getitem 的格式为，对所有的列返回 python 对象。
- with_format()：设置 getitem 所返回的格式。几乎类似于 set_format() 。
- with_transform()：利用 transform 来转换 getitem 所返回的内容。几乎类似于 set_transform() 。
- cleanup_cache_files() -> int：清理数据集缓存目录中的所有缓存文件，当前使用的缓存文件（如果有的话）除外。
- __getitem__(key)：用于对列索引（key 为列名字符串）或行索引（key 为整数索引、或者索引集合、或布尔值集合）。返回指定索引的值。也就是支持[]操作
- __len__() -> int：返回数据集的行数。也就是支持len()操作
- __iter__() ：迭代从而每次迭代产生样本。如果已经通过 Dataset.set_format() 来设置了格式，那么迭代返回的 row 将具有指定的格式。也就是支持for操作
- filter()：应用一个 filter function 到数据集中所有的样本（以单个样本的形式或 batch 的形式），并更新数据集，使得数据集仅包含 filter function 返回为 True 的样本。
- select()：创建一个新的数据集，这个新的数据集的row 是根据索引（以列表或 array 来提供）从原始数据集检索得到。
- sort()：创建一个新的数据集，该数据集根据指定的列来排序。
- shuffle()：创建一个新的数据集，该数据集随机混洗了 row 。
- train_test_split()：返回一个 datasets.DatasetDict，它具有两个随机拆分的子集（train 和 test 的 Dataset splits）。
    - 参数test_size和train_size，默认为 test_size = 0.25, train_size = 0.75 。
- shard()：执行数据集分片，并返回第 index 个分片。
- push_to_hub()：将数据集作为一个 Parquet dataset 推送到 hub 上。推送是通过 HTTP 请求进行的，无需 git 或 git-lfs 。
- save_to_disk(dataset_path: str, fs=None)：保存数据集到目录或 S3FileSystem 。
- load_from_disk(dataset_path: str, fs=None, keep_in_memory: Optional[bool] = None) -> Dataset ：从 save_to_disk() 存储的目录中加载数据集。
- flatten_indices()：通过展平 indices mapping 来创建并缓存一个新的 Dataset
- 导出数据
    - to_csv()：数据集导出为 csv 文件。
    - to_pandas( batch_size: Optional[int] = None, batched: bool = False) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]：将数据集转换为 pandas.DataFrame。对于较大的数据集，也可以返回一个 generator 。
    - to_dict(batch_size: Optional[int] = None, batched: bool = False) -> Union[dict, Iterator[dict]]：将数据集转换为 Python 字典。对于较大的数据集，也可以返回一个 generator 。
    - to_json()：将数据集导出为 JSON 文件。
        - 内部调用 pandas.DataFrame.to_json()，参考[官方文档的参数说明](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html)
        - force_ascii常用，默认是True，在中文保存的时候建议是False，这样肉眼可见，不然全部编程unicode码了
    - to_parquet()：将数据集导出为 parquet 文件。
    - to_sql()：导出数据集到 SQL 数据库。
- faiss向量数据库相关：
    - add_faiss_index()：添加一个 dense index （使用 Faiss）来用于快速检索。默认情况下，索引是在指定的列上的向量上完成的。如果要在 GPU 上运行，也可以指定设备。**这个有意思，默认支持faiss向量数据库了**
    - save_faiss_index(index_name: str, file: typing.Union[str, pathlib.PurePath])：保存 FaissIndex 到磁盘。
    - load_faiss_index()：从硬盘加载 FaissIndex 。
- elastisearch相关：
    - add_elasticsearch_index()：添加一个 text index （使用 ElasticSearch）来用于快速检索。这是原地操作。
    - load_elasticsearch_index()：加载已有的 text index （使用 ElasticSearch）用于快速检索。
- list_indexes()：列出所有 attached indexes 的列名。
- drop_index(index_name: str)：移除指定列上的 index 。
- search()：在数据集中寻找与给定 query 最近邻的样本，返回临近度得分。**与get_nearest_examples有什么区别？**
- search_batch()：在数据集中寻找与给定的一组 query 最近邻的样本，返回针对每个 query 的临近度得分。
- get_nearest_examples(index_name: str, query: Union[str, np.array], k: int = 10) -> scores (List[float])：类似于 search()。
- get_nearest_examples_batch(index_name: str, queries: typing.Union[typing.List[str], <built-in function array>], k: int = 10) -> total_scores (List[List[float])：参考 search_batch() 。
- prepare_for_task(task: Union[str, TaskTemplate], id: int = 0) -> Dataset ：通过将数据集的 Features 强制类型转换为标准化的列名和列类型（由 dataset.tasks 中描述），从而为给定的 task 来准备数据集。
- align_labels_with_mapping(label2id: Dict, label_column: str) -> Dataset：根据输入的 label2id 来对齐数据集的 label ID 和 label name 。注意，对齐的过程中，对 label name 使用小写。



## 任务

### 取一个dataset的100条
注意，默认是列数据库，所以通过`data[key]`的方式其实是生成了一个dict

In [None]:
import datasets
from datasets import load_dataset
import os
dataset = load_dataset('json', data_files=os.path.expanduser('~/Downloads/bugs.json'))
print(type(dataset['train'])) # datasets.arrow_dataset.Dataset
print(type(dataset['train'].select_columns(["wybug_title"]))) # datasets.arrow_dataset.Dataset
print(type(dataset['train'][:2])) # dict : {"id":[1,2],"title":["a","b"]}
dataset = dataset['train'].select_columns(["wybug_title"])[:2] # dict: {"title":["a","b"]}
print(type(dataset))
dataset = datasets.Dataset.from_dict(dataset)
dataset



## 常见问题

## 实验
### field参数加载测试
下载SQuAD_it数据集：

In [None]:
# !wget https://github.com/crux82/squad-it/raw/master/SQuAD_it-test.json.gz
# !gunzip SQuAD_it-test.json.gz
# !ls #可以看到SQuAD_it-test.json

In [None]:
from datasets import load_dataset

# 格式{"data":[], "version":""}，是一个完整的文件
squad_it_dataset = load_dataset("json", data_files="SQuAD_it-test.json") # 这里会报错 Failed to read file 'SQuAD_it-test.json' with error <class 'pyarrow.lib.ArrowInvalid'>: JSON parse error: Missing a name for object member. in row 0
squad_it_dataset

上面可以看到，load_dataset必须是返回一个数组，这里的`SQuAD_it-test.json`是一个完整的json文件，但是又是多行格式，导致了既不是数组，也不是jsonl，会报错。
我们改一改：

In [None]:
#!cat SQuAD_it-test.json | jq -c > s_min.json
from datasets import load_dataset
squad_it_dataset = load_dataset("json", data_files="s_min.json") 
squad_it_dataset

可以看到，转换成jsonl的格式，这次就不会报错了。

如果想要加载`SQuAD_it-test.json`数据，就必须要用到**field参数**：

In [None]:
from datasets import load_dataset

# 格式{"data":[], "version":""}，是一个完整的文件
squad_it_dataset = load_dataset("json", data_files="SQuAD_it-test.json", field="data") # 默认会加载到名为train的dataset里面去
squad_it_dataset



### 加载json和jsonl的区别

In [None]:
!echo '{"a":1,"b":2}\n{"a":3,"b":4}' > a.jsonl
from datasets import load_dataset
ds = load_dataset("json", data_files="a.jsonl") # 默认会加载到名为train的dataset里面去
print(ds)

!echo '[{"a":1,"b":2},{"a":3,"b":4}]' > b.json
ds = load_dataset("json", data_files="b.json") # 默认会加载到名为train的dataset里面去
print(ds)


可以看到，其实是一样的。底层`datasets/packaged_modules/json/json.py`先调用了`pyarrow.json`的`read_json`，这时只能是jsonl的格式，如果解析失败了，再尝试用`json.load`来做整体文件加载。

参考：[pyarrow.json.read_json](https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html) 提到：Currently only the line-delimited JSON format is supported.

### split测试

path对应一个仓库名称，name对应一个子目录，比如`https://huggingface.co/datasets/glue/tree/main` 仓库下面可以看到多个目录，其中就包含mrpc目录。其中mrpc目录下面包含文件：
- test-00000-of-00001.parquet
- train-00000-of-00001.parquet
- validation-00000-of-00001.parquet

In [None]:
from datasets import load_dataset
dataset = load_dataset("glue", "mrpc") # 相当于 path="glue", name="mrpc"
dataset
# DatasetDict({
#     train: Dataset({
#         features: ['sentence1', 'sentence2', 'label', 'idx'],
#         num_rows: 3668
#     })
#     validation: Dataset({
#         features: ['sentence1', 'sentence2', 'label', 'idx'],
#         num_rows: 408
#     })
#     test: Dataset({
#         features: ['sentence1', 'sentence2', 'label', 'idx'],
#         num_rows: 1725
#     })
# })

In [None]:
from datasets import load_dataset
dataset = load_dataset("glue", "mrpc", split='train') # 只要train的dataset
dataset

# Dataset({
#     features: ['sentence1', 'sentence2', 'label', 'idx'],
#     num_rows: 3668
# })

感觉split参数带有误导性，其实就是选择哪个数据文件。

### dataset测试

#### 读取数据

In [None]:
from datasets import load_dataset
dataset = load_dataset("glue", "mrpc", split='train') # 只要train的dataset
print(dataset[0]) # 取行
# {'sentence1': 'Amrozi accused his brother , whom he called " the witness " , of deliberately distorting his evidence .', 'sentence2': 'Referring to him as only " the witness " , Amrozi accused his brother of deliberately distorting his evidence .', 'label': 1, 'idx': 0}
print(dataset["sentence1"][0]) # 取列
# Amrozi accused his brother , whom he called " the witness " , of deliberately distorting his evidence .

print(dataset.train_test_split()) # 相当于一个Dataset，切分为一个DatasetDict包含train和test两个field。
# DatasetDict({
#     train: Dataset({
#         features: ['sentence1', 'sentence2', 'label', 'idx'],
#         num_rows: 2751
#     })
#     test: Dataset({
#         features: ['sentence1', 'sentence2', 'label', 'idx'],
#         num_rows: 917
#     })
# })

#### 向量数据库faiss测试

In [None]:
from transformers import AutoTokenizer, AutoModel
import datasets
from datasets import load_dataset
import torch
import os

os.environ['KMP_DUPLICATE_LIB_OK']='True'

######### 加载预训练模型 #########
model_ckpt = "maidalun1020/bce-embedding-base_v1"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = AutoModel.from_pretrained(model_ckpt)
device = torch.device("mps")
model.to(device)  # 移动到 GPU


########## 函数: 获取 batch 样本的 cls emebdding ######
def get_embeddings(text_list):
    encoded_input = tokenizer(
        text_list, padding=True, truncation=True, return_tensors="pt"
    )
    encoded_input = {k: v.to(device) for k, v in encoded_input.items()}
    model_output = model(**encoded_input)
    return model_output.last_hidden_state[:, 0]


########### 对数据集增加 cls emebdding 列 #######
dataset = load_dataset('json', data_files=os.path.expanduser('~/Downloads/bugs.json'))
dataset = datasets.Dataset.from_dict(dataset["train"].select_columns(["wybug_title"])[:1000])  # 取100条title做测试
embeddings_dataset = dataset.map(
    lambda x: {"embeddings": get_embeddings(x["wybug_title"]).detach().cpu().numpy()[0]}
)
embeddings_dataset.add_faiss_index(column="embeddings")

######### 获取 query emebdding #######
question = "文件读取漏洞"
question_embedding = get_embeddings([question]).cpu().detach().numpy()
print(question_embedding.shape)

########## 检索 query 最相似的 top-k 样本 ######
scores, samples = embeddings_dataset.get_nearest_examples(
    "embeddings", question_embedding, k=5
)
print(scores, samples["wybug_title"])

#### 生成预训练的数据集

In [8]:
import os
import json

# 指定需要读取的目录
directory_path = os.path.expanduser('~/tmpbuild/Awesome-FOFA/')

def process_files(directory_path, output_file):
    # 遍历当前目录下的所有条目
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)

        # 检查是否是目录
        if os.path.isdir(file_path):
            # 递归处理子目录
            process_files(file_path, output_file)
        elif os.path.isfile(file_path) and file_path.endswith(('.md', '.txt')):
            # 检查文件扩展名是否为.md或.txt
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                    if len(content) < 100:
                        continue

                entry = {'text': content, 'filename': os.path.relpath(file_path, start=directory_path)}

                # 将字典转换为JSON并写入jsonl文件，每行一个JSON对象
                output_file.write(json.dumps(entry, ensure_ascii=False) + '\n')
            except Exception as e:
                print(f"Error reading file {file_path}: {e}")

# 初始化输出的jsonl文件
output_file_path = 'Awesome-FOFA.jsonl'
with open(output_file_path, 'w', encoding='utf-8') as output_file:
    # 开始处理指定的根目录
    process_files(directory_path, output_file)

# 输出文件在处理完所有文件后会自动关闭

