Adding more tests for resharding and legacy-to-modern conversion
Bharath Ramsundar authored and committed Jul 31, 2020
1 parent fdcd5ea commit f888b87
Showing 6 changed files with 226 additions and 96 deletions.
131 changes: 78 additions & 53 deletions deepchem/data/datasets.py
@@ -1018,6 +1018,21 @@ class DiskDataset(Dataset):
>> data_dir = "/path/to/my/data"
>> dataset = dc.data.DiskDataset(data_dir)
Once you have a dataset, you can access its attributes as follows:
>>> X = np.random.rand(10, 10)
>>> y = np.random.rand(10,)
>>> w = np.ones_like(y)
>>> dataset = dc.data.DiskDataset.from_numpy(X)
>>> X, y, w = dataset.X, dataset.y, dataset.w
Be aware that `dataset.X`, `dataset.y`, and `dataset.w` load the full arrays
from disk! If you have a large dataset, these operations can be extremely
slow, so prefer iterating through the dataset instead.
>>> for (xi, yi, wi, idi) in dataset.itersamples():
... pass
Attributes
----------
data_dir: str
@@ -1056,7 +1071,9 @@ def __init__(self, data_dir: str) -> None:
self.tasks, self.metadata_df = self.load_metadata()
if len(self.metadata_df.columns) == 4 and list(
self.metadata_df.columns) == ['ids', 'X', 'y', 'w']:
logger.info("Detected legacy metatadata on disk.")
logger.info(
"Detected legacy metatadata on disk. You can upgrade from legacy metadata to the more efficient current metadata by resharding this dataset."
)
self.legacy_metadata = True
elif len(self.metadata_df.columns) == 8 and list(
self.metadata_df.columns) == [
@@ -1085,7 +1102,7 @@ def create_dataset(shard_generator: Iterable[Batch],
(X, y, w, ids). Each tuple will be written to a separate shard on disk.
data_dir: str
Filename for data directory. Creates a temp directory if none specified.
tasks: list
tasks: Optional[Sequence]
List of tasks for this dataset.
legacy_metadata: bool, optional (default False)
If `True` use the legacy format for metadata without shape information
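
A minimal usage sketch of `create_dataset` and the `legacy_metadata` flag, tying in the upgrade path that the `__init__` log message above points at (toy data; assumes the API shown in this diff, with `data_dir` defaulting to a temp directory):

import deepchem as dc
import numpy as np

def shard_generator():
    # Each yielded (X, y, w, ids) tuple becomes one shard on disk.
    for i in range(3):
        X = np.random.rand(10, 4)
        y = np.random.rand(10, 1)
        w = np.ones_like(y)
        ids = np.arange(i * 10, (i + 1) * 10)
        yield (X, y, w, ids)

dataset = dc.data.DiskDataset.create_dataset(
    shard_generator(), tasks=["task0"], legacy_metadata=True)
assert dataset.legacy_metadata
assert list(dataset.metadata_df.columns) == ['ids', 'X', 'y', 'w']
# Per this change, resharding rewrites the metadata in the current
# eight-column format.
dataset.reshard(shard_size=10)
assert not dataset.legacy_metadata
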
@@ -1138,20 +1155,22 @@ def load_metadata(self) -> Tuple[List[str], pd.DataFrame]:
raise ValueError("No Metadata Found On Disk")

@staticmethod
def _save_metadata(tasks: List[str], metadata_df: pd.DataFrame,
def _save_metadata(tasks: Optional[Sequence], metadata_df: pd.DataFrame,
data_dir: str) -> None:
"""Saves the metadata for a DiskDataset
Parameters
----------
tasks: list of str
Tasks of DiskDataset
tasks: Sequence
Tasks of DiskDataset.
metadata_df: pd.DataFrame
The dataframe which will be written to disk.
data_dir: str
Directory to store metadata
"""
if isinstance(tasks, np.ndarray):
if tasks is None:
tasks = []
elif isinstance(tasks, np.ndarray):
tasks = tasks.tolist()
metadata_filename = os.path.join(data_dir, "metadata.csv.gzip")
tasks_filename = os.path.join(data_dir, "tasks.json")
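
For illustration only, a sketch of what `_save_metadata` leaves on disk (it is a private helper; the file names come from the code above, everything else is made-up toy input):

import os
import tempfile
import numpy as np
import pandas as pd
import deepchem as dc

data_dir = tempfile.mkdtemp()
df = pd.DataFrame(
    [["ids-0.joblib", "X-0.joblib", "y-0.joblib", "w-0.joblib"]],
    columns=["ids", "X", "y", "w"])
# An ndarray of tasks is converted to a plain list before being serialized;
# None would be normalized to an empty list.
dc.data.DiskDataset._save_metadata(np.array(["task0"]), df, data_dir)
assert os.path.exists(os.path.join(data_dir, "metadata.csv.gzip"))
assert os.path.exists(os.path.join(data_dir, "tasks.json"))
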
@@ -1176,9 +1195,10 @@ def _construct_metadata(metadata_entries: List,
if not legacy_metadata:
columns = ('ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape',
'w_shape')
metadata_df = pd.DataFrame(metadata_entries, columns=columns)
else:
columns = ('ids', 'X', 'y', 'w')
metadata_df = pd.DataFrame(metadata_entries, columns=columns)
legacy_columns = ('ids', 'X', 'y', 'w')
metadata_df = pd.DataFrame(metadata_entries, columns=legacy_columns)
return metadata_df
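
A short sketch of the two column layouts `_construct_metadata` produces (another private helper; this assumes the elided second parameter is named `legacy_metadata`, as the body above suggests):

import deepchem as dc

entry = ["ids-0.joblib", "X-0.joblib", "y-0.joblib", "w-0.joblib"]
shapes = ["(10,)", "(10, 4)", "(10, 1)", "(10, 1)"]
modern_df = dc.data.DiskDataset._construct_metadata(
    [entry + shapes], legacy_metadata=False)
legacy_df = dc.data.DiskDataset._construct_metadata(
    [entry], legacy_metadata=True)
assert list(modern_df.columns) == [
    'ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape', 'w_shape']
assert list(legacy_df.columns) == ['ids', 'X', 'y', 'w']
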

@staticmethod
@@ -1285,25 +1305,47 @@ def get_task_names(self) -> np.ndarray:
def reshard(self, shard_size: int) -> None:
"""Reshards data to have specified shard size.
Examples
--------
>>> import deepchem as dc
>>> import numpy as np
>>> X = np.random.rand(100, 10)
>>> d = dc.data.DiskDataset.from_numpy(X)
>>> d.reshard(shard_size=10)
>>> d.get_number_shards()
10
Note
----
If this `DiskDataset` is in `legacy_metadata` format, reshard will
maintain legacy metadata format.
convert this dataset to have non-legacy metadata.
"""
# Create temp directory to store resharded version
reshard_dir = tempfile.mkdtemp()

n_shards = self.get_number_shards()

# Get correct shapes for y/w
tasks = self.get_task_names()
_, y_shape, w_shape, _ = self.get_shape()
if len(y_shape) == 1:
y_shape = (len(y_shape), len(tasks))
if len(w_shape) == 1:
w_shape = (len(w_shape), len(tasks))

# Write data in new shards
def generator():
tasks = self.get_task_names()
X_next = np.zeros((0,) + self.get_data_shape())
y_next = np.zeros((0,) + (len(tasks),))
w_next = np.zeros((0,) + (len(tasks),))
y_next = np.zeros((0,) + y_shape[1:])
w_next = np.zeros((0,) + w_shape[1:])
ids_next = np.zeros((0,), dtype=object)
for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
logger.info("Resharding shard %d/%d" % (shard_num, n_shards))
# Handle shapes
X = np.reshape(X, (len(X),) + self.get_data_shape())
# Note that this means that DiskDataset resharding currently doesn't
# work for datasets that aren't regression/classification.
y = np.reshape(y, (len(y),) + y_shape[1:])
w = np.reshape(w, (len(w),) + w_shape[1:])
X_next = np.concatenate([X_next, X], axis=0)
y_next = np.concatenate([y_next, y], axis=0)
w_next = np.concatenate([w_next, w], axis=0)
@@ -1318,12 +1360,11 @@ def generator():
yield (X_next, y_next, w_next, ids_next)

resharded_dataset = DiskDataset.create_dataset(
generator(),
data_dir=reshard_dir,
tasks=self.tasks,
legacy_metadata=self.legacy_metadata)
generator(), data_dir=reshard_dir, tasks=self.tasks)
shutil.rmtree(self.data_dir)
shutil.move(reshard_dir, self.data_dir)
# Should have updated to non-legacy metadata
self.legacy_metadata = False
self.metadata_df = resharded_dataset.metadata_df
# Note that this resets the cache internally
self.save_to_disk()
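
End to end, resharding changes only how the same samples are split across shards; a sketch with random toy data:

import deepchem as dc
import numpy as np

X = np.random.rand(100, 10)
y = np.random.rand(100, 1)
d = dc.data.DiskDataset.from_numpy(X, y)
y_before = d.y.copy()
d.reshard(shard_size=25)
assert d.get_number_shards() == 4
# The stored values and their order are unchanged.
np.testing.assert_array_equal(y_before, d.y)
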
@@ -1334,10 +1375,14 @@ def get_data_shape(self) -> Shape:
"""
if not len(self.metadata_df):
raise ValueError("No data in dataset.")
sample_X = load_from_disk(
os.path.join(self.data_dir,
next(self.metadata_df.iterrows())[1]['X']))
return np.shape(sample_X)[1:]
if self.legacy_metadata:
sample_X = load_from_disk(
os.path.join(self.data_dir,
next(self.metadata_df.iterrows())[1]['X']))
return np.shape(sample_X)[1:]
else:
X_shape, _, _, _ = self.get_shape()
return X_shape[1:]
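
With current metadata the per-sample shape comes from the stored shape columns, so no shard needs to be loaded; a quick sketch:

import deepchem as dc
import numpy as np

X = np.random.rand(50, 8, 3)
d = dc.data.DiskDataset.from_numpy(X)
assert not d.legacy_metadata
# Feature shape of a single sample, i.e. without the leading sample axis.
assert tuple(d.get_data_shape()) == (8, 3)
X_shape, y_shape, w_shape, ids_shape = d.get_shape()
assert tuple(X_shape) == (50, 8, 3)
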

def get_shard_size(self) -> int:
"""Gets size of shards on disk."""
@@ -1701,38 +1746,15 @@ def from_numpy(X: np.ndarray,
-------
A `DiskDataset` constructed from the provided information.
"""
n_samples = len(X)
if ids is None:
ids = np.arange(n_samples)

if y is not None:
if w is None:
if len(y.shape) == 1:
w = np.ones(y.shape[0], np.float32)
else:
w = np.ones((y.shape[0], 1), np.float32)

if tasks is None:
if len(y.shape) > 1:
n_tasks = y.shape[1]
else:
n_tasks = 1
tasks = np.arange(n_tasks)

else:
if w is not None:
warnings.warn('y is None but w is not None. Setting w to None',
UserWarning)
w = None

if tasks is not None:
warnings.warn('y is None but tasks is not None. Setting tasks to None',
UserWarning)
tasks = None
# To unify shape handling so from_numpy behaves like NumpyDataset, we just
# make a NumpyDataset under the hood
dataset = NumpyDataset(X, y, w, ids)
if tasks is None:
tasks = dataset.get_task_names()

# raw_data = (X, y, w, ids)
return DiskDataset.create_dataset(
[(X, y, w, ids)],
[(dataset.X, dataset.y, dataset.w, dataset.ids)],
data_dir=data_dir,
tasks=tasks,
legacy_metadata=legacy_metadata)
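
Since `from_numpy` now builds a `NumpyDataset` internally, its defaults for `y`, `w`, `ids`, and `tasks` should match `NumpyDataset`; a sketch of that parity (toy data):

import deepchem as dc
import numpy as np

X = np.random.rand(10, 4)
y = np.random.rand(10, 2)  # two tasks
disk = dc.data.DiskDataset.from_numpy(X, y)
mem = dc.data.NumpyDataset(X, y)
assert disk.y.shape == mem.y.shape == (10, 2)
# Default weights and ids are generated the same way in both cases.
assert disk.w.shape == mem.w.shape
assert np.array_equal(disk.ids, mem.ids)
assert len(disk.get_task_names()) == 2
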
@@ -1758,10 +1780,13 @@ def merge(datasets: Iterable["DiskDataset"],
except AttributeError:
pass
if tasks:
if len(tasks) < len(datasets) or len(set(map(tuple, tasks))) > 1:
task_tuples = [tuple(task_list) for task_list in tasks]
if len(tasks) < len(datasets) or len(set(task_tuples)) > 1:
raise ValueError(
'Cannot merge datasets with different task specifications')
tasks = tasks[0]
merge_tasks = tasks[0]
else:
merge_tasks = []

def generator():
for ind, dataset in enumerate(datasets):
Expand All @@ -1770,7 +1795,7 @@ def generator():
yield (X, y, w, ids)

return DiskDataset.create_dataset(
generator(), data_dir=merge_dir, tasks=tasks)
generator(), data_dir=merge_dir, tasks=merge_tasks)
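
The conversion to tuples above is needed because lists are unhashable and cannot be put in a `set`; functionally, merge still requires every input to carry the same tasks, as in this sketch:

import deepchem as dc
import numpy as np

X1, y1 = np.random.rand(10, 4), np.random.rand(10, 1)
X2, y2 = np.random.rand(15, 4), np.random.rand(15, 1)
d1 = dc.data.DiskDataset.from_numpy(X1, y1, tasks=["task0"])
d2 = dc.data.DiskDataset.from_numpy(X2, y2, tasks=["task0"])
merged = dc.data.DiskDataset.merge([d1, d2])
assert len(merged) == 25
assert list(merged.get_task_names()) == ["task0"]
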

def subset(self, shard_nums: Sequence[int],
subset_dir: Optional[str] = None) -> "DiskDataset":
4 changes: 0 additions & 4 deletions deepchem/data/tests/test_datasets.py
@@ -1,10 +1,6 @@
"""
Tests for dataset creation
"""
__author__ = "Bharath Ramsundar"
__copyright__ = "Copyright 2016, Stanford University"
__license__ = "MIT"

import random
import math
import unittest
27 changes: 27 additions & 0 deletions deepchem/data/tests/test_legacy.py
@@ -23,3 +23,30 @@ def test_make_legacy_dataset_from_numpy():
assert dataset2.legacy_metadata
assert len(dataset2.metadata_df.columns) == 4
assert list(dataset2.metadata_df.columns) == ['ids', 'X', 'y', 'w']


def test_reshard():
"""Test that resharding updates legacy datasets."""
num_datapoints = 100
num_features = 10
num_tasks = 10
# Generate data
X = np.random.rand(num_datapoints, num_features)
y = np.random.randint(2, size=(num_datapoints, num_tasks))
w = np.random.randint(2, size=(num_datapoints, num_tasks))
ids = np.array(["id"] * num_datapoints)

dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids, legacy_metadata=True)
assert dataset.legacy_metadata
assert len(dataset.metadata_df.columns) == 4
assert list(dataset.metadata_df.columns) == ['ids', 'X', 'y', 'w']

# Reshard this dataset
dataset.reshard(shard_size=10)
assert dataset.get_number_shards() == 10
# Check metadata has been updated
assert not dataset.legacy_metadata
assert len(dataset.metadata_df.columns) == 8
assert list(dataset.metadata_df.columns) == [
'ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape', 'w_shape'
]
69 changes: 30 additions & 39 deletions deepchem/data/tests/test_merge.py
@@ -1,61 +1,52 @@
"""
Testing singletask/multitask dataset merging
"""
__author__ = "Bharath Ramsundar"
__copyright__ = "Copyright 2016, Stanford University"
__license__ = "MIT"

import os
import shutil
import tempfile
import unittest
import deepchem as dc
import numpy as np


class TestMerge(unittest.TestCase):
"""
Test singletask/multitask dataset merging.
"""
def test_merge():
"""Test that datasets can be merged."""
current_dir = os.path.dirname(os.path.realpath(__file__))

def test_merge(self):
"""Test that datasets can be merged."""
current_dir = os.path.dirname(os.path.realpath(__file__))
dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")

dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")
featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
first_dataset = loader.create_dataset(dataset_file)
second_dataset = loader.create_dataset(dataset_file)

featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
first_dataset = loader.featurize(dataset_file)
second_dataset = loader.featurize(dataset_file)
merged_dataset = dc.data.DiskDataset.merge([first_dataset, second_dataset])

merged_dataset = dc.data.DiskDataset.merge([first_dataset, second_dataset])
assert len(merged_dataset) == len(first_dataset) + len(second_dataset)

assert len(merged_dataset) == len(first_dataset) + len(second_dataset)

def test_subset(self):
"""Tests that subsetting of datasets works."""
current_dir = os.path.dirname(os.path.realpath(__file__))
def test_subset():
"""Tests that subsetting of datasets works."""
current_dir = os.path.dirname(os.path.realpath(__file__))

dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")
dataset_file = os.path.join(current_dir, "../../models/tests/example.csv")

featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
dataset = loader.featurize(dataset_file, shard_size=2)
featurizer = dc.feat.CircularFingerprint(size=1024)
tasks = ["log-solubility"]
loader = dc.data.CSVLoader(
tasks=tasks, smiles_field="smiles", featurizer=featurizer)
dataset = loader.create_dataset(dataset_file, shard_size=2)

shard_nums = [1, 2]
shard_nums = [1, 2]

orig_ids = dataset.ids
_, _, _, ids_1 = dataset.get_shard(1)
_, _, _, ids_2 = dataset.get_shard(2)
orig_ids = dataset.ids
_, _, _, ids_1 = dataset.get_shard(1)
_, _, _, ids_2 = dataset.get_shard(2)

subset = dataset.subset(shard_nums)
after_ids = dataset.ids
subset = dataset.subset(shard_nums)
after_ids = dataset.ids

assert len(subset) == 4
assert sorted(subset.ids) == sorted(np.concatenate([ids_1, ids_2]))
assert list(orig_ids) == list(after_ids)
assert len(subset) == 4
assert sorted(subset.ids) == sorted(np.concatenate([ids_1, ids_2]))
assert list(orig_ids) == list(after_ids)
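
The same subsetting behaviour can be exercised without the CSV featurization step; a minimal sketch on in-memory toy data:

import deepchem as dc
import numpy as np

X = np.random.rand(8, 3)
d = dc.data.DiskDataset.from_numpy(X)
d.reshard(shard_size=2)  # 4 shards of 2 samples each
sub = d.subset([1, 2])
assert d.get_number_shards() == 4
assert len(sub) == 4
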
20 changes: 20 additions & 0 deletions deepchem/data/tests/test_non_classification_regression_datasets.py
@@ -0,0 +1,20 @@
import deepchem as dc
import numpy as np


def test_disk_generative_dataset():
"""Test for a hypothetical generative dataset."""
X = np.random.rand(100, 10, 10)
y = np.random.rand(100, 10, 10)
dataset = dc.data.DiskDataset.from_numpy(X, y)
assert (dataset.X == X).all()
assert (dataset.y == y).all()


def test_numpy_generative_dataset():
"""Test for a hypothetical generative dataset."""
X = np.random.rand(100, 10, 10)
y = np.random.rand(100, 10, 10)
dataset = dc.data.NumpyDataset(X, y)
assert (dataset.X == X).all()
assert (dataset.y == y).all()
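
As a further sketch of what these generative-shaped datasets support, iteration keeps each sample's full shape (assuming `itersamples` behaves as in the `DiskDataset` docstring):

import deepchem as dc
import numpy as np

X = np.random.rand(100, 10, 10)
y = np.random.rand(100, 10, 10)
dataset = dc.data.DiskDataset.from_numpy(X, y)
for xi, yi, wi, idi in dataset.itersamples():
    assert xi.shape == (10, 10)
    assert yi.shape == (10, 10)
    break
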
