// fdsn-holdings-consumer receives notifications for the creation of miniSEED objects
// in AWS S3. Notifications are received from SQS.
// The miniSEED file referred to by the notification is fetched and indexed. The
// results are saved to the holdings DB.
//
// Multiple instances (workers) of this code can be run against the same queue for
// large data reindexing tasks. Reindexing files that already exist in the bucket
// would require sending messages in the notification format to the SQS queue.
// See github.com/GeoNet/kit/aws/s3 for the Event type.
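//
// A minimal sketch of such a message body (an s3.Event in the AWS S3
// notification format; the bucket name and object key here are placeholders):
//
//	{
//	  "Records": [
//	    {
//	      "eventName": "ObjectCreated:Put",
//	      "s3": {
//	        "bucket": {"name": "example-miniseed-bucket"},
//	        "object": {"key": "example/path/to/file.mseed"}
//	      }
//	    }
//	  ]
//	}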
package main

import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/GeoNet/kit/aws/s3"
"github.com/GeoNet/kit/aws/sqs"
"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/metrics"
)
var (
db *sql.DB
queueURL = os.Getenv("SQS_QUEUE_URL")
sqsClient sqs.SQS
s3Client *s3.S3
saveHoldings *sql.Stmt
)
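
// event wraps s3.Event so that a Process method can be defined on it;
// metrics.DoProcess in main calls Process with each raw SQS message body.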
type event struct {
s3.Event
}
func main() {
p, err := cfg.PostgresEnv()
if err != nil {
log.Fatalf("error reading DB config from the environment vars: %s", err)
}
db, err = sql.Open("postgres", p.Connection())
if err != nil {
log.Fatalf("error with DB config: %s", err)
}
defer db.Close()
	// TODO - this is duplicated in the test setup.
	// Make a struct like in fdsn-holdings-consumer and move the
	// DB connection and setup to it.
//
	// When a miniSEED file has errors, the error state and message are saved to
	// the holdings DB. The key is the file name and the streamPK is based on
	// an NSCL of empty strings ("".""."".""").
	// If the error is corrected the stream will change to some valid NSCL;
	// to handle this the streamPK is updated on conflict.
saveHoldings, err = db.Prepare(`INSERT INTO fdsn.holdings (streamPK, start_time, numsamples, key, error_data, error_msg)
SELECT streamPK, $5, $6, $7, $8, $9
FROM fdsn.stream
WHERE network = $1
AND station = $2
AND channel = $3
AND location = $4
ON CONFLICT (streamPK, key) DO UPDATE SET
streamPK = EXCLUDED.streamPK,
start_time = EXCLUDED.start_time,
numsamples = EXCLUDED.numsamples,
error_data = EXCLUDED.error_data,
error_msg = EXCLUDED.error_msg`)
if err != nil {
log.Fatalf("preparing saveHoldings statement: %s", err.Error())
}
defer saveHoldings.Close()
db.SetMaxIdleConns(p.MaxIdle)
db.SetMaxOpenConns(p.MaxOpen)
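
	// Block until the DB is reachable; messages stay on the queue while we wait.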
ping:
for {
err = db.Ping()
if err != nil {
			log.Println("problem pinging DB, sleeping and retrying")
time.Sleep(time.Second * 30)
continue ping
}
break ping
}
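
	// Note the asymmetric retry budgets below: the SQS client retries hard
	// (the consumer can do nothing without messages), while S3 reads fail fast,
	// letting Process record a problem object as an error holding instead of
	// stalling the loop.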
sqsClient, err = sqs.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("error creating SQS client: %s", err)
}
s3c, err := s3.NewWithMaxRetries(3)
if err != nil {
log.Fatalf("error creating S3 client: %s", err)
}
s3Client = &s3c
log.Println("listening for messages")
var r sqs.Raw
var e event
for {
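		// 600 is the message visibility timeout in seconds: long enough to
		// fetch and index a miniSEED file before SQS re-delivers the message
		// to another worker.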
r, err = sqsClient.Receive(queueURL, 600)
if err != nil {
log.Printf("problem receiving message, backing off: %s", err)
time.Sleep(time.Second * 20)
continue
}
err = metrics.DoProcess(&e, []byte(r.Body))
if err != nil {
log.Printf("problem processing message, skipping deletion for redelivery: %s", err)
continue
}
err = sqsClient.Delete(queueURL, r.ReceiptHandle)
if err != nil {
log.Printf("problem deleting message, continuing: %s", err)
}
}
}
// Process implements msg.Processor for event.
func (e *event) Process(msg []byte) error {
err := json.Unmarshal(msg, e)
if err != nil {
return err
}
	// Sanity checks on the message. If these return errors the message should
	// go to the DLQ for further inspection. This will catch errors such as
	// SNS->SQS subscriptions not being set up for raw message delivery.
if e.Records == nil {
return errors.New("got nil Records pointer in notification message")
}
if len(e.Records) == 0 {
return errors.New("got zero Records in notification message")
}
for _, v := range e.Records {
switch {
case strings.HasPrefix(v.EventName, "ObjectCreated"):
			// TODO (GMC) setting errors like this will include miniSEED errors as well as
			// errors from reading from S3. Is this ok or should it just be miniSEED errors?
h, err := holdingS3(v.S3.Bucket.Name, v.S3.Object.Key)
if err != nil {
h.key = v.S3.Object.Key
h.errorData = true
h.errorMsg = err.Error()
}
err = h.save()
if err != nil {
return fmt.Errorf("error saving holding for %s %s: %w", v.S3.Bucket.Name, v.S3.Object.Key, err)
}
case strings.HasPrefix(v.EventName, "ObjectRemoved"):
h := holding{key: v.S3.Object.Key}
err = h.delete()
if err != nil {
return fmt.Errorf("error deleting holdings for %s %s: %w", v.S3.Bucket.Name, v.S3.Object.Key, err)
}
default:
return errors.New("unknown EventName: " + v.EventName)
}
}
return nil
}