/
provider.go
126 lines (107 loc) · 3.91 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) 2021 Terminus, Inc.
//
// This program is free software: you can use, redistribute, and/or modify
// it under the terms of the GNU Affero General Public License, version 3
// or later ("AGPL"), as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"context"
"fmt"
"time"
"github.com/bluele/gcache"
"github.com/erda-project/erda-infra/base/logs"
"github.com/erda-project/erda-infra/base/servicehub"
writer "github.com/erda-project/erda-infra/pkg/parallel-writer"
"github.com/erda-project/erda-infra/providers/cassandra"
mutex "github.com/erda-project/erda-infra/providers/etcd-mutex"
"github.com/erda-project/erda-infra/providers/kafka"
"github.com/erda-project/erda-infra/providers/mysql"
"github.com/erda-project/erda/modules/core/monitor/log/schema"
)
const selector = "log-store"
type define struct{}
func (d *define) Services() []string { return []string{selector} }
func (d *define) Dependencies() []string {
return []string{"kafka", "cassandra", "mysql", "etcd-mutex"}
}
func (d *define) Summary() string { return "logs store" }
func (d *define) Description() string { return d.Summary() }
func (d *define) Config() interface{} { return &config{} }
func (d *define) Creator() servicehub.Creator {
return func() servicehub.Provider {
return &provider{}
}
}
type config struct {
Input kafka.ConsumerConfig `file:"input"`
Output struct {
LogSchema struct {
OrgRefreshInterval time.Duration `file:"org_refresh_interval" default:"2m" env:"LOG_SCHEMA_ORG_REFRESH_INTERVAL"`
} `file:"log_schema"`
Cassandra struct {
cassandra.WriterConfig `file:"writer_config"`
cassandra.SessionConfig `file:"session_config"`
GCGraceSeconds int `file:"gc_grace_seconds" default:"86400"`
DefaultTTL time.Duration `file:"default_ttl" default:"168h"`
TTLReloadInterval time.Duration `file:"ttl_reload_interval" default:"3m"`
CacheStoreInterval time.Duration `file:"cache_store_interval" default:"3m"`
} `file:"cassandra"`
IDKeys []string `file:"id_keys"`
} `file:"output"`
}
type provider struct {
Cfg *config
Log logs.Logger
Mysql mysql.Interface
Kafka kafka.Interface
EtcdMutexInf mutex.Interface
output writer.Writer
ttl ttlStore
schema schema.LogSchema
cache gcache.Cache
}
func (p *provider) Init(ctx servicehub.Context) error {
cass := ctx.Service("cassandra").(cassandra.Interface)
session, err := cass.Session(&p.Cfg.Output.Cassandra.SessionConfig)
if err != nil {
return fmt.Errorf("fail to create cassandra session, err=%s", err)
}
p.output = cass.NewBatchWriter(session, &p.Cfg.Output.Cassandra.WriterConfig, p.createLogStatementBuilder)
p.ttl = &mysqlStore{
ttlValue: make(map[string]int),
defaultTTLSec: int(p.Cfg.Output.Cassandra.DefaultTTL.Seconds()),
mysql: p.Mysql.DB(),
Log: p.Log.Sub("ttlStore"),
}
p.schema, err = schema.NewCassandraSchema(cass, p.Log.Sub("logSchema"))
if err != nil {
return err
}
p.cache = gcache.New(128).LRU().Build()
return nil
}
func (p *provider) Run(ctx context.Context) error {
if err := p.schema.CreateDefault(); err != nil {
return fmt.Errorf("create default error: %w", err)
}
go p.schema.RunDaemon(ctx, p.Cfg.Output.LogSchema.OrgRefreshInterval, p.EtcdMutexInf)
go p.ttl.Run(ctx, p.Cfg.Output.Cassandra.TTLReloadInterval)
go p.startStoreMetaCache(ctx)
if err := p.Kafka.NewConsumer(&p.Cfg.Input, p.invoke); err != nil {
return err
}
select {
case <-ctx.Done():
}
return nil
}
func init() {
servicehub.RegisterProvider(selector, &define{})
}