Skip to content

Commit

Permalink
refactor tiler
Browse files Browse the repository at this point in the history
  • Loading branch information
ernestoarbitrio committed Jan 18, 2021
1 parent f1c74ae commit 8301127
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 138 deletions.
146 changes: 78 additions & 68 deletions src/histolab/tiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from .tile import Tile
from .types import CoordinatePair
from .util import (
lazyproperty,
region_coordinates,
regions_from_binary_mask,
scale_coordinates,
Expand Down Expand Up @@ -103,7 +102,11 @@ def locate_tiles(
img.putalpha(alpha)
draw = PIL.ImageDraw.Draw(img)

tiles = self._tiles_locator(slide)
tiles = (
self._tiles_generator(slide)[0]
if isinstance(self, ScoreTiler)
else self._tiles_generator(slide)
)
tiles_coords = (tile[1] for tile in tiles)
for coords in tiles_coords:
rescaled = scale_coordinates(coords, slide.dimensions, img.size)
Expand Down Expand Up @@ -158,16 +161,36 @@ def _tile_filename(

return tile_filename

def _tiles_generator(self, slide: Slide) -> Tuple[Tile, CoordinatePair]:
def _tiles_generator(self, slide: Slide) -> None:
raise NotImplementedError

@lazyproperty
def _tiles_locator(self):
return (
self._highest_score_tiles
if isinstance(self, ScoreTiler)
else self._tiles_generator
)
def _validate_levels(self, slide: Slide) -> None:
"""Validate the Tiler's level according to the Slide.
Parameters
----------
slide : Slide
Slide from which to extract the tiles
Raises
------
TileSizeError
If the tile size is larger than the slide size
LevelError
If the level is not available for the slide
"""
if len(slide.levels) - abs(self.level) < 0:
raise LevelError(
f"Level {self.level} not available. Number of available levels: "
f"{len(slide.levels)}"
)

if not self._has_valid_tile_size(slide):
raise TileSizeError(
f"Tile size {self.tile_size} is larger than slide size "
f"{slide.level_dimensions(self.level)} at level {self.level}"
)
pass


class GridTiler(Tiler):
Expand Down Expand Up @@ -213,7 +236,7 @@ def __init__(
self.prefix = prefix
self.suffix = suffix

def extract(self, slide: Slide):
def extract(self, slide: Slide) -> None:
"""Extract tiles arranged in a grid and save them to disk, following this
filename pattern:
`{prefix}tile_{tiles_counter}_level{level}_{x_ul_wsi}-{y_ul_wsi}-{x_br_wsi}-{y_br_wsi}{suffix}`
Expand All @@ -230,28 +253,16 @@ def extract(self, slide: Slide):
LevelError
If the level is not available for the slide
"""
if len(slide.levels) - abs(self.level) < 0:
raise LevelError(
f"Level {self.level} not available. Number of available levels: "
f"{len(slide.levels)}"
)

if not self._has_valid_tile_size(slide):
raise TileSizeError(
f"Tile size {self.tile_size} is larger than slide size "
f"{slide.level_dimensions(self.level)} at level {self.level}"
)
self._validate_levels(slide)

grid_tiles = self._tiles_generator(slide)

tiles_counter = 0

for tiles_counter, (tile, tile_wsi_coords) in enumerate(grid_tiles):
tile_filename = self._tile_filename(tile_wsi_coords, tiles_counter)
full_tile_path = os.path.join(slide.processed_path, "tiles", tile_filename)
tile.save(full_tile_path)
print(f"\t Tile {tiles_counter} saved: {tile_filename}")

print(f"{tiles_counter} Grid Tiles have been saved.")

@property
Expand Down Expand Up @@ -451,7 +462,7 @@ def __init__(
self.prefix = prefix
self.suffix = suffix

def extract(self, slide: Slide):
def extract(self, slide: Slide) -> None:
"""Extract random tiles and save them to disk, following this filename pattern:
`{prefix}tile_{tiles_counter}_level{level}_{x_ul_wsi}-{y_ul_wsi}-{x_br_wsi}-{y_br_wsi}{suffix}`
Expand All @@ -467,23 +478,15 @@ def extract(self, slide: Slide):
LevelError
If the level is not available for the slide
"""
if len(slide.levels) - abs(self.level) < 0:
raise LevelError(
f"Level {self.level} not available. Number of available levels: "
f"{len(slide.levels)}"
)
if not self._has_valid_tile_size(slide):
raise TileSizeError(
f"Tile size {self.tile_size} is larger than slide size "
f"{slide.level_dimensions(self.level)} at level {self.level}"
)
self._validate_levels(slide)

random_tiles = self._tiles_generator(slide)

tiles_counter = 0
for tiles_counter, (tile, tile_wsi_coords) in enumerate(random_tiles):
tile_filename = self._tile_filename(tile_wsi_coords, tiles_counter)
tile.save(os.path.join(slide.processed_path, "tiles", tile_filename))
full_tile_path = os.path.join(slide.processed_path, "tiles", tile_filename)
tile.save(full_tile_path)
print(f"\t Tile {tiles_counter} saved: {tile_filename}")
print(f"{tiles_counter+1} Random Tiles have been saved.")

Expand Down Expand Up @@ -650,7 +653,7 @@ def __init__(
suffix,
)

def extract(self, slide: Slide, report_path: str = None):
def extract(self, slide: Slide, report_path: str = None) -> None:
"""Extract grid tiles and save them to disk, according to a scoring function and
following this filename pattern:
`{prefix}tile_{tiles_counter}_level{level}_{x_ul_wsi}-{y_ul_wsi}-{x_br_wsi}-{y_br_wsi}{suffix}`
Expand All @@ -671,18 +674,10 @@ def extract(self, slide: Slide, report_path: str = None):
LevelError
If the level is not available for the slide
"""
if len(slide.levels) - abs(self.level) < 0:
raise LevelError(
f"Level {self.level} not available. Number of available levels: "
f"{len(slide.levels)}"
)
if not self._has_valid_tile_size(slide):
raise TileSizeError(
f"Tile size {self.tile_size} is larger than slide size "
f"{slide.level_dimensions(self.level)} at level {self.level}"
)
self._validate_levels(slide)

highest_score_tiles, highest_scaled_score_tiles = self._tiles_generator(slide)

highest_score_tiles = self._highest_score_tiles(slide)
tiles_counter = 0
filenames = []

Expand All @@ -694,7 +689,6 @@ def extract(self, slide: Slide, report_path: str = None):
print(f"\t Tile {tiles_counter} - score: {score} saved: {tile_filename}")

if report_path:
highest_scaled_score_tiles = self._highest_score_tiles(slide, scaled=True)
self._save_report(
report_path, highest_score_tiles, highest_scaled_score_tiles, filenames
)
Expand All @@ -703,21 +697,19 @@ def extract(self, slide: Slide, report_path: str = None):

# ------- implementation helpers -------

def _highest_score_tiles(
self, slide: Slide, scaled: bool = False
) -> List[Tuple[float, CoordinatePair]]:
def _tiles_generator(
self, slide: Slide
) -> Tuple[List[Tuple[float, CoordinatePair]], List[Tuple[float, CoordinatePair]]]:
r"""Calculate the tiles with the highest scores and their extraction coordinates
Parameters
----------
slide : Slide
The slide to extract the tiles from.
scaled: bool
Whether to return the scores scaled between 0 and 1.
Returns
-------
List[Tuple[float, CoordinatePair]]
Tuple[List[Tuple[float, CoordinatePair]], List[Tuple[float, CoordinatePair]]]
List of tuples containing the scores and the extraction coordinates
for the tiles with the highest scores. If scaled=True, each score `s_i` of
the i-th tile is normalized as
Expand All @@ -735,22 +727,40 @@ def _highest_score_tiles(
ValueError
If ``n_tiles`` is negative.
""" # noqa
scores = (
self._scale_scores(self._scores(slide)) if scaled else self._scores(slide)
# scores = (
# self._scale_scores(self._scores(slide)) if scaled else self._scores(slide)
# )
#
# sorted_tiles_by_score = sorted(scores, key=lambda x: x[0], reverse=True)
#
# if self.n_tiles < 0:
# raise ValueError(f"'n_tiles' cannot be negative ({self.n_tiles})")
#
# highest_score_tiles = (
# sorted_tiles_by_score[: self.n_tiles]
# if self.n_tiles > 0
# else sorted_tiles_by_score
# )
#
# return highest_score_tiles
all_scores = self._scores(slide)
scaled_scores = self._scale_scores(all_scores)

sorted_tiles_by_score = sorted(all_scores, key=lambda x: x[0], reverse=True)
sorted_tiles_by_scaled_score = sorted(
scaled_scores, key=lambda x: x[0], reverse=True
)

sorted_tiles_by_score = sorted(scores, key=lambda x: x[0], reverse=True)

if self.n_tiles < 0:
raise ValueError(f"'n_tiles' cannot be negative ({self.n_tiles})")

highest_score_tiles = (
sorted_tiles_by_score[: self.n_tiles]
if self.n_tiles > 0
else sorted_tiles_by_score
)
if self.n_tiles > 0:
highest_score_tiles = sorted_tiles_by_score[: self.n_tiles]
highest_scaled_score_tiles = sorted_tiles_by_scaled_score[: self.n_tiles]
else:
highest_score_tiles = sorted_tiles_by_score
highest_scaled_score_tiles = sorted_tiles_by_scaled_score

return highest_score_tiles
return highest_score_tiles, highest_scaled_score_tiles

@staticmethod
def _save_report(
Expand Down Expand Up @@ -834,12 +844,12 @@ def _scores(self, slide: Slide) -> List[Tuple[float, CoordinatePair]]:
List of tuples containing the score and the extraction coordinates for each
tile. Each tuple represents a tile.
"""
if next(self._tiles_generator(slide), None) is None:
if next(super()._tiles_generator(slide), None) is None:
raise RuntimeError(
"No tiles have been generated. This could happen if `check_tissue=True`"
)

grid_tiles = self._tiles_generator(slide)
grid_tiles = super()._tiles_generator(slide)
scores = []

for tile, tile_wsi_coords in grid_tiles:
Expand Down
10 changes: 9 additions & 1 deletion tests/integration/test_tiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,15 @@ def it_locates_tiles_on_the_slide(
expectation,
type_="png",
)
tiles_location_img = scored_tiles_extractor.locate_tiles(slide, scale_factor=10)
expected_warning_regex = (
r"Input image must be RGB. NOTE: the image will be converted to RGB before"
r" HED conversion."
)

with pytest.warns(UserWarning, match=expected_warning_regex):
tiles_location_img = scored_tiles_extractor.locate_tiles(
slide, scale_factor=10
)
# --- Expanding test report with actual and expected images ---
expand_tests_report(request, expected=expected_img, actual=tiles_location_img)

Expand Down
Loading

0 comments on commit 8301127

Please sign in to comment.