/
handlers.go
105 lines (84 loc) · 2.83 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package server
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/cachecashproject/go-cachecash/log"
"golang.org/x/crypto/ed25519"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// ReceiveLogs receives the logs for processing. Spins out a goroutine to send to ES once received.
//
// Stream protocol: the first message carries the client's ed25519 public
// key; every subsequent message carries a signed chunk of log data. Chunks
// are rate-limited, size-capped, signature-verified, and spooled to a
// per-IP file. On clean EOF the file is queued on lp.processList for the
// processor; on any error the partial spool file is removed and the client
// is expected to re-send the whole bundle.
func (lp *LogPipe) ReceiveLogs(lf log.LogPipe_ReceiveLogsServer) (retErr error) {
	p, ok := peer.FromContext(lf.Context())
	if !ok {
		return status.Errorf(codes.FailedPrecondition, "failed to get grpc peer from ctx")
	}

	// Strip the port from the remote address; spool files are grouped by IP.
	addr := p.Addr.String()
	idx := strings.LastIndex(addr, ":")
	if idx < 0 {
		// Not a host:port address (e.g. a unix socket) — slicing with -1
		// below would panic, so reject it explicitly.
		return status.Errorf(codes.FailedPrecondition, "unexpected peer address %q", addr)
	}
	incomingIP := addr[:idx]

	if err := os.MkdirAll(filepath.Join(lp.config.SpoolDir, incomingIP), 0700); err != nil {
		return status.Errorf(codes.ResourceExhausted, "%s", err)
	}

	// The first stream message must be the client's public key; every later
	// chunk is verified against it.
	pubkey, err := lf.Recv()
	if err != nil {
		return err
	}
	if len(pubkey.PubKey) != ed25519.PublicKeySize {
		return status.Errorf(codes.InvalidArgument, "pubkey is the wrong size")
	}

	dataRateKey := strings.Join([]string{"rate-limit", "data", incomingIP}, "/")

	tf, err := os.Create(filepath.Join(lp.config.SpoolDir, incomingIP, fmt.Sprintf("%d", time.Now().Unix())))
	if err != nil {
		return status.Errorf(codes.FailedPrecondition, "%s", err)
	}
	defer func() {
		tf.Close() // may already be closed, don't check the error here
		if retErr != nil {
			os.Remove(tf.Name()) // we're depending on the client to re-send the log bundle
		}
	}()

	var size uint64
	for {
		// Bail out promptly if the server is shutting down or the client has
		// gone away; returning an error triggers spool cleanup in the defer.
		select {
		case <-lp.processContext.Done():
			return lp.processContext.Err()
		case <-lf.Context().Done():
			return lf.Context().Err() // return an error so the file gets cleaned up
		default:
		}

		data, err := lf.Recv()
		if err != nil {
			if err != io.EOF {
				return status.Errorf(codes.FailedPrecondition, "%s", err)
			}
			if err := tf.Close(); err != nil {
				return status.Errorf(codes.Unknown, "%s", err)
			}
			// since there was no error, this file will not be cleaned up in the
			// defer above. Instead, process will do that once its done working.
			lp.processListMutex.Lock()
			defer lp.processListMutex.Unlock()
			lp.processList = append(lp.processList, &FileMeta{PubKey: pubkey.PubKey, Name: tf.Name(), IPAddr: incomingIP})
			return nil
		}

		dataSize := int64(len(data.Data))
		size += uint64(dataSize)

		if lp.ratelimiter != nil {
			if err := lp.ratelimiter.RateLimit(lf.Context(), dataRateKey, dataSize); err != nil {
				return status.Errorf(codes.Unavailable, "%s", err)
			}
		}

		// Enforce the cumulative bundle-size cap across all received chunks.
		if size > lp.config.MaxLogSize {
			return status.Errorf(codes.InvalidArgument, "%s", log.ErrBundleTooLarge)
		}

		// Each chunk must be individually signed by the key presented in the
		// first message.
		if !ed25519.Verify(pubkey.PubKey, data.Data, data.Signature) {
			return status.Errorf(codes.InvalidArgument, "log bundle was not signed properly")
		}

		if _, err := tf.Write(data.Data); err != nil {
			return status.Errorf(codes.ResourceExhausted, "%s", err)
		}
	}
}