From 5c3d9f017319023dd31e16d30f23f1b5668b22bf Mon Sep 17 00:00:00 2001
From: Nic Ma
Date: Wed, 9 Jun 2021 23:28:12 +0800
Subject: [PATCH 01/26] [DLMED] add CSV datalist

Signed-off-by: Nic Ma
---
 docs/source/data.rst | 5 +++++
 monai/config/deviceconfig.py | 1 +
 monai/data/__init__.py | 1 +
 monai/data/csv_datalist.py | 27 +++++++++++++++++++++++++++
 requirements-dev.txt | 1 +
 5 files changed, 35 insertions(+)
 create mode 100644 monai/data/csv_datalist.py

diff --git a/docs/source/data.rst b/docs/source/data.rst
index 7d0ffbd7b1..f7a1f92ac0 100644
--- a/docs/source/data.rst
+++ b/docs/source/data.rst
@@ -188,3 +188,8 @@ ThreadBuffer
 TestTimeAugmentation
 ~~~~~~~~~~~~~~~~~~~~
 .. autoclass:: monai.data.TestTimeAugmentation
+
+
+CSV Datalist
+~~~~~~~~~~~~
+.. autofunction:: monai.data.load_csv_datalist

diff --git a/monai/config/deviceconfig.py b/monai/config/deviceconfig.py
index 2ec29255bf..c790a85277 100644
--- a/monai/config/deviceconfig.py
+++ b/monai/config/deviceconfig.py
@@ -79,6 +79,7 @@ def get_optional_config_values():
     output["tqdm"] = get_package_version("tqdm")
     output["lmdb"] = get_package_version("lmdb")
     output["psutil"] = psutil_version
+    output["pandas"] = get_package_version("pandas")

     return output

diff --git a/monai/data/__init__.py b/monai/data/__init__.py
index e2eec0ef12..785e8c2ced 100644
--- a/monai/data/__init__.py
+++ b/monai/data/__init__.py
@@ -9,6 +9,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from .csv_datalist import load_csv_datalist
 from .csv_saver import CSVSaver
 from .dataloader import DataLoader
 from .dataset import (

diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py
new file mode 100644
index 0000000000..5ed85b4c28
--- /dev/null
+++ b/monai/data/csv_datalist.py
@@ -0,0 +1,27 @@
+# Copyright 2020 - 2021 MONAI Consortium
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, Sequence, Union
+from monai.utils import optional_import
+pd, _ = optional_import("pandas")
+
+
+def load_csv_datalist(
+    filename: Union[str, Sequence[str]],
+    row_indices: Optional[Sequence[Union[int, str]]] = None,
+    col_names: Optional[Sequence[str]] = None,
+    col_groups: Optional[Dict[str, Sequence[str]]] = None,
+    **kwargs,
+) -> List[Dict]:
+    """Load data list from CSV files. 
+ + """ + pass diff --git a/requirements-dev.txt b/requirements-dev.txt index 9924cda33c..30f6a9b1b0 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -33,3 +33,4 @@ sphinx-autodoc-typehints==1.11.1 sphinx-rtd-theme==0.5.2 cucim~=0.19.0; platform_system == "Linux" openslide-python==1.1.2 +pandas From 372256e16266ab1c11c30664f9da8cd8bf480547 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 18:16:34 +0800 Subject: [PATCH 02/26] [DLMED] add group feature Signed-off-by: Nic Ma --- monai/data/csv_datalist.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index 5ed85b4c28..994ee0bf75 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -9,8 +9,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Sequence, Union -from monai.utils import optional_import +from functools import reduce +from typing import Dict, List, Optional, Sequence, Union +from monai.utils import ensure_tuple, optional_import pd, _ = optional_import("pandas") @@ -24,4 +25,31 @@ def load_csv_datalist( """Load data list from CSV files. """ - pass + files = ensure_tuple(filename) + # join tables with additional kwargs + dfs = [pd.read_csv(f) for f in files] + df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) + + # parse row indices + rows: List[int] + if row_indices is None: + rows = list(range(df.shape[0])) + else: + for i in row_indices: + if isinstance(i, (tuple, list)): + if len(i) != 2: + raise ValueError("range of row indices must contain 2 values: start and end.") + rows.extend(list(range(i[0], i[1]))) + else: + rows.append(i) + + data = df.loc(rows) if col_names is None else df.loc(rows, col_names) + + # group columns to generate new column + if col_groups is not None: + for name, cols in col_names.items(): + data[name] = df.loc(rows, cols).values + + # convert to a list of dictionaries + length = len(data[data.keys()[0]]) + return [{data[k][i] for k in data.keys()} for i in length] From 58a0fa780a01f2a051482a108cfdc9f684a5d667 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 20:15:28 +0800 Subject: [PATCH 03/26] [DLMED] add unit test Signed-off-by: Nic Ma --- monai/data/csv_datalist.py | 7 ++-- tests/min_tests.py | 1 + tests/test_load_csv_datalist.py | 58 +++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 tests/test_load_csv_datalist.py diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index 994ee0bf75..eb51d624ce 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -9,6 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
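A minimal, standalone sketch of the join introduced in the previous patch: the
`reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs)` call folds any number of
tables into one, forwarding `kwargs` (e.g. `on=...`) to every merge. The column
names below are toy values, not part of the patch::

    from functools import reduce

    import pandas as pd

    df1 = pd.DataFrame({"subject_id": ["s0", "s1"], "label": [5, 0]})
    df2 = pd.DataFrame({"subject_id": ["s0", "s1"], "ehr_0": [2.0, 6.8]})
    # fold the tables pairwise, joining on the shared key column
    joined = reduce(lambda l, r: pd.merge(l, r, on="subject_id"), [df1, df2])
    print(joined.to_dict(orient="records"))
    # [{'subject_id': 's0', 'label': 5, 'ehr_0': 2.0}, {'subject_id': 's1', 'label': 0, 'ehr_0': 6.8}]
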
+from collections import OrderedDict from functools import reduce from typing import Dict, List, Optional, Sequence, Union from monai.utils import ensure_tuple, optional_import @@ -43,7 +44,7 @@ def load_csv_datalist( else: rows.append(i) - data = df.loc(rows) if col_names is None else df.loc(rows, col_names) + data: List[OrderedDict] = OrderedDict(df.loc[rows] if col_names is None else df.loc[rows, col_names]) # group columns to generate new column if col_groups is not None: @@ -51,5 +52,5 @@ def load_csv_datalist( data[name] = df.loc(rows, cols).values # convert to a list of dictionaries - length = len(data[data.keys()[0]]) - return [{data[k][i] for k in data.keys()} for i in length] + length = len(data[list(data.keys())[0]]) + return [OrderedDict({k: data[k][i] for k in data.keys()}) for i in range(length)] diff --git a/tests/min_tests.py b/tests/min_tests.py index 782ceeb576..094501f42e 100644 --- a/tests/min_tests.py +++ b/tests/min_tests.py @@ -125,6 +125,7 @@ def run_testsuit(): "test_invertd", "test_handler_post_processing", "test_write_metrics_reports", + "test_load_csv_datalist", ] assert sorted(exclude_cases) == sorted(set(exclude_cases)), f"Duplicated items in {exclude_cases}" diff --git a/tests/test_load_csv_datalist.py b/tests/test_load_csv_datalist.py new file mode 100644 index 0000000000..8437b5f9e1 --- /dev/null +++ b/tests/test_load_csv_datalist.py @@ -0,0 +1,58 @@ +# Copyright 2020 - 2021 MONAI Consortium +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from collections import OrderedDict +import os +import tempfile +import unittest + +from monai.data import load_csv_datalist + + +class TestLoadCSVDatalist(unittest.TestCase): + def test_values(self): + with tempfile.TemporaryDirectory() as tempdir: + test_data1 = [ + ["subject_id", "label", "image", "ehr_0", "ehr_1", "ehr_2"], + ["s000000", 5, "./imgs/s000000.png", 2.007843256, 2.29019618, 2.054902077], + ["s000001", 0, "./imgs/s000001.png", 6.839215755, 6.474509716, 5.862744808], + ["s000002", 4, "./imgs/s000002.png", 3.772548914, 4.211764812, 4.635294437], + ["s000003", 1, "./imgs/s000003.png", 3.333333254, 3.235294342, 3.400000095], + ["s000004", 9, "./imgs/s000004.png", 6.427451134, 6.254901886, 5.976470947], + ] + test_data2 = [ + + ] + + def prepare_csv_file(data, filepath): + with open(filepath, "a") as f: + for d in data: + f.write((",".join([str(i) for i in d])) + "\n") + + filepath1 = os.path.join(tempdir, "test_data1.csv") + prepare_csv_file(test_data1, filepath1) + + # load single CSV file + result = load_csv_datalist(filepath1) + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, str) else v for k, v in result[2].items()}, + { + "subject_id": "s000002", + "label": 4, + "image": "./imgs/s000002.png", + "ehr_0": 3.7725, + "ehr_1": 4.2118, + "ehr_2": 4.6353, + }, + ) + +if __name__ == "__main__": + unittest.main() From cf817bba5d353474e4cddf94471fdcd3940bc7b0 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 22:13:22 +0800 Subject: [PATCH 04/26] [DLMED] add more unit tests Signed-off-by: Nic Ma --- monai/data/csv_datalist.py | 18 ++++---- tests/test_load_csv_datalist.py | 80 +++++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 11 deletions(-) diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index eb51d624ce..01bb17cd90 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -9,9 +9,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from collections import OrderedDict from functools import reduce from typing import Dict, List, Optional, Sequence, Union + from monai.utils import ensure_tuple, optional_import pd, _ = optional_import("pandas") @@ -32,7 +32,7 @@ def load_csv_datalist( df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) # parse row indices - rows: List[int] + rows: List[int] = [] if row_indices is None: rows = list(range(df.shape[0])) else: @@ -44,13 +44,15 @@ def load_csv_datalist( else: rows.append(i) - data: List[OrderedDict] = OrderedDict(df.loc[rows] if col_names is None else df.loc[rows, col_names]) + # convert to a list of dictionaries corresponding to every row + data = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") # group columns to generate new column if col_groups is not None: - for name, cols in col_names.items(): - data[name] = df.loc(rows, cols).values + groups: Dict[List] = {} + for name, cols in col_groups.items(): + groups[name] = df.loc[rows, cols].values + # invert items of groups to every row of data + data = [dict(d, **{k: v[i] for k, v in groups.items()}) for i, d in enumerate(data)] - # convert to a list of dictionaries - length = len(data[list(data.keys())[0]]) - return [OrderedDict({k: data[k][i] for k in data.keys()}) for i in range(length)] + return data diff --git a/tests/test_load_csv_datalist.py b/tests/test_load_csv_datalist.py index 8437b5f9e1..fc899ef71f 100644 --- a/tests/test_load_csv_datalist.py +++ b/tests/test_load_csv_datalist.py @@ -9,10 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict import os import tempfile import unittest +import numpy as np from monai.data import load_csv_datalist @@ -29,7 +29,20 @@ def test_values(self): ["s000004", 9, "./imgs/s000004.png", 6.427451134, 6.254901886, 5.976470947], ] test_data2 = [ - + ["subject_id", "ehr_3", "ehr_4", "ehr_5", "ehr_6", "ehr_7", "ehr_8"], + ["s000000", 3.019608021, 3.807843208, 3.584313869, 3.141176462, 3.1960783, 4.211764812], + ["s000001", 5.192157269, 5.274509907, 5.250980377, 4.647058964, 4.886274338, 4.392156601], + ["s000002", 5.298039436, 9.545097351, 12.57254887, 6.799999714, 2.1960783, 1.882352948], + ["s000003", 3.164705753, 3.086274624, 3.725490093, 3.698039293, 3.698039055, 3.701960802], + ["s000004", 6.26274538, 7.717647076, 9.584313393, 6.082352638, 2.662744999, 2.34117651], + ] + test_data3 = [ + ["subject_id", "ehr_9", "ehr_10", "meta_0", "meta_1", "meta_2"], + ["s000000", 6.301961422, 6.470588684, "TRUE", "TRUE", "TRUE"], + ["s000001", 5.219608307, 7.827450752, "FALSE", "TRUE", "FALSE"], + ["s000002", 1.882352948, 2.031372547, "TRUE", "FALSE", "TRUE"], + ["s000003", 3.309803963, 3.729412079, "FALSE", "FALSE", "TRUE"], + ["s000004", 2.062745094, 2.34117651, "FALSE", "TRUE", "TRUE"], ] def prepare_csv_file(data, filepath): @@ -38,9 +51,13 @@ def prepare_csv_file(data, filepath): f.write((",".join([str(i) for i in d])) + "\n") filepath1 = os.path.join(tempdir, "test_data1.csv") + filepath2 = os.path.join(tempdir, "test_data2.csv") + filepath3 = os.path.join(tempdir, "test_data3.csv") prepare_csv_file(test_data1, filepath1) + prepare_csv_file(test_data2, filepath2) + prepare_csv_file(test_data3, filepath3) - # load single CSV file + # test loading single CSV file result = load_csv_datalist(filepath1) self.assertDictEqual( {k: round(v, 4) if not isinstance(v, str) else v for k, v in result[2].items()}, @@ -54,5 +71,62 @@ def prepare_csv_file(data, 
filepath): }, ) + # test loading multiple CSV files, join tables with kwargs + result = result = load_csv_datalist([filepath1, filepath2, filepath3], on="subject_id") + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in result[3].items()}, + { + "subject_id": "s000003", + "label": 1, + "image": "./imgs/s000003.png", + "ehr_0": 3.3333, + "ehr_1": 3.2353, + "ehr_2": 3.4000, + "ehr_3": 3.1647, + "ehr_4": 3.0863, + "ehr_5": 3.7255, + "ehr_6": 3.6980, + "ehr_7": 3.6980, + "ehr_8": 3.7020, + "ehr_9": 3.3098, + "ehr_10": 3.7294, + "meta_0": False, + "meta_1": False, + "meta_2": True, + }, + ) + + # test loading selected rows and columns + result = result = load_csv_datalist( + filename=[filepath1, filepath2, filepath3], + row_indices=[[0, 2], 3], # load row: 0, 1, 3 + col_names=["subject_id", "image", "ehr_1", "ehr_7", "meta_1"], + ) + self.assertEqual(len(result), 3) + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in result[-1].items()}, + { + "subject_id": "s000003", + "image": "./imgs/s000003.png", + "ehr_1": 3.2353, + "ehr_7": 3.6980, + "meta_1": False, + }, + ) + + # test group columns + result = result = load_csv_datalist( + filename=[filepath1, filepath2, filepath3], + row_indices=[1, 3], # load row: 1, 3 + col_names=["subject_id", "image", *[f"ehr_{i}" for i in range(11)], "meta_0", "meta_1", "meta_2"], + col_groups={"ehr": [f"ehr_{i}" for i in range(11)], "meta12": ["meta_1", "meta_2"]}, + ) + np.testing.assert_allclose( + [round(i, 4) for i in result[-1]["ehr"]], + [3.3333, 3.2353, 3.4000, 3.1647, 3.0863, 3.7255, 3.6980, 3.6980, 3.7020, 3.3098, 3.7294] + ) + np.testing.assert_allclose(result[-1]["meta12"], [False, True]) + + if __name__ == "__main__": unittest.main() From 9ca6d075da44f2000e5bdba0838ec887411c6f2a Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 22:17:39 +0800 Subject: [PATCH 05/26] [DLMED] add optional install Signed-off-by: Nic Ma --- docs/source/installation.md | 4 ++-- setup.cfg | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/source/installation.md b/docs/source/installation.md index ceb2774c52..efa5cf08a8 100644 --- a/docs/source/installation.md +++ b/docs/source/installation.md @@ -174,9 +174,9 @@ Since MONAI v0.2.0, the extras syntax such as `pip install 'monai[nibabel]'` is - The options are ``` -[nibabel, skimage, pillow, tensorboard, gdown, ignite, torchvision, itk, tqdm, lmdb, psutil] +[nibabel, skimage, pillow, tensorboard, gdown, ignite, torchvision, itk, tqdm, lmdb, psutil, cucim, openslide, pandas] ``` which correspond to `nibabel`, `scikit-image`, `pillow`, `tensorboard`, -`gdown`, `pytorch-ignite`, `torchvision`, `itk`, `tqdm`, `lmdb` and `psutil`, respectively. +`gdown`, `pytorch-ignite`, `torchvision`, `itk`, `tqdm`, `lmdb`, `psutil`, `cucim` `openslide-python` and `pandas`, respectively. - `pip install 'monai[all]'` installs all the optional dependencies. 
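`pandas` stays an optional dependency: the code paths in this series acquire it
through `monai.utils.optional_import`, which defers any import failure until
the module is actually used. A minimal sketch of that behaviour::

    from monai.utils import optional_import

    # returns the module plus a flag telling whether the import succeeded
    pd, has_pandas = optional_import("pandas")

    if has_pandas:
        print(pd.__version__)
    else:
        print("pandas is not installed; the CSV datalist features are unavailable")
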
diff --git a/setup.cfg b/setup.cfg index c184fe23e7..f19d6a8213 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,6 +42,7 @@ all = psutil cucim~=0.19.0 openslide-python==1.1.2 + pandas nibabel = nibabel skimage = @@ -68,6 +69,8 @@ cucim = cucim~=0.19.0 openslide = openslide-python==1.1.2 +pandas = + pandas [flake8] select = B,C,E,F,N,P,T4,W,B9 From 6adc759c8c0b20b1c0fb9a7310348fbcf7f75429 Mon Sep 17 00:00:00 2001 From: monai-bot Date: Fri, 11 Jun 2021 14:26:51 +0000 Subject: [PATCH 06/26] [MONAI] python code formatting Signed-off-by: monai-bot --- monai/data/csv_datalist.py | 5 ++--- tests/test_load_csv_datalist.py | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index 01bb17cd90..4447d6352a 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -13,6 +13,7 @@ from typing import Dict, List, Optional, Sequence, Union from monai.utils import ensure_tuple, optional_import + pd, _ = optional_import("pandas") @@ -23,9 +24,7 @@ def load_csv_datalist( col_groups: Optional[Dict[str, Sequence[str]]] = None, **kwargs, ) -> List[Dict]: - """Load data list from CSV files. - - """ + """Load data list from CSV files.""" files = ensure_tuple(filename) # join tables with additional kwargs dfs = [pd.read_csv(f) for f in files] diff --git a/tests/test_load_csv_datalist.py b/tests/test_load_csv_datalist.py index fc899ef71f..e2da83e10d 100644 --- a/tests/test_load_csv_datalist.py +++ b/tests/test_load_csv_datalist.py @@ -12,6 +12,7 @@ import os import tempfile import unittest + import numpy as np from monai.data import load_csv_datalist @@ -123,7 +124,7 @@ def prepare_csv_file(data, filepath): ) np.testing.assert_allclose( [round(i, 4) for i in result[-1]["ehr"]], - [3.3333, 3.2353, 3.4000, 3.1647, 3.0863, 3.7255, 3.6980, 3.6980, 3.7020, 3.3098, 3.7294] + [3.3333, 3.2353, 3.4000, 3.1647, 3.0863, 3.7255, 3.6980, 3.6980, 3.7020, 3.3098, 3.7294], ) np.testing.assert_allclose(result[-1]["meta12"], [False, True]) From 02f14e89da8c2432122a4b6e815e1994e04ac332 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 23:21:13 +0800 Subject: [PATCH 07/26] [DLMED] fix flake8 issue Signed-off-by: Nic Ma --- monai/data/csv_datalist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index 4447d6352a..de333d4a00 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -31,7 +31,7 @@ def load_csv_datalist( df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) # parse row indices - rows: List[int] = [] + rows: List[Union[int, str]] = [] if row_indices is None: rows = list(range(df.shape[0])) else: @@ -44,11 +44,11 @@ def load_csv_datalist( rows.append(i) # convert to a list of dictionaries corresponding to every row - data = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") + data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") # group columns to generate new column if col_groups is not None: - groups: Dict[List] = {} + groups: Dict[str, List] = {} for name, cols in col_groups.items(): groups[name] = df.loc[rows, cols].values # invert items of groups to every row of data From 6aa97b9b2ee844a1abef4991a47de6d5ac717fcc Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 11 Jun 2021 23:49:02 +0800 Subject: [PATCH 08/26] [DLMED] add doc-strings Signed-off-by: Nic Ma --- monai/data/csv_datalist.py | 30 +++++++++++++++++++++++++++++- 1 file 
changed, 29 insertions(+), 1 deletion(-) diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py index de333d4a00..8e2531f707 100644 --- a/monai/data/csv_datalist.py +++ b/monai/data/csv_datalist.py @@ -24,7 +24,35 @@ def load_csv_datalist( col_groups: Optional[Dict[str, Sequence[str]]] = None, **kwargs, ) -> List[Dict]: - """Load data list from CSV files.""" + """ + Utility to load data from CSV files and return a list of dictionaries, + every dictionay maps to a row of the CSV file, and the keys of dictionary + map to the column names of the CSV file. + + It can load multiple CSV files and join the tables with addtional `kwargs`. + To support very big CSV files, it can load specific rows and columns. And it + can also group several loaded columns to generate a new column, for example, + set `col_groups={"meta": ["meta_0", "meta_1", "meta_2"]}`, output can be:: + + [ + {"image": "./image0.nii", "meta_0": 11, "meta_1": 12, "meta_2": 13, "meta": [11, 12, 13]}, + {"image": "./image1.nii", "meta_0": 21, "meta_1": 22, "meta_2": 23, "meta": [21, 22, 23]}, + ] + + Args: + filename: the filename of expected CSV file to load. if providing a list + of filenames, it will load all the files and join tables. + row_indices: indices of the expected rows to load. it should be a list, + every item can be a int number or a range `[start, end)` for the indices. + for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None, + load all the rows. + col_names: names of the expected columns to load. if None, load all the columns. + col_groups: args to group the loaded columns to generate a new column, + it should be a dictionary, every item maps to a group, the `key` will + be the new column name, the `value` is the names of columns to combine. + kwargs: additional arguments for `pandas.merge()` API to join tables. 
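Putting the arguments documented above together, a usage sketch; the file names
are hypothetical, and the two tables are assumed to share a `subject_id` column::

    from monai.data import load_csv_datalist

    datalist = load_csv_datalist(
        filename=["demographics.csv", "labs.csv"],
        row_indices=[[0, 100], 200],             # rows 0..99 plus row 200
        col_names=["subject_id", "image", "label", "ehr_0", "ehr_1"],
        col_groups={"ehr": ["ehr_0", "ehr_1"]},  # adds a combined "ehr" column
        on="subject_id",                         # forwarded to pandas.merge()
    )
    print(datalist[0]["image"], datalist[0]["ehr"])
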
+ + """ files = ensure_tuple(filename) # join tables with additional kwargs dfs = [pd.read_csv(f) for f in files] From fcef348c5ab25dcdcf24ebb655f20050b2b19970 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Sun, 20 Jun 2021 23:01:42 +0800 Subject: [PATCH 09/26] [DLMED] fix typo Signed-off-by: Nic Ma --- tests/test_load_csv_datalist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_load_csv_datalist.py b/tests/test_load_csv_datalist.py index e2da83e10d..2a0bab2939 100644 --- a/tests/test_load_csv_datalist.py +++ b/tests/test_load_csv_datalist.py @@ -73,7 +73,7 @@ def prepare_csv_file(data, filepath): ) # test loading multiple CSV files, join tables with kwargs - result = result = load_csv_datalist([filepath1, filepath2, filepath3], on="subject_id") + result = load_csv_datalist([filepath1, filepath2, filepath3], on="subject_id") self.assertDictEqual( {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in result[3].items()}, { @@ -98,7 +98,7 @@ def prepare_csv_file(data, filepath): ) # test loading selected rows and columns - result = result = load_csv_datalist( + result = load_csv_datalist( filename=[filepath1, filepath2, filepath3], row_indices=[[0, 2], 3], # load row: 0, 1, 3 col_names=["subject_id", "image", "ehr_1", "ehr_7", "meta_1"], @@ -116,7 +116,7 @@ def prepare_csv_file(data, filepath): ) # test group columns - result = result = load_csv_datalist( + result = load_csv_datalist( filename=[filepath1, filepath2, filepath3], row_indices=[1, 3], # load row: 1, 3 col_names=["subject_id", "image", *[f"ehr_{i}" for i in range(11)], "meta_0", "meta_1", "meta_2"], From 3408ffbbc01f6ff6c22fdd2d28f309979d213376 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 12:03:49 +0800 Subject: [PATCH 10/26] [DLMED] add CSVDataset for non-iterable data Signed-off-by: Nic Ma --- docs/source/data.rst | 11 +-- monai/data/__init__.py | 2 +- monai/data/csv_datalist.py | 85 ------------------- monai/data/dataset.py | 78 ++++++++++++++++- ...ad_csv_datalist.py => test_csv_dataset.py} | 49 +++++++---- 5 files changed, 118 insertions(+), 107 deletions(-) delete mode 100644 monai/data/csv_datalist.py rename tests/{test_load_csv_datalist.py => test_csv_dataset.py} (77%) diff --git a/docs/source/data.rst b/docs/source/data.rst index f7a1f92ac0..212b424972 100644 --- a/docs/source/data.rst +++ b/docs/source/data.rst @@ -75,6 +75,12 @@ Generic Interfaces :members: :special-members: __getitem__ +`CSVDataset` +~~~~~~~~~~~~ +.. autoclass:: CSVDataset + :members: + :special-members: __getitem__ + Patch-based dataset ------------------- @@ -188,8 +194,3 @@ ThreadBuffer TestTimeAugmentation ~~~~~~~~~~~~~~~~~~~~ .. autoclass:: monai.data.TestTimeAugmentation - - -CSV Datalist -~~~~~~~~~~~~ -.. autofunction:: monai.data.load_csv_datalist diff --git a/monai/data/__init__.py b/monai/data/__init__.py index 785e8c2ced..80cb9bb189 100644 --- a/monai/data/__init__.py +++ b/monai/data/__init__.py @@ -9,13 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from .csv_datalist import load_csv_datalist from .csv_saver import CSVSaver from .dataloader import DataLoader from .dataset import ( ArrayDataset, CacheDataset, CacheNTransDataset, + CSVDataset, Dataset, LMDBDataset, NPZDictItemDataset, diff --git a/monai/data/csv_datalist.py b/monai/data/csv_datalist.py deleted file mode 100644 index 8e2531f707..0000000000 --- a/monai/data/csv_datalist.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright 2020 - 2021 MONAI Consortium -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from functools import reduce -from typing import Dict, List, Optional, Sequence, Union - -from monai.utils import ensure_tuple, optional_import - -pd, _ = optional_import("pandas") - - -def load_csv_datalist( - filename: Union[str, Sequence[str]], - row_indices: Optional[Sequence[Union[int, str]]] = None, - col_names: Optional[Sequence[str]] = None, - col_groups: Optional[Dict[str, Sequence[str]]] = None, - **kwargs, -) -> List[Dict]: - """ - Utility to load data from CSV files and return a list of dictionaries, - every dictionay maps to a row of the CSV file, and the keys of dictionary - map to the column names of the CSV file. - - It can load multiple CSV files and join the tables with addtional `kwargs`. - To support very big CSV files, it can load specific rows and columns. And it - can also group several loaded columns to generate a new column, for example, - set `col_groups={"meta": ["meta_0", "meta_1", "meta_2"]}`, output can be:: - - [ - {"image": "./image0.nii", "meta_0": 11, "meta_1": 12, "meta_2": 13, "meta": [11, 12, 13]}, - {"image": "./image1.nii", "meta_0": 21, "meta_1": 22, "meta_2": 23, "meta": [21, 22, 23]}, - ] - - Args: - filename: the filename of expected CSV file to load. if providing a list - of filenames, it will load all the files and join tables. - row_indices: indices of the expected rows to load. it should be a list, - every item can be a int number or a range `[start, end)` for the indices. - for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None, - load all the rows. - col_names: names of the expected columns to load. if None, load all the columns. - col_groups: args to group the loaded columns to generate a new column, - it should be a dictionary, every item maps to a group, the `key` will - be the new column name, the `value` is the names of columns to combine. - kwargs: additional arguments for `pandas.merge()` API to join tables. 
- - """ - files = ensure_tuple(filename) - # join tables with additional kwargs - dfs = [pd.read_csv(f) for f in files] - df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) - - # parse row indices - rows: List[Union[int, str]] = [] - if row_indices is None: - rows = list(range(df.shape[0])) - else: - for i in row_indices: - if isinstance(i, (tuple, list)): - if len(i) != 2: - raise ValueError("range of row indices must contain 2 values: start and end.") - rows.extend(list(range(i[0], i[1]))) - else: - rows.append(i) - - # convert to a list of dictionaries corresponding to every row - data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") - - # group columns to generate new column - if col_groups is not None: - groups: Dict[str, List] = {} - for name, cols in col_groups.items(): - groups[name] = df.loc[rows, cols].values - # invert items of groups to every row of data - data = [dict(d, **{k: v[i] for k, v in groups.items()}) for i, d in enumerate(data)] - - return data diff --git a/monai/data/dataset.py b/monai/data/dataset.py index af70d3fe02..4f761951e2 100644 --- a/monai/data/dataset.py +++ b/monai/data/dataset.py @@ -20,6 +20,7 @@ import time import warnings from copy import deepcopy +from functools import reduce from multiprocessing.pool import ThreadPool from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union @@ -31,7 +32,7 @@ from monai.data.utils import first, pickle_hashing from monai.transforms import Compose, Randomizable, ThreadUnsafe, Transform, apply_transform -from monai.utils import MAX_SEED, get_seed, min_version, optional_import +from monai.utils import MAX_SEED, ensure_tuple, get_seed, min_version, optional_import if TYPE_CHECKING: from tqdm import tqdm @@ -41,6 +42,7 @@ tqdm, has_tqdm = optional_import("tqdm", "4.47.0", min_version, "tqdm") lmdb, _ = optional_import("lmdb") +pd, _ = optional_import("pandas") class Dataset(_TorchDataset): @@ -1061,3 +1063,77 @@ def _transform(self, index: int): data = apply_transform(self.transform, data) return data + + +class CSVDataset(Dataset): + """ + Dataset to load data from CSV files and generate a list of dictionaries, + every dictionay maps to a row of the CSV file, and the keys of dictionary + map to the column names of the CSV file. + + It can load multiple CSV files and join the tables with addtional `kwargs` arg. + Support to only load specific rows and columns. + And it can also group several loaded columns to generate a new column, for example, + set `col_groups={"meta": ["meta_0", "meta_1", "meta_2"]}`, output can be:: + + [ + {"image": "./image0.nii", "meta_0": 11, "meta_1": 12, "meta_2": 13, "meta": [11, 12, 13]}, + {"image": "./image1.nii", "meta_0": 21, "meta_1": 22, "meta_2": 23, "meta": [21, 22, 23]}, + ] + + Args: + filename: the filename of expected CSV file to load. if providing a list + of filenames, it will load all the files and join tables. + row_indices: indices of the expected rows to load. it should be a list, + every item can be a int number or a range `[start, end)` for the indices. + for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None, + load all the rows in the file. + col_names: names of the expected columns to load. if None, load all the columns. + col_groups: args to group the loaded columns to generate a new column, + it should be a dictionary, every item maps to a group, the `key` will + be the new column name, the `value` is the names of columns to combine. 
for example: + `col_groups={"ehr": [f"ehr_{i}" for i in range(10)], "meta": ["meta_1", "meta_2"]}` + transform: transform to apply on the loaded items of a dictionary data. + kwargs: additional arguments for `pandas.merge()` API to join tables. + + """ + + def __init__( + self, + filename: Union[str, Sequence[str]], + row_indices: Optional[Sequence[Union[int, str]]] = None, + col_names: Optional[Sequence[str]] = None, + col_groups: Optional[Dict[str, Sequence[str]]] = None, + transform: Optional[Callable] = None, + **kwargs, + ): + files = ensure_tuple(filename) + # join tables with additional kwargs + dfs = [pd.read_csv(f) for f in files] + df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) + + # parse row indices + rows: List[Union[int, str]] = [] + if row_indices is None: + rows = list(range(df.shape[0])) + else: + for i in row_indices: + if isinstance(i, (tuple, list)): + if len(i) != 2: + raise ValueError("range of row indices must contain 2 values: start and end.") + rows.extend(list(range(i[0], i[1]))) + else: + rows.append(i) + + # convert to a list of dictionaries corresponding to every row + data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") + + # group columns to generate new column + if col_groups is not None: + groups: Dict[str, List] = {} + for name, cols in col_groups.items(): + groups[name] = df.loc[rows, cols].values + # invert items of groups to every row of data + data = [dict(d, **{k: v[i] for k, v in groups.items()}) for i, d in enumerate(data)] + + super().__init__(data=data, transform=transform) diff --git a/tests/test_load_csv_datalist.py b/tests/test_csv_dataset.py similarity index 77% rename from tests/test_load_csv_datalist.py rename to tests/test_csv_dataset.py index 2a0bab2939..f223a9b1a9 100644 --- a/tests/test_load_csv_datalist.py +++ b/tests/test_csv_dataset.py @@ -15,10 +15,11 @@ import numpy as np -from monai.data import load_csv_datalist +from monai.data import CSVDataset +from monai.transforms import ToNumpyd -class TestLoadCSVDatalist(unittest.TestCase): +class TestCSVDataset(unittest.TestCase): def test_values(self): with tempfile.TemporaryDirectory() as tempdir: test_data1 = [ @@ -58,10 +59,10 @@ def prepare_csv_file(data, filepath): prepare_csv_file(test_data2, filepath2) prepare_csv_file(test_data3, filepath3) - # test loading single CSV file - result = load_csv_datalist(filepath1) + # test single CSV file + dataset = CSVDataset(filepath1) self.assertDictEqual( - {k: round(v, 4) if not isinstance(v, str) else v for k, v in result[2].items()}, + {k: round(v, 4) if not isinstance(v, str) else v for k, v in dataset[2].items()}, { "subject_id": "s000002", "label": 4, @@ -72,10 +73,10 @@ def prepare_csv_file(data, filepath): }, ) - # test loading multiple CSV files, join tables with kwargs - result = load_csv_datalist([filepath1, filepath2, filepath3], on="subject_id") + # test multiple CSV files, join tables with kwargs + dataset = CSVDataset([filepath1, filepath2, filepath3], on="subject_id") self.assertDictEqual( - {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in result[3].items()}, + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in dataset[3].items()}, { "subject_id": "s000003", "label": 1, @@ -97,15 +98,15 @@ def prepare_csv_file(data, filepath): }, ) - # test loading selected rows and columns - result = load_csv_datalist( + # test selected rows and columns + dataset = CSVDataset( filename=[filepath1, filepath2, filepath3], 
row_indices=[[0, 2], 3], # load row: 0, 1, 3 col_names=["subject_id", "image", "ehr_1", "ehr_7", "meta_1"], ) - self.assertEqual(len(result), 3) + self.assertEqual(len(dataset), 3) self.assertDictEqual( - {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in result[-1].items()}, + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in dataset[-1].items()}, { "subject_id": "s000003", "image": "./imgs/s000003.png", @@ -116,17 +117,35 @@ def prepare_csv_file(data, filepath): ) # test group columns - result = load_csv_datalist( + dataset = CSVDataset( filename=[filepath1, filepath2, filepath3], row_indices=[1, 3], # load row: 1, 3 col_names=["subject_id", "image", *[f"ehr_{i}" for i in range(11)], "meta_0", "meta_1", "meta_2"], col_groups={"ehr": [f"ehr_{i}" for i in range(11)], "meta12": ["meta_1", "meta_2"]}, ) np.testing.assert_allclose( - [round(i, 4) for i in result[-1]["ehr"]], + [round(i, 4) for i in dataset[-1]["ehr"]], [3.3333, 3.2353, 3.4000, 3.1647, 3.0863, 3.7255, 3.6980, 3.6980, 3.7020, 3.3098, 3.7294], ) - np.testing.assert_allclose(result[-1]["meta12"], [False, True]) + np.testing.assert_allclose(dataset[-1]["meta12"], [False, True]) + + # test transform + dataset = CSVDataset( + filename=[filepath1, filepath2, filepath3], + col_groups={"ehr": [f"ehr_{i}" for i in range(5)]}, + transform=ToNumpyd(keys="ehr"), + ) + self.assertEqual(len(dataset), 5) + expected = [ + [2.0078, 2.2902, 2.0549, 3.0196, 3.8078], + [6.8392, 6.4745, 5.8627, 5.1922, 5.2745], + [3.7725, 4.2118, 4.6353, 5.2980, 9.5451], + [3.3333, 3.2353, 3.4000, 3.1647, 3.0863], + [6.4275, 6.2549, 5.9765, 6.2627, 7.7176] + ] + for item, exp in zip(dataset, expected): + self.assertTrue(isinstance(item["ehr"], np.ndarray)) + np.testing.assert_allclose(np.around(item["ehr"], 4), exp) if __name__ == "__main__": From 4ccd36b5dd404cc367380f7ad5fd5545eea18791 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 13:30:26 +0800 Subject: [PATCH 11/26] [DLMED] fix min test Signed-off-by: Nic Ma --- tests/min_tests.py | 2 +- tests/test_csv_dataset.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/min_tests.py b/tests/min_tests.py index 094501f42e..320e92b728 100644 --- a/tests/min_tests.py +++ b/tests/min_tests.py @@ -125,7 +125,7 @@ def run_testsuit(): "test_invertd", "test_handler_post_processing", "test_write_metrics_reports", - "test_load_csv_datalist", + "test_csv_dataset", ] assert sorted(exclude_cases) == sorted(set(exclude_cases)), f"Duplicated items in {exclude_cases}" diff --git a/tests/test_csv_dataset.py b/tests/test_csv_dataset.py index f223a9b1a9..f5020db0d9 100644 --- a/tests/test_csv_dataset.py +++ b/tests/test_csv_dataset.py @@ -141,7 +141,7 @@ def prepare_csv_file(data, filepath): [6.8392, 6.4745, 5.8627, 5.1922, 5.2745], [3.7725, 4.2118, 4.6353, 5.2980, 9.5451], [3.3333, 3.2353, 3.4000, 3.1647, 3.0863], - [6.4275, 6.2549, 5.9765, 6.2627, 7.7176] + [6.4275, 6.2549, 5.9765, 6.2627, 7.7176], ] for item, exp in zip(dataset, expected): self.assertTrue(isinstance(item["ehr"], np.ndarray)) From 1aebfb71f4a629cd7d83215a9ff50c033f2cd339 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 15:40:34 +0800 Subject: [PATCH 12/26] [DLMED] add CSVIterableDataset base Signed-off-by: Nic Ma --- docs/source/data.rst | 6 ++++++ monai/data/__init__.py | 2 +- monai/data/iterable_dataset.py | 13 +++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/docs/source/data.rst b/docs/source/data.rst index 212b424972..a5c3509fc9 100644 --- 
a/docs/source/data.rst +++ b/docs/source/data.rst @@ -21,6 +21,12 @@ Generic Interfaces :members: :special-members: __next__ +`CSVIterableDataset` +~~~~~~~~~~~~~~~~~~~~ +.. autoclass:: CSVIterableDataset + :members: + :special-members: __next__ + `PersistentDataset` ~~~~~~~~~~~~~~~~~~~ .. autoclass:: PersistentDataset diff --git a/monai/data/__init__.py b/monai/data/__init__.py index 80cb9bb189..df37ccab7d 100644 --- a/monai/data/__init__.py +++ b/monai/data/__init__.py @@ -27,7 +27,7 @@ from .grid_dataset import GridPatchDataset, PatchDataset, PatchIter from .image_dataset import ImageDataset from .image_reader import ImageReader, ITKReader, NibabelReader, NumpyReader, PILReader, WSIReader -from .iterable_dataset import IterableDataset +from .iterable_dataset import CSVIterableDataset, IterableDataset from .nifti_saver import NiftiSaver from .nifti_writer import write_nifti from .png_saver import PNGSaver diff --git a/monai/data/iterable_dataset.py b/monai/data/iterable_dataset.py index 7f0a0986dd..32ccfb1761 100644 --- a/monai/data/iterable_dataset.py +++ b/monai/data/iterable_dataset.py @@ -43,3 +43,16 @@ def __iter__(self): if self.transform is not None: data = apply_transform(self.transform, data) yield data + + +class CSVIterableDataset(IterableDataset): + """ + Iterable dataset to load CSV files and generate dictionary data. + It can be helpful when loading extemely big CSV files that can't read into memory directly. + + """ + def __init__(self, data: Iterable, transform: Optional[Callable]) -> None: + super().__init__(data, transform=transform) + + def __iter__(self): + return super().__iter__() From bca5afa8b972b49cd0e5d3b2d264e71090f38dcd Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 17:53:22 +0800 Subject: [PATCH 13/26] [DLMED] add CSVIterableDataset Signed-off-by: Nic Ma --- monai/data/__init__.py | 1 + monai/data/dataset.py | 37 ++----- monai/data/iterable_dataset.py | 64 ++++++++++- monai/data/utils.py | 63 ++++++++++- tests/test_csv_iterable_dataset.py | 163 +++++++++++++++++++++++++++++ 5 files changed, 293 insertions(+), 35 deletions(-) create mode 100644 tests/test_csv_iterable_dataset.py diff --git a/monai/data/__init__.py b/monai/data/__init__.py index df37ccab7d..a82f80213a 100644 --- a/monai/data/__init__.py +++ b/monai/data/__init__.py @@ -39,6 +39,7 @@ from .utils import ( compute_importance_map, compute_shape_offset, + convert_tables_to_dicts, correct_nifti_header_if_necessary, create_file_basename, decollate_batch, diff --git a/monai/data/dataset.py b/monai/data/dataset.py index 4f761951e2..fdf014f77f 100644 --- a/monai/data/dataset.py +++ b/monai/data/dataset.py @@ -20,7 +20,6 @@ import time import warnings from copy import deepcopy -from functools import reduce from multiprocessing.pool import ThreadPool from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union @@ -30,7 +29,7 @@ from torch.utils.data import Dataset as _TorchDataset from torch.utils.data import Subset -from monai.data.utils import first, pickle_hashing +from monai.data.utils import first, pickle_hashing, convert_tables_to_dicts from monai.transforms import Compose, Randomizable, ThreadUnsafe, Transform, apply_transform from monai.utils import MAX_SEED, ensure_tuple, get_seed, min_version, optional_import @@ -1108,32 +1107,12 @@ def __init__( **kwargs, ): files = ensure_tuple(filename) - # join tables with additional kwargs dfs = [pd.read_csv(f) for f in files] - df = reduce(lambda l, r: pd.merge(l, r, **kwargs), dfs) - - # 
parse row indices - rows: List[Union[int, str]] = [] - if row_indices is None: - rows = list(range(df.shape[0])) - else: - for i in row_indices: - if isinstance(i, (tuple, list)): - if len(i) != 2: - raise ValueError("range of row indices must contain 2 values: start and end.") - rows.extend(list(range(i[0], i[1]))) - else: - rows.append(i) - - # convert to a list of dictionaries corresponding to every row - data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") - - # group columns to generate new column - if col_groups is not None: - groups: Dict[str, List] = {} - for name, cols in col_groups.items(): - groups[name] = df.loc[rows, cols].values - # invert items of groups to every row of data - data = [dict(d, **{k: v[i] for k, v in groups.items()}) for i, d in enumerate(data)] - + data = convert_tables_to_dicts( + dfs=dfs, + row_indices=row_indices, + col_names=col_names, + col_groups=col_groups, + **kwargs, + ) super().__init__(data=data, transform=transform) diff --git a/monai/data/iterable_dataset.py b/monai/data/iterable_dataset.py index 32ccfb1761..b9311ad2cb 100644 --- a/monai/data/iterable_dataset.py +++ b/monai/data/iterable_dataset.py @@ -9,11 +9,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Iterable, Optional - +from typing import Callable, Iterable, Optional, Sequence, Union, Dict from torch.utils.data import IterableDataset as _TorchIterableDataset +from monai.data.utils import convert_tables_to_dicts from monai.transforms import apply_transform +from monai.utils import ensure_tuple, optional_import + +pd, _ = optional_import("pandas") class IterableDataset(_TorchIterableDataset): @@ -50,9 +53,60 @@ class CSVIterableDataset(IterableDataset): Iterable dataset to load CSV files and generate dictionary data. It can be helpful when loading extemely big CSV files that can't read into memory directly. + It can load data from multiple CSV files and join the tables with addtional `kwargs` arg. + Support to only load specific columns. + And it can also group several loaded columns to generate a new column, for example, + set `col_groups={"meta": ["meta_0", "meta_1", "meta_2"]}`, output can be:: + + [ + {"image": "./image0.nii", "meta_0": 11, "meta_1": 12, "meta_2": 13, "meta": [11, 12, 13]}, + {"image": "./image1.nii", "meta_0": 21, "meta_1": 22, "meta_2": 23, "meta": [21, 22, 23]}, + ] + + Args: + filename: the filename of expected CSV file to load. if providing a list + of filenames, it will load all the files and join tables. + chunksize: rows of a chunk when loading iterable data from CSV files, default to 1000. more details: + https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html. + col_names: names of the expected columns to load. if None, load all the columns. + col_groups: args to group the loaded columns to generate a new column, + it should be a dictionary, every item maps to a group, the `key` will + be the new column name, the `value` is the names of columns to combine. for example: + `col_groups={"ehr": [f"ehr_{i}" for i in range(10)], "meta": ["meta_1", "meta_2"]}` + transform: transform to apply on the loaded items of a dictionary data. + kwargs: additional arguments for `pandas.merge()` API to join tables. 
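A minimal sketch of how this class is meant to be driven; `big_table.csv` is a
hypothetical file that would be too large to read into memory at once::

    from monai.data import CSVIterableDataset, DataLoader

    dataset = CSVIterableDataset(filename="big_table.csv", chunksize=1000)
    # rows are read lazily in chunks of 1000; every yielded item is one row as a dict
    for batch in DataLoader(dataset, batch_size=16):
        print(batch.keys())
        break
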
+ """ - def __init__(self, data: Iterable, transform: Optional[Callable]) -> None: - super().__init__(data, transform=transform) + def __init__( + self, + filename: Union[str, Sequence[str]], + chunksize: int = 1000, + col_names: Optional[Sequence[str]] = None, + col_groups: Optional[Dict[str, Sequence[str]]] = None, + transform: Optional[Callable] = None, + **kwargs, + ): + self.files = ensure_tuple(filename) + self.chunksize = chunksize + self.iters = self.reset() + self.col_names = col_names + self.col_groups = col_groups + self.kwargs = kwargs + super().__init__(data=None, transform=transform) + + def reset(self, filename: Optional[Union[str, Sequence[str]]] = None): + if filename is not None: + # update files if necessary + self.files = ensure_tuple(filename) + self.iters = [pd.read_csv(f, chunksize=self.chunksize) for f in self.files] + return self.iters def __iter__(self): - return super().__iter__() + for chunks in zip(*self.iters): + self.data = convert_tables_to_dicts( + dfs=chunks, + col_names=self.col_names, + col_groups=self.col_groups, + **self.kwargs, + ) + return super().__iter__() diff --git a/monai/data/utils.py b/monai/data/utils.py index d9bfafde08..24a2a79dd7 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -16,9 +16,10 @@ import pickle import warnings from collections import defaultdict +from functools import reduce from itertools import product, starmap from pathlib import PurePath -from typing import Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union import numpy as np import torch @@ -37,8 +38,14 @@ ) from monai.utils.enums import Method +if TYPE_CHECKING: + import pandas as pd +else: + pd, _ = optional_import("pandas") + nib, _ = optional_import("nibabel") + __all__ = [ "get_random_patch", "iter_patch_slices", @@ -65,6 +72,7 @@ "decollate_batch", "pad_list_data_collate", "no_collation", + "convert_tables_to_dicts", ] @@ -983,3 +991,56 @@ def sorted_dict(item, key=None, reverse=False): if not isinstance(item, dict): return item return {k: sorted_dict(v) if isinstance(v, dict) else v for k, v in sorted(item.items(), key=key, reverse=reverse)} + + +def convert_tables_to_dicts( + dfs: Union[Sequence[pd.DataFrame], pd.DataFrame], + row_indices: Optional[Sequence[Union[int, str]]] = None, + col_names: Optional[Sequence[str]] = None, + col_groups: Optional[Dict[str, Sequence[str]]] = None, + **kwargs, +): + """ + Utility to join pandas tables, select rows, columns and generate groups. + Will return a list of dictionaries, every dictionary maps to a row of data in tables. + + Args: + dfs: data table in pandas Dataframe format. if providing a list of tables, will join them. + row_indices: indices of the expected rows to load. it should be a list, + every item can be a int number or a range `[start, end)` for the indices. + for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None, + load all the rows in the file. + col_names: names of the expected columns to load. if None, load all the columns. + col_groups: args to group the loaded columns to generate a new column, + it should be a dictionary, every item maps to a group, the `key` will + be the new column name, the `value` is the names of columns to combine. for example: + `col_groups={"ehr": [f"ehr_{i}" for i in range(10)], "meta": ["meta_1", "meta_2"]}` + kwargs: additional arguments for `pandas.merge()` API to join tables. 
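A standalone sketch of the behaviour documented above, using a small in-memory
table (toy values, not part of the patch)::

    import pandas as pd

    from monai.data import convert_tables_to_dicts

    df = pd.DataFrame({"image": ["a.nii", "b.nii"], "meta_0": [11, 21], "meta_1": [12, 22]})
    items = convert_tables_to_dicts(dfs=[df], col_groups={"meta": ["meta_0", "meta_1"]})
    print(items[0])
    # {'image': 'a.nii', 'meta_0': 11, 'meta_1': 12, 'meta': array([11, 12])}
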
+ + """ + df = reduce(lambda l, r: pd.merge(l, r, **kwargs), ensure_tuple(dfs)) + # parse row indices + rows: List[Union[int, str]] = [] + if row_indices is None: + rows = slice(df.shape[0]) + else: + for i in row_indices: + if isinstance(i, (tuple, list)): + if len(i) != 2: + raise ValueError("range of row indices must contain 2 values: start and end.") + rows.extend(list(range(i[0], i[1]))) + else: + rows.append(i) + + # convert to a list of dictionaries corresponding to every row + data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") + + # group columns to generate new column + if col_groups is not None: + groups: Dict[str, List] = {} + for name, cols in col_groups.items(): + groups[name] = df.loc[rows, cols].values + # invert items of groups to every row of data + data = [dict(d, **{k: v[i] for k, v in groups.items()}) for i, d in enumerate(data)] + + return data diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py new file mode 100644 index 0000000000..59cdbb4f88 --- /dev/null +++ b/tests/test_csv_iterable_dataset.py @@ -0,0 +1,163 @@ +# Copyright 2020 - 2021 MONAI Consortium +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import tempfile +import unittest + +import numpy as np + +from monai.data import CSVIterableDataset +from monai.transforms import ToNumpyd + + +class TestCSVIterableDataset(unittest.TestCase): + def test_values(self): + with tempfile.TemporaryDirectory() as tempdir: + test_data1 = [ + ["subject_id", "label", "image", "ehr_0", "ehr_1", "ehr_2"], + ["s000000", 5, "./imgs/s000000.png", 2.007843256, 2.29019618, 2.054902077], + ["s000001", 0, "./imgs/s000001.png", 6.839215755, 6.474509716, 5.862744808], + ["s000002", 4, "./imgs/s000002.png", 3.772548914, 4.211764812, 4.635294437], + ["s000003", 1, "./imgs/s000003.png", 3.333333254, 3.235294342, 3.400000095], + ["s000004", 9, "./imgs/s000004.png", 6.427451134, 6.254901886, 5.976470947], + ] + test_data2 = [ + ["subject_id", "ehr_3", "ehr_4", "ehr_5", "ehr_6", "ehr_7", "ehr_8"], + ["s000000", 3.019608021, 3.807843208, 3.584313869, 3.141176462, 3.1960783, 4.211764812], + ["s000001", 5.192157269, 5.274509907, 5.250980377, 4.647058964, 4.886274338, 4.392156601], + ["s000002", 5.298039436, 9.545097351, 12.57254887, 6.799999714, 2.1960783, 1.882352948], + ["s000003", 3.164705753, 3.086274624, 3.725490093, 3.698039293, 3.698039055, 3.701960802], + ["s000004", 6.26274538, 7.717647076, 9.584313393, 6.082352638, 2.662744999, 2.34117651], + ] + test_data3 = [ + ["subject_id", "ehr_9", "ehr_10", "meta_0", "meta_1", "meta_2"], + ["s000000", 6.301961422, 6.470588684, "TRUE", "TRUE", "TRUE"], + ["s000001", 5.219608307, 7.827450752, "FALSE", "TRUE", "FALSE"], + ["s000002", 1.882352948, 2.031372547, "TRUE", "FALSE", "TRUE"], + ["s000003", 3.309803963, 3.729412079, "FALSE", "FALSE", "TRUE"], + ["s000004", 2.062745094, 2.34117651, "FALSE", "TRUE", "TRUE"], + ] + + def prepare_csv_file(data, filepath): + with open(filepath, "a") as f: + for d in data: + 
f.write((",".join([str(i) for i in d])) + "\n") + + filepath1 = os.path.join(tempdir, "test_data1.csv") + filepath2 = os.path.join(tempdir, "test_data2.csv") + filepath3 = os.path.join(tempdir, "test_data3.csv") + prepare_csv_file(test_data1, filepath1) + prepare_csv_file(test_data2, filepath2) + prepare_csv_file(test_data3, filepath3) + + # test single CSV file + dataset = CSVIterableDataset(filepath1) + for i, item in enumerate(dataset): + if i == 2: + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, str) else v for k, v in item.items()}, + { + "subject_id": "s000002", + "label": 4, + "image": "./imgs/s000002.png", + "ehr_0": 3.7725, + "ehr_1": 4.2118, + "ehr_2": 4.6353, + }, + ) + break + # test reset iterables + dataset.reset(filename=filepath3) + for i, item in enumerate(dataset): + if i == 3: + self.assertEqual(item["meta_0"], False) + + # test multiple CSV files, join tables with kwargs + dataset = CSVIterableDataset([filepath1, filepath2, filepath3], on="subject_id") + for i, item in enumerate(dataset): + if i == 3: + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in item.items()}, + { + "subject_id": "s000003", + "label": 1, + "image": "./imgs/s000003.png", + "ehr_0": 3.3333, + "ehr_1": 3.2353, + "ehr_2": 3.4000, + "ehr_3": 3.1647, + "ehr_4": 3.0863, + "ehr_5": 3.7255, + "ehr_6": 3.6980, + "ehr_7": 3.6980, + "ehr_8": 3.7020, + "ehr_9": 3.3098, + "ehr_10": 3.7294, + "meta_0": False, + "meta_1": False, + "meta_2": True, + }, + ) + + # test selected columns and chunk size + dataset = CSVIterableDataset( + filename=[filepath1, filepath2, filepath3], + chunksize=2, + col_names=["subject_id", "image", "ehr_1", "ehr_7", "meta_1"], + ) + for i, item in enumerate(dataset): + if i == 3: + self.assertDictEqual( + {k: round(v, 4) if not isinstance(v, (str, np.bool_)) else v for k, v in item.items()}, + { + "subject_id": "s000003", + "image": "./imgs/s000003.png", + "ehr_1": 3.2353, + "ehr_7": 3.6980, + "meta_1": False, + }, + ) + + # test group columns + dataset = CSVIterableDataset( + filename=[filepath1, filepath2, filepath3], + col_names=["subject_id", "image", *[f"ehr_{i}" for i in range(11)], "meta_0", "meta_1", "meta_2"], + col_groups={"ehr": [f"ehr_{i}" for i in range(11)], "meta12": ["meta_1", "meta_2"]}, + ) + for i, item in enumerate(dataset): + if i == 3: + np.testing.assert_allclose( + [round(i, 4) for i in item["ehr"]], + [3.3333, 3.2353, 3.4000, 3.1647, 3.0863, 3.7255, 3.6980, 3.6980, 3.7020, 3.3098, 3.7294], + ) + np.testing.assert_allclose(item["meta12"], [False, True]) + + # test transform + dataset = CSVIterableDataset( + filename=[filepath1, filepath2, filepath3], + col_groups={"ehr": [f"ehr_{i}" for i in range(5)]}, + transform=ToNumpyd(keys="ehr"), + ) + expected = [ + [2.0078, 2.2902, 2.0549, 3.0196, 3.8078], + [6.8392, 6.4745, 5.8627, 5.1922, 5.2745], + [3.7725, 4.2118, 4.6353, 5.2980, 9.5451], + [3.3333, 3.2353, 3.4000, 3.1647, 3.0863], + [6.4275, 6.2549, 5.9765, 6.2627, 7.7176], + ] + for item, exp in zip(dataset, expected): + self.assertTrue(isinstance(item["ehr"], np.ndarray)) + np.testing.assert_allclose(np.around(item["ehr"], 4), exp) + + +if __name__ == "__main__": + unittest.main() From 8a169bdfe7d1e5a8d76095b3b6d6b0c6fab41df1 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 18:56:53 +0800 Subject: [PATCH 14/26] [DLMED] support multiple processes Signed-off-by: Nic Ma --- monai/data/dataset.py | 2 +- monai/data/iterable_dataset.py | 18 ++++++++++++++++-- monai/data/utils.py | 6 +++--- 
tests/test_csv_iterable_dataset.py | 12 +++++++++++- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/monai/data/dataset.py b/monai/data/dataset.py index fdf014f77f..a1eef7ca9e 100644 --- a/monai/data/dataset.py +++ b/monai/data/dataset.py @@ -29,7 +29,7 @@ from torch.utils.data import Dataset as _TorchDataset from torch.utils.data import Subset -from monai.data.utils import first, pickle_hashing, convert_tables_to_dicts +from monai.data.utils import convert_tables_to_dicts, first, pickle_hashing from monai.transforms import Compose, Randomizable, ThreadUnsafe, Transform, apply_transform from monai.utils import MAX_SEED, ensure_tuple, get_seed, min_version, optional_import diff --git a/monai/data/iterable_dataset.py b/monai/data/iterable_dataset.py index b9311ad2cb..0231ac0fed 100644 --- a/monai/data/iterable_dataset.py +++ b/monai/data/iterable_dataset.py @@ -9,8 +9,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Iterable, Optional, Sequence, Union, Dict +import math +from typing import Callable, Dict, Iterable, Optional, Sequence, Union + from torch.utils.data import IterableDataset as _TorchIterableDataset +from torch.utils.data import get_worker_info from monai.data.utils import convert_tables_to_dicts from monai.transforms import apply_transform @@ -52,6 +55,9 @@ class CSVIterableDataset(IterableDataset): """ Iterable dataset to load CSV files and generate dictionary data. It can be helpful when loading extemely big CSV files that can't read into memory directly. + To accelerate the loading process, it can support multi-processing based on PyTorch DataLoader workers, + every process executes tranforms on part of every loaded chunk. + Note: the order of output data may not match data source in multi-processing mode. It can load data from multiple CSV files and join the tables with addtional `kwargs` arg. Support to only load specific columns. @@ -77,6 +83,7 @@ class CSVIterableDataset(IterableDataset): kwargs: additional arguments for `pandas.merge()` API to join tables. 
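With this change the dataset can feed a multi-worker DataLoader; each loaded
chunk is split across the workers, roughly `ceil(len(chunk) / num_workers)`
rows per worker. A sketch with a hypothetical file name::

    from monai.data import CSVIterableDataset, DataLoader
    from monai.transforms import ToNumpyd

    dataset = CSVIterableDataset(filename="big_table.csv", chunksize=1000, transform=ToNumpyd(keys="label"))
    loader = DataLoader(dataset=dataset, num_workers=2, batch_size=16)
    for batch in loader:
        # note: with multiple workers the row order may not match the source file
        print(batch["label"])
        break
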
""" + def __init__( self, filename: Union[str, Sequence[str]], @@ -92,7 +99,7 @@ def __init__( self.col_names = col_names self.col_groups = col_groups self.kwargs = kwargs - super().__init__(data=None, transform=transform) + super().__init__(data=None, transform=transform) # type: ignore def reset(self, filename: Optional[Union[str, Sequence[str]]] = None): if filename is not None: @@ -109,4 +116,11 @@ def __iter__(self): col_groups=self.col_groups, **self.kwargs, ) + info = get_worker_info() + if info is not None: + length = len(self.data) + per_worker = int(math.ceil(length / float(info.num_workers))) + start = info.id * per_worker + self.data = self.data[start : min(start + per_worker, length)] + return super().__iter__() diff --git a/monai/data/utils.py b/monai/data/utils.py index 24a2a79dd7..e103d84dd0 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -19,7 +19,7 @@ from functools import reduce from itertools import product, starmap from pathlib import PurePath -from typing import TYPE_CHECKING, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, TYPE_CHECKING, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union import numpy as np import torch @@ -999,7 +999,7 @@ def convert_tables_to_dicts( col_names: Optional[Sequence[str]] = None, col_groups: Optional[Dict[str, Sequence[str]]] = None, **kwargs, -): +) -> List[Dict[str, Any]]: """ Utility to join pandas tables, select rows, columns and generate groups. Will return a list of dictionaries, every dictionary maps to a row of data in tables. @@ -1022,7 +1022,7 @@ def convert_tables_to_dicts( # parse row indices rows: List[Union[int, str]] = [] if row_indices is None: - rows = slice(df.shape[0]) + rows = slice(df.shape[0]) # type: ignore else: for i in row_indices: if isinstance(i, (tuple, list)): diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index 59cdbb4f88..e70af23bb0 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -15,7 +15,7 @@ import numpy as np -from monai.data import CSVIterableDataset +from monai.data import CSVIterableDataset, DataLoader from monai.transforms import ToNumpyd @@ -158,6 +158,16 @@ def prepare_csv_file(data, filepath): self.assertTrue(isinstance(item["ehr"], np.ndarray)) np.testing.assert_allclose(np.around(item["ehr"], 4), exp) + # test multiple processes loading + dataset = CSVIterableDataset(filepath1, transform=ToNumpyd(keys="label")) + dataloader = DataLoader(dataset=dataset, num_workers=2, batch_size=2) + for i, item in enumerate(dataloader): + # test the last item which only has 1 data + if len(item) == 1: + self.assertListEqual(item["subject_id"], ["s000002"]) + np.testing.assert_allclose(item["label"], [4]) + self.assertListEqual(item["image"], ["./imgs/s000002.png"]) + if __name__ == "__main__": unittest.main() From 000372d9ae7b0d88bc27ca07cc60b296b3ebe200 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 19:11:48 +0800 Subject: [PATCH 15/26] [DLMED] fix tests Signed-off-by: Nic Ma --- monai/data/utils.py | 2 +- tests/min_tests.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/monai/data/utils.py b/monai/data/utils.py index e103d84dd0..b130a39946 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -19,7 +19,7 @@ from functools import reduce from itertools import product, starmap from pathlib import PurePath -from typing import Any, TYPE_CHECKING, Dict, Generator, Iterable, List, Mapping, Optional, 
Sequence, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union import numpy as np import torch diff --git a/tests/min_tests.py b/tests/min_tests.py index 320e92b728..046f9b4a40 100644 --- a/tests/min_tests.py +++ b/tests/min_tests.py @@ -126,6 +126,7 @@ def run_testsuit(): "test_handler_post_processing", "test_write_metrics_reports", "test_csv_dataset", + "test_csv_iterable_dataset", ] assert sorted(exclude_cases) == sorted(set(exclude_cases)), f"Duplicated items in {exclude_cases}" From bdd67c7b48096b22228320fe890ea9e73c35c992 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 19:17:08 +0800 Subject: [PATCH 16/26] [DLMED] fix flake8 Signed-off-by: Nic Ma --- tests/test_csv_iterable_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index e70af23bb0..ee7cae7ce3 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -161,7 +161,7 @@ def prepare_csv_file(data, filepath): # test multiple processes loading dataset = CSVIterableDataset(filepath1, transform=ToNumpyd(keys="label")) dataloader = DataLoader(dataset=dataset, num_workers=2, batch_size=2) - for i, item in enumerate(dataloader): + for item in dataloader: # test the last item which only has 1 data if len(item) == 1: self.assertListEqual(item["subject_id"], ["s000002"]) From 356e33941e817f675400b949ccd67b859945455b Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 19:27:09 +0800 Subject: [PATCH 17/26] [DLMED] fix docs-build Signed-off-by: Nic Ma --- docs/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/requirements.txt b/docs/requirements.txt index acc983129f..3622fd599c 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -18,3 +18,4 @@ sphinxcontrib-jsmath sphinxcontrib-qthelp sphinxcontrib-serializinghtml sphinx-autodoc-typehints==1.11.1 +pandas From 19035297bba932ea07cb90281a70da51a28dacb2 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 19:55:47 +0800 Subject: [PATCH 18/26] [DLMED] fix min tests Signed-off-by: Nic Ma --- monai/data/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/monai/data/utils.py b/monai/data/utils.py index b130a39946..902e6f72b9 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -40,8 +40,10 @@ if TYPE_CHECKING: import pandas as pd + from pandas import DataFrame else: pd, _ = optional_import("pandas") + DataFrame, _ = optional_import("pandas", name="DataFrame") nib, _ = optional_import("nibabel") @@ -994,7 +996,7 @@ def sorted_dict(item, key=None, reverse=False): def convert_tables_to_dicts( - dfs: Union[Sequence[pd.DataFrame], pd.DataFrame], + dfs: Union[Sequence[DataFrame], DataFrame], row_indices: Optional[Sequence[Union[int, str]]] = None, col_names: Optional[Sequence[str]] = None, col_groups: Optional[Dict[str, Sequence[str]]] = None, From 48d4ef75b72747b7f8a65f1d6820f5e55791978e Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Mon, 21 Jun 2021 21:25:01 +0800 Subject: [PATCH 19/26] [DLMED] fix CI tests Signed-off-by: Nic Ma --- monai/data/utils.py | 11 +++-------- tests/test_csv_iterable_dataset.py | 5 ++++- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/monai/data/utils.py b/monai/data/utils.py index 902e6f72b9..16389519e0 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -38,13 +38,8 @@ ) from monai.utils.enums import Method -if TYPE_CHECKING: - import pandas as pd - 
from pandas import DataFrame -else: - pd, _ = optional_import("pandas") - DataFrame, _ = optional_import("pandas", name="DataFrame") - +pd, _ = optional_import("pandas") +DataFrame, _ = optional_import("pandas", name="DataFrame") nib, _ = optional_import("nibabel") @@ -996,7 +991,7 @@ def sorted_dict(item, key=None, reverse=False): def convert_tables_to_dicts( - dfs: Union[Sequence[DataFrame], DataFrame], + dfs, row_indices: Optional[Sequence[Union[int, str]]] = None, col_names: Optional[Sequence[str]] = None, col_groups: Optional[Dict[str, Sequence[str]]] = None, diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index ee7cae7ce3..ef7f481936 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -11,6 +11,7 @@ import os import tempfile +import sys import unittest import numpy as np @@ -160,7 +161,9 @@ def prepare_csv_file(data, filepath): # test multiple processes loading dataset = CSVIterableDataset(filepath1, transform=ToNumpyd(keys="label")) - dataloader = DataLoader(dataset=dataset, num_workers=2, batch_size=2) + # num workers = 0 for mac + num_workers = 0 if sys.platform == "darwin" else 2 + dataloader = DataLoader(dataset=dataset, num_workers=num_workers, batch_size=2) for item in dataloader: # test the last item which only has 1 data if len(item) == 1: From e1e3273232dc6b3189b80d47b71aea3ae3dcaf0b Mon Sep 17 00:00:00 2001 From: monai-bot Date: Mon, 21 Jun 2021 13:30:33 +0000 Subject: [PATCH 20/26] [MONAI] python code formatting Signed-off-by: monai-bot --- tests/test_csv_iterable_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index ef7f481936..9d61b8561f 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -10,8 +10,8 @@ # limitations under the License. 
 import os
-import tempfile
 import sys
+import tempfile
 import unittest

 import numpy as np

From 5195b083b46eef7062964c306fec00cede5a761a Mon Sep 17 00:00:00 2001
From: Nic Ma
Date: Mon, 21 Jun 2021 21:39:38 +0800
Subject: [PATCH 21/26] [DLMED] fix typo

Signed-off-by: Nic Ma
---
 monai/data/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/monai/data/utils.py b/monai/data/utils.py
index 16389519e0..0f3d666cbe 100644
--- a/monai/data/utils.py
+++ b/monai/data/utils.py
@@ -19,7 +19,7 @@
 from functools import reduce
 from itertools import product, starmap
 from pathlib import PurePath
-from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
+from typing import Any, Dict, Generator, Iterable, List, Mapping, Optional, Sequence, Tuple, Union

 import numpy as np
 import torch

From 95f25a8b253db1069cf50d2ab1048d0bb281d62b Mon Sep 17 00:00:00 2001
From: Nic Ma
Date: Mon, 21 Jun 2021 22:34:07 +0800
Subject: [PATCH 22/26] [DLMED] change sys.platform

Signed-off-by: Nic Ma
---
 tests/test_csv_iterable_dataset.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py
index 9d61b8561f..5fd4271a87 100644
--- a/tests/test_csv_iterable_dataset.py
+++ b/tests/test_csv_iterable_dataset.py
@@ -161,8 +161,8 @@ def prepare_csv_file(data, filepath):

         # test multi-process loading
         dataset = CSVIterableDataset(filepath1, transform=ToNumpyd(keys="label"))
-        # num workers = 0 for mac
-        num_workers = 0 if sys.platform == "darwin" else 2
+        # set num workers = 0 for mac / win
+        num_workers = 2 if sys.platform == "linux" else 0
         dataloader = DataLoader(dataset=dataset, num_workers=num_workers, batch_size=2)
         for item in dataloader:

From 14f3a8e8276d7a810c02e69024525c1b49b1383c Mon Sep 17 00:00:00 2001
From: Nic Ma
Date: Mon, 21 Jun 2021 23:35:55 +0800
Subject: [PATCH 23/26] [DLMED] skip if windows

Signed-off-by: Nic Ma
---
 tests/test_csv_iterable_dataset.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py
index 5fd4271a87..9c30fd008a 100644
--- a/tests/test_csv_iterable_dataset.py
+++ b/tests/test_csv_iterable_dataset.py
@@ -18,8 +18,10 @@

 from monai.data import CSVIterableDataset, DataLoader
 from monai.transforms import ToNumpyd
+from .utils import skip_if_windows


+@skip_if_windows
 class TestCSVIterableDataset(unittest.TestCase):
     def test_values(self):
         with tempfile.TemporaryDirectory() as tempdir:

From 780ca06882b16966dda7fc8c3dc28462c99f8912 Mon Sep 17 00:00:00 2001
From: monai-bot
Date: Mon, 21 Jun 2021 15:42:30 +0000
Subject: [PATCH 24/26] [MONAI] python code formatting

Signed-off-by: monai-bot
---
 tests/test_csv_iterable_dataset.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py
index 9c30fd008a..699b04e44e 100644
--- a/tests/test_csv_iterable_dataset.py
+++ b/tests/test_csv_iterable_dataset.py
@@ -18,6 +18,7 @@

 from monai.data import CSVIterableDataset, DataLoader
 from monai.transforms import ToNumpyd
+
 from .utils import skip_if_windows

From 8207fe338a7758e46b4fee4f4e62e19f35debbf8 Mon Sep 17 00:00:00 2001
From: Nic Ma
Date: Tue, 22 Jun 2021 15:07:39 +0800
Subject: [PATCH 25/26] [DLMED] add col_types arg

Signed-off-by: Nic Ma
---
 monai/data/dataset.py              | 15 +++++++++++++++
 monai/data/iterable_dataset.py     | 18 +++++++++++++++++-
 monai/data/utils.py                | 26 +++++++++++++++++++++++++-
 tests/test_csv_dataset.py          | 14 ++++++++++++++
 tests/test_csv_iterable_dataset.py |  2 +-
 5 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/monai/data/dataset.py b/monai/data/dataset.py
index a1eef7ca9e..74b9726081 100644
--- a/monai/data/dataset.py
+++ b/monai/data/dataset.py
@@ -1088,6 +1088,19 @@ class CSVDataset(Dataset):
            for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None,
            load all the rows in the file.
        col_names: names of the expected columns to load. if None, load all the columns.
+        col_types: `type` and `default value` to convert the loaded columns, if None, use original data.
+            it should be a dictionary, every item maps to an expected column, the `key` is the column
+            name and the `value` is None or a dictionary to define the default value and data type.
+            the supported keys in dictionary are: ["type", "default"]. for example::

+            col_types = {
+                "subject_id": {"type": str},
+                "label": {"type": int, "default": 0},
+                "ehr_0": {"type": float, "default": 0.0},
+                "ehr_1": {"type": float, "default": 0.0},
+                "image": {"type": str, "default": None},
+            }
+
        col_groups: args to group the loaded columns to generate a new column, it should be
            a dictionary, every item maps to a group, the `key` will be the new column name,
            the `value` is the names of columns to combine. for example:
@@ -1102,6 +1115,7 @@ def __init__(
        filename: Union[str, Sequence[str]],
        row_indices: Optional[Sequence[Union[int, str]]] = None,
        col_names: Optional[Sequence[str]] = None,
+        col_types: Optional[Dict[str, Optional[Dict[str, Any]]]] = None,
        col_groups: Optional[Dict[str, Sequence[str]]] = None,
        transform: Optional[Callable] = None,
        **kwargs,
@@ -1112,6 +1126,7 @@ def __init__(
            dfs=dfs,
            row_indices=row_indices,
            col_names=col_names,
+            col_types=col_types,
            col_groups=col_groups,
            **kwargs,
        )
diff --git a/monai/data/iterable_dataset.py b/monai/data/iterable_dataset.py
index 0231ac0fed..75bab462d4 100644
--- a/monai/data/iterable_dataset.py
+++ b/monai/data/iterable_dataset.py
@@ -10,7 +10,7 @@
 # limitations under the License.

 import math
-from typing import Callable, Dict, Iterable, Optional, Sequence, Union
+from typing import Any, Callable, Dict, Iterable, Optional, Sequence, Union

 from torch.utils.data import IterableDataset as _TorchIterableDataset
 from torch.utils.data import get_worker_info
@@ -75,6 +75,19 @@ class CSVIterableDataset(IterableDataset):
        chunksize: rows of a chunk when loading iterable data from CSV files, default to 1000.
            more details: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html.
        col_names: names of the expected columns to load. if None, load all the columns.
+        col_types: `type` and `default value` to convert the loaded columns, if None, use original data.
+            it should be a dictionary, every item maps to an expected column, the `key` is the column
+            name and the `value` is None or a dictionary to define the default value and data type.
+            the supported keys in dictionary are: ["type", "default"]. for example::

+            col_types = {
+                "subject_id": {"type": str},
+                "label": {"type": int, "default": 0},
+                "ehr_0": {"type": float, "default": 0.0},
+                "ehr_1": {"type": float, "default": 0.0},
+                "image": {"type": str, "default": None},
+            }
+
        col_groups: args to group the loaded columns to generate a new column, it should be
            a dictionary, every item maps to a group, the `key` will be the new column name,
            the `value` is the names of columns to combine.
for example: @@ -89,6 +102,7 @@ def __init__( filename: Union[str, Sequence[str]], chunksize: int = 1000, col_names: Optional[Sequence[str]] = None, + col_types: Optional[Dict[str, Optional[Dict[str, Any]]]] = None, col_groups: Optional[Dict[str, Sequence[str]]] = None, transform: Optional[Callable] = None, **kwargs, @@ -97,6 +111,7 @@ def __init__( self.chunksize = chunksize self.iters = self.reset() self.col_names = col_names + self.col_types = col_types self.col_groups = col_groups self.kwargs = kwargs super().__init__(data=None, transform=transform) # type: ignore @@ -113,6 +128,7 @@ def __iter__(self): self.data = convert_tables_to_dicts( dfs=chunks, col_names=self.col_names, + col_types=self.col_types, col_groups=self.col_groups, **self.kwargs, ) diff --git a/monai/data/utils.py b/monai/data/utils.py index 0f3d666cbe..2958fad18a 100644 --- a/monai/data/utils.py +++ b/monai/data/utils.py @@ -994,6 +994,7 @@ def convert_tables_to_dicts( dfs, row_indices: Optional[Sequence[Union[int, str]]] = None, col_names: Optional[Sequence[str]] = None, + col_types: Optional[Dict[str, Optional[Dict[str, Any]]]] = None, col_groups: Optional[Dict[str, Sequence[str]]] = None, **kwargs, ) -> List[Dict[str, Any]]: @@ -1008,6 +1009,19 @@ def convert_tables_to_dicts( for example: `row_indices=[[0, 100], 200, 201, 202, 300]`. if None, load all the rows in the file. col_names: names of the expected columns to load. if None, load all the columns. + col_types: `type` and `default value` to convert the loaded columns, if None, use original data. + it should be a dictionary, every item maps to an expected column, the `key` is the column + name and the `value` is None or a dictionary to define the default value and data type. + the supported keys in dictionary are: ["type", "default"], and note that the value of `default` + should not be `None`. for example:: + + col_types = { + "subject_id": {"type": str}, + "label": {"type": int, "default": 0}, + "ehr_0": {"type": float, "default": 0.0}, + "ehr_1": {"type": float, "default": 0.0}, + } + col_groups: args to group the loaded columns to generate a new column, it should be a dictionary, every item maps to a group, the `key` will be the new column name, the `value` is the names of columns to combine. 
for example: @@ -1030,7 +1044,17 @@ def convert_tables_to_dicts( rows.append(i) # convert to a list of dictionaries corresponding to every row - data: List[Dict] = (df.loc[rows] if col_names is None else df.loc[rows, col_names]).to_dict(orient="records") + data_ = df.loc[rows] if col_names is None else df.loc[rows, col_names] + if isinstance(col_types, dict): + # fill default values for NaN + defaults = {k: v["default"] for k, v in col_types.items() if v is not None and v.get("default") is not None} + if len(defaults) > 0: + data_ = data_.fillna(value=defaults) + # convert data types + types = {k: v["type"] for k, v in col_types.items() if v is not None and "type" in v} + if len(types) > 0: + data_ = data_.astype(dtype=types) + data: List[Dict] = data_.to_dict(orient="records") # group columns to generate new column if col_groups is not None: diff --git a/tests/test_csv_dataset.py b/tests/test_csv_dataset.py index f5020db0d9..d187f4e64d 100644 --- a/tests/test_csv_dataset.py +++ b/tests/test_csv_dataset.py @@ -45,6 +45,8 @@ def test_values(self): ["s000002", 1.882352948, 2.031372547, "TRUE", "FALSE", "TRUE"], ["s000003", 3.309803963, 3.729412079, "FALSE", "FALSE", "TRUE"], ["s000004", 2.062745094, 2.34117651, "FALSE", "TRUE", "TRUE"], + # generate NaN values in the row + ["s000005", 3.353655643, 1.675674543, "TRUE", "TRUE", "FALSE"], ] def prepare_csv_file(data, filepath): @@ -147,6 +149,18 @@ def prepare_csv_file(data, filepath): self.assertTrue(isinstance(item["ehr"], np.ndarray)) np.testing.assert_allclose(np.around(item["ehr"], 4), exp) + # test default values and dtype + dataset = CSVDataset( + filename=[filepath1, filepath2, filepath3], + col_names=["subject_id", "image", "ehr_1", "ehr_9", "meta_1"], + col_types={"image": {"type": str, "default": "No image"}, "ehr_1": {"type": int, "default": 0}}, + how="outer", # generate NaN values in this merge mode + ) + self.assertEqual(len(dataset), 6) + self.assertEqual(dataset[-1]["image"], "No image") + self.assertEqual(type(dataset[-1]["ehr_1"]), int) + np.testing.assert_allclose(dataset[-1]["ehr_9"], 3.3537, rtol=1e-2) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index 699b04e44e..10455cbb45 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -19,7 +19,7 @@ from monai.data import CSVIterableDataset, DataLoader from monai.transforms import ToNumpyd -from .utils import skip_if_windows +from tests.utils import skip_if_windows @skip_if_windows From 9e08e1dc0a481bad1e5c2e381f74bfc443cfd819 Mon Sep 17 00:00:00 2001 From: monai-bot Date: Tue, 22 Jun 2021 07:13:12 +0000 Subject: [PATCH 26/26] [MONAI] python code formatting Signed-off-by: monai-bot --- tests/test_csv_iterable_dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_csv_iterable_dataset.py b/tests/test_csv_iterable_dataset.py index 10455cbb45..c7a3f31dc6 100644 --- a/tests/test_csv_iterable_dataset.py +++ b/tests/test_csv_iterable_dataset.py @@ -18,7 +18,6 @@ from monai.data import CSVIterableDataset, DataLoader from monai.transforms import ToNumpyd - from tests.utils import skip_if_windows
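
Usage sketch (editorial addition, not part of the patch series): the snippet below illustrates the APIs assembled by these commits, `CSVDataset` and `CSVIterableDataset` with the `col_types` and `col_groups` arguments. The constructor signatures and their semantics come from the diffs above (defaults fill NaN cells before dtype conversion, grouped columns are combined into a single value, and the iterable dataset shards each loaded chunk across DataLoader workers), but the file contents, column names, and expected outputs noted in the comments are illustrative assumptions, so treat this as a minimal sketch rather than a definitive reference.

    # minimal sketch: demo.csv and its columns are made-up sample data
    import os
    import tempfile

    from monai.data import CSVDataset, CSVIterableDataset, DataLoader
    from monai.transforms import ToNumpyd

    with tempfile.TemporaryDirectory() as tempdir:
        path = os.path.join(tempdir, "demo.csv")
        with open(path, "w") as f:
            f.write("subject_id,label,ehr_0,ehr_1\n")
            f.write("s000000,5,2.0078,2.2902\n")
            f.write("s000001,,6.8392,6.4745\n")  # empty label -> NaN, filled by the default below

        # map-style dataset: fill the default value, convert the dtype, group the EHR columns
        ds = CSVDataset(
            filename=path,
            col_types={"label": {"type": int, "default": 0}},
            col_groups={"ehr": ["ehr_0", "ehr_1"]},
        )
        print(ds[1]["label"])  # expected: 0 (NaN replaced by the default, cast to int)
        print(ds[1]["ehr"])    # expected: the two ehr_* columns combined into one value

        # iterable dataset: reads the file in chunks; with num_workers > 0 every worker
        # transforms its own slice of each chunk (the tests above enable this on Linux only)
        ids = CSVIterableDataset(filename=path, chunksize=1000, transform=ToNumpyd(keys="label"))
        for batch in DataLoader(ids, batch_size=2, num_workers=0):
            print(batch["subject_id"])

The same `col_types`/`col_groups` dictionaries feed straight through to `convert_tables_to_dicts`, so the behaviour shown here matches what the unit tests in patches 13 and 25 assert against the generated CSV fixtures.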