In [59]:
from __future__ import annotations
import random
import shutil
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
import numpy as np
import tensorflow as tf
import torch
from loguru import logger
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from scipy.io import arff
import math

Tensor = torch.Tensor

def get_eeg(data_dir: Union[Path, str] = "../../data/raw") -> Path:
    """Fetch the UCI "EEG Eye State" ARFF file and return its local path.

    The download is delegated to ``tf.keras.utils.get_file``, called with the
    file name ``"eeg"`` and ``cache_dir=data_dir``.

    Args:
        data_dir: Directory passed to ``get_file`` as ``cache_dir``. Both a
            ``str`` (the default) and a ``Path`` are accepted.

    Returns:
        The location reported by ``get_file``, wrapped in a ``Path``.
    """
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00264/EEG%20Eye%20State.arff"
    datapath = Path(
        tf.keras.utils.get_file("eeg", origin=url, untar=False, cache_dir=data_dir)
    )
    logger.info(f"Data is downloaded to {datapath}.")
    return datapath

class BaseDataset:
    """ARFF recording split into runs of consecutive rows that share a label.

    The file is read once at construction time. Rows are walked in file order;
    every time the label changes, the rows collected so far are stacked into a
    single tensor and stored together with their label. ``self.data`` is thus a
    list of ``(label, tensor)`` pairs, one per run.

    Args:
        datapath: Location of the ARFF file (anything ``scipy.io.arff.loadarff``
            accepts).
        label_index: Zero-based, non-negative position of the label column.
            Every other column is treated as a feature. Defaults to 14.
    """

    def __init__(self, datapath: Path, label_index: int = 14) -> None:
        self.path = datapath
        self.label_index = label_index
        self.data = self.process_data()

    def _features(self, record) -> torch.Tensor:
        """Return every field of ``record`` except the label as a 1-D float tensor."""
        return torch.tensor(
            [
                float(value)
                for column, value in enumerate(record)
                if column != self.label_index
            ],
            dtype=torch.float32,
        )

    def process_data(self) -> List[Tuple[int, torch.Tensor]]:
        """Load the file and group consecutive equally-labelled rows.

        Returns:
            A list of ``(label, rows)`` tuples in file order, where ``rows`` has
            one row per record in the run. The list is empty when the file
            holds no records.
        """
        records, _meta = arff.loadarff(self.path)
        chuncks: List[Tuple[int, torch.Tensor]] = []
        if len(records) == 0:
            # Nothing to group; without this guard records[0] would raise.
            return chuncks

        label = int(records[0][self.label_index])
        chunck: List[torch.Tensor] = []
        for record in records:
            record_label = int(record[self.label_index])
            if record_label != label:
                # The label flipped: close the current run and start a new one.
                chuncks.append((label, torch.stack(chunck)))
                label, chunck = record_label, []
            chunck.append(self._features(record))
        # The last run is never followed by a label change, so flush it here.
        chuncks.append((label, torch.stack(chunck)))
        return chuncks

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
        """Return run ``idx`` as ``(x, y)``: the stacked rows and their label."""
        label, rows = self.data[idx]
        return rows, label

    def __len__(self) -> int:
        """Number of runs found in the file."""
        return len(self.data)


# Build the dataset from the downloaded file and show the shape of run 23.
dataloader = BaseDataset(datapath=get_eeg())
dataloader[23][0].shape

2022-06-09 16:31:48.507 | INFO     | __main__:get_eeg:27 - Data is downloaded to ../../data/raw/datasets/eeg.


torch.Size([21, 14])

In [69]:
class BaseDataIterator:
    """Draws random windows of at most ``window_size`` rows from a dataset.

    ``dataset[i]`` must return ``(x, y)`` with ``x`` a tensor whose first
    dimension is split into consecutive, non-overlapping windows by
    ``torch.split``. Sampling is random with replacement: every draw picks a
    random observation and then a random window inside it.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs with a ``__len__``.
        window_size: Maximum number of rows per window.
        batchsize: Number of windows collected by one ``batchloop`` call.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        self.dataset = dataset
        self.batchsize = batchsize
        self.window_size = window_size

    def _count_windows(self) -> int:
        """Total number of windows ``torch.split`` yields over the whole dataset."""
        total = 0
        for position in range(len(self.dataset)):
            x, _ = self.dataset[position]
            total += len(torch.split(x, self.window_size))
        return total

    def __iter__(self) -> BaseDataIterator:
        """Reset the sample counter and size ``index_list`` to the window count.

        This class never reads the permutation's values; its length is what
        marks the size of one pass over the data.
        """
        self.index = 0
        self.index_list = torch.randperm(self._count_windows())
        return self

    def get_chuck(self) -> Tuple[Tensor, int]:
        """Return one random window and the label of the observation it came from.

        Raises:
            ValueError: If the dataset is empty.
        """
        n_observations = len(self.dataset)
        if n_observations == 0:
            raise ValueError("cannot draw a window from an empty dataset")
        x, y = self.dataset[random.randint(0, n_observations - 1)]
        windows = torch.split(x, self.window_size)
        return windows[random.randint(0, len(windows) - 1)], y

    def batchloop(self) -> Tuple[List, List]:
        """Collect ``batchsize`` random windows, advancing ``self.index`` once per window.

        Returns:
            Two parallel lists: the windows and their labels.
        """
        X = []  # noqa N806
        Y = []  # noqa N806
        for _ in range(self.batchsize):
            x, y = self.get_chuck()
            X.append(x)
            Y.append(y)
            self.index += 1
        return X, Y


class PaddedDatagenerator(BaseDataIterator):
    """Batch iterator that zero-pads the windows of a batch to a common length.

    One pass yields batches until fewer than ``batchsize`` of the counted
    windows remain; the sample counter is advanced by the inherited
    ``batchloop``.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        super().__init__(dataset, window_size, batchsize)

    def __iter__(self) -> BaseDataIterator:
        """Reset the counter and size ``index_list`` to the number of windows in the dataset."""
        self.index = 0
        total_windows = sum(
            len(torch.split(self.dataset[position][0], self.window_size))
            for position, _ in enumerate(self.dataset)
        )
        self.index_list = torch.randperm(total_windows)
        return self

    def __next__(self) -> Tuple[Tensor, Tensor]:
        """Return the next ``(X, Y)`` batch; ``X`` is padded with zeros, batch first.

        Raises:
            StopIteration: When fewer than ``batchsize`` samples are left.
        """
        remaining_budget = len(self.index_list) - self.batchsize
        if self.index > remaining_budget:
            raise StopIteration
        X, Y = self.batchloop()  # noqa N806
        padded = pad_sequence(X, batch_first=True, padding_value=0)
        return padded, torch.tensor(Y)

class BufferedDatagenerator(BaseDataIterator):
    """Batch iterator that zero-pads the windows of a batch to a common length.

    No buffer is kept by this class: batches are assembled by the inherited
    ``batchloop`` and padded in ``__next__``. ``get_chunck`` is provided as a
    standalone way to draw a single random window.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        super().__init__(dataset, window_size, batchsize)

    def __iter__(self):
        """Reset the counter and size ``index_list`` to the number of windows in the dataset."""
        self.index = 0
        total_windows = sum(
            len(torch.split(self.dataset[position][0], self.window_size))
            for position, _ in enumerate(self.dataset)
        )
        self.index_list = torch.randperm(total_windows)
        return self

    def get_chunck(self):
        """Return a random window of a random observation together with its label."""
        last_observation = len(self.dataset) - 1
        sequence, label = self.dataset[random.randint(0, last_observation)]
        windows = torch.split(sequence, self.window_size)
        picked = random.randint(0, len(windows) - 1)
        return windows[picked], label

    def __next__(self) -> Tuple[Tensor, Tensor]:
        """Return the next ``(X, Y)`` batch; ``X`` is padded with zeros, batch first.

        Raises:
            StopIteration: When fewer than ``batchsize`` samples are left.
        """
        remaining_budget = len(self.index_list) - self.batchsize
        if self.index > remaining_budget:
            raise StopIteration
        X, Y = self.batchloop()  # noqa N806
        padded = pad_sequence(X, batch_first=True, padding_value=0)
        return padded, torch.tensor(Y)

# Load the EEG runs and wrap them in a padded batch iterator.
dataset = BaseDataset(datapath=get_eeg())
loader = PaddedDatagenerator(dataset=dataset, window_size=50, batchsize=32)

2022-06-09 16:56:57.960 | INFO     | __main__:get_eeg:27 - Data is downloaded to ../../data/raw/datasets/eeg.


In [70]:
# Start a pass over the data, draw seven batches and show the shape of the last one.
iterator = iter(loader)
batch = [next(loader) for _ in range(7)][-1]
batch[0].shape


torch.Size([32, 50, 14])

In [None]:
#To split an observation into pieces: torch.split()
#Not every batch has to have the same length; alternatively use torch.cat()
#Items within the same batch are allowed to come from the same observation
#self.buffer = []
#for .. in ..
#batch = self.buffer[:32]
#self.buffer = self.buffer[32:]


#Exam: something involving a dataloader
#We are given a dataset + dataloader; build a model, train the model, experiment with the model
#Relation to trax - given a diagram, build it in trax.
#With only a few time steps (5 or so) no GRU or LSTM is needed; a plain RNN works better

#git config --get remote.origin.url
#git remote set-url origin <new-url>

class BaseDatastreamer:
    """This datastreamer will never stop.

    The dataset should have a:
        __len__ method
        __getitem__ method

    Samples are produced by ``get_chunck``. The default implementation walks a
    shuffled index over the dataset; subclasses may override it to sample
    differently.

    Args:
        dataset: Indexable collection whose items unpack as ``(x, y)``.
        batchsize: Number of samples per batch.
        preprocessor: Optional callable that receives the batch (a list of
            ``(x, y)`` tuples) and returns ``(X, Y)``. When ``None`` the batch
            is simply unzipped into two tuples.
    """

    def __init__(
        self,
        dataset: BaseDataset,
        batchsize: int,
        preprocessor: Optional[Callable] = None,
    ) -> None:
        self.dataset = dataset
        self.batchsize = batchsize
        # stream() reads this attribute, so it must always exist.
        self.preprocessor = preprocessor
        self.size = len(self.dataset)
        self.reset_index()

    def __len__(self) -> int:
        """Number of full batches that fit in the dataset."""
        return int(len(self.dataset) / self.batchsize)

    def reset_index(self) -> None:
        """Draw a fresh random permutation of the dataset and rewind the counter."""
        self.index_list = np.random.permutation(self.size)
        self.index = 0

    def get_chunck(self) -> Tuple:
        """Return the sample at the current position of the shuffled index.

        The position wraps around, so a batch larger than the dataset re-uses
        samples instead of running past the end of the permutation.

        Raises:
            ValueError: If the dataset is empty.
        """
        if self.size == 0:
            raise ValueError("cannot draw a sample from an empty dataset")
        position = int(self.index_list[self.index % self.size])
        return self.dataset[position]

    def batchloop(self) -> Sequence[Tuple]:
        """Collect ``batchsize`` samples, advancing ``self.index`` once per sample."""
        batch = []
        for _ in range(self.batchsize):
            x, y = self.get_chunck()
            batch.append((x, y))
            self.index += 1
        return batch

    def stream(self) -> Iterator:
        """Yield ``(X, Y)`` batches forever, reshuffling when a pass is used up."""
        while True:
            if self.index > (self.size - self.batchsize):
                self.reset_index()
            batch = self.batchloop()
            if self.preprocessor is not None:
                X, Y = self.preprocessor(batch)  # noqa N806
            else:
                X, Y = zip(*batch)  # noqa N806
            yield X, Y

class EEGStreamer(BaseDatastreamer):
    """Endless stream of zero-padded batches of random EEG windows.

    Every sample is a random window (at most ``window_size`` rows, produced by
    ``torch.split``) of a random observation. Batches are assembled by the
    inherited ``batchloop``, which calls ``get_chunck``.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs with a ``__len__``.
        window_size: Maximum number of rows per window.
        batchsize: Number of windows per batch.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        # The base class only takes (dataset, batchsize); window_size is ours.
        super().__init__(dataset, batchsize)
        self.window_size = window_size
        # Set after super().__init__ so the padding step is always in effect.
        self.preprocessor = self._pad_batch

    def get_chunck(self) -> Tuple[Tensor, int]:
        """Return a random window of a random observation together with its label."""
        sequence, label = self.dataset[random.randint(0, len(self.dataset) - 1)]
        windows = torch.split(sequence, self.window_size)
        return windows[random.randint(0, len(windows) - 1)], label

    def get_chuck(self) -> Tuple[Tensor, int]:
        """Alias of ``get_chunck``, kept because the method was first declared under this name."""
        return self.get_chunck()

    @staticmethod
    def _pad_batch(batch: Sequence[Tuple]) -> Tuple[Tensor, Tensor]:
        """Zero-pad the windows of ``batch`` to equal length and stack the labels."""
        X, Y = zip(*batch)  # noqa N806
        return pad_sequence(list(X), batch_first=True, padding_value=0), torch.tensor(Y)

In [51]:
class BaseDataIterator:
    """Draws random windows of at most ``window_size`` rows from a dataset.

    ``dataset[i]`` must return ``(x, y)`` with ``x`` a tensor whose first
    dimension is split into consecutive, non-overlapping windows by
    ``torch.split``. Sampling is random with replacement: every draw picks a
    random observation and then a random window inside it.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs with a ``__len__``.
        window_size: Maximum number of rows per window.
        batchsize: Number of windows collected by one ``batchloop`` call.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        self.dataset = dataset
        self.batchsize = batchsize
        self.window_size = window_size

    def _count_windows(self) -> int:
        """Total number of windows ``torch.split`` yields over the whole dataset."""
        total = 0
        for position in range(len(self.dataset)):
            x, _ = self.dataset[position]
            total += len(torch.split(x, self.window_size))
        return total

    def __iter__(self) -> BaseDataIterator:
        """Reset the sample counter and size ``index_list`` to the window count.

        This class never reads the permutation's values; its length is what
        marks the size of one pass over the data.
        """
        self.index = 0
        self.index_list = torch.randperm(self._count_windows())
        return self

    def get_chuck(self) -> Tuple[Tensor, int]:
        """Return one random window and the label of the observation it came from.

        Raises:
            ValueError: If the dataset is empty.
        """
        n_observations = len(self.dataset)
        if n_observations == 0:
            raise ValueError("cannot draw a window from an empty dataset")
        x, y = self.dataset[random.randint(0, n_observations - 1)]
        windows = torch.split(x, self.window_size)
        return windows[random.randint(0, len(windows) - 1)], y

    def batchloop(self) -> Tuple[List, List]:
        """Collect ``batchsize`` random windows, advancing ``self.index`` once per window.

        Returns:
            Two parallel lists: the windows and their labels.
        """
        X = []  # noqa N806
        Y = []  # noqa N806
        for _ in range(self.batchsize):
            x, y = self.get_chuck()
            X.append(x)
            Y.append(y)
            self.index += 1
        return X, Y


class PaddedDatagenerator(BaseDataIterator):
    """Batch iterator that zero-pads the windows of a batch to a common length.

    One pass draws at most ``epoch_size`` random windows; the sample counter
    is advanced by the inherited ``batchloop``.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs with a ``__len__``.
        window_size: Maximum number of rows per window.
        batchsize: Number of windows per batch.
        epoch_size: Sample budget of one pass. Defaults to 15000.
    """

    def __init__(
        self,
        dataset: BaseDataset,
        window_size: int,
        batchsize: int,
        epoch_size: int = 15000,
    ) -> None:
        super().__init__(dataset, window_size, batchsize)
        self.epoch_size = epoch_size

    def __iter__(self):
        """Reset the sample counter and size ``index_list`` to the sample budget."""
        self.index = 0
        self.index_list = torch.randperm(self.epoch_size)
        return self

    def __next__(self) -> Tuple[Tensor, Tensor]:
        """Return the next ``(X, Y)`` batch; ``X`` is padded with zeros, batch first.

        Raises:
            StopIteration: When a full batch no longer fits in the sample
                budget. Comparing against the budget alone would let one more
                batch start after the budget is already spent.
        """
        if self.index > len(self.index_list) - self.batchsize:
            raise StopIteration
        X, Y = self.batchloop()  # noqa N806
        X_ = pad_sequence(X, batch_first=True, padding_value=0)  # noqa N806
        return X_, torch.tensor(Y)

# Load the EEG runs and wrap them in a padded batch iterator.
dataset = BaseDataset(datapath=get_eeg())
loader = PaddedDatagenerator(dataset=dataset, window_size=50, batchsize=32)


2022-06-09 16:30:11.348 | INFO     | __main__:get_eeg:27 - Data is downloaded to ../../data/raw/datasets/eeg.


In [58]:
# Start a pass over the data, draw seven batches and show the shape of the last one.
iterator = iter(loader)
batch = [next(loader) for _ in range(7)][-1]
batch[0].shape


torch.Size([32, 50, 14])

In [50]:
class BaseDataIterator:
    """Batch iterator over randomly chosen sliding windows of a dataset.

    ``dataset[i]`` must return ``(x, y)`` where the first dimension of ``x`` is
    time. An observation with ``n`` rows offers ``n - window_size + 1``
    overlapping windows; an observation shorter than ``window_size`` offers
    exactly one window, namely the whole observation.

    ``__iter__`` draws one random ``(observation, window start)`` pair per
    available window, so windows are sampled with replacement.

    Args:
        dataset: Indexable collection of ``(x, y)`` pairs with a ``__len__``.
        window_size: Number of rows per window.
        batchsize: Number of windows per batch.
    """

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        self.dataset = dataset
        self.batchsize = batchsize
        self.window_size = window_size

    def _windows_per_observation(self) -> List[int]:
        """Number of windows each observation offers (always at least one)."""
        counts = []
        for position in range(len(self.dataset)):
            x, _ = self.dataset[position]
            counts.append(max(1, x.shape[0] - self.window_size + 1))
        return counts

    def __len__(self) -> int:
        """Number of full batches in one pass; also refreshes ``self.count``."""
        self.count = sum(self._windows_per_observation())
        return int(self.count / self.batchsize)

    def __iter__(self) -> BaseDataIterator:
        """Plan one pass: pick a random observation and window start per sample."""
        counts = self._windows_per_observation()
        self.count = sum(counts)
        self.dict_iterator = {}
        for item in range(self.count):
            obs = random.randint(0, len(counts) - 1)
            # randint is inclusive on both ends, hence the -1.
            start = random.randint(0, counts[obs] - 1)
            self.dict_iterator[item] = (obs, start)
        self.index = 0
        return self

    def __next__(self) -> Tuple[Tensor, Tensor]:
        """Return the next batch as stacked tensors ``(X, Y)``.

        Raises:
            StopIteration: When fewer than ``batchsize`` planned samples remain.
            ValueError: When the windows of the batch differ in length and can
                therefore not be stacked without padding.
        """
        if self.index + self.batchsize > self.count:
            raise StopIteration
        X, Y = self.batchloop()  # noqa N806
        if len({x.shape[0] for x in X}) > 1:
            raise ValueError(
                "windows in this batch differ in length; pad them before stacking"
            )
        return torch.stack(X), torch.tensor(Y)

    def window(self, x: Tensor, n_time: int) -> Tensor:
        """
        Generates an index that can be used to window a timeseries.
        E.g. the single series [0, 1, 2, 3, 4, 5] can be windowed into 4 timeseries with
        length 3 like this:

        [0, 1, 2]
        [1, 2, 3]
        [2, 3, 4]
        [3, 4, 5]

        We now can feed 4 different timeseries into the model, instead of 1, all
        with the same length.
        """
        n_window = len(x) - n_time + 1
        time = torch.arange(0, n_time).reshape(1, -1)
        window = torch.arange(0, n_window).reshape(-1, 1)
        idx = time + window
        return idx

    def get_windowed_item(self) -> Tuple[Tensor, int]:
        """Return the window planned for the current index together with its label."""
        obs, start = self.dict_iterator[self.index]
        x, y = self.dataset[obs]
        # Slicing one window directly avoids materialising every window first.
        # For an observation shorter than window_size, start is 0 and the
        # slice is the whole observation.
        return x[start : start + self.window_size], y

    def batchloop(self) -> Tuple[List, List]:
        """Collect ``batchsize`` windows, advancing ``self.index`` once per window."""
        X = []  # noqa N806
        Y = []  # noqa N806
        for _ in range(self.batchsize):
            x, y = self.get_windowed_item()
            X.append(x)
            Y.append(y)
            self.index += 1
        return X, Y

class PaddedDatagenerator(BaseDataIterator):
    """Batch iterator that zero-pads the windows of a batch to a common length."""

    def __init__(self, dataset: BaseDataset, window_size: int, batchsize: int) -> None:
        super().__init__(dataset, window_size, batchsize)

    def __next__(self):
        """Return the next ``(X, Y)`` batch; ``X`` is padded with zeros, batch first.

        Raises:
            StopIteration: When fewer than ``batchsize`` samples remain.
        """
        # self.index counts samples, so it is compared with the sample total
        # (self.count, set by __iter__ and __len__ of the base class) and not
        # with len(self), which is a number of batches.
        if self.index + self.batchsize > self.count:
            raise StopIteration
        X, Y = self.batchloop()  # noqa N806
        # Shorter windows are padded so the batch can be stacked.
        X_ = pad_sequence(X, batch_first=True, padding_value=0)  # noqa N806
        return X_, torch.tensor(Y)

# Load the EEG runs and wrap them in the sliding-window iterator.
dataset = BaseDataset(datapath=get_eeg())
loader = BaseDataIterator(dataset=dataset, window_size=30, batchsize=32)

# Open question: should we make sure every window is used at most once, or is it
# OK that there is a small chance that a window is used twice?

# TODO: handle the train and test split

2022-06-09 16:30:06.350 | INFO     | __main__:get_eeg:27 - Data is downloaded to ../../data/raw/datasets/eeg.
