/
hashtable.go
239 lines (223 loc) · 6.62 KB
/
hashtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// Hash table file contains binary content.
//
// This package implements a static hash table made of hash buckets and integer
// entries.
//
// Every bucket has a fixed number of entries. When a bucket becomes full, a new
// bucket is chained to it in order to store more entries. Every entry has an
// integer key and value. An entry key may have multiple values assigned to it,
// however the combination of entry key and value must be unique across the
// entire hash table.
package data
import (
"encoding/binary"
"sync"
"github.com/HouzuoGuo/tiedot/tdlog"
)
// Hash table file is a binary file containing buckets of hash entries.
type HashTable struct {
	*Config                  // table geometry and hashing (BucketSize, PerBucket, InitialBuckets, HashKey, ...)
	*DataFile                // the underlying file buffer (Buf, Size, Used) the buckets live in
	numBuckets int           // total bucket count, including chained overflow buckets
	Lock       *sync.RWMutex // guards table access; methods here do not take it themselves
}
// Open a hash table file.
func (conf *Config) OpenHashTable(path string) (ht *HashTable, err error) {
	ht = &HashTable{Config: conf, Lock: new(sync.RWMutex)}
	ht.DataFile, err = OpenDataFile(path, ht.HTFileGrowth)
	if err != nil {
		return ht, err
	}
	// Derive the computed configuration values, then size up the table.
	conf.CalculateConfigConstants()
	ht.calculateNumBuckets()
	return ht, nil
}
// Follow the longest bucket chain to calculate total number of buckets, hence the "used size" of hash table file.
func (ht *HashTable) calculateNumBuckets() {
	// Upper bound: the file can physically hold at most Size/BucketSize buckets.
	ht.numBuckets = ht.Size / ht.BucketSize
	// Walk the chain of every initial (head) bucket; the highest bucket number
	// reachable marks the end of the region actually in use.
	largestBucketNum := ht.InitialBuckets - 1
	for i := 0; i < ht.InitialBuckets; i++ {
		lastBucket := ht.lastBucket(i)
		// Ignore chain pointers that would point past the physical file size.
		if lastBucket > largestBucketNum && lastBucket < ht.numBuckets {
			largestBucketNum = lastBucket
		}
	}
	ht.numBuckets = largestBucketNum + 1
	usedSize := ht.numBuckets * ht.BucketSize
	// If the chains claim more space than the file currently has, grow the file
	// first; Used is temporarily set to Size so EnsureSize grows by the deficit.
	if usedSize > ht.Size {
		ht.Used = ht.Size
		ht.EnsureSize(usedSize - ht.Used)
	}
	ht.Used = usedSize
	tdlog.Infof("%s: calculated used size is %d", ht.Path, usedSize)
}
// Return number of the next chained bucket.
func (ht *HashTable) nextBucket(bucket int) int {
	if bucket >= ht.numBuckets {
		return 0
	}
	// The first 10 bytes of a bucket hold the varint number of its successor.
	headerAddr := bucket * ht.BucketSize
	decoded, consumed := binary.Varint(ht.Buf[headerAddr : headerAddr+10])
	next := int(decoded)
	switch {
	case next == 0:
		// No successor - end of chain.
		return 0
	case consumed < 0 || next <= bucket || next >= ht.numBuckets || next < ht.InitialBuckets:
		// Decode failure or an impossible chain pointer: the table is corrupt.
		tdlog.CritNoRepeat("Bad hash table - repair ASAP %s", ht.Path)
		return 0
	default:
		return next
	}
}
// Return number of the last bucket in chain.
func (ht *HashTable) lastBucket(bucket int) int {
	curr := bucket
	// Keep following successors until a bucket reports no successor (0).
	for next := ht.nextBucket(curr); next != 0; next = ht.nextBucket(curr) {
		curr = next
	}
	return curr
}
// Create and chain a new bucket.
func (ht *HashTable) growBucket(bucket int) {
	// Make sure the file has room for one more bucket before writing into it.
	ht.EnsureSize(ht.BucketSize)
	lastBucketAddr := ht.lastBucket(bucket) * ht.BucketSize
	// New buckets are appended, so the new bucket's number is the current count;
	// write it into the last bucket's 10-byte chain-pointer header.
	binary.PutVarint(ht.Buf[lastBucketAddr:lastBucketAddr+10], int64(ht.numBuckets))
	ht.Used += ht.BucketSize
	ht.numBuckets++
}
// Clear the entire hash table.
func (ht *HashTable) Clear() (err error) {
	err = ht.DataFile.Clear()
	if err == nil {
		// Wiping the file resets the bucket layout; recompute it.
		ht.calculateNumBuckets()
	}
	return err
}
// Store the entry into a vacant (invalidated or empty) place in the appropriate bucket.
func (ht *HashTable) Put(key, val int) {
	for bucket, entry := ht.HashKey(key), 0; ; {
		// Entry layout: 1 validity byte, 10-byte varint key, 10-byte varint value.
		entryAddr := bucket*ht.BucketSize + BucketHeader + entry*EntrySize
		if ht.Buf[entryAddr] != 1 {
			// Vacant slot (never used, or invalidated by Remove) - claim it.
			ht.Buf[entryAddr] = 1
			binary.PutVarint(ht.Buf[entryAddr+1:entryAddr+11], int64(key))
			binary.PutVarint(ht.Buf[entryAddr+11:entryAddr+21], int64(val))
			return
		}
		if entry++; entry == ht.PerBucket {
			// This bucket is full - move on to its chained successor.
			entry = 0
			if bucket = ht.nextBucket(bucket); bucket == 0 {
				// End of chain with no vacancy: grow the chain by one bucket,
				// then retry the insertion from the head bucket.
				ht.growBucket(ht.HashKey(key))
				ht.Put(key, val)
				return
			}
		}
	}
}
// Look up values by key.
func (ht *HashTable) Get(key, limit int) (vals []int) {
	// limit == 0 means "no limit"; pre-allocate a small result in that case.
	if limit == 0 {
		vals = make([]int, 0, 10)
	} else {
		vals = make([]int, 0, limit)
	}
	for count, entry, bucket := 0, 0, ht.HashKey(key); ; {
		entryAddr := bucket*ht.BucketSize + BucketHeader + entry*EntrySize
		entryKey, _ := binary.Varint(ht.Buf[entryAddr+1 : entryAddr+11])
		entryVal, _ := binary.Varint(ht.Buf[entryAddr+11 : entryAddr+21])
		if ht.Buf[entryAddr] == 1 {
			// Valid entry - collect the value if the key matches.
			if int(entryKey) == key {
				vals = append(vals, int(entryVal))
				if count++; count == limit {
					return
				}
			}
		} else if entryKey == 0 && entryVal == 0 {
			// An invalid, all-zero slot marks the end of used entries.
			return
		}
		if entry++; entry == ht.PerBucket {
			// Bucket scanned fully - continue in the chained successor, if any.
			entry = 0
			if bucket = ht.nextBucket(bucket); bucket == 0 {
				return
			}
		}
	}
}
// Flag an entry as invalid, so that Get will not return it later on.
func (ht *HashTable) Remove(key, val int) {
	bucket, slot := ht.HashKey(key), 0
	for {
		addr := bucket*ht.BucketSize + BucketHeader + slot*EntrySize
		k, _ := binary.Varint(ht.Buf[addr+1 : addr+11])
		v, _ := binary.Varint(ht.Buf[addr+11 : addr+21])
		switch {
		case ht.Buf[addr] == 1 && int(k) == key && int(v) == val:
			// Found the exact key/value pair - clear its validity byte.
			ht.Buf[addr] = 0
			return
		case ht.Buf[addr] != 1 && k == 0 && v == 0:
			// An invalid, all-zero slot marks the end of used entries.
			return
		}
		slot++
		if slot == ht.PerBucket {
			slot = 0
			if bucket = ht.nextBucket(bucket); bucket == 0 {
				return
			}
		}
	}
}
// Divide the entire hash table into roughly equally sized partitions, and return the start/end key range of the chosen partition.
func (conf *Config) GetPartitionRange(partNum, totalParts int) (start int, end int) {
	perPart := conf.InitialBuckets / totalParts
	leftOver := conf.InitialBuckets % totalParts
	start = perPart * partNum
	// The first leftOver partitions each absorb one extra bucket.
	extra := 0
	if leftOver > 0 {
		switch {
		case partNum == 0:
			extra = 1
		case partNum < leftOver:
			start += partNum
			extra = 1
		default:
			start += leftOver
		}
	}
	end = start + perPart + extra
	// The final partition always runs to the end of the initial buckets.
	if partNum == totalParts-1 {
		end = conf.InitialBuckets
	}
	return start, end
}
// Collect entries all the way from "head" bucket to the end of its chained buckets.
func (ht *HashTable) collectEntries(head int) (keys, vals []int) {
	keys = make([]int, 0, ht.PerBucket)
	vals = make([]int, 0, ht.PerBucket)
	bucket, slot := head, 0
	for {
		addr := bucket*ht.BucketSize + BucketHeader + slot*EntrySize
		k, _ := binary.Varint(ht.Buf[addr+1 : addr+11])
		v, _ := binary.Varint(ht.Buf[addr+11 : addr+21])
		switch {
		case ht.Buf[addr] == 1:
			// Valid entry - collect it.
			keys = append(keys, int(k))
			vals = append(vals, int(v))
		case k == 0 && v == 0:
			// An invalid, all-zero slot marks the end of used entries.
			return keys, vals
		}
		slot++
		if slot == ht.PerBucket {
			slot = 0
			if bucket = ht.nextBucket(bucket); bucket == 0 {
				return keys, vals
			}
		}
	}
}
// Return all entries in the chosen partition.
func (ht *HashTable) GetPartition(partNum, partSize int) (keys, vals []int) {
	from, to := ht.GetPartitionRange(partNum, partSize)
	// Pre-size for one full bucket per head in the range.
	capacity := (to - from) * ht.PerBucket
	keys = make([]int, 0, capacity)
	vals = make([]int, 0, capacity)
	for head := from; head < to; head++ {
		bucketKeys, bucketVals := ht.collectEntries(head)
		keys = append(keys, bucketKeys...)
		vals = append(vals, bucketVals...)
	}
	return keys, vals
}