In [None]:
import contextlib

import luigi

# Introduction to Luigi

In [None]:
class HelloWorld(luigi.Task):
    """Minimal task: prints a greeting and tracks completion in memory.

    Completion is stored as an attribute on the task instance rather than as an
    output target, so it is lost when the kernel restarts.
    """

    def requires(self):
        # No upstream tasks.
        return []

    def run(self):
        print('Hello world!')
        # Flag this instance as done so complete() starts returning True.
        self._has_run = True

    def complete(self):
        # Tells luigi whether this task still needs to run: False until run()
        # has set the flag on this instance.
        return getattr(self, '_has_run', False)

In [None]:
# Build outside `assert`: assert statements are stripped under `python -O`,
# which would silently skip the build itself.
build_ok = luigi.build([HelloWorld()], local_scheduler=True)
assert build_ok, 'HelloWorld build failed'

## Task dependencies

In [None]:
class HelloWorld2(luigi.Task):
    """Prints a second greeting after HelloWorld.

    complete() always returns False, so this task runs again on every build.
    """

    def complete(self):
        return False

    def requires(self):
        # Single upstream dependency.
        return HelloWorld()

    def run(self):
        print('Hello world2!')

In [None]:
# luigi caches task instances by class and parameter values, so this
# HelloWorld() is presumably the same object the build above ran in-process
# (single worker) and on which run() set _has_run. After a kernel restart,
# this fails until that build cell has been re-run.
assert HelloWorld().complete(), 'HelloWorld() is not complete; run the HelloWorld build cell first'

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([HelloWorld2()], local_scheduler=True)
assert build_ok, 'HelloWorld2 build failed'

In [None]:
from luigi.util import requires

In [None]:
@requires(HelloWorld)
class HelloWorld2(luigi.Task):
    """HelloWorld2 rewritten with the luigi.util.requires decorator.

    The decorator supplies requires() (depending on HelloWorld) instead of an
    explicit method. Note: this rebinds the notebook name HelloWorld2,
    shadowing the earlier definition.
    """

    def complete(self):
        # Always incomplete, so every build re-runs it.
        return False

    def run(self):
        print('Hello world2!')

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([HelloWorld2()], local_scheduler=True)
assert build_ok, 'HelloWorld2 (@requires) build failed'

In [None]:
class GenerateRecords(luigi.Task):
    """Write the integers 0-9, one per line, to ``generated-records``."""

    def output(self):
        return luigi.LocalTarget('generated-records')

    def run(self):
        payload = ''.join(f'{n}\n' for n in range(10))
        with self.output().open('w') as records:
            records.write(payload)

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([GenerateRecords()], local_scheduler=True)
assert build_ok, 'GenerateRecords build failed'

In [None]:
class ProcessRecords(luigi.Task):
    """Copy GenerateRecords' output unchanged into ``processed-records``."""

    def output(self):
        return luigi.LocalTarget('processed-records')

    def requires(self):
        return GenerateRecords()

    def run(self):
        # self.input() is the target returned by GenerateRecords.output().
        with self.input().open() as source, self.output().open('w') as sink:
            sink.writelines(source)

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([ProcessRecords()], local_scheduler=True)
assert build_ok, 'ProcessRecords build failed'

In [None]:
class DownstreamAnalysis(luigi.Task):
    """Append ``suffix`` to every processed record.

    The suffix is part of the output filename, so each suffix value writes its
    own target file.
    """

    suffix = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f'processed-records-with-{self.suffix}')

    def requires(self):
        return ProcessRecords()

    def run(self):
        with self.input().open() as source, self.output().open('w') as sink:
            sink.writelines(record.strip() + self.suffix + '\n' for record in source)

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([DownstreamAnalysis(suffix='Z')], local_scheduler=True, workers=4)
assert build_ok, "DownstreamAnalysis(suffix='Z') build failed"

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([DownstreamAnalysis(suffix='K')], local_scheduler=True)
assert build_ok, "DownstreamAnalysis(suffix='K') build failed"

In [None]:
class CombinedAnalysis(luigi.Task):
    """Join the DownstreamAnalysis outputs for suffixes A-F column-wise.

    Output line k holds line k of each upstream file (in suffix order A..F),
    each stripped of surrounding whitespace and separated by tabs.
    """

    def requires(self):
        return [DownstreamAnalysis(suffix=s) for s in 'ABCDEF']

    def run(self):
        # ExitStack closes every upstream handle, even if opening a later one
        # or writing the output raises. The previous generator-in-zip version
        # never closed them.
        with contextlib.ExitStack() as stack:
            readers = [stack.enter_context(target.open('r')) for target in self.input()]
            with self.output().open('w') as fout:
                # zip stops at the shortest input, so longer inputs are truncated.
                for lines in zip(*readers):
                    fout.write('\t'.join(line.strip() for line in lines) + '\n')

    def output(self):
        return luigi.LocalTarget('combined-analysis')

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([CombinedAnalysis()], local_scheduler=True)
assert build_ok, 'CombinedAnalysis build failed'

# Scaling things up with Bioluigi

In [None]:
from bioluigi.scheduled_external_program import ScheduledExternalProgramTask

In [None]:
class Ls(ScheduledExternalProgramTask):
    """Run ``ls`` as a Slurm job through bioluigi's scheduled-program task.

    ``number_of_tasks`` is reported to the base class as the job's CPU count.
    """

    scheduler = 'slurm'

    # A CPU count is an integer: IntParameter parses command-line values to
    # int, whereas a plain Parameter would keep them as strings.
    number_of_tasks = luigi.IntParameter()

    @property
    def cpus(self):
        # Presumably read by ScheduledExternalProgramTask to size the job's
        # CPU request — TODO confirm against bioluigi.
        return self.number_of_tasks

    def program_args(self):
        return ['ls']

    def complete(self):
        # Never complete, so every build resubmits the job (demo only).
        return False

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([Ls(2)], local_scheduler=True)
assert build_ok, 'Ls build failed'

# Let's do some serious work now!

In [None]:
import luigi
from bioluigi.tasks import sratoolkit, rsem

In [None]:
# -nc (no-clobber) skips files already present, so re-running this cell does
# not create duplicate copies such as *.gz.1.
!wget -nc ftp://ftp.ensembl.org/pub/release-101/fasta/caenorhabditis_elegans/dna/Caenorhabditis_elegans.WBcel235.dna.toplevel.fa.gz
!wget -nc ftp://ftp.ensembl.org/pub/release-101/gtf/caenorhabditis_elegans/Caenorhabditis_elegans.WBcel235.101.gtf.gz

In [None]:
# -k keeps the .gz archive alongside the extracted file, and -f overwrites an
# existing extracted file instead of failing, so this cell can be re-run.
# (-k needs gzip >= 1.6.)
!gunzip -kf Caenorhabditis_elegans.WBcel235.dna.toplevel.fa.gz
!gunzip -kf Caenorhabditis_elegans.WBcel235.101.gtf.gz

In [None]:
%%time
# RSEM reference for the worm genome, indexed for the STAR aligner.
prepare_reference = rsem.PrepareReference(annotation_file='Caenorhabditis_elegans.WBcel235.101.gtf',
                                          reference_fasta_files=['Caenorhabditis_elegans.WBcel235.dna.toplevel.fa'],
                                          reference_name='worm_0',
                                          aligner='star',
                                          cpus=16)
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([prepare_reference], local_scheduler=True)
assert build_ok, 'rsem.PrepareReference build failed'

In [None]:
%%time
download_sample = sratoolkit.FastqDump('SRR12478578', output_dir='.', scheduler='slurm')
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([download_sample], local_scheduler=True)
assert build_ok, 'sratoolkit.FastqDump build failed'

In [None]:
%%time
calculate_expression = rsem.CalculateExpression(annotation_file='Caenorhabditis_elegans.WBcel235.101.gtf',
                                                reference_fasta_files=['Caenorhabditis_elegans.WBcel235.dna.toplevel.fa'],
                                                reference_name='worm_0',
                                                aligner='star',
                                                upstream_read_files=['SRR12478578.fastq.gz'],
                                                sample_name='SRR12478578',
                                                cpus=16)
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([calculate_expression], local_scheduler=True)
assert build_ok, 'rsem.CalculateExpression build failed'

## Let's assemble all this!

In [None]:
class RnaSeqPipeline(luigi.Task):
    """Download one SRA run and quantify it with RSEM (STAR aligner).

    Both steps are yielded from run() as luigi dynamic dependencies, so the
    RSEM task can be built from the FASTQ targets FastqDump actually produced.
    """

    srr_accession = luigi.Parameter()

    def run(self):
        fastq_targets = yield sratoolkit.FastqDump(self.srr_accession,
                                                   output_dir='.')
        fastq_paths = [target.path for target in fastq_targets]
        # Only the first FASTQ is passed, i.e. the run is quantified as
        # single-end. NOTE(review): a paired-end mate file would be ignored.
        yield rsem.CalculateExpression(annotation_file='Caenorhabditis_elegans.WBcel235.101.gtf',
                                       reference_fasta_files=['Caenorhabditis_elegans.WBcel235.dna.toplevel.fa'],
                                       reference_name='worm_0',
                                       aligner='star',
                                       upstream_read_files=fastq_paths[:1],
                                       # Name the sample after this task's accession so
                                       # different accessions don't share one sample name.
                                       sample_name=self.srr_accession,
                                       cpus=16)

    def output(self):
        # Presumably the file RSEM writes for sample_name in the working
        # directory — TODO confirm against rsem.CalculateExpression.output().
        return luigi.LocalTarget(f'{self.srr_accession}.isoforms.results')

In [None]:
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([RnaSeqPipeline('SRR12478578')], local_scheduler=True)
assert build_ok, 'RnaSeqPipeline build failed'

In [None]:
# Add more SRA run accessions here; workers=32 lets luigi run up to 32 tasks
# in parallel.
srr_accessions = ['SRR12478578']
# Build outside `assert` so it still runs under `python -O`.
build_ok = luigi.build([RnaSeqPipeline(srr_accession) for srr_accession in srr_accessions],
                       local_scheduler=True,
                       workers=32)
assert build_ok, 'RnaSeqPipeline batch build failed'