# dataset

> Stroke-sequence dataset, data-augmentation helpers, and data loader construction.

In [None]:
#| default_exp dataset

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
#| export
from pathlib import Path
from typing import Optional

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

from singleline_models.utils import CN

In [None]:
#| export
class StrokesDataset(Dataset):
    r"""
    ## Dataset

    This class loads and pre-processes the data.

    After construction the instance exposes:

    * `scale` - the factor $\Delta x$ and $\Delta y$ were divided by
    * `data` - float tensor of shape `[n_sequences, longest_seq_len + 2, 5]`
    * `mask` - float tensor of shape `[n_sequences, longest_seq_len + 1]`

    where `longest_seq_len` is the length of the longest sequence that
    survived filtering/truncation (at most `max_seq_length`).
    """

    def __init__(self, dataset: np.ndarray, max_seq_length: int, scale: Optional[float] = None):
        r"""
        `dataset` is a sized collection of numpy arrays of shape [seq_len, 3].
        It is a sequence of strokes, and each stroke is represented by
        3 numbers.
        First two are the displacements along x and y ($\Delta x$, $\Delta y$)
        and the last one is the pen state flag. The flag is stored as-is in
        $p_2$ (*pen up*) and its complement is stored in $p_1$ (*pen down*).

        Sequences shorter than 10 steps are dropped and sequences longer than
        `max_seq_length` are truncated.

        `scale` is the value $\Delta x$, $\Delta y$ are divided by. When it is
        `None` it is computed from the data; pass the training set's `scale`
        when building a validation set so both share the same normalization.

        Raises `ValueError` if no sequence survives filtering or if the
        scaling factor is zero.
        """

        data = []
        # We iterate through each of the sequences and filter
        for idx, seq in enumerate(dataset):
            if len(seq) < 10:
                print(f"filtering out {idx} - length: {len(seq)}")
                continue
            elif len(seq) > max_seq_length:
                print(f"truncating {idx} - length: {len(seq)}")
                seq = seq[:max_seq_length]
            # Convert to a floating point array first (so the clamp bounds can never
            # overflow a narrow integer dtype), then clamp every value to
            # $[-1000, 1000]$. The pen flag is clamped too, which leaves 0/1 untouched.
            seq = np.clip(np.asarray(seq, dtype=np.float32), -1000, 1000)
            data.append(seq)
        print(f"finished filtering - len(dataset) = {len(dataset)}, len(data) = {len(data)}")

        # Fail early with a readable message instead of a cryptic error from
        # `np.concatenate`/`max` on an empty list further down.
        if not data:
            raise ValueError(
                f"no sequences survived filtering (each needs at least 10 steps); "
                f"got {len(dataset)} input sequence(s)"
            )

        # We then calculate the scaling factor which is the
        # standard deviation of ($\Delta x$, $\Delta y$) combined.
        # Paper notes that the mean is not adjusted for simplicity,
        # since the mean is anyway close to $0$.
        if scale is None:
            scale = np.std(np.concatenate([np.ravel(s[:, 0:2]) for s in data]))
        # Dividing by zero would silently fill the tensor with NaN/inf.
        if scale == 0:
            raise ValueError("scale must be non-zero (all displacements are zero?)")
        self.scale = scale

        # Get the longest sequence length among all sequences
        longest_seq_len = max(len(seq) for seq in data)

        # We initialize PyTorch data array with two extra steps for start-of-sequence (sos)
        # and end-of-sequence (eos).
        # Each step is a vector $(\Delta x, \Delta y, p_1, p_2, p_3)$.
        # Only one of $p_1, p_2, p_3$ is $1$ and the others are $0$.
        # They represent *pen down*, *pen up* and *end-of-sequence* in that order.
        # $p_1$ is $1$ if the pen touches the paper in the next step.
        # $p_2$ is $1$ if the pen doesn't touch the paper in the next step.
        # $p_3$ is $1$ if it is the end of the drawing.
        self.data = torch.zeros(len(data), longest_seq_len + 2, 5, dtype=torch.float)
        # The mask array needs only one extra-step since it is for the outputs of the
        # decoder, which takes in `data[:-1]` and predicts next step.
        self.mask = torch.zeros(len(data), longest_seq_len + 1)

        for i, seq in enumerate(data):
            seq = torch.from_numpy(seq)
            len_seq = len(seq)
            # Scale and set $\Delta x, \Delta y$
            self.data[i, 1:len_seq + 1, :2] = seq[:, :2] / scale
            # $p_1$
            self.data[i, 1:len_seq + 1, 2] = 1 - seq[:, 2]
            # $p_2$
            self.data[i, 1:len_seq + 1, 3] = seq[:, 2]
            # $p_3$: every step after the real sequence (eos and padding)
            self.data[i, len_seq + 1:, 4] = 1
            # Mask is on until end of sequence
            self.mask[i, :len_seq + 1] = 1

        # Start-of-sequence is $(0, 0, 1, 0, 0)$
        self.data[:, 0, 2] = 1

    def __len__(self):
        """Size of the dataset"""
        return len(self.data)

    def __getitem__(self, idx: int):
        """Get a sample as a `(data, mask)` pair"""
        return self.data[idx], self.mask[idx]


In [None]:
#| export
def random_scale(data, random_scale_factor=0.15):
    """
    Return a copy of `data` with its x and y columns independently stretched.

    Each axis gets its own factor drawn uniformly around 1 with half-width
    `random_scale_factor`. Only columns 0 and 1 are touched; the input
    itself is never modified.
    """

    def _draw_factor():
        # Map a uniform sample from [0, 1) onto [1 - e, 1 + e)
        return (np.random.random() - 0.5) * 2 * random_scale_factor + 1.0

    # Draw for x first, then y, so the random stream is consumed in a fixed order
    x_factor = _draw_factor()
    y_factor = _draw_factor()

    scaled = np.copy(data)
    scaled[:, 0] *= x_factor
    scaled[:, 1] *= y_factor
    return scaled

In [None]:
#| export
def augment_strokes(strokes, prob=0.0):
    """
    Augment a stroke sequence by randomly dropping points.

    A point is only eligible for dropping when both it and the last kept
    point have pen state 0 and more than two such points have been seen in a
    row, so the points around a pen state of 1 always survive. The
    displacement of a dropped point is added onto the last kept point.

    Returns a new numpy array of the kept `[dx, dy, pen]` rows.
    """
    kept = []
    # Sentinel "previous point" with pen state 1, so the first real point is always kept
    last_kept = [0, 0, 1]
    run_length = 0
    for point in strokes:
        current = [point[0], point[1], point[2]]

        # Count consecutive pen-state-0 points; any pen state of 1 resets the run
        if current[2] == 1 or last_kept[2] == 1:
            run_length = 0
        else:
            run_length += 1

        # One uniform draw per point, whether or not the point is eligible
        draw = np.random.rand()
        droppable = current[2] == 0 and last_kept[2] == 0 and run_length > 2 and draw < prob

        if not droppable:
            last_kept = current
            kept.append(current)
            continue

        # `last_kept` is the very list stored in `kept`, so this folds the
        # dropped displacement into the row that is already in the output
        last_kept[0] += current[0]
        last_kept[1] += current[1]
    return np.array(kept)



In [None]:
#| export
def create_dataloaders(hp: CN):
    """
    Build the training and validation datasets and their data loaders.

    Fields read from `hp`: `dataset_fname`, `dataset_source`, `dataset_name`,
    `max_seq_length`, `batch_size`, `use_random_scale`, `random_scale_factor`
    and `augment_stroke_prob`.

    The validation set is normalized with the training set's `scale`.
    Random scaling is applied to training batches only.

    Returns `(train_dataset, train_loader, valid_dataset, valid_loader)`.
    """
    import warnings

    # Only build a `Path` from `dataset_fname` when it is set (`Path(None)` raises);
    # otherwise fall back to `data/[DATASET SOURCE]/[DATASET NAME].npz`
    if hp.dataset_fname:
        path = Path(hp.dataset_fname)
    else:
        base_path = Path(f"data/{hp.dataset_source}")
        path = base_path / f'{hp.dataset_name}.npz'

    # Load the numpy file; the context manager closes the archive once both
    # splits have been read.
    # NOTE(security): `allow_pickle=True` unpickles the file's contents, which can
    # execute arbitrary code - only load dataset files from trusted sources.
    with np.load(str(path), encoding='latin1', allow_pickle=True) as dataset:
        # Create training dataset
        train_dataset = StrokesDataset(dataset['train'], hp.max_seq_length)
        # Create validation dataset
        valid_dataset = StrokesDataset(dataset['valid'], hp.max_seq_length, train_dataset.scale)

    # `StrokesDataset` pads to the longest sequence it kept, which can be shorter
    # than `hp.max_seq_length`, so sanity checks use the dataset's real shape.
    n_steps = train_dataset.data.shape[1]

    if hp.augment_stroke_prob > 0:
        # `augment_strokes` reads column 2 as the pen state and returns 3-column
        # rows of varying length, so its output cannot be stacked with the padded
        # 5-column samples. Its result was never used; say so instead of silently
        # ignoring the setting.
        warnings.warn(
            "augment_stroke_prob > 0 has no effect: augment_strokes works on 3-column "
            "strokes and cannot be applied to the padded 5-column samples",
            stacklevel=2,
        )

    def collate_fn(batch, **kwargs):
        """Stack `(data, mask)` samples into batch tensors, applying augmentation."""
        assert isinstance(batch, list)

        all_data = []
        all_mask = []
        for data, mask in batch:
            assert data.shape == (n_steps, 5)
            assert mask.shape == (n_steps - 1,)

            if hp.use_random_scale:
                # `random_scale` works on a copy, so the tensor stored in the dataset
                # (which `data.numpy()` shares memory with) is left untouched.
                data = torch.from_numpy(random_scale(data.numpy(), hp.random_scale_factor))

            all_data.append(data)
            all_mask.append(mask)

        return torch.stack(all_data), torch.stack(all_mask)

    # Create training data loader
    train_loader = DataLoader(train_dataset, hp.batch_size, shuffle=True, collate_fn=collate_fn)
    # Create validation data loader (no augmentation, default collation)
    valid_loader = DataLoader(valid_dataset, hp.batch_size)

    return train_dataset, train_loader, valid_dataset, valid_loader

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()