In [3]:
# Hand-curated plate barcodes; S3 paths are later filtered down to those that
# contain one of these identifiers.
all_plates = [
    'B000771', 'B000873', 'B001429', 'B001438', 'B001439', 'B001440',
    'B001476', 'B001561', 'B001564', 'B001566', 'B002081', 'B002083',
    'B002343', 'B002522', 'B002524', 'B002530', 'B002549', 'B002550',
    'B003063', 'B003096', 'B003104', 'B003198', 'B003207', 'B003346',
    'B003469', 'B003520', 'B003526', 'B003580', 'B003590', 'B003591',
    'B003593', 'B003595', 'B003605', 'B003817', 'B003819', 'B003823',
    'B003828', 'B003829', 'B003830', 'B003832', 'B003916',
]

# Number of worker processes passed as n_proc to the parallel S3 helpers.
cores = 3

In [9]:
import glob
import multiprocessing
import re, os, shutil
import subprocess

import boto3
import botocore.exceptions
import numpy as np
import pandas as pd
from tqdm import tqdm

def get_s3path_list(bucket, prefix, suffix):
    """List the S3 URIs under a prefix whose keys end with ``suffix``.

    Args:
        bucket: Bucket name, e.g. ``'darmanis-group'``.
        prefix: Key prefix to list, e.g. ``'singlecell_lungadeno/rawdata/fastqs'``.
        suffix: Only keys ending with this string are kept, e.g. ``'fastq.gz'``.

    Returns:
        A list of ``'s3://<bucket>/<key>'`` strings in listing order. The list
        is empty when nothing under the prefix matches.
    """
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)

    paths = []
    for page in page_iterator:
        # A listing page has no 'Contents' entry when the prefix matches no
        # objects, so default to an empty list instead of raising KeyError.
        for entry in page.get('Contents', []):
            if entry['Key'].endswith(suffix):
                paths.append('s3://{}/{}'.format(bucket, entry['Key']))
    return paths

def s3_restore(args):
    """Ask S3 to restore one object unless it already reports restore info.

    Relies on the module-level globals ``s3r`` (an S3 resource) and ``bucket``
    (a bucket object whose ``meta.client`` issues the request); both are set by
    ``s3_restore_parallel``.

    Args:
        args: ``(bucket_name, key)`` pair identifying the object.

    Returns:
        The object's existing ``restore`` value when it is truthy; otherwise
        ``'<bucket_name>/<key>'`` after a 3-day restore request has been sent.
    """
    bucket_name, object_key = args
    s3_object = s3r.Object(bucket_name, object_key)

    restore_status = s3_object.restore
    if restore_status:
        # Restore information is already present; nothing to request.
        return restore_status

    bucket.meta.client.restore_object(
        Bucket=bucket_name,
        Key=object_key,
        RestoreRequest={'Days': 3},
    )
    return f'{bucket_name}/{object_key}'
    
def s3_restore_parallel(b,k_list,n_proc=2):
    """Run ``s3_restore`` over every key of one bucket using a process pool.

    The S3 resource and bucket handle are stored in the module globals ``s3r``
    and ``bucket`` before the pool is created, because ``s3_restore`` reads
    them from there.
    NOTE(review): workers only see these globals if the pool inherits the
    parent's module state (e.g. a fork start method) - confirm on the target
    platform.

    Args:
        b: Bucket name.
        k_list: Keys within ``b`` to restore.
        n_proc: Number of worker processes.

    Returns:
        The list of ``s3_restore`` results, in the order of ``k_list``.
    """
    global s3r, bucket
    s3r = boto3.resource('s3')
    bucket = s3r.Bucket(b)

    print('creating pool')
    pool = multiprocessing.Pool(processes=n_proc)

    try:
        print('restoring files...')
        # One (bucket, key) job per key.
        restore_jobs = [(b, key) for key in k_list]
        output = pool.map(s3_restore, restore_jobs, chunksize=100)
    finally:
        pool.close()
        pool.join()

    return output

def s3copy(args):
    """Copy one object with ``aws s3 cp`` and report the source on failure.

    Args:
        args: ``(src, dest)`` pair of paths/URIs understood by the AWS CLI.

    Returns:
        ``None`` when the command exits with status 0, otherwise ``src``, so a
        caller can collect the paths that failed.
    """
    src, dest = args
    # Build the argument list directly rather than splitting a formatted string
    # on spaces, so paths containing spaces stay single arguments.
    process = subprocess.run(['aws', 's3', 'cp', src, dest])
    return None if process.returncode == 0 else src

def s3copy_parallel(src_list, dest_list, n_proc = 2):
    """Run ``s3copy`` over paired sources and destinations in a process pool.

    Args:
        src_list: Source paths/URIs.
        dest_list: Destination paths/URIs, paired positionally with
            ``src_list`` (pairing stops at the shorter list).
        n_proc: Number of worker processes.

    Returns:
        The list of per-pair ``s3copy`` results, in input order. An empty list
        is returned without starting a pool when there is nothing to copy.
    """
    jobs = list(zip(src_list, dest_list))
    if not jobs:
        return []

    # Create the pool before the try block: if construction fails there is no
    # pool to clean up, and the real error is not masked by a NameError raised
    # from the finally clause.
    p = multiprocessing.Pool(processes=n_proc)
    try:
        # Each job is a slow CLI call, so hand them out one at a time; a large
        # chunksize would send a whole small batch to a single worker.
        process = p.map(s3copy, jobs, chunksize=1)
    finally:
        p.close()
        p.join()
    return process

def s3move(args):
    """Move one object with ``aws s3 mv`` and report the source on failure.

    Args:
        args: ``(src, dest)`` pair of paths/URIs understood by the AWS CLI.

    Returns:
        ``None`` when the command exits with status 0, otherwise ``src``, so a
        caller can collect the paths that failed.
    """
    src, dest = args
    # Build the argument list directly rather than splitting a formatted string
    # on spaces, so paths containing spaces stay single arguments.
    process = subprocess.run(['aws', 's3', 'mv', src, dest])
    return None if process.returncode == 0 else src

def s3move_parallel(src_list, dest_list, n_proc = 2):
    """Run ``s3move`` over paired sources and destinations in a process pool.

    Args:
        src_list: Source paths/URIs.
        dest_list: Destination paths/URIs, paired positionally with
            ``src_list`` (pairing stops at the shorter list).
        n_proc: Number of worker processes.

    Returns:
        The list of per-pair ``s3move`` results, in input order. An empty list
        is returned without starting a pool when there is nothing to move.
    """
    jobs = list(zip(src_list, dest_list))
    if not jobs:
        return []

    # Create the pool before the try block: if construction fails there is no
    # pool to clean up, and the real error is not masked by a NameError raised
    # from the finally clause.
    p = multiprocessing.Pool(processes=n_proc)
    try:
        # Each job is a slow CLI call, so hand them out one at a time; a large
        # chunksize would send a whole small batch to a single worker.
        process = p.map(s3move, jobs, chunksize=1)
    finally:
        p.close()
        p.join()
    return process

def s3rm(src):
    """Delete one object with ``aws s3 rm`` and report the path on failure.

    Args:
        src: Path/URI understood by the AWS CLI.

    Returns:
        ``None`` when the command exits with status 0, otherwise ``src``, so a
        caller can collect the paths that failed.
    """
    # Build the argument list directly rather than splitting a formatted string
    # on spaces, so a path containing spaces stays a single argument.
    process = subprocess.run(['aws', 's3', 'rm', src])
    return None if process.returncode == 0 else src

def s3rm_parallel(paths, n_proc = 2):
    """Run ``s3rm`` over every path using a process pool.

    Args:
        paths: Paths/URIs to delete.
        n_proc: Number of worker processes.

    Returns:
        The list of per-path ``s3rm`` results, in input order. An empty list is
        returned without starting a pool when there is nothing to delete.
    """
    jobs = list(paths)
    if not jobs:
        return []

    # Create the pool before the try block: if construction fails there is no
    # pool to clean up, and the real error is not masked by a NameError raised
    # from the finally clause.
    p = multiprocessing.Pool(processes=n_proc)
    try:
        # Each job is a slow CLI call, so hand them out one at a time; a large
        # chunksize would send a whole small batch to a single worker.
        process = p.map(s3rm, jobs, chunksize=1)
    finally:
        p.close()
        p.join()
    return process

def merge_counts(top_dir):
    """Merge the count tables found under ``top_dir`` into one DataFrame.

    Every ``*.txt`` file below ``top_dir`` (searched recursively) is read as a
    headerless, tab-separated table whose first column holds row names and
    whose second column holds values. Row names are taken from the first file
    (in sorted path order); every other file is assumed to list the same rows
    in the same order.

    Each file becomes one column named after the first two ``_``-separated
    tokens of its file name plus a counter (``A1_B000771_0``). The counter
    increases for every further file that shares those two tokens.

    Args:
        top_dir: Directory to search; a trailing slash is optional.

    Returns:
        A DataFrame with a leading ``gene`` column followed by one float column
        per file. Rows whose gene name contains ``__`` are dropped (presumably
        htseq-count summary rows such as ``__no_feature`` - confirm).

    Raises:
        FileNotFoundError: If no ``.txt`` file exists under ``top_dir``.
        ValueError: If a file's row count differs from the first file's.
    """
    # Join the pieces so the recursive pattern works with or without a
    # trailing slash on top_dir; sort for a reproducible column order.
    pattern = os.path.join(top_dir, '**', '*.txt')
    file_list = sorted(glob.iglob(pattern, recursive=True))
    if not file_list:
        raise FileNotFoundError(
            'no .txt count tables found under {!r}'.format(top_dir))

    first_df = pd.read_csv(file_list[0], header=None, delimiter='\t')
    rownames = first_df.iloc[:, 0].tolist()
    counts = np.zeros((len(rownames), len(file_list)))

    colnames = []
    times_seen = {}  # name stem -> number of files already using it
    for idx, path in enumerate(tqdm(file_list)):
        pulled_col = pd.read_csv(path, header=None, delimiter='\t', usecols=[1])

        stem = '_'.join(os.path.basename(path).split('_')[:2])
        dup_idx = times_seen.get(stem, 0)
        times_seen[stem] = dup_idx + 1
        colnames.append('{}_{}'.format(stem, dup_idx))

        # Assigning into the preallocated column fails loudly on a length
        # mismatch instead of silently misaligning rows.
        counts[:, idx] = pulled_col.iloc[:, 0].to_numpy()

    master_df = pd.DataFrame(counts, columns=colnames)
    master_df.insert(0, 'gene', rownames)

    # remove metadata rows and renumber the index
    keep = np.array(['__' not in str(name) for name in rownames], dtype=bool)
    return master_df.loc[keep].reset_index(drop=True)
    

In [4]:
# # parse s3 file system
# czb_paths = get_s3path_list('czb-seqbot', 'fastqs', 'homo.htseq-count.txt')
# czbiohub_paths = get_s3path_list('czbiohub-seqbot', 'fastqs', 'homo.htseq-count.txt')
# darmanis_paths = get_s3path_list('darmanis-group', 'melanocyte_primary', 'homo.htseq-count.txt')
# all_paths = czb_paths + czbiohub_paths + darmanis_paths


In [14]:
# List every key ending in the BAM suffix from each source bucket/prefix,
# then pool the three listings into one list.
suffix_oi = '.homo.Aligned.out.sorted.bam'
czb_paths, czbiohub_paths, darmanis_paths = (
    get_s3path_list(src_bucket, src_prefix, suffix_oi)
    for src_bucket, src_prefix in [
        ('czb-seqbot', 'fastqs'),
        ('czbiohub-seqbot', 'fastqs'),
        ('darmanis-group', 'melanocyte_primary'),
    ]
)
all_paths = czb_paths + czbiohub_paths + darmanis_paths


In [19]:
# Build the transfer table: one row per listed S3 path that mentions a curated
# plate, with the well/plate tokens parsed from the file name, the source
# bucket and key split out of the URI, and the destination BAM URI.
dest_prefix = 's3://darmanis-group/melanocyte_primary/melanocyte_primary/bams'

paths_df = (
    pd.DataFrame({
        'path': [uri for uri in all_paths
                 if any(plate_id in uri for plate_id in all_plates)],
    })
    .assign(
        # File name is '<well>_<plate>_...'
        well=lambda df: [uri.rsplit('/', 1)[-1].split('_')[0] for uri in df['path']],
        plate=lambda df: [uri.rsplit('/', 1)[-1].split('_')[1] for uri in df['path']],
        # URI is 's3://<bucket>/<key...>'
        bucket=lambda df: [uri.split('/')[2] for uri in df['path']],
        key=lambda df: ['/'.join(uri.split('/')[3:]) for uri in df['path']],
        dest=lambda df: [f'{dest_prefix}/{well}_{plate}.bam'
                         for well, plate in zip(df['well'], df['plate'])],
    )
)

paths_df.to_csv('~/data/DL20181011_melanocyte_test_data/bams_table.csv')


In [22]:
# # restore files that have been glacier-ized
# for b in set(paths_df.bucket):
#     print('restoring', b)
#     df_slice = paths_df[paths_df.bucket == b]
#     k_list = df_slice.key.values.tolist()
#     output = s3_restore_parallel(b,k_list,n_proc=cores)

In [None]:
# Copy files
## List the BAMs already present at the destination so they are not copied again
dest_paths = get_s3path_list('darmanis-group',
                             'melanocyte_primary/melanocyte_primary/bams',
                             '.bam')

## update transfer lists
# Build the "still missing" mask once, against a set, instead of scanning the
# destination list twice for every row.
existing_dest = set(dest_paths)
needs_copy = [dest not in existing_dest for dest in paths_df.dest]
# .loc keeps the columns even when the mask is empty; plain [] indexing with an
# empty list would select zero columns and break the attribute access below.
pending_df = paths_df.loc[needs_copy]
src_list = pending_df.path.values.tolist()
dest_list = pending_df.dest.values.tolist()

## perform copy
# Always bind `output` so a later cell never reads a stale value from an
# earlier run when there was nothing to copy.
if len(src_list) > 0:
    output = s3copy_parallel(src_list, dest_list, n_proc = cores)
else:
    output = []
    

In [None]:
# # copy files from s3 bucket to compute destination
# ## pull file list to copy
# dest_paths = get_s3path_list('darmanis-group', 
#                              'melanocyte_primary/melanocyte_primary/counts_tables', 
#                              '.txt')

# ## copy files
# local_wkdir = '/where/to/store/tmp'
# local_dest_list = ['{}/{}'.format(local_wkdir, x.split('/')[-1]) for x in dest_paths]
# output = s3copy_parallel(dest_paths, local_dest_list, n_proc = cores)

# ## run merge
# merged_df = merge_counts(local_wkdir)
