-
Notifications
You must be signed in to change notification settings - Fork 1
/
file.go
executable file
·158 lines (138 loc) · 3.82 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// write MultiDataPoint into a file.
// no rotation currently
package backends
import (
"bytes"
"fmt"
"github.com/oliveagle/hickwall/backends/config"
"github.com/oliveagle/hickwall/newcore"
"log"
"os"
"path/filepath"
"time"
)
var (
_ = time.Now()
_ = fmt.Sprintf("")
)
type fileBackend struct {
name string
closing chan chan error // for Close
updates chan newcore.MultiDataPoint // for receive updates
// file backend specific attributes
output *os.File
conf *config.Transport_file
}
func NewFileBackend(name string, conf *config.Transport_file) (newcore.Publication, error) {
s := &fileBackend{
name: name,
closing: make(chan chan error),
updates: make(chan newcore.MultiDataPoint),
conf: conf,
}
if conf.Path == "" {
return nil, fmt.Errorf("backend file should not be empty")
}
go s.loop()
return s, nil
}
func (b *fileBackend) loop() {
var (
startConsuming <-chan newcore.MultiDataPoint
try_open_file_once chan bool
try_open_file_tick <-chan time.Time
buf = bytes.NewBuffer(make([]byte, 0, 1024))
)
startConsuming = b.updates
log.Println("filebackend.loop started")
for {
if b.output == nil && try_open_file_once == nil && try_open_file_tick == nil {
startConsuming = nil // disable consuming
try_open_file_once = make(chan bool)
// log.Println("try to open file the first time.")
// try to open file the first time async.
go func() {
err := b.openFile()
if b.output != nil && err == nil {
// log.Println("openFile first time OK", b.output)
try_open_file_once <- true
} else {
log.Printf("CRITICAL: filebackend trying to open file but failed: %s", err)
try_open_file_once <- false
}
}()
}
select {
case md := <-startConsuming:
// fmt.Println("start consuming ")
for _, p := range md {
if b.output != nil {
// fmt.Printf("fileBackend.loop name:%s, consuming md: 0x%X \n", b.name, &md)
// fmt.Println(p.Metric)
res, _ := p.MarshalJSON()
buf.Write(res)
buf.Write([]byte("\n"))
b.output.Write(buf.Bytes())
buf.Reset()
}
}
case opened := <-try_open_file_once:
try_open_file_once = nil // disable this branch
if !opened {
// failed open it the first time,
// then we try to open file with time interval, until opened successfully.
log.Println("open the first time failed, try to open with interval of 1s")
try_open_file_tick = time.Tick(time.Second * 1)
} else {
// log.Println("file opened the first time.", b.output, try_open_file_once, try_open_file_tick)
startConsuming = b.updates
}
case <-try_open_file_tick:
// try to open with interval
err := b.openFile()
if b.output != nil && err == nil {
// finally opened.
try_open_file_tick = nil
startConsuming = b.updates
} else {
log.Printf("CRITICAL: filebackend trying to open file but failed: %s", err)
}
case errc := <-b.closing:
// fmt.Println("errc <- b.closing")
log.Println("filebackend.loop closing")
startConsuming = nil // stop comsuming
errc <- nil
close(b.updates)
log.Println("filebackend.loop stopped")
return
}
}
}
func (b *fileBackend) Updates() chan<- newcore.MultiDataPoint {
return b.updates
}
func (b *fileBackend) Close() error {
// fmt.Println("bk.Close() start")
errc := make(chan error)
b.closing <- errc
if b.output != nil {
b.output.Close()
}
// fmt.Println("bk.Closed() finished")
return <-errc
}
func (b *fileBackend) Name() string {
return b.name
}
func (b *fileBackend) openFile() error {
abspath, err := filepath.Abs(b.conf.Path)
if err != nil {
return fmt.Errorf("failed to get abs path: %s, err: %v", b.conf.Path, err)
}
f, err := os.OpenFile(abspath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660)
if err != nil {
return fmt.Errorf("failed openFile: %v", err)
}
b.output = f
return nil
}