# Biweekly Report 4
# Jacob Tiede
## Working with Big Data in PyTorch
One of my goals for this class was to learn how to manage big datasets. For instance, if the data does not fit in RAM, how does one work with it? I know this is a question I will be confronted with in industry (thankfully the data I am working with for research fits in RAM on CU's research computing cluster, but I know this will not always be the case), so I'd like to take a minute to learn how to work with big data in PyTorch. Following this forum post: https://discuss.pytorch.org/t/iterable-dataset-with-zarr-as-storage-backend-results-in-degrading-performance/91100 I know that I should first look at Zarr arrays, then "iterable datasets" in PyTorch, and finally so-called "worker functions".
## Zarr Arrays
I will be following a tutorial in the zarr documentation: https://zarr.readthedocs.io/en/stable/tutorial.html, though it is worth first talking about what a zarr array is. It seems to be a similar data storage type to 'dask' in that large datasets are stored as compressed chunks (in memory or on disk), and a chunk is only read and decompressed when a Python script needs it. This provides a way to work with data larger than RAM in Python. With that being said, let's go through a tutorial:

In [2]:
import zarr
import torch
import torch.nn as nn
import torch.nn.functional as F
#import pandas as pd
import numpy as np
import time
import os

print("Is a GPU available? ")
print(torch.cuda.is_available())
#this creates a 10000x10000 zarr array that has chunks of size (1000,1000)
z = zarr.zeros((10000, 10000), chunks=(1000, 1000), dtype='i4')
z

Is a GPU available? 
True


<zarr.core.Array (10000, 10000) int32>

In [7]:
#writes '42' to all of the values in the zarr array
z[:] = 42
#Basically we can access and write values to zarr arrays the same way we would in numpy

In [8]:
#This really isn't why zarr arrays are useful though, for that we will write a file where we save a zarr array:
z1 = zarr.open('data/example.zarr', mode='w', shape=(10000, 10000),
                chunks=(1000, 1000), dtype='i4')
#We can now edit this file:
z1[0, :] = np.arange(10000)
z1[:, 0] = np.arange(10000)
z1[0:10,0:10]

array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [2, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [3, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [4, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [5, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [6, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [7, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [8, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [9, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

In [9]:
#We can open a second instance of this data:
z2 = zarr.open('data/example.zarr', mode='r')
#We can perform basic numpy operations
print(np.all(z1[:] == z2[:]))
#as a final note we can save numpy arrays themselves using zarr.save

True


There's potentially a lot more that one can do with these arrays, but for our purposes this should be all that we need. Conveniently, I wrote a function last week that will generate an arbitrarily sized dataset. Let's generate a large dataset and save it to a zarr array. Note: this will fit in RAM, but if it didn't we could use zarr's append function to build the complete zarr array one block at a time.

In [6]:
def generateData(m,n):
    #start by generating a target, a 0 will represent that they do not have the pathology and a 1 will represent that they do have the pathology
    target = torch.randint(0,2, size = (m,1))
    
    #Now we will generate the mxn matrix of genotypes:
    data = torch.zeros(m,n)
    for i in range(m):
        #if this individual has the pathology
        if target[i,0] == 1:
            #flip a coin to determine if they have one or two pathological mutations
            r = torch.rand(1,1)
            if r <= 0.5:
                data[i,0] = 2
                data[i,1] = 2
            else:
                #flip a coin to determine which allele is a 2 
                r = torch.rand(1,1)
                if r <= .5:
                    data[i,0] = 2
                    data[i,1] = torch.randint(0,2, size = (1,1))
                else:
                    data[i,1] = 2
                    data[i,0] = torch.randint(0,2, size = (1,1))
            #determine the values of the rest of the mutations in this row
            for j in range(2, n):
                r = torch.rand(1,1)
                #~70% of the other mutations are 0
                if r <= .7:
                    data[i,j] = 0
                else:
                    data[i,j] = torch.randint(1,3, size = (1,1))
        #if they do not have the pathology
        else:
            r = torch.rand(1,1)
            #right now 50% of the data has the pathology and the other 50% does not. That means that 25% of the data are positive with only 1 pathological mutation
            #so we need to choose 25% of healthy individuals to have a pathological mutation to keep our statistics in line with the description
            if r <= .25:
                r = torch.rand(1,1)
                if r <= .5:
                    data[i,0] = 2
                    data[i,1] = torch.randint(0,2, size = (1,1))
                else:
                    data[i,1] = 2
                    data[i,0] = torch.randint(0,2, size = (1,1))
            else:
                data[i,1] = torch.randint(0,2, size = (1,1))
                data[i,0] = torch.randint(0,2, size = (1,1))
            for j in range(2, n):
                r = torch.rand(1,1)
                #~70% of the other mutations are 0
                if r <= .7:
                    data[i,j] = 0
                else:
                    data[i,j] = torch.randint(1,3, size = (1,1))
    return data,target
n=100
data, target = generateData(2**14,n)
data = data.numpy()
target = target.numpy()

#int8 is the smallest integer dtype numpy offers, for efficient storage
data = data.astype('int8')
target = target.astype('int8')
from sys import getsizeof
print("The size of the Data before loading into Zarr: " + str(getsizeof(data)))
data = zarr.array(data, chunks=(1000, n))
#target has a single column, so its chunks are (1000, 1)
target = zarr.array(target, chunks=(1000, 1))
#note: getsizeof on a zarr array only measures the small Python wrapper object, not the chunk data
print("The size of the Data after loading into Zarr: " + str(getsizeof(data)))
zarr.save('data/GenData.zarr', data)
zarr.save('data/TarData.zarr', target)
data = zarr.open('data/GenData.zarr', mode = 'r')
target = zarr.open('data/TarData.zarr', mode = 'r')
print("The size of the Data after loading from Zarr file in memory: " + str(getsizeof(data)))

The size of the Data before loading into Zarr: 1638512
The size of the Data after loading into Zarr: 48
The size of the Data after loading from Zarr file in memory: 48


In [7]:
data.info

0,1
Type,zarr.core.Array
Data type,int8
Shape,"(16384, 100)"
Chunk shape,"(1000, 100)"
Order,C
Read-only,True
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,1638400 (1.6M)
No. bytes stored,949731 (927.5K)


Now that we have the data and target saved as zarr files we can look into what an iterable dataset is in Pytorch.
## Iterable Datasets
From the documentation: https://pytorch.org/docs/stable/data.html#iterable-style-datasets we can see that an iterable dataset is pretty much what you would expect and "represents an iterable over data samples". According to the documentation this is useful when random reads of a dataset are infeasible. The example given in the documentation is particularly enlightening for what this is trying to accomplish: "For example, such a dataset, when called iter(dataset), could return a stream of data reading from a database, a remote server, or even logs generated in real time." The documentation also provides a little background on what workers are. Each worker is a separate process that holds its own copy of the dataset object and loads data in parallel with the others. If configured correctly (each worker is responsible for loading only a smaller independent portion of the data, with no overlap with other workers) this can be both very efficient and replicate the behavior of a normal dataset. We can follow https://discuss.pytorch.org/t/iterable-dataset-with-zarr-as-storage-backend-results-in-degrading-performance/91100 to implement this using our zarr array, but we will also need to set up a DataLoader (basically just a sampler across a dataset that returns an iterable):

In [1]:
import itertools
import torch
import zarr

class Data(torch.utils.data.IterableDataset):
    def __init__(self, path, start=None, end=None):
        super(Data, self).__init__()
        #open the zarr directory store read-only
        store = zarr.DirectoryStore(path)
        
        self.array = zarr.open(store, mode='r')

        #default to iterating over every row
        if start is None:
            start = 0
        if end is None:
            end = self.array.shape[0]

        assert end > start

        self.start = start
        self.end = end

    def __iter__(self):
        #yield the rows in [start, end) one at a time
        return itertools.islice(self.array, self.start, self.end)

In [9]:
traindata = Data('data/GenData.zarr')
print("The size of the iterable dataset: " + str(getsizeof(traindata)))

The size of the iterable dataset: 48
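
One detail of the `itertools.islice` call in `__iter__` is worth sketching: `islice` reaches its start offset by consuming and discarding the earlier items rather than jumping to them. The `CountingRows` wrapper below is a hypothetical helper, with a NumPy array standing in for the zarr array, used only to count how many rows are actually read.

```python
import itertools

import numpy as np

class CountingRows:
    """Hypothetical wrapper that counts how many rows are read from a 2D array."""

    def __init__(self, array):
        self.array = array
        self.rows_read = 0

    def __iter__(self):
        for row in self.array:
            self.rows_read += 1
            yield row

rows = CountingRows(np.arange(20).reshape(10, 2))

# Ask for rows 6, 7 and 8 only.
window = list(itertools.islice(rows, 6, 9))

print(len(window))     # 3
print(rows.rows_read)  # 9: rows 0-5 were read and thrown away first
```

For a dataset split across workers this may mean that a worker with a late start offset reads through all the earlier rows before yielding anything; slicing the array directly, for example `array[start:end]`, is one possible alternative when the slice fits in memory.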


In [65]:
import math

def worker_init_fn(worker_id):
    worker_info = torch.utils.data.get_worker_info()
    
    dataset = worker_info.dataset  # the dataset copy in this worker process
    overall_start = dataset.start
    overall_end = dataset.end
    # configure the dataset to only process the split workload
    per_worker = int(
        math.ceil(
            (overall_end - overall_start) / float(worker_info.num_workers)
        )
    )
    worker_id = worker_info.id
    dataset.start = overall_start + worker_id * per_worker
    dataset.end = min(dataset.start + per_worker, overall_end)
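
The split arithmetic in `worker_init_fn` can be checked without starting any worker processes. The function `split_range` below is a hypothetical helper that repeats the same ceiling-division logic for every worker id and returns the `(start, end)` pair each worker would get.

```python
import math

def split_range(start, end, num_workers):
    # Same arithmetic as worker_init_fn, applied to every worker id in turn.
    per_worker = int(math.ceil((end - start) / float(num_workers)))
    shards = []
    for worker_id in range(num_workers):
        shard_start = start + worker_id * per_worker
        shard_end = min(shard_start + per_worker, end)
        shards.append((shard_start, shard_end))
    return shards

two = split_range(0, 16384, 2)
three = split_range(0, 16384, 3)
print(two)    # [(0, 8192), (8192, 16384)]
print(three)  # [(0, 5462), (5462, 10924), (10924, 16384)]
```

The shards are contiguous and non-overlapping, and the last one is shortened by `min` when the rows do not divide evenly.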

In [12]:
trainloader = torch.utils.data.DataLoader(
            dataset=traindata,
            batch_size=64,
            shuffle=False,
            num_workers=2,
            #without this every worker would iterate over the full dataset
            worker_init_fn=worker_init_fn
        )
print("The size of the data loader: " + str(getsizeof(trainloader)))

The size of the data loader: 48


In [13]:
for batch in trainloader:
    x = batch
print("The size of the each batch output by the train loader: " + str(getsizeof(x)))

RuntimeError: DataLoader worker (pid(s) 2428, 20396) exited unexpectedly

In [73]:
x.shape

torch.Size([64, 100])

In [3]:
import torch
class MyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, start, end):
        super().__init__()
        assert end > start, "this example code only works with end > start"
        self.start = start
        self.end = end

    def __iter__(self):
        return iter(range(self.start, self.end))

# should give same set of data as range(3, 7), i.e., [3, 4, 5, 6].
ds = MyIterableDataset(start=3, end=7)

# Single-process loading
print(list(torch.utils.data.DataLoader(ds, num_workers=0)))
# expected values: 3, 4, 5, 6 (each wrapped in a tensor)
# Directly doing multi-process loading yields duplicate data
print(list(torch.utils.data.DataLoader(ds, num_workers=2)))
# expected values: 3, 3, 4, 4, 5, 5, 6, 6 (each wrapped in a tensor)


[tensor([3]), tensor([4]), tensor([5]), tensor([6])]


RuntimeError: DataLoader worker (pid(s) 11848, 20864) exited unexpectedly
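
The duplicate data mentioned in the comments above comes from every worker iterating over the full range. A minimal sketch of the usual remedy, following the pattern in the PyTorch documentation, is to shard inside `__iter__` using `torch.utils.data.get_worker_info()`. The class name `ShardedRangeDataset` is hypothetical, and the sketch is only run with `num_workers=0`, which exercises the single-process branch.

```python
import math

import torch

class ShardedRangeDataset(torch.utils.data.IterableDataset):
    """Hypothetical example: yields the integers in [start, end) without duplicates."""

    def __init__(self, start, end):
        super().__init__()
        assert end > start, "this example code only works with end > start"
        self.start = start
        self.end = end

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            # Single-process loading: return the full range.
            iter_start, iter_end = self.start, self.end
        else:
            # Inside a worker process: take only this worker's share.
            per_worker = int(math.ceil(
                (self.end - self.start) / float(worker_info.num_workers)))
            iter_start = self.start + worker_info.id * per_worker
            iter_end = min(iter_start + per_worker, self.end)
        return iter(range(iter_start, iter_end))

ds = ShardedRangeDataset(start=3, end=7)
loader = torch.utils.data.DataLoader(ds, num_workers=0)
values = [int(batch) for batch in loader]
print(values)  # [3, 4, 5, 6]
```

Because the sharding lives in the dataset itself, no `worker_init_fn` is needed with this design.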