In [3]:
from pandas_plink import read_plink
import numpy as np
import pandas as pd
from collections import OrderedDict as odict
from math import isnan
from scipy.stats import chi2_contingency
import timeit
import multiprocessing as mp
import os
import psutil
import pickle
from aco import pre_processing, post_processing
import matplotlib.pyplot as plt
%matplotlib inline


In [4]:
class aco_tabu:
    """Ant-colony search with a tabu list for pairs of SNPs associated with case/control status.

    Each ant picks two SNPs by pheromone-weighted tournament, scores the pair
    with a chi-square test on its joint genotype table (cases vs. controls)
    and deposits the statistic as pheromone on both SNPs.  Every ``numgen``
    generations the SNP with the most pheromone is placed on the tabu list
    (so ants stop selecting it) and its best partner is found by scanning
    every SNP.

    Parameters
    ----------
    bim, fam : objects with a ``shape`` attribute
        Only ``bim.shape[0]`` (number of SNPs) and ``fam.shape[0]`` (number of
        individuals) are read.  Presumably the tables returned by
        ``pandas_plink.read_plink`` -- TODO confirm.
    bed : indexable
        ``bed[snp].compute()`` must return the per-individual genotype values
        of one SNP.  Values are used as integer codes 0, 1, 2; NaN is treated
        as missing.
    cases_i, controls_i : iterable of int
        Positions (into a genotype vector) of the case / control individuals.
    nbant : int
        Number of ants (pair evaluations) per generation.
    nbt : int
        Tournament size: how many candidate SNPs are drawn per selection.
    evaporation_rate : float
        Factor the pheromone vector is multiplied by after each generation.
    init_val : float
        Initial pheromone value of every SNP.
    total_fitness_evals : int
        Total evaluation budget; the number of generations is
        ``total_fitness_evals // nbant``.
    numgen : int
        Number of generations between two tabu steps.
    """

    def __init__(self, bim, fam, bed, cases_i, controls_i, nbant, nbt, evaporation_rate, init_val, total_fitness_evals, numgen):

        # Keep references to the bim, bed and fam data and the case/control indices
        self.bim = bim
        self.bed = bed
        self.fam = fam
        self.cases_i = cases_i
        self.controls_i = controls_i

        # Number of individuals and SNPs in the dataset
        self.n_individuals = self.fam.shape[0]
        self.n_snps = self.bim.shape[0]

        # Number of cases and controls
        self.n_cases = len(self.cases_i)
        self.n_controls = len(self.controls_i)

        # ACO setup
        self.nbant = nbant
        self.nbt = nbt
        self.init_val = init_val
        self.numgen = numgen
        self.pheromone_mat = np.zeros(self.n_snps)
        self.evaporation_rate = evaporation_rate
        self.total_fitness_evals = total_fitness_evals
        # Integer division: the value is used as the bound of range() in run(),
        # which rejects the float produced by "/".
        self.n_gen = int(total_fitness_evals // nbant)
        self.best_p = 1
        self.best_stat = 0
        self.best_snp = []
        self.best_fitness = []
        self.tabu_list = []
        self.best_combination = []

    def init_pheromone(self, init_val):
        """Set the pheromone of every SNP to ``init_val``."""
        self.pheromone_mat.fill(init_val)

    def tabu_random(self):
        """Draw ``nbt`` SNP indices at random (with replacement) from the non-tabu SNPs."""
        candidates = np.arange(self.n_snps)
        if self.tabu_list:
            candidates = np.delete(candidates, np.asarray(self.tabu_list, dtype=int))
        return np.random.choice(candidates, size=self.nbt)

    def tournament_choice(self):
        """Return the SNP with the most pheromone among one random tournament draw.

        The winner is always one of the drawn candidates (first one on ties),
        so a tabu SNP can never be returned, even when all pheromone values
        are zero.
        """
        tournament = self.tabu_random()
        winner = np.argmax(self.pheromone_mat[tournament])
        return int(tournament[winner])

    def _load_genotypes(self, snp):
        """Load one SNP's genotype vector as integer codes, mapping NaN (missing) to 3."""
        geno = np.asarray(self.bed[snp].compute(), dtype=float)
        return np.where(np.isnan(geno), 3, geno).astype(int)

    def _pair_counts(self, codes1, codes2, indices):
        """Count joint genotypes of two SNPs over ``indices``; return the 9 non-missing cells, flattened."""
        idx = np.fromiter(indices, dtype=int)
        counts = np.zeros((4, 4))
        # np.add.at accumulates correctly when the same cell is hit repeatedly
        np.add.at(counts, (codes1[idx], codes2[idx]), 1)
        # Row/column 3 holds individuals with a missing genotype; drop them
        return counts[:3, :3].flatten()

    def _chi_sq_from_codes(self, codes1, codes2):
        """Chi-square test of cases vs. controls over the joint genotypes of two coded SNPs."""
        cases_row = self._pair_counts(codes1, codes2, self.cases_i)
        controls_row = self._pair_counts(codes1, codes2, self.controls_i)

        # Keep every genotype combination observed in either group.  Filtering
        # on the cases alone would discard control-only combinations and could
        # leave an all-zero control row, which chi2_contingency rejects.
        observed = (cases_row + controls_row) > 0
        table = np.array([cases_row[observed], controls_row[observed]])

        # Degenerate table (fewer than two combinations, or a group with no
        # genotyped individuals): no evidence of association.
        if table.shape[1] < 2 or (table.sum(axis=1) == 0).any():
            return 0.0, 1.0

        stat, p, _dof, _expected = chi2_contingency(table)
        return stat, p

    def chi_sq_omnibus(self, snp1, snp2):
        """Score the SNP pair ``(snp1, snp2)``.

        Builds a 2 x k contingency table (row 0: cases, row 1: controls; one
        column per joint genotype observed in at least one group, individuals
        with a missing genotype excluded) and runs
        ``scipy.stats.chi2_contingency`` on it.

        Returns
        -------
        (stat, p) : the chi-square statistic and its p-value; ``(0.0, 1.0)``
        when the table is degenerate.
        """
        return self._chi_sq_from_codes(self._load_genotypes(snp1), self._load_genotypes(snp2))

    def update_pheromone(self, snp1, snp2):
        """Evaluate a pair, record it if it has the lowest p-value so far, and deposit pheromone."""
        chi_sq_val, p_val = self.chi_sq_omnibus(snp1, snp2)
        if p_val < self.best_p:
            self.best_stat = chi_sq_val
            self.best_p = p_val
            self.best_snp = [snp1, snp2]
        self.pheromone_mat[int(snp1)] += chi_sq_val
        self.pheromone_mat[int(snp2)] += chi_sq_val

    def evaporate_pheromone(self):
        """Scale all pheromone values by ``evaporation_rate``."""
        self.pheromone_mat *= self.evaporation_rate

    def ant(self):
        """Run one ant: select two SNPs by tournament and update the pheromone with their score."""
        snp1 = self.tournament_choice()
        snp2 = self.tournament_choice()
        self.update_pheromone(snp1, snp2)

    def tabu(self, snp):
        """Scan every SNP as a partner for ``snp`` and return the index with the largest chi-square statistic.

        A larger statistic means a stronger association, consistent with
        update_pheromone, so the best partner is the arg-max.
        """
        # The anchor SNP is the same for every pair: load it once
        anchor = self._load_genotypes(snp)
        # NOTE(review): the scan includes ``snp`` itself as a possible partner.
        fitness = [
            self._chi_sq_from_codes(anchor, self._load_genotypes(partner))[0]
            for partner in range(self.n_snps)
        ]
        return int(np.argmax(fitness))

    def run(self):
        """Run the search for ``n_gen`` generations.

        Returns
        -------
        (pheromone_mat, best_combination, tabu_list) : the final pheromone
        vector, the list of ``(tabu_snp, best_partner)`` pairs and the list of
        tabu SNP indices.  ``best_fitness`` additionally records the maximum
        pheromone value of each generation (before evaporation).
        """
        self.init_pheromone(self.init_val)
        gen_stop = self.numgen
        for gen in range(self.n_gen):
            # Every numgen generations: retire the current leader and find its best partner
            if gen == gen_stop:
                best_snp = int(np.argmax(self.pheromone_mat))
                self.tabu_list.append(best_snp)
                self.best_combination.append((best_snp, self.tabu(best_snp)))
                gen_stop += self.numgen
            for _ in range(self.nbant):
                self.ant()

            self.best_fitness.append(np.amax(self.pheromone_mat))
            self.evaporate_pheromone()

        return self.pheromone_mat, self.best_combination, self.tabu_list

In [5]:
# Input locations: PLINK file prefix, phenotype table and a previously saved result file
plink_fn = '/Users/raouldias/Desktop/Extend/extend_csp_data_annon'
pheno_fn = '/Users/raouldias/Desktop/Extend/extend_phenotype.txt'
fn = '/Users/raouldias/Desktop/Extend/GSA-rs34872471Tabu'

# Load the genotype tables and the case/control index lists
preprocessor = pre_processing(plink_fn, pheno_fn)
bim, fam, bed, cases_i, controls_i = preprocessor.run()

# NOTE(review): the recorded output of this cell is a TypeError reporting that an
# __init__ is missing 'nbant', 'nbt', 'evaporation_rate', 'init_val' and
# 'total_fitness_evals'. One of the constructors called in this cell apparently
# needs those arguments -- confirm against the aco module.
post = post_processing(bim, fam)
val = post.load_file(fn)

TypeError: __init__() missing 5 required positional arguments: 'nbant', 'nbt', 'evaporation_rate', 'init_val', and 'total_fitness_evals'

In [None]:
# Input locations: PLINK file prefix and phenotype table
plink_fn = '/Users/raouldias/Desktop/Extend/extend_csp_data_annon'
pheno_fn = '/Users/raouldias/Desktop/Extend/extend_phenotype.txt'

# Load the genotype tables and the case/control index lists
bim, fam, bed, cases_i, controls_i = pre_processing(plink_fn, pheno_fn).run()

# ACO / tabu settings
nbant = 200                    # ants per generation
nbt = 1000                     # tournament size
evaporation_rate = 0.99        # pheromone multiplier applied after each generation
init_val = 1                   # initial pheromone value
total_fitness_evals = 1000000  # total evaluation budget
numgen = 1000                  # generations between tabu steps

aco = aco_tabu(
    bim, fam, bed, cases_i, controls_i,
    nbant=nbant,
    nbt=nbt,
    evaporation_rate=evaporation_rate,
    init_val=init_val,
    total_fitness_evals=total_fitness_evals,
    numgen=numgen,
)