In order to speed up processing time when running classification algorithms, it is often useful to choose only the most "best" genes to use.  There are various algorithms available to choose genes, however here we use Chi2 Select K best.  K is how many genes you wish to use for testing stability.  More genese is usually better, however again in order to speed up processing time we limit the number of genes used.  This program allows you to set a min and max number of genes and an interval.  This will in turn setup numpy arrays with class and the select number of genes for further processing by FASTR and FASTrand.

### Libraries
Must be pre-installed.  Recommended to use virtual environment.

In [325]:
import numpy as np
from enum import Enum
from random import shuffle
from os import path, getcwd, makedirs
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import StratifiedKFold
from collections import Counter
from sklearn.metrics import mean_squared_error
from scipy.sparse.linalg import lsqr
from math import sqrt
from random import uniform, choice
from sklearn import svm
from sklearn import datasets

## Methods and Classes

In [163]:
class AlterStrategy(Enum):
    GREEDY = 0
    CHI2 = 1
    RAND = 2
    PERCENT = 3

### NBC.py

In [522]:
class Model:
    """Describes the model class."""

    def __init__(self, samples, eps, class_label):
        """Initialize the model class.

        Args:
            samples: training samples of size [samples, genes].
            eps: epsilon value for correlation cutoff.
            class_label: classification label.
        """

        self.class_label = class_label
        self.samples = np.array(samples)
        self.eps = eps

        # columns are variables, rows are samples
        self.correlation = np.corrcoef(self.samples, y=None, rowvar=False)

        # note that the mask is actually the graph
        self.mask = (np.absolute(self.correlation) > self.eps)

        # the coefficients associated with the system of equation: Ax=b,
        # where A is an equation list created from the neighbors of gene
        # n and b is the value of gene n.
        self.geneFuncMasks = []  # these are the coefficients in Ax=b
        for gene in range(len(self.correlation)):
            currMask = self.mask[gene]
            setOfNeighbors = []
            solutions = []
            for sample in self.samples:
                neighbors = [sample[neighbor] if (currMask[neighbor] and (gene != neighbor))
                             else 0 for neighbor in range(len(currMask))]
                neighbors.append(1)
                setOfNeighbors.append(neighbors)
                solutions.append(sample[gene])
            coeff = self.solver(setOfNeighbors, solutions, 2)
            self.geneFuncMasks.append(coeff.tolist())

        self.coefficients = np.array(self.geneFuncMasks)

    def solver(self, neighbors, sols, choice):
        # Use lsqr to solve Ax=b
        A = np.array(neighbors)
        b = np.array(sols)
        x = lsqr(A, b)[0]
        return x

    def expression(self, sample):
        """Given a sample, return the hypothetical expression.

        Args:
            sample: the sample whose hypothetical expression we wish to
            calculate
        Returns:
            expr: A list with the expression values of size number of genes.
        """
        expression = []
        for gene in range(len(self.coefficients)):
            geneVal = 0
            for neighbor in range(len(self.mask)-1):
                geneVal += self.coefficients[gene][neighbor] * sample[neighbor]
            geneVal += self.coefficients[gene][len(self.mask)]
            expression.append(geneVal)
        return np.array(expression)

    def label (self):
        """Return the classification label of this model."""
        return self.class_label


class NetworkBasedClassifier:
    """Describes the NBClassifier class."""

    def __init__(self, epsilon):
        """Initialize a NBF classifier.

        Args:
            eps: epsilon value
        """
        self.models = []
        self.epsilon = epsilon

    def fit(self, X, y):
        """Fit the data with classes to create class models.

        Fits the data [num_samples, num_genes] with classifications
        [num_samples] to the model.  Creates as many models as classes.

        Args:
            X: the data we wish to train the classifier on
            y: the classifications associated with the samples
        """
        y = np.array(y)
        X = np.array(X)
        for key in Counter(y):
            a_class = np.where(y == key)
            self.models.append(Model([X[i] for i in a_class[0]], self.epsilon, key))

            
    def score(self, X, y):
        """Scores the classifications of a given set of samples (X) according to their
        actual clsssifications (y).

        Must fit the classifier before this method is called.

        Args:
            samples: the samples we wish to predict classification for.

        Returns:
            accuracy: the classification accuracy
        """
        y = np.array(y)
        X = np.array(X)
        predicted = self.predict(X)        
        correct = np.asarray(predicted == y)
        return np.sum(correct)/correct.shape[0]        
        
        
    def predict(self, X):
        """Predict the classification of a sample.

        Must fit the classifier before this method is called.

        Args:
            samples: the samples we wish to predict classification for.

        Returns:
            classifications: the classifications of the samples.
        """
        classifications = []
        for sample in X:
            RMSEs = []
            for model in self.models:
                rmse = sqrt( mean_squared_error(sample, model.expression(sample)))
                RMSEs.append(rmse)
            min_index = RMSEs.index(min(RMSEs))
            label = self.models[min_index].label()
            classifications.append(label)
        return np.array(classifications)

### Common.py

In [511]:
def getAccuracy ( index, chosen, notChosen, series, size, type, change, flag = False, folds = 10, repeats = 10 ):

    if ( change == 0 ):
        test = [ i for i in chosen ]
        test.append( notChosen[ index ] )

    if ( change == 1 or change == 2 or change == 3 ):
        test = index

    classes = np.load ( "FAST/%s_classes.npy" % series )
    exprs = np.load ( "FAST/%s_%03d_fs_genes.npy" % ( series, size ) )

    training, testing = kFoldInd ( classes, folds )

    if (flag):
        classifiers = multi.getClassifiers( type, training, classes, exprs )
        avg = multi.avgAccuracy( classifiers, testing, classes, exprs, change, test, series )
    else :
        classifiers = serial.getClassifiers( type, training, classes, exprs )
        avg = serial.avgAccuracy( classifiers, testing, classes, exprs, change, test, series  )

    return index, avg

### Alter.py

In [512]:
def alter (exprs, percent):
    result = []
    for expr in exprs:
        alt = []
        for gene in expr:
            _offset = gene * percent
            _low = gene - _offset
            _high = gene + _offset
            alt.append(choice([_low, _high]))
        result.append(alt)
    return np.array(result)

## START MAIN PROGRAM

###### Enter the series and feature_size to use
Must be all upper case. e.g. `"GSE27562"`

In [480]:
series = "GSE19804"
feature_size = 10
fs_strategy = fsStrategy.KBEST

### Get/Create Directories
Assumes this notebook is in `GenClass-Stability/main/notebooks/`

In [481]:
notebook_dir = getcwd();
main_dir = path.dirname(path.dirname(notebook_dir))
load_path = path.join(main_dir, "GSE", series)
gsa_path = path.join(main_dir,"GSA", series, str(feature_size))
if not path.exists(gsa_path):
    makedirs(gsa_path)

### Import Classes and Expressions
Load original data. Assumes SIT and custome GSE script have been run to import data.

In [482]:
classes =np.loadtxt(path.join(load_path, "classes.txt"), dtype=np.str, delimiter="\t")
exprs = np.loadtxt(path.join(load_path, "exprs.txt"), delimiter="\t")

Select K best genes for analysis.

In [483]:
b = SelectKBest(chi2, feature_size).fit(exprs, classes)
a = b.get_support(indices = True)
exprs = exprs[:, a]

Save the selected expression data for potential later use.

In [484]:
np.save(path.join(gsa_path,"exprs.npy"), exprs)
np.save(path.join(gsa_path,"classses.npy"), classes)

## Stability Test I

In [485]:
exprs.shape

(120, 10)

In [486]:
#exprs_alt = alter(exprs,1)
#exprs_alt

In [487]:
#import multiprocessing
#multiprocessing.cpu_count()

In [516]:
def crossValidate(estimator, X, y, cv=10, alt=0):
    scores = []
    skf = StratifiedKFold(cv)
    for train_index, test_index in skf.split(X, y):
        estimator.fit(X[train_index], y[train_index]) 
        accuracy = estimator.score(alter(X[test_index], alt), y[test_index])
        scores.append(accuracy)
    return np.array(scores)

In [533]:
svc = NetworkBasedClassifier( 0.8 )
scores = crossValidate(svc, exprs, classes, 10, 1)
print(scores)
print("Accuracy: %0.2f (+/- %0.2f)" % (scores.mean(), scores.std() * 2))

[0.66666667 0.58333333 0.83333333 0.58333333 0.41666667 0.75
 0.58333333 0.66666667 0.5        0.33333333]
Accuracy: 0.59 (+/- 0.28)
