// Purpose: Fetch the latest observations for each taxon from the GBIF API
// save them to the database and return the latest observation for each taxon
package gbif
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"sort"
"strconv"
"strings"
"time"
)
// Package-level settings for talking to the GBIF occurrence API.
var (
	// api is the base URL of the GBIF REST API (v1).
	api = "https://api.gbif.org/v1"
	// userAgent is sent with every request; UpdateConfig may prepend a prefix.
	userAgent = "gbif-extinct"
	// endpoint is the occurrence-search path appended to api.
	endpoint = "/occurrence/search"
	// limit is the page size used when paging through occurrence results.
	limit = 300
	// basisOfRecord restricts results to observation/specimen record types.
	basisOfRecord = "basis_of_record=MACHINE_OBSERVATION&basis_of_record=OBSERVATION&basis_of_record=HUMAN_OBSERVATION&basis_of_record=PRESERVED_SPECIMEN"
	// occurrenceStatus filters out absence records.
	occurrenceStatus = "occurrenceStatus=PRESENT"
)

// SampleRows is the number of taxa sampled per GetOutdatedObservations call.
// Kept as a string because it is concatenated directly into the SQL text.
const SampleRows = "25"
// Response is the response from the GBIF API for the occurrence search.
type Response struct {
	Offset       int      // paging offset echoed back by the API
	Limit        int      // page size echoed back by the API
	EndOfRecords bool     // true when no further pages exist
	Count        int      // total number of matching records
	Results      []Result // records of the current page
	Facets       []Facet  // facet counts, present when facets were requested
}

// Result is a single occurrence record; only the fields used here are decoded.
type Result struct {
	Key        int    // GBIF occurrence key (used as observation ID)
	DatasetKey string // key of the dataset the record belongs to
	EventDate  string // raw event date as reported by GBIF (may be a range)
}

// Facet is one facet dimension (e.g. YEAR, COUNTRY) with its value counts.
type Facet struct {
	Field  string
	Counts []Count
}

// Count is one facet value (Name) and how many records carry it.
type Count struct {
	Name  string
	Count int
}

// LatestObservation is the newest observation of a taxon in one country,
// as stored in the observations table.
type LatestObservation struct {
	ObservationID           string // GBIF occurrence key as string
	ObservationOriginalDate string // raw EventDate from the API
	ObservationDate         string // normalized YYYY-MM-DD (see CleanDate)
	CountryCode             string // ISO country code the observation was made in
	TaxonID                 string // taxon the observation belongs to
}

// Config carries the runtime configuration accepted by UpdateConfig.
type Config struct {
	UserAgentPrefix string // optional prefix prepended to the User-Agent header
}
// UpdateConfig applies the given configuration to the GBIF package.
// A non-empty UserAgentPrefix is prepended (with an underscore) to the
// User-Agent value used for all API requests.
func UpdateConfig(config Config) {
	if prefix := config.UserAgentPrefix; prefix != "" {
		userAgent = prefix + "_" + userAgent
	}
}
// FetchLatest fetches the latest observations of a taxon from the GBIF API.
// For every country that has records it pages through the occurrence search,
// collects observations in ascending date order, and keeps the newest one per
// country. Returns nil when the API reports no year facet data for the taxon.
func FetchLatest(taxonID string) []LatestObservation {
	slog.Info("Fetching latest observations from gbif", "taxonID", taxonID)

	years := getYears(taxonID)
	if len(years) == 0 {
		slog.Info("No year data found for taxon")
		return nil
	}
	countries := getCountries(taxonID, years)

	baseUrl := endpoint + "?limit=" + fmt.Sprint(limit) + "&" + basisOfRecord + "&" + occurrenceStatus + "&" + "taxonKey=" + taxonID
	var result []LatestObservation
	for country, year := range countries {
		page := 0
		var observations []LatestObservation
		for {
			offset := page * limit
			page++

			fetchUrl := baseUrl + "&year=" + year + "&country=" + country + "&offset=" + fmt.Sprint(offset)
			body := internalFetch(fetchUrl)
			if body == nil {
				slog.Error("Failed to fetch data", "url", fetchUrl)
				break
			}
			var response Response
			// BUG FIX: the Unmarshal error was silently discarded.
			if err := json.Unmarshal(body, &response); err != nil {
				slog.Error("Failed to decode response", "error", err, "url", fetchUrl)
				break
			}
			// BUG FIX: original tested response.Count < 0, which can never be
			// true; an empty result set reports Count == 0.
			if response.Count == 0 {
				slog.Info("No more rows found for given taxa", "taxonID", taxonID)
				break
			}

			breakEarly := false
			for _, occ := range response.Results { // renamed from `result`, which shadowed the outer slice
				if len(occ.EventDate) < 4 {
					continue // date too short to even hold a year
				}
				cleanDate := CleanDate(occ.EventDate)
				// Results arrive roughly date-ordered; skip anything not newer
				// than the last kept observation.
				// BUG FIX: original used len(observations) > 1, letting an
				// older date through while exactly one observation was stored.
				if len(observations) > 0 && cleanDate <= observations[len(observations)-1].ObservationDate {
					continue
				}
				observations = append(observations, LatestObservation{
					ObservationID:           fmt.Sprint(occ.Key),
					ObservationOriginalDate: occ.EventDate,
					ObservationDate:         cleanDate,
					CountryCode:             country,
					TaxonID:                 taxonID,
				})
				// Escape hatch if we are already on the last day of the year
				if len(occ.EventDate) >= 10 {
					lastDayOfYear := occ.EventDate[:4] + "-12-31"
					if occ.EventDate[:10] >= lastDayOfYear {
						breakEarly = true
						break
					}
				}
			}

			if response.EndOfRecords || breakEarly {
				break
			}
			time.Sleep(1 * time.Second) // Prevent overload of the GBIF API
		}

		// Sort newest first and keep only the latest observation per country.
		sort.Slice(observations, func(a, b int) bool {
			return observations[b].ObservationDate < observations[a].ObservationDate
		})
		if len(observations) > 0 {
			result = append(result, observations[0])
		}
	}
	return result
}
// SaveObservation saves the latest observation for each taxon.
// It first clears the old observations for each taxon before inserting the new
// ones; to improve performance each insert contains all new observations for
// that taxon at once.
func SaveObservation(observation [][]LatestObservation, db *sql.DB) {
	slog.Info("Updating observations", "taxa", len(observation))
	const stmt = "INSERT INTO observations (ObservationID, TaxonID, CountryCode, ObservationDate, ObservationDateOriginal) VALUES"
	for _, res := range observation {
		// BUG FIX: an empty inner slice made res[0] panic.
		if len(res) == 0 {
			continue
		}
		clearOldObservations(db, res[0].TaxonID)
		slog.Info("Inserting new for taxaId", "observations", len(res), "taxaId", res[0].TaxonID)

		// BUG FIX: values were interpolated into the SQL text; a quote in the
		// API-supplied EventDate would break (or inject into) the statement.
		// Build a placeholder list and bind the values instead.
		placeholders := make([]string, 0, len(res))
		args := make([]any, 0, len(res)*5)
		for _, obs := range res {
			placeholders = append(placeholders, "(?, ?, ?, ?, ?)")
			args = append(args, obs.ObservationID, obs.TaxonID, obs.CountryCode, obs.ObservationDate, obs.ObservationOriginalDate)
		}
		query := stmt + strings.Join(placeholders, ",") + " ON CONFLICT DO NOTHING;"
		if _, err := db.Exec(query, args...); err != nil {
			// BUG FIX: slog.Error(msg, err) passed err as a lone argument,
			// which slog renders as !BADKEY; supply a proper key.
			slog.Error("Database error on inserting new observations", "error", err)
		}
	}
}
// GetSynonymID returns the synonym id stored for a taxon id; it is used when
// fetch is called on a synonym. An error is returned when the lookup fails or
// when no synonym id is recorded for the taxon.
func GetSynonymID(db *sql.DB, taxonID string) (string, error) {
	var id sql.NullString
	if err := db.QueryRow("SELECT SynonymID FROM taxa WHERE TaxonID = ?", taxonID).Scan(&id); err != nil {
		slog.Error("Failed to get SynonymID", "error", err)
		return "", errors.New("failed to fetch")
	}
	if id.Valid {
		return id.String, nil
	}
	return "", errors.New("no id found")
}
// GetOutdatedObservations returns a random sample of taxon IDs whose
// observations should be refreshed. We only fetch a few at a time to not
// overload the GBIF API; this function is used by the cron job.
// NOTE(review): date_diff('month', today(), LastFetch) is negative for any
// LastFetch in the past, so `6 > ...` matches every previously fetched taxon —
// confirm the intended condition is date_diff('month', LastFetch, today()) >= 6.
func GetOutdatedObservations(db *sql.DB) []string {
	rows, err := db.Query(`
	SELECT TaxonID
	FROM taxa
	WHERE (6 > date_diff('month', today(), LastFetch) OR LastFetch IS NULL) AND isSynonym = FALSE
	USING SAMPLE ` + SampleRows + ` ROWS`)
	var taxonIDs []string
	if err != nil {
		slog.Error("Failed to get outdated observations", "error", err)
		return taxonIDs
	}
	// BUG FIX: rows were never closed, leaking the underlying cursor/connection.
	defer rows.Close()
	for rows.Next() {
		var taxonID string
		// BUG FIX: a scan failure used log.Fatal, killing the whole process
		// from inside a library helper; log and skip the row instead.
		if err := rows.Scan(&taxonID); err != nil {
			slog.Error("Failed to scan taxonID", "error", err)
			continue
		}
		taxonIDs = append(taxonIDs, taxonID)
	}
	// BUG FIX: iteration errors were silently ignored.
	if err := rows.Err(); err != nil {
		slog.Error("Row iteration failed", "error", err)
	}
	return taxonIDs
}
// UpdateLastFetchStatus stamps the current UTC time into the LastFetch column
// for the given taxon and for any taxa whose SynonymID points at it.
// It should be called before updating the observations for a taxon; the
// LastFetch column is what GetOutdatedObservations uses to pick taxa for a
// random refresh. Returns false when the database update fails.
func UpdateLastFetchStatus(db *sql.DB, taxonID string) bool {
	timestamp := time.Now().UTC().Format(time.RFC3339)
	if _, err := db.Exec("UPDATE taxa SET LastFetch = ? WHERE SynonymID = ? OR TaxonID = ?", timestamp, taxonID, taxonID); err != nil {
		slog.Error("Failed to update last fetch status", "error", err)
		return false
	}
	return true
}
// internalFetch performs a GET request against the GBIF API and returns the
// response body, or nil on any transport, read, or non-200 failure.
// The url argument is the path+query portion; the API base URL is prepended.
func internalFetch(url string) []byte {
	// NOTE(review): 2s is a tight budget for 300-record pages — confirm it is
	// long enough for slower GBIF queries.
	client := http.Client{
		Timeout: time.Second * 2,
	}

	req, err := http.NewRequest(http.MethodGet, api+url, nil)
	if err != nil {
		// CONSISTENCY FIX: the rest of the package logs via slog; this helper
		// used log.Default, producing mixed log formats.
		slog.Error("Failed to build request", "error", err)
		return nil
	}
	req.Header.Set("User-Agent", userAgent)

	res, err := client.Do(req)
	if err != nil {
		slog.Error("Failed to fetch data", "error", err)
		return nil
	}
	// Client.Do guarantees a non-nil Body on success, so no nil check needed.
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		slog.Error("Failed to read body", "error", err)
		return nil
	}
	if res.StatusCode != http.StatusOK {
		slog.Error("Failed to fetch data", "status", res.Status)
		return nil
	}
	return body
}
// getYears returns the years (newest first) in which GBIF has occurrence
// records for the taxon, read from the year facet of the occurrence search.
// facetMultiselect=true makes the year facet ignore the year filter itself;
// the next-year filter presumably just limits the result payload — TODO confirm.
func getYears(taxonID string) []int {
	nextYear := time.Now().Year() + 1
	url := endpoint + "?facetMultiselect=true&facet=year&facetLimit=5000&taxonKey=" + taxonID + "&year=" + fmt.Sprint(nextYear)

	var years []int
	body := internalFetch(url)
	if body == nil {
		log.Default().Print("Failed to fetch years data")
		return years
	}
	var response Response
	json.Unmarshal(body, &response)

	for _, facet := range response.Facets {
		if facet.Field != "YEAR" {
			continue
		}
		for _, count := range facet.Counts {
			parsed, err := strconv.Atoi(count.Name)
			if err != nil {
				log.Default().Printf("Failed to convert year to int: %s", err)
				continue
			}
			years = append(years, parsed)
		}
	}

	// Descending: most recent year first.
	sort.Slice(years, func(a, b int) bool {
		return years[b] < years[a]
	})
	return years
}
// getCountries returns a map of country code -> year (as string) for countries
// with occurrences of the taxon, scanning at most the first 10 entries of
// years. Each country is recorded with the first year it appears under.
func getCountries(taxonID string, years []int) map[string]string {
	countriesMap := make(map[string]string)
	for i, year := range years {
		// We only go 10 deep; if we miss any countries so be it.
		if i >= 10 {
			break
		}
		url := endpoint + "?facet=country&facetLimit=5000&taxonKey=" + taxonID + "&year=" + fmt.Sprint(year)
		body := internalFetch(url)
		// BUG FIX: a failed fetch previously fed nil into json.Unmarshal with
		// the error discarded; skip the year explicitly instead.
		if body == nil {
			continue
		}
		var response Response
		if err := json.Unmarshal(body, &response); err != nil {
			slog.Error("Failed to decode countries response", "error", err)
			continue
		}
		for _, facet := range response.Facets {
			if facet.Field != "COUNTRY" {
				continue
			}
			for _, count := range facet.Counts {
				if count.Name == "" {
					continue
				}
				// FIX: original kept a second, redundant `countries` map whose
				// keys mirrored countriesMap, with the ok flag misnamed `value`.
				if _, ok := countriesMap[count.Name]; !ok {
					countriesMap[count.Name] = fmt.Sprint(year)
				}
			}
		}
	}
	return countriesMap
}
// CleanDate normalizes a GBIF event date to the format YYYY-MM-DD.
// Date ranges ("start/end") collapse to the start date, any time component
// (after "T" or a space) is stripped, and a missing month or day defaults to
// "01". An empty input is returned unchanged.
func CleanDate(date string) string {
	if date == "" {
		return ""
	}

	// A range like "2001-01-01/2001-12-31" keeps only its first part.
	first := date
	if idx := strings.Index(first, "/"); idx >= 0 {
		first = first[:idx]
	}

	// Remove time if it exists ("...T10:00" or "... 10:00").
	first = strings.Split(first, " ")[0]
	first = strings.Split(first, "T")[0]

	parts := strings.Split(first, "-")
	switch len(parts) {
	case 1:
		return parts[0] + "-01-01"
	case 2:
		return parts[0] + "-" + parts[1] + "-01"
	default:
		return parts[0] + "-" + parts[1] + "-" + parts[2]
	}
}
// clearOldObservations deletes all stored observations for a taxon so the new
// set can be inserted fresh; we are only interested in the latest observation
// for each taxon. Called from SaveObservation.
func clearOldObservations(db *sql.DB, taxonID string) {
	res, err := db.Exec("DELETE FROM observations WHERE TaxonID = ?", taxonID)
	if err != nil {
		// BUG FIX: the original logged and fell through to res.RowsAffected(),
		// dereferencing the nil result and panicking. Also: slog.Error needs a
		// key for the error value, not a bare argument.
		slog.Error("Database error on clearing old observations", "error", err)
		return
	}
	affected, err := res.RowsAffected()
	if err != nil {
		slog.Error("Failed to get affected rows", "error", err)
		return
	}
	slog.Info("Deleted old observations", "taxonID", taxonID, "affected", affected)
}