# Generate Nextflow Pipeline JobScript for Biomarker GWAS - 2

This notebook creates the jobscript for running the longitudinal GWAS pipeline. It depends on the following files:

- `nextflow/jobs.ini` -> configuration file that defines the paths of generic pipeline inputs
- `nextflow/params/analysis-params.json` -> json file that defines the parameters and paths of inputs for GWAS analysis

### `nextflow/jobs.ini`

Modify the parameters in the `[Pipeline]` section for your respective project

- `script` is the nextflow script for the pipeline (can use the default)
- `config` is the config file for the pipeline
- `logPath` defines the logging path for each analysis
- `chunkSize` defines the size of each chunk for the preprocessing step
- `workDir` is the location of the Nextflow work directory

Keep the paths for the datasets the same in the `[ADNI]` and `[AMP-PD]` sections. These section names should correspond to the keys in the `nextflow/params/analysis-params.json` file.

### `nextflow/nextflow.config`

Modify the `OUTPUT_DIR` variable in this file to set the output path for the pipeline results

### `nextflow/params/analysis-params.json`

Structure of the file should be

```json
{
  "covariates": ["sex", "age", "PC1", "PC2", "PC3"], // covariates to include in the model (all except PC1-10 must be present in the covarfiles)
  "AMP-PD": {   // key corresponds to the dataset section name in jobs.ini
    "lt": {     // ["lt", "cs"] defines longitudinal or cross-sectional analysis
      "basePath": "/data/CARD/projects/longGWASnextflow/ExampleGWAS/input", // this defines the base input paths for analysis defined below
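                                                                            // the basePath is prepended to the covarfile and phenofile paths
      "out_sfx": "-PPMI-EX-LT",                           // suffix for this combination of covariates and dataset - if not unique uses existing cache
      "time_col": "time_column_name",                     // name of the time column, passed to the pipeline as --time_col (required for "lt" analyses)
      "analysis": [                                       // list of analyses to be run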
        {
          "outcome": "phenotype_1",                       // name of the outcome variable in the phenofile
          "covarfile": "path/to/input_covariates.tsv",    // relative path from the basePath variable above to the covariates
          "phenofile": "path/to/input_phenotypes_1.tsv"   // relative path from the basePath variable above to the outcome
        },
        {
          "outcome": "phenotype_2",
          "covarfile": "path/to/input_covariates.tsv",
          "phenofile": "path/to/input_phenotypes_2.tsv"
        }
      ]
    }
  }
}
```


### Directories

The following directories should exist relative to the project folder, in this case `ExampleGWAS`:

- `./jobs`
- `./logs`
- `./nextflow/params`


In [1]:
import configparser
import os
import json
import subprocess
import time
from pathlib import Path

In [2]:
def construct_job(sbatch_params, job_params):
  """Build the text of a SLURM batch script that runs the Nextflow GWAS pipeline.

  Parameters
  ----------
  sbatch_params : dict
    Maps sbatch long options (e.g. ``'--mem'``) to their values; each entry
    becomes one ``#SBATCH key=value`` header line. ``'--mem'`` must be present
    and is expected to carry a one-character unit suffix (e.g. ``'125g'``),
    which is stripped to derive the ``ulimit -Sv`` value.
  job_params : dict
    Pipeline settings. Keys read here: ``outcome``, ``mode``, ``dataset``,
    ``log-path``, ``script-path``, ``work-dir``, ``input``, ``chunk_size``,
    ``MAC``, ``out_fmt``, ``covar-path``, ``pheno-path``, ``ancestry``,
    ``covariates``, ``mh_plot``; ``time_col`` when ``mode == 'lt'``;
    ``data-path`` when the cache is not used. The optional key ``p1_cache``
    selects whether the per-chromosome VCFs are only touched as placeholders
    (truthy) or copied from ``data-path`` (falsy). When ``p1_cache`` is absent
    the notebook-level ``meta_params['p1_cache']`` is used instead.

  Returns
  -------
  str
    The complete job script.
  """
  jp = job_params

  header = "#! /bin/bash\n"
  header += ''.join(f'#SBATCH {key}={value}\n' for key, value in sbatch_params.items())

  modules = """\
module load nextflow/20.01.0
module load singularity
"""

  env_vars = f"""\
export TMPDIR=/lscratch/$SLURM_JOB_ID
outcome="{jp['outcome']}"
mode="{jp['mode']}"
dataset="{jp['dataset']}"
"""

  # Prefer an explicit setting in job_params so the function does not depend on
  # notebook state; fall back to the global for callers that do not pass it.
  use_cache = jp.get('p1_cache')
  if use_cache is None:
    use_cache = meta_params['p1_cache']

  scratch_vcf = "/lscratch/$SLURM_JOB_ID/chr${i}.vcf.gz"
  if use_cache:
    stage_cmd = "touch " + scratch_vcf
  else:
    # data-path is presumably a per-chromosome template using ${i} -- confirm in jobs.ini
    stage_cmd = f"cp {jp['data-path']} " + scratch_vcf

  # Strip the unit suffix from --mem and append six zeros for the soft
  # virtual-memory limit (e.g. '125g' -> 125000000).
  mem_limit = f"{sbatch_params['--mem'][:-1]}000000"

  env_setup = (
    "for i in $(seq 1 22); do\n"
    f"  {stage_cmd}\n"
    "done\n"
    "\n"
    'echo "Files in /lscratch/$SLURM_JOB_ID:"\n'
    "ls /lscratch/$SLURM_JOB_ID\n"
    "\n"
    f"ulimit -Sv {mem_limit}\n"
  )

  # Collect the command as a list of segments and join them with shell line
  # continuations once, so optional flags can never end up on a detached line.
  segments = [
    f"srun nextflow -log {jp['log-path']} run {jp['script-path']}",
    f"-w \"{jp['work-dir']}\"",
    f"--input \"{jp['input']}\"",
    f"--dataset \"{jp['dataset']}\"",
    f"--chunk {jp['chunk_size']}",
    f"--minor_allele_ct {jp['MAC']}",
    f"--out {jp['out_fmt']}",
    f"--covarfile {jp['covar-path']}",
    f"--phenofile {jp['pheno-path']}",
    f"--ancestry \"{jp['ancestry']}\"",
    f"--pheno_name \"{jp['outcome']}\"",
    "--chunk_flag",
    f"--covariates \"{jp['covariates']}\"",
  ]

  if jp['mode'] == 'lt':
    segments.append(f"--time_col \"{jp['time_col']}\"")
    segments.append("--longitudinal_flag")

  if jp['mh_plot']:
    segments.append("--mh_plot")

  job = " \\\n  ".join(segments) + "\n"

  return '\n'.join([header, modules, env_vars, env_setup, job])

In [3]:
# Parameter JSON files for each analysis variant, keyed by label
params_dir = Path('./nextflow/params')
analysis = dict(
  (label, params_dir / filename)
  for label, filename in [
    ('AMP-PD/woAAO', 'analysis-params.woAAO.json'),
    ('AMP-PD/wAAO', 'analysis-params.json'),
    ('AMP-PD/cs', 'analysis-params.cs.json'),
  ]
)

In [4]:
# Load the analysis parameters for the selected variant. The context manager
# closes the file even if the JSON fails to parse.
tmp_fp = analysis['AMP-PD/cs']
with open(tmp_fp, 'r') as tmp_fh:
  aparams = json.load(tmp_fh)

Replace the path
`/data/CARD/projects/longGWASnextflow/ExampleGWAS` with your own project path

In [9]:
# --- Analysis settings ---
MAC = 5            # passed to the pipeline as --minor_allele_ct
mh_plot = False    # when True, --mh_plot is added to the pipeline call
ancestry = 'EUR'   # passed to the pipeline as --ancestry
skip_outcomes = {'log_CSF_Ab'}  # outcomes in the params file that are not submitted

# Per-job SLURM resources; the time limit is selected by analysis mode
reqs = {
  'ram': 125,
  'scratch': 250,
  'cpus': 20,
  'time': {'lt': '6:00:00',
           'cs': '4:00:00'}
}

meta_params = {
  'p1_cache': True,   # True: only touch placeholder VCFs on lscratch instead of copying the data
  'dataset': None,    # set to the current dataset inside the loop below
  'config_path': 'nextflow/jobs.ini'
}

run = True  # set to False to generate scripts but not submit jobs, set to True to execute jobs
             # check that cache is available prior to submitting multiple jobs

pipeline_config = configparser.ConfigParser()
# ConfigParser.read() silently skips missing files and returns the list of
# files it actually parsed, so check it to fail early with a clear message.
if not pipeline_config.read(meta_params['config_path']):
  raise FileNotFoundError(f"pipeline config not found: {meta_params['config_path']}")

# Job scripts and sbatch logs are written to these relative directories
os.makedirs('jobs', exist_ok=True)
os.makedirs('logs', exist_ok=True)

covariates = aparams['covariates']

# To restrict the datasets, change the predicate, e.g. lambda x: x == 'ADNI'
for ds in filter(lambda x: x != 'covariates', aparams.keys()):
  # To restrict the modes, change the predicate, e.g. lambda x: x == 'lt'
  for mode in filter(lambda x: True, aparams[ds].keys()):
    mode_params = aparams[ds][mode]
    out_sfx = mode_params['out_sfx']
    # time_col is only required (and only read) for longitudinal analyses
    time_col = mode_params['time_col'] if mode == 'lt' else None

    for jp in mode_params['analysis']:
      outcome = jp['outcome']
      if outcome in skip_outcomes:
        continue

      covarfile = os.path.join(mode_params['basePath'], jp['covarfile'])
      phenofile = os.path.join(mode_params['basePath'], jp['phenofile'])

      meta_params['dataset'] = ds

      assert os.path.exists(covarfile), f"covariate file not found: {covarfile}"
      assert os.path.exists(phenofile), f"phenotype file not found: {phenofile}"
      assert covarfile != phenofile, f"covarfile and phenofile are the same file: {covarfile}"

      ## job params
      job_params = {
        'data-path': pipeline_config[meta_params['dataset']]['dataPath'],
        'log-path': pipeline_config['Pipeline']['logPath'],
        'script-path': pipeline_config['Pipeline']['script'],
        'dataset': meta_params['dataset'],
        # must match the directory the job script stages the VCFs into
        'input': "/lscratch/${SLURM_JOB_ID}/chr*.vcf.gz",
        'work-dir': pipeline_config['Pipeline']['workDir'],
        'chunk_size': pipeline_config['Pipeline']['chunkSize'],
        'covariates': ' '.join(covariates),
        'MAC': MAC,
        'ancestry': ancestry,
        'mode': mode,
        'outcome': outcome,
        'out_fmt': f"{meta_params['dataset']}{out_sfx}",
        'covar-path': covarfile,
        'pheno-path': phenofile,
        'mh_plot': mh_plot,
        'time_col': time_col,
        'p1_cache': meta_params['p1_cache']
      }

      assert os.path.exists(job_params['script-path']), \
        f"pipeline script not found: {job_params['script-path']}"

      ## SBATCH params
      sbatch_params = {
        '--cpus-per-task': reqs['cpus'],
        '--mem': f"{reqs['ram']}g",
        '--gres': f"lscratch:{reqs['scratch']}",
        '--time': f"{reqs['time'][mode]}",
        '--partition': 'norm,quick'
      }

      cmd = construct_job(sbatch_params, job_params)

      job_path = os.path.join("jobs",
                              f"jobscript.{meta_params['dataset']}-{outcome}.{mode}.bat")
      with open(job_path, 'w') as f:
        f.write(cmd)

      call_cmd = ['sbatch']
      call_cmd += ['--out', f"logs/{job_params['out_fmt']}.{outcome}.out"]
      call_cmd += ['--error', f"logs/{job_params['out_fmt']}.{outcome}.err"]
      call_cmd += [job_path]

      if run:
        # check=True raises CalledProcessError when sbatch rejects the job,
        # instead of printing an empty job id and carrying on.
        submitted = subprocess.run(call_cmd, stdout=subprocess.PIPE, check=True)
        job_id = submitted.stdout.decode('utf-8').strip().split('\n')
        print(job_id, job_params['out_fmt'], mode, outcome)
        time.sleep(5)  # pause between consecutive submissions
      else:
        print(job_params['out_fmt'], mode, outcome)


['50837973'] AMP-PD-BF-CS cs log_CSF_tTau
['50837987'] AMP-PD-BF-CS cs log_CSF_pTau
