In [114]:
#from __future__ import annotations

#import random
#import shutil
#from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import numpy as np
import tensorflow as tf
import torch
from loguru import logger
#from torch.nn.utils.rnn import pad_sequence
#from tqdm import tqdm
from scipy.io import arff
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F

Tensor = torch.Tensor

In [115]:
def get_eeg(data_dir: Path="../data/raw") -> Path:
    dataset_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00264/EEG%20Eye%20State.arff"  # noqa: E501
    datapath = tf.keras.utils.get_file(
        "eeg_data", origin=dataset_url, untar=False, cache_dir=data_dir
    )

    datapath = Path(datapath)
    logger.info(f"Data is downloaded to {datapath}.")
    return datapath

In [116]:
from __future__ import annotations
from typing import Tuple
from tqdm import tqdm
import random
Tensor = torch.Tensor


class BaseDataset:
    def __init__(self, datapath: Path) -> None:
        self.path = datapath
        self.data =  self.process_data()

    def process_data(self) -> None:
        data = arff.loadarff(self.path)
        cur_label = int(data[0][0][14]) #index 14 = label
        EEG_list = [] #Lege lijst waarin meerdere observaties worden opgeslagen
        EEG_full = [] #Lege lijst waarin meerdere batches in worden samengevoegd.
        for obs in data[0]:
            if int(obs[14]) == cur_label:
                EEG_dim = [] #Lege lijst waarin de EEG_dim van een bepaalde observatie in kunnen worden opgeslagen.
                for index, i in enumerate(obs):
                    if index != 14:
                        EEG_dim.append(i)
                EEG_dim = torch.Tensor(EEG_dim)
                EEG_list.append(EEG_dim)
            else:
                EEG_full_label = (cur_label, torch.stack(EEG_list))
                EEG_full.append(EEG_full_label)
                cur_label = int(obs[14])
                EEG_list = [] #Lege lijst waarin meerdere observaties in kunnen worden opgeslagen.
                EEG_dim = [] #Lege lijst waarin de EEG_dim van een bepaalde observatie in kunnen worden opgeslagen.
                for index, i in enumerate(obs):
                    if index != 14:
                        EEG_dim.append(i)
                EEG_dim = torch.Tensor(EEG_dim)
                EEG_list.append(EEG_dim)
        EEG_full_label = (cur_label, torch.stack(EEG_list))
        EEG_full.append(EEG_full_label)
        return EEG_full

    def __len__(self) -> int:
        return len(self.data)
   
    def __getitem__(self, idx: int) -> Tuple:
        return self.data[idx]

In [117]:
dataloader = BaseDataset(datapath=get_eeg())

2022-06-11 11:42:12.780 | INFO     | __main__:get_eeg:8 - Data is downloaded to ../data/raw/datasets/eeg_data.


In [119]:
x,y = dataloader[0]
y.shape

torch.Size([188, 14])

In [161]:
class BaseDataIterator_pad:
    """This iterator will consume all data and stop automatically.
    The dataset should have a:
        __len__ method
        __getitem__ method

    """

    def __init__(self, dataset: BaseDataset, window_size: int) -> None:
        self.dataset = dataset
        self.ws = window_size
        self.data = self.padding() 

    def __len__(self) -> int:
        return len(self.data)

    def padding(self) -> None:
        data = self.dataset
        window_size = self.ws
        list_padded = []
        for i in range(24):
            len_chunck = len(data[i][1])
            diff = len_chunck % window_size
            pad_value = window_size - diff
            if diff != 0:
                new_data = F.pad(input=data[i][1], pad=(0, 0, 0, pad_value), mode='constant', value=0)
                new_data2 = (data[i][0], new_data)
                list_padded.append(new_data2)
            else:
                new_data3 = (data[i][0], data[i][1])
                list_padded.append(new_data3)
        return list_padded

        
    def __getitem__(self, idx: int) -> Tuple:
        return self.data[idx]



In [174]:
test6 = BaseDataIterator_pad(dataloader,60)

In [131]:
x,y= test6[0]
y.shape

tensor([[4329.2300, 4009.2300, 4289.2300,  ..., 4280.5098, 4635.8999,
         4393.8501],
        [4324.6201, 4004.6201, 4293.8501,  ..., 4279.4902, 4632.8198,
         4384.1001],
        [4327.6899, 4006.6699, 4295.3799,  ..., 4282.0498, 4628.7202,
         4389.2300],
        ...,
        [4452.8198, 4032.3101, 4295.3799,  ..., 4353.3301, 4808.2100,
         4549.2300],
        [   0.0000,    0.0000,    0.0000,  ...,    0.0000,    0.0000,
            0.0000],
        [   0.0000,    0.0000,    0.0000,  ...,    0.0000,    0.0000,
            0.0000]])

In [213]:
n_window = len(test6[23][1])
n_window
time = torch.arange(0, 60).reshape(1, -1)
time
window = torch.arange(0, n_window).reshape(-1, 1)
window
idx = time + window
idx.shape
idx = idx - 60 + 1 
idx.shape
test = test6[23][1][idx]
test
test.shape


torch.Size([60, 60, 14])

In [None]:
time = torch.arange(0, 5).reshape(1, -1) 
window = torch.arange(0, n_window).reshape(-1, 1)
idx = time + window
n_window
test = test6[0][1][idx]

test = test6[0][1][idx]
test2 = (test6[i][0], test)

In [154]:
class BaseDataIterator_pad2:
    """This iterator will consume all data and stop automatically.
    The dataset should have a:
        __len__ method
        __getitem__ method

    """

    def __init__(self, dataset: BaseDataset, window_size: int) -> None:
        self.dataset = dataset
        self.ws = window_size
        self.data = self.padding_wind() 

    def __len__(self) -> int:
        return len(self.data)

    def padding_wind(self) -> None:
        data = self.dataset
        window_size = self.ws
        list2 = []
        list_padded = []
        for i in range(24):
            len_chunck = len(data[i][1])
            diff = len_chunck % window_size
            pad_value = window_size - diff
            if diff != 0:
                new_data = F.pad(input=data[i][1], pad=(0, 0, 0, pad_value), mode='constant', value=0)
                new_data2 = (data[i][0], new_data)
                list_padded.append(new_data2)
                n_window = len(list_padded[i][1]) 
                time = torch.arange(0, window_size).reshape(1, -1)
                window = torch.arange(0, n_window).reshape(-1, 1)
                idx = time + window
                idx = idx - window_size + 1
                test = list_padded[i][1][idx]
                test2 = (list_padded[i][0], test)
                list2.append(test2)
            else:
                new_data3 = (data[i][0], data[i][1])
                list_padded.append(new_data3)
                n_window = len(list_padded[i][1])
                time = torch.arange(0, window_size).reshape(1, -1)
                window = torch.arange(0, n_window).reshape(-1, 1)
                idx = time + window
                idx = idx - window_size + 1
                test = list_padded[i][1][idx]
                test2 = (list_padded[i][0], test)
                list2.append(test2)
        return list2

        
    def __getitem__(self, idx: int) -> Tuple:
        return self.data[idx]


In [158]:
test88 = BaseDataIterator_pad2(dataloader,60)

In [163]:
class BaseDataIterator_wind:
    """This iterator will consume all data and stop automatically.
    The dataset should have a:
        __len__ method
        __getitem__ method

    """

    def __init__(self, dataset: BaseDataset, window_size: int) -> None:
        self.dataset = dataset
        self.ws = window_size
        self.data = self.window()
        
   

    def __len__(self) -> int:
        return len(self.data)

    def window(self) -> None:
        data = self.dataset
        list2 = []
        ws = self.ws
        for i in range(24):
            n_window = len(data[i][1]) 
            time = torch.arange(0, ws).reshape(1, -1)
            window = torch.arange(0, n_window).reshape(-1, 1)
            idx = time + window
            idx = idx - ws + 1
            test = data[i][1][idx]
            test2 = (data[i][0], test)
            list2.append(test2)
        return list2
        
    def __getitem__(self, idx: int) -> Tuple:
        return self.data[idx]



In [175]:
test66 = BaseDataIterator_wind(test6,60)

In [182]:
x,y = dataloader[23]
y.shape

torch.Size([21, 14])

In [183]:
dataloader[23]

(1,
 tensor([[4312.3101, 4022.0500, 4278.4600, 4149.2300, 4336.9199, 4623.5898,
          4081.5400, 4639.4902, 4214.3599, 4230.7700, 4186.6699, 4284.6201,
          4612.3101, 4368.2100],
         [4304.1001, 4016.9199, 4273.8501, 4145.1299, 4340.0000, 4623.5898,
          4084.1001, 4646.6699, 4223.0801, 4229.2300, 4182.0498, 4282.5601,
          4606.1499, 4364.1001],
         [4303.0801, 4016.9199, 4270.7700, 4138.9702, 4342.0498, 4621.5400,
          4082.5601, 4645.6401, 4220.0000, 4229.2300, 4180.5098, 4276.9199,
          4602.0498, 4362.5601],
         [4304.6201, 4018.4600, 4272.3101, 4139.4902, 4340.0000, 4619.4902,
          4082.5601, 4637.9502, 4203.0801, 4226.6699, 4179.4902, 4278.4600,
          4602.0498, 4360.0000],
         [4301.0298, 4013.8501, 4268.7202, 4140.0000, 4341.0298, 4617.4399,
          4082.0500, 4626.1499, 4196.4102, 4224.1001, 4177.9502, 4278.9702,
          4603.0801, 4360.0000],
         [4300.0000, 4009.2300, 4263.5898, 4138.4600, 4342.5601, 4617.9

In [181]:
x,y = test66[23]
y.shape

torch.Size([60, 60, 14])

In [184]:
test6[23]

(1,
 tensor([[4312.3101, 4022.0500, 4278.4600, 4149.2300, 4336.9199, 4623.5898,
          4081.5400, 4639.4902, 4214.3599, 4230.7700, 4186.6699, 4284.6201,
          4612.3101, 4368.2100],
         [4304.1001, 4016.9199, 4273.8501, 4145.1299, 4340.0000, 4623.5898,
          4084.1001, 4646.6699, 4223.0801, 4229.2300, 4182.0498, 4282.5601,
          4606.1499, 4364.1001],
         [4303.0801, 4016.9199, 4270.7700, 4138.9702, 4342.0498, 4621.5400,
          4082.5601, 4645.6401, 4220.0000, 4229.2300, 4180.5098, 4276.9199,
          4602.0498, 4362.5601],
         [4304.6201, 4018.4600, 4272.3101, 4139.4902, 4340.0000, 4619.4902,
          4082.5601, 4637.9502, 4203.0801, 4226.6699, 4179.4902, 4278.4600,
          4602.0498, 4360.0000],
         [4301.0298, 4013.8501, 4268.7202, 4140.0000, 4341.0298, 4617.4399,
          4082.0500, 4626.1499, 4196.4102, 4224.1001, 4177.9502, 4278.9702,
          4603.0801, 4360.0000],
         [4300.0000, 4009.2300, 4263.5898, 4138.4600, 4342.5601, 4617.9

In [185]:
test66[23]

(1,
 tensor([[[4304.1001, 4016.9199, 4273.8501,  ..., 4282.5601, 4606.1499,
           4364.1001],
          [4303.0801, 4016.9199, 4270.7700,  ..., 4276.9199, 4602.0498,
           4362.5601],
          [4304.6201, 4018.4600, 4272.3101,  ..., 4278.4600, 4602.0498,
           4360.0000],
          ...,
          [   0.0000,    0.0000,    0.0000,  ...,    0.0000,    0.0000,
              0.0000],
          [   0.0000,    0.0000,    0.0000,  ...,    0.0000,    0.0000,
              0.0000],
          [4312.3101, 4022.0500, 4278.4600,  ..., 4284.6201, 4612.3101,
           4368.2100]],
 
         [[4303.0801, 4016.9199, 4270.7700,  ..., 4276.9199, 4602.0498,
           4362.5601],
          [4304.6201, 4018.4600, 4272.3101,  ..., 4278.4600, 4602.0498,
           4360.0000],
          [4301.0298, 4013.8501, 4268.7202,  ..., 4278.9702, 4603.0801,
           4360.0000],
          ...,
          [   0.0000,    0.0000,    0.0000,  ...,    0.0000,    0.0000,
              0.0000],
          [43

In [109]:
class BaseDataIterator:
    """This iterator will consume all data and stop automatically.
    The dataset should have a:
        __len__ method
        __getitem__ method

    """

    def __init__(self, dataset: BaseDataset, batchsize: int) -> None:
        self.dataset = dataset
        self.buffer = self.buffer[batchsize:]
        self.batchsize = batchsize

    def __len__(self) -> int:
        return int(len(self.dataset) / self.batchsize)

    def __iter__(self) -> BaseDataIterator:
        self.index = 0
        self.index_list = torch.randperm(len(self.dataset))
        return self


    def batchloop(self) -> Tuple[List, List]:
        X = []  # noqa N806
        Y = []  # noqa N806
        for _ in range(self.batchsize):
            x, y = self.dataset[int(self.index_list[self.index])]
            X.append(x)
            Y.append(y)
            self.index += 1
        return X, Y

    def __next__(self) -> Tuple[Tensor, Tensor]:
        if self.index <= (len(self.dataset) - self.batchsize):
            X, Y = self.batchloop()  # noqa N806
            return torch.tensor(X), torch.tensor(Y)
        else:
            raise StopIteration
   
    def __getitem__(self, idx: int) -> Tuple:
        return self.dataset[idx]

In [112]:
test55 = BaseDataIterator(test88,32)

In [113]:
test55.__len__()

0

Indien de chunk lengte niet deelbaar is door de window size dan padden. 

In [64]:
window_size = 5
x = 188 % window_size
pad = window_size - x


In [76]:
x

3