In [None]:
# List the contents of ../sheets via IPython automagic (same as `%ls`).
# Presumably this is where the sample sheets live — TODO confirm.
ls "../sheets"

### load the config

In [None]:
import os

import pandas as pd
from yaml import CLoader as Loader, load, dump
# YAML configuration file that is parsed into `config` below
config_file = "../configs/active_config.yaml"

# NOTE(review): CLoader is PyYAML's full loader and can construct arbitrary
# Python objects. Fine for a trusted local config; prefer CSafeLoader if this
# file could ever come from an untrusted source.
with open(config_file, "r") as stream:
    config = load(stream, Loader=Loader)


# switch the path to local mount
# (os.environ['HOME'] raises KeyError when HOME is not set)
home = os.environ['HOME']
bam_path = os.path.join(home, "mount/scratch/projects/AllenCell/bamfinal")
# replace whatever input dirs the config file lists with this single local folder
config['inputdirs'] = [bam_path]
# last expression: display the resulting config as the cell output
config

In [None]:
def get_files(folder_list, sample_sheet):
    '''
    Locate the bam file that belongs to every sample of the sample sheet.

    Parameters
    ----------
    folder_list : str, os.PathLike or list thereof
        One input folder or a list of input folders; each one is searched
        recursively for bam files.
    sample_sheet : str
        Path to a tab-separated sample sheet. Its first column is read as the
        index and then replaced by the 'name' column; the 'sample' column is
        matched against the bam file names. A path that does not start with
        '/' is resolved against the module-level `snakedir`.

    Returns
    -------
    (samples_df, short_df) : tuple of pandas.DataFrame
        Both frames are the sample sheet indexed by 'name'. For every sample
        with a matching bam file the column 'bam_path' holds the full path
        (samples_df) or the bare file name (short_df). If several files match,
        the last one found wins. Samples without a match get no value, and if
        no sample matches at all the column is not created.
    '''

    # A bare path means "one folder": wrap it so the loop below always sees a
    # list. The former test len(folder_list[0]) == 1 also wrapped real lists
    # whose first entry was a one-character path (e.g. ["."]) and raised an
    # IndexError on empty input.
    if isinstance(folder_list, (str, os.PathLike)):
        folder_list = [folder_list]
    # check full path or append path to scriptdir
    if not sample_sheet.startswith('/'):
        sample_sheet = os.path.join(snakedir, sample_sheet)
    # import the sample sheet
    samples = pd.read_csv(sample_sheet, sep='\t', index_col=0).set_index('name')

    # cycle through the input folders and collect every bam file
    # (md5 checksum files and bai index files are skipped)
    bam_list = []
    for input_folder in folder_list:
        for folder, _, files in os.walk(input_folder):
            for file in files:
                if '.bam' in file and '.md5' not in file and '.bai' not in file:
                    bam_list.append(os.path.join(folder, file))
    # same files without their folder
    short_list = [os.path.basename(path) for path in bam_list]

    def get_bam_paths(row, bam_list=None):
        '''
        return a copy of the sample row with 'bam_path' set to the last entry
        of bam_list whose file name contains the sample and does not contain "chr"
        '''

        # DataFrame.apply does not support functions that mutate the object
        # they are handed, so work on a copy
        row = row.copy()
        for file in bam_list or []:
            # get the basename
            base_file = os.path.basename(file)
            if row['sample'] in base_file and "chr" not in base_file:
                row['bam_path'] = file
        return row

    samples_df = samples.apply(get_bam_paths, axis=1, bam_list=bam_list)
    short_df = samples.apply(get_bam_paths, axis=1, bam_list=short_list)

    return samples_df, short_df

### load the samples

In [None]:
# get_files resolves a relative sample-sheet path against this module-level name
snakedir = ".."
# sample_df carries full bam paths, short_sample_df only the bam file names
sample_df, short_sample_df = get_files(config['inputdirs'], config['samples']['samplesheet'])
# display the full-path frame as the cell output
sample_df

## get tumor-normal-pairs

In [None]:
def get_normal(verbose=False):
    '''
    Return the configured normal and whether it is a fixed normal.

    Reads config['samples']['normal'] from the module-level `config`.
    The first entry counts as a fixed normal when it contains an underscore
    (presumably because it is then a full sample name rather than a bare
    suffix — TODO confirm against the config documentation).

    Parameters
    ----------
    verbose : bool
        print the name of the fixed normal, if there is one

    Returns
    -------
    (normal, is_fixed) : tuple of (str, bool)
        the first configured normal and the fixed-normal flag

    Raises
    ------
    ValueError
        if the first normal is fixed and more than one normal is configured
    '''
    normal = config['samples']['normal']

    is_fixed = "_" in normal[0]
    if is_fixed:
        if len(normal) > 1:
            # the former bare `exit` was a no-op expression, so execution
            # silently continued with an invalid configuration
            raise ValueError("Cannot have more than one fixed normal!")
        if verbose:
            print(f"Using fixed normal {normal[0]}")

    return normal[0], is_fixed
    
def get_tumor_normal_pairs(samples, config):
    '''
    Build the list of tumor-normal pair identifiers for the given samples.

    Parameters
    ----------
    samples : pandas.DataFrame
        sample table whose index is named 'name' and holds sample names of
        the form "<sample>_<TN>" (Name_A --> sample: "Name", TN: "A")
    config : dict
        configuration providing config['samples']['tumor'] and
        config['samples']['normal'].
        NOTE: the fixed-normal check is delegated to get_normal(), which reads
        the module-level `config`, not this argument.

    Returns
    -------
    list of str
        With a fixed normal: "<name>-B" for every sample whose TN suffix is a
        configured tumor. Otherwise: "<sample>_<t>-<n>" for every sample that
        has both the tumor suffix t and the normal suffix n.
    '''

    samples = samples.reset_index()
    # extract the tumor-normal suffix (Name_A --> sample: "Name", TN: "A")
    samples[['sample', 'TN']] = samples['name'].str.extract('(?P<sample>^[^_]+)_(?P<TN>[^_]+)$')

    tumor = config['samples']['tumor']
    _, is_fixed = get_normal(verbose=True)

    if is_fixed:
        # select only the tumor samples and concat with B
        TN_list = samples.query('TN == @tumor')['name'] + "-B"
        return list(TN_list)

    # Iterate over all configured normals. get_normal() only hands back the
    # first entry as a string, so looping over its return value walked through
    # the characters of that string and ignored any further normals.
    normals = config['samples']['normal']

    # A plain loop replaces the former groupby.apply that was used only for
    # its side effect; older pandas versions evaluate the first group twice in
    # apply, which would have duplicated its pairs.
    TN_list = []
    for sample_name, group in samples.groupby('sample'):
        suffixes = set(group['TN'])
        for n in normals:
            if n in suffixes:
                for t in tumor:
                    if t in suffixes:
                        TN_list.append(f"{sample_name}_{t}-{n}")
    return TN_list

In [None]:
# smoke test: shows the (normal, is_fixed) tuple and prints the fixed normal, if there is one
get_normal(True)

In [None]:
# NOTE(review): despite its name, `df` holds the list of pair strings returned
# by get_tumor_normal_pairs, not a DataFrame
df = get_tumor_normal_pairs(sample_df, config)
df

### get bam paths

#### creating wildcards object for testing


In [None]:
class W:
    '''
    minimal stand-in for a wildcards object used to test the helper functions

    every wildcard is available as an attribute and, keyed by wildcard name,
    in the `_names` dict
    '''

    def __init__(self, sample, tumor, normal, type="B"):
        fields = {
            "sample": sample,
            "tumor": tumor,
            "normal": normal,
            "type": type,
        }
        # expose each wildcard as an attribute ...
        for key, value in fields.items():
            setattr(self, key, value)
        # ... and keep the name --> value mapping
        self._names = fields
# wildcard values for the test object
wildcards = {
    "sample":"AICS16",
    "tumor": "A",
    "normal": "B",
    "type": "B"
}
w = W(**wildcards)
# the wildcard names, looked up the same way get_bam_path does
vars(w)['_names'].keys()

In [None]:
def get_bam_path(w):
    '''
    look up the bam path of the sample described by the wildcards object

    the sample name is "<sample>_<type>" if a type wildcard exists,
    otherwise "<sample>_<tumor>" if a tumor wildcard exists, otherwise just
    "<sample>"; with a fixed normal, any name ending in "_B" resolves to the
    bam of the fixed normal instead
    '''

    # names of the wildcards present on w
    present = vars(w)['_names'].keys()
    # type takes precedence over tumor
    # (filterbam wildcards contain both, but type is the one needed)
    suffix_attr = next((attr for attr in ('type', 'tumor') if attr in present), None)
    if suffix_attr is None:
        sample_name = w.sample
    else:
        sample_name = f"{w.sample}_{getattr(w, suffix_attr)}"

    # a fixed normal replaces every "_B" sample
    fixed_normal, normal_is_fixed = get_normal()
    use_fixed = normal_is_fixed and sample_name.endswith("_B")
    lookup_key = fixed_normal if use_fixed else sample_name

    return sample_df.loc[lookup_key]['bam_path']

In [None]:
# smoke test with the wildcards object built above
get_bam_path(w)

### get bam pairs

In [None]:
def get_bam_pair(w):
    '''
    look up the bam paths of the tumor and its normal for the wildcards object

    returns a dict with the keys tumor_bam and normal_bam; with a fixed normal
    the normal_bam is always the bam of the fixed normal
    '''
    tumor_key = f"{w.sample}_{w.tumor}"
    paired_normal_key = f"{w.sample}_{w.normal}"

    # a fixed normal overrides the sample's own normal
    fixed_normal, normal_is_fixed = get_normal()
    normal_key = fixed_normal if normal_is_fixed else paired_normal_key

    return {
        "tumor_bam": sample_df.loc[tumor_key]['bam_path'],
        "normal_bam": sample_df.loc[normal_key]['bam_path'],
    }

In [None]:
# smoke test with the wildcards object built above
get_bam_pair(w)