# Making Datasets for PPSN

## Imports

In [1]:
import os
import random
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.model_selection import StratifiedGroupKFold

# Show where the notebook is running from; the relative './PPSNDatasets/' paths below resolve against it.
current_working_directory = os.getcwd()
print(current_working_directory, end="\n")

/home/bandheyh/scikit-FIBERS-ryan_dev


## Setting up Local Parameters

In [2]:
# Where generated datasets go: the local './PPSNDatasets/' folder, or a custom folder_path.
local_save = True
folder_path = None  # must be set to a directory path when local_save is False
output_folder = './PPSNDatasets/' if local_save else folder_path
if output_folder is None:
    # os.path.exists(None) would otherwise fail with an opaque TypeError.
    raise ValueError("folder_path must be set when local_save is False")
os.makedirs(output_folder, exist_ok=True)
print(output_folder)

./PPSNDatasets/


In [3]:
# Seed both global RNGs (NumPy's and the stdlib's) so any later shuffling is reproducible.
np.random.seed(42)
random.seed(42)

In [4]:
class Dataset:
    """Tabular dataset file plus the column labels needed to partition it."""

    def __init__(self, dataset_path="./PPSNDatasets/realworld_imp1.csv", outcome_label="graftyrs",
                 censoring_label="grf_fail"):
        """
        Creates dataset with path of tabular file. The file is not read until ``load_data`` is called.

        Args:
            dataset_path: path of tabular file (as csv, tsv, or txt); the file extension selects the parser
            outcome_label: column label for the outcome to be predicted in the dataset
            censoring_label: column label of the censoring indicator. May be ``None``/empty, in which
                case ``load_data`` does not check for it.
                NOTE(review): KFoldPartitioner's "Group" method passes this column as the CV groups
                - confirm that is intended.
        """
        self.data = None
        self.path = dataset_path
        # Use only the file name, and split off just the last extension:
        # "dir/my.data.csv" -> name "my.data", format "csv".
        base_name, extension = os.path.splitext(os.path.basename(dataset_path))
        self.name = base_name
        self.format = extension.lstrip('.').lower()
        self.outcome_label = outcome_label
        self.censoring_label = censoring_label

    def load_data(self):
        """
        Read the file at ``self.path`` into ``self.data`` and validate the required columns.

        ``csv`` files are comma-separated, ``tsv`` tab-separated and ``txt`` whitespace-delimited.
        The literal string ``NA`` is parsed as missing. Column names are stripped of surrounding
        whitespace before validation.

        Raises:
            Exception: on an unknown file extension, or when the outcome or censoring column is missing.
        """
        # sep=r'\s+' is pandas' documented replacement for the deprecated delim_whitespace=True.
        separators = {'csv': ',', 'tsv': '\t', 'txt': r'\s+'}
        if self.format not in separators:
            raise Exception("Unknown file format")
        self.data = pd.read_csv(self.path, na_values='NA', sep=separators[self.format])

        # Remove any whitespace from the ends of the column names (not the cell values).
        self.data.columns = self.data.columns.str.strip()

        if self.outcome_label not in self.data.columns:
            raise Exception("Outcome label not found in file")
        if self.censoring_label and self.censoring_label not in self.data.columns:
            raise Exception("Censoring label not found in file")


In [5]:
class KFoldPartitioner:
    """
    K-fold cross-validation partitioner for a ``Dataset``.

    Splits the dataset rows into ``n_splits`` train/test folds with sklearn's ``KFold`` ("Random"),
    ``StratifiedKFold`` ("Stratified") or ``StratifiedGroupKFold`` ("Group"). It can write each fold
    to ``<experiment_path>/<dataset name>/CVDatasets/`` as CSV.
    """

    def __init__(self, dataset_path="./PPSNDatasets/realworld_imp1.csv", partition_method="Random",
                 experiment_path="./PPSNDatasets/", n_splits=10, random_state=42):
        """
        Args:
            dataset_path: path to the dataset file (csv, tsv or txt), or an existing ``Dataset``
                object, which is used as-is. The file is not read here: call
                ``dataset.load_data()`` first, or let ``cv_partitioner`` load it on demand.
            partition_method: KFold CV method used for partitioning, one of ["Random", "Stratified", "Group"]
            experiment_path: root output folder for the saved CV datasets
            n_splits: number of folds in k-fold cross validation
            random_state: seed passed to the sklearn splitter for reproducible shuffling

        Raises:
            Exception: if ``partition_method`` is not supported, or if "Group" is requested while
                the dataset has no censoring label.
        """
        super().__init__()
        self.dataset = dataset_path if isinstance(dataset_path, Dataset) else Dataset(dataset_path)
        self.dataset_path = self.dataset.path
        self.experiment_path = experiment_path
        self.n_splits = n_splits
        self.random_state = random_state

        self.supported_ptn_methods = ["Random", "Stratified", "Group"]

        if partition_method not in self.supported_ptn_methods:
            raise Exception('Error: Unknown partition method.')
        if partition_method == "Group" and self.dataset.censoring_label is None:
            raise Exception("No Match Label in dataset")

        self.partition_method = partition_method
        self.train_dfs = None  # list of training-fold DataFrames, set by cv_partitioner
        self.test_dfs = None  # list of test-fold DataFrames, set by cv_partitioner
        self.cv = None  # sklearn splitter built by the last cv_partitioner call

    def cv_partitioner(self, return_dfs=True, save_dfs=True, partition_method=None):
        """
        Build the CV splitter for the current partition method and optionally split and save the data.

        Args:
            return_dfs: when True, split the data and store the folds in ``self.train_dfs`` /
                ``self.test_dfs``
            save_dfs: save the folds as CSV files under ``self.experiment_path``
            partition_method: if given, replaces ``self.partition_method`` before partitioning

        Returns:
            (train_dfs, test_dfs): the stored lists of train and test DataFrames. When
            ``return_dfs`` is False these are whatever was stored before (None if nothing
            has been computed yet).

        Raises:
            Exception: for an unknown partition method, or "Group" without a censoring label.
        """
        if partition_method:
            self.partition_method = partition_method

        # NOTE(review): the stratified splitters require a discrete target. If the outcome column
        # is a continuous survival time, "Stratified"/"Group" will raise inside sklearn - confirm.
        if self.partition_method == 'Random':
            cv = KFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        elif self.partition_method == 'Stratified':
            cv = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        elif self.partition_method == 'Group':
            cv = StratifiedGroupKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        else:
            raise Exception('Error: Requested partition method not found.')

        self.cv = cv

        if return_dfs:
            self.train_dfs, self.test_dfs = self._split_dfs()

        if save_dfs:
            self.save_datasets(self.experiment_path, self.train_dfs, self.test_dfs)

        return self.train_dfs, self.test_dfs

    def _split_dfs(self):
        """Apply ``self.cv`` to the dataset rows and return (train_dfs, test_dfs) lists."""
        if self.dataset.data is None:
            self.dataset.load_data()
        data = self.dataset.data
        outcome = data[self.dataset.outcome_label]

        if self.partition_method == "Group":
            if self.dataset.censoring_label is None:
                raise Exception("No Match Label in dataset")
            splits = self.cv.split(data, outcome, data[self.dataset.censoring_label])
        else:
            splits = self.cv.split(data, outcome)

        train_dfs, test_dfs = [], []
        for train_index, test_index in splits:
            train_dfs.append(data.iloc[train_index, :])
            test_dfs.append(data.iloc[test_index, :])
        return train_dfs, test_dfs

    def save_datasets(self, experiment_path=None, train_dfs=None, test_dfs=None):
        """
        Save the training and testing CV folds as CSV files.

        Files are written to ``<experiment_path>/<dataset name>/CVDatasets/`` as
        ``<name>_CV_<i>_Train.csv`` and ``<name>_CV_<i>_Test.csv`` (``i`` = fold index), without
        the DataFrame index.

        Args:
            experiment_path: output root; defaults to ``self.experiment_path``
            train_dfs, test_dfs: folds to write. When both are None, the folds stored by the last
                ``cv_partitioner`` call are used, or regenerated from ``self.cv`` if none are stored.

        Raises:
            Exception: if folds must be regenerated but no splitter has been configured yet.
        """
        if experiment_path is None:
            experiment_path = self.experiment_path

        if train_dfs is None and test_dfs is None:
            if self.train_dfs is None and self.test_dfs is None:
                if self.cv is None:
                    raise Exception("No CV splitter configured; call cv_partitioner() first")
                train_dfs, test_dfs = self._split_dfs()
            else:
                train_dfs, test_dfs = self.train_dfs, self.test_dfs

        name = self.dataset.name
        cv_dir = os.path.join(experiment_path, name, 'CVDatasets')
        os.makedirs(cv_dir, exist_ok=True)

        # Number train and test folds independently so each fold gets its own file.
        for fold, df in enumerate(train_dfs):
            df.to_csv(os.path.join(cv_dir, name + '_CV_' + str(fold) + '_Train.csv'), index=False)
        for fold, df in enumerate(test_dfs):
            df.to_csv(os.path.join(cv_dir, name + '_CV_' + str(fold) + '_Test.csv'), index=False)

## Making Datasets

In [6]:
# import dask
# from dask.distributed import Client
# from dask_jobqueue import SLURMCluster, LSFCluster, SGECluster

In [7]:
# def get_cluster(cluster_type='SLURM', output_path=".", queue='defq', memory=16):
#     client = None
#     try:
#         if cluster_type == 'SLURM':
#             cluster = SLURMCluster(queue=queue,
#                                    cores=1,
#                                    memory=str(memory) + "G",
#                                    walltime="24:00:00",
#                                    log_directory=output_path + "/dask_logs/")
#             cluster.adapt(maximum_jobs=400)
#         elif cluster_type == "LSF":
#             cluster = LSFCluster(queue=queue,
#                                  cores=1,
#                                  mem=memory * 1000000000,
#                                  memory=str(memory) + "G",
#                                  walltime="24:00",
#                                  log_directory=output_path + "/dask_logs/")
#             cluster.adapt(maximum_jobs=400)
#         elif cluster_type == 'UGE':
#             cluster = SGECluster(queue=queue,
#                                  cores=1,
#                                  memory=str(memory) + "G",
#                                  resource_spec="mem_free=" + str(memory) + "G",
#                                  walltime="24:00:00",
#                                  log_directory=output_path + "/dask_logs/")
#             cluster.adapt(maximum_jobs=400)
#         elif cluster_type == 'Local':
#             c = Client()
#             cluster = c.cluster
#         else:
#             raise Exception("Unknown or Unsupported Cluster Type")
#         client = Client(cluster)
#     except Exception as e:
#         print(e)
#         raise Exception("Exception: Unknown Exception")
#     print("Running dask-cluster")
#     print(client.scheduler_info())
#     return client

In [8]:
def runner_fn(obj):
    """Load the partitioner's dataset from disk, then build (and by default save) its CV folds."""
    dataset = obj.dataset
    dataset.load_data()
    obj.cv_partitioner()

In [9]:
# Write the CV folds into the output folder chosen in the local-parameters cell
# (previously ignored: the partitioner always used its own './PPSNDatasets/' default).
kfold = KFoldPartitioner(experiment_path=output_folder)

In [10]:
# results = dask.compute([dask.delayed(runner_fn)(obj) for obj in [kfold,]])
# Serial run: load the dataset, build the CV folds and write them to disk.
runner_fn(obj=kfold)

  self.data = pd.read_csv(self.path, na_values='NA', sep=',')
