In [11]:
import sys

parent_dir = 'Predict-Future-Sales'
p_sub = sys.path[0]

# Keep every '/'-separated component of sys.path[0] up to and including the
# first one equal to parent_dir (or all of them when it never appears), each
# followed by '/', and install the result as the first import search path.
components = p_sub.split('/')
keep = components.index(parent_dir) + 1 if parent_dir in components else len(components)
ride = ''.join(component + '/' for component in components[:keep])
sys.path[0] = ride

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch
from torch.utils.data import TensorDataset

from typing import Tuple, Optional
from pandas import DataFrame, Series, DatetimeIndex
from numpy import ndarray
from torch.utils.data import DataLoader

### Time Delay Embedding に対応したデータセットを出力する

In [10]:
from module.lino_module.preprocess import time_series_dataset

data = pd.read_csv('../data/sales_train.csv')

# Keyword arguments forwarded unchanged to time_series_dataset.
kwrgs = dict(
    data=data,
    seq=7,
    d_model=32,
    dilation=1,
    src_tgt_seq=(6, 2),
    batch_size=64,
)

train, test = time_series_dataset(**kwrgs)
print(train)
# Pull the first mini-batch from the training loader to inspect tensor shapes.
src, tgt, y = next(iter(train))
src.shape, tgt.shape, y.shape

<torch.utils.data.dataloader.DataLoader object at 0x7fa22aa18af0>


(torch.Size([64, 6, 32]), torch.Size([64, 2, 32]), torch.Size([64, 1]))

### 作成した関数

In [2]:
def time_series_dataset(data: DataFrame,
                        seq: int,
                        d_model: int,
                        dilation: int,
                        src_tgt_seq: Tuple[int, int],
                        batch_size: int,
                        trg_column='item_cnt_day') -> Tuple[DataLoader, DataLoader]:
    """Main entry point: build time-delay-embedded (TDE) train/test loaders.

    Steps, in order: aggregate ``data`` with ``mode_of_freq`` (default
    arguments), select ``trg_column``, standardise it with ``StandardScaler``,
    cut sliding windows with ``expand_and_split``, embed them with
    ``time_delay_embedding``, split the embedding into encoder/decoder inputs
    with ``src_tgt_split`` and wrap everything with ``to_torch_dataset``.

    Args:
        data: original table containing the time column and ``trg_column``.
        seq: window length passed to ``expand_and_split``.
        d_model: embedding dimension passed to ``time_delay_embedding``.
        dilation: embedding interval passed to ``time_delay_embedding``.
        src_tgt_seq: ``(src_seq, tgt_seq)`` pair unpacked into
            ``src_tgt_split``.
        batch_size: mini-batch size passed to ``to_torch_dataset``.
        trg_column: name of the column to model.

    Returns:
        ``(train, test)`` as returned by ``to_torch_dataset``.

    Raises:
        KeyError: if ``trg_column`` is not a column of the aggregated frame.
    """
    aggregated = mode_of_freq(data)
    # Bracket indexing rather than getattr: attribute access silently returns
    # a DataFrame method/property when the column name collides with one
    # (e.g. 'count', 'size') and cannot address names that are not identifiers.
    series = aggregated[trg_column]
    # NOTE(review): the scaler is fitted on the whole series, i.e. before the
    # train/test split performed in to_torch_dataset - confirm this is intended.
    scaled = StandardScaler().fit_transform(series.values.reshape(-1, 1))
    scaled = scaled.reshape(-1)
    x, y = expand_and_split(scaled, seq)
    tded, label = time_delay_embedding(x, y, d_model, dilation)
    src, tgt = src_tgt_split(tded, *src_tgt_seq)
    train, test = to_torch_dataset(src, tgt, label, batch_size)
    return train, test

In [2]:
def mode_of_freq(data: DataFrame,
                 key='date',
                 freq='D',
                 mode='sum'
                 ) -> DataFrame:
    """Aggregate time-series rows per period using a groupby statistic.

    The caller's frame is left untouched: the time column is parsed on a
    derived frame instead of being overwritten in ``data``.

    Args:
        data: original table containing the time column ``key``.
        key: name of the time column; string values are parsed with the
            format ``'%d.%m.%Y'``.
        freq: grouping period handed to ``pd.Grouper`` (e.g. ``'D'`` for
            daily).
        mode: name of the groupby method to call (``'sum'``, ``'mean'``, ...).

    Returns:
        The result of calling the ``mode`` method on the grouped frame.
    """
    # assign() returns a new frame, so data[key] keeps its original dtype and
    # the caller can safely reuse (or re-display) the raw table afterwards.
    parsed = data.assign(**{key: pd.to_datetime(data[key], format='%d.%m.%Y')})
    # Group along the time column in steps of `freq`, then apply the statistic.
    grouped = parsed.groupby(pd.Grouper(key=key, freq=freq))
    return getattr(grouped, mode)()

In [3]:
def expand_and_split(ds: Series, seq: int) -> Tuple[ndarray, ndarray]:
    """Cut a series into overlapping windows and their next-step targets.

    Every window holds ``seq + 1`` consecutive values and starts one position
    after the previous one; the first ``seq`` values form the input row and
    the last value is the target.

    Args:
        ds: univariate time series (array-like).
        seq: sequence length fed to the transformer.

    Returns:
        ``(x, y)`` where ``x`` has one row of ``seq`` values per window and
        ``y`` holds the value that follows each row.

    Raises:
        ValueError: if ``ds`` has fewer than ``seq + 1`` elements.
    """
    values = np.asarray(ds)
    # A series of length n contains n - seq windows of length seq + 1; the
    # previous bound of n - (seq + 1) silently dropped the last one.
    n_windows = len(values) - seq
    if n_windows < 1:
        raise ValueError(
            f'need at least seq + 1 = {seq + 1} values, got {len(values)}'
        )
    expanded = np.stack([values[i: i + seq + 1] for i in range(n_windows)])
    x = expanded[:, :-1]
    y = expanded[:, -1]
    return x, y

In [4]:
def time_delay_embedding(x: ndarray,
                         y: Optional[ndarray],
                         d_model: int,
                         dilation: int
                         ) -> Tuple[ndarray]:
    """Time Delay Embedding.

    Sample ``i`` stacks the rows ``i, i + step, ..., i + (d_model - 1) * step``
    of ``x`` (``step = dilation + 1``) and transposes them, giving an array of
    shape ``(x.shape[1], d_model)``. Its label is the ``y`` entry of the last
    stacked row.

    Args:
        x: training data, one window per row.
        y: target data aligned with the rows of ``x``, or ``None``.
        d_model: embedding dimension (number of rows stacked per sample).
        dilation: embedding interval; consecutive stacked rows are
            ``dilation + 1`` apart.

    Returns:
        ``(tded, label)`` when ``y`` is given, otherwise ``tded`` alone.
        ``tded`` and ``label`` have the same length when ``y`` has one entry
        per row of ``x``.
    """
    step = dilation + 1
    span = d_model * step
    # Row offset from the first to the last stacked row of one sample.
    offset = span - step
    # The last valid start index is x.shape[0] - 1 - offset. The previous bound
    # (x.shape[0] - span) dropped the final `step` samples and left tded
    # shorter than the returned labels.
    n_samples = max(x.shape[0] - offset, 0)

    if n_samples:
        tded = np.array([x[i: i + span: step, :].T for i in range(n_samples)])
    else:
        # Keep the rank stable so callers can still slice along axis 1.
        tded = np.empty((0, x.shape[1], d_model), dtype=x.dtype)
    if y is not None:
        # Label of sample i is the target of its last stacked row.
        return tded, np.array(y[offset:])
    return tded

In [5]:
def src_tgt_split(tded: ndarray,
                  src_seq: int,
                  tgt_seq: int
                  ) -> Tuple[ndarray, ndarray]:
    """Split embedded samples into encoder input and decoder input.

    Args:
        tded: embedded samples; the split is taken along axis 1.
        src_seq: number of leading positions used as encoder input.
        tgt_seq: number of trailing positions used as decoder input.

    Returns:
        ``(src, tgt)`` with ``src = tded[:, :src_seq]`` and ``tgt`` the last
        ``tgt_seq`` positions of axis 1 (empty when ``tgt_seq`` is 0).
    """
    src = tded[:, :src_seq]
    # `-0:` selects the whole axis, so a zero-length target needs its own case.
    tgt = tded[:, -tgt_seq:] if tgt_seq else tded[:, :0]
    return src, tgt

In [6]:
def to_torch_dataset(src: ndarray,
                     tgt: ndarray,
                     label: ndarray,
                     batch_size: int,
                     train_rate=0.9
                     ) -> Tuple[DataLoader, DataLoader]:
    """Convert the arrays into PyTorch train/test data loaders.

    Args:
        src: encoder input data.
        tgt: decoder input data.
        label: target data; reshaped to one column and truncated to
            ``len(src)`` entries.
        batch_size: mini-batch size of the training loader.
        train_rate: fraction of the samples, taken from the front, that goes
            into the training loader; the remainder goes into the test loader.

    Returns:
        ``(train, test)``: both loaders iterate in order (``shuffle=False``);
        the test loader uses a batch size of 1.
    """
    n_samples = len(src)
    split = int(n_samples * train_rate)
    label = label.reshape(-1, 1)[:n_samples]
    # Convert each array to float32 once, then slice the tensors for the split.
    tensors = [
        torch.from_numpy(arr.astype(np.float32))
        for arr in (src, tgt, label)
        ]
    train = TensorDataset(*[t[:split] for t in tensors])
    train = DataLoader(train, batch_size, shuffle=False)
    test = TensorDataset(*[t[split:] for t in tensors])
    test = DataLoader(test, batch_size=1, shuffle=False)
    return train, test