# Run AmpliconArchitect (AA) on SJ samples using the dxpy API

In [None]:
import dxpy
import pandas as pd
import os
import sys
sys.path.append('../src')
import data_imports


## New SJ samples, acquired 9/6/2022, run 9/2024
Parameters: timeout at 48 h, copy-number gain threshold (`cngain`) 4.5

In [None]:
# GLOBALS
# Shared configuration for the job-submission and download cells below.
# DNAnexus project that holds the inputs and receives the outputs.
PROJECT_ID = 'project-Fz9yBjQ9fj2QPbFY16B8JG8X' # PedPanCancer ecDNA
# Previous applet ID, kept for reference only.
#APPLET_ID = 'applet-GqPyyxQ9fj2ky254fGgG5Vv7' # AmpliconSuite-DNANexus-applet
APPLET_ID = 'applet-GqgzpFQ9fj2X5qZvKB62x69p' #ampliconsuite-dnanexus-applet
# Links to files that are passed to every job as 'data_repo' and 'mosek_lic'.
DATA_REPO = dxpy.dxlink('file-GqQbpY09fj2f50kk87843164') # /data_repo/GRCh38.tar.gz
MOSEK = dxpy.dxlink('file-GqQZYGQ9fj2qbJkqjY834g75') # /mosek.lic
# Project folder under which run_AA creates one subfolder per sample.
OUT_DIR = '/AmpliconArchitect/results_batch_20240912'
# Value passed to the applet as 'ref'.
REF = 'hg38'

In [None]:
def get_sj_cohort():
    """Return the set of biosample names starting with 'SJ'.

    The names come from data_imports.get_pedpancan_biosamples_from_AC();
    the result is presumably a pandas Series or Index of strings, since the
    `.str` accessor is used on it -- TODO confirm.
    """
    biosamples = data_imports.get_pedpancan_biosamples_from_AC()
    is_sj = biosamples.str.startswith('SJ')
    return set(biosamples[is_sj])
def get_sample_info(file):
    """Load the tab-separated sample sheet at *file* into a DataFrame.

    No rows are filtered here (hematologic malignancies are kept); callers
    subset the table themselves.
    """
    return pd.read_csv(file, sep='\t')
def text_venn2(s1, s2):
    """Print a text summary of two sets: sizes, both differences and overlap."""
    print(f'size of set 1: {len(s1)}')
    print(f'size of set 2: {len(s2)}')
    n_only_s1 = len(s1 - s2)
    print(f'Samples in s1 not in s2: {n_only_s1}')
    n_only_s2 = len(s2 - s1)
    print(f'Samples in s2 not in s1: {n_only_s2}')
    n_shared = len(s1 & s2)
    print(f'Overlap: {n_shared}')

def write_progress(samples):
    """Overwrite out/progress.txt with one sample name per line.

    The file is an intermediate record of the jobs that have already been
    submitted, so they are not submitted again.

    Args:
        samples: iterable of sample-name strings.
    """
    with open('out/progress.txt','w') as f:
        # writelines replaces the list comprehension that was used only for
        # its side effect of calling f.write.
        f.writelines(g + '\n' for g in samples)

def append_progress(sample):
    """Append one sample name, newline-terminated, to out/progress.txt."""
    with open('out/progress.txt', mode='a') as progress_file:
        entry = sample + '\n'
        progress_file.write(entry)

def read_progress():
    """Return the sample names recorded in out/progress.txt as a set.

    Returns an empty set when the file does not exist yet (nothing has been
    submitted), so the first run does not crash. Blank lines are ignored so
    an empty string never ends up in the set.
    """
    try:
        with open('out/progress.txt','r') as f:
            names = (line.strip() for line in f)
            return {name for name in names if name}
    except FileNotFoundError:
        return set()

def get_dx_bai(filename):
    """Return the ID of the first file named *filename* in PROJECT_ID.

    Intended for looking up a .bai index by name, e.g. SJ000_D1.bam.bai.
    Returns None when the search yields nothing.
    """
    matches = dxpy.find_data_objects(name=filename, project=PROJECT_ID, classname='file', return_handler=True)
    first_match = next(iter(matches), None)
    if first_match is None:
        return None
    return first_match.get_id()

def create_subdir(dirname,parent):
    """Ensure folder <parent>/<dirname> exists in PROJECT_ID and return its path.

    The folder is created (with parents) only when its path is not already
    among the entries that list_folder reports for *parent*.
    """
    dx_project = dxpy.DXProject(PROJECT_ID)
    existing = dx_project.list_folder(folder=parent)['folders']
    # Strip any trailing slash from the parent so the joined path has exactly one separator.
    target = '/'.join((parent.rstrip('/'), dirname))
    if target in existing:
        return target
    dx_project.new_folder(target, parents=True)
    return target

def run_AA(sample_name, cngain = False):
    """Submit one AmpliconSuite job for *sample_name* unless it is already done.

    Requires the notebook globals ``all_bam`` (table with ``sample_name`` and
    ``file_id`` columns) and ``current_cohort`` (set of sample names that
    already have a result or a submitted job), plus the constants PROJECT_ID,
    APPLET_ID, DATA_REPO, MOSEK, REF and OUT_DIR.

    Args:
        sample_name: sample identifier, e.g. ``SJEPD031215_D1``.
        cngain: copy-number threshold passed to the applet as ``cngain``.
            When falsy (the default) the key is omitted, so the applet's own
            default applies (the notebook notes mention 4.5 -- TODO confirm).

    Returns:
        None. On submission the sample is added to ``current_cohort`` and
        appended to the progress file.

    Raises:
        IndexError: if *sample_name* has no row in ``all_bam``.
        LookupError: if no ``<sample_name>.WholeGenome.bam.bai`` file is found.
    """
    if sample_name in current_cohort:
        print(f'AA result for {sample_name} already exists!')
        return
    print(f'Seting up AA run for {sample_name}...')
    bam = all_bam[all_bam.sample_name==sample_name].file_id.iloc[0]
    index_name = sample_name+'.WholeGenome.bam.bai'
    index = get_dx_bai(index_name)
    if index is None:
        # Fail before creating folders or submitting a job with an empty link.
        raise LookupError(f'No index file named {index_name} found in {PROJECT_ID}')
    input_data = {
        'bam':dxpy.dxlink(bam),
        'index':dxpy.dxlink(index),
        'data_repo':DATA_REPO,
        'mosek_lic':MOSEK,
        'ref':REF
    }
    if cngain:
        input_data['cngain'] = cngain
    out_folder = create_subdir(sample_name,OUT_DIR)
    # Use the APPLET_ID global; the lower-case `applet_id` only exists after
    # the separate test-dataset cell has been run.
    dxpy.DXApplet(APPLET_ID).run(
        applet_input=input_data,
        project=PROJECT_ID,
        folder=out_folder,
        name='AmpliconSuite '+sample_name
    )
    # Record the submission so the sample is skipped on later calls and runs.
    current_cohort.add(sample_name)
    append_progress(sample_name)
    print(f'Submitted job for {sample_name}.')
    return

In [None]:
# Setup: run once
# Get the DX_AUTH_TOKEN variable from the OS
# NOTE(review): os.getenv returns None when the variable is unset, and that
# None is passed straight into the security context below -- confirm the
# variable is exported before running.
DX_AUTH_TOKEN = os.getenv('DX_AUTH_TOKEN')
dxpy.set_security_context({"auth_token_type": "Bearer", "auth_token": DX_AUTH_TOKEN})
# Make sure the batch output folder exists (same path as OUT_DIR).
create_subdir('results_batch_20240912','/AmpliconArchitect')

In [None]:
# Samples to skip: SJ biosamples returned by get_sj_cohort() plus every
# sample already recorded in the progress file.
current_cohort = get_sj_cohort() | read_progress()
# Sample sheet; run_AA looks up each sample's file_id in this table.
all_bam = get_sample_info('out/SAMPLE_INFO_SJ00.txt')
# Subset without hematologic malignancies ("st" presumably means solid tumor -- TODO confirm).
st_bam = all_bam[all_bam.attr_diagnosis_group != 'Hematologic Malignancy']
all_bam_set = set(all_bam.sample_name)
st_bam_set = set(st_bam.sample_name)
# Compare each candidate set against the samples we already have.
text_venn2(all_bam_set,current_cohort)
text_venn2(st_bam_set, current_cohort)
# we have 1776 more samples we can run from the .bam files currently on SJC. About half are hematologic malignancies.

In [None]:
# Now we need to run one AA job for each sample in our st_bam_set.
# At most BATCH_SIZE jobs are submitted per execution of this cell. run_AA
# adds each submitted sample to current_cohort, so re-running the cell
# recomputes `todo` and submits the next batch.
from itertools import islice

BATCH_SIZE = 100
todo = st_bam_set - current_cohort
for sample in islice(todo, BATCH_SIZE):
    run_AA(sample)

In [None]:
# Rerun failures
# Walltimes for these are 3 days, cngain = 10
# NOTE(review): the call below passes cngain=16, not 10, and nothing in this
# cell sets a walltime -- confirm which values are intended.
# get_walltimes() is defined in the "Job metaanalysis" section further down;
# that cell must have been run first.
df = get_walltimes()
# Failed jobs that ran for more than 48 hours.
todo = df[(df.state == 'failed') & (df.walltime_hours > 48)]
# Remove them from current_cohort so run_AA does not skip them as already done.
current_cohort = current_cohort - set(todo.name)
for sample in todo.name:
    run_AA(sample,cngain=16)

In [None]:
# Failed jobs that ran for less than 48 hours (not covered by the rerun above).
df[(df.state == 'failed') & (df.walltime_hours < 48)]

## Test dataset (RCMB56-x1)

In [None]:
# One-off test submission with hard-coded inputs.
# These lower-case names hold the same project, applet, data_repo, license and
# reference values as the upper-case globals defined near the top.
project_id = 'project-Fz9yBjQ9fj2QPbFY16B8JG8X' # PedPanCancer ecDNA
applet_id = 'applet-GqgzpFQ9fj2X5qZvKB62x69p' #ampliconsuite-dnanexus-applet
out_folder = '/test-out/RCMB56-x1'
# Applet inputs: BAM, its index, the data repo archive, the MOSEK license and the reference name.
input_data = {
    'bam':dxpy.dxlink('file-GqQQ3109fj2vqJKX61y5b9Bq'),
    'index':dxpy.dxlink('file-GqQQK389fj2yVPq4gfxKvFF0'),
    'data_repo':dxpy.dxlink('file-GqQbpY09fj2f50kk87843164'),
    'mosek_lic':dxpy.dxlink('file-GqQZYGQ9fj2qbJkqjY834g75'),
    'ref':'hg38'
}

# Create and run the job
# `job` is kept so a later cell can wait on it.
job = dxpy.DXApplet(applet_id).run(
    applet_input=input_data,
    project=project_id,
    folder=out_folder
)

In [None]:
# Displays the dxlink function object itself; nothing is called here.
dxpy.dxlink

In [None]:
# Block until the test job finishes, then print whatever wait_on_done() returned.
# NOTE(review): confirm in the dxpy docs whether wait_on_done() returns
# anything; if it does not, this prints None rather than the job output.
job_output = job.wait_on_done()
print(job_output)

In [None]:
# Preview the first rows of the sample sheet.
all_bam.head()

# Job meta-analysis

In [None]:
import dxpy
import datetime
import seaborn as sns
import matplotlib.pyplot as plt


def _walltime_hours(transitions):
    """Return hours between the second and the last state transition.

    Returns NaN when there are fewer than two transitions or when either
    'setAt' timestamp is missing. Timestamps are treated as milliseconds.
    """
    if len(transitions) < 2:
        return float('nan')
    started_ms = transitions[1].get('setAt')
    ended_ms = transitions[-1].get('setAt')
    if not started_ms or not ended_ms:
        return float('nan')
    return (ended_ms - started_ms) / (1000 * 3600)


def get_walltimes(start=datetime.date(2024, 9, 1), end=datetime.date(2024, 10, 1)):
    """Collect state and wall time of the SJ AmpliconSuite jobs you launched.

    Only jobs whose executable is 'AmpliconSuite-DNANexus-applet' and whose
    name ends in a token starting with 'SJ' are kept.

    Args:
        start: lower bound on job creation time; its string form is passed to
            dxpy.find_jobs as created_after. Defaults to 2024-09-01.
        end: upper bound on job creation time, passed as created_before.
            Defaults to 2024-10-01.

    Returns:
        pandas.DataFrame with columns 'name', 'state' and 'walltime_hours',
        one row per matching job. The columns are present even when no job
        matches. 'walltime_hours' is NaN when it cannot be computed.
    """
    jobs = dxpy.find_jobs(created_after=str(start), created_before=str(end),
                          launched_by=dxpy.whoami(), describe=True)

    results = []
    for job in jobs:
        details = job['describe']
        if details.get('executableName') != 'AmpliconSuite-DNANexus-applet':
            continue
        # The sample name is the last whitespace-separated token of the job name.
        name = details['name'].split()[-1]
        if not name.startswith('SJ'):
            continue
        results.append({
            'name': name,
            'state': details.get('state'),
            'walltime_hours': _walltime_hours(details.get('stateTransitions') or []),
        })
    return pd.DataFrame(results, columns=['name', 'state', 'walltime_hours'])

In [None]:
# Collect job names, states and wall times into a DataFrame.
adf = get_walltimes()

In [None]:
# Show the failed jobs.
# Haven't downloaded results for these.
adf[adf.state == 'failed']

In [None]:
# Number of jobs returned by get_walltimes().
len(adf)

In [None]:
# Summarise jobs that are no longer running: share of timeouts (failed after
# more than 48 h) and a complementary ECDF of the job table as counts.
df = adf[adf.state != 'running']
timed_out = (df.state == 'failed') & (df.walltime_hours > 48)
a = int(timed_out.sum())
b = len(df)
pct = round(a/b*100,1)
print(f"Timeouts: {a} / {b} ({pct}%)")
fig = plt.figure()
ax = sns.ecdfplot(data=df, legend=False, stat='count', complementary=True)
ax.set_xlabel('Time (hours)')
ax.set_ylabel('Cumulative jobs running')
sns.despine()

In [None]:
# Save the figure created in the previous cell.
fig.savefig("out/job_walltimes.png",dpi=300)


# Download job outputs
TODO: find a faster way to do this; the full download took almost 5 hours.

In [None]:
import dxpy
import os
import pathlib

# List every file beneath a DNAnexus folder, including nested folders
def list_files_in_directory(project_id, folder_path):
    """Return the described search results under *folder_path* whose class is 'file'."""
    hits = dxpy.bindings.search.find_data_objects(project=project_id, folder=folder_path, recurse=True, describe=True)
    return [hit for hit in hits if hit["describe"]["class"] == "file"]

# Function to download a file, preserving the directory structure
def download_file_with_structure(file_info, project_id, local_base_dir="downloads"):
    """Download one DNAnexus file below *local_base_dir*, mirroring its remote folder.

    Args:
        file_info: search result carrying 'id' and a 'describe' dict with
            'folder' and 'name' (as produced with describe=True).
        project_id: project used as the context for the download.
        local_base_dir: local root under which the remote path is recreated.
    """
    file_path = file_info['describe']['folder'] + "/" + file_info['describe']['name']

    # Create corresponding local directory structure
    local_path = os.path.join(local_base_dir, file_path.strip("/"))
    local_dir = os.path.dirname(local_path)
    if local_dir:
        # exist_ok avoids the check-then-create race and is a no-op when the
        # directory is already there.
        os.makedirs(local_dir, exist_ok=True)

    # Download the file
    print(f"Downloading {file_path} to {local_path}...")
    # Pass the project explicitly; previously a DXFile handle was built with
    # the project and then only its bare ID was used.
    dxpy.download_dxfile(file_info['id'], local_path, project=project_id)
    print(f"Download completed: {local_path}")

# Download a whole DNAnexus folder tree to a local directory
def download_directory(project_id, folder_path, local_base_dir="downloads"):
    """Download every file under *folder_path*, one at a time, keeping the remote layout."""
    for remote_file in list_files_in_directory(project_id, folder_path):
        download_file_with_structure(remote_file, project_id, local_base_dir)



In [None]:
# Local download root: the 'out' directory under the current working directory.
local_out = str(pathlib.Path.cwd() / 'out')

In [None]:
# Download everything under OUT_DIR into local_out, preserving the remote folder layout.
download_directory(PROJECT_ID, OUT_DIR, local_out)

In [None]:
# Get results that weren't available during the big download
# The three literal lists below are overwritten immediately; they only record
# earlier straggler batches. Only the last assignment takes effect.
stragglers = ['SJMB031439_D1','SJST031395_D1','SJOS031478_D2','SJOS030861_D3']
stragglers = ['SJWLM031677_D1']
stragglers = ['SJOS032521_D1','SJOS031922_D2','SJHGG032492_D1','SJNBL032876_D1','SJST032243_D1','SJMEL031086_D3',
              'SJOS031711_D2','SJBT031810_D1']
# dnanexus_dirs and local_dirs are defined in a later cell, which must be run first.
stragglers = dnanexus_dirs - local_dirs

for s in stragglers:
    # NOTE(review): pathlib.Path uses the local OS separator; the remote path
    # is only '/'-separated on POSIX systems -- confirm the notebook runs there.
    remote_out = str(pathlib.Path(OUT_DIR,s))
    download_directory(PROJECT_ID, remote_out, local_out)

# Tracking down possible missing runs?
The progress report says we should have 633 runs.  
However, we only have 616 subdirectories on DNAnexus.

In [None]:
# List the subfolders that a DNAnexus folder listing reports (not recursive)
def list_subdirectories(project_id, folder_path):
    """Return the 'folders' entry of the project's listing for *folder_path*."""
    return dxpy.DXProject(project_id).list_folder(folder=folder_path)['folders']

def list_local_subdirectories(local_path):
    """Return the names of the immediate subdirectories of *local_path*."""
    def _is_subdir(entry):
        return os.path.isdir(os.path.join(local_path, entry))

    return list(filter(_is_subdir, os.listdir(local_path)))

# Per-sample folder names on DNAnexus (last path component) and locally.
dnanexus_dirs = {remote.split('/')[-1] for remote in list_subdirectories(PROJECT_ID, OUT_DIR)}
local_results_root = local_out + OUT_DIR
local_dirs = set(list_local_subdirectories(local_results_root))

In [None]:
# Compare the number of folders on DNAnexus with the number downloaded locally.
print(len(dnanexus_dirs))
print(len(local_dirs))

In [None]:
# Folders present on DNAnexus but missing locally.
stragglers = dnanexus_dirs - local_dirs
stragglers

In [None]:
# Spot-check whether this sample has a local result directory.
'SJMEL001002_D1' in local_dirs

In [None]:
# Display the set of local result directory names.
local_dirs