In [6]:
import cv2
import json
import math
import matplotlib.colors as colors
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re

from pathlib import Path
from PIL import Image


# Path
DATA_DIR = "raw"

# Regex
LABEL_IMG_PATTERN = '.*\.(JPG|png)'
LABEL_MASK_PATTERN = '.*\.json'
MSI_IMG_PATTERN = '.*\.png'

cwd = Path.cwd()
data_folder = Path(cwd, DATA_DIR)

In [7]:
class Sample:

    def __init__(self, data_folder, sample_info):
        self.data_folder = data_folder
        self.sample_id = None
        self.sample_path = None
        self.references = None
        self.data = None
        self._init_sample_info(sample_info)

    def _init_sample_info(self, sample_info):
        self.sample_id = sample_info.get('id')
        self.sample_path = Path(self.data_folder, self.sample_id)
        self.references = sample_info.get('ref_runs')
        self.data = sample_info.get('data_runs')
        self.transforms = sample_info.get('transforms')

    def __repr__(self):
        return f"Sample(id={self.sample_id})"
    
    # Data Images
    def get_data_paths(self, with_cap=False, with_root=True):
        """Return list of key-value pairs of label image and run image paths.
        Args:
            with_cap (bool): Whether to include capillary.

        Return: [
            {
                'run_group': str,
                'run_id': str,
                'label_img_path': Path,
                'label_mask_path': Path,
                'run_img_dir': Path,
                'run_img_paths': [Path, Path, ...]
            },
        ]
        """
        
        labelled_folder = self.data.get('capillary').get('included') if with_cap else self.data.get('capillary').get('excluded')
        labelled_img_path = Path(self.sample_path, *self.data.get("label_dir"))
        labelled_img_path = Path(labelled_img_path, labelled_folder)
        
        if not labelled_img_path.exists():
            raise Exception(f"Labelled image folder not found for sample: {self.sample_id} — {labelled_img_path} - with_cap: {with_cap}")

        paths = []
        run_group = self.data.get('runs').keys()
        for group in run_group:
            group_path = Path(labelled_img_path, group)
            if not group_path.exists():
                print(f"Labelled image folder not found for sample: {self.sample_id} — {group_path} - with_cap: {with_cap}")
                continue
            label_img_path = [x for x in group_path.iterdir() if re.match(LABEL_IMG_PATTERN, x.name)][0]
            label_mask_path = [x for x in group_path.iterdir() if re.match(LABEL_MASK_PATTERN, x.name)][0]

            run_ids = self.data.get('runs').get(group)
            for run_id in run_ids:
                run_path = Path(data_folder, self.sample_id, run_id)
                run_img_paths = [x for x in run_path.iterdir() if re.match(MSI_IMG_PATTERN, x.name)]

                try:
                    if with_root:
                        paths.append({
                            'run_group': group,
                            'run_id': run_id,
                            'label_img_path': label_img_path,
                            'label_mask_path': label_mask_path,
                            'run_img_dir': run_path,
                            'run_img_paths': sorted(run_img_paths),
                            'flip': self.transforms.get(group).get('flip'),
                            'rotate': self.transforms.get(group).get('rotate'),
                        })
                    else:
                        paths.append({
                            'run_group': group,
                            'run_id': run_id,
                            'label_img_path': label_img_path.relative_to(data_folder),
                            'label_mask_path': label_mask_path.relative_to(data_folder),
                            'run_img_dir': run_path.relative_to(data_folder),
                            'run_img_paths': sorted([x.relative_to(data_folder) for x in run_img_paths]),
                            'flip': self.transforms.get(group).get('flip'),
                            'rotate': self.transforms.get(group).get('rotate'),
                        })
                except:
                    print(f"Transform not found for sample: {self.sample_id} — {group} - {run_id} - with_cap: {with_cap}")
                    
        
        return paths

    # Reference Images
    def get_reference_paths(self, with_root=True):
        """Return list of key-value pairs of reference image paths.
        
        Return: [
            {
                'run_group': str,
                'run_id': str,
                'run_img_dir': Path,
                'run_img_paths': [Path, Path, ...]  
            },
        ]
        """
        
        paths = []
        ref_imgs = self.references.keys()
        for ref_img in ref_imgs:
            run_ids = self.references.get(ref_img)
            for run_id in run_ids:
                if run_id == "":
                    run_path = Path(data_folder, self.sample_id)
                else:
                    run_path = Path(data_folder, self.sample_id, run_id)
                run_img_paths = [x for x in run_path.iterdir() if re.match(MSI_IMG_PATTERN, x.name)]
                
                if with_root:
                    paths.append({
                        'run_group': ref_img,
                        'run_id': run_id,
                        'run_img_dir': run_path,
                        'run_img_paths': sorted(run_img_paths),
                        'flip': None,
                        'rotate': None,
                    })
                else:
                    paths.append({
                        'run_group': ref_img,
                        'run_id': run_id,
                        'run_img_dir': run_path.relative_to(data_folder),
                        'run_img_paths': sorted([x.relative_to(data_folder) for x in run_img_paths]),
                        'flip': None,
                        'rotate': None,
                    })
        
        return paths

    # Sample Dataframe
    def get_sample_df(self, with_cap=False):
        """Return dataframe of sample data.
        Args:
            with_cap (bool): Whether to include capillary.

        Return: pd.DataFrame
        """
        
        paths = self.get_data_paths(with_cap, with_root=False)
        paths += self.get_reference_paths(with_root=False)
        paths.sort(key=lambda x: x['run_id'])

        df = pd.DataFrame(paths)
        df['sample_id'] = self.sample_id
        df['label_img_path'] = df['label_img_path'].astype(str)
        df['label_mask_path'] = df['label_mask_path'].astype(str)
        df['run_img_dir'] = df['run_img_dir'].astype(str)
        df['run_img_paths'] = df['run_img_paths'].apply(lambda x: [str(y) for y in x])
        df['flip'] = df['flip'].astype(str)
        df['rotate'] = df['rotate'].astype(str)

        df = df[['sample_id', 'run_group', 'run_id', 'label_img_path', 'label_mask_path', 'run_img_dir', 'run_img_paths', 'flip', 'rotate']]
        
        return df


In [8]:
sample_infos = json.load(Path('hs-info.json').open())
samples = [Sample(data_folder, sample_info) for sample_info in sample_infos.get('data')]
sample_dfs = [sample.get_sample_df(False) for sample in samples]

In [9]:
df = pd.concat(sample_dfs)
df

Unnamed: 0,sample_id,run_group,run_id,label_img_path,label_mask_path,run_img_dir,run_img_paths,flip,rotate
0,HS001,orient,Run01,,,HS001/Run01,"[HS001/Run01/image sample 440nm.png, HS001/Run...",,
1,HS001,white,Run02,,,HS001/Run02,"[HS001/Run02/image sample 440nm.png, HS001/Run...",,
2,HS001,white,Run03,,,HS001/Run03,"[HS001/Run03/image sample 440nm.png, HS001/Run...",,
3,HS001,Run 04-08,Run04,HS001/Labelling data/Definitive segmentation/H...,HS001/Labelling data/Definitive segmentation/H...,HS001/Run04,"[HS001/Run04/image sample 440nm.png, HS001/Run...",v,0.0
4,HS001,Run 04-08,Run05,HS001/Labelling data/Definitive segmentation/H...,HS001/Labelling data/Definitive segmentation/H...,HS001/Run05,"[HS001/Run05/image sample 440nm.png, HS001/Run...",v,0.0
...,...,...,...,...,...,...,...,...,...
6,HS020,Run 06-07,Run07,HS020/Labelling data/Definitive segmentations/...,HS020/Labelling data/Definitive segmentations/...,HS020/Run07,"[HS020/Run07/image sample 440nm 1.png, HS020/R...",h,0.0
7,HS020,Run 08 and 10,Run08,HS020/Labelling data/Definitive segmentations/...,HS020/Labelling data/Definitive segmentations/...,HS020/Run08,"[HS020/Run08/image sample 440nm 1.png, HS020/R...",h,0.0
8,HS020,Run 09 and 11,Run09,HS020/Labelling data/Definitive segmentations/...,HS020/Labelling data/Definitive segmentations/...,HS020/Run09,"[HS020/Run09/image sample 440nm 1.png, HS020/R...",h,0.0
9,HS020,Run 08 and 10,Run10,HS020/Labelling data/Definitive segmentations/...,HS020/Labelling data/Definitive segmentations/...,HS020/Run10,"[HS020/Run10/image sample 440nm 1.png, HS020/R...",h,0.0


In [10]:
df.to_hdf('data.h5', key='df', mode='w') # Save dataframe to hdf5 file
df.to_csv('data.csv', index=False) # Save dataframe to csv file

your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block0_values] [items->Index(['sample_id', 'run_group', 'run_id', 'label_img_path', 'label_mask_path',
       'run_img_dir', 'run_img_paths', 'flip', 'rotate'],
      dtype='object')]

  encoding=encoding,
