
Continuously load data from disk in separate thread #20

Closed
octavian-ganea opened this issue Jul 7, 2016 · 14 comments

@octavian-ganea

Hi,

After playing a bit with torchnet, it is still unclear to me how to properly tackle the following problem: suppose my training data lies in a very big file on disk (too large to load into memory at once), with each line of the file being one example. I would like to build a data iterator that runs on a separate thread (or several threads) and provides mini-batches to the main thread that trains the network. I would also like to do multiple epochs over the training data, so the training file needs to be reopened once it has been read to the end.

I tried using ParallelDatasetIterator, but as far as I understand, the closure is run once per thread and the returned dataset is expected to have a finite size. Can someone please explain or give an example for this use case? Thanks a lot.

@octavian-ganea changed the title from "Continuously load data from disk in parallel thread" to "Continuously load data from disk in separate thread" on Jul 7, 2016
@lvdmaaten
Contributor

Yeah, that is a little tricky to do if you don't have any index on the file. You could scan through the file once to determine the number of lines (wc -l) and the offset of each line. Then you can write a simple dataset that loads that information from disk and returns the number of lines in size(). You could implement get(idx) by making your BigFileDataset remember its current location in the file, and if the offset for the requested idx does not equal that location, do an fseek call.

If you don't do any shuffling or resampling of the data (but only batching), the vast majority of the reads will be sequential and so the data loading will actually be very efficient. (Note that ParallelDatasetIterator ensures that the same data is never read twice.)
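
For completeness, building that offset index once could look something like this (an untested sketch; the file names are placeholders):

require 'torch'

-- scan the file once and record the byte offset at which each line starts
local filename = 'datafile.txt'   -- placeholder
local offsets = {}
local file = io.open(filename, 'r')
local pos = 0
while true do
   local line = file:read('*l')
   if line == nil then break end
   table.insert(offsets, pos)
   pos = pos + #line + 1          -- line length plus the trailing newline
end
file:close()
torch.save('offsets.t7', torch.LongTensor(offsets))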

@octavian-ganea
Author

Thanks for your answer. I would be more inclined towards an architecture in which the data thread keeps at most K batches in memory at a time, refilling its queue each time the main thread retrieves a batch from it. I would also need this to happen asynchronously with respect to the main thread. At the moment, I have tried something similar to the MNIST example:

function get_it()
  local parallelit = tnt.ParallelDatasetIterator{
    nthread = 1,
    init    = function() require 'torchnet' end,
    closure = function()
      -- Load K mini-batches
      return tnt.BatchDataset{
        batchsize = 1,
        dataset = ....
      }
    end,
    ordered = true,
  }
  return parallelit
end

while true do
  local it = get_it()()
  for i = 1,bs do
    local batch = it()
    -- Process batch
  end
end

However, the call to get_it()() is blocking, which is not what I need.

@soumith

soumith commented Jul 8, 2016

I did something like what Octavian suggests in the dcgan.torch code, keeping the queue full:
https://github.com/soumith/dcgan.torch/blob/master/data/data.lua
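
The pattern there is roughly the following (a simplified sketch using the threads package, not the actual dcgan.torch code; load_next_batch is a placeholder for whatever loads one mini-batch inside the worker):

local threads = require 'threads'

local K = 4  -- number of mini-batches to keep in flight (assumed)
local pool = threads.Threads(1, function() require 'torchnet' end)

local result
local function enqueue()
   pool:addjob(
      function()
         -- runs in the worker thread: load and return one mini-batch
         return load_next_batch()  -- placeholder for whatever reads the next batch
      end,
      function(batch)
         -- runs back in the main thread when the job finishes
         result = batch
      end)
end

for i = 1, K do enqueue() end  -- prefill the queue

while true do
   pool:dojob()    -- waits only if no prefetched batch is ready yet
   local batch = result
   enqueue()       -- immediately request the next batch
   -- train on `batch` in the main thread ...
end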

@lvdmaaten
Contributor

@octavian-ganea Maybe I am misunderstanding what you are saying, but this is exactly what ParallelDatasetIterator is doing: https://github.com/torchnet/torchnet/blob/master/dataset/paralleldatasetiterator.lua#L103-123

The K you mention is the nthread parameter. So in your code example, just set that to 10 or 20 and you should be fine...?
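
Concretely, in your snippet that would be something like this (untested; the elided dataset line is kept as-is):

local parallelit = tnt.ParallelDatasetIterator{
  nthread = 10,  -- roughly bounds the number of batches prefetched in flight
  init    = function() require 'torchnet' end,
  closure = function()
    local tnt = require 'torchnet'
    return tnt.BatchDataset{
      batchsize = 1,
      dataset = ....
    }
  end,
  ordered = true,
}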

@octavian-ganea
Author

I am not sure. As I said, the call to get_it()() in the code snippet I posted above is blocking rather than asynchronous. I need one separate thread that loads mini-batches in an infinite loop while my main thread processes these mini-batches with the neural network, without waiting for the data thread.

If I set nthread higher as you suggest, it is not clear to me how the threads can read chunks of the same file in parallel. I am fine with just one thread doing this job, but again, I need it to run asynchronously from the main thread.

I would appreciate it if you could show a small code snippet of how to do this properly. Thanks a lot!

@lvdmaaten
Contributor

I would set it up something like this (this is untested, but I hope you get the gist of it):

-- I'm assuming `offsets` is a Tensor that contains the location in the file at
-- which line `idx` starts; you can construct this once and save it
local offsets = torch.load('offsets.t7')

-- set up file-loading dataset:
local MyDataset = torch.class('tnt.MyDataset', 'tnt.Dataset')
MyDataset.__init = function(self, filename, offsets)
   self.offsets = offsets
   self.file = io.open(filename, 'r')
   self.idx = 0;
end
MyDataset.size = function(self)
   return self.offsets:nElement()
end
MyDataset.get = function(self, idx)
   assert(idx > 0 and idx <= self.offsets:nElement())

   -- move to right location in file; when you are reading large batches and
   -- did not shuffle this hardly ever happens:
   if self.idx ~= self.offsets[idx] then
      self.idx = self.offsets[idx]
      self.file:seek('set', self.idx)
   end

   -- read the line and advance the stored file position (line length + newline):
   local line = self.file:read('*l')
   self.idx = self.idx + #line + 1
   return {input = line}  -- you may need to transform the line before returning
end

-- set up the dataset iterator:
local filename = 'datafile.txt'
local batchsize = 256  -- number of lines that will be read sequentially
local iterator = tnt.ParallelDatasetIterator{
   nthread = 5,  -- none of these threads will ever read the same data
   closure = function()
      local tnt = require 'torchnet'
      return tnt.BatchDataset{
         batchsize = batchsize,
         dataset = tnt.MyDataset(filename, offsets),
      }
   end,
}  -- this is blocking, but it is not actually doing any work

-- loop through the data once:
for sample in iterator:run() do  -- this is non-blocking

   -- process batch...
   for i, line in ipairs(sample.input) do
      print(string.format('line %d: %s', i, line))
   end
end

@octavian-ganea
Author

Wow. Really helpful! Will try this out! Many thanks to both of you!

@octavian-ganea
Author

octavian-ganea commented Jul 8, 2016

I've tried to run your example, but I keep getting the error:

torch/install/share/lua/5.1/torch/init.lua:102: while creating metatable tnt.MyConcatDataset: bad argument #1 (tnt is an invalid module name)
stack traceback:
    [C]: in function 'newmetatable'
    ...t/Desktop/nlp/torch/install/share/lua/5.1/torch/init.lua:102: in function 'class'
    load_torchnet.lua:4: in main chunk

The problem appears even if I only have the following code:

local tnt = require 'torchnet'
local MyConcatDataset = torch.class('tnt.MyConcatDataset', 'tnt.Dataset')

I've tried importing torchnet.env like in the datasets folder, without success. Sorry if this is really simple and I am missing it, but I've spent a bit of time trying to figure it out. Thanks!

@lvdmaaten
Contributor

Maybe you have an old version of the class package? Your two-line example works fine for me.

@lvdmaaten
Contributor

Try luarocks install class. Can you please post back here to confirm whether this worked for you?

@octavian-ganea
Author

octavian-ganea commented Jul 11, 2016

It actually works only if I declare tnt as a global (removing local), or if I keep it local but replace the second line with

local MyDataset, Dataset = torch.class('tnt.MyDataset', 'tnt.Dataset', tnt)

I've tried this on a completely fresh installation of Torch, tds, argcheck and torchnet.

But, using this code, MyDataset does not seem to be visible (it is a nil value) inside the closure function of the ParallelDatasetIterator. I've tried moving the tnt definition into an init function, but I still get the same error. Thanks for helping!

@octavian-ganea
Author

Hi. Any idea how to solve the above issue? Here is the code. Thanks!

local tnt = require 'torchnet'
local MyDataset, Dataset = torch.class('tnt.MyDataset', 'tnt.Dataset', tnt)
local iterator = tnt.ParallelDatasetIterator{
  nthread = 1, 
  closure = function()
    local tnt = require 'torchnet'
    return tnt.MyDataset()  -- MyDataset is nil here
  end,
} 

@lvdmaaten
Contributor

Note that closure is run inside a worker thread, not inside the main thread. Each worker thread has its own environment, and so the code inside your closure cannot know about tnt.MyDataset because it was never defined: you only required the torchnet package itself, and not your MyDataset.

The best solution is to move your definition of tnt.MyDataset into a separate file (say, mydataset.lua), and then to require this file inside the closure.

This is a minimal working example:

local tnt = require 'torchnet'
local iterator = tnt.ParallelDatasetIterator{
  nthread = 1, 
  closure = function()
    local tnt = require 'torchnet'

    -- move all this stuff into a separate file and require it:
    local MyDataset, Dataset = torch.class('tnt.MyDataset', 'tnt.Dataset', tnt)
    MyDataset.size = function(self) return 0 end
    MyDataset.get  = function(self, idx) return {} end    

    -- now the thread knows about tnt.MyDataset:
    return tnt.MyDataset()
  end,
} 
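
The separate-file variant would then look roughly like this (a sketch; it assumes mydataset.lua sits somewhere on the Lua package path):

-- mydataset.lua: define the class once, in its own file
local tnt = require 'torchnet'
local MyDataset, Dataset = torch.class('tnt.MyDataset', 'tnt.Dataset', tnt)
MyDataset.size = function(self) return 0 end
MyDataset.get  = function(self, idx) return {} end

-- main script: each worker thread requires the file, so tnt.MyDataset
-- is defined inside that worker's environment
local tnt = require 'torchnet'
local iterator = tnt.ParallelDatasetIterator{
  nthread = 1,
  closure = function()
    local tnt = require 'torchnet'
    require 'mydataset'
    return tnt.MyDataset()
  end,
}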

@octavian-ganea
Author

Works very nicely now! Thanks a lot!
