In [1]:
"""数据导入"""

import re
data_file = "../00-data/tf_data.txt"

# Read the whole corpus. The context manager closes the handle even when
# read() raises, which the previous bare open() did not guarantee.
with open(data_file, 'r', encoding='utf-8') as filename:
    text = filename.read()

# Normalise whitespace: tabs and vertical tabs become plain spaces so that the
# tokenizer below only has to deal with one kind of blank.
text = re.sub('[\t\v]', ' ', text)

# Split on punctuation / space / newline and keep the delimiters as tokens
# (capturing group). A raw string is used so that `\[` and `\]` are regex
# escapes instead of invalid Python string escapes.
text = re.split(r'([: ,.*\n(){}\[\]=])', text)

# Drop single-space tokens and the empty strings re.split produces between
# adjacent delimiters.
text = [token for token in text if token != ' ' and token != '']
list_text = text        # token list, used later for vocabulary and sequences
text = ' '.join(text)        # the same tokens joined back into one string

In [2]:
"""文本词频统计"""

def word_count(list_text):
    """Count how many times each element occurs in ``list_text``.

    Args:
        list_text: iterable of hashable tokens.

    Returns:
        A ``collections.defaultdict(int)`` mapping each token to its number of
        occurrences, in first-seen order.
    """
    from collections import defaultdict

    tally = defaultdict(int)        # missing tokens start at 0
    for token in list_text:
        tally[token] = tally[token] + 1
    return tally

In [3]:
"""根据text文本创建代码词词典"""

def build_dict(text, min_word_freq=50):
    """Build the code-token vocabulary from a token sequence.

    Args:
        text: iterable of tokens; it is passed straight to ``word_count``.
        min_word_freq: a token is kept only when its count is strictly
            greater than this value.

    Returns:
        A ``(words, word_idx)`` tuple. ``words`` lists the kept tokens ordered
        by descending frequency (ties broken by ascending token), followed by
        the ``'<unk>'`` placeholder. ``word_idx`` maps every entry of
        ``words`` to its position in that list.
    """
    word_freq = word_count(text)
    frequent = [(word, count) for word, count in word_freq.items()
                if count > min_word_freq]
    # Negating the count sorts the most frequent tokens first while still
    # ordering equal-frequency tokens alphabetically.
    frequent.sort(key=lambda item: (-item[1], item[0]))
    # Build the list directly: unpacking ``zip(*frequent)`` raises ValueError
    # when no token survives the filter (empty or tiny corpus).
    words = [word for word, _ in frequent]
    words.append('<unk>')
    word_idx = {word: index for index, word in enumerate(words)}
    return words, word_idx

In [4]:
"""数据预处理-字符串序列向量化"""

import numpy as np
import keras
import json

maxlen = 50         # number of tokens in each input sequence
step = 5         # stride between the starts of consecutive sequences
sentences = []         # extracted input sequences
next_words = []         # token that follows each extracted sequence
vocab_file = "../00-data/vocab"

cut_words = list_text         # token list produced by the data-import cell
for i in range(0, len(cut_words) - maxlen, step):
    sentences.append(cut_words[i:i + maxlen])         # window of maxlen tokens
    next_words.append(cut_words[i + maxlen])         # prediction target

print('Number of sequences:', len(sentences))

# Vocabulary: tokens occurring more than once, plus the '<unk>' placeholder.
words, word_idx = build_dict(list_text, 1)
print('Unique words:', len(words))
json_dict = json.dumps(word_idx)
with open(vocab_file, "w") as f:
    f.write(json_dict)

# Token -> first position in `words`. A single pass with setdefault gives the
# same mapping as calling words.index() per token, without the quadratic scan.
word_indices = {}
for position, word in enumerate(words):
    word_indices.setdefault(word, position)

print('Vectorization...')
unk_index = word_indices['<unk>']         # fallback for out-of-vocabulary tokens
x = np.zeros((len(sentences), maxlen))         # token ids of each sequence
y = np.zeros((len(sentences)))         # token id of each target
for i, sentence in enumerate(sentences):
    for t, word in enumerate(sentence):
        x[i, t] = word_indices.get(word, unk_index)
    y[i] = word_indices.get(next_words[i], unk_index)

y = keras.utils.to_categorical(y, len(words))         # one-hot encode the targets

Using TensorFlow backend.


Number of sequences: 79181
Unique words: 7865
Vectorization...


In [6]:
"""定义下一个代码词的采样函数---temperature越大，代码生成的随机性越强---"""

def sample(preds,temperature=0.1):
    """Draw one index from ``preds`` after temperature re-weighting.

    Each probability is raised to the power ``1 / temperature`` and the result
    is renormalised, so a larger temperature gives more random draws.

    Args:
        preds: 1-D array-like of non-negative weights (zeros are allowed).
        temperature: positive scaling factor.

    Returns:
        The sampled index, as returned by ``np.argmax``.
    """
    preds = np.asarray(preds).astype('float')
    # log(0) legitimately yields -inf for impossible entries; silence the
    # divide-by-zero warning it would otherwise emit.
    with np.errstate(divide='ignore'):
        logits = np.log(preds) / temperature
    # Softmax is shift-invariant, so subtracting the maximum changes nothing
    # mathematically but prevents every exp() from underflowing to 0 at low
    # temperatures, which used to produce a 0/0 = NaN distribution.
    exp_preds = np.exp(logits - np.max(logits))
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1,preds,1)
    return np.argmax(probas)

In [7]:
"""将字符串写到指定文件中"""

def save(filename, contents):
    """Append ``contents`` to the UTF-8 text file ``filename``.

    Args:
        filename: path of the file to append to.
        contents: string to write.
    """
    # The context manager closes the handle even if write() raises.
    with open(filename, 'a', encoding='utf-8') as file:
        file.write(contents)

### 模型训练

In [8]:
import keras
from keras import layers
from keras.layers import LSTM, Dense, Dropout

def create_model(words,learning_rate):
    """Build and compile the next-token prediction network.

    Args:
        words: vocabulary list; its length sets both the embedding input size
            and the width of the softmax output.
        learning_rate: learning rate handed to the RMSprop optimizer.

    Returns:
        The compiled ``Sequential`` model.
    """
    vocab_size = len(words)
    recurrent_options = dict(dropout=0.2, recurrent_dropout=0.2)         # dropout to reduce overfitting

    stack = [
        layers.Embedding(vocab_size, 512),                                   # layer 1: embedding
        layers.LSTM(512, return_sequences=True, **recurrent_options),        # layer 2: LSTM emitting the full sequence
        layers.LSTM(512, **recurrent_options),                               # layer 3: LSTM emitting the last state
        layers.Dense(vocab_size, activation='softmax'),                      # layer 4: softmax over the vocabulary
    ]

    network = keras.models.Sequential()
    for layer in stack:
        network.add(layer)

    network.compile(loss='categorical_crossentropy',
                    optimizer=keras.optimizers.RMSprop(lr=learning_rate))
    return network

In [9]:
"""创建模型实例"""
learning_rate = 0.003
model = create_model(words,learning_rate)         # build the compiled network sized to the vocabulary in `words`
model.summary()         # print the layer-by-layer structure

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      (None, None, 512)         4026880   
_________________________________________________________________
lstm_1 (LSTM)                (None, None, 512)         2099200   
_________________________________________________________________
lstm_2 (LSTM)                (None, 512)               2099200   
_________________________________________________________________
dense_1 (Dense)              (None, 7865)              4034745   
Total params: 12,260,025
Trainable params: 12,260,025
Non-trainable params: 0
_________________________________________________________________


In [10]:
def clean_and_split(line):
    """Tokenize one piece of source text.

    Tabs and vertical tabs are first turned into spaces. The text is then
    split on ``: , . * ( ) { } [ ] =``, space and newline; the delimiters are
    kept as tokens, except that single spaces and empty strings are dropped.

    Args:
        line: the string to tokenize.

    Returns:
        List of token strings.
    """
    # Unify the blank characters found in the raw data.
    line = re.sub('[\t\v]', ' ', line)
    # Capturing group keeps the delimiters. A raw string is used so `\[` and
    # `\]` are regex escapes rather than invalid Python string escapes.
    tokens = re.split(r'([: ,.*\n(){}\[\]=])', line)
    return [token for token in tokens if token != ' ' and token != '']

In [11]:
"""打印生成的结果"""
def print_code_text(code_list):
    """Join tokens into display text, print it, and return it.

    A single space is inserted before a token only when neither that token
    nor the one before it is found in the punctuation string ``mark``. The
    test is a substring test, so the initial empty "previous token" counts as
    found and the first token never gets a leading space.

    Args:
        code_list: iterable of token strings.

    Returns:
        The assembled text.
    """
    mark = ".,*()[]:{}\n"

    pieces = []
    previous = ""
    for token in code_list:
        touches_mark = previous in mark or token in mark
        pieces.append(token if touches_mark else " " + token)
        previous = token

    result = "".join(pieces)
    print(result)
    return result

In [12]:
def text_generate(model,input_file,maxlen,
                  temperatures,save_path,epoch,gen_lines=30):
    """Generate sample code with ``model`` and write one file per temperature.

    A random line of ``input_file`` is tokenized and used as the seed. For
    each temperature, up to ``gen_lines`` lines of at most 30 tokens each are
    generated, printed, and written to
    ``save_path + "/{epoch}_epoch_{temperature}_temperature"``.

    Args:
        model: object whose ``predict`` returns next-token probabilities.
        input_file: path of the UTF-8 text file the seed line is drawn from.
        maxlen: maximum number of tokens fed to the model (positive int).
        temperatures: iterable of sampling temperatures.
        save_path: directory that receives the generated files.
        epoch: epoch number, used in the banner and in the file names.
        gen_lines: number of lines to generate per temperature.

    Raises:
        ValueError: if ``input_file`` contains no lines.
    """
    import random
    import codecs

    # Pick a random line of the raw data as the starting point.
    with codecs.open(input_file,"r","utf-8") as fin:
        lines = fin.readlines()
    # random.randint includes its upper bound, so randint(0, len(lines)) could
    # index one past the end; randrange excludes it.
    random_line = random.randrange(len(lines))
    start_line = clean_and_split(lines[random_line])

    one_line_max_words = 30
    unk_index = word_indices['<unk>']         # fallback for unknown tokens

    print("===========epoch:%d===========" % epoch)

    for temperature in temperatures:
        # Model context: at most the last maxlen seed tokens. A longer seed
        # was previously never trimmed, so the context grew without bound.
        generated_text = start_line[-maxlen:]
        print_string = start_line[:]         # everything that will be shown

        for i in range(gen_lines):
            for j in range(one_line_max_words):
                # Vectorize the current context.
                sampled = np.zeros((1,len(generated_text)))
                for t,word in enumerate(generated_text):
                    sampled[0,t] = word_indices.get(word,unk_index)
                # Predict and sample the next token.
                preds = model.predict(sampled,verbose=0)[0]
                next_index = sample(preds,temperature)
                next_word = words[next_index]

                # Slide the window so the context never exceeds maxlen.
                if len(generated_text) >= maxlen:
                    generated_text = generated_text[len(generated_text) - maxlen + 1:]
                generated_text.append(next_word)
                print_string.append(next_word)
                if next_word == '\n':         # end of the generated line
                    break

        print("-----temperature: {}-----".format(temperature))
        result = print_code_text(print_string)

        save_file = save_path + "/{}_epoch_{}_temperature".format(epoch,temperature)
        with codecs.open(save_file,"w","utf-8") as fout:
            fout.write(result)

In [13]:
"""模型保存"""
from keras.callbacks import ModelCheckpoint
# The checkpoint target must be a file, not a directory: the HDF5 writer used
# by ModelCheckpoint and the reader used by load_weights both open `filepath`
# as a single file. The directory itself has to exist before training starts.
filepath = "../02-checkpoints/checkpoint.hdf5"
# Save the full model after every epoch so training can be resumed later.
checkpoint = ModelCheckpoint(filepath, save_weights_only=False,verbose=1,save_best_only=False)

In [14]:
"""学习率随模型效果变小"""
import keras.backend as K
from keras.callbacks import LearningRateScheduler
 
def scheduler(epoch):
    """Learning-rate schedule: halve the rate on every 10th epoch.

    On epochs 10, 20, 30, ... the optimizer's learning rate of the global
    ``model`` is multiplied by 0.5 in place and the new value is printed.
    Epoch 0 and all other epochs leave it unchanged.

    Args:
        epoch: epoch index supplied by the caller.

    Returns:
        The optimizer's learning rate after any adjustment.
    """
    if epoch == 0 or epoch % 10 != 0:
        return K.get_value(model.optimizer.lr)

    halved = K.get_value(model.optimizer.lr) * 0.5
    K.set_value(model.optimizer.lr, halved)
    print("lr changed to {}".format(halved))
    return K.get_value(model.optimizer.lr)
 
# Wrap `scheduler` in a Keras LearningRateScheduler callback; it is passed to
# model.fit through the `callbacks` argument in the training cell.
reduce_lr = LearningRateScheduler(scheduler)

In [15]:
"""训练模型"""
import os
print_save_path = "../00-data/"         # where generated samples are written
model_save_path = "../02-checkpoints/"         # where per-round model snapshots are written
total_epochs = 50
# model.save does not create missing directories, so make sure the target exists.
os.makedirs(model_save_path, exist_ok=True)
# NOTE(review): range(1, total_epochs) runs total_epochs - 1 rounds of 5 epochs
# each — confirm whether 49 or 50 rounds are intended.
for epoch in range(1,total_epochs):
    # Resume from an existing checkpoint *file* only. os.path.exists is also
    # true for a directory, which load_weights cannot read.
    if os.path.isfile(filepath):
        model.load_weights(filepath)
    init_epoch = (epoch - 1) * 5
    # Train 5 more epochs, continuing the global epoch count so the
    # learning-rate schedule sees increasing epoch numbers.
    model.fit(x,y,batch_size=1024,epochs=(init_epoch + 5),initial_epoch=init_epoch,callbacks=[reduce_lr,checkpoint])
    # Write sample generations at three temperatures, then snapshot the model.
    text_generate(model,data_file,maxlen,[0.1,0.4,0.8],print_save_path,epoch * 5)
    model.save(model_save_path + "epoch_{}.hdf5".format(epoch * 5))

Instructions for updating:
Use tf.cast instead.
Epoch 1/5
 4096/79181 [>.............................] - ETA: 2:37 - loss: 0.6361

KeyboardInterrupt: 

### 模型预测

In [32]:
def generate_text_sentence(seed_text,model_filename):
    """Generate one line of code from ``seed_text`` at three temperatures.

    Weights are loaded from ``model_filename`` into the global ``model``. For
    each temperature in ``[0.1, 0.4, 0.8]`` the output contains a banner, the
    seed, and up to 50 sampled tokens (generation stops after a newline
    token).

    Args:
        seed_text: source text used as the start of the generation.
        model_filename: path of the weights file to load.

    Returns:
        One string holding the three banners and generated snippets.

    Raises:
        ValueError: if ``seed_text`` contains no tokens.
    """
    model.load_weights(model_filename)

    # Same punctuation string as print_code_text. It was previously not
    # defined in this scope, so the function failed with NameError.
    mark = ".,*()[]:{}\n"

    # Tokenize exactly like the training data so seed tokens match the vocabulary.
    seed_tokens = clean_and_split(seed_text)
    if not seed_tokens:
        raise ValueError("seed_text contains no tokens")
    unk_index = word_indices['<unk>']         # fallback for unknown seed tokens

    def _render(token, previous):
        # Substring test on purpose: the empty initial `previous` counts as
        # "in mark", so the first token gets no leading space.
        if token not in mark and previous not in mark:
            return ' ' + token
        return token

    strings = ''
    for temperature in [0.1,0.4,0.8]:
        strings += '\n' + '-------------temperature:' + str(temperature) +'-------------\n' +'\n'

        # Every run starts from a fresh copy of the seed and fresh spacing state.
        generated_text = seed_tokens[:]
        last_word = ''
        for token in generated_text:
            strings += _render(token, last_word)
            last_word = token

        for i in range(50):
            sampled = np.zeros((1,len(generated_text)))
            for t,word in enumerate(generated_text):
                sampled[0,t] = word_indices.get(word,unk_index)

            preds = model.predict(sampled,verbose=0)[0]
            # Use the temperature of this run (it was hard-coded to 0.3).
            next_index = sample(preds,temperature)
            next_word = words[next_index]

            generated_text.append(next_word)
            strings += _render(next_word, last_word)
            last_word = next_word

            if next_word == '\n':
                break

    return strings
