In [2]:
from operator import mod
from pandas.core.algorithms import mode
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
# Model definition
class LstmModel(nn.Module):
    """Two stacked LSTM layers followed by a linear head with two outputs.

    Args:
        inputSize: Number of features per time step fed to the LSTM.
        hiddenSize: Width of the LSTM hidden state. The linear head takes
            this many inputs, so any value works (the head used to be
            hard-coded to 6 inputs, which broke every other hidden size).
    """

    def __init__(self, inputSize=5, hiddenSize=6):
        super().__init__()
        # LSTM layer -> two LSTM cells stacked on top of each other
        self.lstm = nn.LSTM(input_size=inputSize,
                            hidden_size=hiddenSize, num_layers=2)
        # Linear head: its input width must follow the LSTM hidden size
        self.output = nn.Linear(hiddenSize, 2)

    def forward(self, x):
        """Run the LSTM and project every time step to two values.

        Args:
            x: Input of shape (time_step, batch, input_size).

        Returns:
            Tensor of shape (time_step, batch, 2).
        """
        # x1: (time_step, batch, hidden_size)
        x1, _ = self.lstm(x)
        # nn.Linear acts on the last dimension, so the leading
        # (time_step, batch) dimensions pass through unchanged and no
        # flatten / un-flatten round trip is needed.
        return self.output(x1)

# Train the model
def training_loop(nEpochs, model, optimizer, lossFn, trainData, testData=None,
                  lossHistory=None, accHistory=None):
    """Full-batch training that records loss and argmax agreement per epoch.

    Args:
        nEpochs: Number of optimisation steps; each step uses all of trainData.
        model: Module called as ``model(trainX)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        lossFn: Callable ``lossFn(prediction, target)`` returning a scalar tensor.
        trainData: Tuple ``(trainX, trainY)``.
        testData: Accepted for interface compatibility; currently unused.
        lossHistory: Optional list that receives one loss value per epoch.
        accHistory: Optional list that receives one accuracy value per epoch.
            When either list is omitted, the module-level ``loss_`` / ``acc_``
            list is used (and created if missing), which keeps existing
            callers that rely on those globals working.

    Returns:
        The same ``model`` object, trained in place.
    """
    trainX, trainY = trainData
    if lossHistory is None:
        lossHistory = globals().setdefault('loss_', [])
    if accHistory is None:
        accHistory = globals().setdefault('acc_', [])

    for epoch in range(1, nEpochs + 1):
        optimizer.zero_grad()  # reset accumulated gradients
        trainP = model(trainX)
        loss = lossFn(trainP, trainY)
        loss.backward()  # back-propagation
        optimizer.step()
        lossHistory.append(loss.item())

        # "Accuracy": fraction of rows whose arg-max over the output
        # dimension agrees between prediction and target.
        with torch.no_grad():
            width = trainP.shape[-1]
            predIdx = trainP.reshape(-1, width).argmax(dim=1)
            trueIdx = trainY.reshape(-1, width).argmax(dim=1)
            # Divide by the number of compared rows (time_step * batch);
            # len(trainY) only counts time steps and over-counts when batch > 1.
            acc = (predIdx == trueIdx).sum().item() / trueIdx.numel()
        accHistory.append(acc)

        if epoch % 100 == 0:
            print(f"Epoch: {epoch}, Loss: {loss.item()}")
    return model

In [4]:
# Load the raw results table and the table of derived word attributes,
# keeping only the selected column positions of each file
source_data = pd.read_csv(
    '../data/source_data.csv',
    usecols=[2, 5, 6, 7, 8, 9, 10, 11],
)
handle_data = pd.read_csv(
    '../data/handle_data.csv',
    usecols=[0, 2, 3, 4],
)
# Join the two tables; no key is given, so pandas joins on the column
# name(s) the two frames have in common
data = source_data.merge(handle_data)
data.head()

Unnamed: 0,Word,1 try,2 tries,3 tries,4 tries,5 tries,6 tries,7 or more tries (X),syllable,frequency,tag
0,slump,1,3,23,39,24,9,1,1,5078,8
1,crank,1,5,23,31,24,14,2,1,3506,8
2,gorge,1,3,13,27,30,22,4,1,2019,8
3,query,1,4,16,30,30,17,2,2,2587,8
4,drink,1,9,35,34,16,5,1,1,54651,13


In [5]:
# Input 1: word encoding
def word2index(word):
    """Encode each letter of ``word`` as its alphabet position scaled to [0, 1].

    'a' maps to 0.0 and 'z' maps to 1.0 (position / 25). Surrounding
    whitespace is ignored and matching is case-insensitive, so ``' Eerie '``
    encodes the same as ``'eerie'``.

    Args:
        word: String to encode.

    Returns:
        List of floats, one per letter.

    Raises:
        ValueError: If a character is not one of the 26 ASCII letters.
    """
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    index_list = []
    for letter in word.strip().lower():
        position = alphabet.find(letter)
        if position < 0:
            raise ValueError(
                f"word2index: unsupported character {letter!r} in {word!r}")
        # Normalise the 0..25 position to the range 0..1
        index_list.append(position / 25)
    return index_list

# Input 2: frequency encoding
def freq_encode(frequency):
    """Min-max scale ``frequency`` against the 'frequency' column of the
    module-level ``data`` frame."""
    column = data['frequency']
    lowest = column.min()
    span = column.max() - lowest
    return (frequency - lowest) / span

# Input 3: part-of-speech (tag) encoding
def pos_encode(pos):
    """Min-max scale ``pos`` against the 'tag' column of the module-level
    ``data`` frame."""
    column = data['tag']
    lowest = column.min()
    span = column.max() - lowest
    return (pos - lowest) / span

# Input 4: syllable-count encoding
def syllable_encode(syllable):
    """Min-max scale ``syllable`` against the 'syllable' column of the
    module-level ``data`` frame."""
    column = data['syllable']
    lowest = column.min()
    span = column.max() - lowest
    return (syllable - lowest) / span

# Output
def norm_(y_s):
    """Expand per-bucket counts into individual samples and summarise them.

    ``y_s[k]`` is the number of samples with value ``k + 1``. For example,
    ``[0, 1, 9, 29, 34, 22, 5]`` expands to zero 1s, one 2, nine 3s,
    twenty-nine 4s, thirty-four 5s, twenty-two 6s and five 7s.

    Returns:
        Tuple ``(mu, sigma, datas)``: the mean and standard deviation
        (``np.mean`` / ``np.std``) of the expanded samples, and the samples
        themselves as a list.
    """
    bucket_values = [1, 2, 3, 4, 5, 6, 7]
    # One run of repeated values per bucket, flattened into a single list
    runs = [[bucket_values[slot]] * count for slot, count in enumerate(y_s)]
    datas = [value for run in runs for value in run]
    return np.mean(datas), np.std(datas), datas

# Split into training and test sets.
# The samples are treated as a time series, so the order is kept and the
# split is a simple head/tail cut rather than a random shuffle.
def splitData(data, rate=0.7):
    """Cut ``(dataX, dataY)`` into a leading training part and a trailing test part.

    Args:
        data: Tuple ``(dataX, dataY)``; both are sliced along their first axis.
        rate: Fraction of samples that goes to the training part (default 0.7).

    Returns:
        ``(trainData, testData)``, each a ``(X, Y)`` tuple.
    """
    features, targets = data
    cut = int(features.shape[0] * rate)
    trainData = features[:cut], targets[:cut]
    testData = features[cut:], targets[cut:]
    return trainData, testData

import math
def normal_distribution(x, mean, sigma):
    """Evaluate the normal (Gaussian) density with the given mean and
    standard deviation at ``x``."""
    exponent = -1*((x-mean)**2)/(2*(sigma**2))
    scale = math.sqrt(2*np.pi) * sigma
    return np.exp(exponent) / scale

In [6]:
# Build the input features: one row per word, made of the letter codes
# followed by the scaled frequency, tag and syllable count
dataX = []
for i in range(len(data)):
    word = data['Word'][i]
    features = word2index(word)
    freq = freq_encode(data['frequency'][i])
    pos = pos_encode(data['tag'][i])
    syllable = syllable_encode(data['syllable'][i])
    features.extend([freq, pos, syllable])
    dataX.append(features)

# Convert to a tensor shaped (n_rows, 1, 8)
dataX = torch.tensor(dataX).reshape(-1, 1, 8)

In [17]:
# 输出数据转化
from scipy import stats
# Per-row targets: mean and standard deviation of the expanded try counts,
# plus a Shapiro-Wilk result for every row
try_columns = ['1 try', '2 tries', '3 tries', '4 tries',
               '5 tries', '6 tries', '7 or more tries (X)']
mus, sigmas = [], []
statistics, pvalues = [], []
datass = []
for i in range(len(data)):
    y_s = [data[column][i] for column in try_columns]

    mu, sigma, datas = norm_(y_s)
    datass.append(datas)
    mus.append(mu)
    sigmas.append(sigma)
    result = stats.shapiro(datas)
    statistics.append(result[0])
    pvalues.append(result[1])

# Min-max normalise mus and sigmas (the list-minus-scalar arithmetic
# yields NumPy arrays)
mu_low, mu_high = np.min(mus), np.max(mus)
mus_ = (mus - mu_low) / (mu_high - mu_low)
sigma_low, sigma_high = np.min(sigmas), np.max(sigmas)
sigmas_ = (sigmas - sigma_low) / (sigma_high - sigma_low)

# Pair them up as [mu, sigma] rows and convert to a tensor shaped (n_rows, 1, 2)
dataY = [[mus_[i], sigmas_[i]] for i in range(len(data))]
dataY = torch.tensor(dataY).reshape(-1, 1, 2)

# Persist the expanded samples, one column per row of `data`
pd.DataFrame(datass).T.to_csv('../data/datass.csv')

In [8]:
# Cast both tensors to float32
dataX, dataY = dataX.float(), dataY.float()

# Sequential split: the first 80% of rows are used for fitting, the last
# 20% are held out for prediction
rate = 0.8
data_ = (dataX, dataY)
trainData, testData = splitData(data_, rate=rate)

In [9]:
# Seed the RNG so weight initialisation (and therefore the training run)
# is repeatable across kernel restarts
torch.manual_seed(42)
# Build the model: each input row has 8 features
# (5 letter codes + frequency + tag + syllable count)
lstm = LstmModel(inputSize=8)
# Adam optimiser (author's note: it worked better than SGD here)
optimizer = optim.Adam(lstm.parameters(), lr=0.1)
loss_func = nn.MSELoss()

In [10]:
#! 寻找mu和freq的相关性
# freq = []
# for i in range(len(data)):
#     freq.append(freq_encode(data['frequency'][i]))
#     dataX[i].append(freq)
#
# np.corrcoef(freq, mus_)
#
# # 将mus, sigmas, freq 组合起来形成 dataframe
# import pandas as pd
# df = pd.DataFrame({'mus': mus_, 'sigmas': sigmas_, 'freq': freq})
# mapdata = df.corr(method='spearman')
# # 画出热力图
# import seaborn as sns
# fig = plt.figure(dpi=400)
# sns.heatmap(mapdata, annot=True, cmap='YlGnBu_r', vmax=1, square=True)
# plt.show()
# fig.savefig('corr.png', dpi=400)

In [11]:
# Train the model. training_loop records per-epoch loss and accuracy in
# the module-level lists loss_ and acc_, so they are reset first.
loss_, acc_ = [], []
lstm = training_loop(1000, lstm, optimizer, loss_func, trainData)

KeyboardInterrupt: 

In [None]:
# Score the held-out set. The metric reported here is R^2.
testX, testY = testData
testX = testX.float()
testY = testY.reshape(-1, 2).float()

# Predict, flatten to rows of 2 values and detach from autograd so the
# result can be handed to scikit-learn as a NumPy array
out = lstm(testX).view(-1, 2).detach().numpy()

from sklearn.metrics import r2_score
r2_score(testY, out)

In [None]:
# Loss curve
fig, ax = plt.subplots(dpi=400)
ax.plot(loss_)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
fig.savefig('loss.png', dpi=400)

# Accuracy curve
fig, ax = plt.subplots(dpi=400)
ax.plot(acc_)
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
fig.savefig('acc.png', dpi=400)

In [None]:
# Find the highest recorded accuracy and its position in acc_
# (0-based; the first occurrence wins on ties)
max_acc, max_acc_index = 0, 0
for position, value in enumerate(acc_):
    if value > max_acc:
        max_acc, max_acc_index = value, position
print('max_acc: ', max_acc)
print('max_acc_index: ', max_acc_index)

In [None]:
# Encode a single query word ('eerie') in the same layout as the training
# rows: letter codes, then scaled frequency, tag and syllable count
data_list = word2index('eerie')
freq = freq_encode(2272)
pos = pos_encode(8)
syllable = syllable_encode(2)
data_list.extend([freq, pos, syllable])

# Shape (1, 1, 8), float32
data_list = torch.tensor(data_list).reshape(-1, 1, 8).float()

In [None]:
from torch import tensor
from scipy.stats import norm
import matplotlib.pyplot as plt
import numpy as np
# Map a normalised (mu, sigma) prediction back to the original scale.
# The prediction is hard-coded below; to take it from the model instead:
#   out = lstm(data_list)
out = tensor([[[0.5331, 0.5688]]])
out = out.view(-1).data.numpy()
mu_min, mu_max = np.min(mus), np.max(mus)
sigma_min, sigma_max = np.min(sigmas), np.max(sigmas)
out[0] = out[0] * (mu_max - mu_min) + mu_min
out[1] = out[1] * (sigma_max - sigma_min) + sigma_min
mu, sigma = out[0], out[1]

# Evaluate the normal density at 1..7 tries (rounded to two decimals) and
# draw it as a bar chart.
# NOTE(review): these are density values at integer points, not bin
# probabilities, although the axis is labelled 'Probability'.
x = list(range(1, 8))
y = [round(normal_distribution(tries, mu, sigma), 2) for tries in x]

print(y)
fig, ax = plt.subplots(dpi=400)
ax.bar(x, y, width=0.5, alpha=0.5)

print(mu, sigma)
# Overlay the continuous density over mu +/- 3 sigma
x = np.linspace(mu - 3 * sigma, mu + 3 * sigma, 50)
y = norm.pdf(x, mu, sigma)
ax.plot(x, y, 'r-', lw=2)
ax.set_xlabel('Number of tries')
ax.set_ylabel('Probability')
fig.savefig('result2.png', dpi=400)

In [None]:
# dataX, dataY = data  # 原始数据 -> (time_step, batch, input_size)
# dataY = dataY.view(-1).data.numpy()  # 展开为1维
# dataY = dataY * (maxPassenger - minPassenger) + minPassenger
# dataP = lstm(dataX)  # 进行拟合
# dataP = dataP.view(-1).data.numpy()  # 展开为1维
# dataP = dataP * (maxPassenger - minPassenger) + minPassenger

In [None]:


# dataX, dataY = data  # 原始数据 -> (time_step, batch, input_size)
# predicts = []
# for i in range(30):
#     data_last = dataX[-1, :, :]
#     print(data_last)
#     # 将data_last转化为三维数据
#     data_last = data_last.reshape(1, 1, -1)
#     predict = lstm(data_last)
#     print(predict)
#     # 将data_last后两个的数据前移,并加入预测值
#     data_last = torch.cat((data_last[:, :, 1:], predict), dim=2)
#
#     dataX = torch.cat((dataX, data_last), dim=0)
#     predict = predict.view(-1).data.numpy()
#     predict = predict * (maxPassenger - minPassenger) + minPassenger
#     predicts.append(predict)

In [None]:
# nTrain = int(dataY.shape[0] * rate)  # 拟合的数量
# nData = dataY.shape[0]  # 预测的数量
#
# # 绘制对比图
# plt.rcParams['font.sans-serif'] = 'KaiTi'  # 正常显示中文
# fig = plt.figure(dpi=400)
# ax = fig.add_subplot(111)
# ax.plot(np.arange(202, 202+360), dataY[:360], color='blue', label="Actual value")
# ax.plot(np.arange(202, 202+360), dataP[:360], color='orange', \
#         linestyle='--', label='Fitted value')
# ax.plot(np.arange(202+360, 202+nData), dataP[360:], \
#         linestyle='--', color='red', label='Predictive value')
# ax.legend()
# plt.xlabel('Contest number',fontsize=14)
# plt.ylabel('Number of  reported results',fontsize=14)
#
# fig.savefig('test.png', dpi=400)