-
Notifications
You must be signed in to change notification settings - Fork 0
/
ConcurrentPersistentURLMap_common.go
361 lines (326 loc) · 13.4 KB
/
ConcurrentPersistentURLMap_common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
// API:
// 1. PutEntry(long_url, expiry_date) -> (short_url, err)
// 2. GetEntry(short_url) -> (long_url, err)
// 3. CreateConcurrentExpiringPersistentURLMapFromDisk(expiration_check)
package util
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"log"
"os"
"path/filepath"
"strings"
)
// MapItemType describes a stored entry: whether it is temporary (i.e. has an
// expiry) and what kind of value it holds (URL or paste; see MapItemValueType).
type MapItemType struct {
	IsTemporary bool // if it's not temporary then it's permanent
	ValueType   MapItemValueType
}
// MapItem is a single record stored in the map: a value (a long URL, or for
// paste entries the on-disk path of the paste file), an expiry time, and a
// type descriptor.
type MapItem interface {
	// MapItemToString renders the item for log/error messages.
	MapItemToString() string
	GetValue() string
	// GetExpiryTime returns the expiry as an int64 timestamp
	// (presumably unix seconds — TODO confirm against the implementations).
	GetExpiryTime() int64
	GetType() MapItemType
}
// ConcurrentMap is the in-memory index from short URL ID to MapItem.
// Startup replay uses a three-phase construction protocol:
// BeginConstruction (allocate a new map, pre-sized by a capacity hint),
// one ContinueConstruction call per record replayed from the log files,
// then FinishConstruction (e.g. heap initialization for the expiring variant).
type ConcurrentMap interface {
	Get_Entry(string) (MapItem, error)
	// BeginConstruction returns a fresh map pre-sized to the given capacity hint.
	BeginConstruction(int64, ExpiryCallback) ConcurrentMap
	// ContinueConstruction inserts one replayed record: key, value, timestamp, value type.
	ContinueConstruction(string, string, int64, MapItemValueType)
	FinishConstruction()
	NumItems() int
	NumPastes() int
}
// URLMap is the write-side interface used by PutEntry_Common: it inserts a
// brand-new entry and reports the current item count.
type URLMap interface {
	// Put_New_Entry inserts (short_url, long_url, timestamp, value_type);
	// it returns an error if the key already exists.
	Put_New_Entry(string, string, int64, MapItemValueType) error
	NumItems() int
}
// LogStorage persists one new entry (key, value, value type, timestamp) to
// the append-only on-disk log.
type LogStorage interface {
	AppendNewEntry(string, string, MapItemValueType, int64) error
}
// GetEntryCommon resolves short_url via the in-memory concurrent map and
// returns the stored item. Lookups are served entirely from memory; disk is
// never touched on the read path.
func GetEntryCommon(cm ConcurrentMap, short_url string) (MapItem, error) {
	item, lookupErr := cm.Get_Entry(short_url)
	if lookupErr != nil {
		// Normalize to a literal nil MapItem on failure.
		return nil, lookupErr
	}
	return item, nil
}
// PasteStorage writes paste contents to bucket storage and returns the path
// where the file was stored; that path becomes the map value for paste entries.
type PasteStorage interface {
	InsertFile([]byte, int64) string
}
// PutEntry_Common shortens long_url into a short URL of requested_length and
// stores the entry both in the in-memory map and on disk (append-only log).
//
// For lengths up to generate_strings_up_to the ID is popped from a
// pre-generated RandomBag (O(1), and ID exhaustion is detectable); longer IDs
// are generated randomly and retried on collision (up to 100 times). For
// paste entries the paste data arrives in long_url; it is first written to
// paste_storage and the resulting file path replaces long_url as the stored
// value. After a successful map insert, the rounded map size and the log file
// are updated.
//
// Returns the generated short URL, or an error if requested_length < 2 or no
// unused IDs of the requested length remain. Any other failure is treated as
// a programming error and aborts the process.
func PutEntry_Common(requested_length int, long_url string, value_type MapItemValueType, timestamp int64, generate_strings_up_to int,
	slice_storage map[int]*RandomBag64, urlmap URLMap, b53m *Base53IDManager, log_storage LogStorage, paste_storage PasteStorage, map_size_persister *MapSizeFileManager) (string, error) {
	if requested_length < 2 { //nolint:gomnd // 2 is not magic here. BASE53 can only go down to 2 characters because it uses one character for the checksum
		return "", errors.New("Requested length is too small.")
	}
	// If the length is small enough, grab a pre-generated ID from one of the slices.
	var result_str string
	if requested_length <= generate_strings_up_to { //nolint:nestif // yeah it's complicated
		randombag, ok := slice_storage[requested_length]
		if !ok {
			log.Fatal("Failed to index slice_storage. This should never happen.")
			panic("Failed to index slice_storage. This should never happen.")
		}
		item, err := randombag.PopRandom()
		if err != nil {
			// Exhaustion is a common, expected scenario.
			// We haven't modified anything at this point, so it's fine to return error here.
			return "", errors.New("No short URLs left")
		}
		// At this point, the item has been removed from the slice, so add it to the map.
		result_str = Convert_uint64_to_str(item, requested_length)
		// If it's a paste, first add it to bucket storage before adding it into the map.
		// This is a little bit hacky because we're using long_url as paste_data and then using the directory path as the long URL...
		if value_type == TYPE_MAP_ITEM_PASTE {
			long_url = paste_storage.InsertFile([]byte(long_url), timestamp)
		}
		err = urlmap.Put_New_Entry(result_str, long_url, timestamp, value_type)
		if err != nil { // Only possible error is if entry already exists, which it should never do since we got it from the slice.
			log.Fatal("Put_New_Entry failed. This should never happen. Error:", err)
			panic("Put_New_Entry failed. This should never happen. Error:" + err.Error())
		}
		// Successfully put it into the map. Now write it to disk too.
		goto added_item_to_map
	} else { // Otherwise randomly generate an ID and retry if it already exists.
		// Try 100 times, trying again when it fails due to already existing in the map;
		// the probability of failing 100 times in a row should be astronomically small.
		// If it's a paste, first add it to bucket storage before adding it into the map.
		// This is a little bit hacky because we're using long_url as paste_data and then using the directory path as the long URL...
		if value_type == TYPE_MAP_ITEM_PASTE {
			long_url = paste_storage.InsertFile([]byte(long_url), timestamp)
		}
		for i := 0; i < 100; i++ {
			id, err := b53m.B53_generate_random_Base53ID(requested_length)
			if err != nil {
				log.Fatal("Failed to generate new random ID. This should never happen. Error:", err)
				panic(err)
			}
			result_str = id.GetCombinedString()
			err = urlmap.Put_New_Entry(result_str, long_url, timestamp, value_type)
			if err == nil {
				// Successfully put it into the map. Now write it to disk too.
				goto added_item_to_map
			}
			if i > 3 { //nolint:gomnd // 3 is a good number.
				log.Println("Unexpected event: got duplicate ID", i, "times in a row. ID is:", result_str)
			}
		}
		log.Fatal("Failed to generate new random string 100 times, this should never happen")
		panic("Failed to generate new random string 100 times, this should never happen")
	}
	// Successfully put it into the map. Now write it to disk too.
added_item_to_map:
	// Update the persisted (rounded) size file if necessary.
	map_size_persister.UpdateMapSizeRounded(int64(urlmap.NumItems()))
	// It's okay if this is slow since it's just a write. Most operations are going to be reads.
	err := log_storage.AppendNewEntry(result_str, long_url, value_type, timestamp)
	if err != nil {
		// It should never fail.
		log.Fatal("AppendNewEntry failed:", err)
		panic(err)
	}
	return result_str, nil
}
// NonExistentKeyError is implemented by the "key not found" error types
// returned by map lookups (e.g. CPMNonExistentKeyError / CEMNonExistentKeyError).
type NonExistentKeyError interface {
	NonExistentKeyError() string
}
// LSRFD_Params bundles the inputs to LoadStoredRecordsFromDisk.
type LSRFD_Params struct {
	B53m                        *Base53IDManager
	Log_directory_path_absolute string // directory containing the append-only log files
	Size_file_path_absolute     string // file persisting the rounded map size
	// Entry_should_be_deleted_fn reports whether an entry with the given
	// timestamp should be skipped as expired; nil means nothing expires
	// (the permanent version).
	Entry_should_be_deleted_fn func(int64) bool
	Lss                        LogStructuredStorage
	Expiry_callback            ExpiryCallback
	// Slice_storage is filled with RandomBags of still-unused IDs, keyed by ID length.
	Slice_storage map[int]*RandomBag64
	// Nil_ptr selects the concrete map implementation; BeginConstruction is
	// called on it to create the real map.
	Nil_ptr                    ConcurrentMap
	Size_file_rounded_multiple int64
	Generate_strings_up_to     int // pre-generate all IDs of length 2..this
}
// LoadStoredRecordsFromDisk rebuilds the URL map by replaying every log file
// in the configured log directory, skipping (and, for pastes, deleting)
// expired entries, then pre-generates the RandomBags of unused short IDs.
// This is the one you want to use in production.
//
// It returns the populated map together with the MapSizeFileManager that
// persists the rounded map size. Any corruption or unexpected condition is
// fatal: the process logs and exits.
func LoadStoredRecordsFromDisk(params *LSRFD_Params) (ConcurrentMap, *MapSizeFileManager) { //nolint:gocognit,ireturn // yeah, it is complicated...
	// First, list all the files in the directory.
	entries, err := os.ReadDir(params.Log_directory_path_absolute)
	if err != nil {
		log.Fatal("Failed to open bucket directory:", params.Log_directory_path_absolute, "error:", err)
		panic(err)
	}
	// Validate every filename and collect the absolute paths to be loaded.
	files_to_be_loaded_from := make([]string, 0, len(entries))
	for _, entry := range entries {
		if entry.IsDir() { // ignore directories
			continue
		}
		err = params.Lss.ValidateLogFilename(entry.Name())
		if err != nil {
			log.Fatal("Failed to parse name of file in log directory:", entry.Name(), "got error:", err)
			panic(err)
		}
		absolute_file_path := filepath.Join(params.Log_directory_path_absolute, entry.Name())
		files_to_be_loaded_from = append(files_to_be_loaded_from, absolute_file_path)
	}
	map_size_persister := NewMapSizeFileManager(params.Size_file_path_absolute, params.Size_file_rounded_multiple)
	// Pre-size the map using the persisted rounded size. It's okay if it's
	// too small, since the map will grow automatically.
	stored_map_length := map_size_persister.current_rounded_size
	concurrent_map := params.Nil_ptr.BeginConstruction(stored_map_length, params.Expiry_callback)
	for _, absolute_filepath := range files_to_be_loaded_from {
		replay_log_file_into_map(params, concurrent_map, absolute_filepath)
	}
	// Call heap.Init() for ConcurrentExpiringMap.
	concurrent_map.FinishConstruction()
	should_be_added_fn := func(keystr string) bool { // Only add to slice if it's not in the map
		_, err := concurrent_map.Get_Entry(keystr) //nolint:govet // shadow is okay here.
		if err != nil {
			switch err.(type) { //nolint:errorlint // just let it fail
			case CPMNonExistentKeyError, CEMNonExistentKeyError:
				// okay, good: key is absent, so this ID is free for the bag
			default:
				log.Fatal("Unexpected error from Get_Entry", err)
				panic(err)
			}
		}
		return err != nil
	}
	// Pre-generate every unused ID of length 2..Generate_strings_up_to so
	// PutEntry can pop them in O(1).
	for n := 2; n <= params.Generate_strings_up_to; n++ {
		log.Println("Generating all Base 53 IDs of length", n)
		slice, err := params.B53m.B53_generate_all_Base53IDs_int64_optimized(n, should_be_added_fn) //nolint:govet // ignore err shadow
		if err != nil {
			log.Fatal("B53_generate_all_Base53IDs_int64_optimized failed", err)
			panic("B53_generate_all_Base53IDs_int64_optimized failed: " + err.Error())
		}
		params.Slice_storage[n] = CreateRandomBagFromSlice(slice)
	}
	if !IsSameType(concurrent_map, params.Nil_ptr) {
		log.Fatalf("concurrent_map is of type %T while nil_ptr is of type %T", concurrent_map, params.Nil_ptr)
		panic("Not same type.")
	}
	map_size_persister.UpdateMapSizeRounded(int64(concurrent_map.NumItems()))
	return concurrent_map, map_size_persister
}

// replay_log_file_into_map replays a single log file into concurrent_map,
// verifying each record's md5 checksum and skipping (and, for pastes,
// deleting) expired entries.
//
// Record layout (one per line): key \t value \t type \t timestamp \x1e base64(md5) \n
// where the md5 covers everything before the \x1e separator.
func replay_log_file_into_map(params *LSRFD_Params, concurrent_map ConcurrentMap, absolute_filepath string) {
	f, err := os.Open(absolute_filepath)
	if err != nil {
		log.Fatal("Failed to open bucket log file:", absolute_filepath, "err:", err)
		panic(err)
	}
	// BUG FIX: the handle was previously never closed, leaking one fd per log file.
	defer f.Close()
	// Now scan the records from the file.
	br := bufio.NewReader(f)
	for {
		str_without_hash, err := br.ReadBytes('\x1e')
		if len(str_without_hash) > 0 {
			str_without_hash = str_without_hash[:len(str_without_hash)-1] // drop the trailing \x1e record separator
		}
		if errors.Is(err, io.EOF) {
			// If ReadBytes encounters an error before finding a delimiter,
			// it returns the data read before the error and the error itself (often io.EOF).
			// Leftover bytes at EOF mean a truncated final record.
			if len(str_without_hash) != 0 {
				log.Fatal("File ", absolute_filepath, " does not end with newline, indicating some kind of corruption")
				panic("File doesn't end with newline.")
			}
			break
		}
		if err != nil {
			log.Fatal("Unexpected non-EOF error:", err)
			panic(err)
		}
		md5_base64, err := br.ReadBytes('\n')
		if err != nil {
			log.Println("Failed on file:", absolute_filepath)
		}
		Check_err(err)
		md5_base64 = md5_base64[:len(md5_base64)-1] // remove trailing newline
		parts := strings.Split(string(str_without_hash), "\t")
		if len(parts) != 4 { //nolint:gomnd // 4 fields: key, value, type, timestamp
			log.Fatal("Expected 4 parts (key, value, type, timestamp), got", len(parts))
			panic("Got unexpected number of parts")
		}
		key_str := parts[0]
		value_str := parts[1]
		type_str := parts[2]
		timestamp_str := parts[3]
		// Check the URL ID (its last character is the checksum character).
		_, err = params.B53m.NewBase53ID(key_str[:len(key_str)-1], key_str[len(key_str)-1], false)
		if err != nil {
			log.Fatal("Invalid URL ID:", key_str, "Error:", err)
			panic(err)
		}
		// Check type_str.
		var map_item_type MapItemValueType
		switch type_str {
		case "url":
			map_item_type = TYPE_MAP_ITEM_URL
		case "paste":
			map_item_type = TYPE_MAP_ITEM_PASTE
		default:
			log.Fatal("Unrecognized value type")
			panic("Unrecognized value type")
		}
		// Decode the stored md5.
		md5_bytes, err := base64.StdEncoding.DecodeString(string(md5_base64))
		if err != nil {
			log.Fatal("Could not decode base64-encoded md5", err)
			panic(err)
		}
		// Recompute the md5 and check it against the stored value.
		recomputed_md5 := md5.Sum(str_without_hash) //nolint:gosec // md5 is fine here.
		if !bytes.Equal(recomputed_md5[:], md5_bytes) {
			log.Fatalf("md5 does not match. Stored: %s Recomputed: %s", hex.EncodeToString(md5_bytes), hex.EncodeToString(recomputed_md5[:]))
			panic("md5 does not match.")
		}
		// Convert timestamp_str to a unix timestamp and validate it.
		timestamp_unix, err := String_to_int64(timestamp_str)
		if err != nil {
			log.Fatal("Could not convert timestamp_str to int64", err)
			panic(err)
		}
		err = Validate_Timestamp_Common(timestamp_unix)
		if err != nil {
			log.Fatal(err)
			panic(err)
		}
		if params.Entry_should_be_deleted_fn != nil {
			// This function being non-nil means we're in the temporary version,
			// so skip expired entries and delete the paste file if there is one.
			ignore_entry := params.Entry_should_be_deleted_fn(timestamp_unix)
			if ignore_entry {
				if map_item_type == TYPE_MAP_ITEM_PASTE {
					// Ignore errors since it might already be deleted.
					_ = os.Remove(value_str)
				}
				continue
			}
		}
		// The entry is not expired; seeing the same key twice across the log
		// files would mean the logs are inconsistent.
		val, err := concurrent_map.Get_Entry(key_str) // if map already contains item, err will be nil
		if err == nil {
			log.Fatal("Multiple non-expired entries found in log files for same key string: ", val.MapItemToString(), " key_str: ", key_str)
			panic("Multiple non-expired entries found in log files for same URL ID")
		}
		// Insert it into the map (and push it into the heap for ConcurrentExpiringMap).
		concurrent_map.ContinueConstruction(key_str, value_str, timestamp_unix, map_item_type)
	}
}
// GenericConcurrentPersistentMap is the public API shared by the expiring
// and the permanent persistent URL map implementations.
type GenericConcurrentPersistentMap interface {
	// GetEntry looks up a short URL and returns the stored item.
	GetEntry(short_url string) (MapItem, error)
	// PutEntry shortens long_url into a new ID of requested_length, persists
	// the entry, and returns the generated short URL.
	PutEntry(requested_length int, long_url string, expiry_time int64, value_type MapItemValueType) (string, error)
	NumItems() int
	NumPastes() int
}
// type_asserts makes the compiler verify that both concrete map types
// implement GenericConcurrentPersistentMap. It is never called at runtime.
func type_asserts() {
	var _ GenericConcurrentPersistentMap = (*ConcurrentExpiringPersistentURLMap)(nil)
	var _ GenericConcurrentPersistentMap = (*ConcurrentPersistentPermanentURLMap)(nil)
}