Merge pull request #2061 from deepchem/shuffle
Fix DiskDataset.shuffle_each_shard
Bharath Ramsundar committed Jul 30, 2020
2 parents 9436f33 + c94a59f commit b1e6316
Showing 2 changed files with 197 additions and 130 deletions.
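In short: the previous implementation of `shuffle_each_shard` wrote every shuffled shard back with an empty basename (`DiskDataset.write_data_to_disk(self.data_dir, "", tasks, ...)`), so all shards clobbered the same files on disk, and the shard cache was never invalidated. The fix writes each shard back under its own basename (defaulting to the "shard-i" names used by `create_dataset` and `reshard`) and resets the cache. A minimal sketch of the repaired behavior, mirroring the new test added below:

import numpy as np
import deepchem as dc

X = np.random.rand(100, 10)
y = np.random.randint(2, size=(100, 10))
dataset = dc.data.DiskDataset.from_numpy(X, y, ids=np.arange(100))
dataset.reshard(shard_size=10)

dataset.shuffle_each_shard()
# Rows are permuted within each shard; the shuffled ids record the
# permutation, so each original row remains recoverable as X[ids[i]].
assert not (dataset.ids == np.arange(100)).all()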
87 changes: 79 additions & 8 deletions deepchem/data/datasets.py
@@ -974,7 +974,7 @@ def __init__(self, data_dir: str) -> None:
@staticmethod
def create_dataset(shard_generator: Iterable[Batch],
data_dir: Optional[str] = None,
tasks: Optional[Sequence] = []):
tasks: Optional[Sequence] = []) -> "DiskDataset":
"""Creates a new DiskDataset
Parameters
@@ -986,6 +986,10 @@ def create_dataset(shard_generator: Iterable[Batch],
Filename for data directory. Creates a temp directory if none specified.
tasks: list
List of tasks for this dataset.
Returns
-------
A `DiskDataset` constructed from the given data
"""
if data_dir is None:
data_dir = tempfile.mkdtemp()
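As a minimal usage sketch (the shapes and task name are illustrative), `create_dataset` consumes a generator yielding one `(X, y, w, ids)` tuple per shard:

import numpy as np
import deepchem as dc

def shard_generator():
    # Yield three shards of ten samples each as (X, y, w, ids) tuples.
    for start in range(0, 30, 10):
        X = np.random.rand(10, 4)
        y = np.random.rand(10, 1)
        w = np.ones((10, 1))
        ids = np.arange(start, start + 10)
        yield X, y, w, ids

dataset = dc.data.DiskDataset.create_dataset(shard_generator(), tasks=["task0"])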
@@ -1046,6 +1050,31 @@ def write_data_to_disk(
y: Optional[np.ndarray] = None,
w: Optional[np.ndarray] = None,
ids: Optional[np.ndarray] = None) -> List[Optional[str]]:
"""Static helper method to write data to disk.
This helper method is used to write a shard of data to disk.
Parameters
----------
data_dir: str
Data directory to write shard to
basename: str
Basename for the shard in question.
tasks: np.ndarray
The names of the tasks in question.
X: Optional[np.ndarray]
The features array
y: Optional[np.ndarray]
The labels array
w: Optional[np.ndarray]
The weights array
ids: Optional[np.ndarray]
The identifiers array
Returns
-------
List with values `[out_ids, out_X, out_y, out_w]` giving the filenames of the on-disk locations to which these respective arrays were written.
"""
if X is not None:
out_X: Optional[str] = "%s-X.npy" % basename
save_to_disk(X, os.path.join(data_dir, out_X)) # type: ignore
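A hedged sketch of invoking this static helper directly (the directory and arrays are illustrative); per the naming scheme above it writes `shard-0-X.npy`, `shard-0-y.npy`, and so on into `data_dir`:

import tempfile
import numpy as np
import deepchem as dc

data_dir = tempfile.mkdtemp()
tasks = np.array(["task0"])
X = np.random.rand(10, 4)
y = np.random.rand(10, 1)
w = np.ones((10, 1))
ids = np.arange(10)
# Returns [out_ids, out_X, out_y, out_w], the filenames written under data_dir.
filenames = dc.data.DiskDataset.write_data_to_disk(
    data_dir, "shard-0", tasks, X, y, w, ids)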
@@ -1125,6 +1154,7 @@ def generator():
shutil.rmtree(self.data_dir)
shutil.move(reshard_dir, self.data_dir)
self.metadata_df = resharded_dataset.metadata_df
# Note that this resets the cache internally
self.save_to_disk()

def get_data_shape(self) -> Shape:
@@ -1474,7 +1504,28 @@ def from_numpy(X: np.ndarray,
ids: Optional[np.ndarray] = None,
tasks: Optional[Sequence] = None,
data_dir: Optional[str] = None) -> "DiskDataset":
"""Creates a DiskDataset object from specified Numpy arrays."""
"""Creates a DiskDataset object from specified Numpy arrays.
Parameters
----------
X: np.ndarray
Feature array
y: Optional[np.ndarray], optional (default None)
Labels array
w: Optional[np.ndarray], optional (default None)
Weights array
ids: Optional[np.ndarray], optional (default None)
Identifiers array
tasks: Optional[Sequence], optional (default None)
Tasks in this dataset
data_dir: Optional[str], optional (default None)
The directory to write this dataset to. If none is specified, will use
a temporary directory instead.
Returns
-------
A `DiskDataset` constructed from the provided information.
"""
n_samples = len(X)
if ids is None:
ids = np.arange(n_samples)
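A minimal sketch (shapes illustrative); as shown above, an omitted `ids` defaults to `np.arange(n_samples)`, and an omitted `w` is likewise given a default by the implementation:

import numpy as np
import deepchem as dc

X = np.random.rand(20, 4)
y = np.random.rand(20, 1)
# ids defaults to np.arange(20); w is filled with a default weight array.
dataset = dc.data.DiskDataset.from_numpy(X, y)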
@@ -1578,6 +1629,7 @@ def sparse_shuffle(self) -> None:
ids: List[np.ndarray] = []
num_features = -1
for i in range(num_shards):
logger.info("Sparsifying shard %d/%d" % (i, num_shards))
(X_s, y_s, w_s, ids_s) = self.get_shard(i)
if num_features == -1:
num_features = X_s.shape[1]
@@ -1594,6 +1646,7 @@ def sparse_shuffle(self) -> None:
w[permutation], ids[permutation])
# Write shuffled shards out to disk
for i in range(num_shards):
logger.info("Sparse shuffling shard %d/%d" % (i, num_shards))
start, stop = i * shard_size, (i + 1) * shard_size
(X_sparse_s, y_s, w_s, ids_s) = (X_sparse[start:stop], y[start:stop],
w[start:stop], ids[start:stop])
@@ -1646,20 +1699,38 @@ def complete_shuffle(self, data_dir: Optional[str] = None) -> "DiskDataset":

return DiskDataset.from_numpy(Xs, ys, ws, ids, data_dir=data_dir)
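Since `complete_shuffle` rebuilds the dataset through `from_numpy`, it returns a new `DiskDataset` with rows permuted across shard boundaries rather than shuffling in place. A small sketch (arrays illustrative):

import numpy as np
import deepchem as dc

dataset = dc.data.DiskDataset.from_numpy(
    np.random.rand(20, 4), np.random.rand(20, 1))
# Returns a new DiskDataset (written to a temp dir here); the original
# dataset's files are presumably left untouched.
shuffled = dataset.complete_shuffle()
assert len(shuffled) == len(dataset)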

def shuffle_each_shard(self) -> None:
"""Shuffles elements within each shard of the datset."""
def shuffle_each_shard(self,
shard_basenames: Optional[List[str]] = None) -> None:
"""Shuffles elements within each shard of the datset.
Parameters
----------
shard_basenames: Optional[List[str]], optional (default None)
The basenames for each shard. If this isn't specified, will assume
basenames of the form "shard-i", as used by `create_dataset` and
`reshard`.
"""
tasks = self.get_task_names()
# Shuffle the arrays corresponding to each row in metadata_df
n_rows = len(self.metadata_df.index)
for i in range(n_rows):
row = self.metadata_df.iloc[i]
if shard_basenames is not None:
if len(shard_basenames) != n_rows:
raise ValueError(
"shard_basenames must provide a basename for each shard in this DiskDataset."
)
else:
shard_basenames = ["shard-%d" % shard_num for shard_num in range(n_rows)]
for i, basename in zip(range(n_rows), shard_basenames):
logger.info("Shuffling shard %d/%d" % (i, n_rows))
X, y, w, ids = self.get_shard(i)
n = X.shape[0]
permutation = np.random.permutation(n)
X, y, w, ids = (X[permutation], y[permutation], w[permutation],
ids[permutation])
DiskDataset.write_data_to_disk(self.data_dir, "", tasks, X, y, w, ids)
DiskDataset.write_data_to_disk(self.data_dir, basename, tasks, X, y, w,
ids)
# Reset cache
self._cached_shards = None

def shuffle_shards(self) -> None:
"""Shuffles the order of the shards for this dataset."""
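The new `shard_basenames` parameter covers datasets whose shards live under non-default names. A hedged sketch (the custom basenames below are hypothetical):

import numpy as np
import deepchem as dc

dataset = dc.data.DiskDataset.from_numpy(
    np.random.rand(20, 4), np.random.rand(20, 1))
dataset.reshard(shard_size=5)  # four shards named "shard-0" ... "shard-3"
dataset.shuffle_each_shard()   # assumes the default "shard-i" basenames
# For shards stored under other basenames (hypothetical names):
# dataset.shuffle_each_shard(shard_basenames=["a-0", "a-1", "a-2", "a-3"])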
240 changes: 118 additions & 122 deletions deepchem/data/tests/test_shuffle.py
@@ -13,125 +13,121 @@
import numpy as np


class TestShuffle(unittest.TestCase):
"""
Test singletask/multitask dataset shuffling.
"""

#def test_shuffle(self):
# """Test that datasets can be merged."""
# current_dir = os.path.dirname(os.path.realpath(__file__))

# dataset_file = os.path.join(
# current_dir, "../../models/tests/example.csv")

# featurizer = dc.feat.CircularFingerprint(size=1024)
# tasks = ["log-solubility"]
# loader = dc.data.CSVLoader(
# tasks=tasks, smiles_field="smiles", featurizer=featurizer)
# dataset = loader.featurize(dataset_file, shard_size=2)

# X_orig, y_orig, w_orig, orig_ids = (dataset.X, dataset.y, dataset.w,
# dataset.ids)
# orig_len = len(dataset)

# dataset.shuffle(iterations=5)
# X_new, y_new, w_new, new_ids = (dataset.X, dataset.y, dataset.w,
# dataset.ids)
#
# assert len(dataset) == orig_len
# # The shuffling should have switched up the ordering
# assert not np.array_equal(orig_ids, new_ids)
# # But all the same entries should still be present
# assert sorted(orig_ids) == sorted(new_ids)
# # All the data should have same shape
# assert X_orig.shape == X_new.shape
# assert y_orig.shape == y_new.shape
# assert w_orig.shape == w_new.shape

def test_sparse_shuffle(self):
"""Test that sparse datasets can be shuffled quickly."""
current_dir = os.path.dirname(os.path.realpath(__file__))

dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")

featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
dataset = loader.featurize(dataset_file, shard_size=2)

X_orig, y_orig, w_orig, orig_ids = (dataset.X, dataset.y, dataset.w,
dataset.ids)
orig_len = len(dataset)

dataset.sparse_shuffle()
X_new, y_new, w_new, new_ids = (dataset.X, dataset.y, dataset.w,
dataset.ids)

assert len(dataset) == orig_len
# The shuffling should have switched up the ordering
assert not np.array_equal(orig_ids, new_ids)
# But all the same entries should still be present
assert sorted(orig_ids) == sorted(new_ids)
# All the data should have same shape
assert X_orig.shape == X_new.shape
assert y_orig.shape == y_new.shape
assert w_orig.shape == w_new.shape

def test_shuffle_each_shard(self):
"""Test that shuffle_each_shard works."""
n_samples = 100
n_tasks = 10
n_features = 10

X = np.random.rand(n_samples, n_features)
y = np.random.randint(2, size=(n_samples, n_tasks))
w = np.random.randint(2, size=(n_samples, n_tasks))
ids = np.arange(n_samples)
dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)
dataset.reshard(shard_size=10)

dataset.shuffle_each_shard()
X_s, y_s, w_s, ids_s = (dataset.X, dataset.y, dataset.w, dataset.ids)
assert X_s.shape == X.shape
assert y_s.shape == y.shape
assert ids_s.shape == ids.shape
assert w_s.shape == w.shape

# The ids should now store the performed permutation. Check that the
# original dataset is recoverable.
for i in range(n_samples):
np.testing.assert_array_equal(X_s[i], X[ids_s[i]])
np.testing.assert_array_equal(y_s[i], y[ids_s[i]])
np.testing.assert_array_equal(w_s[i], w[ids_s[i]])
np.testing.assert_array_equal(ids_s[i], ids[ids_s[i]])

def test_shuffle_shards(self):
"""Test that shuffle_shards works."""
n_samples = 100
n_tasks = 10
n_features = 10

X = np.random.rand(n_samples, n_features)
y = np.random.randint(2, size=(n_samples, n_tasks))
w = np.random.randint(2, size=(n_samples, n_tasks))
ids = np.arange(n_samples)
dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)
dataset.reshard(shard_size=10)
dataset.shuffle_shards()

X_s, y_s, w_s, ids_s = (dataset.X, dataset.y, dataset.w, dataset.ids)

assert X_s.shape == X.shape
assert y_s.shape == y.shape
assert ids_s.shape == ids.shape
assert w_s.shape == w.shape

# The ids should now store the performed permutation. Check that the
# original dataset is recoverable.
for i in range(n_samples):
np.testing.assert_array_equal(X_s[i], X[ids_s[i]])
np.testing.assert_array_equal(y_s[i], y[ids_s[i]])
np.testing.assert_array_equal(w_s[i], w[ids_s[i]])
np.testing.assert_array_equal(ids_s[i], ids[ids_s[i]])
def test_complete_shuffle():
"""Test that complete shuffle."""
current_dir = os.path.dirname(os.path.realpath(__file__))

dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")

featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
dataset = loader.featurize(dataset_file, shard_size=2)

X_orig, y_orig, w_orig, orig_ids = (dataset.X, dataset.y, dataset.w,
dataset.ids)
orig_len = len(dataset)

dataset = dataset.complete_shuffle()
X_new, y_new, w_new, new_ids = (dataset.X, dataset.y, dataset.w, dataset.ids)

assert len(dataset) == orig_len
# The shuffling should have switched up the ordering
assert not np.array_equal(orig_ids, new_ids)
# But all the same entries should still be present
assert sorted(orig_ids) == sorted(new_ids)
# All the data should have same shape
assert X_orig.shape == X_new.shape
assert y_orig.shape == y_new.shape
assert w_orig.shape == w_new.shape


def test_sparse_shuffle():
"""Test that sparse datasets can be shuffled quickly."""
current_dir = os.path.dirname(os.path.realpath(__file__))

dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")

featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
dataset = loader.featurize(dataset_file, shard_size=2)

X_orig, y_orig, w_orig, orig_ids = (dataset.X, dataset.y, dataset.w,
dataset.ids)
orig_len = len(dataset)

dataset.sparse_shuffle()
X_new, y_new, w_new, new_ids = (dataset.X, dataset.y, dataset.w, dataset.ids)

assert len(dataset) == orig_len
# The shuffling should have switched up the ordering
assert not np.array_equal(orig_ids, new_ids)
# But all the same entries should still be present
assert sorted(orig_ids) == sorted(new_ids)
# All the data should have same shape
assert X_orig.shape == X_new.shape
assert y_orig.shape == y_new.shape
assert w_orig.shape == w_new.shape


def test_shuffle_each_shard():
"""Test that shuffle_each_shard works."""
n_samples = 100
n_tasks = 10
n_features = 10

X = np.random.rand(n_samples, n_features)
y = np.random.randint(2, size=(n_samples, n_tasks))
w = np.random.randint(2, size=(n_samples, n_tasks))
ids = np.arange(n_samples)
dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)
dataset.reshard(shard_size=10)

dataset.shuffle_each_shard()
X_s, y_s, w_s, ids_s = (dataset.X, dataset.y, dataset.w, dataset.ids)
assert X_s.shape == X.shape
assert y_s.shape == y.shape
assert ids_s.shape == ids.shape
assert w_s.shape == w.shape
assert not (ids_s == ids).all()

# The ids should now store the performed permutation. Check that the
# original dataset is recoverable.
for i in range(n_samples):
np.testing.assert_array_equal(X_s[i], X[ids_s[i]])
np.testing.assert_array_equal(y_s[i], y[ids_s[i]])
np.testing.assert_array_equal(w_s[i], w[ids_s[i]])
np.testing.assert_array_equal(ids_s[i], ids[ids_s[i]])


def test_shuffle_shards():
"""Test that shuffle_shards works."""
n_samples = 100
n_tasks = 10
n_features = 10

X = np.random.rand(n_samples, n_features)
y = np.random.randint(2, size=(n_samples, n_tasks))
w = np.random.randint(2, size=(n_samples, n_tasks))
ids = np.arange(n_samples)
dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)
dataset.reshard(shard_size=10)
dataset.shuffle_shards()

X_s, y_s, w_s, ids_s = (dataset.X, dataset.y, dataset.w, dataset.ids)

assert X_s.shape == X.shape
assert y_s.shape == y.shape
assert ids_s.shape == ids.shape
assert w_s.shape == w.shape

# The ids should now store the performed permutation. Check that the
# original dataset is recoverable.
for i in range(n_samples):
np.testing.assert_array_equal(X_s[i], X[ids_s[i]])
np.testing.assert_array_equal(y_s[i], y[ids_s[i]])
np.testing.assert_array_equal(w_s[i], w[ids_s[i]])
np.testing.assert_array_equal(ids_s[i], ids[ids_s[i]])
