In [1]:
import torch
from transformers import BertTokenizer, BertModel
import numpy as np
from Utility import *
from typing import List, Union, Tuple
from collections import defaultdict

In [2]:

# 1. 数据预处理相关函数
lines = read_time_machine()
tokens = tokenize(lines)
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])

[('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8), ('that', 9)]


In [3]:
for i in [0, 10]:
    print('文本:', tokens[i])
    print('索引:', vocab[tokens[i]])

文本: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
索引: [1, 19, 50, 40, 2183, 2184, 400]
文本: ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
索引: [2186, 3, 25, 1044, 362, 113, 7, 1421, 3, 1045, 1]


In [4]:
# 2. BERT模型加载与配置
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')  # 加载BERT对应的词表等
bert_model = BertModel.from_pretrained('bert-base-uncased')  # 加载预训练BERT模型
bert_model.eval()  # 设置为评估模式，因为只是用它提取特征，不训练

BertModel(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(30522, 768, padding_idx=0)
    (position_embeddings): Embedding(512, 768)
    (token_type_embeddings): Embedding(2, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0-11): 12 x BertLayer(
        (attention): BertAttention(
          (self): BertSdpaSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): BertSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, inplace=False

In [5]:
BEGIN = "___BEGIN__"
END = "___END__"


class MarkovChain:
    """
    A class for creating and using a Markov Chain model.
    """
    def __init__(self, order: int, data: List[Tuple[str, int]]):
        self.order = order
        self.data = data
        self.model = defaultdict(lambda: defaultdict(int))
        self._build_model()

    def _build_model(self):
        """Builds the Markov Chain model from the given data."""
        # 统计不同状态转移的频次，同时使用拉普拉斯平滑，这里平滑系数设为1
        alpha = 1
        all_states = set()
        
        # 处理输入数据格式为包含词频的元组列表的情况
        for word_tuple in self.data:
            word = word_tuple[0]
            frequency = word_tuple[1]
            # 根据词频重复添加单词到构建模型的数据中
            for _ in range(frequency):
                words = ([BEGIN] * self.order) + [word] + [END]
                for i in range(len(words) - self.order):
                    state = tuple(words[i:i + self.order])
                    next_state = words[i + self.order]
                    self.model[state][next_state] += 1
                    all_states.add(next_state)

        # 重新计算概率，加入拉普拉斯平滑
        for state in self.model:
            total = sum(self.model[state].values()) + len(all_states) * alpha
            for next_state in all_states:
                self.model[state][next_state] = (self.model[state].get(next_state, 0) + alpha) / total


    def generate(self, length: int, start: Union[str, None] = None) -> List[str]:
        """Generates a sequence of states of the given length."""
        if start is None:
            start = random.choice(list(self.model.keys()))
        else:
            start = tuple(start[-self.order:])

        result = list(start)
        for _ in range(length - self.order):
            next_state = self._sample_next_state(start)
            result.append(next_state)
            start = tuple(result[-self.order:])

        return " ".join(result)

    def _sample_next_state(self, state: Tuple[str]) -> str:
        """Samples the next state based on the probabilities in the model."""
        probabilities = list(self.model[state].values())
        states = list(self.model[state].keys())
        return random.choices(states, probabilities)[0]

    def save_model(self, file_path: str):
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.model, f, ensure_ascii=False, indent=4)

    @classmethod
    def load_model(cls, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as f:
            cls.model = json.load(f)

mc = MarkovChain(data=vocab.token_freqs,order=2)

In [6]:
my_dict = {'a':{'b':1,'c':2},'b':{'c':3,'d':4}}
keys_list = list(my_dict.keys())
values_list = list(my_dict.values())
print(keys_list)  
print(values_list)

['a', 'b']
[{'b': 1, 'c': 2}, {'c': 3, 'd': 4}]


In [14]:
for prefix in my_dict:
    print(prefix)
    print(sum(my_dict[prefix].values()))

a
3
b
7


In [33]:
list(my_dict.keys())

['a', 'b']

In [21]:
tokenized_text

['i', 'love', 'you']

In [22]:
tokenized_text = tokenizer.tokenize("I love you")
input_ids = tokenizer.convert_tokens_to_ids(tokenized_text)
emb = bert_model(torch.tensor([input_ids]))

In [31]:
emb.last_hidden_state.squeeze(0).shape

torch.Size([3, 768])

In [None]:
import torch
from transformers import BertTokenizer, BertModel
import numpy as np

# 1. BERT模型加载与配置
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.eval()  # 设置为评估模式，因为只是用它提取特征，不训练

# 2. 马尔科夫链相关类定义
class MarkovChainWithBERTStates:
    def __init__(self, order=1):
        self.order = order
        self.transition_matrix = {}

    def train(self, texts):
        """
        根据输入的文本训练马尔科夫链，这里的状态是基于BERT的单词语义特征
        """
        for text in texts:
            tokenized_text = tokenizer.tokenize(text)
            input_ids = tokenizer.convert_tokens_to_ids(tokenized_text)
            with torch.no_grad():
                outputs = bert_model(torch.tensor([input_ids]))
                word_embeddings = outputs.last_hidden_state.squeeze(0)  # 获取单词的特征表示

            for i in range(len(word_embeddings) - self.order):
                # 将前序单词的语义特征转换为元组作为前缀
                prefix = tuple(map(tuple, word_embeddings[i:i + self.order].numpy()))
                # 将当前单词的语义特征转换为元组作为后缀
                suffix = tuple(word_embeddings[i + self.order].numpy())
                # 更新转移矩阵
                if prefix not in self.transition_matrix:
                    self.transition_matrix[prefix] = {}
                if suffix not in self.transition_matrix[prefix]: 
                    self.transition_matrix[prefix][suffix] = 0
                self.transition_matrix[prefix][suffix] += 1 

            # 归一化转移概率
            for prefix in self.transition_matrix:
                total = sum(self.transition_matrix[prefix].values())
                for next_state in self.transition_matrix[prefix]:
                    self.transition_matrix[prefix][next_state] /= total
                    
            

    def generate_text(self, length=100):
        """
        使用训练好的马尔科夫链生成指定长度的文本
        """
        # keys2list
        all_keys = list(markov_chain.transition_matrix.keys())
        # 随机选择一个前缀
        key_index = np.random.choice(range(len(all_keys))) 
        current_prefix = all_keys[key_index]
        result = []
        for _ in range(length):
            if current_prefix not in self.transition_matrix:
                break
            # 根据当前前缀选择下一个状态
            next_prefix_keys = list(self.transition_matrix[current_prefix].keys())  # 获取当前前缀的所有后缀
            next_prefix_values = list(self.transition_matrix[current_prefix].values())  # 获取当前前缀的所有后缀对应的概率
            # 根据概率选择下一个状态
            next_word_embedding_idx = np.random.choice(
                range(len(next_prefix_keys)),
                p=next_prefix_values
            )
            # print(next_word_embedding_idx)
            next_word_embedding = next_prefix_keys[next_word_embedding_idx]
            # 通过词向量找到对应的单词（这里简单找最接近的已有单词示例，实际可更完善）
            closest_word = self.find_closest_word(next_word_embedding)
            result.append(closest_word)
            current_prefix = tuple(result[-self.order:])
        return " ".join(result)

    def find_closest_word(self, embedding):
        """
        简单示例：找到与给定语义特征向量最接近的单词（通过余弦相似度衡量）
        """
        all_words = list(self.transition_matrix.keys())
        similarities = []
        for word_embedding in all_words:
            embedding = np.array(word_embedding).flatten()
            word_embedding = (np.array(word_embedding)).T.flatten()
            similarity = np.dot(embedding, word_embedding) / (np.linalg.norm(embedding) * np.linalg.norm(word_embedding))
            similarities.append(similarity)
        similarities = np.array(similarities)
        max_index = np.argmax(similarities)
        return tokenizer.convert_ids_to_tokens([max_index])[0]



# 假设这里是你的训练文本数据，实际需替换为真实大量文本
train_texts = ["I love reading books", "She likes to play football", "They enjoy watching movies"]
markov_chain = MarkovChainWithBERTStates(order=2)
markov_chain.train(train_texts)

generated_text = markov_chain.generate_text(length=5)
print(generated_text)


[PAD]


In [39]:
with torch.no_grad():
    outputs = bert_model(encoded_input)
    last_hidden_states = outputs.last_hidden_state  # 获取句向量示例
    # 这里假设取第一个样本（batch_size维度上索引为0）中的词嵌入来演示解码
    word_embeddings = last_hidden_states[0]  

    for idx in input_ids[0]:  # 遍历输入的标记ID
        word = tokenizer.decode([idx.item()])  # 根据ID解码出单词
        print(word)

[CLS]
the
time
machine
by
h
g
wells
i
the
time
traveller
for
so
it
will
be
convenient
to
speak
of
him
was
expo
##und
##ing
a
rec
##ond
##ite
matter
to
us
his
grey
eyes
shone
and
twinkle
##d
and
his
usually
pale
face
was
flushed
and
animated
the
fire
burned
brightly
and
the
soft
ra
##dian
##ce
of
the
inca
##nde
##scent
lights
in
the
lil
##ies
of
silver
caught
the
bubbles
that
flashed
and
passed
in
our
glasses
our
chairs
being
his
patents
embraced
and
caressed
us
rather
than
submitted
to
be
sat
upon
and
there
was
that
luxurious
after
dinner
atmosphere
when
thought
roam
##s
gracefully
free
of
the
tram
##mel
##s
of
precision
and
he
put
it
to
us
in
this
way
marking
the
points
with
a
lean
fore
##finger
as
we
sat
and
la
##zily
admired
his
earnest
##ness
over
this
new
paradox
as
we
thought
it
and
his
fe
##cu
##ndi
##ty
you
must
follow
me
carefully
i
shall
have
to
con
##tro
##vert
one
or
two
ideas
that
are
almost
universally
accepted
the
geometry
for
instance
they
taught
you
at
school
is
founde

In [40]:
input_ids

tensor([[ 101, 1996, 2051,  ..., 1997, 2158,  102]])

In [19]:
# 使用BERT获取初始文本的语义表示（这里简单获取句向量示例，可更细化）
input_ids = tokenizer.encode(initial_text, return_tensors='pt')
with torch.no_grad():
    outputs = bert_model(input_ids)
    sentence_embedding = outputs.last_hidden_state.mean(dim=1).squeeze()  # 获取句向量示例

# 根据BERT的语义表示调整马尔科夫链的初始状态（这里简单示意，实际需更合理策略）
start_words = []  # 根据语义表示从词表中选择合适起始词，暂为空
generated_text = markov_chain.generate_text(length=max_length, start_words=start_words)
return generated_text

TypeError: 'method' object is not iterable

In [None]:
# 4. 结合BERT和马尔科夫链进行文本生成的主函数
def generate_text_with_bert_markov(initial_text, max_length=100):
    """
    结合BERT和马尔科夫链进行文本生成的函数
    """
    # 预处理初始文本
    processed_initial_text = preprocess_text([initial_text])[0]

    # 用所有训练文本训练马尔科夫链（这里假设已有训练文本数据train_texts）
    markov_chain = MarkovChain(order=2)  # 假设二阶马尔科夫链，可调整
    train_texts = []  # 这里替换为实际的大量训练文本数据
    markov_chain.train(train_texts)

    # 使用BERT获取初始文本的语义表示（这里简单获取句向量示例，可更细化）
    input_ids = tokenizer.encode(initial_text, return_tensors='pt')
    with torch.no_grad():
        outputs = bert_model(input_ids)
        sentence_embedding = outputs.last_hidden_state.mean(dim=1).squeeze()  # 获取句向量示例

    # 根据BERT的语义表示调整马尔科夫链的初始状态（这里简单示意，实际需更合理策略）
    start_words = []  # 根据语义表示从词表中选择合适起始词，暂为空
    generated_text = markov_chain.generate_text(length=max_length, start_words=start_words)
    return generated_text

In [None]:



# 3. 马尔科夫链相关类定义
class MarkovChain:
    def __init__(self, order=1):
        self.order = order
        self.transition_matrix = {}

    def train(self, texts):
        """
        根据输入的文本训练马尔科夫链，构建转移矩阵
        """
        for text in texts:
            for i in range(len(text) - self.order):
                prefix = tuple(text[i:i + self.order])
                suffix = text[i + self.order]
                if prefix not in self.transition_matrix:
                    self.transition_matrix[prefix] = {}
                if suffix not in self.transition_matrix[prefix]:
                    self.transition_matrix[prefix][suffix] = 0
                self.transition_matrix[prefix][suffix] += 1

            # 归一化转移概率
            for prefix in self.transition_matrix:
                total_count = sum(self.transition_matrix[prefix].values())
                for suffix in self.transition_matrix[prefix]:
                    self.transition_matrix[prefix][suffix] /= total_count

    def generate_text(self, length=100, start_words=None):
        """
        使用训练好的马尔科夫链生成指定长度的文本
        """
        if start_words is None:
            current_prefix = np.random.choice(list(self.transition_matrix.keys()))
        else:
            current_prefix = tuple(start_words[-self.order:])

        result = list(current_prefix)
        for _ in range(length - self.order):
            if current_prefix not in self.transition_matrix:
                break
            next_word = np.random.choice(
                list(self.transition_matrix[current_prefix].keys()),
                p=list(self.transition_matrix[current_prefix].values())
            )
            result.append(next_word)
            current_prefix = tuple(result[-self.order:])
        return " ".join(result)


# 4. 结合BERT和马尔科夫链进行文本生成的主函数
def generate_text_with_bert_markov(initial_text, max_length=100):
    """
    结合BERT和马尔科夫链进行文本生成的函数
    """
    # 预处理初始文本
    processed_initial_text = preprocess_text([initial_text])[0]

    # 用所有训练文本训练马尔科夫链（这里假设已有训练文本数据train_texts）
    markov_chain = MarkovChain(order=2)  # 假设二阶马尔科夫链，可调整
    train_texts = []  # 这里替换为实际的大量训练文本数据
    markov_chain.train(train_texts)

    # 使用BERT获取初始文本的语义表示（这里简单获取句向量示例，可更细化）
    input_ids = tokenizer.encode(initial_text, return_tensors='pt')
    with torch.no_grad():
        outputs = bert_model(input_ids)
        sentence_embedding = outputs.last_hidden_state.mean(dim=1).squeeze()  # 获取句向量示例

    # 根据BERT的语义表示调整马尔科夫链的初始状态（这里简单示意，实际需更合理策略）
    start_words = []  # 根据语义表示从词表中选择合适起始词，暂为空
    generated_text = markov_chain.generate_text(length=max_length, start_words=start_words)
    return generated_text


# 示例用法
initial_text = "I love reading"
generated_text = generate_text_with_bert_markov(initial_text, max_length=50)
print(generated_text)