Skip to content

Commit

Permalink
Major refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
szhan committed Mar 21, 2024
1 parent c179b55 commit b44049c
Showing 1 changed file with 108 additions and 101 deletions.
209 changes: 108 additions & 101 deletions tests/test_API_noncopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,126 +26,133 @@ def get_viterbi_path(viterbi_matrix, pointer_matrix):
return best_path


# Assumptions:
# Non-NONCOPY states are contiguous.
# No MISSING states in ref. panel.
# Fixed number of ref. haplotypes across sites.

H = np.array([
[ 0, 0, 0, 0, 0, NC, NC, NC, NC, NC], # Ancestor 0
[NC, NC, NC, NC, NC, 0, 0, 0, 0, 0], # Ancestor 1
[NC, NC, NC, 0, 0, 0, 0, NC, NC, NC], # Ancestor 2
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # Ancestor 3
[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # Sample 0
[ 0, 1, 0, 1, 0, 1, 0, 1, 0, 1], # Sample 1
[ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], # Sample 2
[ 0, 0, 1, 1, 0, 0, 1, 1, 0, 0], # Sample 3
[ 1, 1, 0, 0, 1, 1, 0, 0, 1, 1], # Sample 4
]).T

m = H.shape[0] # Number of sites
n = H.shape[1] # Number of ref. haplotypes

r = np.zeros(m, dtype=np.float32) + 0.20
p_mutation = np.zeros(m, dtype=np.float32) + 0.10
n_alleles = np.zeros(m, dtype=np.int8) + 2 # Two segregating alleles at all sites
e = get_emission_probabilities(m, p_mutation, n_alleles)

# No crossover, clone of sample 0
query_s0_clone = np.array([[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])
expected_path_s0_clone = np.array([ 4, 4, 4, 4, 4, 4, 4, 4, 4, 4])

# No crossover, clone of ancestor 3
query_a3_clone = np.array([[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
expected_path_a3_clone = np.array([ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3])

# Crossover from sample 1 to sample 2
query_s1_x_s2 = np.array([[ 0, 1, 0, 1, 0, 0, 1, 0, 1, 0]])
expected_path_s1_x_s2 = np.array([ 5, 5, 5, 5, 5, 6, 6, 6, 6, 6])

# Crossover from ancestor 0 to sample 0
query_a0_x_s0 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]])
expected_path_a0_x_s0 = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4])

# Crossover from sample 0 to ancestor 1 (or ancestor 3)
# Two equally likely paths
query_s0_x_a1 = np.array([[ 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]])
expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1])
#expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 3, 3, 3, 3, 3])
#assert np.sum(V[-1, :] == np.max(V[-1, :])) == 2

# Crossover from sample 0 to ancestor 2 and back to sample 0
query_s0_x_a2_x_s0 = np.array([[ 1, 1, 1, 0, 0, 0, 0, 1, 1, 1]])
expected_path_s0_x_a2_x_s0 = np.array([ 4, 4, 4, 2, 2, 2, 2, 4, 4, 4])

# Crossover from ancestor 0 to sample 0,
# but with a missing state at the switch position.
# Switching should be delayed by one position.
query_a0_x_s0_miss_a = np.array([[ 0, 0, 0, 0, 0, -1, 1, 1, 1, 1]])
expected_path_a0_x_s0_miss_a = np.array([ 3, 3, 3, 3, 3, 3, 4, 4, 4, 4])

# Crossover from ancestor 0 to sample 0,
# but with a missing state at the position left of the switch position.
# Switching should occur at the same position as the case without missing states.
query_a0_x_s0_miss_b = np.array([[ 0, 0, 0, 0, -1, 1, 1, 1, 1, 1]])
expected_path_a0_x_s0_miss_b = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4])
def get_example_data(use_multiallelic_sites):
"""
Assumptions:
1. Non-NONCOPY states are contiguous.
2. No MISSING states in ref. panel.
3. Fixed number of ref. haplotypes across sites.
"""
num_alleles = 2

H = np.array([
[ 0, 0, 0, 0, 0, NC, NC, NC, NC, NC], # Ancestor 0
[NC, NC, NC, NC, NC, 0, 0, 0, 0, 0], # Ancestor 1
[NC, NC, NC, 0, 0, 0, 0, NC, NC, NC], # Ancestor 2
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # Ancestor 3
[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # Sample 0
[ 0, 1, 0, 1, 0, 1, 0, 1, 0, 1], # Sample 1
[ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], # Sample 2
[ 0, 0, 1, 1, 0, 0, 1, 1, 0, 0], # Sample 3
[ 1, 1, 0, 0, 1, 1, 0, 0, 1, 1], # Sample 4
]).T

if use_multiallelic_sites:
H[H != -2] += 1
H[:, 3] = 0
num_alleles = 3

m = H.shape[0] # Number of sites
n = H.shape[1] # Number of ref. haplotypes
r = np.zeros(m, dtype=np.float32) + 0.20
p_mutation = np.zeros(m, dtype=np.float32) + 0.10

n_alleles = np.zeros(m, dtype=np.int8) + num_alleles
e = get_emission_probabilities(m, p_mutation, n_alleles)

return H, m, n, r, e


def get_test_data_biallelic():
# No crossover, clone of sample 0.
query_s0_clone = np.array([[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])
expected_path_s0_clone = np.array([ 4, 4, 4, 4, 4, 4, 4, 4, 4, 4])
# No crossover, clone of ancestor 3.
query_a3_clone = np.array([[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
expected_path_a3_clone = np.array([ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3])
# Crossover from sample 1 to sample 2.
query_s1_x_s2 = np.array([[ 0, 1, 0, 1, 0, 0, 1, 0, 1, 0]])
expected_path_s1_x_s2 = np.array([ 5, 5, 5, 5, 5, 6, 6, 6, 6, 6])
# Crossover from ancestor 0 to sample 0.
query_a0_x_s0 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]])
expected_path_a0_x_s0 = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4])
# Crossover from sample 0 to ancestor 1 (or ancestor 3).
# Two equally likely paths
query_s0_x_a1 = np.array([[ 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]])
expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1])
#expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 3, 3, 3, 3, 3])
# Crossover from sample 0 to ancestor 2 and back to sample 0
query_s0_x_a2_x_s0 = np.array([[ 1, 1, 1, 0, 0, 0, 0, 1, 1, 1]])
expected_path_s0_x_a2_x_s0 = np.array([ 4, 4, 4, 2, 2, 2, 2, 4, 4, 4])
# Crossover from ancestor 0 to sample 0,
# but with a missing state at the switch position.
# Switching should be delayed by one position.
query_a0_x_s0_miss_a = np.array([[ 0, 0, 0, 0, 0, -1, 1, 1, 1, 1]])
expected_path_a0_x_s0_miss_a = np.array([ 3, 3, 3, 3, 3, 3, 4, 4, 4, 4])
# Crossover from ancestor 0 to sample 0,
# but with a missing state at the position left of the switch position.
# Switching should occur at the same position as the case without missing states.
query_a0_x_s0_miss_b = np.array([[ 0, 0, 0, 0, -1, 1, 1, 1, 1, 1]])
expected_path_a0_x_s0_miss_b = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4])

return [
(query_s0_clone, expected_path_s0_clone, 1),
(query_a3_clone, expected_path_a3_clone, 1),
(query_s1_x_s2, expected_path_s1_x_s2, 1),
(query_a0_x_s0, expected_path_a0_x_s0, 1),
(query_s0_x_a1, expected_path_s0_x_a1, 2),
(query_s0_x_a2_x_s0, expected_path_s0_x_a2_x_s0, 1),
(query_a0_x_s0_miss_a, expected_path_a0_x_s0_miss_a, 1),
(query_a0_x_s0_miss_b, expected_path_a0_x_s0_miss_b, 1),
]


@pytest.mark.parametrize(
"query, expected_path, expected_num_paths",
[
(query_s0_clone, expected_path_s0_clone, 1),
(query_a3_clone, expected_path_a3_clone, 1),
(query_s1_x_s2, expected_path_s1_x_s2, 1),
(query_a0_x_s0, expected_path_a0_x_s0, 1),
(query_s0_x_a1, expected_path_s0_x_a1, 2),
(query_s0_x_a2_x_s0, expected_path_s0_x_a2_x_s0, 1),
(query_a0_x_s0_miss_a, expected_path_a0_x_s0_miss_a, 1),
(query_a0_x_s0_miss_b, expected_path_a0_x_s0_miss_b, 1),
]
get_test_data_biallelic()
)
def test_haploid_viterbi(query, expected_path, expected_num_paths):
def test_haploid_viterbi_biallelic(query, expected_path, expected_num_paths):
H, m, n, r, e = get_example_data(use_multiallelic_sites=False)

V, P, _ = vh.forwards_viterbi_hap_naive(n, m, H, query, e, r)
best_path = get_viterbi_path(V, P)
num_best_paths = np.sum(V[-1, :] == np.max(V[-1, :]))

assert np.array_equal(expected_path, best_path)
assert expected_num_paths == num_best_paths


# Test for multiallelic sites.
Hm = np.copy(H)
Hm[Hm != -2] += 1
Hm[:, 3] = 0

n_alleles = np.zeros(m, dtype=np.int8) + 3 # Three segregating alleles at all sites
e = get_emission_probabilities(m, p_mutation, n_alleles)


# Crossover from ancestor 3 to ancestor 1
query_m_a3_x_a1 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]])
expected_path_m_a3_x_a1 = np.array([ 3, 3, 3, 3, 3, 1, 1, 1, 1, 1])

# Crossover from ancestor 3 to ancestor 2 to sample 0
query_m_a3_x_a2_x_s0 = np.array([[ 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]])
expected_path_m_a3_x_a2_x_s0 = np.array([ 3, 3, 3, 2, 2, 2, 2, 4, 4, 4])

# Crossover from sample 0 to ancestor 1
# One most likely path, c.f. case with only biallelic sites.
query_s0_x_a1 = np.array([[ 2, 2, 2, 2, 2, 1, 1, 1, 1, 1]])
expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1])

def get_test_data_multiallelic():
# Crossover from ancestor 3 to ancestor 1.
query_m_a3_x_a1 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]])
expected_path_m_a3_x_a1 = np.array([ 3, 3, 3, 3, 3, 1, 1, 1, 1, 1])
# Crossover from ancestor 3 to ancestor 2 to sample 0.
query_m_a3_x_a2_x_s0 = np.array([[ 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]])
expected_path_m_a3_x_a2_x_s0 = np.array([ 3, 3, 3, 2, 2, 2, 2, 4, 4, 4])
# Crossover from sample 0 to ancestor 1.
# One most likely path, c.f. case with only biallelic sites.
query_s0_x_a1 = np.array([[ 2, 2, 2, 2, 2, 1, 1, 1, 1, 1]])
expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1])

@pytest.mark.parametrize(
"query, expected_path, expected_num_paths",
[
return [
(query_m_a3_x_a1, expected_path_m_a3_x_a1, 1),
(query_m_a3_x_a2_x_s0, expected_path_m_a3_x_a2_x_s0, 1),
(query_s0_x_a1, expected_path_s0_x_a1, 1),
]


@pytest.mark.parametrize(
"query, expected_path, expected_num_paths",
get_test_data_multiallelic()
)
def test_haploid_viterbi_multi(query, expected_path, expected_num_paths):
V, P, _ = vh.forwards_viterbi_hap_naive(n, m, Hm, query, e, r)
def test_haploid_viterbi_multiallelic(
query, expected_path, expected_num_paths
):
H, m, n, r, e = get_example_data(use_multiallelic_sites=True)

V, P, _ = vh.forwards_viterbi_hap_naive(n, m, H, query, e, r)
best_path = get_viterbi_path(V, P)
num_best_paths = np.sum(V[-1, :] == np.max(V[-1, :]))

assert np.array_equal(expected_path, best_path)
assert expected_num_paths == num_best_paths

0 comments on commit b44049c

Please sign in to comment.