-
Notifications
You must be signed in to change notification settings - Fork 39
/
snps_imputer.py
173 lines (139 loc) · 5.2 KB
/
snps_imputer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python
""" Impute SNPs based on population allele frequencies"""
from __future__ import print_function, division
from copy import deepcopy
import numpy as np
_MISSING_SKLEARN = """
This ipyrad tool requires the library scikit-learn.
You can install it with the following command in a terminal.
conda install scikit-learn -c conda-forge
"""
class SNPsImputer(object):
"""
Imputation of missing data based on population allele frequencies. This
tool is generally meant to be used internally by ipyrad within other
analysis tool methods.
Parameters:
-----------
data: (ndarray)
A uint8 ndarray of genotype calls that have already been filtered
to remove any sites that are not wanted. Only 0,1,2,9 should be
in matrix.
names: list
Ordered names as extracted from the HDF5 database.
imap: (dict; default=None)
Dictionary to assign samples to groups to be used with minmap.
impute_method: (str; default='sample')
None, "sample", "simple", "kmeans"
Functions:
----------
...
"""
def __init__(
self,
data,
names,
imap=None,
impute_method="sample",
quiet=False,
):
# init attributes
self.quiet = quiet
self.snps = deepcopy(data)
self._mvals = np.sum(self.snps == 9)
# data attributes
self.impute_method = impute_method
self.imap = imap
# get names from imap else we'll fill this later with all
self.names = names
def _print(self, msg):
if not self.quiet:
print(msg)
def run(self):
"""
Impute data in-place updating self.snps by filling missing (9) values.
"""
if self.impute_method == "sample":
self.snps = self._impute_sample()
else:
self.snps[self.snps == 9] = 0
self._print(
"Imputation: 'None'; (0, 1, 2) = {:.1f}%, {:.1f}%, {:.1f}%"
.format(100, 0, 0)
)
return self.snps
def _impute_sample(self, imap=None):
"""
Sample derived alleles by their frequency for each population and
assign to fill 9 in each column for each pop.
"""
# override imap
if not imap:
imap = self.imap
# impute data by mean value in each population
newdata = self.snps.copy()
for pop, samps in imap.items():
# sample pop data
sidxs = sorted(self.names.index(i) for i in samps)
data = newdata[sidxs, :].copy()
# number of alleles at each site that are not 9
nallels = np.sum(data != 9, axis=0) * 2
# get prob derived at each site using tmp array w/ missing to zero
tmp = data.copy()
tmp[tmp == 9] = 0
fderived = tmp.sum(axis=0) / nallels
# sampler
sampled = np.random.binomial(n=2, p=fderived, size=data.shape)
data[data == 9] = sampled[data == 9]
newdata[sidxs, :] = data
# get all imputed values
imputed = newdata[np.where(self.snps == 9)]
self._print(
"Imputation: 'sampled'; (0, 1, 2) = {:.1f}%, {:.1f}%, {:.1f}%"
.format(
100 * np.sum(imputed == 0) / imputed.size,
100 * np.sum(imputed == 1) / imputed.size,
100 * np.sum(imputed == 2) / imputed.size,
)
)
return newdata
def _impute_sample_hier(self, imap=None):
"""
Sample derived alleles by their frequency for each population and
assign to fill 9 in each column for each pop. IF a population has
no samples meeting the minmap requirement in the first round of
imputation, then a second round is applied in which they sample
a genotype based on the overall (non-IMAP) genotype frequencies.
"""
if 1:
raise NotImplementedError()
# override imap
if not imap:
imap = self.imap
# impute data by mean value in each population
newdata = self.snps.copy()
for pop, samps in imap.items():
# sample pop data
sidxs = sorted(self.names.index(i) for i in samps)
data = newdata[sidxs, :].copy()
# number of alleles at each site that are not 9
nallels = np.sum(data != 9, axis=0) * 2
# get prob derived at each site using tmp array w/ missing to zero
tmp = data.copy()
tmp[tmp == 9] = 0
fderived = tmp.sum(axis=0) / nallels
# sampler
sampled = np.random.binomial(n=2, p=fderived, size=data.shape)
data[data == 9] = sampled[data == 9]
newdata[sidxs, :] = data
# get all imputed values
imputed = newdata[np.where(self.snps == 9)]
self._print(
"Imputation: 'sampled'; (0, 1, 2) = {:.1f}%, {:.1f}%, {:.1f}%"
.format(
100 * np.sum(imputed == 0) / imputed.size,
100 * np.sum(imputed == 1) / imputed.size,
100 * np.sum(imputed == 2) / imputed.size,
)
)
return newdata