In [4]:
import sqlite3 as sql
import pandas  as pd
import numpy as np
import sys
import xgboost as xgb
from sklearn.ensemble import AdaBoostRegressor
from sklearn import pipeline
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import learning_curve, GridSearchCV
from numpy import linalg as LA
import cPickle

In [5]:
def inverse_mutation(line):
    sequence,current,position,mutation,ddG,PH,Temp = line
    current,mutation = mutation,current
    sequence = sequence[:int(position)]+current+sequence[int(position)+1:]
    ddG = str(-float(ddG))
    return [sequence,current,position,mutation,ddG,PH,Temp]
def read_data(file_name):
    conn = sql.connect(file_name)
    cursor = conn.cursor()
    types = [str,str,int,str,float,float,float]
    names = ["sequence","current","position","mutation","ddG","PH","Temp"]
    results = {names[i]:[] for i in range(len(names))}
    for row in cursor.execute("SELECT * FROM dataset"):
        row = list(row)
        row[2] = str(int(row[2])-1)
        inversed = inverse_mutation(row)
        for i in range(len(names)):
            results[names[i]].append(types[i](row[i]))
            #results[names[i]].append(types[i](inversed[i]))
    result = pd.DataFrame(results)
    conn.close()
    return result

In [6]:
def over_sampling(data_set):
    number_plus = len(data_set[data_set.ddG > 0])
    number_minus = len(data_set[data_set.ddG < 0])
    multiply = 0
    plus = 0
    new_number_plus = number_plus
    new_number_minus = number_minus
    while new_number_plus+number_plus < new_number_minus:
        new_number_plus += number_plus
        multiply += 1
    plus = new_number_minus - new_number_plus
    #need to do
    

In [7]:
def neighbour(sequence,position,shift):
    if (position + shift > len(sequence)-1 or position + shift < 0):
        return "0"
    else:
        return sequence[position+shift]

In [8]:
def test_true_false(label, true_label):
    count_true = 0
    count_all = 0
    for i in range(len(label)):
        count_all += 1
        if (np.sign(true_label[i]) == np.sign(label[i])):
            count_true += 1
    return float(count_true)/count_all

In [9]:
def test_symmetry(estimator,test_data,test_labels):
    columns = map(str,test_data.columns)
    inversed_test_data = test_data.copy()
    inversed_test_data.current,inversed_test_data.mutation = inversed_test_data.mutation.apply(lambda x: x),inversed_test_data.current.apply(lambda x: x)
    inversed_labels = estimator.predict(inversed_test_data)
    test_labels = np.array(test_labels)
    inversed_labels = np.array(inversed_labels)
    vector = test_labels+inversed_labels
    return LA.norm(vector)/len(vector)

In [42]:
def calculate():
    #read dataset
    raw_data = read_data("iwdb.sqlite")
    #edit new features
    number_neighbours = 3
    for i in range(1,number_neighbours+1):
        raw_data['Left'+str(i)] = raw_data.apply(lambda row: neighbour(row['sequence'],row['position'],-(i)),axis=1)
        raw_data['Right'+str(i)] = raw_data.apply(lambda row: neighbour(row['sequence'],row['position'],(i)),axis=1)
    #shuffle data
    raw_data = raw_data.sample(frac=1).reset_index(drop=True)
    #change symbols in features to int values
    inputting = 'A B C D E F G H I J K L M N O P Q R S T U V W X Y Z 0'
    symbols = inputting.split(' ')
    numerics = [i for i in range(len(symbols))]
    categorical = ['current','mutation']+['Left'+str(i) for i in range(1,number_neighbours+1)]+['Right'+str(i) for i in range(1,number_neighbours+1)]
    for item in categorical:
        raw_data[item] = raw_data[item].apply(lambda x: numerics[symbols.index(x)])
    #make train and test data
    train_size = 0.8
    size = int(len(raw_data)*train_size)
    train_data = raw_data.iloc[:size]
    test_data = raw_data.iloc[size:]
    train_labels = train_data['ddG'].values
    train_data = train_data.drop(['ddG','sequence'],axis=1)
    test_labels = test_data['ddG'].values
    test_data = test_data.drop(['ddG','sequence'],axis=1)
    #make foundation for pipeline
    binary_data_columns = ['holiday', 'workingday']
    binary_data_indices = np.array([(column in binary_data_columns) for column in train_data.columns], dtype = bool)
    categorical_data_indices = np.array([(column in categorical) for column in train_data.columns], dtype = bool)
    numeric_data_columns = ['Temp', 'PH', 'position']
    numeric_data_indices = np.array([(column in numeric_data_columns) for column in train_data.columns], dtype = bool)
    #creat regressor
    dtrain = xgb.DMatrix(train_data.values,train_labels)
    dtest = xgb.DMatrix(test_data.values)
    param = {'max_depth':2, 'eta':1, 'objective':'binary:logistic' }
    num_round = 2
    regr = RandomForestRegressor(random_state = 0, max_depth = 20, n_estimators = 100)
    #regr = AdaBoostRegressor(random_state=0, n_estimators=100)
    estimator = pipeline.Pipeline(steps = [       
        ('feature_processing', pipeline.FeatureUnion(transformer_list = [        
                #binary
                ('binary_variables_processing', preprocessing.FunctionTransformer(lambda data: data[:, binary_data_indices])), 
                    
                #numeric
                ('numeric_variables_processing', pipeline.Pipeline(steps = [
                     ('selecting', preprocessing.FunctionTransformer(lambda data: data[:, numeric_data_indices]))
                            ])),
        
                #categorical
                ('categorical_variables_processing', pipeline.Pipeline(steps = [
                    ('selecting', preprocessing.FunctionTransformer(lambda data: data[:, categorical_data_indices]))            
                            ])),
            ])),
        ('model_fitting', regr)
        ]
    )
    bst = xgb.train(param, dtrain, num_round)
    # make prediction
    predictions = bst.predict(dtest)
    #estimator.fit(train_data,train_labels)
    #metrics.mean_absolute_error(test_labels, estimator.predict(test_data))
    #test_true_false(estimator.predict(test_data),test_labels)
    #test_symmetry(test_data,test_labels)
    #predictions = estimator.predict(test_data)
    #with open('my_dumped_classifier.pkl', 'wb') as fid:
        #cPickle.dump(regr, fid)    
    #print(metrics.mean_squared_error(test_labels,predictions),test_true_false(predictions,test_labels),test_symmetry(estimator,test_data,test_labels))
    print(metrics.mean_squared_error(test_labels,predictions),test_true_false(predictions,test_labels),test_symmetry(bst,test_data,test_labels))
    #test_load(test_data,test_labels,binary_data_indices,numeric_data_indices,categorical_data_indices)

In [40]:
calculate()

(1.4095932727289064, 0.8148984198645598, 0.1208164103383151)
(1.4095932727289064, 0.8148984198645598, 0.1208164103383151)


In [34]:
# load it again
def test_load(test_data,test_labels,binary_data_indices,numeric_data_indices,categorical_data_indices):
    with open('my_dumped_classifier.pkl', 'rb') as fid:
        gnb_loaded = cPickle.load(fid)
    estimator = pipeline.Pipeline(steps = [       
            ('feature_processing', pipeline.FeatureUnion(transformer_list = [        
                    #binary
                    ('binary_variables_processing', preprocessing.FunctionTransformer(lambda data: data[:, binary_data_indices])), 
                    
                    #numeric
                    ('numeric_variables_processing', pipeline.Pipeline(steps = [
                         ('selecting', preprocessing.FunctionTransformer(lambda data: data[:, numeric_data_indices]))
                                ])),
        
                    #categorical
                    ('categorical_variables_processing', pipeline.Pipeline(steps = [
                        ('selecting', preprocessing.FunctionTransformer(lambda data: data[:, categorical_data_indices]))            
                                ])),
                ])),
            ('model_fitting', gnb_loaded)
            ]
        )
    predictions = estimator.predict(test_data)
    print(metrics.mean_squared_error(test_labels,predictions),test_true_false(predictions,test_labels),test_symmetry(estimator,test_data,test_labels))