/
s3.go
99 lines (87 loc) · 2.56 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package connectors
import (
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/dimpogissou/isengard-server/config"
"github.com/dimpogissou/isengard-server/logger"
"github.com/hpcloud/tail"
uuid "github.com/nu7hatch/gouuid"
)
// S3Connector is the connector that uploads tailed log lines to an S3 bucket.
// It groups an AWS session, an S3 client and the connector configuration.
type S3Connector struct {
// AWS session handle. It is not referenced by the methods visible in this
// part of the file; presumably it is the session the client was built from
// (see SetupS3Client) — TODO confirm against the constructor.
session *session.Session
// S3 API client used by s3PutObject to upload objects.
client *s3.S3
// Connector settings; the visible methods read Name, Bucket and KeyPrefix.
cfg config.S3ConnectorConfig
}
// GetName returns the connector name stored in its configuration (cfg.Name).
func (c S3Connector) GetName() string {
return c.cfg.Name
}
// SetupS3Client creates an AWS session and an S3 client from cfg.
//
// The session is configured with cfg.Region and cfg.Endpoint, path-style
// bucket addressing and verbose credential-chain errors. The S3 client is
// created on top of that session with an empty override config.
//
// It returns the session and the client. The session is created through
// session.Must, so a failure to build it panics instead of being returned
// as an error.
func SetupS3Client(cfg config.S3ConnectorConfig) (*session.Session, *s3.S3) {
	sess := session.Must(session.NewSession(&aws.Config{
		S3ForcePathStyle:              aws.Bool(true),
		CredentialsChainVerboseErrors: aws.Bool(true),
		Region:                        aws.String(cfg.Region),
		Endpoint:                      aws.String(cfg.Endpoint),
	}))
	client := s3.New(sess, &aws.Config{})

	// Log the settings that identify the client. Formatting the client
	// pointer itself would only print a memory address.
	logger.Info(fmt.Sprintf("Created S3 client --> region=%q endpoint=%q", cfg.Region, cfg.Endpoint))
	return sess, client
}
// Close is a no-op for the S3 connector: it only logs a message and always
// returns nil.
func (c S3Connector) Close() error {
logger.Info("Closed S3 connector (no-op) ...")
return nil
}
// parseLine matches the text of a tailed line against re and returns the
// named capture groups as a map from group name to captured text.
//
// If the line does not match, it returns an empty (non-nil) map together with
// an error. Unnamed capture groups are skipped: they have no name to key on
// and would otherwise all collide on the empty-string key.
func parseLine(l *tail.Line, re *regexp.Regexp) (map[string]string, error) {
	match := re.FindStringSubmatch(l.Text)
	if match == nil {
		return make(map[string]string), errors.New("No match found in line, returning empty map")
	}

	names := re.SubexpNames()
	params := make(map[string]string, len(names))
	for i, name := range names {
		// Index 0 is the whole match, not a group. The upper bound is strict
		// so match[i] can never be read one past the end of the slice.
		if i == 0 || i >= len(match) || name == "" {
			continue
		}
		params[name] = match[i]
	}
	return params, nil
}
// s3PutObject uploads the text of a tailed line as a single S3 object.
//
// bucket is the destination bucket and fileKey the object key. The request
// is sent with the "public-read" ACL value. On success the PutObject output
// is returned; on failure the output is nil and the client's error is
// returned as-is.
//
// NOTE(review): "public-read" presumably makes every uploaded line publicly
// readable — confirm this is intended for the target bucket.
func (c S3Connector) s3PutObject(bucket string, fileKey string, line *tail.Line) (*s3.PutObjectOutput, error) {
	input := &s3.PutObjectInput{
		ACL:    aws.String("public-read"),
		Body:   strings.NewReader(line.Text),
		Bucket: aws.String(bucket),
		Key:    aws.String(fileKey),
	}

	out, putErr := c.client.PutObject(input)
	if putErr != nil {
		// Drop whatever output the client produced alongside the error.
		return nil, putErr
	}
	return out, nil
}
// Send uploads a single tailed line to the configured S3 bucket.
//
// The object key has the form "<KeyPrefix>/<YYYY-MM-DD>T<HH-MM-SS>-<uuid>",
// built from the current time as returned by time.Now and a newly generated
// UUID from uuid.NewV4. Errors from UUID generation or from the upload are
// logged and returned to the caller unchanged; nil is returned on success.
func (c S3Connector) Send(line *tail.Line) error {
	now := time.Now()

	// Named id rather than uuid so the imported uuid package is not shadowed.
	id, err := uuid.NewV4()
	if err != nil {
		logger.Error("CreateUuidError", err.Error())
		return err
	}

	fileName := fmt.Sprintf("%s/%s-%v", c.cfg.KeyPrefix, now.Format("2006-01-02T15-04-05"), id)

	// Announce the upload before attempting it, so the log reads in the
	// order things happen and is not emitted after a failed request.
	logger.Info(fmt.Sprintf("Sending file '%s' to S3 bucket '%s'", fileName, c.cfg.Bucket))
	if _, err := c.s3PutObject(c.cfg.Bucket, fileName, line); err != nil {
		logger.Error("S3PutObjectError", err.Error())
		return err
	}
	return nil
}