diff --git a/tests/test_API_noncopy.py b/tests/test_API_noncopy.py index 67fca77..7b668c2 100644 --- a/tests/test_API_noncopy.py +++ b/tests/test_API_noncopy.py @@ -26,126 +26,133 @@ def get_viterbi_path(viterbi_matrix, pointer_matrix): return best_path -# Assumptions: -# Non-NONCOPY states are contiguous. -# No MISSING states in ref. panel. -# Fixed number of ref. haplotypes across sites. - -H = np.array([ - [ 0, 0, 0, 0, 0, NC, NC, NC, NC, NC], # Ancestor 0 - [NC, NC, NC, NC, NC, 0, 0, 0, 0, 0], # Ancestor 1 - [NC, NC, NC, 0, 0, 0, 0, NC, NC, NC], # Ancestor 2 - [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # Ancestor 3 - [ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # Sample 0 - [ 0, 1, 0, 1, 0, 1, 0, 1, 0, 1], # Sample 1 - [ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], # Sample 2 - [ 0, 0, 1, 1, 0, 0, 1, 1, 0, 0], # Sample 3 - [ 1, 1, 0, 0, 1, 1, 0, 0, 1, 1], # Sample 4 -]).T - -m = H.shape[0] # Number of sites -n = H.shape[1] # Number of ref. haplotypes - -r = np.zeros(m, dtype=np.float32) + 0.20 -p_mutation = np.zeros(m, dtype=np.float32) + 0.10 -n_alleles = np.zeros(m, dtype=np.int8) + 2 # Two segregating alleles at all sites -e = get_emission_probabilities(m, p_mutation, n_alleles) - -# No crossover, clone of sample 0 -query_s0_clone = np.array([[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]) -expected_path_s0_clone = np.array([ 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]) - -# No crossover, clone of ancestor 3 -query_a3_clone = np.array([[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]) -expected_path_a3_clone = np.array([ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]) - -# Crossover from sample 1 to sample 2 -query_s1_x_s2 = np.array([[ 0, 1, 0, 1, 0, 0, 1, 0, 1, 0]]) -expected_path_s1_x_s2 = np.array([ 5, 5, 5, 5, 5, 6, 6, 6, 6, 6]) - -# Crossover from ancestor 0 to sample 0 -query_a0_x_s0 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]]) -expected_path_a0_x_s0 = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4]) - -# Crossover from sample 0 to ancestor 1 (or ancestor 3) -# Two equally likely paths -query_s0_x_a1 = np.array([[ 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]]) -expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1]) -#expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 3, 3, 3, 3, 3]) -#assert np.sum(V[-1, :] == np.max(V[-1, :])) == 2 - -# Crossover from sample 0 to ancestor 2 and back to sample 0 -query_s0_x_a2_x_s0 = np.array([[ 1, 1, 1, 0, 0, 0, 0, 1, 1, 1]]) -expected_path_s0_x_a2_x_s0 = np.array([ 4, 4, 4, 2, 2, 2, 2, 4, 4, 4]) - -# Crossover from ancestor 0 to sample 0, -# but with a missing state at the switch position. -# Switching should be delayed by one position. -query_a0_x_s0_miss_a = np.array([[ 0, 0, 0, 0, 0, -1, 1, 1, 1, 1]]) -expected_path_a0_x_s0_miss_a = np.array([ 3, 3, 3, 3, 3, 3, 4, 4, 4, 4]) - -# Crossover from ancestor 0 to sample 0, -# but with a missing state at the position left of the switch position. -# Switching should occur at the same position as the case without missing states. -query_a0_x_s0_miss_b = np.array([[ 0, 0, 0, 0, -1, 1, 1, 1, 1, 1]]) -expected_path_a0_x_s0_miss_b = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4]) +def get_example_data(use_multiallelic_sites): + """ + Assumptions: + 1. Non-NONCOPY states are contiguous. + 2. No MISSING states in ref. panel. + 3. Fixed number of ref. haplotypes across sites. + """ + num_alleles = 2 + + H = np.array([ + [ 0, 0, 0, 0, 0, NC, NC, NC, NC, NC], # Ancestor 0 + [NC, NC, NC, NC, NC, 0, 0, 0, 0, 0], # Ancestor 1 + [NC, NC, NC, 0, 0, 0, 0, NC, NC, NC], # Ancestor 2 + [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # Ancestor 3 + [ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # Sample 0 + [ 0, 1, 0, 1, 0, 1, 0, 1, 0, 1], # Sample 1 + [ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], # Sample 2 + [ 0, 0, 1, 1, 0, 0, 1, 1, 0, 0], # Sample 3 + [ 1, 1, 0, 0, 1, 1, 0, 0, 1, 1], # Sample 4 + ]).T + + if use_multiallelic_sites: + H[H != -2] += 1 + H[:, 3] = 0 + num_alleles = 3 + + m = H.shape[0] # Number of sites + n = H.shape[1] # Number of ref. haplotypes + r = np.zeros(m, dtype=np.float32) + 0.20 + p_mutation = np.zeros(m, dtype=np.float32) + 0.10 + + n_alleles = np.zeros(m, dtype=np.int8) + num_alleles + e = get_emission_probabilities(m, p_mutation, n_alleles) + + return H, m, n, r, e + + +def get_test_data_biallelic(): + # No crossover, clone of sample 0. + query_s0_clone = np.array([[ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]) + expected_path_s0_clone = np.array([ 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]) + # No crossover, clone of ancestor 3. + query_a3_clone = np.array([[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]) + expected_path_a3_clone = np.array([ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]) + # Crossover from sample 1 to sample 2. + query_s1_x_s2 = np.array([[ 0, 1, 0, 1, 0, 0, 1, 0, 1, 0]]) + expected_path_s1_x_s2 = np.array([ 5, 5, 5, 5, 5, 6, 6, 6, 6, 6]) + # Crossover from ancestor 0 to sample 0. + query_a0_x_s0 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]]) + expected_path_a0_x_s0 = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4]) + # Crossover from sample 0 to ancestor 1 (or ancestor 3). + # Two equally likely paths + query_s0_x_a1 = np.array([[ 1, 1, 1, 1, 1, 0, 0, 0, 0, 0]]) + expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1]) + #expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 3, 3, 3, 3, 3]) + # Crossover from sample 0 to ancestor 2 and back to sample 0 + query_s0_x_a2_x_s0 = np.array([[ 1, 1, 1, 0, 0, 0, 0, 1, 1, 1]]) + expected_path_s0_x_a2_x_s0 = np.array([ 4, 4, 4, 2, 2, 2, 2, 4, 4, 4]) + # Crossover from ancestor 0 to sample 0, + # but with a missing state at the switch position. + # Switching should be delayed by one position. + query_a0_x_s0_miss_a = np.array([[ 0, 0, 0, 0, 0, -1, 1, 1, 1, 1]]) + expected_path_a0_x_s0_miss_a = np.array([ 3, 3, 3, 3, 3, 3, 4, 4, 4, 4]) + # Crossover from ancestor 0 to sample 0, + # but with a missing state at the position left of the switch position. + # Switching should occur at the same position as the case without missing states. + query_a0_x_s0_miss_b = np.array([[ 0, 0, 0, 0, -1, 1, 1, 1, 1, 1]]) + expected_path_a0_x_s0_miss_b = np.array([ 0, 0, 0, 0, 0, 4, 4, 4, 4, 4]) + + return [ + (query_s0_clone, expected_path_s0_clone, 1), + (query_a3_clone, expected_path_a3_clone, 1), + (query_s1_x_s2, expected_path_s1_x_s2, 1), + (query_a0_x_s0, expected_path_a0_x_s0, 1), + (query_s0_x_a1, expected_path_s0_x_a1, 2), + (query_s0_x_a2_x_s0, expected_path_s0_x_a2_x_s0, 1), + (query_a0_x_s0_miss_a, expected_path_a0_x_s0_miss_a, 1), + (query_a0_x_s0_miss_b, expected_path_a0_x_s0_miss_b, 1), + ] @pytest.mark.parametrize( "query, expected_path, expected_num_paths", - [ - (query_s0_clone, expected_path_s0_clone, 1), - (query_a3_clone, expected_path_a3_clone, 1), - (query_s1_x_s2, expected_path_s1_x_s2, 1), - (query_a0_x_s0, expected_path_a0_x_s0, 1), - (query_s0_x_a1, expected_path_s0_x_a1, 2), - (query_s0_x_a2_x_s0, expected_path_s0_x_a2_x_s0, 1), - (query_a0_x_s0_miss_a, expected_path_a0_x_s0_miss_a, 1), - (query_a0_x_s0_miss_b, expected_path_a0_x_s0_miss_b, 1), - ] + get_test_data_biallelic() ) -def test_haploid_viterbi(query, expected_path, expected_num_paths): +def test_haploid_viterbi_biallelic(query, expected_path, expected_num_paths): + H, m, n, r, e = get_example_data(use_multiallelic_sites=False) + V, P, _ = vh.forwards_viterbi_hap_naive(n, m, H, query, e, r) best_path = get_viterbi_path(V, P) num_best_paths = np.sum(V[-1, :] == np.max(V[-1, :])) + assert np.array_equal(expected_path, best_path) assert expected_num_paths == num_best_paths -# Test for multiallelic sites. -Hm = np.copy(H) -Hm[Hm != -2] += 1 -Hm[:, 3] = 0 - -n_alleles = np.zeros(m, dtype=np.int8) + 3 # Three segregating alleles at all sites -e = get_emission_probabilities(m, p_mutation, n_alleles) - - -# Crossover from ancestor 3 to ancestor 1 -query_m_a3_x_a1 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]]) -expected_path_m_a3_x_a1 = np.array([ 3, 3, 3, 3, 3, 1, 1, 1, 1, 1]) - -# Crossover from ancestor 3 to ancestor 2 to sample 0 -query_m_a3_x_a2_x_s0 = np.array([[ 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]]) -expected_path_m_a3_x_a2_x_s0 = np.array([ 3, 3, 3, 2, 2, 2, 2, 4, 4, 4]) - -# Crossover from sample 0 to ancestor 1 -# One most likely path, c.f. case with only biallelic sites. -query_s0_x_a1 = np.array([[ 2, 2, 2, 2, 2, 1, 1, 1, 1, 1]]) -expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1]) - +def get_test_data_multiallelic(): + # Crossover from ancestor 3 to ancestor 1. + query_m_a3_x_a1 = np.array([[ 0, 0, 0, 0, 0, 1, 1, 1, 1, 1]]) + expected_path_m_a3_x_a1 = np.array([ 3, 3, 3, 3, 3, 1, 1, 1, 1, 1]) + # Crossover from ancestor 3 to ancestor 2 to sample 0. + query_m_a3_x_a2_x_s0 = np.array([[ 0, 0, 0, 1, 1, 1, 1, 2, 2, 2]]) + expected_path_m_a3_x_a2_x_s0 = np.array([ 3, 3, 3, 2, 2, 2, 2, 4, 4, 4]) + # Crossover from sample 0 to ancestor 1. + # One most likely path, c.f. case with only biallelic sites. + query_s0_x_a1 = np.array([[ 2, 2, 2, 2, 2, 1, 1, 1, 1, 1]]) + expected_path_s0_x_a1 = np.array([ 4, 4, 4, 4, 4, 1, 1, 1, 1, 1]) -@pytest.mark.parametrize( - "query, expected_path, expected_num_paths", - [ + return [ (query_m_a3_x_a1, expected_path_m_a3_x_a1, 1), (query_m_a3_x_a2_x_s0, expected_path_m_a3_x_a2_x_s0, 1), (query_s0_x_a1, expected_path_s0_x_a1, 1), ] + + +@pytest.mark.parametrize( + "query, expected_path, expected_num_paths", + get_test_data_multiallelic() ) -def test_haploid_viterbi_multi(query, expected_path, expected_num_paths): - V, P, _ = vh.forwards_viterbi_hap_naive(n, m, Hm, query, e, r) +def test_haploid_viterbi_multiallelic( + query, expected_path, expected_num_paths +): + H, m, n, r, e = get_example_data(use_multiallelic_sites=True) + + V, P, _ = vh.forwards_viterbi_hap_naive(n, m, H, query, e, r) best_path = get_viterbi_path(V, P) num_best_paths = np.sum(V[-1, :] == np.max(V[-1, :])) + assert np.array_equal(expected_path, best_path) assert expected_num_paths == num_best_paths