In [16]:
def read_fasta_file(filename):
    """
    Parse the FASTA file at `filename` into a dict mapping name -> sequence.

    A line starting with '>' opens a new record; the record name is that
    line with surrounding whitespace and all leading '>' characters removed.
    The following lines (each stripped of surrounding whitespace) are
    concatenated to form the record's sequence.

    Blank lines and lines starting with ';' are ignored, as are sequence
    lines that appear before the first header. If the same name occurs
    more than once, the last record with that name wins.
    """
    chunks_by_name = {}
    active_chunks = None  # chunk list of the record currently being read
    with open(filename) as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text or text.startswith(';'):
                continue
            if text.startswith('>'):
                # New record: start a fresh chunk list and register it by name.
                active_chunks = []
                chunks_by_name[text.lstrip('>')] = active_chunks
            elif active_chunks is not None:
                active_chunks.append(text)
    return {name: ''.join(chunks) for name, chunks in chunks_by_name.items()}

def create_fasta_dict(n):
    """
    Load the genome (and, where present, annotation) for each index in `n`.

    For every index i the file 'genome<i>.fa' is read and the record named
    'genome<i>' is taken from it. For indices below 6 the record
    'annotation<i>' is read from 'annotation<i>.fa'; for all other indices
    the annotation entry is None.

    Returns a dict with three parallel lists under the keys 'genome',
    'annotation' and 'genome_name' (the genome file names).
    """
    genomes = []
    annotations = []
    genome_names = []
    for index in n:
        genome_key = 'genome' + str(index)
        genome_file = genome_key + '.fa'
        genome_names.append(genome_file)
        genomes.append(read_fasta_file(genome_file)[genome_key])

        annotation = None
        if index < 6:  # only indices below 6 are loaded with an annotation
            annotation_key = 'annotation' + str(index)
            annotation = read_fasta_file(annotation_key + '.fa')[annotation_key]
        annotations.append(annotation)
    return {'genome': genomes, 'annotation': annotations, 'genome_name': genome_names}

In [18]:
# Indices 1-5 are loaded together with their annotations (training data);
# indices 6-10 get None as annotation (data to be predicted).
train_set = create_fasta_dict(range(1, 6))
test_set = create_fasta_dict(range(6, 11))


In [32]:
import numpy as np
def viterbi(transition, emission, pi, hidden, sequence, observables):
    """
    Viterbi decoding in log space: most likely hidden-state path for `sequence`.

    Parameters
    ----------
    transition : 2-D array-like; transition[j, k] is P(state k | previous state j).
    emission : 2-D array-like; emission[k, o] is P(observation index o | state k).
    pi : sequence of initial state probabilities.
    hidden : sized container; only len(hidden) is used, as the number of states.
    sequence : sequence of observed symbols.
    observables : mapping from an observed symbol to its column index in `emission`.

    Returns
    -------
    list of int
        One state index per position of `sequence`; an empty list for an
        empty sequence. Ties are resolved in favour of the lowest state index.

    Raises
    ------
    KeyError
        If `sequence` contains a symbol that is not a key of `observables`.
    """
    n_states = len(hidden)
    n_steps = len(sequence)
    if n_steps == 0:
        return []

    # Take the logarithms once up front. Zero probabilities become -inf, which
    # propagates through the sums below and can never win a max against a
    # finite score, so no explicit zero checks are needed in the recursion.
    with np.errstate(divide="ignore"):
        log_pi = np.log(np.asarray(pi, dtype=float)[:n_states])
        log_transition = np.log(np.asarray(transition, dtype=float)[:n_states, :n_states])
        log_emission = np.log(np.asarray(emission, dtype=float)[:n_states, :])

    # Translate symbols to emission column indices once.
    symbols = [observables[symbol] for symbol in sequence]

    # viterbi_table[k, n]: best log-score of any path ending in state k at position n.
    viterbi_table = np.empty((n_states, n_steps), dtype=float)
    viterbi_table[:, 0] = log_pi + log_emission[:, symbols[0]]
    for n in range(1, n_steps):
        # candidates[j, k]: score of being in state j at n-1 and moving to k at n.
        candidates = (
            log_emission[:, symbols[n]][np.newaxis, :]
            + viterbi_table[:, n - 1][:, np.newaxis]
        ) + log_transition
        viterbi_table[:, n] = candidates.max(axis=0)

    # Backtrack from the best final state; np.argmax returns the first maximum
    # and is always defined, so no state variable can be left unset.
    Z = [0] * n_steps
    Z[-1] = int(np.argmax(viterbi_table[:, -1]))
    for n in range(n_steps - 2, -1, -1):
        next_state = Z[n + 1]
        scores = (
            log_emission[next_state, symbols[n + 1]] + viterbi_table[:, n]
        ) + log_transition[:, next_state]
        Z[n] = int(np.argmax(scores))
    return Z

def train_by_counting_2_state(dictionary):
    """
    Estimate a 2-state HMM by counting over annotated sequences.

    Parameters
    ----------
    dictionary : dict
        Must provide 'genome' (list of sequences over the symbols A, G, C, T)
        and 'annotation' (list of per-position annotation strings over the
        symbols N, C, R). Annotation symbol 'N' maps to state 0; 'C' and 'R'
        both map to state 1.

    Returns
    -------
    tuple
        (pi, transitionMatrix, emissionMatrix, hiddenstates) where
        pi is a list of 2 initial-state frequencies, transitionMatrix is a
        2x2 array of row-normalised transition frequencies, emissionMatrix is
        a 2x4 array of row-normalised emission frequencies (columns in the
        order A, G, C, T) and hiddenstates is the annotation-symbol -> state
        index mapping used for counting.

        A row with no observations is left as all zeros (instead of NaN from
        0/0); likewise pi stays all zeros when there is no annotated position.

    Raises
    ------
    KeyError
        If a sequence or annotation contains a symbol outside the alphabets above.
    """
    observables = ['A', 'G', 'C', 'T']
    emissions = dict(zip(observables, range(len(observables))))
    seqs = dictionary['genome']
    hiddenseqs = dictionary['annotation']
    hiddenstates = {'N': 0, 'C': 1, 'R': 1}
    n_states = 2

    # Emission counts: how often each state emits each symbol.
    emissionMatrix = np.zeros((n_states, len(observables)))
    for i in range(len(seqs)):
        for j, char in enumerate(seqs[i]):
            emissionMatrix[hiddenstates[hiddenseqs[i][j]], emissions[char]] += 1

    # Initial-state and transition counts from the annotations alone.
    transitionMatrix = np.zeros((n_states, n_states))
    pi = [0.0] * n_states
    for annotation in hiddenseqs:
        previous = None
        for symbol in annotation:
            current = hiddenstates[symbol]
            if previous is None:
                pi[current] += 1
            else:
                transitionMatrix[previous, current] += 1
            previous = current

    # Turn counts into frequencies. `where` skips rows whose total is zero,
    # leaving them as zeros rather than producing NaN.
    for counts in (emissionMatrix, transitionMatrix):
        row_totals = counts.sum(axis=1, keepdims=True)
        np.divide(counts, row_totals, out=counts, where=row_totals > 0)

    pisum = sum(pi)
    if pisum > 0:
        pi = [count / pisum for count in pi]

    return pi, transitionMatrix, emissionMatrix, hiddenstates


In [40]:
# Train on the annotated genomes and unpack the first three model components.
trained_model = train_by_counting_2_state(dictionary = train_set)
pi, transition, emission = trained_model[0], trained_model[1], trained_model[2]

# viterbi() uses len(hidden) as the number of states, so a two-entry mapping
# is defined here rather than reusing the three-key mapping in trained_model[3]
# (where 'C' and 'R' share state index 1).
hiddenstates = dict(N = 0, C = 1)
# Symbol -> emission column index, in the column order used during training.
observables = dict(A = 0, G = 1, C = 2, T = 3)

In [36]:
def tenfold_cross_validation(datafolder, states = 3, decoding = "viterbi"):
    """
    Leave-one-file-out cross-validation over the files in `datafolder`.

    Each file in turn is held out as validation data; a model is trained by
    counting on all remaining files and used to decode every sequence of the
    held-out file. (The number of folds equals the number of files in
    `datafolder`; it is ten only if the folder holds ten files.)

    Parameters
    ----------
    datafolder : str
        Directory whose entries are all passed to read_sequence.
    states : int
        Model size; one of 3, 4 or 24. Selects the training function.
    decoding : str
        'viterbi' or 'posterior'. Selects the decoding function.

    Returns
    -------
    tuple of lists, one entry per fold:
        (files, names, results, sequences, transitionMatrix, pi) where
        files are the held-out file names, names is element 0 of the
        held-out read_sequence result, results are the predicted annotation
        strings, sequences is element 1 of the held-out read_sequence
        result, and transitionMatrix / pi are elements 1 / 0 of each fold's
        trained model.

    Raises
    ------
    ValueError
        If `states` or `decoding` is not one of the supported values.
    """
    # Local import: no visible cell of this notebook imports os.
    import os

    # Fail early with a clear message instead of an unbound `model` (bad
    # `states`) or silently empty predictions (bad `decoding`).
    if states not in (3, 4, 24):
        raise ValueError("states must be 3, 4 or 24, got %r" % (states,))
    if decoding not in ('viterbi', 'posterior'):
        raise ValueError("decoding must be 'viterbi' or 'posterior', got %r" % (decoding,))

    observables = ['A', 'C', 'E', 'D', 'G', 'F', 'I', 'H', 'K', 'M', 'L', 'N', 'Q', 'P', 'S', 'R', 'T', 'W', 'V', 'Y']
    emissions = dict(zip(observables, range(len(observables))))
    results = []
    names = []
    sequences = []
    files = []
    transitionMatrix = []
    pi = []
    filelist = os.listdir(datafolder)
    for i in range(len(filelist)):
        # Hold out file i; train on all the others.
        trainingdata = filelist[:]
        validationdata = trainingdata.pop(i)

        trainingset = [[], [], []]
        for filename in trainingdata:
            data = read_sequence(os.path.join(datafolder, filename))
            for part in range(3):
                trainingset[part] = trainingset[part] + data[part]

        if states == 3:
            model = train_by_counting_3_state(trainingset)
        elif states == 4:
            model = train_by_counting_4_state(trainingset)
        else:
            model = train_by_counting_24_states(trainingset)
        hidden = model[3]
        transitionMatrix.append(model[1])
        pi.append(model[0])

        # Inverse mapping: state index -> annotation symbol.
        # .items() works on both Python 2 and 3 (iteritems is Python 2 only).
        newhidden = {v: k for k, v in hidden.items()}

        validationset = read_sequence(os.path.join(datafolder, validationdata))
        predictions = []
        names.append(validationset[0])
        for valseq in validationset[1]:
            # NOTE(review): these are module-style calls (viterbi.viterbi,
            # posterior.posterior). If `viterbi` is bound to a plain function
            # in this notebook's namespace the attribute lookup will fail, and
            # no `posterior` is visible here - confirm both names refer to
            # imported modules.
            if decoding == 'viterbi':
                predictions.append(viterbi.viterbi(transition = model[1], emission = model[2], pi = model[0], hidden = hidden, sequence = valseq, observables = emissions))
            else:
                predictions.append(posterior.posterior(transition = model[1], emission = model[2], pi = model[0], hidden = hidden, sequence = valseq, observables = emissions))

        for r in range(len(predictions)):
            labels = [newhidden[x] for x in predictions[r]]
            if states == 4:
                # In the 4-state model the label 'N' is reported as 'M'.
                labels = ['M' if label == 'N' else label for label in labels]
            if states == 24:
                # In the 24-state model everything except 'i' and 'o' is reported as 'M'.
                labels = [label if label in ('i', 'o') else 'M' for label in labels]
            predictions[r] = "".join(labels)

        results.append(predictions)
        sequences.append(validationset[1])
        files.append(validationdata)
    return files, names, results, sequences, transitionMatrix, pi


[[ 0.33434315  0.16612728  0.16479453  0.33473505]
 [ 0.32083579  0.17959771  0.17934359  0.32022291]]


In [43]:
# Decode the first test genome with the trained model.
test_model = viterbi(transition, emission, pi, hiddenstates, test_set['genome'][0], observables = observables)
# Relabel the decoded path in place: state 0 -> 'N', any other state -> 'C'.
for i in range(len(test_model)):
    test_model[i] = 'N' if test_model[i] == 0 else 'C'



