-
Notifications
You must be signed in to change notification settings - Fork 4
/
transaction_log.go
219 lines (182 loc) · 7.45 KB
/
transaction_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package cassandra
import (
"context"
"fmt"
"time"
"github.com/gocql/gocql"
"github.com/SharedCode/sop"
"github.com/SharedCode/sop/in_red_ck/redis"
)
// DateHourLayout is the time layout (in Go reference-time notation) used by this
// package to format and parse a date truncated to the hour, e.g. "2006-01-02T15".
const DateHourLayout = "2006-01-02T15"
// NilUUID is sop.NilUUID converted to the gocql.UUID type.
var NilUUID = gocql.UUID(sop.NilUUID)
// TransactionLog is the contract for writing, reading and removing transaction logs.
//
// The design works optimally because the entire set of logs of a transaction is read
// and then the entire partition is deleted when done. Use a consistency of LOCAL_ONE
// when writing logs.
type TransactionLog interface {
// Add adds a transaction log entry (commit function and its payload) for the given transaction ID.
Add(ctx context.Context, tid gocql.UUID, commitFunction int, payload []byte) error
// Remove removes all logs of a given transaction.
Remove(ctx context.Context, tid gocql.UUID) error
// GetOne will fetch the oldest transaction logs from the backend, older than 1 hour ago, and mark it so
// that a succeeding call will return the next hour and so on, until there are no more, upon reaching the current hour.
//
// GetOne behaves like a job distributor by the hour. SOP uses it to sprinkle/distribute the task of cleaning up
// resources left over by unfinished transactions. Be it due to a crash or a host reboot, any transaction
// temp resource will then age and reach its expiration limit, then get cleaned up. This method is used to do that distribution.
//
// It is capped to an hour ago or older because anything newer may still be an in-flight or ongoing transaction.
GetOne(ctx context.Context) (gocql.UUID, string, []sop.KeyValuePair[int, []byte], error)
// GetLogsDetails, given a date hour, returns a set of transaction logs available for cleanup together with
// their Transaction ID, or nils if there are no more logs needing cleanup for this date hour.
GetLogsDetails(ctx context.Context, hour string) (gocql.UUID, []sop.KeyValuePair[int, []byte], error)
}
// transactionLog is the TransactionLog implementation returned by NewTransactionLog.
type transactionLog struct {
// Should coordinate via Redis cache. Each date hour should get locked for "work" by GetOne
// to increase the chances of distributing the cleanup load across machines.
redisCache redis.Cache
// hourLockKey is the Redis lock key that GetOne locks and that GetOne/GetLogsDetails unlock.
hourLockKey *redis.LockKeys
}
// IsNil reports whether id is the nil (empty) UUID. The check is delegated to
// sop.UUID.IsNil after converting id to the sop.UUID type.
func IsNil(id gocql.UUID) bool {
	asSopID := sop.UUID(id)
	return asSopID.IsNil()
}
// Now is the clock function used by this package instead of calling time.Now directly,
// which allows unit tests to inject a replayable time source.
var Now = time.Now
// NewTransactionLog instantiates a new TransactionLog backed by a new Redis client
// and the first lock key created for the name "HBP".
func NewTransactionLog() TransactionLog {
	tl := &transactionLog{}
	tl.redisCache = redis.NewClient()
	tl.hourLockKey = redis.CreateLockKeys("HBP")[0]
	return tl
}
// GetOne fetches an expired Transaction ID (TID), the capped hour string computed by
// getOne, and the transaction logs stored for that TID.
//
// The hour lock key is locked in Redis for 7 hours first. If the lock can't be attained,
// nils are returned without an error. If there is nothing to clean up, or a query fails,
// the lock is released before returning. On success the lock is left in place.
func (tl *transactionLog) GetOne(ctx context.Context) (gocql.UUID, string, []sop.KeyValuePair[int, []byte], error) {
	const lockDuration = 7 * time.Hour
	if lockErr := redis.Lock(ctx, lockDuration, tl.hourLockKey); lockErr != nil {
		// Failing to attain the lock is not reported as an error; there is just no work handed out.
		return NilUUID, "", nil, nil
	}

	hour, tid, err := tl.getOne(ctx)
	switch {
	case err != nil:
		redis.Unlock(ctx, tl.hourLockKey)
		return NilUUID, hour, nil, err
	case IsNil(tid):
		// Nothing to process, unlock the hour.
		redis.Unlock(ctx, tl.hourLockKey)
		return NilUUID, "", nil, nil
	}

	logs, err := tl.getLogsDetails(ctx, tid)
	if err != nil {
		redis.Unlock(ctx, tl.hourLockKey)
		return NilUUID, "", nil, err
	}

	// Check one more time that the lock is held, to remove a race condition issue.
	if redis.IsLocked(ctx, tl.hourLockKey) != nil {
		// Just return nils as we can't attain a lock.
		return NilUUID, "", nil, nil
	}
	return tid, hour, logs, nil
}
// GetLogsDetails returns, for the given date hour (formatted with DateHourLayout), one
// Transaction ID whose ID is lower than the time UUID of that hour, together with its logs.
//
// It returns nils when hour is empty, when the hour is more than 4 hours older than the
// current hour, or when no such Transaction ID is found. In the latter two cases the hour
// lock key is also unlocked. An error is returned when the Cassandra connection is not
// open, when hour can't be parsed, or when a query fails.
func (tl *transactionLog) GetLogsDetails(ctx context.Context, hour string) (gocql.UUID, []sop.KeyValuePair[int, []byte], error) {
	if hour == "" {
		return NilUUID, nil, nil
	}
	if connection == nil {
		return NilUUID, nil, fmt.Errorf("Cassandra connection is closed, 'call OpenConnection(config) to open it")
	}
	hourStart, err := time.Parse(DateHourLayout, hour)
	if err != nil {
		return NilUUID, nil, err
	}

	// Put a max time of four hours for a given cleanup processor.
	// The parse error is ignored because the input was just produced with the same layout.
	currentHour, _ := time.Parse(DateHourLayout, Now().Format(DateHourLayout))
	if age := currentHour.Sub(hourStart); age.Hours() > 4 {
		// Unlock the hour to allow open opportunity to claim the next cleanup processing.
		// Capping to 4th hour(Redis cache is set to 7hrs) maintains only one cleaner process at a time.
		redis.Unlock(ctx, tl.hourLockKey)
		return NilUUID, nil, nil
	}

	query := fmt.Sprintf("SELECT id FROM %s.t_log WHERE id < ? LIMIT 1 ALLOW FILTERING;", connection.Config.Keyspace)
	iter := connection.Session.Query(query, gocql.UUIDFromTime(hourStart)).
		WithContext(ctx).
		Consistency(gocql.LocalOne).
		Iter()

	// Drain the iterator; tid keeps the last scanned ID (the query is limited to one row).
	var tid gocql.UUID
	for iter.Scan(&tid) {
	}
	if closeErr := iter.Close(); closeErr != nil {
		return NilUUID, nil, closeErr
	}

	if IsNil(tid) {
		// Nothing found for this hour, unlock the hour.
		redis.Unlock(ctx, tl.hourLockKey)
		return NilUUID, nil, nil
	}

	logs, err := tl.getLogsDetails(ctx, tid)
	return tid, logs, err
}
// getOne queries t_log for one Transaction ID that is lower than the time UUID of the
// capped hour, and returns the capped hour (formatted with DateHourLayout) together with
// that ID. The returned ID is the nil UUID when no row matched.
//
// An error is returned when the Cassandra connection is not open or the query fails.
//
// NOTE(review): Format drops the time zone and time.Parse assumes UTC when the layout has
// no zone, so if Now() is not in UTC the capped hour appears shifted by the zone offset.
// Confirm this is intended; GetLogsDetails computes the current hour the same way.
func (tl *transactionLog) getOne(ctx context.Context) (string, gocql.UUID, error) {
	// Truncate "now" to the hour. The parse error is ignored because the input was just
	// produced with the same layout.
	currentHour, _ := time.Parse(DateHourLayout, Now().Format(DateHourLayout))

	// 70 minute capped hour as transaction has a max of 60min "commit time". 10 min
	// gap ensures no issue due to overlapping.
	cutoff := currentHour.Add(-70 * time.Minute)
	cutoffTID := gocql.UUIDFromTime(cutoff)

	if connection == nil {
		return "", NilUUID, fmt.Errorf("Cassandra connection is closed, 'call OpenConnection(config) to open it")
	}

	query := fmt.Sprintf("SELECT id FROM %s.t_log WHERE id < ? LIMIT 1 ALLOW FILTERING;", connection.Config.Keyspace)
	iter := connection.Session.Query(query, cutoffTID).
		WithContext(ctx).
		Consistency(gocql.LocalOne).
		Iter()

	// Drain the iterator; found keeps the last scanned ID (the query is limited to one row).
	var found gocql.UUID
	for iter.Scan(&found) {
	}
	if closeErr := iter.Close(); closeErr != nil {
		return "", NilUUID, closeErr
	}
	return cutoff.Format(DateHourLayout), found, nil
}
// getLogsDetails reads every t_log row stored under the given Transaction ID and returns
// them as (commit function, payload) pairs, in the order the iterator yields them.
//
// An error is returned when the Cassandra connection is not open, or when closing the
// iterator reports one; in the latter case the rows read so far are returned with it.
func (tl *transactionLog) getLogsDetails(ctx context.Context, tid gocql.UUID) ([]sop.KeyValuePair[int, []byte], error) {
	if connection == nil {
		return nil, fmt.Errorf("Cassandra connection is closed, 'call OpenConnection(config) to open it")
	}

	selectStatement := fmt.Sprintf("SELECT c_f, c_f_p FROM %s.t_log WHERE id = ?;", connection.Config.Keyspace)
	iter := connection.Session.Query(selectStatement, tid).
		WithContext(ctx).
		Consistency(gocql.LocalOne).
		Iter()

	logs := make([]sop.KeyValuePair[int, []byte], 0, iter.NumRows())
	for {
		// Use fresh scan destinations for every row. Scanning repeatedly into the same
		// []byte variable can let the driver reuse its backing array, in which case a
		// later row would overwrite the payload bytes already stored in logs.
		var (
			commitFunction int
			payload        []byte
		)
		if !iter.Scan(&commitFunction, &payload) {
			break
		}
		logs = append(logs, sop.KeyValuePair[int, []byte]{
			Key:   commitFunction,
			Value: payload,
		})
	}

	if err := iter.Close(); err != nil {
		return logs, err
	}
	return logs, nil
}
// Add inserts one transaction log (t_log) record for the given transaction ID (tid),
// storing the commit function and its payload. The write uses LOCAL_ONE consistency.
//
// An error is returned when the Cassandra connection is not open or the insert fails.
func (tl *transactionLog) Add(ctx context.Context, tid gocql.UUID, commitFunction int, payload []byte) error {
	if connection == nil {
		return fmt.Errorf("Cassandra connection is closed, 'call OpenConnection(config) to open it")
	}

	insertStatement := fmt.Sprintf("INSERT INTO %s.t_log (id, c_f, c_f_p) VALUES(?,?,?);", connection.Config.Keyspace)
	return connection.Session.Query(insertStatement, tid, commitFunction, payload).
		WithContext(ctx).
		Consistency(gocql.LocalOne).
		Exec()
}
// Remove deletes the transaction log (t_log) records of the given transaction ID (tid).
// The delete uses LOCAL_ONE consistency.
//
// An error is returned when the Cassandra connection is not open or the delete fails.
func (tl *transactionLog) Remove(ctx context.Context, tid gocql.UUID) error {
	if connection == nil {
		return fmt.Errorf("Cassandra connection is closed, 'call OpenConnection(config) to open it")
	}

	deleteStatement := fmt.Sprintf("DELETE FROM %s.t_log WHERE id = ?;", connection.Config.Keyspace)
	return connection.Session.Query(deleteStatement, tid).
		WithContext(ctx).
		Consistency(gocql.LocalOne).
		Exec()
}