# Transformer架构

In [4]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd

def create_transformer_model(vocab_size, max_length):
    # 输入层
    inputs = layers.Input(shape=(max_length,))
    
    # 嵌入层
    embedding_dim = 64
    x = layers.Embedding(vocab_size, embedding_dim, input_length=max_length)(inputs)
    
    # 添加位置信息
    x = layers.Embedding(input_dim=max_length, output_dim=embedding_dim)(
        tf.range(start=0, limit=max_length, delta=1)
    ) + x
    
    # 多头注意力层
    attention_output = layers.MultiHeadAttention(
        num_heads=8, 
        key_dim=embedding_dim
    )(x, x)
    
    # 添加残差连接和层归一化
    x = layers.LayerNormalization(epsilon=1e-6)(attention_output + x)
    
    # 前馈网络
    ffn = layers.Dense(128, activation='relu')(x)
    ffn = layers.Dense(embedding_dim)(ffn)
    
    # 再次添加残差连接和层归一化
    x = layers.LayerNormalization(epsilon=1e-6)(ffn + x)
    
    # 全局池化
    x = layers.GlobalAveragePooling1D()(x)
    
    # 全连接层
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.1)(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    
    # 创建模型
    model = Model(inputs=inputs, outputs=outputs)
    
    # 编译模型
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model

def load_and_preprocess_data():
    # 示例数据
    data = {
        'review': [
            "这个产品很好用，我很喜欢",
            "质量特别差，退货了",
            "一般般，可以接受",
            "很满意，物超所值",
            "不推荐购买，浪费钱",
            "服务态度很好，下次还会购买",
            "出现故障，客服态度很差",
            "性价比很高，推荐购买",
            "完全不值这个价格",
            "快递很快，产品完好",
        ] * 10,  # 复制数据以增加数据量
        'label': [1, 0, 1, 1, 0, 1, 0, 1, 0, 1] * 10
    }
    df = pd.DataFrame(data)
    
    # 分割数据
    train_texts, test_texts, train_labels, test_labels = train_test_split(
        df['review'].values, 
        df['label'].values,
        test_size=0.2,
        random_state=42
    )
    
    # 文本处理
    tokenizer = Tokenizer(num_words=5000, oov_token='<OOV>')
    tokenizer.fit_on_texts(train_texts)
    
    # 转换为序列
    max_length = 50
    train_sequences = tokenizer.texts_to_sequences(train_texts)
    test_sequences = tokenizer.texts_to_sequences(test_texts)
    
    # 填充
    X_train = pad_sequences(train_sequences, maxlen=max_length, padding='post')
    X_test = pad_sequences(test_sequences, maxlen=max_length, padding='post')
    
    return (X_train, train_labels), (X_test, test_labels), tokenizer, max_length

def train_model(model, train_data, test_data, batch_size=32, epochs=10):
    X_train, y_train = train_data
    X_test, y_test = test_data
    
    # 创建验证集
    X_train_main, X_val, y_train_main, y_val = train_test_split(
        X_train, y_train, test_size=0.1, random_state=42
    )
    
    # 回调函数
    callbacks = [
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=3,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=2
        )
    ]
    
    # 训练模型
    history = model.fit(
        X_train_main, y_train_main,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks
    )
    
    # 评估模型
    test_loss, test_accuracy = model.evaluate(X_test, y_test)
    print(f"\n测试集准确率: {test_accuracy:.4f}")
    
    return history

def predict_sentiment(model, text, tokenizer, max_length):
    # 预处理文本
    sequence = tokenizer.texts_to_sequences([text])
    padded = pad_sequences(sequence, maxlen=max_length, padding='post')
    
    # 预测
    prediction = model.predict(padded)[0][0]
    sentiment = "正面评价" if prediction > 0.5 else "负面评价"
    confidence = prediction if prediction > 0.5 else 1 - prediction
    
    return sentiment, confidence

def main():
    print("加载和预处理数据...")
    train_data, test_data, tokenizer, max_length = load_and_preprocess_data()
    
    print("\n创建Transformer模型...")
    vocab_size = len(tokenizer.word_index) + 1
    model = create_transformer_model(vocab_size, max_length)
    model.summary()
    
    print("\n开始训练...")
    history = train_model(model, train_data, test_data)
    
    # 测试新评论
    test_texts = [
        "这个产品非常好用，超出我的预期",
        "质量很差，客服态度也不好",
        # "价格合理，性能还可以",
        # "完全是浪费钱，后悔购买",
        # "物流快，包装完好，推荐购买"
    ]
    
    print("\n预测新评论:")
    for text in test_texts:
        sentiment, confidence = predict_sentiment(model, text, tokenizer, max_length)
        print(f"\n文本: '{text}'")
        print(f"预测: {sentiment} (置信度: {confidence:.4f})")

if __name__ == "__main__":
    main()

加载和预处理数据...

创建Transformer模型...



开始训练...
Epoch 1/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 64ms/step - accuracy: 0.5443 - loss: 0.7021 - val_accuracy: 0.6250 - val_loss: 0.6982 - learning_rate: 0.0010
Epoch 2/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.5964 - loss: 0.6735 - val_accuracy: 0.3750 - val_loss: 0.7613 - learning_rate: 0.0010
Epoch 3/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.4913 - loss: 0.7653 - val_accuracy: 0.6250 - val_loss: 0.6652 - learning_rate: 0.0010
Epoch 4/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.4861 - loss: 0.7232 - val_accuracy: 0.6250 - val_loss: 0.6576 - learning_rate: 0.0010
Epoch 5/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step - accuracy: 0.5955 - loss: 0.6782 - val_accuracy: 0.3750 - val_loss: 0.6912 - learning_rate: 0.0010
Epoch 6/10
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20m

In [5]:
import tensorflow as tf
import numpy as np

# 创建一个简单的例子来展示注意力机制
def show_attention_example():
    # 假设我们有一个句子："我 喜欢 机器学习"
    sentence = ["我", "喜欢", "机器学习"]
    
    # 1. 创建简单的嵌入
    word_embeddings = np.random.randn(3, 4)  # 3个词，每个词4维
    
    # 2. 创建多头注意力层
    attention = tf.keras.layers.MultiHeadAttention(
        num_heads=2,  # 2个注意力头
        key_dim=2     # 每个头的维度
    )
    
    # 3. 计算注意力
    # 扩展维度以匹配批处理要求
    inputs = tf.expand_dims(word_embeddings, 0)  # [1, 3, 4]
    attention_output = attention(inputs, inputs)
    
    # 4. 提取注意力权重
    attention_weights = attention.get_weights()
    
    print("多头注意力的工作过程：")
    print(f"1. 输入词向量形状: {word_embeddings.shape}")
    print(f"2. 注意力输出形状: {attention_output.shape}")
    print("\n每个词都会通过注意力机制关注其他词：")
    for i, word in enumerate(sentence):
        print(f"\n对于词 '{word}':")
        print(f"- 它会通过注意力机制与其他词 {sentence} 产生联系")
        print("- 注意力分数表示它与每个词的关联程度")

# 创建一个简单的Transformer块来说明完整的处理过程
def create_simple_transformer_block(sequence_length=3, embedding_dim=4):
    inputs = tf.keras.Input(shape=(sequence_length, embedding_dim))
    
    # 1. 多头注意力
    attention_output = tf.keras.layers.MultiHeadAttention(
        num_heads=2,
        key_dim=2
    )(inputs, inputs)
    
    # 2. 第一个残差连接和层归一化
    x = tf.keras.layers.LayerNormalization()(attention_output + inputs)
    
    # 3. 前馈网络
    ffn = tf.keras.Sequential([
        tf.keras.layers.Dense(8, activation='relu'),
        tf.keras.layers.Dense(embedding_dim)
    ])(x)
    
    # 4. 第二个残差连接和层归一化
    outputs = tf.keras.layers.LayerNormalization()(ffn + x)
    
    return tf.keras.Model(inputs=inputs, outputs=outputs)

# 展示完整的处理过程
def show_transformer_process():
    # 创建示例数据
    batch_size = 1
    sequence_length = 3
    embedding_dim = 4
    
    # 创建模型
    model = create_simple_transformer_block(sequence_length, embedding_dim)
    
    # 创建示例输入
    example_input = tf.random.normal((batch_size, sequence_length, embedding_dim))
    
    # 获取输出
    output = model(example_input)
    
    print("\nTransformer块的处理过程：")
    print("1. 输入序列通过多头注意力层")
    print("2. 添加残差连接并进行层归一化")
    print("3. 通过前馈网络")
    print("4. 再次添加残差连接并进行层归一化")
    print(f"\n输入形状: {example_input.shape}")
    print(f"输出形状: {output.shape}")

# 运行示例
show_attention_example()
print("\n" + "="*50 + "\n")
show_transformer_process()

多头注意力的工作过程：
1. 输入词向量形状: (3, 4)
2. 注意力输出形状: (1, 3, 4)

每个词都会通过注意力机制关注其他词：

对于词 '我':
- 它会通过注意力机制与其他词 ['我', '喜欢', '机器学习'] 产生联系
- 注意力分数表示它与每个词的关联程度

对于词 '喜欢':
- 它会通过注意力机制与其他词 ['我', '喜欢', '机器学习'] 产生联系
- 注意力分数表示它与每个词的关联程度

对于词 '机器学习':
- 它会通过注意力机制与其他词 ['我', '喜欢', '机器学习'] 产生联系
- 注意力分数表示它与每个词的关联程度



Transformer块的处理过程：
1. 输入序列通过多头注意力层
2. 添加残差连接并进行层归一化
3. 通过前馈网络
4. 再次添加残差连接并进行层归一化

输入形状: (1, 3, 4)
输出形状: (1, 3, 4)


In [11]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

class TextGenerator:
    def __init__(self, vocab_size=5000, max_length=50, embedding_dim=256):
        self.vocab_size = vocab_size
        self.max_length = max_length
        self.embedding_dim = embedding_dim
        self.tokenizer = None
        self.model = self._build_model()
    
    def _build_model(self):
        # 输入层
        inputs = layers.Input(shape=(self.max_length,))
        
        # 嵌入层
        x = layers.Embedding(self.vocab_size, self.embedding_dim)(inputs)
        
        # 第一个Transformer块
        x1 = layers.MultiHeadAttention(num_heads=8, key_dim=32)(x, x)
        x1 = layers.LayerNormalization()(x + x1)
        x1 = layers.Dropout(0.1)(x1)
        
        # 前馈网络
        ffn = layers.Dense(512, activation='relu')(x1)
        ffn = layers.Dense(self.embedding_dim)(ffn)
        x1 = layers.LayerNormalization()(x1 + ffn)
        
        # 第二个Transformer块
        x2 = layers.MultiHeadAttention(num_heads=8, key_dim=32)(x1, x1)
        x2 = layers.LayerNormalization()(x1 + x2)
        x2 = layers.Dropout(0.1)(x2)
        
        # 全局平均池化，将序列压缩为单个向量
        x3 = layers.GlobalAveragePooling1D()(x2)
        
        # 输出层
        outputs = layers.Dense(self.vocab_size, activation='softmax')(x3)
        
        model = Model(inputs, outputs)
        model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        return model
    
    def prepare_data(self, texts):
        # 创建并训练tokenizer
        self.tokenizer = Tokenizer(num_words=self.vocab_size, oov_token='<OOV>')
        self.tokenizer.fit_on_texts(texts)
        
        # 创建训练序列
        sequences = []
        for text in texts:
            # 将文本转换为序列
            sequence = self.tokenizer.texts_to_sequences([text])[0]
            
            # 创建输入-输出对
            for i in range(1, len(sequence)):
                input_seq = sequence[:i]
                target = sequence[i]
                sequences.append((input_seq, target))
        
        # 填充序列
        X = []
        y = []
        for input_seq, target in sequences:
            padded_seq = pad_sequences([input_seq], maxlen=self.max_length, padding='pre')[0]
            X.append(padded_seq)
            y.append(target)
        
        return np.array(X), np.array(y)
    
    def train(self, texts, epochs=50):
        # 准备数据
        X, y = self.prepare_data(texts)
        
        # 训练模型
        self.model.fit(X, y, epochs=epochs, batch_size=32)
    
    def generate_text(self, seed_text, max_gen_length=50, temperature=1.0):
        # 将种子文本转换为序列
        input_seq = self.tokenizer.texts_to_sequences([seed_text])[0]
        generated_text = seed_text
        
        # 生成文本
        for _ in range(max_gen_length):
            # 填充序列
            padded_seq = pad_sequences([input_seq], maxlen=self.max_length, padding='pre')
            
            try:
                # 预测下一个词
                predictions = self.model.predict(padded_seq, verbose=0)[0]  # 获取批次中的第一个预测
                
                # 应用温度采样
                predictions = np.log(predictions) / temperature
                exp_predictions = np.exp(predictions)
                predictions = exp_predictions / np.sum(exp_predictions)
                
                # 确保predictions是一维的
                predictions = predictions.flatten()
                
                # 采样下一个词
                next_index = np.random.choice(len(predictions), p=predictions)
                
                # 将预测的词转换回文本
                for word, index in self.tokenizer.word_index.items():
                    if index == next_index:
                        next_word = word
                        break
                else:
                    next_word = '<UNK>'
                
                # 添加预测的词到生成的文本中
                generated_text += " " + next_word
                
                # 更新输入序列
                input_seq = list(input_seq)
                input_seq.append(next_index)
                if len(input_seq) > self.max_length:
                    input_seq = input_seq[-self.max_length:]
                
            except Exception as e:
                print(f"生成过程中出现错误: {str(e)}")
                break
        
        return generated_text


texts = [
    "今天天气真好 阳光明媚",
    "我很喜欢这个公园 风景优美",
    "这家餐厅的菜很好吃 服务态度也不错",
    "学习编程很有趣 可以创造很多东西",
    "音乐能让人心情愉快 舒缓压力",
    "运动对身体健康很重要 每天都要锻炼",
    "读书可以增长知识 开阔视野",
    "工作需要专注 认真对待",
    "旅行可以见识不同的风景 体验不同的文化",
    "生活中要保持乐观 积极向上"
] * 5  # 复制数据增加训练样本

# 创建生成器
generator = TextGenerator(vocab_size=1000, max_length=10, embedding_dim=128)

# 训练模型
print("开始训练模型...")
generator.train(texts, epochs=20)  # 减少epoch数避免过拟合

# 生成文本
print("\n生成示例：")
seed_texts = [
    "今天天气",
    "我很喜欢",
    "这家餐厅"
]

for seed in seed_texts:
    # 使用不同的温度参数生成文本
    for temp in [0.5, 1.0]:
        try:
            generated = generator.generate_text(
                seed_text=seed,
                max_gen_length=10,
                temperature=temp
            )
            print(f"\n种子文本: '{seed}'")
            print(f"温度: {temp}")
            print(f"生成结果: {generated}")
        except Exception as e:
            print(f"生成文本时出现错误: {str(e)}")

开始训练模型...
Epoch 1/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step - accuracy: 0.0000e+00 - loss: 6.8714
Epoch 2/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.1558 - loss: 5.5860 
Epoch 3/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - accuracy: 0.0875 - loss: 5.0656
Epoch 4/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - accuracy: 0.0771 - loss: 4.6878
Epoch 5/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.1083 - loss: 4.2631 
Epoch 6/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.1083 - loss: 3.8840 
Epoch 7/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.0875 - loss: 3.5157 
Epoch 8/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - accuracy: 0.2446 - loss: 3.0933
Epoch 9/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[

In [12]:

for temp in [0.5, 1.0]:
    try:
        generated = generator.generate_text(
            seed_text="今天天气",
            max_gen_length=10,
            temperature=temp
        )
        print(f"\n种子文本: '{seed}'")
        print(f"温度: {temp}")
        print(f"生成结果: {generated}")
    except Exception as e:
        print(f"生成文本时出现错误: {str(e)}")


种子文本: '这家餐厅'
温度: 0.5
生成结果: 今天天气 风景优美 服务态度也不错 服务态度也不错 服务态度也不错 <UNK> 服务态度也不错 认真对待 服务态度也不错 <UNK> <UNK>

种子文本: '这家餐厅'
温度: 1.0
生成结果: 今天天气 舒缓压力 风景优美 服务态度也不错 体验不同的文化 <UNK> 服务态度也不错 服务态度也不错 <UNK> <UNK> <UNK>
