Support learning pipelines with checkpointing
JohnVinyard committed Feb 22, 2018
1 parent 22034c1 commit dc6d8c1
Showing 6 changed files with 79 additions and 60 deletions.
4 changes: 3 additions & 1 deletion zounds/learn/__init__.py
@@ -25,7 +25,9 @@
 from supervised import SupervisedTrainer
 from embedding import TripletEmbeddingTrainer
 
-from random_samples import Reservoir, ReservoirSampler, ShuffledSamples
+from random_samples import \
+    Reservoir, ReservoirSampler, ShuffledSamples, InfiniteShuffledSamples, \
+    InfiniteIterator
 
 from util import simple_settings
5 changes: 2 additions & 3 deletions zounds/learn/gan.py
@@ -31,9 +31,6 @@ def train(self, data):
 
         zdim = self.latent_dimension
 
-        # TODO: These dimensions work for vanilla GANs, but need to be
-        # reversed (batch_size, zdim, 1) for convolutional GANs
-
         noise_shape = (self.batch_size,) + self.latent_dimension
         noise = torch.FloatTensor(*noise_shape)
         fixed_noise = torch.FloatTensor(*noise_shape).normal_(0, 1)
@@ -43,6 +40,8 @@
 
         self.generator.cuda()
         self.discriminator.cuda()
+        self.generator.train()
+        self.discriminator.train()
         self.loss.cuda()
         label = label.cuda()
         noise, fixed_noise = noise.cuda(), fixed_noise.cuda()
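The two `train()` calls added above put both networks into training mode before the loop begins. A minimal illustration of why that matters, in plain PyTorch (not part of this commit): layers such as `Dropout` and `BatchNorm` change behavior with the module's mode, so the mode has to be set explicitly.

    from torch import nn

    # hypothetical two-layer module; the Dropout layer only does anything
    # in training mode
    net = nn.Sequential(nn.Linear(8, 8), nn.Dropout(0.5))

    net.train()  # training mode: dropout active, batch-norm updates running stats
    net.eval()   # inference mode: dropout disabled, stored statistics used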
47 changes: 9 additions & 38 deletions zounds/learn/preprocess.py
@@ -236,41 +236,6 @@ def _process(self, data):
             data, op, inversion_data=inv_data, inverse=inv, name=self.name)
 
 
-# class Log(Preprocessor):
-#     """
-#     Perform the log-modulus transform on data
-#     (http://blogs.sas.com/content/iml/2014/07/14/log-transformation-of-pos-neg.html)
-#
-#     This transform will tend to compress the overall range of values
-#     """
-#
-#     def __init__(self, needs=None):
-#         super(Log, self).__init__(needs=needs)
-#
-#     def _forward_func(self):
-#         def x(d):
-#             from zounds.loudness import log_modulus
-#             return log_modulus(d)
-#
-#         return x
-#
-#     def _backward_func(self):
-#         def x(d):
-#             from zounds.loudness import inverse_log_modulus
-#             return inverse_log_modulus(d)
-#
-#         return x
-#
-#     def _process(self, data):
-#         data = self._extract_data(data)
-#         op = self.transform()
-#         inv_data = self.inversion_data()
-#         inv = self.inverse_transform()
-#         data = op(data)
-#         yield PreprocessResult(
-#             data, op, inversion_data=inv_data, inverse=inv, name='Log')
-
-
 # TODO: what about functions with imports that aren't in the calling namespace?
 # TODO: what about functions that require inversion data?
 # TODO: what about functions that need to do some processing first (e.g. SimHash)
@@ -853,15 +818,21 @@ class ExamplePipeline(ff.BaseModel):
 
     def __init__(self, needs=None):
         super(PreprocessingPipeline, self).__init__(needs=needs)
-        self._pipeline = OrderedDict((id(n), None) for n in needs.values())
+        self._init_pipeline()
+
+    def _init_pipeline(self):
+        self._pipeline = OrderedDict((id(n), None) for n in self.needs.values())
 
     def _enqueue(self, data, pusher):
        self._pipeline[id(pusher)] = data
 
    def _dequeue(self):
-        if not self._finalized or not all(self._pipeline.itervalues()):
+        if not all(self._pipeline.itervalues()):
            raise NotEnoughData()
 
-        return Pipeline(map(
+        pipeline = Pipeline(map(
            lambda x: x.for_storage(),
            self._pipeline.itervalues()))
+
+        self._init_pipeline()
+        return pipeline
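This hunk is the heart of the checkpointing support: rather than emitting a single `Pipeline` once the stream is finalized, `_dequeue` now emits a snapshot whenever every upstream slot is filled, then calls `_init_pipeline()` to empty the slots for the next round. A minimal self-contained sketch of the same emit-and-reset pattern, using hypothetical stand-ins rather than the real featureflow classes:

    from collections import OrderedDict

    class NotEnoughData(Exception):
        pass

    class Checkpointer(object):
        def __init__(self, upstream_ids):
            self.upstream_ids = upstream_ids
            self._reset()

        def _reset(self):
            # one empty slot per upstream node
            self.slots = OrderedDict((i, None) for i in self.upstream_ids)

        def enqueue(self, data, pusher_id):
            self.slots[pusher_id] = data

        def dequeue(self):
            # not ready until every upstream node has reported
            if not all(self.slots.values()):
                raise NotEnoughData()
            snapshot = list(self.slots.values())
            self._reset()  # clear the slots so the next checkpoint can accumulate
            return snapshot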
17 changes: 12 additions & 5 deletions zounds/learn/pytorch_model.py
@@ -95,13 +95,23 @@ def x(_):
         return x
 
     def _enqueue(self, data, pusher):
+        if self._cache is None:
+            self._cache = dict()
         k = self._dependency_name(pusher)
         self._cache[k] = data
 
     def _dequeue(self):
-        if not self._finalized:
+
+        if self._cache is None:
             raise ff.NotEnoughData()
+
+        if isinstance(self._cache, dict) \
+                and len(self._cache) != len(self.needs):
+            raise ff.NotEnoughData()
-        return self._cache
+
+        data = self._cache
+        self._cache = None
+        return data
 
     def _train(self, data):
         trained_network = self.trainer.train(data)
@@ -121,7 +131,6 @@ def _process(self, data):
         try:
             forward_func = self._forward_func()
             x = self.post_training_func(data['data'])
-            print x.shape, x.dtype, self.chunksize
             processed_data = forward_func(
                 x, network=trained_network, chunk_size=chunksize)
         except RuntimeError as e:
@@ -214,8 +223,6 @@ def _process(self, data):
             warnings.warn(e.message)
 
         op = self.transform(network=network, apply_network=self.apply_network)
-        print op.network
-        print op.apply_network
         inv_data = self.inversion_data()
         inv = self.inverse_transform()
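The cache changes above follow the same pattern as the pipeline change: `_dequeue` raises `ff.NotEnoughData()` until every dependency has pushed at least once, then hands the cached batch onward and resets `self._cache` to `None`, so the node can train again each time a fresh batch arrives instead of only once at finalization.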
54 changes: 49 additions & 5 deletions zounds/learn/random_samples.py
@@ -1,6 +1,7 @@
-from featureflow import Node, NotEnoughData
+from featureflow import Node, NotEnoughData, IteratorNode
 from zounds.core import ArrayWithUnits, IdentityDimension
 import numpy as np
+from itertools import cycle
 
 
 class Reservoir(object):
@@ -18,6 +19,9 @@ def __init__(self, nsamples, dtype=None):
         self.indices = set()
         self.dtype = dtype
 
+    def percent_full(self):
+        return float(len(self.indices)) / self.nsamples
+
     def _init_arr(self, samples):
         if self.arr is not None:
             return
@@ -64,9 +68,9 @@ def get_batch(self, batch_size):
                 'Requested {batch_size} samples, but this instance only '
                 'currently has {n} samples, with a maximum of {nsamples}'
                 .format(
-                    batch_size=batch_size,
-                    n=len(self.indices),
-                    nsamples=self.nsamples))
+                        batch_size=batch_size,
+                        n=len(self.indices),
+                        nsamples=self.nsamples))
 
         # TODO: this would be much more efficient for repeated calls if I
         # instead maintained a sorted set
@@ -130,6 +134,46 @@ def _dequeue(self):
         return self.reservoir.get()
 
 
+class InfiniteIterator(IteratorNode):
+    def __init__(self, needs=None):
+        super(InfiniteIterator, self).__init__(needs=needs)
+
+    def _process(self, data):
+        for d in cycle(data):
+            yield d
+
+
+class InfiniteShuffledSamples(ShuffledSamples):
+    def __init__(
+            self,
+            nsamples=None,
+            multiplexed=None,
+            dtype=None,
+            needs=None,
+            mixture_threshold=0.9,
+            update_threshold=10):
+
+        super(InfiniteShuffledSamples, self).__init__(
+            nsamples=nsamples,
+            multiplexed=multiplexed,
+            dtype=dtype,
+            needs=needs)
+        self.update_threshold = update_threshold
+        self.mixture_threshold = mixture_threshold
+        self._updates = 0
+
+    def _enqueue(self, data, pusher):
+        self._updates += 1
+        self.reservoir.add(data)
+
+    def _dequeue(self):
+        print self._updates, self.reservoir.percent_full()
+        if self.reservoir.percent_full() >= self.mixture_threshold \
+                and self._updates >= self.update_threshold:
+            self._updates = 0
+            return self.reservoir.get()
+
+
 class ReservoirSampler(Node):
     """
     Use reservoir sampling (http://en.wikipedia.org/wiki/Reservoir_sampling) to
@@ -186,4 +230,4 @@ def _dequeue(self):
             arr = ArrayWithUnits(arr, self._r.dimensions)
             return arr
 
-        return self._r
+        return self._r
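`InfiniteIterator` simply replays its finite input forever via `itertools.cycle`, and `InfiniteShuffledSamples` keeps feeding the reservoir, emitting a batch only once the reservoir is sufficiently full and enough updates have mixed it. For reference, a minimal sketch of the classic reservoir-sampling scheme (Algorithm R) from the Wikipedia article cited in the docstring above; the `Reservoir` class here is a batched, numpy-backed variant of the same idea:

    import random

    def reservoir_sample(stream, k):
        # maintain a uniform random sample of size k from a stream of
        # unknown length
        reservoir = []
        for i, item in enumerate(stream):
            if i < k:
                reservoir.append(item)
            else:
                # keep item i with probability k / (i + 1)
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = item
        return reservoir

    print(reservoir_sample(range(100000), 5))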
12 changes: 4 additions & 8 deletions zounds/learn/test_pytorch_model.py
@@ -427,27 +427,23 @@ def test_can_train_gan(self):
         @simple_in_memory_settings
         class Pipeline(ff.BaseModel):
             inp = ff.PickleFeature(
-                ff.IteratorNode,
-                store=False)
+                ff.IteratorNode)
 
             samples = ff.PickleFeature(
                 ShuffledSamples,
                 nsamples=500,
                 needs=inp,
-                dtype=np.float32,
-                store=False)
+                dtype=np.float32)
 
             scaled = ff.PickleFeature(
                 InstanceScaling,
-                needs=samples,
-                store=False)
+                needs=samples)
 
             network = ff.PickleFeature(
                 PyTorchGan,
                 apply_network='generator',
                 trainer=trainer,
-                needs=scaled,
-                store=False)
+                needs=scaled)
 
             pipeline = ff.PickleFeature(
                 PreprocessingPipeline,
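Given the new classes, a checkpointing variant of this pipeline might swap in the infinite nodes. The following is a hypothetical sketch patterned on the test above (same imports and `trainer`), not code from this commit:

    @simple_in_memory_settings
    class CheckpointedPipeline(ff.BaseModel):
        # replay the finite training set forever instead of exhausting it once
        inp = ff.PickleFeature(InfiniteIterator)

        # emit a shuffled batch each time the reservoir is full and well mixed
        samples = ff.PickleFeature(
            InfiniteShuffledSamples,
            nsamples=500,
            needs=inp,
            dtype=np.float32)

        # ... remaining features (InstanceScaling, PyTorchGan and
        # PreprocessingPipeline) wired exactly as in the test above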
