-
Notifications
You must be signed in to change notification settings - Fork 4
/
rotate.go
123 lines (101 loc) · 2.96 KB
/
rotate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.
package diskcache
import (
"encoding/binary"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
// Rotate force diskcache switch(rotate) from current write file(cwf) to next
// new file, leave cwf become readble on successive Get().
//
// NOTE: You do not need to call Rotate() during daily usage, we export
// that function for testing cases.
func (c *DiskCache) Rotate() error {
return c.rotate()
}
// rotate to next new file, append to reading list.
func (c *DiskCache) rotate() error {
c.rwlock.Lock()
defer c.rwlock.Unlock()
defer func() {
rotateVec.WithLabelValues(c.path).Inc()
sizeVec.WithLabelValues(c.path).Set(float64(c.size))
datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))
}()
eof := make([]byte, dataHeaderLen)
binary.LittleEndian.PutUint32(eof, EOFHint)
if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end
return err
}
// NOTE: EOF bytes do not count to size
// rotate file
var newfile string
if len(c.dataFiles) == 0 {
newfile = filepath.Join(c.path, fmt.Sprintf("data.%032d", 0)) // first rotate file
} else {
// parse last file's name, such as `data.000003', the new rotate file is `data.000004`
last := c.dataFiles[len(c.dataFiles)-1]
arr := strings.Split(filepath.Base(last), ".")
if len(arr) != 2 {
return ErrInvalidDataFileName
}
x, err := strconv.ParseInt(arr[1], 10, 64)
if err != nil {
return ErrInvalidDataFileNameSuffix
}
// data.0003 -> data.0004
newfile = filepath.Join(c.path, fmt.Sprintf("data.%032d", x+1))
}
// close current writing file
if err := c.wfd.Close(); err != nil {
return err
}
c.wfd = nil
// rename data -> data.0004
if err := os.Rename(c.curWriteFile, newfile); err != nil {
return err
}
c.dataFiles = append(c.dataFiles, newfile)
sort.Strings(c.dataFiles)
// reopen new write file
if err := c.openWriteFile(); err != nil {
return err
}
return nil
}
// after file read on EOF, remove the file.
func (c *DiskCache) removeCurrentReadingFile() error {
c.rwlock.Lock()
defer c.rwlock.Unlock()
defer func() {
sizeVec.WithLabelValues(c.path).Set(float64(c.size))
removeVec.WithLabelValues(c.path).Inc()
datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))
}()
if c.rfd != nil {
if err := c.rfd.Close(); err != nil {
return err
}
c.rfd = nil
}
if fi, err := os.Stat(c.curReadfile); err == nil { // file exist
if fi.Size() > dataHeaderLen {
c.size -= (fi.Size() - dataHeaderLen) // EOF bytes do not counted in size
}
if err := os.Remove(c.curReadfile); err != nil {
return fmt.Errorf("removeCurrentReadingFile: %q: %w", c.curReadfile, err)
}
}
c.curReadfile = ""
if len(c.dataFiles) > 0 {
c.dataFiles = c.dataFiles[1:] // first file removed
}
return nil
}