-
Notifications
You must be signed in to change notification settings - Fork 0
/
sstable.go
159 lines (142 loc) · 4.29 KB
/
sstable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package reconEngine
import (
"io/ioutil"
"log"
"sort"
"strconv"
"strings"
"time"
)
// SsTableStorage is the base SSTable interface. Provide your own
// implementation to replace the default one returned by NewSsTable.
type SsTableStorage interface {
	// Key/value access.

	// Get returns the value stored under key.
	Get(key string) ([]byte, error)
	// Set stores value under key.
	Set(key string, value []byte) error
	// Del deletes key.
	Del(key string) error

	// Partition lifecycle.

	// CreatePartition opens a brand-new partition and returns it.
	CreatePartition() SsTablePartitionStorage
	// OpenPartition opens the partition identified by its creation timestamp.
	OpenPartition(createdAt int64) SsTablePartitionStorage
	// ClosePartition closes a single opened partition.
	ClosePartition(partition SsTablePartitionStorage) error
	// CloseAll closes every opened partition.
	CloseAll() error

	// Inspection and maintenance.

	// Range calls cb for each partition until cb returns false.
	Range(cb func(createdAt int64, partitionStorage SsTablePartitionStorage) bool)
	// Len reports how many partitions the table knows about.
	Len() int
	// MergeSort presumably merges/compacts partitions; its implementation is
	// defined elsewhere (TODO confirm exact semantics).
	MergeSort() error
}
// ssTablePartitions is a list of partitions that implements sort.Interface,
// ordering the partitions by descending Key().
type ssTablePartitions []SsTablePartitionStorage

// Len returns the number of partitions in the list.
func (parts ssTablePartitions) Len() int {
	return len(parts)
}

// Swap exchanges the partitions at positions i and j.
func (parts ssTablePartitions) Swap(i, j int) {
	first, second := parts[i], parts[j]
	parts[i], parts[j] = second, first
}

// Less reports whether partition i must sort before partition j, i.e.
// whether its key is the larger of the two.
func (parts ssTablePartitions) Less(i, j int) bool {
	left, right := parts[i].Key(), parts[j].Key()
	return left > right
}
// ssTablePartitionKeys is a list of partition keys that implements
// sort.Interface, ordering the keys from largest to smallest.
type ssTablePartitionKeys []int64

// Len returns the number of keys in the list.
func (keys ssTablePartitionKeys) Len() int {
	return len(keys)
}

// Swap exchanges the keys at positions i and j.
func (keys ssTablePartitionKeys) Swap(i, j int) {
	first, second := keys[i], keys[j]
	keys[i], keys[j] = second, first
}

// Less reports whether key i must sort before key j, i.e. whether it is the
// larger of the two.
func (keys ssTablePartitionKeys) Less(i, j int) bool {
	left, right := keys[i], keys[j]
	return left > right
}
// ssTable is the default SSTable implementation. It tracks partitions in two
// lists: the ones currently opened and the ones known only by their key.
type ssTable struct {
	// openedPartitions holds the partitions that are currently open.
	openedPartitions ssTablePartitions
	// availablePartitions holds the keys (creation timestamps) of partitions
	// that are known but not open.
	availablePartitions ssTablePartitionKeys
	// dir points at the directory path handed to NewSsTable; it is passed on
	// to every partition that gets opened.
	dir *string
}
// Len returns the total number of partitions the table knows about: the
// opened ones plus the ones that are only listed as available.
func (st *ssTable) Len() int {
	opened := len(st.openedPartitions)
	available := len(st.availablePartitions)
	return available + opened
}
// Range opens every partition that is still only listed as available and then
// calls cb for each opened partition, in the order openedPartitions is kept,
// until cb returns false.
//
// cb receives the partition's Key() as createdAt together with the partition.
//
// The receiver is a pointer so that the partitions opened here are recorded on
// the table itself. With a value receiver they were attached to a throwaway
// copy: every call opened all available partitions again and the opened
// objects could never be reached by CloseAll.
func (st *ssTable) Range(cb func(createdAt int64, partitionStorage SsTablePartitionStorage) bool) {
	// Detach the pending keys before opening them, so each partition is opened
	// exactly once no matter how often Range is called.
	pending := st.availablePartitions
	st.availablePartitions = nil
	for _, createdAt := range pending {
		st.OpenPartition(createdAt)
	}

	// Walk a snapshot: cb is allowed to close partitions, which edits
	// openedPartitions while we are iterating.
	snapshot := make(ssTablePartitions, len(st.openedPartitions))
	copy(snapshot, st.openedPartitions)
	for _, p := range snapshot {
		if !cb(p.Key(), p) {
			break
		}
	}
}
// CreatePartition opens a new partition keyed by the current time in
// nanoseconds since the Unix epoch and returns it.
func (st *ssTable) CreatePartition() SsTablePartitionStorage {
	createdAt := time.Now().UnixNano()
	return st.OpenPartition(createdAt)
}
// ClosePartition closes the opened partition whose key equals
// partition.Key(), removes it from openedPartitions and records its key in
// availablePartitions (kept sorted).
//
// It returns the error from Close, in which case the partition stays in the
// opened list. If no opened partition has that key, nothing happens and nil
// is returned.
func (st *ssTable) ClosePartition(partition SsTablePartitionStorage) error {
	key := partition.Key()
	for i, p := range st.openedPartitions {
		if p.Key() != key {
			continue
		}
		if err := p.Close(); err != nil {
			return err
		}

		// Rebuild the list in a fresh slice instead of shifting elements in
		// place: callers that are still ranging over the previous slice (for
		// example CloseAll) would otherwise see entries move under them and
		// skip partitions, and the old tail would keep a stale reference.
		remaining := make(ssTablePartitions, 0, len(st.openedPartitions)-1)
		remaining = append(remaining, st.openedPartitions[:i]...)
		remaining = append(remaining, st.openedPartitions[i+1:]...)
		st.openedPartitions = remaining

		st.availablePartitions = append(st.availablePartitions, key)
		sort.Sort(st.availablePartitions)
		return nil
	}
	return nil
}
// OpenPartition opens the partition created at createdAt in the table's
// directory, adds it to openedPartitions (kept sorted) and returns it.
//
// If createdAt is listed in availablePartitions it is removed from that list,
// so a partition is tracked as either available or opened, never both;
// otherwise Len would count it twice and Range would open it a second time.
func (st *ssTable) OpenPartition(createdAt int64) SsTablePartitionStorage {
	partition := NewSStablePartition(createdAt, st.dir)
	st.openedPartitions = append(st.openedPartitions, partition)
	sort.Sort(st.openedPartitions)

	for i, key := range st.availablePartitions {
		if key != createdAt {
			continue
		}
		// Copy into a fresh slice rather than shifting in place, so a caller
		// that is ranging over availablePartitions while opening them keeps
		// seeing the original sequence.
		remaining := make(ssTablePartitionKeys, 0, len(st.availablePartitions)-1)
		remaining = append(remaining, st.availablePartitions[:i]...)
		remaining = append(remaining, st.availablePartitions[i+1:]...)
		st.availablePartitions = remaining
		break
	}
	return partition
}
// Get looks key up partition by partition, in Range order, and moves on to
// the next partition only while the current one reports KeyNotFoundErr.
//
// It returns the value and error produced by the last partition consulted.
// If the table has no partitions at all, one is created first.
func (st *ssTable) Get(key string) (val []byte, err error) {
	if st.Len() == 0 {
		st.CreatePartition()
	}

	lookup := func(_ int64, p SsTablePartitionStorage) bool {
		val, err = p.Get(key)
		// Keep scanning only while the key has not been found.
		return err == KeyNotFoundErr
	}
	st.Range(lookup)

	return val, err
}
// Set stores val under key in the first partition Range yields and returns
// that partition's error. If the table has no partitions at all, one is
// created first.
func (st *ssTable) Set(key string, val []byte) (err error) {
	if st.Len() == 0 {
		st.CreatePartition()
	}

	writeFirst := func(_ int64, p SsTablePartitionStorage) bool {
		err = p.Set(key, val)
		// Only the first partition is written to.
		return false
	}
	st.Range(writeFirst)

	return err
}
// Del deletes key from the first partition Range yields and returns that
// partition's error. If the table has no partitions at all, one is created
// first.
func (st *ssTable) Del(key string) (err error) {
	if st.Len() == 0 {
		st.CreatePartition()
	}

	deleteFirst := func(_ int64, p SsTablePartitionStorage) bool {
		err = p.Del(key)
		// Only the first partition is touched.
		return false
	}
	st.Range(deleteFirst)

	return err
}
// CloseAll closes every opened partition through ClosePartition and returns
// the first error encountered; partitions after the failing one are left
// opened.
func (st *ssTable) CloseAll() error {
	// ClosePartition removes entries from openedPartitions. Ranging over the
	// live slice while it shrinks skips partitions, so iterate over a private
	// snapshot instead.
	snapshot := make(ssTablePartitions, len(st.openedPartitions))
	copy(snapshot, st.openedPartitions)

	for _, p := range snapshot {
		if err := st.ClosePartition(p); err != nil {
			return err
		}
	}
	return nil
}
// NewSsTable builds the default SsTableStorage for the directory *dir.
//
// It scans the directory for files named "<timestamp>-partition.bin" and
// records each timestamp as an available partition, sorted with the largest
// timestamp first. Partitions are not opened here. Entries that do not end in
// "-partition.bin" are ignored.
//
// It panics if the directory cannot be read or if a partition file name does
// not start with a base-10 int64 timestamp.
func NewSsTable(dir *string) SsTableStorage {
	const partitionSuffix = "-partition.bin"

	table := &ssTable{
		dir: dir,
	}

	fileInfos, err := ioutil.ReadDir(*dir)
	if err != nil {
		log.Panic(err)
	}

	for _, fileInfo := range fileInfos {
		name := fileInfo.Name()
		// Match on the exact suffix: a substring match would also pick up
		// unrelated names such as "1-partition.bin.bak" and panic on them.
		if !strings.HasSuffix(name, partitionSuffix) {
			continue
		}
		timestamp, err := strconv.ParseInt(strings.TrimSuffix(name, partitionSuffix), 10, 64)
		if err != nil {
			log.Panic(err)
		}
		table.availablePartitions = append(table.availablePartitions, timestamp)
	}

	// Sort once after collecting everything instead of after every append.
	sort.Sort(table.availablePartitions)

	return table
}