In [10]:
import collections
import re
from d2l import torch as d2l

#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():  #@save
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])

# 文本总行数: 3221
the time machine by h g wells
twinkled and his usually pale face was flushed and animated the


In [11]:
def tokenize(lines, token='word'):  #@save
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('错误：未知词元类型：' + token)

tokens = tokenize(lines)
for i in range(11):
    print(tokens[i])

['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
[]
[]
[]
[]
['i']
[]
[]
['the', 'time', 'traveller', 'for', 'so', 'it', 'will', 'be', 'convenient', 'to', 'speak', 'of', 'him']
['was', 'expounding', 'a', 'recondite', 'matter', 'to', 'us', 'his', 'grey', 'eyes', 'shone', 'and']
['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']


In [12]:
def count_corpus(tokens):  #@save
    """统计词元的频率"""
    # 这里的tokens是1D列表或2D列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # 将词元列表展平成一个列表
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

class Vocab:
    def __init__(self, tokens = None, min_freq = 0, reserved_tokens = None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key = lambda x : x[1], reverse = True)

        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token : idx for idx, token in enumerate(self.idx_to_token)}
        
        for token, freq in self._token_freqs :
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1 # 维护token和下标的一个映射
    
    def __len__(self):
        return len(self.idx_to_token)
    
    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        
    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]
    
    @property
    def unk(self):
        return 0
    
    @property
    def token_freqs(self):
        return self._token_freqs
    

In [13]:
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])

[('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8), ('that', 9)]


In [14]:
for i in [0,10]:
    # token是一行句子分开的词汇
    print('文本: ',tokens[i])
    print('索引: ',[vocab[token] for token in tokens[i]])

文本:  ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
索引:  [1, 19, 50, 40, 2183, 2184, 400]
文本:  ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
索引:  [2186, 3, 25, 1044, 362, 113, 7, 1421, 3, 1045, 1]


In [15]:
def load_corpus_time_machine(max_tokens=-1):  #@save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落，
    # 所以将所有文本行展平到一个列表中
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)

(170580, 28)

In [17]:
# 随机抽样与顺序分区
import random
import torch
def seq_data_iter_random(corpus, batch_size, num_steps):
    """使用随机抽样生成一个小批量子序列"""
    # 随机偏移序列
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 计算子序列数量（减1为标签预留空间）
    num_subseqs = (len(corpus) - 1) // num_steps
    # 生成子序列起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 随机打乱索引
    random.shuffle(initial_indices)

    def data(pos):
        """返回从pos开始的长度为num_steps的子序列"""
        return corpus[pos: pos + num_steps]

    # 计算批次数量
    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # 提取当前批次的起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        # 生成输入X和标签Y
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)

def seq_data_iter_sequential(corpus, batch_size, num_steps):
    """使用顺序分区生成一个小批量子序列"""
    # 随机偏移序列
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 计算子序列数量
    num_subseqs = (len(corpus) - 1) // num_steps
    # 计算总令牌数
    num_tokens = num_subseqs * num_steps
    # 生成子序列起始索引
    initial_indices = list(range(0, num_tokens, num_steps))
    
    def data(pos):
        """返回从pos开始的长度为num_steps的子序列"""
        return corpus[pos: pos + num_steps]

    # 计算每个分区的大小（每个分区包含batch_size个子序列）
    num_tokens_per_partition = num_tokens // batch_size
    for i in range(0, num_tokens, num_tokens_per_partition):
        # 每个分区包含连续的batch_size个子序列
        indices = []
        for j in range(batch_size):
            # 计算每个子序列的起始索引
            index = i + j * num_tokens_per_partition
            if index >= num_tokens:
                break
            indices.extend(initial_indices[index: index + num_steps])
        # 生成输入X和标签Y
        X = [data(j) for j in indices[:batch_size]]
        Y = [data(j + 1) for j in indices[:batch_size]]
        yield torch.tensor(X), torch.tensor(Y)

# 测试代码
corpus = list(range(35))  # 示例序列 [0, 1, ..., 34]
batch_size, num_steps = 2, 5

print("随机采样：")
for X, Y in seq_data_iter_random(corpus, batch_size, num_steps):
    print(f"X: {X}\nY: {Y}\n")

print("顺序分区：")
for X, Y in seq_data_iter_sequential(corpus, batch_size, num_steps):
    print(f"X: {X}\nY: {Y}\n")

随机采样：
X: tensor([[ 0,  1,  2,  3,  4],
        [15, 16, 17, 18, 19]])
Y: tensor([[ 1,  2,  3,  4,  5],
        [16, 17, 18, 19, 20]])

X: tensor([[10, 11, 12, 13, 14],
        [ 5,  6,  7,  8,  9]])
Y: tensor([[11, 12, 13, 14, 15],
        [ 6,  7,  8,  9, 10]])

X: tensor([[25, 26, 27, 28, 29],
        [20, 21, 22, 23, 24]])
Y: tensor([[26, 27, 28, 29, 30],
        [21, 22, 23, 24, 25]])

顺序分区：
X: tensor([[ 2,  3,  4,  5,  6],
        [ 7,  8,  9, 10, 11]])
Y: tensor([[ 3,  4,  5,  6,  7],
        [ 8,  9, 10, 11, 12]])

X: tensor([])
Y: tensor([])

