-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/statistical metric distance #21
base: master
Are you sure you want to change the base?
Changes from 5 commits
43e9aee
24ad7b5
c9f6fcb
2807968
55a986f
7d3e685
3ac1b15
b95b7e7
6c006ff
88c6ad6
4af9279
409c7f3
b47373c
fcb03f2
37f1f0f
2d8639c
86ecf8e
8ce8560
9f4d948
05c3d40
af25df8
632eb4f
4f850d2
2c2355e
9b4c410
231d627
735d61a
537290b
ffcc05f
42d4de1
1667f92
768b1a4
66d1682
4b7855d
9ca8f1b
9dc56b0
5323a4d
20822fb
3599a2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,3 +2,4 @@ pomegranate | |
Cython | ||
numpy | ||
mysql-connector==2.1.6 | ||
scipy |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
from .negloglikelihood import NegativeLogLikelihood | ||
from .naive_parameter_sampling import NaiveParameterSampling | ||
from .statisticalmetric import StatisticalMetric |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
import scipy.stats as stats | ||
import numpy as np | ||
|
||
cdef class StatisticalMetric(object):
    """Statistical equivalence metric between two VLMCs.

    Algorithm (see paper):
      1. Generate a sequence A of length N from G1.
      2. For every starting state v_k in G2, estimate the transition
         probabilities if it were to generate the sequence.
      3. For every starting state v_k that accepted the sequence, generate
         G2' from the estimated transition probabilities in 2).
      4. Determine the Chi-squared statistic with significance level alpha
         and n_t - n_s degrees of freedom, where n_t is the number of
         transitions and n_s is the number of states in G2.  This can be
         done with standard tables or by calculating the value.
      5. Compute the Chi-squared statistic for comparing model G2 with
         models G2'_k as [see paper].
      6. If Chi2_k <= Chi2_alpha for any G2'_k, the Chi-squared test accepts
         with significance alpha the hypothesis that the PDFs in G2 are
         consistent with the PDFs in that G2'_k.  Since G2'_k is calculated
         using a data trace from G1, this means that their statistics are
         consistent.  Exit with equivalence.
      7. Else exit with no equivalence.
    """
    # Fix: the algorithm description above was a bare string expression
    # placed after __init__ (discarded at runtime); promoted to the class
    # docstring.  Typos ("probabilites", "test test") corrected.

    # Significance level alpha of the chi-squared equivalence test.
    cdef double significance_level
    # Length of the sequence generated from the left VLMC during the test.
    cdef int sequence_length

    def __init__(self, sequence_length, significance_level):
        self.sequence_length = sequence_length
        self.significance_level = significance_level
cpdef double distance(self, left_vlmc, right_vlmc):
    """Return the smallest probability cutoff at which the two VLMCs test
    as statistically equivalent, or 1 when no cutoff in [0, 0.05) works.

    Note: progressively prunes (mutates) both models' trees as the cutoff
    grows.
    """
    for p_value in np.arange(0, 0.05, 0.001):
        # Drop transitions whose probability falls below the cutoff.
        left_vlmc.tree = self.remove_larger_probabilities(left_vlmc.tree, p_value)
        right_vlmc.tree = self.remove_larger_probabilities(right_vlmc.tree, p_value)
        neither_null = not (self.is_null_model(left_vlmc)
                            or self.is_null_model(right_vlmc))
        if neither_null and self.equivalence_test(left_vlmc, right_vlmc):
            print("Found equality at p_value " + str(p_value))
            return p_value
    return 1
|
||
cdef bint is_null_model(self, vlmc):
    # Intended (per the dead code below): return True when every transition
    # probability in the model's tree is zero, i.e. the model is degenerate.
    # TODO, this is clearly wrong
    # NOTE(review): the early return disables the check entirely, so every
    # model is currently treated as non-null; the unreachable code below
    # looks like the real implementation — confirm intent and re-enable.
    return False
    for lookup_prob in vlmc.tree.values():
        if any(prob > 0 for prob in lookup_prob.values()):
            return False
    return True
|
||
cdef dict remove_larger_probabilities(self, tree, p_value):
    """Zero every transition probability <= p_value, renormalise the
    surviving probabilities of each context so they sum to 1, and remove
    contexts left without any surviving probability.

    Mutates *tree* in place and returns it.
    """
    empty_contexts = []
    for context, probabilities in tree.items():
        survivors = 0
        survivor_sum = 0
        # First pass: zero out pruned letters, tally the survivors.
        for letter, prob in probabilities.items():
            if prob <= p_value:
                probabilities[letter] = 0
            else:
                survivor_sum += prob
                survivors += 1
        if survivors == 0:
            empty_contexts.append(context)
        # Second pass: renormalise the surviving letters.
        for letter, prob in probabilities.items():
            if prob > p_value:
                probabilities[letter] = prob / survivor_sum
    for context in empty_contexts:
        tree.pop(context)
    return tree
|
||
cdef bint equivalence_test(self, left_vlmc, right_vlmc):
    """Generate a sequence from left_vlmc and report whether right_vlmc,
    started from at least one of its contexts, is statistically
    consistent with that sequence."""
    cdef str sequence = left_vlmc.generate_sequence(self.sequence_length,
                                                    500)  # pre-sample length
    # Try every context of the right model as a starting state.
    return any(self.check_from_context(start_context, sequence, right_vlmc)
               for start_context in right_vlmc.tree.keys())
|
||
cdef bint check_from_context(self, start_context, sequence, right_vlmc):
    """Walk *sequence* through right_vlmc starting at *start_context*,
    estimate transition probabilities from the observed counts, and
    chi-square-test them against right_vlmc's own probabilities.

    Returns True when the test accepts the equivalence hypothesis, and
    False when it rejects it or when the walk falls off the model (no
    applicable context for the next character).
    """
    current_context = start_context
    # context_counters:       Map (Context -> Int), visits per context.
    # context_probabilities:  Map (Context -> Map (Letter -> Int)),
    #                         observed transition counts per context.
    context_counters, context_probabilities = self.initialize_counters(
        right_vlmc, current_context)
    for char_ in sequence:
        context_counters[current_context] += 1
        context_probabilities[current_context][char_] += 1
        current_context = self.get_next_context(current_context, char_, right_vlmc)
        if current_context == "":
            # No context of right_vlmc applies after this character:
            # this start context rejects the sequence.
            return False

    new_vlmc_tree = self.create_pst_by_estimating_probabilities(
        context_counters, context_probabilities)
    expected_values, observed_values = self.get_expected_observed_values(
        new_vlmc_tree, right_vlmc, context_counters)
    chisq, p_value = stats.chisquare(f_obs=observed_values, f_exp=expected_values)
    # NOTE(review): accepting only when 1 - p_value < alpha means acceptance
    # requires a very large p-value; the conventional test accepts when
    # p_value >= alpha — confirm against the paper before changing.
    if 1 - p_value < self.significance_level:
        return True
    # Fix: the original fell off the end here (implicit falsy return);
    # make the rejection explicit.
    return False
|
||
cdef int nbr_of_non_zero_probabilities(self, vlmc):
    """Count the transitions of *vlmc* with a strictly positive probability.

    Fix: the original iterated the dicts directly (``for k, v in d`` yields
    and tries to unpack the *keys*, not (key, value) pairs), which fails at
    runtime; iterate ``.items()`` instead.  (Reviewer note: currently
    unused by the rest of the class.)
    """
    counter = 0
    for context, probabilites in vlmc.tree.items():
        for char_, prob in probabilites.items():
            if prob > 0:
                counter += 1
    return counter
|
||
cdef tuple get_expected_observed_values(self, estimated_vlmc_tree, original_vlmc, context_count):
    """Build the parallel (expected, observed) frequency lists fed to the
    chi-square test.

    For every context actually visited, each letter with positive
    probability in original_vlmc — except the last such letter, since the
    frequencies are constrained to sum to the visit count — contributes
    one expected/observed pair.
    """
    expected_values = []
    observed_values = []
    alphabet = ["A", "C", "G", "T"]
    for context in original_vlmc.tree.keys():
        visits = context_count[context]
        if visits <= 0:
            continue
        letter_probs = [(letter, original_vlmc.tree[context][letter])
                        for letter in alphabet]
        # Keep only letters with positive probability in the original model.
        positive = [pair for pair in letter_probs if pair[1] > 0]
        # All but the last (degrees-of-freedom constraint).
        for letter, original_prob in positive[:-1]:
            estimated_prob = estimated_vlmc_tree[context][letter]
            expected_values.append(visits * original_prob)
            observed_values.append(visits * estimated_prob)
    return expected_values, observed_values
|
||
cdef dict create_pst_by_estimating_probabilities(self, context_counters, context_probabilites):
    """Turn raw visit/transition counts into an estimated probability tree:
    each context maps every letter to observed_count / visits, or 0 for a
    context that was never visited."""
    estimated_tree = {}
    for context, visits in context_counters.items():
        letter_counts = context_probabilites[context]
        if visits > 0:
            estimated_tree[context] = {
                letter: letter_counts[letter] / visits
                for letter in ["A", "C", "G", "T"]
            }
        else:
            estimated_tree[context] = {letter: 0 for letter in ["A", "C", "G", "T"]}
    return estimated_tree
|
||
cdef str get_next_context(self, context_before, next_char, vlmc_to_approximate):
    """Return the longest context of vlmc_to_approximate applicable after
    reading *next_char* in *context_before*, or "" when none applies.

    (Reviewer note: parts of this probably belong on the vlmc class, as
    the same logic recurs elsewhere.)
    """
    order = vlmc_to_approximate.order
    known_contexts = vlmc_to_approximate.tree.keys()
    # Prepend the new character, then keep at most `order` characters.
    extended = (next_char + context_before)[:order]
    # Try the longest candidate first, shortening by one each time.
    for length in range(order, 0, -1):
        candidate = extended[:length]
        if candidate in known_contexts:
            return candidate
    return ""
|
||
cdef tuple initialize_counters(self, right_vlmc, start_context):
    """Return zeroed counters for every context of right_vlmc:
    a Map (Context -> Int) of visit counts and a
    Map (Context -> Map (Letter -> Int)) of per-letter transition counts."""
    context_counters = {context: 0 for context in right_vlmc.tree.keys()}
    context_probabilities = {
        context: {letter: 0 for letter in ["A", "C", "G", "T"]}
        for context in right_vlmc.tree.keys()
    }
    return context_counters, context_probabilities
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,12 +3,14 @@ import json | |
from queue import Queue | ||
import os | ||
from random import choices | ||
import numpy as np | ||
import scipy | ||
|
||
|
||
cdef class VLMC(object): | ||
cdef public dict tree | ||
cdef public str name | ||
cdef int order | ||
cdef public int order | ||
|
||
def __init__(self, tree, name): | ||
self.tree = tree | ||
|
@@ -106,6 +108,57 @@ cdef class VLMC(object): | |
def _calculate_order(self, tree):
    """Return the model order: the length of the longest context key."""
    return max(len(context) for context in tree.keys())
|
||
def _get_transition_matrix(self):
    """Build the row-stochastic transition matrix between the contexts
    (states) of this VLMC.

    Entry [i][j] is the probability of moving from context i to context j,
    where the next context after emitting a character is the longest
    left-truncated version of (context + character) present in the tree.

    Returns a numpy array of shape (n_contexts, n_contexts), rows/columns
    ordered by self.tree's key order.
    """
    alphabet = ["A", "C", "G", "T"]
    rows = []
    for source_context in self.tree.keys():
        # Collect (destination context, probability) pairs reachable from
        # source_context by emitting one character.
        reachable = []
        for char_ in alphabet:
            probability_of_char = self.tree[source_context][char_]
            if probability_of_char > 0:
                new_context = source_context + char_
                # Truncate from the left until the context exists in the tree.
                for i in range(self.order + 1):
                    truncated_context = new_context[-(self.order - i):]
                    if truncated_context in self.tree:
                        reachable.append((truncated_context, probability_of_char))
                        break
        # Fix: removed the leftover debug print of the reachable contexts
        # and the unused nbr_of_states local.
        row = []
        for destination_context in self.tree.keys():
            prob = 0
            for context, probability in reachable:
                if context == destination_context:
                    prob = probability
            row.append(prob)
        rows.append(row)
    return np.array(rows)
|
||
def get_context_distribution(self):
    """Return the stationary distribution over contexts as a
    dict (Context -> probability).

    If matrix A holds the transition probabilities from row to column and
    x is the current state, then transpose(A) * x is the next state; this
    returns the vector v such that transpose(A) * v = v, i.e. the
    stationary distribution of this VLMC.
    """
    # Local import: `import scipy` alone does not guarantee the
    # scipy.sparse.linalg submodule is loaded.
    import scipy.sparse.linalg

    transition_matrix = self._get_transition_matrix()
    matrix = np.transpose(transition_matrix)
    # Get one eigenvector, with eigenvalue (closest to) 1.
    # TODO, what happens if no eigenvalue-1 vector exists?
    values, vectors = scipy.sparse.linalg.eigs(matrix, k=1, sigma=1)

    # Fix: removed the debug prints and both np.set_printoptions calls —
    # threshold='nan' / threshold=np.NaN are invalid arguments in current
    # NumPy and were only debug leftovers.
    eigenvector = vectors[:, 0]
    normalisation = np.sum(eigenvector)
    # Scale so the entries sum to 1; rounding keeps the output tidy and
    # np.real drops the (numerical-noise) imaginary parts.
    scaled_vector = np.real(np.around(eigenvector.real / normalisation, decimals=4))

    stationary_distibution = {}
    for i, context in enumerate(self.tree.keys()):
        stationary_distibution[context] = scaled_vector[i]
    return stationary_distibution
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can create a function for this last normalisation step |
||
|
||
if __name__ == "__main__": | ||
s = '{"":{"A":0.5,"B":0.5},"A":{"B":0.5,"A":0.5},"B":{"A":0.5,"B":0.5},"BA":{"A":0.5,"B":0.5},"AA":{"A":0.5,"B":0.5}}' | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can probably use some comments