# In [2]: (Jupyter notebook cell prompt, kept as a comment so the file stays valid Python)
import logging
import threading
import datetime
import SimpleITK as sitk
import six
import radiomics
from radiomics import featureextractor 
import os
from os import cpu_count
from multiprocessing import Pool
import csv
from collections import OrderedDict
import pickle
import shutil
import numpy as np

# Parallel processing variables
TEMP_DIR = '_TEMP'  # Sub-directory of ROOT that holds one result file per case
REMOVE_TEMP_DIR = True  # Remove temporary directory when results have been successfully stored into 1 file
# Number of processors to use: keep one processor free for other work, but always use at least one.
# os.cpu_count() returns None when the number of CPUs cannot be determined; treat that as a single CPU
# instead of failing on `None - 1`.
NUM_OF_WORKERS = max(1, (cpu_count() or 1) - 1)
HEADERS = None  # headers of all extracted features (filled in by _writeResults on its first call)
ROOT = '/media/mts_dbs/mclaro/pyradiomics_parallel'  # Base directory; TEMP_DIR is created inside it
OUTPUTCSV = 'out_csvs'  # Path of the file that receives the combined results
INPUTCSV = 'in_csvs'  # NOTE(review): not referenced anywhere in this script - confirm whether it is still needed

# Per-case input data used by run(): entries of `segs` are handed to the feature extractor as masks,
# entries of `qsms` as the corresponding images.
# The paths are relative, so they are resolved against the current working directory.
# NOTE: pickle.load can execute arbitrary code while unpickling - only load files from a trusted source.
with open('./pickles/segs', mode="rb") as segs_file:
    segs = pickle.load(segs_file)

with open('./pickles/qsms', mode="rb") as qsms_file:
    qsms = pickle.load(qsms_file)




def run(case):
  """Extract radiomics features for one case and return them as an ordered mapping.

  The result is stored in a per-case CSV file inside ``ROOT/TEMP_DIR``. When that file already exists,
  its header row and first data row are loaded instead of re-running the extraction, which allows an
  interrupted batch to be resumed. Values loaded this way are strings, as read from the CSV file.

  For a fresh extraction all feature classes and image types are enabled, except the ``shape2D`` class.

  :param case: Mapping describing the case. ``'Patient'`` and ``'Reader'`` are required (they name the
    worker thread and the per-case result file); ``'Label'`` is optional and forwarded to the extractor.
  :return: ``OrderedDict`` with the entries of ``case`` followed by the extracted features. Errors are
    logged to the ``radiomics.batch`` logger and not raised; in that case whatever had been collected
    up to the failure is returned.
  """
  ptLogger = logging.getLogger('radiomics.batch')

  feature_vector = OrderedDict(case)

  try:
    # set thread name to patient name
    threading.current_thread().name = case['Patient']

    filename = r'features_' + str(case['Reader']) + '_' + str(case['Patient']) + '.csv'
    output_filename = os.path.join(ROOT, TEMP_DIR, filename)

    if os.path.isfile(output_filename):
      # Output already generated, load result (prevents re-extraction in case of interrupted process).
      # The file must be opened for reading: opening it with 'w' would truncate the stored result.
      with open(output_filename, 'r', newline='') as cachedFile:
        reader = csv.reader(cachedFile)
        headers = next(reader)
        values = next(reader)
      feature_vector = OrderedDict(zip(headers, values))

      ptLogger.info('Patient %s read by %s already processed...', case['Patient'], case['Reader'])

    else:
      # `datetime` is imported as a module, so the class has to be addressed explicitly
      t = datetime.datetime.now()

      label = case.get('Label', None)  # Optional

      # Instantiate Radiomics Feature extractor
      # Generate feature structure Phi from all ROIs and all cases
      extractor = featureextractor.RadiomicsFeatureExtractor()
      extractor.enableAllFeatures()
      extractor.enableAllImageTypes()
      extractor.enableFeatureClassByName('shape2D', enabled=False)

      # Extract features
      # NOTE(review): `case` is used as a mapping above but as the index/key into `segs` and `qsms`
      # here. A dict cannot serve as such an index, so this lookup will fail for a mapping case and
      # the extraction is then reported as failed - confirm which key identifies the arrays.
      voxel_size = (0.5, 0.5, 0.5)  # hard-coded spacing applied to both mask and image
      seg_sitk = sitk.GetImageFromArray(segs[case])
      seg_sitk.SetSpacing(voxel_size)
      qsm_sitk_gt = sitk.GetImageFromArray(qsms[case])
      qsm_sitk_gt.SetSpacing(voxel_size)
      feature_vector.update(extractor.execute(qsm_sitk_gt, seg_sitk, label=label))

      # Store results in temporary separate files to prevent write conflicts
      # This allows for the extraction to be interrupted. Upon restarting, already processed cases are found in the
      # TEMP_DIR directory and loaded instead of re-extracted
      with open(output_filename, 'w') as outputFile:
        writer = csv.DictWriter(outputFile, fieldnames=list(feature_vector.keys()), lineterminator='\n')
        writer.writeheader()
        writer.writerow(feature_vector)

      # Display message
      delta_t = datetime.datetime.now() - t

      ptLogger.info('Patient %s read by %s processed in %s', case['Patient'], case['Reader'], delta_t)

  except Exception:
    # Deliberate best-effort: a failing case must not abort the whole batch
    ptLogger.error('Feature extraction failed!', exc_info=True)

  return feature_vector


def _writeResults(featureVector):
  """Append the values of one feature vector as a row to the file at ``OUTPUTCSV``.

  On the first call (while ``HEADERS`` is still ``None``) the keys of ``featureVector`` become the
  global ``HEADERS`` and are written as the header row. Every row follows the order of ``HEADERS``;
  a header missing from ``featureVector`` is written as ``"N/A"`` and keys not in ``HEADERS`` are
  left out.

  No lock is taken here, so callers have to make sure that only one writer uses the file at a time.

  :param featureVector: Mapping of feature name to value for a single case.
  :return: None. Any error is logged to the ``radiomics.batch`` logger and not raised.
  """
  global HEADERS

  try:
    with open(OUTPUTCSV, 'a') as csv_handle:
      out = csv.writer(csv_handle, lineterminator='\n')

      # The first vector seen defines the column layout of the whole file
      if HEADERS is None:
        HEADERS = list(featureVector.keys())
        out.writerow(HEADERS)

      out.writerow([featureVector.get(column, "N/A") for column in HEADERS])
  except Exception:
    logging.getLogger('radiomics.batch').error('Error writing the results!', exc_info=True)


if __name__ == '__main__':
  # Script entry point: run the extraction for every job in a process pool, then merge the
  # per-case results into the single file at OUTPUTCSV.
  logger = logging.getLogger('radiomics.batch')

  # Ensure the entire extraction is handled on 1 thread
  #####################################################

  sitk.ProcessObject_SetGlobalDefaultNumberOfThreads(1)

  # Set up the pool processing
  ############################

  logger.info('pyradiomics version: %s', radiomics.__version__)
  logger.info('Loading CSV...')

  # Extract List of cases
  # NOTE(review): `cases` is not used below - the pool iterates over `qsms` instead, while run()
  # indexes `segs`/`qsms` with its argument and also reads it like a mapping. Confirm what should
  # be handed to run().
  cases = np.arange(len(segs))

  logger.info('Loaded %d jobs', len(qsms))

  # Make output directory if necessary
  temp_dir_path = os.path.join(ROOT, TEMP_DIR)
  if not os.path.isdir(temp_dir_path):
    logger.info('Creating temporary output directory %s', temp_dir_path)
    # makedirs also creates ROOT when it is missing; exist_ok tolerates the directory appearing
    # between the check above and this call
    os.makedirs(temp_dir_path, exist_ok=True)

  # Start parallel processing
  ###########################

  logger.info('Starting parralel pool with %d workers out of %d CPUs', NUM_OF_WORKERS, cpu_count())
  # Running the Pool; the context manager shuts the worker processes down once map() has returned
  with Pool(NUM_OF_WORKERS) as pool:
    results = pool.map(run, qsms)

  try:
    # Store all results into 1 file
    # The header comes from the first result; it is read before the file is opened so that an empty
    # result list does not leave an empty output file behind
    fieldnames = list(results[0].keys())
    with open(OUTPUTCSV, mode='w') as outputFile:
      writer = csv.DictWriter(outputFile,
                              fieldnames=fieldnames,
                              restval='',
                              extrasaction='raise',  # raise error when a case contains more headers than first case
                              lineterminator='\n')
      writer.writeheader()
      writer.writerows(results)

    # Only reached when the combined file was written without error
    if REMOVE_TEMP_DIR:
      logger.info('Removing temporary directory %s (contains individual case results files)',
                  temp_dir_path)
      shutil.rmtree(temp_dir_path)
  except Exception:
    logger.error('Error storing results into single file!', exc_info=True)

# Notebook cell output: KeyboardInterrupt (execution of the cell was interrupted)