In [None]:
import os
from google.colab import drive
drive.mount("/content/drive")
os.chdir("/content/drive/MyDrive/DP")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import glob
import time
import re
import argparse
import nibabel as nib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import shutil
import errno

In [None]:
NORMAL = 'NOR'
MINF = 'MINF'
DCM = 'DCM'
HCM = 'HCM'
RV = 'RV'

In [None]:
def copy(src, dest):
  """
  Copy function
  """
  try:
      shutil.copytree(src, dest, ignore=shutil.ignore_patterns())
  except OSError as e:
      # If the error was caused because the source wasn't a directory
      if e.errno == errno.ENOTDIR:
          shutil.copy(src, dest)
      else:
          print('Directory not copied. Error: %s' % e)

def read_patient_cfg(path):
    """
    Reads patient data in the cfg file and returns a dictionary
    """
    patient_info = {}
    with open(os.path.join(path, 'Info.cfg')) as f_in:
      for line in f_in:
        l = line.rstrip().split(": ")
        patient_info[l[0]] = l[1]
    return patient_info

def group_patient_cases(src_path, out_path, force=False):
  """ Group the patient data according to cardiac pathology"""

  cases = sorted(next(os.walk(src_path))[1])
  dest_path = os.path.join(out_path, 'Patient_Groups_test')
  if force:
    shutil.rmtree(dest_path)
  if os.path.exists(dest_path):
    return dest_path

  os.makedirs(dest_path)
  os.mkdir(os.path.join(dest_path, NORMAL))
  os.mkdir(os.path.join(dest_path, MINF))
  os.mkdir(os.path.join(dest_path, DCM))
  os.mkdir(os.path.join(dest_path, HCM))
  os.mkdir(os.path.join(dest_path, RV))

  for case in cases:
    full_path = os.path.join(src_path, case)
    copy(full_path, os.path.join(dest_path,\
        read_patient_cfg(full_path)['Group'], case))

In [None]:
src_path = "/content/drive/MyDrive/DP/training/"
out_path = "/content/drive/MyDrive/DP/"

In [None]:
group_patient_cases(src_path, out_path , force = True)

In [None]:
def generate_train_validate_test_set(src_path, dest_path):
  """
  Split the data into 70:15:15 for train-validate-test set
  arg: path: input data path
  """
  SPLIT_TRAIN = 0.70
  SPLIT_VALID = 0.15

  dest_path = os.path.join(dest_path,'dataset2')
  if os.path.exists(dest_path):
    shutil.rmtree(dest_path)
  os.makedirs(os.path.join(dest_path, 'train_set'))
  os.makedirs(os.path.join(dest_path, 'validation_set'))
  os.makedirs(os.path.join(dest_path, 'test_set'))
  # print (src_path)
  groups = next(os.walk(src_path))[1]
  for group in groups:
    group_path = next(os.walk(os.path.join(src_path, group)))[0]
    patient_folders = next(os.walk(group_path))[1]
    np.random.shuffle(patient_folders)
    train_ = patient_folders[0:int(SPLIT_TRAIN*len(patient_folders))]
    valid_ = patient_folders[int(SPLIT_TRAIN*len(patient_folders)):
                 int((SPLIT_TRAIN+SPLIT_VALID)*len(patient_folders))]
    test_ = patient_folders[int((SPLIT_TRAIN+SPLIT_VALID)*len(patient_folders)):]
    for patient in train_:
      folder_path = os.path.join(group_path, patient)
      copy(folder_path, os.path.join(dest_path, 'train_set', patient))

    for patient in valid_:
      folder_path = os.path.join(group_path, patient)
      copy(folder_path, os.path.join(dest_path, 'validation_set', patient))

    for patient in test_:
      folder_path = os.path.join(group_path, patient)
      copy(folder_path, os.path.join(dest_path, 'test_set', patient))

In [None]:
src = "/content/drive/MyDrive/DP/Patient_Groups_test/"
dest = "/content/drive/MyDrive/DP/"

In [None]:
generate_train_validate_test_set(src, dest)

In [None]:
path = "/content/drive/MyDrive/DP/testing/patient101/"

In [None]:
read_patient_cfg(path)

{'ED': '1', 'ES': '14', 'Height': '169.0', 'NbFrame': '30', 'Weight': '79.0'}