In [1]:
# Import needed libraries
import sys
import os
from PIL import Image
#import matplotlib.pyplot as plt
import numpy as np
import cv2
import pandas as pd
import ast
import glob

import logging
logging.basicConfig(format='%(asctime)s %(message)s')


TILE_SIZE = 256
NUM_TILES = 2

In [2]:
MY_HOME = '/home/operador/saturdays-ai/PAIthology'

LABELS_FILE = f'{MY_HOME}/src/Labels.xlsx'
IMAGES_DIR = f'{MY_HOME}/dataset/annotated_images'
TILES_DIR = f'{MY_HOME}/dataset/tiles'

os.makedirs(TILES_DIR, exist_ok=True)
os.makedirs(os.path.join(TILES_DIR,'annotations'), exist_ok=True)
os.makedirs(os.path.join(TILES_DIR,'images'), exist_ok=True)

In [3]:
# Keep only mitosis cells
labels = pd.read_excel(LABELS_FILE, dtype={"MPoint": object})
labels_mitosis = labels[labels['Mitosis']]

In [4]:
labels_mitosis.head()

Unnamed: 0,name,Combinacion,Image,Mitosis,SubImage,MPoint
0,A03_00Aa_mitosis,A03_00Aa,A03,True,1,"[[1094.0, 1223.0, 0.8]]"
2,A03_00Ab_mitosis,A03_00Ab,A03,True,1,"[[476.0, 394.0, 0.65]]"
4,A03_00Ac_mitosis,A03_00Ac,A03,True,1,"[[289.0, 1316.0, 0.65]]"
6,A03_00Ad_mitosis,A03_00Ad,A03,True,3,"[[1420.0, 199.0, 1.0], [200.0, 1190.0, 0.8], [..."
7,A03_00Ba_mitosis,A03_00Ba,A03,True,1,"[[660.0, 248.0, 0.65]]"


In [None]:
'''Run data augmentation 
 --> for every mitotic point we create 
 - 10 images containing that cell and
 - 10 not containing any
 '''
for i, row in labels_mitosis.iterrows():
    filename = f'{row["Combinacion"]}.tiff'
    path_image = glob.glob(os.path.join(path_images,'*','x40', filename))
    if os.path.exists(path_image[0]):
        print(f"Interpreting {filename}")
        cells = row["MPoint"]
        cells = ast.literal_eval(cells)
        frame = Frame(path=path_image[0],
                      cells=cells,
                      tile_size=TILE_SIZE,
                      num_tiles=NUM_TILES,
                      path_annotations=path_annotations)

        frame.get_records()
        frame.create_mask()
        frame.get_all_tiles()
        frame.create_annotations()

In [122]:
for i, row in labels_mitosis.iterrows():
    filename = f'{row["Combinacion"]}_mitosis.jpg'
    #path_image = glob.glob(os.path.join(IMAGES_DIR,'*','x40', filename))
    path_image = glob.glob(os.path.join(IMAGES_DIR, filename))
    break

In [123]:
filename, path_image

('A03_00Aa_mitosis.jpg',
 ['/home/operador/saturdays-ai/PAIthology/dataset/annotated_images/A03_00Aa_mitosis.jpg'])

In [124]:
os.path.exists(path_image[0])

True

In [125]:
cells = row["MPoint"]
cells = ast.literal_eval(cells)
cells

[[1094.0, 1223.0, 0.8]]

In [127]:
frame = Frame(path=path_image[0],
                      cells=cells,
                      tile_size=TILE_SIZE,
                      num_tiles=NUM_TILES,
                      path_annotations=TILES_DIR)

In [128]:
frame.get_records()

Got records for A03_00Aa_mitosis.jpg


In [129]:
frame.records[0].__dict__

{'x': 1094.0, 'y': 1223.0, 'confidence': 0.8}

In [130]:
frame.create_mask()

Generated mask for A03_00Aa_mitosis.jpg


In [131]:
frame.frame_mask.sum()

65536.0

In [132]:
frame.frame.shape

(1376, 1539, 3)

In [133]:
frame.get_all_tiles()

TypeError: slice indices must be integers or None or have an __index__ method

In [134]:
tg = TileGenerator(frame, frame.tile_size, frame.num_tiles)

TypeError: slice indices must be integers or None or have an __index__ method

In [126]:
# This could be a namedtuple
class Record:
    """Class which gathers information of the position and confidence of a cell."""
    def __init__(self, x: int, y: int, confidence: float):
        self.x = x
        self.y = y
        self.confidence = confidence

class Frame:
    """Class containg all the information gathered of an specific frame and its
    mitotic and not mitotic tiles."""
    def __init__(self, path, cells, tile_size, num_tiles=10, path_annotations=None):
        self.path = path
        self.filename = os.path.basename(path)
        self.frame = cv2.imread(path)
        self.width = self.frame.shape[0]
        self.height = self.frame.shape[1]
        self.tile_size = tile_size
        self.num_tiles = num_tiles
        self.cells = cells
        self.tiles_mitosis = []
        self.tiles_not_mitosis = []
        self.records = []
        self.path_annotations = path_annotations
        self.frame_mask = []
        
    def get_records(self):
        self.records = [Record(*cell) for cell in self.cells]
        print(f'Got records for {self.filename}')
        
        
    def create_mask(self):
        '''Generate a mask of ones around the mitosis point'''
        mask = np.zeros((self.height, self.width))
        for record in self.records:
            # This could use floor and ceiling to also work with odd tile sizes 
            mask[int(record.x-(self.tile_size/2)):int(record.x+(self.tile_size/2)),
                 int(record.y-(self.tile_size/2)):int(record.y+(self.tile_size/2))] = 1
        self.frame_mask = mask
        print(f'Generated mask for {self.filename}')

    def get_all_tiles(self):
        tile_generator = TileGenerator(self, self.tile_size, self.num_tiles)
        self.tiles_mitosis = tile_generator.generate_real_positive_tiles()
        self.tiles_not_mitosis = tile_generator.generate_real_negative_tiles()

    def update_tiles_mitosis(self,tile):
        self.tiles_mitosis += [tile]
    
    def update_tiles_not_mitosis(self,tile):
        self.tiles_not_mitosis += [tile]
        
    def create_annotations(self):
        delta = 15
        count = 0
        for tile_mitosis in self.tiles_mitosis:
            image = tile_mitosis.tile
            tree = create_base_xml(self,image,f"{self.filename.replace('.tiff','')}_mitosis_{count}.jpg")
            for record in tile_mitosis.records:
                coordinates = (record.x - delta, record.x  + delta, record.y - delta, record.y + delta)
                tree = create_object_xml(tree,coordinates)
            tree.write(os.path.join(self.path_annotations,'annotations',f"{self.filename.replace('.tiff','')}_mitosis_{count}.xml"))
            cv2.imwrite(os.path.join(self.path_annotations,'images',f"{self.filename.replace('.tiff','')}_mitosis_{count}.jpg"),image) 
            count += 1
        count = 0
        for tile_not_mitosis in self.tiles_not_mitosis:
            image_not = tile_not_mitosis.tile
            tree = create_base_xml(self,image,f"{self.filename.replace('.tiff','')}_notmitosis_{count}.jpg")
            tree.write(os.path.join(self.path_annotations,'annotations',f"{self.filename.replace('.tiff','')}_notmitosis_{count}.xml"))
            cv2.imwrite(os.path.join(self.path_annotations,'images',f"{self.filename.replace('.tiff','')}_notmitosis_{count}.jpg"),image_not) 
            count += 1

In [121]:
class TileGenerator:
    def __init__(self, frame, tile_size, num_tiles):
        self.tile_size = tile_size
        self.num_tiles = num_tiles
        self.frame = frame
        self.image_frame = np.array(frame.frame)
        self.mitotic_coordinates = frame.records
        self.centroid_limits = self.get_centroid_limits()
        self.possible_negative_centroids = self.get_possible_negative_centroids()
        self.possible_positive_centroids = self.get_possible_positive_centroids()
        
    def get_possible_negative_centroids(self):
        
        possible_centroids = frame.frame_mask[
            self.centroid_limits[0][0]:self.centroid_limits[0][1],
            self.centroid_limits[1][0]:self.centroid_limits[1][1]
        ]
        return [*zip(*np.where(possible_centroids == 0))]
    
    def get_possible_positive_centroids(self):
        
        possible_centroids = frame.frame_mask[
            self.centroid_limits[0][0]:self.centroid_limits[0][1],
            self.centroid_limits[1][0]:self.centroid_limits[1][1]
        ]
        return [*zip(*np.where(possible_centroids == 1))]
        
    def get_random_negative_tile_centroid(self):
        return random.choice(self.possible_negative_centroids)
        
    def get_random_positive_tile_centroid(self):
        return random.choice(self.possible_postive_centroids)
    
    def get_centroid_limits(self):
        limits_random_centroid_x = (
            self.tile_size/2, # min limit
            frame.frame.shape[0] - self.tile_size/2  # max limit
        )

        limits_random_centroid_y = (
            self.tile_size/2, # min limit
            frame.frame.shape[1] - self.tile_size/2  # max limit
        )

        return limits_random_centroid_x, limits_random_centroid_y
    
    def get_boundaries_tile(self, centroid):
        return (
            (centroid_coord - self.tile_size/2, centroid_coord + self.tile_size/2) for centroid_coord in centroid
        )
    
    def check_record_presence_in_boundaries(record, boundaries):
        return (boundaries[0][0] <= record.x <= boundaries[0][1]) and \
               (boundaries[1][0] <= record.y <= boundaries[1][1])
    
    def frame_record_to_tile_record(x_min_tile : int, y_min_tile : int, record: Record) -> Record:
        x_tile = record.x - x_min_tile
        y_tile = record.y - y_min_tile    
        return Record(x_tile, y_tile, record.confidence)
    
    def generate_real_negative_tiles(self):
        negative_tiles_centroids = [self.get_random_negative_tile_centroid() for _ in range(num_tiles)]
        boundaries_negative_tiles = [self.get_boundaries_tile(cent) for cent in negative_tiles_centroids]
        negative_tiles_images = [self.image_frame[boundaries[0][0]:boundaries[0][1],
            boundaries[1][0]:boundaries[1][1]] for boundaries in boundaries_negative_tiles]
        negative_tiles_records = [
            [self.frame_record_to_tile_record(boundaries[0][0], boundaries[1][0], record) 
                 for record in self.mitotic_coordinates
                 if self.check_record_presence_in_boundaries(record, boundaries)
            ]
            for boundaries in boundaries_negative_tiles
        ]
        return [Tile(image, records) for image, records in zip(negative_tiles_images, negative_tiles_records)]
    
    def generate_real_positive_tiles(self):
        positive_tiles_centroids = [self.get_random_positive_tile_centroid() for _ in range(num_tiles)]
        boundaries_positive_tiles = [self.get_boundaries_tile(cent) for cent in positive_tiles_centroids]
        positive_tiles_images = [self.image_frame[boundaries[0][0]:boundaries[0][1],
            boundaries[1][0]:boundaries[1][1]] for boundaries in boundaries_positive_tiles]
        positive_tiles_records = [
            [self.frame_record_to_tile_record(boundaries[0][0], boundaries[1][0], record) 
                 for record in self.mitotic_coordinates
                 if self.check_record_presence_in_boundaries(record, boundaries)
            ]
            for boundaries in boundaries_positive_tiles
        ]
        return [Tile(image, records) for image, records in zip(positive_tiles_images, positive_tiles_records)]


    def generate_negative_tiles(self, coordinates):
        """Generates random tiles that do NOT contain the mitotic coordinates. We pick x1 and y1
        randomly within the range of the whole image.
        We then randomly choose x2 and y2 if the patch does not contain the mitotic coordinates,
        we discard it otherwise"""
        x_image, y_image, _ = self.image_frame.shape
        x_mitotic, y_mitotic = coordinates

        coord_x1 = []
        coord_y1 = []
        coord_x2 = []
        coord_y2 = []

        choice = [self.tile_size, -self.tile_size]

        i = 0
        while i < self.num_tiles:
            #choose x1 and y1 within the whole image
            x_choice = np.random.randint(0, x_image, 1)
            y_choice = np.random.randint(0, y_image, 1)
            #generate random x2 and y2 candidates, we then choose the ones that do
            # not contain the mitotic coordinates
            x2_candidate = x_choice + (random.choice(choice))
            y2_candidate = y_choice + (random.choice(choice))                  
            # search if choice of coordinates inside boundaries and if valid centre of \
            # coordinates. This is achieved when the value of frame.frame_mask == 0. 
            if (0 < x2_candidate[0] < x_image) \
                and (0 < x_choice[0] +int(self.frame.tile_size/2) < x_image)\
                and (0 < y2_candidate[0] < y_image) and (0 < y_choice[0] +int(self.frame.tile_size/2) < y_image)\
                and (self.frame.frame_mask[y_choice[0]+int(self.frame.tile_size/2),x_choice[0]+int(self.frame.tile_size/2)] == 0)\
                and (abs(x_choice - x2_candidate) == self.tile_size and abs(y_choice - y2_candidate == self.tile_size)):

                coord_x1.append(x_choice)
                coord_y1.append(y_choice)

                coord_x2.append(x2_candidate)
                coord_y2.append(y2_candidate)
                i += 1
        return (coord_x1, coord_x2, coord_y1, coord_y2)

    def generate_positive_patches(self, coordinates):
        """Generates random tiles that contain the mitotic coordinates. We pick x1 and y1
        randomly within the range of the mitotic coordinates, given a tile size. We then randomly choose
        x2 and y2 if the patch contains the mitotic coordinates, we discard it otherwise"""
        x_image, y_image, _ = self.image.shape
        x_mitotic, y_mitotic = coordinates

        coord_x1 = []
        coord_y1 = []
        coord_x2 = []
        coord_y2 = []

        choice = [self.tile_size, -self.tile_size]

        i = 0

        while i < self.num_tiles:
            #we get x1,y1 randomly within the range of the tile size, centered on the mitotic coordinates
            x_choice = np.random.randint(x_mitotic - self.tile_size, x_mitotic + self.tile_size, 1)
            y_choice = np.random.randint(y_mitotic - self.tile_size, y_mitotic + self.tile_size, 1)

            #We generate x2 and y2 candidates, and we check if the mitotic coordinates are contained in the patch
            x2_candidate = x_choice + (random.choice(choice))
            y2_candidate = y_choice + (random.choice(choice))

            if (0 < x_choice[0] < x_image) and (0 < y_choice[0] < y_image) \
            and (0 < x2_candidate[0] < x_image) and (0 < y2_candidate[0] < y_image) \
            and (x_mitotic - self.tile_size < x2_candidate < x_mitotic + self.tile_size) \
            and (y_mitotic - self.tile_size < y2_candidate < y_mitotic + self.tile_size)\
            and (abs(x_choice - x2_candidate) == self.tile_size and abs(y_choice - y2_candidate == self.tile_size)):

                coord_x1.append(x_choice)
                coord_y1.append(y_choice)

                coord_x2.append(x2_candidate)
                coord_y2.append(y2_candidate)
                i += 1

        return (coord_x1, coord_x2, coord_y1, coord_y2)
    
    

    def generate_tiles(self, frame):
        """calls both functions for generating positive and negative patches and stores the images in
        two separate lists (lists of numpy arrays)"""
        
        for m_coordinates in self.mitotic_coordinates:
            coordinates = (m_coordinates.x, m_coordinates.y)
            confidence = m_coordinates.confidence

            pos_x1_coord, pos_x2_coord, pos_y1_coord, pos_y2_coord = \
                self.generate_positive_patches(coordinates)
    
            
            neg_x1_coord, neg_x2_coord, neg_y1_coord, neg_y2_coord = \
                self.generate_negative_patches(coordinates)
    
    
            for i in range(self.num_tiles):
                x1_mitotic = int(min(pos_x1_coord[i], pos_x2_coord[i]))
                x2_mitotic = int(max(pos_x1_coord[i], pos_x2_coord[i]))
                y1_mitotic = int(min(pos_y1_coord[i], pos_y2_coord[i]))
                y2_mitotic = int(max(pos_y1_coord[i], pos_y2_coord[i]))
                
                individual_mitotic_patch = self.image[x1_mitotic:x2_mitotic, y1_mitotic:y2_mitotic, :]
                tile_mitosis = Tile(individual_mitotic_patch)
                record_tile = get_cell_coordinates_in_tile(x1_mitotic,y1_mitotic,\
                              coordinates[0],coordinates[1],confidence)
                tile_mitosis.update_records(record_tile)
                # Checkear si que el tile creado contiene mas de una céluala mitótica.
                for record in self.frame.records:
                    if (x1_mitotic < record.x <x2_mitotic) \
                    and (y1_mitotic < record.y <y2_mitotic) \
                    and (m_coordinates.x != record.x) \
                    and (m_coordinates.y != record.y):
                        record_tile_plus = get_cell_coordinates_in_tile(x1_mitotic,y1_mitotic,\
                              record.x,record.y,record.confidence)
                        tile_mitosis.update_records(record_tile_plus)
                
                
                self.frame.update_tiles_mitosis(tile_mitosis)
                

                x1_not_mitotic = int(min(neg_x1_coord[i], neg_x2_coord[i]))
                x2_not_mitotic = int(max(neg_x1_coord[i], neg_x2_coord[i]))
                y1_not_mitotic = int(min(neg_y1_coord[i], neg_y2_coord[i]))
                y2_not_mitotic = int(max(neg_y1_coord[i], neg_y2_coord[i]))
    
                individual_not_mitotic_patch = self.image[x1_not_mitotic:x2_not_mitotic, y1_not_mitotic:y2_not_mitotic, :]
                tile_not_mitosis = Tile(individual_not_mitotic_patch)
                self.frame.update_tiles_not_mitosis(tile_not_mitosis)

                
class Tile:
    """Class of all sub-frames and the specific position of mitotic cells in them."""
    def __init__(self, image, records=None):
        self.tile = image
        self.records = records or []
    
    def update_records(self,record):
        self.records += [record]