In [6]:
import pandas as pd
import numpy as np

# Creating distributed jobs

This notebook facilitates splitting `samples.txt` and `units.txt` and creates individual bash scripts, each of which copies the necessary input files from a remote server, executes Snakemake, rsyncs the results back to the remote server, and exits the server.

In [2]:
# Relative paths to the units sheet and the samples sheet read by the cells below.
units_fp = '../config/units.txt'
samples_fp = '../config/samples.txt'

In [4]:
# Load the tab-delimited samples sheet and preview its first rows.
samples_df = pd.read_csv(samples_fp, delimiter='\t')
samples_df.head()

Unnamed: 0,Sample
0,amy
1,bob
2,carl
3,diane


In [5]:
# Load the tab-delimited units sheet and preview its first rows.
units_df = pd.read_csv(units_fp, delimiter='\t')
units_df.head()

Unnamed: 0,Sample,Unit,R1,R2
0,amy,Run_1,resources/test/test_reads/amy_R1.fastq.gz,resources/test/test_reads/amy_R2.fastq.gz
1,bob,Run_1,resources/test/test_reads/bob_R1.fastq.gz,resources/test/test_reads/bob_R2.fastq.gz
2,carl,Run_1,resources/test/test_reads/carl_R1.fastq.gz,resources/test/test_reads/carl_R2.fastq.gz
3,diane,Run_1,resources/test/test_reads/diane_R1.fastq.gz,resources/test/test_reads/diane_R2.fastq.gz


In [31]:

def make_splits(samples_df, splits):
    """Partition the row positions of ``samples_df`` into contiguous chunks.

    Parameters
    ----------
    samples_df : pandas.DataFrame
        Table with one row per sample; only its row count is used.
    splits : int
        Number of chunks to produce; must be at least 1.

    Returns
    -------
    list of list of int
        ``splits`` lists of consecutive integers that together cover
        ``0 .. len(samples_df) - 1`` in order. The first ``splits - 1``
        chunks each hold ``len(samples_df) // splits`` positions and the
        last chunk additionally takes the remainder. When ``splits`` is
        larger than the row count, the leading chunks are empty and the
        last chunk holds every position.

    Raises
    ------
    ValueError
        If ``splits`` is less than 1.
    """
    # Guard first: 0 would divide by zero and a negative count would
    # silently return a partition that drops every sample.
    if splits < 1:
        raise ValueError(
            'splits must be a positive integer, got {0!r}'.format(splits))

    sample_num = samples_df.shape[0]
    split_size = sample_num // splits

    # Equal-sized leading chunks ...
    split_list = [list(range(i * split_size, (i + 1) * split_size))
                  for i in range(splits - 1)]
    # ... and a final chunk that also absorbs the remainder.
    split_list.append(list(range((splits - 1) * split_size, sample_num)))

    return split_list

# Partition the sample rows into three groups of indices.
splits = make_splits(samples_df, splits=3)

In [37]:
# Example arguments: the third group of sample indices, the scp destination
# directory, and the remote host name.
indices = splits[2]
target_dir = '/workdir/jgs286/data'
server = 'moeller'

def write_scp_lines(samples_df, units_df, indices, server, target_dir):
    """Build the ``scp`` commands that fetch the read files of selected samples.

    Parameters
    ----------
    samples_df : pandas.DataFrame
        Sample table with a ``Sample`` column.
    units_df : pandas.DataFrame
        Units table with ``Sample``, ``R1`` and ``R2`` columns; ``R1``/``R2``
        hold the file paths on the remote server.
    indices : iterable
        Index labels of the rows of ``samples_df`` to include (looked up
        with ``.loc``).
    server : str
        Remote host name placed before each source path.
    target_dir : str
        Directory the files are copied into.

    Returns
    -------
    list of str
        One ``scp`` command per file. For each selected sample, all of its
        ``R1`` paths come first, followed by all of its ``R2`` paths.
        Missing (NaN) paths are skipped.
    """
    scp_command = 'scp {0}:{1} {2}/.'

    fps = []
    for i in indices:
        sample = samples_df.loc[i, 'Sample']
        # Select this sample's unit rows once and reuse them for both columns.
        sample_units = units_df.loc[units_df['Sample'] == sample]
        for read_col in ('R1', 'R2'):
            # dropna: a missing path would otherwise be rendered as the
            # literal text "nan" inside the scp command.
            fps.extend(sample_units[read_col].dropna())

    return [scp_command.format(server, fp, target_dir) for fp in fps]

# Example: scp commands for the sample at index 0.
write_scp_lines(samples_df, units_df, [0], 'moeller', '/workdir/data')

['scp moeller:resources/test/test_reads/amy_R1.fastq.gz /workdir/data/.',
 'scp moeller:resources/test/test_reads/amy_R2.fastq.gz /workdir/data/.']

In [62]:
# Settings for the generated per-split job scripts.
# NOTE(review): this rebinds `splits` to an int; an earlier cell binds the same
# name to the list returned by make_splits and indexes it, so that cell cannot
# be re-run after this one without first re-running the make_splits call.
splits = 3
data_dir = '/workdir/data'          # destination directory for the scp copies
remote_server = 'moeller'           # host used for both the scp and rsync lines
remote_outdir = '/workdir/jgs/sn-mg-pipeline/output'
sn_command = 'snakemake --use-conda -j 64'
setup_cmds = []                     # extra shell lines placed before the copy step
teardown_cmds = ['/programs/bin/labutils/endres.pl']
# Draft signature for wrapping the code below in a function (not yet active):
# def make_split_scripts(samples_df,
#                        units_df,
#                        splits,
#                        sn_command,
#                        remote_outdir,
#                        remote_server='moeller',
#                        setup_cmds=[],
#                        teardown_cmds=[]
# ):

# split indices from samples_df into N parts
split_list = make_splits(samples_df, splits)

# write scp code for units from split samples
split_commands = {}
for i, split in enumerate(split_list):
    out_lines = '#! /bin/bash\n\n'

    out_lines += '# Setup:\n%s' % '\n'.join(setup_cmds)

    # Use this cell's own `remote_server` so the scp and rsync lines always
    # target the same host and the cell does not depend on a `server`
    # variable left over from another cell.
    scp_commands = write_scp_lines(samples_df,
                                   units_df,
                                   split,
                                   remote_server,
                                   data_dir)
    out_lines += '\n\n# Copy input sequences:\n'
    out_lines += '\n'.join(scp_commands)

    # write snakemake code
    out_lines += '\n\n# Run Snakemake\n%s' % sn_command

    # write rsync code
    rsync_command = 'rsync -a output/ {0}:{1}'.format(remote_server,
                                                      remote_outdir)

    out_lines += '\n\n# Rsync data back\n%s' % rsync_command

    out_lines += '\n\n# Teardown:\n%s' % '\n'.join(teardown_cmds)

    # keep the script text in memory, keyed by split number
    split_commands[i] = out_lines

    # write samples split
    split_df = samples_df.loc[split]
    split_df.to_csv('samples.{0}.txt'.format(i),
                    sep='\t',
                    index=False)

    # echo what was generated for this split, then write the script to disk
    print('########## Split {0} ##########\n\n'.format(i))
    print('----------\nsamples.{0}.txt:\n----------\n\n'.format(i))
    print(split_df)
    print('\n\n\n----------\nsplit.{0}.sh:\n----------\n\n'.format(i))
    print(out_lines)
    print('\n\n\n')
    with open('split.{0}.sh'.format(i), 'w') as f:
        f.write(out_lines)

########## Split 0 ##########


----------
samples.0.txt:
----------


  Sample
0    amy



----------
split.0.sh:
----------


#! /bin/bash

# Setup:


# Copy input sequences:
scp moeller:resources/test/test_reads/amy_R1.fastq.gz /workdir/data/.
scp moeller:resources/test/test_reads/amy_R2.fastq.gz /workdir/data/.

# Run Snakemake
snakemake --use-conda -j 64

# Rsync data back
rsync -a output/ moeller:/workdir/jgs/sn-mg-pipeline/output

# Teardown:
/programs/bin/labutils/endres.pl




########## Split 1 ##########


----------
samples.1.txt:
----------


  Sample
1    bob



----------
split.1.sh:
----------


#! /bin/bash

# Setup:


# Copy input sequences:
scp moeller:resources/test/test_reads/bob_R1.fastq.gz /workdir/data/.
scp moeller:resources/test/test_reads/bob_R2.fastq.gz /workdir/data/.

# Run Snakemake
snakemake --use-conda -j 64

# Rsync data back
rsync -a output/ moeller:/workdir/jgs/sn-mg-pipeline/output

# Teardown:
/programs/bin/labutils/endres.pl




########## Split 2