# RDM in an advanced data pipeline on an HPC cluster
## Introduction
In this tutorial are working through parts of the computational pipeline in https://doi.org/10.3389/fgene.2013.00289.
The pipeline deals with the problem to find out the best configuration for predicting the outcome of breast cancer patients based on their gene expression profiles. One major aspect in the performance of the classification is the selection of features. Several methods to select genes or groups of genes are compared in a double-loop cross validation procedure. For simplicity in this tutorial we only focus on two benchmark methods to select genes (Single genes and Random genes) and one method, Lee, which uses pathways to group genes and classfify then with the aggregated statistics over these genes.
We will also only use one classifier model, the neares mean classifier with the standard metric. And to keep running times down we will also only execute a 5-fold cross validation.

The experssion dataset is published on figshare and we will download it during the pipeline from this external source. The curated pathway data, which is necessary for the Lee method to define features, lies in the iRODS instance and is annotated by some metadata. 

## Step-by-step guide
### 1. Install dependencies
Before we start the computationa pipeline, we need to make sure that all necessary python modules are installed

In [None]:
pip install --upgrade sklearn python-irodsclient wget

### 2. Imports
Here we import some standard python modules, the irods python modules, some own functions to ease the interaction with iRODS and of course our own software ACES, which implements the datatypes and analysis.

In [None]:
#Standard python modules
import json
import os
import datetime
import numpy
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import csv
import wget
import pprint

#iRODS python modules
from irods.session import iRODSSession
from irods.models import Collection, DataObject, CollectionMeta, DataObjectMeta
from irods.access import iRODSAccess
#Own python functions to wrap lengthy iRODS code
from helperFunctions import *

#Imports from the ACES software
from SetUpGrid import CombineData
from CreateTokens import generate_tokens
from SetUpGrid import SetUpRun
from SetUpGrid import RunInstance

### 3. Connection to iRODS
Here we define the parameters which are needed to connect to iRODS and setup the connection:

**Please adjust the user, password and share!**

In [None]:
print('Connect to iRODS')
host = "<IP or FQDN>"
port = 1247
user = "<irods user>"
password = "<irods password>"
zone = "aliceZone"
share = "<another irods user or group>" # share the output with this other iRODS user or group
session = iRODSSession(host=host, port=port, user=user, password=password, zone=zone)

### 4. Parameters for our computational pipeline
- Setting up the folder structure on fast storage of the compute cluster. The data stored here is not backed up, nor safely stored, this storage is just used to allow very quick calculations on the data.
- Keywords and their values to search for the correct data in iRODS
- Prepare a collection on the iRODS instance to gather our results

**Please adjust the paths!**

In [None]:
dataDir = "/lustre/scratch/GUESTS/<user>/acesdata"
resultsDir = "/lustre/scratch/GUESTS/<user>/acesresults"
ensure_dir(dataDir)
ensure_dir(resultsDir)

Now we can search for our curated pathway data.

In [None]:
ATTR_NAME = 'DATATYPE'
ATTR_VALUE = 'PATHWAYS'
query = session.query(Collection.name, DataObject.name)
filteredQuery = query.filter(DataObjectMeta.name == ATTR_NAME).\
                          filter(DataObjectMeta.value == ATTR_VALUE)


In [None]:
print(filteredQuery.all())

And download the data to our directory on the scratch file system.

In [None]:
iPaths = iParseQuery(filteredQuery)
print('Downloading: ')
print('\n'.join(iPaths))
iGetList(session, iPaths, dataDir)

We still lack the gene expression dataset. Let us download it:

In [None]:
url = "https://ndownloader.figshare.com/files/4851460"
fileName = "4851460"
wget.download(url, out=dataDir)

During the computational pipeline we will create output data which we would like to directly store in iRODS. So we need to create a dedicated collection:

In [None]:
coll = session.collections.get('/' + zone + '/home/' +user)
collNames = [c.name for c in coll.subcollections]
resultsName = 'aces_results'
tmp = resultsName
count = 0
while resultsName in collNames:
        resultsName = tmp + '_' +str(count)
        count = count + 1
print('Upload results to: '+ coll.path + '/' + resultsName)
coll = session.collections.create(coll.path + '/' + resultsName)

### 5. The ACES pipeline
Now that we have setup our environment, downloaded all necessary data, we can progress and setup our data analysis pipeline.
ACES does a huge parameter sweep by combining classifiers, feature selections methods, their parameters and the 5 splits of the dataset. Each parameter combination is defined in a 'token'. From these tokens the run is initialised and results are created.
But first let's create all combinations between the dataset and the feature selection algorithms and their parameters:

In [None]:
DataAndFeatureExtractors = CombineData()
DataAndFeatureExtractors

For each of these combinations we need to create tokens that define how our gene expression dataset is split, on which splits the classifier is trained and which split serves as testing dataset. Let's do that for **the first** of the items in our CombineData list.
- 'dataset': name of the dataset
- 'fold': the fold of the data that serves as testing data
- 'method': Feature selector algorithm
- 'network': Gene interaction network or pathways data name
- 'repeat': 0 (we do not repeat the 5 fold cross validation)
- 'ShuffleNr': Used to randomised/shuffle the 'network' data (not used in this tutorial)
- 'specific': Set to extra values for the 'method' if necessary


In [None]:
item = DataAndFeatureExtractors[0]
tokens = generate_tokens([item], 1, 5, "PerfTest")
pprint.pprint(tokens)

In [None]:
#Loading all necessary data, pathways etc into memory:
(data, net, featureSelector, classifiers, Dataset2Time) = \
        SetUpRun(item[0], item[1][1], item[1][0][0], datafile = "4851460", datapath=dataDir)

Now we run the classification on each of the tokens.Four of the folds serve as training datasets. On that training dataset we rank the features according to their discriminative power and then subsequently adding them to the Nearest Mean Classifier. The classification performance is evaluated by Area-uner-the-Receiver-Operator-Curve (AUC).

In [None]:
for token in tokens:
        dataset = token['input']['dataset']
        network = token['input']['network']
        method = token['input']['method']
        repeat = token['input']['repeat']
        fold = token['input']['fold']
        print('dataset:', dataset)
        print('network', network)
        print('method', method)
        print('repeat', repeat)
        print('fold', fold)
        (dataName, featureExtractorproductName, netName, shuffle, featureExtractor, AucAndCi) = RunInstance(
                    data, net, featureSelector, None, classifiers, repeat, 5, fold, None, Dataset2Time, None)
        token['output'] = (dataName, featureExtractorproductName, netName, None, shuffle, 
                           featureExtractor, AucAndCi)

In [None]:
pprint.pprint(token['input'])

Let us have a look at the performances. We see the best-ranked feature gives us aleardy an AUC of 0.71:

In [None]:
pprint.pprint(list(token['output'][6]['BinaryNearestMeanClassifier_V1'].items())[:10])

**(optional)** But how does this best feature look like, i.e. which genes were used in that feature? 

In [None]:
fe = json.loads(token['output'][5])
type(fe)
fe[0]
print("name:", fe[0])
print("space (list of all known genes):")
pprint.pprint(fe[1][:10])
print("best ranked featues (index of gene):")
pprint.pprint(fe[2][:10])

To find the Entrez number of the genes in the first feature, one needs to look the index up in the space:

In [None]:
feature = fe[2][1]
[fe[1][idx] for idx in feature]

### 6. Send the results to iRODS
Let us save the list of tokens to a file and put it into iRODS. we will directly stream the data ino an iRODS object rather than first creating a file on the scratch file system and subsequently uploading the file to iRODS:

In [None]:
filebase = item[0]+'_'+item[1][0][0]+'_'+item[1][1]+'_raw.json' # name of the file
obj = session.data_objects.create(coll.path + "/" + filebase) # Create a new data object in iRODS
print("Data will be written to iRODS:", obj.path)

In [None]:
with obj.open('w') as obj_desc:
    obj_desc.write(json.dumps(tokens).encode())

**Verify**: Open a browser and go to https://<IP or FQDN>/aliceZone/home/<user\>/aces_results<_num\>

### 7. Create some metadata and summarising plot for your co-worker (share)
As you have seen, the raw data is very hard to parse for human beings. However, we would like to give the person or group we defined in 'share' some impression ofd the data. Hence, we will create some metadata that captures the provenance, i.e. how the data came into being, and some plots. Let's start with the metadata: 

In [None]:
obj.metadata.add('ISEARCH', ATTR_NAME + '==' + ATTR_VALUE)
obj.metadata.add('ISEARCHDATE', str(datetime.date.today()))
obj.metadata.add('prov:wasDerivedFrom', 'http://dx.doi.org/10.6084/m9.figshare.3119248.v1')
obj.metadata.add('DATATYPE', 'ACES results')
obj.metadata.add('prov:SoftwareAgent', 'ACES')
obj.metadata.add('ALGORITHM', filebase.split('_raw.json')[0])

Now we create some summarising data and upload it to iRODS. We will create a performance figure and extract the 50 best scoring features per split of the data:

In [None]:
performance = []
for token in tokens:
    performance.append([token['output'][6]['BinaryNearestMeanClassifier_V1'][perf][0]
        for perf in list(token['output'][6]['BinaryNearestMeanClassifier_V1'].keys())[:50]])
    
plt.plot(numpy.transpose(performance))
plt.xlabel('Features')
plt.ylabel('AUC (performance)')
plt.title(token['output'][1]+' '+str(token['output'][2]))
figName = 'performance_'+filebase+'.png'
plt.savefig(resultsDir+'/'+figName)
plt.clf()

In [None]:
print('Write plot to iRODS: '+coll.path+'/'+figName)
session.data_objects.put(resultsDir+'/'+figName, coll.path+'/'+figName)
obj = session.data_objects.get(coll.path+'/'+figName)
obj.metadata.add('REFDATA', 'http://dx.doi.org/10.6084/m9.figshare.3119248.v1')
obj.metadata.add('DATATYPE', 'ACES results')
obj.metadata.add('ALGORITHM', filebase)

In [None]:
# Extract 50 most differentially expressed features
bestFeatures = []
for token in tokens:
    _, genes, features = json.loads((token['output'][5]))
    if item[1][1] != None:
        genelist = [genes[feat] for sublist in features[:10] for feat in sublist]
    else:
        genelist = features[:10]
    bestFeatures.append(genelist)
csvName = 'features_'+filebase+'.csv'
with open(resultsDir+'/'+csvName, 'w') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    writer.writerows(bestFeatures)

In [None]:
# Upload to iRODS
print('Write feature csv to iRODS: '+coll.path+'/'+csvName)
session.data_objects.put(resultsDir+'/'+csvName, coll.path+'/'+csvName)
obj = session.data_objects.get(coll.path+'/'+csvName)
obj.metadata.add('REFDATA', 'http://dx.doi.org/10.6084/m9.figshare.3119248.v1')
obj.metadata.add('DATATYPE', 'ACES results')
obj.metadata.add('ALGORITHM', filebase)

**Verify**: Go back to your browser and check Verify: Open a browser and go to https://<IP or FQDN>/aliceZone/home/<your user\>/aces_results\<_tag\>


### 8. Share the data
We indicated a person or a group with which we want to share the data. Now we will adjust the accession to our iROPDS results folder:

In [None]:
print(coll.path)

In [None]:
for srcColl, colls, objs in coll.walk():
    for obj in objs:
        try:
            acl = iRODSAccess('read', obj.path, share, session.zone)
            session.permissions.set(acl)
        except:
            print("User or group unknown: "+share)


### 9. Last step: Clean up
To free storage for other users let us clean up the scratch file system. All of our result data is in iRODS, so we will not need them any longer.

In [None]:
print("Cleaning up: ", dataDir)
print("Cleaning up: ", resultsDir)

In [None]:
from shutil import rmtree
print("Removing local data in", dataDir)
rmtree(dataDir)
print("Removing local data in", resultsDir)
rmtree(resultsDir)

## Remarks
We have seen that parameter sweeps can become quite elaborate. You might want to think beforehand which output do I really need for my analysis and interpretation.
Instead of running everything in many nested for-loops we defined tokens that decode the parameter setting of a run.

We also saw that the calculation of only one combination of data and algorithm can take some time. So we want to setup the whole pipeline and run it remotely (not interactively) on an HPC cluster.
The tokenisation of the single parameter combinations also gives us the chance to start several jobs each considering a different set of tokens. The results however will be gathered in iRODS no matter where the tokens are calculated.

Next step: Run the pipeline on the HPC ... see you there.