# Test get_genotype_all_vars

In [1]:
import pandas as pd
import numpy as np
import cptac
import cptac.utils as ut

In [2]:
def get_genotype_all_vars(self, mutations_genes, mutations_filter=None, show_location=True, mutation_hotspot=None):
    """Return each sample's prioritized mutation for one gene, plus whether the sample carries multiple mutations.

    CNV values at or below -0.2 are labelled 'Deletion' and values at or above 0.2 are labelled
    'Amplification'; those labels are prioritized together with the somatic mutations.

    Parameters:
    self: The dataset object. This function calls its get_cancer_type(), version(), get_somatic_mutation(), get_CNV() and join_omics_to_mutations() methods.
    mutations_genes (str): The gene to get mutation data for. It is concatenated into column names, so it must be a single gene name.
    mutations_filter (list, optional): Mutation types to prioritize when a sample has several, in order of priority. Defaults to a list chosen by cancer type.
    show_location (bool, optional): Whether to include the Location column. Defaults to True.
    mutation_hotspot (list, optional): Locations regarded as hotspots. A mutation at one of them gets "_hotspot" appended to its type before prioritizing.

    Returns:
    pandas.DataFrame: If the gene is absent from the somatic mutation table, the gene's CNV column plus a 'Mutation' column holding 'Deletion', 'Amplification' or 'No_Mutation'. Otherwise the joined CNV/mutation table with the helper columns dropped and 'Mutation', 'Location' (unless show_location is false) and 'Mutation_Status' added.
    """
    cancer_type = self.get_cancer_type()

    # If they don't give us a filter, this is the default.
    if mutations_filter is None:
        if cancer_type == "colon":
            mutations_filter = [
                'Deletion',  # deletion
                'frameshift deletion', 'frameshift insertion', 'frameshift substitution', 'stopgain', 'stoploss',  # truncation
                'Missense_Mutation_hotspot',
                'nonframeshift deletion', 'nonframeshift insertion', 'nonframeshift substitution', 'nonsynonymous SNV',  # missense
                'Amplification',
                'Wildtype',
            ]
        else:
            # hnscc, gbm and every other cancer type share the same default order.
            # 'Splice_Site' and 'Silent' are separate entries; a missing comma used to
            # merge them into the meaningless value 'Splice_SiteSilent'.
            mutations_filter = [
                'Deletion',  # deletion
                'Frame_Shift_Del', 'Frame_Shift_Ins', 'Nonsense_Mutation', 'Nonstop_Mutation',  # truncation
                'Missense_Mutation_hotspot',
                'Missense_Mutation',
                'Amplification',
                'In_Frame_Del', 'In_Frame_Ins', 'Splice_Site',  # inframe changes
                'Silent',
                'Wildtype',
            ]

    # Fallback hierarchy used when none of a sample's mutations appear in the filter.
    if cancer_type == 'colon':
        truncations = ['frameshift deletion', 'frameshift insertion', 'frameshift substitution', 'stopgain', 'stoploss']
        missenses = ['nonframeshift deletion', 'nonframeshift insertion', 'nonframeshift substitution', 'nonsynonymous SNV']
    elif cancer_type == 'hnscc' and self.version() == "0.1":
        truncations = ["stopgain", "stoploss"]
        missenses = ["nonframeshift insertion", "nonframeshift deletion"]
    else:
        truncations = ['Frame_Shift_Del', 'Frame_Shift_Ins', 'Nonsense_Mutation', 'Nonstop_Mutation', 'Splice_Site']
        missenses = ['In_Frame_Del', 'In_Frame_Ins', 'Missense_Mutation']

    # Noncoding types are only part of the fallback hierarchy for gbm.
    if cancer_type == "gbm":
        noncodings = ["Intron", "RNA", "3'Flank", "Splice_Region", "5'UTR", "5'Flank", "3'UTR"]
    else:
        noncodings = []

    cnv_col = mutations_genes + "_CNV"
    mutation_col = mutations_genes + "_Mutation"
    location_col = mutations_genes + "_Location"

    # Check that the gene is in the somatic_mutation DataFrame.
    somatic_mutation = self.get_somatic_mutation()
    if mutations_genes not in somatic_mutation["Gene"].unique():
        # The gene has no somatic mutations, but it still has CNV data that we want.
        def add_del_and_amp_no_somatic(row):
            if row[mutations_genes] <= -.2:
                return 'Deletion'
            if row[mutations_genes] >= .2:
                return 'Amplification'
            return "No_Mutation"

        cnv = self.get_CNV()

        # Drop the database index (present for ccrcc and brca).
        if isinstance(cnv.columns, pd.MultiIndex):
            cnv = ut.reduce_multiindex(df=cnv, levels_to_drop=['Database_ID'])

        gene_cnv = cnv[[mutations_genes]]
        labels = gene_cnv.apply(add_del_and_amp_no_somatic, axis=1)
        return gene_cnv.assign(Mutation=labels)

    # Combine the cnv and mutations dataframes.
    combined = self.join_omics_to_mutations(omics_df_name="CNV", mutations_genes=mutations_genes, omics_genes=mutations_genes)

    # Drop the database index from ccrcc and brca.
    if cancer_type == "ccrcc" or cancer_type == "brca":
        combined = ut.reduce_multiindex(df=combined, levels_to_drop=['Database_ID'])

    # If there are hotspot mutations, append '_hotspot' to the mutation type so that it's prioritized correctly.
    if mutation_hotspot is not None:
        def mark_hotspot_locations(row):
            # Pair each location with the mutation at the same position. Looking the position up
            # with list.index() would pick the wrong mutation when a location is repeated.
            return [
                mutation + "_hotspot" if location in mutation_hotspot else mutation
                for location, mutation in zip(row[location_col], row[mutation_col])
            ]

        combined['hotspot'] = combined.apply(mark_hotspot_locations, axis=1)
        combined[mutation_col] = combined['hotspot']
        combined = combined.drop(columns='hotspot')

    # Based on cnv, build per-sample lists that also include deletions and amplifications.
    def add_del_and_amp(row):
        if row[cnv_col] <= -.2:
            mutations = row[mutation_col] + ['Deletion']
            locations = row[location_col] + ['Deletion']
        elif row[cnv_col] >= .2:
            mutations = row[mutation_col] + ['Amplification']
            locations = row[location_col] + ['Amplification']
        else:
            mutations = row[mutation_col]
            locations = row[location_col]
        return mutations, locations

    combined['mutations'], combined['locations'] = zip(*combined.apply(add_del_and_amp, axis=1))

    # Now that we have the deletions and amplifications, pick the highest-priority mutation per sample.
    def prioritize(row):
        sample_mutations_list = row['mutations']
        sample_locations_list = row['locations']
        chosen_indices = []

        if len(sample_mutations_list) != 1:
            # Filters earlier in the list win, so stop at the first one that matches.
            for filter_val in mutations_filter:
                chosen_indices = [index for index, value in enumerate(sample_mutations_list) if value == filter_val]
                if chosen_indices:
                    break

            # Nothing in the filter matched: fall back to truncations, then missenses, then (gbm only) noncodings.
            for fallback_types in (truncations, missenses, noncodings):
                if chosen_indices:
                    break
                chosen_indices = [index for index, value in enumerate(sample_mutations_list) if value in fallback_types]

        # A single-entry list, or a list where nothing matched at all, keeps its first entry
        # (the no-match case previously raised IndexError).
        chosen = chosen_indices[0] if chosen_indices else 0
        return pd.Series([[sample_mutations_list[chosen]], [sample_locations_list[chosen]]], index=['mutations', 'locations'])

    prioritized = combined.apply(prioritize, axis=1)
    combined['Mutation'] = prioritized['mutations']
    combined['Location'] = prioritized['locations']

    # Add a status column that says whether the gene has multiple mutations (including deletion and amplification).
    def sample_status(row):
        sample_mutations = row['mutations']
        if len(sample_mutations) > 1:
            # A wildtype placeholder next to one real event still counts as a single mutation.
            if len(sample_mutations) == 2 and ("Wildtype_Tumor" in sample_mutations or "Wildtype_Normal" in sample_mutations):
                return "Single_mutation"
            return "Multiple_mutation"
        if sample_mutations == ["Wildtype_Normal"]:
            return "Wildtype_Normal"
        if sample_mutations == ['Wildtype_Tumor']:
            return "Wildtype_Tumor"
        return "Single_mutation"

    combined['Mutation_Status'] = combined.apply(sample_status, axis=1)

    # Drop all the unnecessary columns.
    df = combined.drop(columns=[cnv_col, mutation_col, location_col, mutations_genes + "_Mutation_Status", 'Sample_Status', 'mutations', 'locations'])
    df['Mutation'] = [','.join(map(str, l)) for l in df['Mutation']]
    df['Location'] = [','.join(map(str, l)) for l in df['Location']]
    if not show_location:
        df = df.drop(columns="Location")  # if they don't want us to show the location, drop it
    return df


In [2]:
def print_test_result(PASS):
    """Print a tab-indented PASS or FAIL line for a finished test.
    Parameters:
    PASS (bool): Truthy if the test passed, falsy if it failed.
    """
    # The FAIL line carries an extra trailing newline so failures are followed by a blank line.
    outcome = '\tPASS' if PASS else '\tFAIL\n'
    print(outcome)

def check_returned_is_df(returned):
    """Report whether an object is a pandas dataframe, printing why when it is not.
    Parameters:
    returned: The object to test
    Returns:
    bool: True if the object is a dataframe. False otherwise, after printing a dedicated message for None or a message naming the actual type for anything else.
    """
    if returned is None:
        print("Function under test returned None.")
        return False

    if isinstance(returned, pd.core.frame.DataFrame):
        return True

    print("Returned object was not a dataframe. Type of object: {}".format(type(returned)))
    return False

def check_df_shape(df, exp_shape):
    """Compare a dataframe's shape with the expected one, printing both when they differ.
    Parameters:
    df (pandas.core.frame.DataFrame): The dataframe to test.
    exp_shape (tuple): Expected (number of rows, number of columns).
    Returns:
    bool: True if df.shape equals exp_shape, False otherwise.
    """
    act_shape = df.shape
    if exp_shape == act_shape:
        return True

    print("Dataframe dimensions did not match expected values.\n\tExpected: {}\n\tActual: {}\n".format(exp_shape, act_shape))
    return False

def check_getter(df, exp_dim, exp_headers, coordinates, values):
    """Validate a dataframe's type, dimensions, headers and three spot-check cells, printing a message for every mismatch.
    Parameters
    df: the dataframe returned by the getter under test
    exp_dim: a tuple with the expected dimensions of the dataframe, in the format (rows, columns)
    exp_headers: all headers in order if the dataframe has up to 20 columns; otherwise the first ten followed by the last ten headers, in order.
    coordinates: a tuple of three (row position, column position) tuples of ints, one per test value
    values: a tuple of expected values, each matched to the coordinates entry at the same index
    Returns
    bool indicating if the dataframe had the correct data.
    """
    # Anything that isn't a dataframe ends the test immediately; the remaining checks would be useless.
    if not check_returned_is_df(df):
        return False

    # Dimensions
    PASS = check_df_shape(df, exp_dim)

    # Headers: wide frames are only compared on their first and last ten columns.
    all_headers = list(df.columns.values)
    if len(df.columns.values) > 20:
        act_headers = all_headers[:10] + all_headers[-10:]
    else:
        act_headers = all_headers

    if len(exp_headers) == len(act_headers):
        for expected_header, actual_header in zip(exp_headers, act_headers):
            if expected_header != actual_header:
                print("Dataframe header did not match expected value.\n\tExpected: {}\n\tActual: {}\n".format(expected_header, actual_header))
                PASS = False
    else:
        print("Unexpected number of test headers in dataframe. Expected number of headers: {}. You passed {} headers.\n".format(len(act_headers), len(exp_headers)))
        PASS = False

    # Spot-check values: exactly three coordinates are read, by position.
    act_values = [df.iloc[coordinates[n][0], coordinates[n][1]] for n in range(3)]

    for n, expected_value in enumerate(values):
        if act_values[n] != expected_value:
            print("Dataframe value did not match expected value.\n\tColumn: {}\n\tIndex: {}\n\tExpected: {}\n\tActual: {}\n".format(df.columns.values[coordinates[n][1]], df.index.values[coordinates[n][0]], expected_value, act_values[n]))
            PASS = False

    # Report whether every check passed
    return PASS

# Check Vals

In [50]:
# Run the notebook-local get_genotype_all_vars on the dataset bound to `k` and tally the KRAS 'Mutation' labels.
# NOTE(review): `k` is only assigned in the "All tests" cell further down (k = cptac.Ccrcc()),
# so on a fresh kernel that cell has to be run before this one.
ca = k
mut = get_genotype_all_vars(ca, 'KRAS')
mut.Mutation.value_counts()



No_Mutation      97
Amplification    13
Name: Mutation, dtype: int64

In [51]:
# Show the rows of the somatic mutation table whose Gene is KRAS.
# An empty result means get_genotype_all_vars takes its CNV-only branch for this gene.
m = ca.get_somatic_mutation()
mg = m.loc[m['Gene'] == 'KRAS']
mg

Name,Gene,Mutation,Location
Patient_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1


In [55]:
# Count KRAS deletions and amplifications straight from the CNV table, to compare with the tally above.
cnv = ca.get_CNV()

#drop the database index from ccrcc and brca
if isinstance(cnv.keys(), pd.core.indexes.multi.MultiIndex): ##
    drop = ['Database_ID']
    cnv = ut.reduce_multiindex(df=cnv, levels_to_drop=drop)
    
gene = cnv[['KRAS']]
print('total:', len(gene.index))

# NOTE(review): these cutoffs are strict (< -0.2, > 0.2) while get_genotype_all_vars uses
# inclusive ones (<= -.2, >= .2); a value exactly at a cutoff would be counted differently.
dell = gene.loc[gene['KRAS'] < -0.2]
amp = gene.loc[gene['KRAS'] > 0.2]
print('DEL:', len(dell))
print('AMP:', len(amp))

# Sample IDs that appear in both the deleted and the amplified subset.
np.intersect1d(list(dell.index), list(amp.index))

total: 110
DEL: 0
AMP: 13




array([], dtype='<U32')

In [56]:
# Number of rows in the tumor-only clinical table, for comparison with the CNV row count printed above.
clin = ca.get_clinical(tissue_type = 'tumor')
len(clin.index)

110

# CCRCC

In [3]:
def test_genotype_ccrcc_KRAS():
    """Check k.get_genotype_all_vars('KRAS') against known samples and print PASS or FAIL.

    Reads the global dataset `k`. The result is PASS only if every group of
    spot checks passes; previously each group overwrote the outcome of the
    one before it, so only the last group decided the printed result.
    """
    # test when there is no data in the somatic mutations df
    print('Running get_genotype_all_vars...')
    df = k.get_genotype_all_vars('KRAS')

    dimensions = (110, 2)
    headers = ['KRAS', 'Mutation']
    mutation_col = 1  # positional index of the 'Mutation' column

    # (patient IDs, expected 'Mutation' values); three samples per group because
    # check_getter reads exactly three coordinates per call.
    expected_groups = [
        (('C3L-00010', 'C3L-01560', 'C3N-00646'), ('No_Mutation', 'No_Mutation', 'No_Mutation')),
        # No del vals in this dataset, so test more No_Mutation
        (('C3L-00800', 'C3L-01281', 'C3N-00154'), ('No_Mutation', 'No_Mutation', 'No_Mutation')),
        (('C3N-00492', 'C3L-01287', 'C3N-00852'), ('Amplification', 'Amplification', 'Amplification')),
    ]

    PASS = True
    for patient_ids, vals in expected_groups:
        # Translate each patient ID into its integer row position.
        coord = tuple((df.index.get_loc(patient_id), mutation_col) for patient_id in patient_ids)
        # Run every group (so all mismatches are printed) but remember any failure.
        if not check_getter(df, dimensions, headers, coord, vals):
            PASS = False

    print_test_result(PASS)

# GBM

In [7]:
def test_genotype_gbm_KRAS():
    """Check g.get_genotype_all_vars('KRAS') against known samples and print PASS or FAIL.

    Reads the global dataset `g`. The result is PASS only if every group of
    spot checks passes; previously each group overwrote the outcome of the
    one before it, so only the last group decided the printed result.
    """
    # test when there is no data in the somatic mutations df
    print('Running get_genotype_all_vars...')
    df = g.get_genotype_all_vars('KRAS')

    dimensions = (98, 2)
    headers = ['KRAS', 'Mutation']
    mutation_col = 1  # positional index of the 'Mutation' column

    # (patient IDs, expected 'Mutation' values); three samples per group because
    # check_getter reads exactly three coordinates per call.
    expected_groups = [
        (('C3N-03473', 'C3N-03183', 'C3N-01515'), ('No_Mutation', 'No_Mutation', 'No_Mutation')),
        # Only 2 deletions, so the third sample in this group is a No_Mutation one
        (('C3L-01049', 'C3L-02708', 'C3N-02256'), ('Deletion', 'Deletion', 'No_Mutation')),
        (('C3N-01816', 'C3N-02769', 'C3N-02784'), ('Amplification', 'Amplification', 'Amplification')),
    ]

    PASS = True
    for patient_ids, vals in expected_groups:
        # Translate each patient ID into its integer row position.
        coord = tuple((df.index.get_loc(patient_id), mutation_col) for patient_id in patient_ids)
        # Run every group (so all mismatches are printed) but remember any failure.
        if not check_getter(df, dimensions, headers, coord, vals):
            PASS = False

    print_test_result(PASS)

# HNSCC

In [8]:
def test_genotype_hnscc_KRAS():
    """Check h.get_genotype_all_vars('KRAS') against known samples and print PASS or FAIL.

    Reads the global dataset `h`. The result is PASS only if every group of
    spot checks passes; previously each group overwrote the outcome of the
    one before it, so only the last group decided the printed result.
    """
    # test when there is no data in the somatic mutations df
    print('Running get_genotype_all_vars...')
    df = h.get_genotype_all_vars('KRAS')

    dimensions = (109, 2)
    headers = ['KRAS', 'Mutation']
    mutation_col = 1  # positional index of the 'Mutation' column

    # (patient IDs, expected 'Mutation' values); three samples per group because
    # check_getter reads exactly three coordinates per call.
    expected_groups = [
        (('C3L-00999', 'C3N-01946', 'C3N-03487'), ('No_Mutation', 'No_Mutation', 'No_Mutation')),
        (('C3N-01337', 'C3N-03012', 'C3N-03785'), ('Deletion', 'Deletion', 'Deletion')),
        (('C3L-04844', 'C3L-00987', 'C3N-03488'), ('Amplification', 'Amplification', 'Amplification')),
    ]

    PASS = True
    for patient_ids, vals in expected_groups:
        # Translate each patient ID into its integer row position.
        coord = tuple((df.index.get_loc(patient_id), mutation_col) for patient_id in patient_ids)
        # Run every group (so all mismatches are printed) but remember any failure.
        if not check_getter(df, dimensions, headers, coord, vals):
            PASS = False

    print_test_result(PASS)

# All tests

In [5]:
# Load the three CPTAC datasets that the test functions and the "Check Vals" cells read as globals.
# NOTE(review): cells above already use `k`; on a fresh kernel this cell must run before them.
g = cptac.Gbm()
h = cptac.Hnscc()
k = cptac.Ccrcc()

Checking that hnscc index is up-to-date...



                                          

In [9]:
# Run the three per-dataset tests; each prints a "Running..." line followed by PASS or FAIL.
print("\nTesting get_genotype_all_vars...")
test_genotype_ccrcc_KRAS()
test_genotype_gbm_KRAS()
test_genotype_hnscc_KRAS()


Testing get_genotype_all_vars...
Running get_genotype_all_vars...
	PASS
Running get_genotype_all_vars...
	PASS
Running get_genotype_all_vars...
	PASS


