匯入套件

In [47]:
import pandas as pd
import numpy as np
from sklearn import preprocessing
import plotly.graph_objects as go
import copy
from sklearn.metrics import mean_squared_error

讀檔

In [48]:
data = pd.read_csv('麥寮測站_v1.csv').iloc[:,4:]
data

Unnamed: 0,PM25,WindSpeed,PM10
0,35,3.8,77
1,22,4.2,78
2,21,3.2,71
3,26,4.4,83
4,25,3.5,70
...,...,...,...
195,7,6.0,18
196,8,7.0,22
197,15,5.7,25
198,13,6.6,21


輸入和輸出

In [49]:
Input = data[['PM25','WindSpeed']]
Output = data[['PM10']]

最大最小正規化

In [50]:
min_max_scaler = preprocessing.MinMaxScaler(feature_range = (0,1))
Input = min_max_scaler.fit_transform(Input)
Input = pd.DataFrame(Input)
Input

Unnamed: 0,0,1
0,0.452055,0.459459
1,0.273973,0.513514
2,0.260274,0.378378
3,0.328767,0.540541
4,0.315068,0.418919
...,...,...
195,0.068493,0.756757
196,0.082192,0.891892
197,0.178082,0.716216
198,0.150685,0.837838


In [51]:
min_max_scaler_1 = preprocessing.MinMaxScaler(feature_range = (0,1))
Output = min_max_scaler_1.fit_transform(Output)
Output = pd.DataFrame(Output)
Output

Unnamed: 0,0
0,0.368132
1,0.373626
2,0.335165
3,0.401099
4,0.329670
...,...
195,0.043956
196,0.065934
197,0.082418
198,0.060440


激活函數

In [52]:
def Activation_function(x, activation='sigmoid', alpha=0.1): # alpha = 0.1 ~ 0.3
    
    activation_function = []
    derivative_function = []
    
    if activation == 'sigmoid':
        activation_function = 1/(1+np.exp(-x))
        derivative_function = np.exp(-x)/(1+np.exp(-x))**2
        
    if activation == 'tanh':
        activation_function = (np.exp(x)-np.exp(-x))/(np.exp(x)+np.exp(-x))
        derivative_function = 4/((np.exp(x)+np.exp(-x))**2)  
        
    if activation == 'elu':
        for i in range(len(x[0])):
            if x[0][i] >= 0:
                activation_function.append(x[0][i])
            else:
                activation_function.append((np.exp(x[0][i])-1)*alpha)
        activation_function = np.array([activation_function])
        for i in range(len(activation_function[0])):
            if activation_function[0][i] >= 0:
                derivative_function.append(1)
            else:
                derivative_function.append(activation_function[0][i]+alpha)
        derivative_function = np.array([derivative_function])
        
    if activation == 'relu':
        for i in range(len(x[0])):
            activation_function.append(max(0, x[0][i]))
            if x[0][i] >= 0:
                derivative_function.append(1)
            else:
                derivative_function.append(0)
        activation_function = np.array([activation_function])
        derivative_function = np.array([derivative_function])
        
    if activation == 'leaky_relu':
        for i in range(len(x[0])):
            if x[0][i] >= 0:
                activation_function.append(x[0][i])
                derivative_function.append(1)
            else:
                activation_function.append(0.01*x[0][i])
                derivative_function.append(0.01)
        activation_function = np.array([activation_function])
        derivative_function = np.array([derivative_function])
        
    return activation_function, derivative_function

淺層NN架構(2-3-1)

- forward：輸入 >> 隱藏層 >> 輸出 (產出結果)
- backward：輸出 >> 隱藏層 >> 輸入 (修正參數)

In [53]:
#模型
class NeuralNetwork:
    def __init__(self,X1,X2,y,lr,W1,W2,b,Activation,Alpha):
    
        self.input_layer = np.array([X1,X2])    #二維資料
        self.W1 = W1
        self.W2 = W2
        self.y = np.array(y)
        self.lr = lr
        self.b = b
        self.Activation = Activation
        self.Alpha = Alpha
    
    def forward(self):
        self.op = np.dot(self.input_layer.T,self.W1)
        self.hidden_layer = Activation_function(self.op+(self.b.T), activation=self.Activation, alpha=self.Alpha)[0]
        self.output_layer = np.dot((Activation_function(self.op+(self.b.T), activation=self.Activation, alpha=self.Alpha)[0]), self.W2)
        self.error = self.y - self.output_layer
    
    def backward(self):
        self.W2 = self.W2 + np.dot(self.hidden_layer.T,(self.lr * self.error))
        op1 = np.dot(self.input_layer[0,0],self.W1[0])
        op1_b = self.b.T
        op1_sum = op1_b + op1
        op2 = np.dot(self.input_layer[1,0],self.W1[1])
        op2_sum = op1_b + op2
        op3 = np.dot(self.input_layer[0,0],self.W1[0])+np.dot(self.input_layer[1,0],self.W1[1])
        op3_sum = op1_b + op3
        b_W1_1 = self.W1[0] + np.dot(self.input_layer[0,0],(self.lr*self.error*self.W2.T*Activation_function(op1_sum, activation=self.Activation, alpha=self.Alpha)[1]))
        b_W1_2 = self.W1[1] + np.dot(self.input_layer[1,0],(self.lr*self.error*self.W2.T*Activation_function(op2_sum, activation=self.Activation, alpha=self.Alpha)[1]))
        self.W1 = (np.array([b_W1_1,b_W1_2])).sum(axis=1)
        b_b1 = np.dot(self.W2[0],(self.lr*self.error*Activation_function(op3_sum, activation=self.Activation, alpha=self.Alpha)[1])[0][0])
        b_b2 = np.dot(self.W2[1],(self.lr*self.error*Activation_function(op3_sum, activation=self.Activation, alpha=self.Alpha)[1])[0][1])
        b_b3 = np.dot(self.W2[2],(self.lr*self.error*Activation_function(op3_sum, activation=self.Activation, alpha=self.Alpha)[1])[0][2])
        self.b = self.b + np.array([b_b1,b_b2,b_b3])

模型輸入及輸出

In [79]:
X1 = list(Input[0])
X2 = list(Input[1])
y = list(Output[0])

訓練/測試集

In [80]:
Train_X1, Test_X1 = X1[:100], X1[100:]
Train_X2, Test_X2 = X2[:100], X2[100:]
Train_y, Test_y = y[:100], y[100:]

模型參數

In [81]:
epoch = 50
learning_rate = 0.001
batch_size = 20
Activation = 'leaky_relu'
Alpha = 0.1
loss_list = []
w1 = []
w2 = []
b = []

In [82]:
Train_X1 = np.array(Train_X1).reshape(int(len(Train_X1)/batch_size), batch_size)
Train_X2 = np.array(Train_X2).reshape(int(len(Train_X2)/batch_size), batch_size)
Train_y = np.array(Train_y).reshape(int(len(Train_y)/batch_size), batch_size)

訓練NN

In [83]:
for j in range(epoch):
    weight1 = np.random.rand(2,3)
    weight2 = np.random.rand(3,1)
    bias = np.random.rand(3,1)
    while True:
        epoch_loss = []
        for i in range(len(Train_X1)):
            while True:
                batch_loss = []
                for k in range(len(Train_X1[i])):      
                    nn = NeuralNetwork([Train_X1[i][k]],[Train_X2[i][k]],[Train_y[i][k]],lr=learning_rate,W1=weight1,W2=weight2,b=bias,Activation=Activation,Alpha=Alpha)
                    nn.forward()
                    loss = ((nn.y - nn.output_layer)**2)[0][0]
                    nn.backward()
                    weight1 = copy.deepcopy(nn.W1)
                    weight2 = copy.deepcopy(nn.W2)
                    bias = copy.deepcopy(nn.b)
                    batch_loss.append(abs(nn.error[0][0]))   
                if sum(batch_loss)/len(batch_loss) <= 1.0:  
                    epoch_loss.append(sum(batch_loss)/len(batch_loss))
                    break
        if (len(Train_X1) - i == 1) & (sum(epoch_loss)/len(epoch_loss) <= 0.5):
            loss_list.append(sum(epoch_loss)/len(epoch_loss))            
            w1.append(nn.W1)
            w2.append(nn.W2)
            b.append(nn.b)  
            break        

找出最小誤差的權重/偏差，測試結果

In [84]:
Find_min_loss = loss_list.index(min(loss_list))

In [85]:
predict_values = []

for i in range(len(Test_X1)):
    nn = NeuralNetwork([Test_X1[i]],[Test_X2[i]],[Test_y[i]],W1=w1[Find_min_loss],W2=w2[Find_min_loss],b=b[Find_min_loss],lr=learning_rate,Activation=Activation,Alpha=Alpha)
    nn.forward()
    predict_values.append(nn.output_layer[0][0])

計算RMSE

In [86]:
Test_y = min_max_scaler_1.inverse_transform(pd.DataFrame(Test_y)).flatten()
predict_values = min_max_scaler_1.inverse_transform(pd.DataFrame(predict_values)).flatten()

In [87]:
np.sqrt(mean_squared_error(Test_y, predict_values))

27.08849957078995

視覺化

In [88]:
line1 = go.Scatter(x=list(range(1,len(Test_y)+1)), y=Test_y, name='實際值')
line2 = go.Scatter(x=list(range(1,len(Test_y)+1)), y=predict_values, name='預測值')
fig = go.Figure([line1,line2])
fig.update_layout(xaxis_title='測試集資料', yaxis_title='PM10')
fig.show()