# Smoothing

這邊我們先給各位看一下使用numpy手刻完成一些最簡單的forcasting

numpy手刻只是為了讓同學了解這些算法在使用python實現的方式，後面我們會附上一些簡單使用的套件，以後在使用時直接call套件的class或者funciton就可以簡單實現演算法

課程包含以下內容:
- Moving Average
- Simple Exponential Smoothing

#### **開始前請先安裝或import基本套件**
#### **若使用Jupyter Notebook開啟請轉成tree view方便顯示plotly出來的圖**

In [3]:
! pip install --user plotly

Collecting plotly
  Using cached plotly-5.11.0-py2.py3-none-any.whl (15.3 MB)
Installing collected packages: plotly
Successfully installed plotly-5.11.0


In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib notebook

from plotly import express as px

**另外我們也先準備一個畫圖的function，我們不會放重點在這邊但後面會用它來看一些time series處理的過程**

In [2]:
def plot_series(time, series, start=0, end=None, labels=None,title=None):
    
    ## Visualizes time series data
    ## Args:
    #  time (array of int) - 時間點, 長度為T
    #  series (list of array of int) - 時間點對應的資料列表，列表內時間序列數量為D，每筆資料長度為T，若非為列表則轉為列表
    #  start (int) - 開始的資料序(第幾筆)
    #  end (int) -   結束繪製的資料序(第幾筆)
    #  labels (list of strings)- 對於多時間序列或多維度的標註
    #  title (string)- 圖片標題
    
    # 若資料只有一筆，則轉為list
    if type(series)!=list:
        series=[series]
        
    if not end:
        end=len(series[0])
    
    if labels:
    # 設立dictionary, 讓plotly畫訊號線時可以標註label    
        dictionary={"time":time}
        for idx,l in enumerate(labels):
            # 截斷資料，保留想看的部分，並分段紀錄於dictionary中
            dictionary.update({l:series[idx][start:end]})
        # 畫訊號線
        fig = px.line(dictionary,x="time",y=list(dictionary.keys())[1:],width=1000, height=400,title=title)
    else:
        # 畫訊號線
        fig = px.line(x=time,y=series,width=1000, height=400,title=title)
    fig.show()

這邊也附上我們的toy data產生器

In [3]:
def trend(time, slope=0):
    # 產生合成水平直線資料，其長度與時間等長，直線趨勢與設定slope相同
    ##Args:
    #  time (array of int) - 時間點, 長度為T
    #  slope (float) - 設定資料的傾斜程度與正負
    ##Returns:
    #  series (array of float) -  產出slope 與設定相同的一條線

    series = slope * time

    return series
def seasonal_pattern(season_time,pattern_type='triangle'):
    # 產生某個特定pattern，
    ##Args:
    #  season_time (array of float) - 周期內的時間點, 長度為T
    #  pattern_type (str) -  這邊提供triangle與cosine
    ##Returns:
    #  data_pattern (array of float) -  根據自訂函式產出特定的pattern

    # 用特定function生成pattern
    
    if pattern_type=='triangle':
        
        data_pattern = np.where(season_time < 0.5,
                        season_time*2,
                        2-season_time*2)
        
    if pattern_type=='cosine':
        data_pattern=np.cos(season_time*np.pi*2)
        
    return data_pattern
def seasonality(time, period, amplitude=1, phase=30,pattern_type='triangle'):
    ## Repeats the same pattern at each period
    ## Args:
    #   time (array of int) - 時間點, 長度為T
    #   period (int) - 週期長度，必小於T
    #   amplitude (float) - 序列幅度大小
    #   phase (int) - 相位，為遞移量，正的向左(提前)、負的向右(延後)
    #   pattern_type (str) -  這邊提供triangle與cosine
    ## Returns:
    #   data_pattern (array of float) - 有指定周期、振幅、相位、pattern後的time series
    
    # 將時間依週期重置為0
    season_time = ((time + phase) % period) / period

    # 產生週期性訊號並乘上幅度
    data_pattern = amplitude * seasonal_pattern(season_time,pattern_type)

    return data_pattern
def noise(time, noise_level=1, seed=None):
    ## 合成雜訊，這邊用高斯雜訊，機率密度為常態分布
    ## Args:
    #   time (array of int) - 時間點, 長度為T
    #   noise_level (float) - 雜訊大小
    #   seed (int) - 同樣的seed可以重現同樣的雜訊
    ## Returns:
    #   noise (array of float) - 雜訊時間序列


    # 做一個基於某個seed的雜訊生成器
    rnd = np.random.RandomState(seed)
    
    # 生與time同長度的雜訊，並且乘上雜訊大小 (不乘的話，標準差是1)
    noise = rnd.randn(len(time)) * noise_level
    
    return noise

def toy_generation(time,
            bias=500.,
            slope = 0.1,
            period = 180,
            amplitude = 40.,
            phase= 30,
            pattern_type = 'triangle',
            noise_level=5.,
            seed=2022):
    signal_series =  trend(time, slope)\
                     +bias \
                     +seasonality(time,
                                  period,
                                  amplitude,
                                  phase,
                                  pattern_type)
    noise_series = noise(time,noise_level,seed) 

    series=signal_series+noise_series
    return series

我們附上最naive的k-step ahead prediction

In [74]:
def k_step_ahead(data,k):
    # 產生k-step ahead預測
    ##Args:
    #  data (array of float) - 輸入資料
    ##Returns:
    #  forcast (array of float) -  k-step ahead預測結果
    forcast=data[:-k]
    return forcast

也附上評估的function

In [4]:
def MAE(pred,gt):
    # 計算Mean Absolute Error
    ##Args:
    #  pred (array of float) - 預測資料
    #  gt (array of float) - 答案資料
    ##Returns:
    #  計算結果 (float)
    return abs(pred-gt).mean()
def MSE(pred,gt):
    # 計算Mean Square Error
    ##Args:
    #  pred (array of float) - 預測資料
    #  gt (array of float) - 答案資料
    ##Returns:
    #  計算結果 (float)
    return pow(pred-gt,2).mean()
def R2(pred,gt):
    # 計算R square score
    ##Args:
    #  pred (array of float) - 預測資料
    #  gt (array of float) - 答案資料
    ##Returns:
    #  計算結果 (float)
    return 1-pow(pred-gt,2).sum()/pow(gt-gt.mean(),2).sum()

另外我們記得生成資料

In [6]:
# 先合成資料
time=np.arange(4 * 365) # 定義時間點
series_sample=toy_generation(time) # 這就是我們合成出來的資料

#最簡單直接取前後，並且時間點也記得要切，我們直接立個function
def split(x,train_size):
    return x[...,:train_size],x[...,train_size:]
time_train,time_test=split(time,365*3)
series_train,series_test=split(series_sample,365*3)

## Moving Average

Moving average 把前K個時間點的資料做平均，使用這平均來預測下筆資料。
<img src="https://i.imgur.com/iPt1Iwq.png" width=400 align="right">

實現方式有很多種，包含對每個時間點慢慢使用平均、或者使用捲積運算等等。

因為是對未來的預測，所以會希望前K天[t-K,...,t-2,t-1]資料預測該時間點(t)資料

In [63]:
# 用捲積做看看5天平均的moving average
K=5
kernel=np.ones(K)/K # 對K個1做捲積在除以K就是做移動的平均
forcast=np.convolve(series_train,kernel,mode='valid')[:-1] # 最開始會取最前K筆做第一次平均，最後一次算會包含最後一筆，所以要去掉
ground_truth_for_view=series_train[K:] # 去掉前K日資料
time_for_view=time_train[K:]

plot_series(time_for_view, 
            [forcast,ground_truth_for_view],
            labels=['prediction','ground truth'])

In [75]:
# 可以把它包起來
def moving_average(data,k):
    # 產生前k個資料的moving average預測
    ##Args:
    #  data (array of float) - 輸入資料
    ##Returns:
    #  forcast (array of float) -  moving average預測結果
    forcast=np.convolve(data,np.ones(K)/K,mode='valid')[:-1]
    return forcast

In [77]:
# 我們把它跟k_step比較一下結果
forcast_d={
    'k_step':k_step_ahead(series_train,K),
    'ma':moving_average(series_train,K)
}

ground_truth_for_view=series_train[K:] # 去掉前K日資料
time_for_view=time_train[K:]

plot_series(time_for_view, 
            [ground_truth_for_view,*forcast_d.values()],
            labels=['ground truth',*forcast_d.keys()])

In [81]:
print(f'''k-step: 
-MAE:{MAE(forcast_d['k_step'],series_train[K:])} 
-MSE:{MSE(forcast_d['k_step'],series_train[K:])} 
-R2:{R2(forcast_d['k_step'],series_train[K:])}''')

print(f'''ma: 
-MAE:{MAE(forcast_d['ma'],series_train[K:])}
-MSE:{MSE(forcast_d['ma'],series_train[K:])}
-R2:{R2(forcast_d['ma'],series_train[K:])}''')

k-step: 
-MAE:5.9684489325715875 
-MSE:53.713040585152214 
-R2:0.9498292378074831
ma: 
-MAE:4.4734156744998375
-MSE:31.45323967382768
-R2:0.9706210448958262


## Simple Exponential Smoothing

若考慮到

## Metrics for Forcasting

比較特別的是它們會用tf.Tensor的方式跑，也會儲存為tf.Tensor。

如果自己設計training+validation機制，用tensorflow的這幾個算法可以讓Metric計算都在GPU裡面執行