In [None]:
import os
import pandas as pd
import pybedtools as bedtools 
import shutil
from multiprocessing import Pool
import traceback,sys
import numpy as np 
import gc

In [None]:
tmp_root= os.path.join("/","nobackup","lab_bsf","users","dbarreca")
tmp_dir = os.path.join(tmp_root,"tmp_quantify")

data_folder= os.path.join("..","data")
analysis_folder= os.path.join(data_folder,"quantification")

annotations_file= os.path.join(data_folder,"complete_metadata.csv")
atac_folder = os.path.join(data_folder,"pipeline_out","results")

resources_folder=os.path.join("..","references")
chrom_file = os.path.join(resources_folder, 'hg38.chrom.sizes')
blacklist_file=os.path.join(resources_folder, 'hg38.blacklist.bed')

suffix="ALL"
peaks_file = os.path.join(analysis_folder,"consensus_set_{}.bed".format(suffix))
peaks_file_hg19 = os.path.join(analysis_folder,"consensus_set_{}_hg19.bed".format(suffix))
peaks_file_unmapped = os.path.join(analysis_folder,"consensus_set_{}_unmapped.bed".format(suffix))

binary_file = os.path.join(analysis_folder,"quantification_binary_{}-set.csv".format(suffix))
count_file = os.path.join(analysis_folder,"quantification_{}-set.csv".format(suffix))

sloop_extension=250

In [None]:
annotations = pd.read_csv(annotations_file,index_col=0)

annotations=annotations[(annotations['QC:PASS']==True)]

## 1. Define consensus peak set

Peak set is defined only on the "PASS FILTER" samples to avoid spourious region

In [None]:
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)
    
bedtools.helpers.set_tempdir(tmp_dir)

In [None]:
def merge_peaks(peakfiles, output, isPeak = True, size=sloop_extension,blacklist_file=blacklist_file):
    output_bed= None
  
    for peakfile in peakfiles:
        peak_bed = bedtools.BedTool(peakfile)
        if (isPeak and blacklist_file is not None):
            peak_bed=peak_bed.intersect(blacklist_file,v=True, wa=True)
        if (isPeak):
            peak_bed = peak_bed.slop(g=chrom_file, b=size)
            
        if (output_bed is None):
            output_bed = peak_bed
        else:
            output_bed = output_bed.cat(peak_bed,force_truncate=True)
            
    output_bed.saveas(output)
    return output

In [None]:
if (suffix in ['PBMC','ALL']):
    selected_samples = list(annotations[annotations['SAMPLE:TISSUE']=='PBMC'].index)

if (suffix=='ALL'):
     selected_samples += list(
         annotations[
             (annotations['SAMPLE:TISSUE'].isin(['nkcell','monocyte','cd8t'])) & 
            (annotations['SAMPLE:VISIT'] == 'V1')
         ].index
     )
                  
                  
peakfiles = [os.path.join(atac_folder,sample,'peaks','{}_summits.bed'.format(sample)) for sample in selected_samples]          

In [None]:
futures = list()

cpus = 16
pool = Pool(cpus)

for i,peakfiles_subset in enumerate(np.array_split(peakfiles, cpus)):
    output = os.path.join(tmp_dir,'tmp_{}.bed'.format(i))
    futures.append(pool.apply_async(merge_peaks,args=(peakfiles_subset, output)))

In [None]:
outputs = [result.get() for result in futures]

In [None]:
pool.close()

In [None]:
output = os.path.join(tmp_dir,'final.bed'.format(i))
merge_peaks(outputs,output, isPeak=False)

In [None]:
peaks = bedtools.BedTool(output).sort(faidx=chrom_file).to_dataframe(names=['CHR','START','END'],dtype={'START':int,'END':int})


In [None]:
peaks['ID'] = peaks.index.format(formatter=(lambda x: "CONS{:011d}".format(x)))

In [None]:
bedtools.BedTool().from_dataframe(peaks).saveas(peaks_file)

In [None]:
del(peaks)
del(futures)
del(outputs)
del(peakfiles)

In [None]:
shutil.rmtree(tmp_dir)

## 2. Quantify

Quantification is run on all the samples

In [None]:
gc.collect()

### 2.1 Binary

In [None]:
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)
    
bedtools.helpers.set_tempdir(tmp_dir)

In [None]:
consensus_peaks = bedtools.BedTool(peaks_file)
consensus_peaks_df = bedtools.BedTool(peaks_file).to_dataframe().set_index('name')

In [None]:
peaks_subdir = 'peaks'
peaks_format = '{}_summits.bed'

def get_peaks(sample):
    return os.path.join(atac_folder,sample,peaks_subdir,peaks_format.format(sample))


def get_coverage_bin(sample):
    peakfile=get_peaks(sample)
    result = pd.DataFrame(0,index=consensus_peaks_df.index,columns=[sample])
    try:
        if (peakfile is not None):
            sample_peaks = bedtools.BedTool(peakfile)
            result = consensus_peaks.intersect(
                sample_peaks,
                g=chrom_file, 
                wa=True,
                c=True
            ).to_dataframe(index_col='name',
                usecols=[3,4],
                names=['name',sample]
            )
    except Exception as e:
        print("Error occured while processing sample "+sample)
        traceback.print_exc(file=sys.stdout)
    finally:
        return result.T

In [None]:
results=Pool(16).map(get_coverage_bin,[sample for sample in annotations.index])

In [None]:
results = [item for item in results if item is not None]
results = pd.concat(results).T
results.to_csv(binary_file,index_label='ID')

In [None]:
del(consensus_peaks_df)
del(consensus_peaks)
del(results)

In [None]:
shutil.rmtree(tmp_dir)

### 2.2 Counts

In [None]:
def get_bam(sample):
    return os.path.join(atac_folder,sample,bam_subdir,bam_format.format(sample))

def get_coverage(sample):
    print("Processing "+sample)
    try:
        result= elements_to_quantify.coverage(b=get_bam(sample),sorted=True,g=chrom_file).to_dataframe(
                    names=["CHR", "START", "END", "ID", sample, "NA1", "NA2", "NA3"],
                    dtype={sample: int},
                    usecols=['ID', sample],
                    index_col='ID').T
        return result
    except Exception as e:
        print("Error occured while processing sample "+sample)
        traceback.print_exc(file=sys.stdout)
        return pd.DataFrame(0,index=elements_to_quantify.index,columns=[sample]).T

In [None]:
gc.collect()

In [None]:
bam_subdir = 'mapped'
bam_format = '{}.trimmed.bowtie2.filtered.shifted.events.bed'
elements_to_quantify = bedtools.BedTool(peaks_file)

In [None]:
tmp_dir

In [None]:
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)
    
bedtools.helpers.set_tempdir(tmp_dir)

In [None]:
result=Pool(16).map(get_coverage, [sample for sample in annotations.index])

In [None]:
result = [item for item in result if item is not None]
result = pd.concat(result)
result.T.to_csv(count_file,index_label='ID')
shutil.rmtree(tmp_dir)
del(result)

# 3. Calculate average tracks (wiggle)

In [None]:
tracks_dir=os.path.join(analysis_folder,"summary_tracks")
if not os.path.exists(tracks_dir):
    os.makedirs(tracks_dir)  

In [None]:
tracks_annotations=annotations[(annotations['SAMPLE:TISSUE']=='PBMC') | (annotations['SAMPLE:VISIT']=='V1')]

In [None]:
def run_samples(data,atac_dir=atac_folder,out_dir=tracks_dir, chrom_file=chrom_file):
    job_name=data[0]
    samples=data[1]
    
    out_file=os.path.realpath(os.path.abspath(
        os.path.join(out_dir,"{}.wiggle".format(job_name))
    ))
    cmd="wiggletools median "
    cmd+=" ".join(samples.map(lambda sample: os.path.realpath(os.path.abspath(
        os.path.join(atac_dir,sample,"coverage","{}.bigWig".format(sample))
        ))))
    cmd+=" > {}".format(out_file)
    print("RUNNING {}".format(job_name))
    returnvalue=os.system(cmd)
    print("{}-wiggletools RETURNED {}".format(job_name,returnvalue))
    if (returnvalue=="0"):
        final_file=os.path.realpath(os.path.abspath(
            os.path.join(out_dir,"{}.bigWig".format(job_name))
        ))
        cmd="wigToBigWig {} {} {}".format(out_file,chrom_file,final_file)        
        returnvalue=os.system(cmd)
        print("{}-wigToBigWig RETURNED {}".format(job_name,returnvalue))
        return int(returnvalue)
    else:
        return int(returnvalue)

In [None]:
Pool(16).map(run_samples,
             [("{}_{}".format(index[0],index[1]), df.index) for index, df in tracks_annotations.groupby(['SAMPLE:TISSUE','SAMPLE:VISIT'])]
            )