-
Notifications
You must be signed in to change notification settings - Fork 2
/
_generator.py
85 lines (61 loc) · 2.56 KB
/
_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import os
import os.path
import typing
import dask.dataframe
import pandas
def generator(
    directories: typing.List[typing.Union[str, bytes, os.PathLike]],
    image: typing.Optional[typing.Union[str, bytes, os.PathLike]] = "Image.csv",
    objects: typing.Optional[
        typing.List[typing.Union[str, bytes, os.PathLike]]
    ] = None,
    partition_on: typing.Optional[typing.List[str]] = None,
):
    """Yield one Dask DataFrame per CellProfiler output directory.

    For each directory, reads the per-image CSV (``image``) and each
    per-object CSV (``objects``), prefixes object columns with the CSV's
    basename (e.g. ``Cells_``), aligns everything on
    (ImageNumber, ObjectNumber), outer-merges objects onto the image
    records, and partitions the result by the number of unique values in
    ``partition_on[0]``.

    Parameters:
        directories: directories to process, one yield per directory.
        image: filename of the per-image CSV inside each directory.
        objects: filenames of the per-object CSVs; defaults to
            ``["Cells.csv", "Cytoplasm.csv", "Nuclei.csv"]``.
        partition_on: column names used for partitioning; only the first
            entry is consulted. Defaults to ``["Metadata_Well"]``.

    Yields:
        A ``dask.dataframe.DataFrame`` for each directory, or ``None``
        when a required CSV is missing from that directory (best-effort
        behavior preserved from the original implementation).
    """
    # Avoid mutable default arguments: materialize the defaults per call.
    if objects is None:
        objects = ["Cells.csv", "Cytoplasm.csv", "Nuclei.csv"]
    if partition_on is None:
        partition_on = ["Metadata_Well"]

    for directory in directories:
        image_pathname = os.path.join(directory, image)

        try:
            image_records = pandas.read_csv(image_pathname)
            image_records.set_index("ImageNumber", inplace=True)

            # Open object CSVs (e.g. Cells.csv, Cytoplasm.csv, Nuclei.csv,
            # etc.) as Pandas DataFrames. Collect them and concatenate once
            # at the end rather than concatenating inside the loop
            # (repeated concat is quadratic in the number of objects).
            object_frames = []

            for object_name in objects:
                object_pathname = os.path.join(directory, object_name)

                # Column prefix is the CSV basename, e.g. "Cells".
                prefix, _ = os.path.splitext(object_name)

                object_records = pandas.read_csv(object_pathname)
                object_records = object_records.add_prefix(f"{prefix}_")

                # Restore unprefixed key columns so every object frame
                # shares the same (ImageNumber, ObjectNumber) index.
                object_records.rename(
                    columns={
                        f"{prefix}_ImageNumber": "ImageNumber",
                        f"{prefix}_ObjectNumber": "ObjectNumber",
                    },
                    inplace=True,
                )

                object_records.set_index(
                    ["ImageNumber", "ObjectNumber"], drop=False, inplace=True
                )

                # Keep prefixed copies of the key columns as data, then
                # drop the shared unprefixed columns so the later concat
                # does not produce duplicate column names.
                object_records[f"{prefix}_ImageNumber"] = object_records[
                    "ImageNumber"
                ]
                object_records[f"{prefix}_ObjectNumber"] = object_records[
                    "ObjectNumber"
                ]

                object_records.drop(
                    ["ImageNumber", "ObjectNumber"], axis=1, inplace=True
                )

                object_frames.append(object_records)

            if object_frames:
                concatenated_object_records = pandas.concat(object_frames, axis=1)
            else:
                concatenated_object_records = pandas.DataFrame()

            records = image_records.merge(
                concatenated_object_records,
                how="outer",
                left_index=True,
                right_index=True,
            )

            records.reset_index(drop=False, inplace=True)

            # One partition per unique value of the partition column
            # (e.g. per well); guard against an empty column, since
            # dask requires npartitions >= 1.
            npartitions = max(records[partition_on[0]].unique().size, 1)

            yield dask.dataframe.from_pandas(records, npartitions=npartitions)
        except FileNotFoundError:
            # Best-effort: a directory missing a CSV yields None. Unlike
            # the original (which never advanced past a missing directory
            # and so yielded None forever), the for-loop always moves on
            # to the next directory.
            yield None