-
Notifications
You must be signed in to change notification settings - Fork 0
/
Align.py
29 lines (21 loc) · 1.02 KB
/
Align.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from celery import group
from tasks import align_paired_reads, sort_bams, postprocess, variant_call
input_fastqs = [("U0a_CGATGT_L001_R1_001.fastq", "U0a_CGATGT_L001_R2_001.fastq"), ("U0a_CGATGT_L001_R1_002.fastq", "U0a_CGATGT_L001_R2_002.fastq"), ("U0a_CGATGT_L001_R1_003.fastq", "U0a_CGATGT_L001_R2_003.fastq"),("U0a_CGATGT_L001_R1_004.fastq", "U0a_CGATGT_L001_R2_004.fastq")]
aligned_bam = "U0a_CGATGT.bam"
sorted_bam = "U0a_CGATGT.sorted.bam"
postprocessed_bam = "U0a_CGATGT.sorted.pp.bam"
vcf_file = "U0a_CGATGT.vcf"
# do alignment
aligned_bams = []
alignment_jobs = []
for input_fastq_1, input_fastq_2 in input_fastqs:
out_bam = input_fastq_1 + input_fastq_2
alignment_jobs.append(align_paired_reads.s(input_fastq_1, input_fastq_2, out_bam))
aligned_bams.append(out_bam)
align_job_batch = group(alignment_jobs)
result = align_job_batch.apply_async()
result.join()
# do merge and sort
#sort_bams.delay(aligned_bams, sorted_bam)
#postprocess.delay(sorted_bam, postprocessed_bam)
#variant_call.delay(postprocessed_bam, vcf_file)