/
fileupdate.go
196 lines (178 loc) · 5.32 KB
/
fileupdate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright 2019 DxChain, All rights reserved.
// Use of this source code is governed by an Apache
// License 2.0 that can be found in the LICENSE file.
package storage
import (
"fmt"
"os"
"path/filepath"
"github.com/DxChainNetwork/godx/common"
"github.com/DxChainNetwork/godx/common/writeaheadlog"
"github.com/DxChainNetwork/godx/rlp"
)
const (
// OpInsertFile is the operation name for an InsertUpdate
OpInsertFile = "insert_file"
// OpDeleteFile is the operation name for an DeleteUpdate
OpDeleteFile = "delete_file"
)
// FileUpdate defines the update interface for dxfile and dxdir update.
// It is the intermediate layer between dxfile/dxdir persist and wal
// Currently FileUpdate is implemented by InsertUpdate and DeleteUpdate
type FileUpdate interface {
Apply() error // Apply the content of the update
EncodeToWalOp() (writeaheadlog.Operation, error) // convert an dxfileUpdate to writeaheadlog.Operation
}
type (
// InsertUpdate defines an update of insert Data into FileName at Offset
InsertUpdate struct {
FileName string
Offset uint64
Data []byte
}
// DeleteUpdate defines an update of delete the FileName
DeleteUpdate struct {
FileName string
}
)
// Apply execute the InsertUpdate, writing data to the location
func (iu *InsertUpdate) Apply() (err error) {
// Open the file
err = os.MkdirAll(filepath.Dir(iu.FileName), 0700)
if err != nil {
return fmt.Errorf("failed to make directory: %v", err)
}
f, err := os.OpenFile(iu.FileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to apply InsertUpdate: %v", err)
}
defer func() {
err = common.ErrCompose(err, f.Close())
}()
// Write the data
if n, err := f.WriteAt(iu.Data, int64(iu.Offset)); err != nil {
return fmt.Errorf("failed to write insertUpdate Data: %v", err)
} else if n < len(iu.Data) {
return fmt.Errorf("failed to write full Data of an insertUpdate: %v", err)
}
// Flush the data
return f.Sync()
}
// EncodeToWalOp encode the InsertUpdate to wal.Operation, named by OpInsertFile
func (iu *InsertUpdate) EncodeToWalOp() (writeaheadlog.Operation, error) {
data, err := rlp.EncodeToBytes(*iu)
if err != nil {
return writeaheadlog.Operation{}, err
}
return writeaheadlog.Operation{
Name: OpInsertFile,
Data: data,
}, err
}
// Apply of DeleteUpdate delete du.FileName
func (du *DeleteUpdate) Apply() (err error) {
err = os.Remove(du.FileName)
if os.IsNotExist(err) {
return nil
}
return
}
// EncodeToWalOp encode the DeleteUpdate to Operation, named by OpDeleteFile
func (du *DeleteUpdate) EncodeToWalOp() (writeaheadlog.Operation, error) {
data, err := rlp.EncodeToBytes(*du)
if err != nil {
return writeaheadlog.Operation{}, nil
}
return writeaheadlog.Operation{
Name: OpDeleteFile,
Data: data,
}, nil
}
// OpToUpdate decodeFromWalOp will decode the wal.Operation to a specified type of dxfileUpdate based on the op.Name field
func OpToUpdate(op writeaheadlog.Operation) (FileUpdate, error) {
switch op.Name {
case OpInsertFile:
return decodeOpToInsertUpdate(op)
case OpDeleteFile:
return decodeOpToDeleteUpdate(op)
default:
return nil, fmt.Errorf("invalid op.Name: %v", op.Name)
}
}
// decodeOpToDeleteUpdate decode the wal.Operation to a deleteUpdate
func decodeOpToDeleteUpdate(op writeaheadlog.Operation) (*DeleteUpdate, error) {
var du DeleteUpdate
err := rlp.DecodeBytes(op.Data, &du)
if err != nil {
return nil, err
}
return &du, nil
}
// decodeOpToInsertUpdate decode the op to an insertUpdate
func decodeOpToInsertUpdate(op writeaheadlog.Operation) (*InsertUpdate, error) {
var iu InsertUpdate
err := rlp.DecodeBytes(op.Data, &iu)
if err != nil {
return nil, err
}
return &iu, nil
}
// updatesToOps encode the updates to Operations
func updatesToOps(updates []FileUpdate) ([]writeaheadlog.Operation, error) {
var fullErr error
ops := make([]writeaheadlog.Operation, len(updates))
for i, update := range updates {
op, err := update.EncodeToWalOp()
if err != nil {
fullErr = common.ErrCompose(fullErr, err)
continue
}
ops[i] = op
}
return ops, fullErr
}
// ApplyOperations apply the operations
func ApplyOperations(ops []writeaheadlog.Operation) error {
var fullErr error
for i, op := range ops {
up, err := OpToUpdate(op)
if err != nil {
fullErr = common.ErrCompose(fullErr, fmt.Errorf("cannot decode op to update at index %d: %v", i, err))
continue
}
err = up.Apply()
if err != nil {
fullErr = common.ErrCompose(fullErr, fmt.Errorf("cannot apply operation %d: %v", i, err))
continue
}
}
return fullErr
}
// ApplyUpdates apply the updates on the wal
func ApplyUpdates(wal *writeaheadlog.Wal, updates []FileUpdate) error {
// Decode the updates to Operations
ops, err := updatesToOps(updates)
if err != nil {
return fmt.Errorf("failed to encode updates: %v", err)
}
// Create the writeaheadlog transaction
txn, err := wal.NewTransaction(ops)
if err != nil {
return fmt.Errorf("failed to create transaction: %v", err)
}
<-txn.InitComplete
if txn.InitErr != nil {
return fmt.Errorf("failed to create transaction: %v", txn.InitErr)
}
if err = <-txn.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %v", err)
}
err = ApplyOperations(ops)
if err != nil {
return err
}
if err = txn.Release(); err != nil {
return fmt.Errorf("failed to release transaction: %v", err)
}
return nil
}