In [None]:
import json
import os
import urllib.request as request

def download_and_load_file(file_path: str, url: str):
    """Download the instruction fine-tuning dataset if needed, then load it.

    When ``file_path`` does not exist yet, the content at ``url`` is fetched,
    decoded as UTF-8 and written to ``file_path``. In every case the file is
    then read back and parsed as JSON.

    Args:
        file_path (str): Local path of the JSON file (used as a cache).
        url (str): URL to download from when ``file_path`` is missing.

    Returns:
        The parsed JSON content of the file.
    """
    if not os.path.exists(file_path):
        with request.urlopen(url) as response:
            text_data = response.read().decode("utf-8")
        with open(file_path, 'w', encoding="utf-8") as file:
            file.write(text_data)

    # Read with an explicit encoding: the file is written as UTF-8 above, so
    # relying on the platform default encoding could mis-decode it.
    with open(file_path, 'r', encoding="utf-8") as file:
        return json.load(file)

# Load the instruction dataset from the local JSON file, downloading it
# from the given URL first if the file is not present.
data = download_and_load_file(
    file_path="instruction-data.json",
    url=(
        "https://raw.githubusercontent.com/rasbt/LLMs-from-scratch"
        "/main/ch07/01_main-chapter-code/instruction-data.json"
    ),
)

In [3]:
def format_input(entry: dict[str, str]) -> str:
    """Format one dataset entry as an instruction prompt.

    The prompt consists of a fixed preamble, an "### Instruction:" section
    and, only when the entry has a non-empty ``input`` value, an
    "### Input:" section.

    Args:
        entry (dict[str, str]): Dataset record (JSON object). Must contain
            ``instruction``; ``input`` may be empty or absent.

    Returns:
        str: The formatted prompt text (without the response section).

    Raises:
        KeyError: If ``entry`` has no ``instruction`` key.
    """
    # NOTE(review): the two preamble sentences are joined without a space
    # ("...a task.Write a response..."). Kept byte-identical so the prompt
    # text does not change; confirm whether a separating space was intended.
    instruction_text = (
        "Below is an instruction that describes a task."
        "Write a response that appropriately completes the request."
        f"\n\n### Instruction:\n{entry['instruction']}"
    )
    # .get() so that records lacking an "input" key are treated like records
    # with an empty input instead of raising KeyError.
    input_value = entry.get("input")
    input_text = f"\n\n### Input:\n{input_value}" if input_value else ""
    return instruction_text + input_text
# Preview one example: the formatted prompt followed by its response section.
model_input = format_input(entry=data[50])
desired_response = "\n\n### Response:\n{}".format(data[50]['output'])
model_input + desired_response

"Below is an instruction that describes a task.Write a response that appropriately completes the request.\n\n### Instruction:\nIdentify the correct spelling of the following word.\n\n### Input:\nOcassion\n\n### Response:\nThe correct spelling is 'Occasion.'"

In [4]:
# Split the dataset
train_portion = int(len(data) * 0.85)  # 85% of the entries for training
test_portion = int(len(data) * 0.1)    # 10% of the entries for testing
val_portion = len(data) - train_portion - test_portion  # the remainder (about 5%) for validation

# Partition by slicing; the three slices are contiguous and non-overlapping.
train_data = data[:train_portion]
test_data = data[train_portion:train_portion + test_portion]
# Store the validation slice under its own name so that val_portion keeps
# holding the entry count, like train_portion and test_portion.
val_data = data[train_portion + test_portion:]

assert len(val_data) == val_portion

In [5]:
import torch
from torch.utils.data import Dataset
from tiktoken.core import Encoding

class InstructionDataset(Dataset):
    """Dataset of pre-tokenized instruction fine-tuning examples.

    Every entry is rendered to text with ``format_input``, followed by a
    "### Response:" section holding the entry's ``output``; the resulting
    text is encoded once, at construction time.
    """
    def __init__(self, data: list[dict[str, str]], tokenizer: Encoding) -> None:
        """Encode all entries up front.

        Args:
            data (list[dict[str, str]]): Raw instruction records.
            tokenizer (Encoding): Tokenizer whose ``encode`` method is
                applied to each full example text.
        """
        self.data = data
        # One encoded sequence per record, in the same order as ``data``.
        self.encoded_texts = [
            tokenizer.encode(
                format_input(entry=record) + f"\n\n### Response:\n{record['output']}"
            )
            for record in data
        ]

    def __getitem__(self, index: int):
        """Return the encoded example stored at ``index``."""
        return self.encoded_texts[index]

    def __len__(self) -> int:
        """Return the number of raw records."""
        return len(self.data)

In [7]:
def custom_collate_draft(batch: Dataset, pad_token_id: int=50256, device: str="cuda") -> tuple[torch.Tensor, torch.Tensor]:
    """Collate token-ID sequences into padded input and target tensors.

    Each sequence is extended with ``pad_token_id`` up to the length of the
    longest sequence in the batch plus one. Inputs are that padded row
    without its last element; targets are the same row without its first
    element, i.e. the inputs shifted left by one position.

    Args:
        batch (Dataset): Iterable of token-ID lists.
        pad_token_id (int, optional): Token ID used for padding. Defaults to 50256.
        device (str, optional): Device the result tensors are moved to. Defaults to "cuda".

    Returns:
        tuple[torch.Tensor, torch.Tensor]: Stacked inputs and stacked targets.
    """
    # Longest sequence plus one extra slot for the appended pad token.
    padded_length = max(len(seq) + 1 for seq in batch)

    input_rows, target_rows = [], []
    for seq in batch:
        # Work on a copy so the caller's sequence is left untouched;
        # fill_count is always at least 1.
        fill_count = padded_length - len(seq)
        row = seq.copy() + [pad_token_id] * fill_count

        input_rows.append(torch.tensor(row[:-1]))   # drop the extra trailing pad
        target_rows.append(torch.tensor(row[1:]))   # shifted by one position

    stacked_inputs = torch.stack(input_rows).to(device)
    stacked_targets = torch.stack(target_rows).to(device)
    return stacked_inputs, stacked_targets

# Small batch of sequences with different lengths to exercise the collate function.
inputs1, inputs2, inputs3 = [0, 1, 2, 3, 4], [5, 6], [7, 8, 9]
batch = (inputs1, inputs2, inputs3)

inputs_tensor, target_tensor = custom_collate_draft(batch=batch)
inputs_tensor, target_tensor

(tensor([[    0,     1,     2,     3,     4],
         [    5,     6, 50256, 50256, 50256],
         [    7,     8,     9, 50256, 50256]], device='cuda:0'),
 tensor([[    1,     2,     3,     4, 50256],
         [    6, 50256, 50256, 50256, 50256],
         [    8,     9, 50256, 50256, 50256]], device='cuda:0'))