In [62]:
import pandas as pd
import numpy as np
import re
import string
import warnings
from sklearn.preprocessing import MultiLabelBinarizer

warnings.filterwarnings('ignore')


In [63]:
def read_csv(file):
    """Load the variant table and keep only rows with a definite CLNSIG label.

    The combined labels ``Benign/Likely_benign`` and
    ``Pathogenic/Likely_pathogenic`` are collapsed to ``Likely_benign`` and
    ``Likely_pathogenic``. Rows whose ``CLNSIG`` is not one of ``Benign``,
    ``Pathogenic``, ``Likely_benign`` or ``Likely_pathogenic`` (missing values
    included) are removed and the rows are renumbered from 0.

    Parameters
    ----------
    file : str, path object or file-like
        Anything accepted by ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The filtered table without the ``Unnamed: 0`` / ``Unnamed: 0.1``
        columns (when the CSV has them).

    Raises
    ------
    KeyError
        If the CSV has no ``CLNSIG`` column.
    """
    kept_labels = ['Benign', 'Pathogenic', 'Likely_benign', 'Likely_pathogenic']

    # NOTE(review): the replacement runs over every column, not only CLNSIG;
    # kept that way so existing results do not change.
    relabelled = pd.read_csv(file).replace(
        ['Benign/Likely_benign', 'Pathogenic/Likely_pathogenic'],
        ['Likely_benign', 'Likely_pathogenic'],
    )

    # drop=True renumbers the rows without first creating an 'index' column
    # that would have to be removed again.
    df = relabelled[relabelled['CLNSIG'].isin(kept_labels)].reset_index(drop=True)

    # errors='ignore': a CSV that lacks these columns should still load
    # instead of failing with KeyError.
    return df.drop(columns=['Unnamed: 0', 'Unnamed: 0.1'], errors='ignore')

# Load the training CSV through read_csv; the path is relative to the working directory.
df = read_csv('../../Data/train/data_z.csv')

In [64]:
def multi_hot_consequence(df):
    """Multi-hot encode the '&'-joined terms of the ``Consequence`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Consequence`` column whose non-missing cells are
        strings such as ``'missense_variant&splice_region_variant'``.

    Returns
    -------
    pandas.DataFrame
        One 0/1 column per distinct term (in the binarizer's ``classes_``
        order), indexed like ``df`` so the result can be joined back on the
        index. Rows with a missing ``Consequence`` are all zeros.
    """
    # A missing value becomes an empty label list: MultiLabelBinarizer iterates
    # over every sample, so a float NaN would raise TypeError.
    con_split = df['Consequence'].apply(lambda x: [] if pd.isna(x) else x.split('&'))

    mlb = MultiLabelBinarizer()
    # index=df.index keeps the indicator rows aligned with the source rows even
    # when df does not carry a plain 0..n-1 index.
    con_df = pd.DataFrame(mlb.fit_transform(con_split), columns=mlb.classes_, index=df.index)

    return con_df

# Build the Consequence indicator matrix for the loaded table.
con_df = multi_hot_consequence(df)

In [65]:
# Scratch cell: module-level copy of the term list that region_div also defines locally.
# NOTE(review): 'integenic' looks like a misspelling of 'intergenic'; the name is code, so it is left unchanged.
integenic = ['intergenic_variant', 'upstream_gene_variant', 'downstream_gene_variant']
# 1 if(set(integenic) & set(con_df.columns)) else 0



In [66]:
def region_div(con_df):
    """Add coarse genomic-region flags to the Consequence indicator frame.

    For every region, the indicator columns of its consequence terms that are
    present in ``con_df`` are summed per row; the flag is 1 when that sum is
    non-zero and 0 otherwise. Terms without a column in ``con_df`` are ignored,
    so a region with no matching column yields an all-zero flag.

    Parameters
    ----------
    con_df : pandas.DataFrame
        Indicator frame with one column per consequence term. It is modified
        in place: the columns ``integenic``, ``utr``, ``exon`` and ``splice``
        are added (or overwritten), in that order.

    Returns
    -------
    None
    """
    regions = {
        # Intergenic region (comparatively small effect on overall pathogenicity).
        # The column keeps its historical spelling 'integenic'.
        'integenic': ['intergenic_variant', 'upstream_gene_variant', 'downstream_gene_variant'],
        # UTR region: both ends. The list used to name the 5' term twice, so
        # 3' UTR variants never set the flag.
        'utr': ['5_prime_UTR_variant', '3_prime_UTR_variant'],
        # Exon (coding) region.
        # NOTE(review): 'stop_retained_variant' was listed twice; one entry may
        # have been meant as another term (e.g. 'start_retained_variant') - confirm.
        'exon': [
            'synonymous_variant', 'missense_variant', 'inframe_insertion',
            'inframe_deletion', 'stop_gained', 'frameshift_variant',
            'coding_sequence_variant', 'stop_lost', 'stop_retained_variant',
            'start_lost', 'incomplete_terminal_codon_variant',
        ],
        # Splice region: found where UTRs, introns and exons meet.
        'splice': ['splice_acceptor_variant', 'splice_donor_variant', 'splice_region_variant'],
    }
    # Intron region: no flag is derived. NOTE(review): the only intron term is
    # 'intron_variant', whose own indicator column would equal such a flag -
    # confirm the omission is intended.

    for flag, terms in regions.items():
        present = [term for term in terms if term in con_df.columns]
        con_df[flag] = con_df[present].sum(axis=1).ne(0).astype(int)

# Adds the region flag columns to con_df in place; region_div has no return value.
region_div(con_df)

In [67]:
def merge_df(df, con_df):
    df_copy = df.merge(con_df, left_on=df.index, right_on=con_df.index)
    df_copy.drop('key_0', axis=1, inplace=True)

    return df_copy

# Combine the variant table with its Consequence indicator and region columns.
df_copy = merge_df(df, con_df)

In [70]:
def get_EXON(df):
    """Return the rows of ``df`` whose ``EXON`` value is present.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``EXON`` column.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the matching rows, keeping their original
        index labels.
    """
    has_exon = df['EXON'].notna()
    # .copy(): callers assign columns on the result (see exon_format); handing
    # back a bare boolean-mask slice makes those writes chained assignments.
    return df.loc[has_exon].copy()

# Keep only the rows of df_copy whose EXON value is present.
df_EXON = get_EXON(df_copy)

In [104]:
# Scratch check: split an '&'-joined string of predictor calls and count the 'T' entries.
# df_EXON['FATHMM_pred'].value_counts()
a = 'T&T&T&T&.&T&T&T&D'
a.split('&').count('T')

7

In [69]:
def exon_format(df_EXON):
    """Collapse the '&'-joined predictor annotations of ``df_EXON`` in place.

    Missing cells are passed through unchanged unless stated otherwise.

    * ``SIFT``, ``PolyPhen``: the parenthesised part is removed
      (``'deleterious(0.01)'`` -> ``'deleterious'``).
    * ``SIFT_pred``, ``MutationTaster_pred``, ``Polyphen2_HDIV_pred``,
      ``Polyphen2_HVAR_pred``: replaced by the most frequent uppercase letter
      in the cell (ties go to the alphabetically first letter); NaN when the
      cell holds no uppercase letter.
    * ``FATHMM_pred``: share of ``'D'`` among the ``'D'`` and ``'T'`` calls.
    * ``PROVEAN_pred``: share of ``'D'`` among the ``'D'`` and ``'N'`` calls.
      Both shares are 0 when the cell contains neither label.
    * ``VEST3_score``: mean of the numeric entries, ignoring ``'.'`` and empty
      entries; NaN when nothing numeric is left.
    * ``gnomadWES_AF_POPMAX``: converted to float, with missing and ``'.'``
      mapped to 0.
    * ``AF`` (new column): mean of ``MAX_AF`` and the converted
      ``gnomadWES_AF_POPMAX``; a missing ``MAX_AF`` gives a missing ``AF``.

    Parameters
    ----------
    df_EXON : pandas.DataFrame
        Frame containing all columns listed above plus ``MAX_AF``. It is
        modified in place.

    Returns
    -------
    None

    Notes
    -----
    Not idempotent: after one call ``FATHMM_pred`` and ``PROVEAN_pred`` hold
    floats, which a second call cannot split again.
    """
    def keep_na(func):
        # Wrap func so that missing cells are returned untouched.
        return lambda x: x if pd.isna(x) else func(x)

    def del_brackets(x):
        # Remove every '(...)' group, e.g. 'benign(0.003)' -> 'benign'.
        return re.sub(r'\(.*?\)', '', x)

    def majority_label(x):
        # Most frequent uppercase letter. A cell without any letter (e.g. '.')
        # must not fall through to 'A', the first candidate with count 0.
        best = max(string.ascii_uppercase, key=x.count)
        return best if x.count(best) else np.nan

    def percent(x, D, T):
        # Fraction of calls equal to D among the calls equal to D or T.
        calls = x.split('&')
        hits = calls.count(D)
        total = hits + calls.count(T)
        return hits / total if total else 0.0

    def vest3_avr(x):
        # Average the per-transcript scores; '.' marks a transcript without one.
        scores = [float(part) for part in str(x).split('&') if part not in ('', '.')]
        return sum(scores) / len(scores) if scores else np.nan

    for col in ('SIFT', 'PolyPhen'):
        df_EXON[col] = df_EXON[col].apply(keep_na(del_brackets))

    for col in ('SIFT_pred', 'MutationTaster_pred',
                'Polyphen2_HDIV_pred', 'Polyphen2_HVAR_pred'):
        df_EXON[col] = df_EXON[col].apply(keep_na(majority_label))

    df_EXON['FATHMM_pred'] = df_EXON['FATHMM_pred'].apply(
        keep_na(lambda x: percent(x, 'D', 'T')))
    # Read from df_EXON itself, not from a module-level frame.
    df_EXON['PROVEAN_pred'] = df_EXON['PROVEAN_pred'].apply(
        keep_na(lambda x: percent(x, 'D', 'N')))

    df_EXON['VEST3_score'] = df_EXON['VEST3_score'].apply(keep_na(vest3_avr))

    # The ada_score / rf_score group-mean imputation sketched here referred to
    # a df_INTRON frame and stays disabled.

    df_EXON['gnomadWES_AF_POPMAX'] = df_EXON['gnomadWES_AF_POPMAX'].apply(
        lambda x: 0 if pd.isna(x) or x == '.' else float(x))
    # Plain two-value mean; NaN in MAX_AF propagates, as np.mean would do.
    df_EXON['AF'] = (df_EXON['MAX_AF'] + df_EXON['gnomadWES_AF_POPMAX']) / 2

# exon_format(df_EXON)
# print(type(format(df_EXON)))