// Source file: state_machine_event.go

package assets
import (
"golang.org/x/xerrors"
)
// mutator is an event that is applied to an asset's pulling state.
// In this file it is implemented by the non-global events (PulledResult,
// PullRequestSent, AssetRePull, PullSucceed, SkipStep, PullFailed and
// SelectFailed).
type mutator interface {
// apply applies the event to state. Implementations receive the state by
// pointer and may modify it in place; several implementations are no-ops.
apply(state *AssetPullingInfo)
}
// globalMutator is an event which can apply in every state.
type globalMutator interface {
// applyGlobal applies the event to the state. If it returns true,
// event processing should be interrupted.
applyGlobal(state *AssetPullingInfo) bool
}
// Ignorable is a marker interface for events that declare an Ignore method.
// NOTE(review): how the state machine treats Ignorable events is not visible
// in this file; presumably such events may be dropped when they arrive in a
// state that does not expect them. Confirm against the consumer.
type Ignorable interface {
// Ignore carries no behavior in the implementations in this file; it only
// tags the event type as Ignorable.
Ignore()
}
// Global events
// PullAssetRestart is a global event that restarts asset pulling.
// It carries no data.
type PullAssetRestart struct{}
// applyGlobal clears the retry counter of state. It reports false, so event
// processing is not interrupted after this event.
func (PullAssetRestart) applyGlobal(state *AssetPullingInfo) bool {
	const interrupt = false

	state.RetryCount = 0
	return interrupt
}
// PullAssetFatalError represents a fatal error in asset pulling.
// The underlying error is embedded, so the event itself satisfies the error
// interface through the promoted Error method.
type PullAssetFatalError struct{ error }
// FormatError implements the xerrors formatting hook. It prints nothing to
// the supplied printer and hands back the embedded error as the next error
// in the chain.
func (evt PullAssetFatalError) FormatError(_ xerrors.Printer) (next error) {
	next = evt.error
	return next
}
// applyGlobal logs the fatal error together with the asset CID taken from
// state. state is not modified. It always reports true, so event processing
// is interrupted.
func (evt PullAssetFatalError) applyGlobal(state *AssetPullingInfo) bool {
	cause := evt.error
	log.Errorf("Fatal error on asset %s: %+v", state.CID, cause)

	return true
}
// AssetForceState is a global event that forces an asset into a given state.
// Its State, Details and SeedNodeID values are copied onto the asset's
// pulling info when the event is applied.
type AssetForceState struct {
	State AssetState
	// Requester string
	Details, SeedNodeID string
}
// applyGlobal overwrites the state, details and seed node ID of state with
// the values carried by the event and clears the retry counter. It always
// reports true, so event processing is interrupted.
func (evt AssetForceState) applyGlobal(state *AssetPullingInfo) bool {
	state.RetryCount = 0

	// state.Requester = evt.Requester
	state.State, state.Details, state.SeedNodeID = evt.State, evt.Details, evt.SeedNodeID

	return true
}
// InfoUpdate is a global event that updates asset info. It carries the Size
// and Blocks values to record on the asset's pulling info.
type InfoUpdate struct {
	Size, Blocks int64
}
// applyGlobal copies the event's Size and Blocks onto state, but only while
// the asset is in the SeedPulling or SeedUploading state; in any other state
// the event leaves state untouched. It always reports true, so event
// processing is interrupted.
func (evt InfoUpdate) applyGlobal(state *AssetPullingInfo) bool {
	inSeedStage := state.State == SeedPulling || state.State == SeedUploading
	if inSeedStage {
		state.Size, state.Blocks = evt.Size, evt.Blocks
	}

	return true
}
// Ignore is an empty marker method; its presence makes InfoUpdate satisfy
// the Ignorable interface.
func (InfoUpdate) Ignore() {}
// PulledResult represents the result of node pulling. It carries the block
// count and size reported for the asset.
type PulledResult struct {
	BlocksCount, Size int64
}
// apply records the pulled size and block count on state. The update only
// happens while the asset is in the SeedPulling or SeedUploading state; in
// every other state the event is a no-op.
func (evt PulledResult) apply(state *AssetPullingInfo) {
	if state.State != SeedPulling && state.State != SeedUploading {
		return
	}

	state.Size = evt.Size
	state.Blocks = evt.BlocksCount
}
// Ignore is an empty marker method; its presence makes PulledResult satisfy
// the Ignorable interface.
func (PulledResult) Ignore() {}
// PullRequestSent indicates that a pull request has been sent.
// It carries no data.
type PullRequestSent struct{}
// apply is a no-op: a sent pull request does not change any field of state.
func (PullRequestSent) apply(state *AssetPullingInfo) {
	// Intentionally empty.
}
// AssetRePull requests that the asset be pulled again.
// It carries no data.
type AssetRePull struct{}
// apply counts one more retry by incrementing the retry counter of state.
func (AssetRePull) apply(state *AssetPullingInfo) {
	state.RetryCount++
}
// Ignore is an empty marker method; its presence makes AssetRePull satisfy
// the Ignorable interface.
func (AssetRePull) Ignore() {}
// PullSucceed indicates that a node has successfully pulled an asset.
// It carries no data.
type PullSucceed struct{}
// apply resets the bookkeeping on state after a successful pull: the retry
// counter is always cleared, and the replenish-replica count is cleared when
// the asset is in the EdgesPulling state.
func (PullSucceed) apply(state *AssetPullingInfo) {
	// Temporary replicas are replenished when nodes go offline; once those
	// replicas have been pulled, the replenish count should be removed.
	if state.State == EdgesPulling {
		state.ReplenishReplicas = 0
	}

	state.RetryCount = 0
}
// Ignore is an empty marker method; its presence makes PullSucceed satisfy
// the Ignorable interface.
func (PullSucceed) Ignore() {}
// SkipStep skips the current step.
// It carries no data.
type SkipStep struct{}
// apply is a no-op: skipping a step does not change any field of state.
func (SkipStep) apply(state *AssetPullingInfo) {
	// Intentionally empty.
}
// PullFailed indicates that a node has failed to pull an asset.
// The underlying error is embedded, so the event itself satisfies the error
// interface through the promoted Error method.
type PullFailed struct{ error }
// FormatError implements the xerrors formatting hook. It prints nothing to
// the supplied printer and hands back the embedded error as the next error
// in the chain.
func (evt PullFailed) FormatError(_ xerrors.Printer) (next error) {
	next = evt.error
	return next
}
// apply sets the retry counter of state to 1.
// NOTE(review): this assigns a fixed value rather than incrementing, unlike
// AssetRePull.apply which uses RetryCount++. Confirm that overwriting the
// counter on every failure is intended.
func (PullFailed) apply(state *AssetPullingInfo) {
	state.RetryCount = 1
}
// Ignore is an empty marker method; its presence makes PullFailed satisfy
// the Ignorable interface.
func (PullFailed) Ignore() {}
// SelectFailed indicates that node selection has failed.
// The underlying error is embedded, so the event itself satisfies the error
// interface through the promoted Error method.
type SelectFailed struct{ error }
// FormatError implements the xerrors formatting hook. It prints nothing to
// the supplied printer and hands back the embedded error as the next error
// in the chain.
func (evt SelectFailed) FormatError(_ xerrors.Printer) (next error) {
	next = evt.error
	return next
}
// apply is a no-op: a failed node selection does not change any field of
// state.
func (SelectFailed) apply(state *AssetPullingInfo) {
	// Intentionally empty.
}