-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogbase.go
147 lines (130 loc) · 3.35 KB
/
logbase.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package logbase
import (
"compress/bzip2"
"compress/flate"
"compress/gzip"
"context"
"io"
//"github.com/google/uuid"
"github.com/pkg/errors"
"os"
"path/filepath"
"sync"
"github.com/jackc/pgx/v4/pgxpool"
"gopkg.in/birkirb/loggers.v1"
"github.com/apisite/logbase/nginx"
)
type Config struct {
FileBeginProc string `long:"file_begin" default:"logs.file_begin" description:"Before file load func"`
}
// FileConfig holds config loaded from DB via key
type FileConfig struct {
ID int
Type LogType
Data []byte
}
// Service holds service data
type Service struct {
Config *Config
Log loggers.Contextual
DB *pgxpool.Pool
commitLock sync.RWMutex
// MessageChan chan interface{}
}
// New creates an LogBase object
func New(cfg Config, log loggers.Contextual, db *pgxpool.Pool) *Service {
srv := &Service{
Config: &cfg,
Log: log,
DB: db,
// MessageChan: make(chan interface{}),
}
return srv
}
func (srv *Service) Auth(key string) (*FileConfig, error) {
ctx := context.Background()
db, err := srv.DB.Acquire(ctx)
if err != nil {
return nil, err
}
defer db.Release()
fc := FileConfig{}
sql := "select id, type_id, data from logs.config where key=$1"
if err := db.QueryRow(ctx, sql, key).Scan(&fc.ID, &fc.Type, &fc.Data); err != nil {
return nil, err
}
srv.Log.Printf("Got key for: %s/%d", fc.Type, fc.ID)
return &fc, nil
}
func (srv *Service) LoadFile(cfg *FileConfig, path, file, ctype string) (int, error) {
ctx := context.Background()
var fileID int
db, err := srv.DB.Acquire(ctx)
if err != nil {
return 0, errors.Wrap(err, "Acquire")
}
defer db.Release()
sql := "select logs.file_before(a_type_id => $1, a_config_id => $2, a_filename => $3)"
if err := db.QueryRow(ctx, sql, cfg.Type, cfg.ID, file).Scan(&fileID); err != nil {
return 0, errors.Wrap(err, "FileBegin")
}
filePath := filepath.Join(path, file)
go srv.load(cfg, filePath, ctype, fileID)
return fileID, nil
}
func (srv *Service) load(cfg *FileConfig, file, ctype string, fileID int) {
// file_begin
ctx := context.Background()
fh, err := os.Open(file)
if err != nil {
srv.Log.Errorf("Open %s error: %v", file, err)
return
}
defer fh.Close()
srv.Log.Warnf("File %s encoding: %s", file, ctype)
var reader io.Reader
switch ctype {
case "bz2":
reader = bzip2.NewReader(fh)
case "gzip":
gz, err := gzip.NewReader(fh)
if err != nil {
srv.Log.Errorf("Open gzip %s error: %v", file, err)
return
}
reader = gz
case "deflate":
def := flate.NewReader(fh)
reader = def
default:
// just use the default reader
reader = fh
}
//defer reader.Close()
var stat []interface{}
switch cfg.Type {
case Nginx:
srv.Log.Print("Load nginx: " + file)
nstat := nginx.Stat{}
err = nginx.Run(srv.DB, cfg.Data, fileID, reader, &nstat)
stat = []interface{}{fileID, err, nstat.Total, nstat.Loaded, nstat.Skipped, nstat.First, nstat.Last}
default:
err = errors.New("Unknown config type")
}
if err != nil {
srv.Log.Errorf("Run: %v", err)
return
}
// file_end
db, err := srv.DB.Acquire(ctx)
if err != nil {
srv.Log.Errorf("Acquire: %v", err)
return
}
defer db.Release()
sql := "select logs.file_after(a_id => $1, a_error => $2, a_total=>$3, a_loaded=>$4, a_skipped=>$5, a_first => $6, a_last => $7)"
if _, err := db.Exec(ctx, sql, stat...); err != nil {
srv.Log.Errorf("FileEnd: %v", err)
}
srv.Log.Printf("File stat: %+v", stat)
}