-
Notifications
You must be signed in to change notification settings - Fork 4
/
switch.go
108 lines (87 loc) · 2.25 KB
/
switch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.
package diskcache
import (
"fmt"
"io"
"os"
"time"
)
// loadUnfinishedFile resumes reading from the data file remembered in the
// .pos file, if there is one.
//
// Outcomes:
//   - the .pos file cannot be stat'ed: nothing to resume, returns nil;
//   - the .pos file cannot be loaded: returns the wrapped posFromFile error;
//   - the remembered data file cannot be stat'ed: the position is reset via
//     c.pos.reset() and its result decides the return value;
//   - the remembered offset is not positive: the record is ignored, returns nil;
//   - otherwise the data file is opened read-only, positioned at the remembered
//     offset, and installed as the current read file (c.rfd, c.curReadfile,
//     c.pos.Name, c.pos.Seek).
func (c *DiskCache) loadUnfinishedFile() error {
	if _, err := os.Stat(c.pos.fname); err != nil {
		return nil // .pos file not exist
	}

	pos, err := posFromFile(c.pos.fname)
	if err != nil {
		return fmt.Errorf("posFromFile: %w", err)
	}

	// check file's health
	if _, err := os.Stat(string(pos.Name)); err != nil { // not exist
		if err := c.pos.reset(); err != nil {
			return err
		}

		return nil
	}

	// invalid .pos, ignored
	if pos.Seek <= 0 {
		return nil
	}

	fd, err := os.OpenFile(string(pos.Name), os.O_RDONLY, c.filePerms)
	if err != nil {
		return fmt.Errorf("OpenFile: %w", err)
	}

	if _, err := fd.Seek(pos.Seek, io.SeekStart); err != nil {
		// The descriptor is not handed over to c.rfd on this path, so it must
		// be released here. The close error is deliberately dropped: the seek
		// failure is the error worth reporting.
		_ = fd.Close()

		return fmt.Errorf("Seek(%q: %d, 0): %w", pos.Name, pos.Seek, err)
	}

	// Only publish the new read state once the file is open and positioned.
	c.rfd = fd
	c.curReadfile = string(pos.Name)
	c.pos.Name = pos.Name
	c.pos.Seek = pos.Seek

	return nil
}
// switchNextFile makes the first entry of c.dataFiles the current read file
// and opens it read-only.
//
// The whole call runs with c.rwlock held. When there are no data files it
// does nothing and returns nil. Unless c.noPos is set, the position record
// is pointed at offset 0 of the newly opened file and written out with
// c.pos.dumpFile().
func (c *DiskCache) switchNextFile() error {
	c.rwlock.Lock()
	defer c.rwlock.Unlock()

	// Nothing queued for reading.
	if len(c.dataFiles) == 0 {
		return nil
	}

	c.curReadfile = c.dataFiles[0]

	readFd, openErr := os.OpenFile(c.curReadfile, os.O_RDONLY, c.filePerms)
	if openErr != nil {
		return fmt.Errorf("under switchNextFile, OpenFile: %w, datafile: %+#v, ", openErr, c.dataFiles)
	}

	c.rfd = readFd

	// Position tracking disabled: no .pos bookkeeping required.
	if c.noPos {
		return nil
	}

	// Record that reading starts at the beginning of the new file.
	c.pos.Name = []byte(c.curReadfile)
	c.pos.Seek = 0

	if dumpErr := c.pos.dumpFile(); dumpErr != nil {
		return dumpErr
	}

	return nil
}
// openWriteFile opens c.curWriteFile for appending, creating it if needed,
// and stores the descriptor in c.wfd.
//
// c.curBatchSize is set to the size reported by os.Stat, or to 0 when the
// stat call fails. A path that turns out to be a directory is rejected with
// an error. On success c.wfdLastWrite is set to the current time.
func (c *DiskCache) openWriteFile() error {
	info, statErr := os.Stat(c.curWriteFile)

	switch {
	case statErr != nil:
		// file not exists
		c.curBatchSize = 0
	case info.IsDir():
		return fmt.Errorf("data file should not be dir")
	default:
		// file exists: continue counting from its current size
		c.curBatchSize = info.Size()
	}

	// write append fd, always write to the same-name file
	const writeFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE

	appendFd, openErr := os.OpenFile(c.curWriteFile, writeFlags, c.filePerms)
	if openErr != nil {
		return fmt.Errorf("under openWriteFile, OpenFile: %w", openErr)
	}

	c.wfd = appendFd
	c.wfdLastWrite = time.Now()

	return nil
}