/
log_processor.go
181 lines (156 loc) · 5.73 KB
/
log_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package account
import (
"github.com/LemoFoundationLtd/lemochain-core/chain/types"
"github.com/LemoFoundationLtd/lemochain-core/common"
"github.com/LemoFoundationLtd/lemochain-core/common/log"
"sort"
)
// revision ties a snapshot identifier to a position in the change log queue.
type revision struct {
	// id is the identifier handed back to the caller when the snapshot is taken.
	id int
	// journalIndex is the length of the change log queue at the moment the
	// snapshot was taken; reverting truncates the queue back to this length.
	journalIndex int
}
// RawAccountLoader is the source of the raw *Account objects that a
// LogProcessor reads when it undoes or redoes change logs.
type RawAccountLoader interface {
	// getRawAccount returns the *Account associated with address.
	getRawAccount(address common.Address) *Account
}
// LogProcessor keeps the queue of change logs produced while the transactions
// of a block are executed, plus the snapshots taken over that queue. It reads
// raw account data through its loader so change logs can be undone or redone.
type LogProcessor struct {
	// accountLoader supplies the raw accounts that change logs are applied to.
	accountLoader RawAccountLoader

	// changeLogs holds, in order, the change logs generated by the transactions
	// of one block. Keeping them all in a single queue makes it possible to
	// revert to any revision.
	changeLogs []*types.ChangeLog

	// revisions is the stack of snapshots that can still be reverted to.
	revisions []revision

	// nextRevisionId is the identifier the next snapshot will receive.
	nextRevisionId int
}
// NewLogProcessor builds a LogProcessor that starts with an empty change log
// queue and reads raw accounts through accountLoader.
//
// It panics when accountLoader is nil.
func NewLogProcessor(accountLoader RawAccountLoader) *LogProcessor {
	if accountLoader == nil {
		panic("account.NewLogProcessor is called without an RawAccountLoader")
	}

	processor := new(LogProcessor)
	processor.accountLoader = accountLoader
	processor.changeLogs = []*types.ChangeLog{}
	return processor
}
// GetAccount fetches the raw account for addr from the account loader and
// exposes it as a types.AccountAccessor.
//
// The returned interface wraps whatever *Account the loader hands back, so it
// compares non-nil even if that pointer itself is nil.
func (h *LogProcessor) GetAccount(addr common.Address) types.AccountAccessor {
	var accessor types.AccountAccessor = h.accountLoader.getRawAccount(addr)
	return accessor
}
// PushChangeLog appends log to the tail of the change log queue.
func (h *LogProcessor) PushChangeLog(log *types.ChangeLog) {
	queue := append(h.changeLogs, log)
	h.changeLogs = queue
}
// GetChangeLogs returns the change logs queued so far. The result is the
// processor's own slice, not a copy.
func (h *LogProcessor) GetChangeLogs() []*types.ChangeLog {
	queued := h.changeLogs
	return queued
}
// GetLogsByAddress collects, in queue order, every queued change log whose
// Address equals address. The result is never nil; it is empty when nothing
// matches.
func (h *LogProcessor) GetLogsByAddress(address common.Address) []*types.ChangeLog {
	matched := []*types.ChangeLog{}
	for i := range h.changeLogs {
		entry := h.changeLogs[i]
		if entry.Address != address {
			continue
		}
		matched = append(matched, entry)
	}
	return matched
}
// GetNextVersion generates a new version for a change log
// func (h *LogProcessor) GetNextVersion(logType types.ChangeLogType, addr common.Address) uint32 {
// // read current version in account
// maxVersion := h.GetAccount(addr).GetBaseVersion(logType)
// // find newest change log in LogProcessor
// for _, changeLog := range h.changeLogs {
// if changeLog.LogType == logType && changeLog.Address == addr {
// maxVersion = changeLog.Version
// }
// }
// return maxVersion + 1
// }
// Clear discards every queued change log and every snapshot, and restarts
// revision identifiers from zero.
func (h *LogProcessor) Clear() {
	h.revisions = []revision{}
	h.changeLogs = []*types.ChangeLog{}
	h.nextRevisionId = 0
}
// Snapshot records the current length of the change log queue as a new
// revision and returns that revision's identifier.
func (h *LogProcessor) Snapshot() int {
	rev := revision{
		id:           h.nextRevisionId,
		journalIndex: len(h.changeLogs),
	}
	h.revisions = append(h.revisions, rev)
	h.nextRevisionId++
	return rev.id
}
// checkRevisionAvailable reports whether the most recent revision still points
// inside the change log queue. It is true when there are no revisions at all.
func (h *LogProcessor) checkRevisionAvailable() bool {
	count := len(h.revisions)
	return count == 0 || h.revisions[count-1].journalIndex <= len(h.changeLogs)
}
// RevertToSnapshot undoes, newest first, every change log queued after the
// snapshot identified by revid, then truncates the queue to the snapshot and
// drops that snapshot together with all later ones.
//
// It panics with ErrRevisionNotExist when revid is not a live snapshot, with
// types.ErrWrongChangeLogVersion when the versions of the logs being undone
// are inconsistent, and with the error returned by Undo when an undo fails.
func (h *LogProcessor) RevertToSnapshot(revid int) {
	// Find the snapshot in the stack of valid snapshots. Revision ids are
	// handed out in increasing order, so a binary search is sufficient.
	idx := sort.Search(len(h.revisions), func(i int) bool {
		return h.revisions[i].id >= revid
	})
	if idx == len(h.revisions) || h.revisions[idx].id != revid {
		log.Errorf("revision id %v cannot be reverted", revid)
		panic(ErrRevisionNotExist)
	}
	snapshot := h.revisions[idx].journalIndex

	// lastVersions remembers, per address and log type, the version of the
	// change log that was undone most recently.
	lastVersions := make(map[common.Address]map[types.ChangeLogType]uint32)

	// Walk the queue backwards to undo changes.
	for i := len(h.changeLogs) - 1; i >= snapshot; i-- {
		changeLog := h.changeLogs[i]
		account := h.GetAccount(changeLog.Address)
		// The version stored in the account is only updated at the end of block
		// processing, so every pending change log must carry a larger version.
		accountVersion := account.(*Account).GetVersion(changeLog.LogType)

		byType, ok := lastVersions[changeLog.Address]
		if !ok {
			byType = make(map[types.ChangeLogType]uint32)
			lastVersions[changeLog.Address] = byType
		}

		last, seen := byType[changeLog.LogType]
		switch {
		case seen && last-1 != changeLog.Version:
			// Logs of one type must be undone in strictly descending,
			// gap-free version order.
			log.Errorf("expected undo version %d, got %d", last-1, changeLog.Version)
			panic(types.ErrWrongChangeLogVersion)
		case accountVersion >= changeLog.Version:
			// Reported separately so the message does not print a wrapped
			// "last-1" when no earlier version has been recorded.
			log.Errorf("undo version %d is not greater than account version %d", changeLog.Version, accountVersion)
			panic(types.ErrWrongChangeLogVersion)
		}
		byType[changeLog.LogType] = changeLog.Version

		if err := changeLog.Undo(h); err != nil {
			panic(err)
		}
	}
	h.changeLogs = h.changeLogs[:snapshot]

	// Remove invalidated snapshots from the stack.
	h.revisions = h.revisions[:idx]
}
// Rebuild sorts logs, redoes them in that order and returns the account loaded
// for address.
//
// For every (address, log type) pair the first log must carry the version that
// follows the account's current version for that type, and each later log must
// follow the previously redone one without gaps. The last redone version is
// stored after every step, so a run of consecutive logs of the same type is
// accepted, and different log types keep independent counters.
//
// It panics with types.ErrAlreadyRedo when a log's version is lower than
// expected, with types.ErrWrongChangeLogVersion when it is higher, and with the
// error returned by Redo unless that error is types.ErrAlreadyRedo. The
// returned error is always nil.
//
// NOTE(review): the starting version is always read from the account loaded
// for address, even if a log carries a different Address — confirm callers
// only pass logs belonging to address.
func (h *LogProcessor) Rebuild(address common.Address, logs types.ChangeLogSlice) (*Account, error) {
	sort.Sort(logs)
	account := h.GetAccount(address).(*Account)

	// lastVersions remembers, per address and log type, the version of the
	// change log that was redone most recently.
	lastVersions := make(map[common.Address]map[types.ChangeLogType]uint32)

	// Replay the change logs to redo changes.
	for _, changeLog := range logs {
		accountVersion := account.GetVersion(changeLog.LogType)

		byType, ok := lastVersions[changeLog.Address]
		if !ok {
			byType = make(map[types.ChangeLogType]uint32)
			lastVersions[changeLog.Address] = byType
		}
		last, seen := byType[changeLog.LogType]
		if !seen {
			// Nothing redone yet for this type: start from the account's version.
			last = accountVersion
		}

		// Check change log's version.
		expected := last + 1
		switch {
		case changeLog.Version < expected:
			log.Errorf("expected redo version %d, got %d", expected, changeLog.Version)
			panic(types.ErrAlreadyRedo)
		case changeLog.Version > expected:
			log.Errorf("expected redo version %d, got %d", expected, changeLog.Version)
			panic(types.ErrWrongChangeLogVersion)
		}
		// Persist the progress so the next log of this type is checked against it.
		byType[changeLog.LogType] = changeLog.Version

		if err := changeLog.Redo(h); err != nil && err != types.ErrAlreadyRedo {
			panic(err)
		}
	}
	return account, nil
}
// MergeChangeLogs replaces the queue with the result of passing it through the
// package-level MergeChangeLogs, which merges the change logs that belong to
// the same account within the block.
func (h *LogProcessor) MergeChangeLogs() {
	merged := MergeChangeLogs(h.changeLogs)
	h.changeLogs = merged
}