/
prs.go
219 lines (185 loc) · 5.6 KB
/
prs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"os"
"sort"
"strconv"
"github.com/carbocation/genomisc"
"github.com/carbocation/genomisc/prsparser"
"github.com/carbocation/pfx"
)
var prsSorted = make([]prsparser.PRS, 0)
var currentVariantScoreLookup map[ChrPos]prsparser.PRS
type ChrPos struct {
Chromosome string
Position uint32
SNP string
}
type PRSSitesOnChrom struct {
Chrom string
PRSSites []prsparser.PRS
}
type PRSFact struct {
prsparser.PRS
SiteEA string
SiteNEA string
Scorable int
Scored int
}
// LoadPRS is ***not*** safe for concurrent access from multiple goroutines
func LoadPRS(prsPath, layout string, alwaysIncrement bool) error {
parser, err := prsparser.New(layout)
if err != nil {
return fmt.Errorf("CreatePRSParserErr: %s", err.Error())
}
// Open PRS file
f, err := os.Open(prsPath)
if err != nil {
return pfx.Err(err)
}
defer f.Close()
fd, err := genomisc.MaybeDecompressReadCloserFromFile(f)
if err != nil {
return pfx.Err(err)
}
defer fd.Close()
reader := csv.NewReader(fd)
reader.Comma = parser.CSVReaderSettings.Comma
reader.Comment = parser.CSVReaderSettings.Comment
reader.TrimLeadingSpace = parser.CSVReaderSettings.TrimLeadingSpace
currentVariantScoreLookup = nil
currentVariantScoreLookup = make(map[ChrPos]prsparser.PRS)
for i := 0; ; i++ {
row, err := reader.Read()
if err != nil {
if err == io.EOF {
break
} else if err, ok := err.(*csv.ParseError); ok && err.Err == csv.ErrFieldCount {
// We actually permit this
log.Printf("Recovering from parsing error which may be caused by jagged files with missing entries and proceeding with this variant. Error: %s", err.Error())
} else {
return pfx.Err(err)
}
}
val, err := parser.ParseRow(row)
if err != nil && i == 0 {
// Permit a header and skip it
continue
} else if err != nil {
return pfx.Err(err)
}
p := prsparser.PRS{
Chromosome: val.Chromosome,
Position: val.Position,
EffectAllele: val.EffectAllele,
Allele1: val.Allele1,
Allele2: val.Allele2,
Score: val.Score,
SNP: val.SNP,
}
if p.EffectAllele != p.Allele1 && p.EffectAllele != p.Allele2 {
return fmt.Errorf("Effect Allele (%v) is neither equal to Allele 1 (%v) nor Allele 2 (%v)", p.EffectAllele, p.Allele1, p.Allele2)
}
// Ensure that all scores will be positive. If the effect size is
// negative, swap the effect and alt alleles and the effect sign.
if alwaysIncrement && p.Score < 0 {
p.Score *= -1
if p.EffectAllele == p.Allele1 {
p.EffectAllele = p.Allele2
} else {
p.EffectAllele = p.Allele1
}
}
currentVariantScoreLookup[ChrPos{p.Chromosome, uint32(p.Position), p.SNP}] = p
prsSorted = append(prsSorted, p)
}
sort.Slice(prsSorted, func(i, j int) bool {
if prsSorted[i].Chromosome == prsSorted[j].Chromosome {
return prsSorted[i].Position < prsSorted[j].Position
} else {
// If both chromosomes are integers, sort by chromosome integer
if _, err := strconv.Atoi(prsSorted[i].Chromosome); err == nil {
if _, err := strconv.Atoi(prsSorted[j].Chromosome); err == nil {
return prsSorted[i].Chromosome < prsSorted[j].Chromosome
}
}
// If one or both are not integers, sort by string
return prsSorted[i].Chromosome < prsSorted[j].Chromosome
}
})
return nil
}
func LookupPRS(chromosome string, position uint32, snp string) *prsparser.PRS {
if prs, exists := currentVariantScoreLookup[ChrPos{chromosome, position, snp}]; exists {
return &prs
} else {
// See if we have missed a leading zero
chrInt, err := strconv.Atoi(chromosome)
if err != nil {
// If it cannot be parsed as an integer, then it was a sex
// chromosome and it truly didn't match.
return nil
}
// We parsed as an integer. Now recheck without the leading zero to see
// if we can match.
if prs, exists := currentVariantScoreLookup[ChrPos{strconv.Itoa(chrInt), position, snp}]; exists {
return &prs
}
}
return nil
}
// ChromosomalPRS creates a map containing each chromosome and the PRS variants
// on that chromosome.
func ChromosomalPRS() map[string][]prsparser.PRS {
output := make(map[string][]prsparser.PRS)
for _, v := range prsSorted {
if _, exists := output[v.Chromosome]; !exists {
output[v.Chromosome] = make([]prsparser.PRS, 0)
}
output[v.Chromosome] = append(output[v.Chromosome], v)
}
return output
}
// splitter parallelizes the computation. Each separate goroutine will process a
// set of sites. Each goroutine will have its own BGEN and BGI filehandles.
func splitter(chromosomalPRS map[string][]prsparser.PRS, chunkSize int) []PRSSitesOnChrom {
out := make([]PRSSitesOnChrom, 0)
var subChunk PRSSitesOnChrom
// For each chromosome
for chrom, prsSites := range chromosomalPRS {
if subChunk.Chrom == "" {
// We are initializing from scratch
subChunk.Chrom = chrom
subChunk.PRSSites = make([]prsparser.PRS, 0, chunkSize)
}
if subChunk.Chrom != chrom {
// We have moved to a new chromosome
out = append(out, subChunk)
subChunk = PRSSitesOnChrom{
Chrom: chrom,
PRSSites: make([]prsparser.PRS, 0, chunkSize),
}
}
// Within this chromosome, add sites into different chunks
for k, prsSite := range prsSites {
subChunk.PRSSites = append(subChunk.PRSSites, prsSite)
// But if we have iterated to our chunk size, then create a new
// chunk
if k > 0 && k%chunkSize == 0 {
out = append(out, subChunk)
subChunk = PRSSitesOnChrom{
Chrom: chrom,
PRSSites: make([]prsparser.PRS, 0, chunkSize),
}
}
}
}
// Clean up at the end
if subChunk.Chrom != "" {
out = append(out, subChunk)
}
return out
}