In [1]:
# 本节中，我们将解析文本的常见预处理步骤。 这些步骤通常包括：
# 1.将文本作为字符串加载到内存中。
# 2.将字符串拆分为词元（如单词和字符）。
# 3.建立一个词表，将拆分的词元映射到数字索引。
# 4.将文本转换为数字索引序列，方便模型操作。

In [2]:
import collections
import re
from d2l import torch as d2l

In [3]:
# 首先，我们从H.G.Well的时光机器中加载文本。 
# 这是一个相当小的语料库，只有30000多个单词，但足够我们小试牛刀， 而现实中的文档集合可能会包含数十亿个单词。 
# 下面的函数将数据集读取到由多条文本行组成的列表中，其中每条文本行都是一个字符串。 为简单起见，我们在这里忽略了标点符号和字母大写

In [4]:
#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine(): #@save
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])

# 文本总行数: 3221
the time machine by h g wells
twinkled and his usually pale face was flushed and animated the


In [5]:
# 下面的tokenize函数将文本行列表（lines）作为输入， 列表中的每个元素是一个文本序列（如一条文本行）
# 每个文本序列又被拆分成一个词元列表，词元（token）是文本的基本单位。 
# 最后，返回一个由词元列表组成的列表，其中的每个词元都是一个字符串（string）

def tokenize(lines, token = 'word'): #@save
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('tokenize类型输入错误')

tokens = tokenize(lines)
for i in range(11):
    print(tokens[i])

['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
[]
[]
[]
[]
['i']
[]
[]
['the', 'time', 'traveller', 'for', 'so', 'it', 'will', 'be', 'convenient', 'to', 'speak', 'of', 'him']
['was', 'expounding', 'a', 'recondite', 'matter', 'to', 'us', 'his', 'grey', 'eyes', 'shone', 'and']
['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']


In [6]:
# 词元的类型是字符串，而模型需要的输入是数字，因此这种类型不方便模型使用。 
# 现在，让我们构建一个字典，通常也叫做词表（vocabulary）， 用来将字符串类型的词元映射到从0开始的数字索引中。
# 我们先将训练集中的所有文档合并在一起，对它们的唯一词元进行统计， 得到的统计结果称之为语料（corpus）。 
# 然后根据每个唯一词元的出现频率，为其分配一个数字索引。 很少出现的词元通常被移除，这可以降低复杂性。
# 另外，语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元“<unk>”。 我们可以选择增加一个列表，用于保存那些被保留的词元

In [7]:
class Vocab: #@save
    # 创建这个Vocab语料库，我们的目的是创建一个 idx_to_token表示所有token，token_to_idx，通过token找下标
    def __init__(self, tokens = None, min_freq = 0, reserved_tokens = None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 按照出现频率排序,接下来我们先去写 count_corpus
        counter = count_corpus(tokens)
        # 我们拿到了counter，是一个字典，包含了token和出现的次数，接下来要排序
        # sorted(iterable, key, reverse)
        self._token_freqs = sorted(counter.items(), key = lambda x:x[1], reverse=True) # lamda x:x[1]表示按照x[1]排列
        # 未知词元的索引为0
        # 初始化
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {
            # 这是一个字典表达式
            token:idx for idx,token in enumerate(self.idx_to_token)
        }
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx: # 1.token_to_idx 是字典，查找更快 2.会默认把token当做key查找
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1
                
    # 接下来我们要构造一些魔术方法，因为普通len不能返回这个Vocab的长度
    # 比如，我们可能要写 len(Vocab.idx_to_token)
    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens): # 在执行 obj[key]的时候就会被触发, 比如 vocab[token[i]]
        if not isinstance(tokens, (list, tuple)): # 如果tokens又不是list又不是tuple
            return self.token_to_idx.get(tokens, self.unk) # 那就直接去找，没找到就 self.unk
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[indice] for indice in indices]

    @property
    def unk(self):
        return 0

    @property
    def token_freqs(self):
        return self._token_freq
                
def count_corpus(tokens):
    # 数 语料 的个数
    # tokens 可以是 1d,可以是2d
    if len(tokens) == 0 or all(isinstance(token, list) for token in tokens):
        # 我们要把2d展开成1d
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens) # counter方法可以把一个列表变成字典，以出现次数的形式

In [8]:
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])

[('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8), ('that', 9)]


##### 现在，我们可以将每一条文本行转换成一个数字索引列表。

In [9]:
for i in [0, 10]:
    print('文本:', tokens[i])
    print('索引:', vocab[tokens[i]])

文本: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
索引: [1, 19, 50, 40, 2183, 2184, 400]
文本: ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
索引: [2186, 3, 25, 1044, 362, 113, 7, 1421, 3, 1045, 1]


In [10]:
# 在使用上述函数时，我们将所有功能打包到load_corpus_time_machine函数中
# 函数返回corpus（词元索引列表）和vocab（时光机器语料库的词表）

def load_corpus_time_machine(max_tokens=-1):  #@save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # 展平，应为一行不一定是一句完整的话
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab), corpus[:5]

(170580, 28, [3, 9, 2, 1, 3])