-
Notifications
You must be signed in to change notification settings - Fork 2
/
hot_feed.go
1380 lines (1231 loc) · 52.8 KB
/
hot_feed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package routes
import (
"bytes"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"reflect"
"sort"
"time"
"github.com/deso-smart/deso-core/v2/lib"
"github.com/golang/glog"
)
// This file defines a simple goroutine that tracks "hot" posts from the specified look-back period, as well
// as API functionality for retrieving scored posts. The algorithm for assessing a post's
// "hotness" is experimental and will likely be iterated upon depending on its results.
// HotnessFeed scoring algorithm knobs. The Default* values are what
// UpdateHotFeedOrderedList writes to global state (and sets on the server) when
// it finds the corresponding constants unset, i.e. equal to zero.
const (
// Number of blocks per halving for the scoring time decay for the global hot feed.
DefaultHotFeedTimeDecayBlocks uint64 = 72
// Number of blocks per halving for the scoring time decay for a tag hot feed.
DefaultHotFeedTagTimeDecayBlocks uint64 = 96
// Maximum score amount that any individual PKID can contribute before time decay.
DefaultHotFeedInteractionCap uint64 = 4e12
// Maximum score amount that any individual PKID can contribute before time decay for a particular tag grouping.
DefaultHotFeedTagInteractionCap uint64 = 4e12
// How many iterations of the hot feed calculation until the built-up caches should be reset.
// The hot feed routine waits five minutes per iteration, so 288 iterations is roughly once per day.
ResetCachesIterationLimit int = 288
)
// HotFeedEntry is a single element in the server's HotFeedOrderedList (and in
// the per-tag ordered lists): a post hash paired with its hotness score.
type HotFeedEntry struct {
// Hash of the scored post.
PostHash *lib.BlockHash
// Hex encoding of PostHash; also used as the tie-breaker when sorting the global list.
PostHashHex string
// Score accumulated for this post by the hot feed routine.
HotnessScore uint64
}
// HotFeedEntryTimeSortable is a HotFeedEntry that additionally carries the age
// of the post so a list can be sorted by newness. SaveOrderedFeedForTags builds
// these, sorts them, and then strips the age again.
type HotFeedEntryTimeSortable struct {
// Hash of the scored post.
PostHash *lib.BlockHash
// Hex encoding of PostHash.
PostHashHex string
// Score accumulated for this post by the hot feed routine.
HotnessScore uint64
// Age of the post in blocks, copied from HotnessPostInfo.PostBlockAge.
PostBlockAge int
}
// HotFeedInteractionKey is a key to track whether a specific public key has
// interacted with a post before. PopulateHotnessInfoMap uses it as the key of
// its per-run interaction map.
type HotFeedInteractionKey struct {
// PKID of the user performing the interaction.
InteractionPKID lib.PKID
// Hash of the post being interacted with.
InteractionPostHash lib.BlockHash
}
// HotFeedPKIDMultiplier holds multipliers that help a node operator boost
// content from PKIDs relevant to their node.
// For example, a sports-focused node could boost athlete PKIDs.
// Entries created by UpdateHotFeedPKIDMultipliersMap start with both fields at 1.
type HotFeedPKIDMultiplier struct {
// A multiplier applied to the score that each user interaction adds to a post.
InteractionMultiplier float64
// A multiplier applied to all posts from this specific PKID.
PostsMultiplier float64
}
// StartHotFeedRoutine initializes the caches used to serve tag-specific hot
// feeds and launches the background goroutine that keeps the hot feed fresh.
//
// A cached "HotFeedOrderedList" is stored on the server object and recomputed by
// UpdateHotFeed. In addition, a "HotFeedApprovedPostMap" is maintained using hot
// feed approval/removal operations stored in global state. The goroutine waits
// five minutes between the end of one update and the start of the next, asks
// UpdateHotFeed to reset its built-up caches once ResetCachesIterationLimit
// iterations have elapsed, and exits as soon as a receive on fes.quit succeeds.
func (fes *APIServer) StartHotFeedRoutine() {
	glog.Info("Starting hot feed routine.")

	// Initialize maps used for serving tag-specific hot feeds.
	fes.PostTagToPostHashesMap = make(map[string]map[lib.BlockHash]bool)
	fes.PostTagToOrderedHotFeedEntries = make(map[string][]*HotFeedEntry)
	fes.PostTagToOrderedNewestEntries = make(map[string][]*HotFeedEntry)
	fes.PostHashToPostTagsMap = make(map[lib.BlockHash][]string)
	fes.HotFeedBlockCache = make(map[lib.BlockHash]*lib.MsgDeSoBlock)

	go func() {
		const updateInterval = 5 * time.Minute
		cacheResetCounter := 0

		// Reuse a single timer rather than allocating a fresh one with
		// time.After on every iteration, and release it when we exit.
		timer := time.NewTimer(updateInterval)
		defer timer.Stop()

		for {
			select {
			case <-timer.C:
				resetCache := cacheResetCounter >= ResetCachesIterationLimit
				if resetCache {
					cacheResetCounter = 0
				}
				fes.UpdateHotFeed(resetCache)
				cacheResetCounter++

				// The timer has fired and its channel was drained above, so it is
				// safe to re-arm it. Re-arming after the update keeps the wait
				// measured from the end of the update, as before.
				timer.Reset(updateInterval)
			case <-fes.quit:
				return
			}
		}
	}()
}
// UpdateHotFeed runs one full hot feed refresh cycle.
//
// When resetCache is true, the tag and block caches are thrown away first so
// they get rebuilt from scratch. The approved-posts and PKID-multiplier maps are
// then copied, brought up to date from global state, used to recompute the
// ordered hot feed lists, and finally swapped in on the server object.
func (fes *APIServer) UpdateHotFeed(resetCache bool) {
	if resetCache {
		glog.Info("Resetting hot feed cache.")
		fes.PostTagToPostHashesMap = make(map[string]map[lib.BlockHash]bool)
		fes.PostHashToPostTagsMap = make(map[lib.BlockHash][]string)
		fes.HotFeedBlockCache = make(map[lib.BlockHash]*lib.MsgDeSoBlock)
	}

	// Work on private copies of both maps so they can be mutated freely without
	// holding a lock on the live ones.
	approvedPosts := fes.CopyHotFeedApprovedPostsMap()
	pkidMultipliers := fes.CopyHotFeedPKIDMultipliersMap()

	// Fold in any approval / multiplier operations recorded in global state.
	fes.UpdateHotFeedApprovedPostsMap(approvedPosts)
	fes.UpdateHotFeedPKIDMultipliersMap(pkidMultipliers)

	// Recompute the ordered list over the look-back period. A nil result means
	// no recomputation took place, in which case there is nothing to prune against.
	if scoredPosts := fes.UpdateHotFeedOrderedList(approvedPosts, pkidMultipliers); scoredPosts != nil {
		fes.PruneHotFeedApprovedPostsMap(scoredPosts, approvedPosts)
	}

	// Publish the refreshed maps.
	fes.HotFeedApprovedPostsToMultipliers = approvedPosts
	fes.HotFeedPKIDMultipliers = pkidMultipliers
}
// UpdateHotFeedApprovedPostsMap applies every hot feed approval/removal
// operation recorded in global state since the last processed operation (or
// since one day ago on the first run) to hotFeedApprovedPosts.
//
// Removal ops delete the post from the map; other ops with a non-negative
// multiplier set the post's multiplier. fes.HotFeedPostMultiplierUpdated is
// raised when an op changes a known multiplier or sets one other than 1, which
// tells UpdateHotFeedOrderedList to recompute scores.
//
// The cursor fes.LastHotFeedApprovedPostOpProcessedTstampNanos is advanced to
// the newest well-formed key that was seen, even if that op's value was empty
// or could not be decoded. Otherwise a single bad trailing row would cause the
// whole batch to be re-read and re-applied on every cycle.
func (fes *APIServer) UpdateHotFeedApprovedPostsMap(hotFeedApprovedPosts map[lib.BlockHash]float64) {
	// Each key consists of: prefix (1 byte), timestamp (8 bytes), posthash.
	const (
		timestampStartIdx = 1
		postHashStartIdx  = timestampStartIdx + 8
	)

	// Grab all of the relevant operations to update the map with.
	startTimestampNanos := uint64(time.Now().UTC().AddDate(0, 0, -1).UnixNano()) // 1 day ago.
	if fes.LastHotFeedApprovedPostOpProcessedTstampNanos != 0 {
		startTimestampNanos = fes.LastHotFeedApprovedPostOpProcessedTstampNanos
	}
	startPrefix := GlobalStateSeekKeyForHotFeedApprovedPostOps(startTimestampNanos + 1)
	opKeys, opVals, err := fes.GlobalState.Seek(
		startPrefix,
		_GlobalStatePrefixForHotFeedApprovedPostOps, /*validForPrefix*/
		0,     /*maxKeyLen -- ignored since reverse is false*/
		0,     /*numToFetch -- 0 is ignored*/
		false, /*reverse*/
		true,  /*fetchValues*/
	)
	if err != nil {
		// Nothing trustworthy to process; try again on the next cycle.
		glog.Infof("UpdateHotFeedApprovedPostsMap: Seek failed: %v", err)
		return
	}

	var (
		lastOpTstampNanos uint64
		sawOp             bool
	)

	// Chop up the keys and process each operation.
	for opIdx, opKey := range opKeys {
		if len(opKey) < postHashStartIdx {
			// Too short to even hold a timestamp; slicing it would panic.
			glog.Infof("UpdateHotFeedApprovedPostsMap: ERROR malformed op key of length %d", len(opKey))
			continue
		}

		// Remember the timestamp before any skip below so the cursor still advances.
		lastOpTstampNanos = lib.DecodeUint64(opKey[timestampStartIdx:postHashStartIdx])
		sawOp = true

		postHash := &lib.BlockHash{}
		copy(postHash[:], opKey[postHashStartIdx:])

		// If this row doesn't actually have a HotFeedApprovedPostOp, bail.
		hotFeedOpBytes := opVals[opIdx]
		if len(hotFeedOpBytes) == 0 {
			continue
		}

		// Deserialize the HotFeedApprovedPostOp.
		hotFeedOp := HotFeedApprovedPostOp{}
		if err = gob.NewDecoder(bytes.NewReader(hotFeedOpBytes)).Decode(&hotFeedOp); err != nil {
			glog.Infof("UpdateHotFeedApprovedPostsMap: ERROR decoding HotFeedApprovedPostOp: %v", err)
			continue
		}

		if hotFeedOp.IsRemoval {
			delete(hotFeedApprovedPosts, *postHash)
		} else if hotFeedOp.Multiplier >= 0 {
			hotFeedApprovedPosts[*postHash] = hotFeedOp.Multiplier

			// Now we need to figure out if this was a multiplier update. The
			// comparison is against the live map on the server, not the copy
			// being built here.
			prevMultiplier, hasPrevMultiplier := fes.HotFeedApprovedPostsToMultipliers[*postHash]
			if hasPrevMultiplier && prevMultiplier != hotFeedOp.Multiplier {
				fes.HotFeedPostMultiplierUpdated = true
			} else if hotFeedOp.Multiplier != 1 {
				fes.HotFeedPostMultiplierUpdated = true
			}
		}
	}

	// Seek was called with reverse=false, so the last key seen is assumed to be
	// the newest one in the batch.
	if sawOp {
		fes.LastHotFeedApprovedPostOpProcessedTstampNanos = lastOpTstampNanos
	}
}
// UpdateHotFeedPKIDMultipliersMap applies every PKID multiplier operation
// recorded in global state since the last processed operation (or since one day
// ago on the first run) to hotFeedPKIDMultipliers.
//
// For each op, the interaction multiplier and the posts multiplier are applied
// independently; a negative value means "leave this one unchanged". PKIDs not
// yet in the map start from multipliers of 1.
//
// fes.LastHotFeedPKIDMultiplierOpProcessedTstampNanos is advanced to the newest
// well-formed key seen, even if that op's value was empty or undecodable, so a
// bad trailing row cannot cause the batch to be re-read forever.
// fes.HotFeedPKIDMultiplierUpdated is raised whenever at least one op was
// actually applied.
func (fes *APIServer) UpdateHotFeedPKIDMultipliersMap(
	hotFeedPKIDMultipliers map[lib.PKID]*HotFeedPKIDMultiplier,
) {
	// Each key consists of: prefix (1 byte), timestamp (8 bytes), PKID.
	const (
		timestampStartIdx = 1
		pkidStartIdx      = timestampStartIdx + 8
	)

	// Grab all of the relevant operations to update the map with.
	startTimestampNanos := uint64(time.Now().UTC().AddDate(0, 0, -1).UnixNano()) // 1 day ago.
	if fes.LastHotFeedPKIDMultiplierOpProcessedTstampNanos != 0 {
		startTimestampNanos = fes.LastHotFeedPKIDMultiplierOpProcessedTstampNanos
	}
	startPrefix := GlobalStateSeekKeyForHotFeedPKIDMultiplierOps(startTimestampNanos + 1)
	opKeys, opVals, err := fes.GlobalState.Seek(
		startPrefix,
		_GlobalStatePrefixForHotFeedPKIDMultiplierOps, /*validForPrefix*/
		0,     /*maxKeyLen -- ignored since reverse is false*/
		0,     /*numToFetch -- 0 is ignored*/
		false, /*reverse*/
		true,  /*fetchValues*/
	)
	if err != nil {
		// Nothing trustworthy to process; try again on the next cycle.
		glog.Infof("UpdateHotFeedPKIDMultipliersMap: Seek failed: %v", err)
		return
	}

	var (
		lastOpTstampNanos uint64
		sawOp             bool
		appliedOp         bool
	)

	// Chop up the keys and process each operation.
	for opIdx, opKey := range opKeys {
		if len(opKey) < pkidStartIdx {
			// Too short to even hold a timestamp; slicing it would panic.
			glog.Infof("UpdateHotFeedPKIDMultipliersMap: ERROR malformed op key of length %d", len(opKey))
			continue
		}

		// Remember the timestamp before any skip below so the cursor still advances.
		lastOpTstampNanos = lib.DecodeUint64(opKey[timestampStartIdx:pkidStartIdx])
		sawOp = true

		opPKID := &lib.PKID{}
		copy(opPKID[:], opKey[pkidStartIdx:])

		// If this row doesn't actually have a HotFeedPKIDMultiplierOp, bail.
		hotFeedOpBytes := opVals[opIdx]
		if len(hotFeedOpBytes) == 0 {
			continue
		}

		// Deserialize the HotFeedPKIDMultiplierOp.
		hotFeedOp := HotFeedPKIDMultiplierOp{}
		if err = gob.NewDecoder(bytes.NewReader(hotFeedOpBytes)).Decode(&hotFeedOp); err != nil {
			glog.Infof("UpdateHotFeedPKIDMultipliersMap: ERROR decoding HotFeedPKIDMultiplierOp: %v", err)
			continue
		}

		// Get the current multiplier and update it. Note that negatives are ignored.
		hotFeedPKIDMultiplier := hotFeedPKIDMultipliers[*opPKID]
		if hotFeedPKIDMultiplier == nil {
			hotFeedPKIDMultiplier = &HotFeedPKIDMultiplier{
				InteractionMultiplier: 1,
				PostsMultiplier:       1,
			}
		}
		// The two fields are independent: an op may set either one or both.
		if hotFeedOp.InteractionMultiplier >= 0 {
			hotFeedPKIDMultiplier.InteractionMultiplier = hotFeedOp.InteractionMultiplier
		}
		if hotFeedOp.PostsMultiplier >= 0 {
			hotFeedPKIDMultiplier.PostsMultiplier = hotFeedOp.PostsMultiplier
		}
		hotFeedPKIDMultipliers[*opPKID] = hotFeedPKIDMultiplier
		appliedOp = true
	}

	// Seek was called with reverse=false, so the last key seen is assumed to be
	// the newest one in the batch.
	if sawOp {
		fes.LastHotFeedPKIDMultiplierOpProcessedTstampNanos = lastOpTstampNanos
	}
	// Record that the multiplier map has updates.
	if appliedOp {
		fes.HotFeedPKIDMultiplierUpdated = true
	}
}
// CopyHotFeedApprovedPostsMap returns a new map holding the same post hash to
// multiplier pairs as fes.HotFeedApprovedPostsToMultipliers, so the caller can
// modify the result without touching the server's map.
func (fes *APIServer) CopyHotFeedApprovedPostsMap() map[lib.BlockHash]float64 {
	source := fes.HotFeedApprovedPostsToMultipliers
	duplicate := make(map[lib.BlockHash]float64, len(source))
	for postHash := range source {
		duplicate[postHash] = source[postHash]
	}
	return duplicate
}
// CopyHotFeedPKIDMultipliersMap returns a deep copy of
// fes.HotFeedPKIDMultipliers: every HotFeedPKIDMultiplier value is duplicated,
// so changing an entry in the result does not affect the server's entry.
func (fes *APIServer) CopyHotFeedPKIDMultipliersMap() map[lib.PKID]*HotFeedPKIDMultiplier {
	source := fes.HotFeedPKIDMultipliers
	duplicate := make(map[lib.PKID]*HotFeedPKIDMultiplier, len(source))
	for pkid, original := range source {
		// Allocate a fresh struct per entry and copy the pointed-to value into it.
		cloned := new(HotFeedPKIDMultiplier)
		*cloned = *original
		duplicate[pkid] = cloned
	}
	return duplicate
}
// HotnessPostInfo is the per-post record built by PopulateHotnessInfoMap: it is
// created when the post's creation transaction is seen and its score is
// increased by each qualifying interaction found afterwards.
type HotnessPostInfo struct {
// How long ago the post was created in number of blocks
PostBlockAge int
// Running hotness score for the post.
HotnessScore uint64
}
// UpdateHotFeedOrderedList recomputes the global hot feed list and the per-tag
// hot/newest lists from the blocks in the look-back period, and stores them on
// the server object.
//
// It first syncs the algorithm constants with global state, writing defaults
// when they are unset. It then returns nil without recomputing when nothing has
// changed (no new constants or multipliers and no new block) or when the chain
// is not fully current. It also returns nil on any error, which is logged.
// Otherwise it returns the global-feed map from post hash to hotness info.
//
// postsToMultipliers and pkidsToMultipliers are passed through to
// PopulateHotnessInfoMap.
func (fes *APIServer) UpdateHotFeedOrderedList(
	postsToMultipliers map[lib.BlockHash]float64,
	pkidsToMultipliers map[lib.PKID]*HotFeedPKIDMultiplier,
) (_hotFeedPostsMap map[lib.BlockHash]*HotnessPostInfo,
) {
	// Check to see if any of the algorithm constants have changed.
	foundNewConstants := false
	globalStateInteractionCap, globalStateTagInteractionCap, globalStateTimeDecayBlocks, globalStateTagTimeDecayBlocks, globalStateTxnTypeMultiplierMap, err := fes.GetHotFeedConstantsFromGlobalState()
	if err != nil {
		glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to get constants: %v", err)
		return nil
	}

	switch {
	case globalStateInteractionCap == 0 || globalStateTimeDecayBlocks == 0:
		// The hot feed go routine has not been run yet since constants have not been set.
		foundNewConstants = true

		// Set the default constants in GlobalState and then on the server object.
		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedInteractionCap,
			lib.EncodeUint64(DefaultHotFeedInteractionCap),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put InteractionCap: %v", err)
			return nil
		}
		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedTagInteractionCap,
			lib.EncodeUint64(DefaultHotFeedTagInteractionCap),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put InteractionCap for tag feeds: %v", err)
			return nil
		}
		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedTimeDecayBlocks,
			lib.EncodeUint64(DefaultHotFeedTimeDecayBlocks),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put TimeDecayBlocks: %v", err)
			return nil
		}
		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedTagTimeDecayBlocks,
			lib.EncodeUint64(DefaultHotFeedTagTimeDecayBlocks),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put TimeDecayBlocks for tag feeds: %v", err)
			return nil
		}

		// Now that we've successfully updated global state, set them on the server object.
		fes.HotFeedInteractionCap = DefaultHotFeedInteractionCap
		fes.HotFeedTagInteractionCap = DefaultHotFeedTagInteractionCap
		fes.HotFeedTimeDecayBlocks = DefaultHotFeedTimeDecayBlocks
		fes.HotFeedTagTimeDecayBlocks = DefaultHotFeedTagTimeDecayBlocks
		fes.HotFeedTxnTypeMultiplierMap = make(map[lib.TxnType]uint64)

	case globalStateTagInteractionCap == 0 || globalStateTagTimeDecayBlocks == 0:
		// Only the tag-specific feed configuration variables are unset, so set just those.
		foundNewConstants = true

		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedTagInteractionCap,
			lib.EncodeUint64(DefaultHotFeedTagInteractionCap),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put InteractionCap: %v", err)
			return nil
		}
		err = fes.GlobalState.Put(
			_GlobalStatePrefixForHotFeedTagTimeDecayBlocks,
			lib.EncodeUint64(DefaultHotFeedTagTimeDecayBlocks),
		)
		if err != nil {
			glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put TimeDecayBlocks: %v", err)
			return nil
		}

		// Now that we've successfully updated global state, set them on the server object.
		fes.HotFeedTagInteractionCap = DefaultHotFeedTagInteractionCap
		fes.HotFeedTagTimeDecayBlocks = DefaultHotFeedTagTimeDecayBlocks
		fes.HotFeedTxnTypeMultiplierMap = make(map[lib.TxnType]uint64)

	case fes.HotFeedInteractionCap != globalStateInteractionCap ||
		fes.HotFeedTimeDecayBlocks != globalStateTimeDecayBlocks ||
		fes.HotFeedTagInteractionCap != globalStateTagInteractionCap ||
		fes.HotFeedTagTimeDecayBlocks != globalStateTagTimeDecayBlocks ||
		!reflect.DeepEqual(fes.HotFeedTxnTypeMultiplierMap, globalStateTxnTypeMultiplierMap):
		// New constants were found in global state. Set them and proceed.
		fes.HotFeedInteractionCap = globalStateInteractionCap
		fes.HotFeedTimeDecayBlocks = globalStateTimeDecayBlocks
		fes.HotFeedTagInteractionCap = globalStateTagInteractionCap
		fes.HotFeedTagTimeDecayBlocks = globalStateTagTimeDecayBlocks
		fes.HotFeedTxnTypeMultiplierMap = globalStateTxnTypeMultiplierMap
		foundNewConstants = true

	case fes.HotFeedPostMultiplierUpdated || fes.HotFeedPKIDMultiplierUpdated:
		// If a post's multiplier was updated, we need to recompute scores.
		foundNewConstants = true
		fes.HotFeedPostMultiplierUpdated = false
		fes.HotFeedPKIDMultiplierUpdated = false
	}

	// If the constants for the algorithm haven't changed and we have already seen the latest
	// block or the chain is out of sync, bail.
	blockTip := fes.blockchain.BlockTip()
	chainState := fes.blockchain.ChainState()
	if (!foundNewConstants && blockTip.Height <= fes.HotFeedBlockHeight) ||
		chainState != lib.SyncStateFullyCurrent {
		return nil
	}

	// Log how long this routine takes, since it could be heavy.
	glog.Info("UpdateHotFeedOrderedList: Starting new update cycle.")
	start := time.Now()

	// Get a utxoView for lookups.
	utxoView, err := fes.backendServer.GetMempool().GetAugmentedUniversalView()
	if err != nil {
		glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to get utxo view: %v", err)
		return nil
	}

	// This offset allows us to see what the hot feed would look like in the past,
	// which is useful for testing purposes.
	blockOffsetForTesting := 0

	// Grab the last 90 days worth of blocks (25,920 blocks @ 5min/block).
	// Take a single snapshot of the best chain so the length check, the index
	// arithmetic and the slicing below all refer to the same slice.
	// NOTE(review): when the chain is longer than the window, the slice ends at
	// blockTipIndex (exclusive), so the tip block itself is not scored, whereas
	// a chain shorter than the window is used in full. Confirm this is intended.
	lookbackWindowBlocks := 90 * 24 * 60 / 5
	bestChain := fes.blockchain.BestChain()
	blockTipIndex := len(bestChain) - 1 - blockOffsetForTesting
	relevantNodes := bestChain
	if len(bestChain) > (lookbackWindowBlocks + blockOffsetForTesting) {
		relevantNodes = bestChain[blockTipIndex-lookbackWindowBlocks-blockOffsetForTesting : blockTipIndex]
	}

	hotnessInfoBlocks := make([]*hotnessInfoBlock, 0, len(relevantNodes))
	for blockIdx, node := range relevantNodes {
		block, isCached := fes.HotFeedBlockCache[*node.Hash]
		if !isCached || block == nil {
			block, err = lib.GetBlock(node.Hash, utxoView.Handle)
			if err != nil || block == nil {
				// Don't cache or score a block we could not load: a nil block
				// would be dereferenced when its transactions are iterated.
				glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to get block %v: %v", node.Hash, err)
				continue
			}
			fes.HotFeedBlockCache[*node.Hash] = block
		}
		hotnessInfoBlocks = append(hotnessInfoBlocks, &hotnessInfoBlock{
			Block: block,
			// For time decay, we care about how many blocks away from the tip this block is.
			BlockAge: len(relevantNodes) - blockIdx,
		})
	}

	// Iterate over the blocks and track global feed hotness scores for each post.
	hotnessInfoMapGlobalFeed, err := fes.PopulateHotnessInfoMap(utxoView, postsToMultipliers, pkidsToMultipliers, false, hotnessInfoBlocks)
	if err != nil {
		glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put PopulateHotnessInfoMap for global feed: %v", err)
		return nil
	}
	// Iterate over the blocks and track tag feed hotness scores for each post.
	hotnessInfoMapTagFeed, err := fes.PopulateHotnessInfoMap(utxoView, postsToMultipliers, pkidsToMultipliers, true, hotnessInfoBlocks)
	if err != nil {
		glog.Infof("UpdateHotFeedOrderedList: ERROR - Failed to put PopulateHotnessInfoMap for tag feed: %v", err)
		return nil
	}

	// Sort the map into an ordered list and set it as the server's new HotFeedOrderedList.
	hotFeedOrderedList := make([]*HotFeedEntry, 0, len(hotnessInfoMapGlobalFeed))
	for postHashKey, hotnessInfo := range hotnessInfoMapGlobalFeed {
		postHash := postHashKey
		hotFeedOrderedList = append(hotFeedOrderedList, &HotFeedEntry{
			PostHash:     &postHash,
			PostHashHex:  hex.EncodeToString(postHash[:]),
			HotnessScore: hotnessInfo.HotnessScore,
		})
	}
	// Highest score first; equal scores are ordered by hash so the result is deterministic.
	sort.Slice(hotFeedOrderedList, func(ii, jj int) bool {
		if hotFeedOrderedList[ii].HotnessScore != hotFeedOrderedList[jj].HotnessScore {
			return hotFeedOrderedList[ii].HotnessScore > hotFeedOrderedList[jj].HotnessScore
		}
		return hotFeedOrderedList[ii].PostHashHex > hotFeedOrderedList[jj].PostHashHex
	})
	fes.HotFeedOrderedList = hotFeedOrderedList

	// SaveOrderedFeedForTags reads this map, so it must be set before the calls below.
	fes.HotFeedPostHashToTagScoreMap = hotnessInfoMapTagFeed

	// Set the ordered lists for hot feed based on tags.
	fes.PostTagToOrderedHotFeedEntries = fes.SaveOrderedFeedForTags(true, make(map[string][]*HotFeedEntry))

	// Set the ordered lists for newness based on tags.
	fes.PostTagToOrderedNewestEntries = fes.SaveOrderedFeedForTags(false, make(map[string][]*HotFeedEntry))

	// Update the HotFeedBlockHeight so we don't re-evaluate this set of blocks.
	fes.HotFeedBlockHeight = blockTip.Height

	elapsed := time.Since(start)
	glog.Infof("Successfully updated HotFeedOrderedList in %s", elapsed)

	return hotnessInfoMapGlobalFeed
}
// hotnessInfoBlock pairs a block from the look-back period with its age, as
// handed from UpdateHotFeedOrderedList to PopulateHotnessInfoMap.
type hotnessInfoBlock struct {
// The block whose transactions are scanned.
Block *lib.MsgDeSoBlock
// Position of the block counted back from the end of the look-back window
// (the last block in the window has age 1).
BlockAge int
}
// PopulateHotnessInfoMap walks the transactions of hotnessInfoBlocks in order
// and builds a map from post hash to hotness info for every post created within
// those blocks.
//
// Post-creation transactions register the post with a score of zero.
// Post-edit transactions evict the post's cached tags so they are re-parsed.
// Every other relevant transaction is scored via GetHotnessScoreInfoForTxn,
// scaled by the post multiplier (postsToMultipliers) and by the poster's and
// interactor's PKID multipliers (pkidsToMultipliers), and added to the post's
// score unless the post is a comment or vanilla repost. isTagFeed is forwarded
// to GetHotnessScoreInfoForTxn unchanged.
//
// As a side effect, the tags of each scored post are cached in
// fes.PostHashToPostTagsMap and fes.PostTagToPostHashesMap. An error is
// returned only when parsing a post's tags fails.
func (fes *APIServer) PopulateHotnessInfoMap(
	utxoView *lib.UtxoView,
	postsToMultipliers map[lib.BlockHash]float64,
	pkidsToMultipliers map[lib.PKID]*HotFeedPKIDMultiplier,
	isTagFeed bool,
	hotnessInfoBlocks []*hotnessInfoBlock,
) (map[lib.BlockHash]*HotnessPostInfo, error) {
	// Scores are capped at MaxInt64, matching the ceiling the original
	// overflow check was written against.
	const maxHotnessScore = uint64(math.MaxInt64)

	hotnessInfoMap := make(map[lib.BlockHash]*HotnessPostInfo)
	// Map of interaction key to transaction type multiplier applied.
	postInteractionMap := make(map[HotFeedInteractionKey]uint64)

	for _, infoBlock := range hotnessInfoBlocks {
		if infoBlock == nil || infoBlock.Block == nil {
			// A block that could not be loaded has no transactions to score.
			continue
		}
		blockAge := infoBlock.BlockAge

		for _, txn := range infoBlock.Block.Txns {
			// We only care about posts created in the specified look-back period. There should always be a
			// transaction that creates a given post before someone interacts with it. By only
			// scoring posts that meet this condition, we can restrict the HotFeedOrderedList
			// to posts from the specified look-back period without even looking up the post time stamp.
			if isCreatePost, postHashCreated := CheckTxnForCreatePost(txn); isCreatePost {
				hotnessInfoMap[*postHashCreated] = &HotnessPostInfo{
					PostBlockAge: blockAge,
					HotnessScore: 0,
				}
				continue
			}

			// If the post has been edited, remove all tags associated with that post.
			// This ensures that the categorization reflects the most recently edited version.
			if isEditPost, postHashEdited := CheckTxnForEditPost(txn); isEditPost {
				tags := fes.PostHashToPostTagsMap[*postHashEdited]
				delete(fes.PostHashToPostTagsMap, *postHashEdited)
				for _, tag := range tags {
					if postHashes, ok := fes.PostTagToPostHashesMap[tag]; ok {
						delete(postHashes, *postHashEdited)
					}
				}
				continue
			}

			// The age used in determining the score should be that of the post
			// that we are evaluating. The interaction's score will be discounted
			// by this age.
			postHashToScore, posterPKID := GetPostHashToScoreForTxn(txn, utxoView)
			if postHashToScore == nil {
				// If we don't have a post hash to score then this txn is not relevant
				// and we can continue.
				continue
			}
			prevHotnessInfo, inHotnessInfoMap := hotnessInfoMap[*postHashToScore]
			if !inHotnessInfoMap {
				// If the post is not in the hotnessInfoMap yet, it wasn't created
				// in the specified look-back period so we can continue.
				continue
			}

			// If we get here, we know we are dealing with a txn that interacts with a
			// post that was created within the specified look-back period.
			// Evaluate the txn and attempt to update the hotnessInfoMap.
			postHashScored, interactionPKID, txnHotnessScore :=
				fes.GetHotnessScoreInfoForTxn(txn, prevHotnessInfo.PostBlockAge, postInteractionMap, utxoView, isTagFeed)
			if txnHotnessScore == 0 || postHashScored == nil {
				continue
			}

			// Check for a post-specific multiplier.
			if multiplier, hasMultiplier := postsToMultipliers[*postHashScored]; hasMultiplier && multiplier >= 0 {
				txnHotnessScore = uint64(multiplier * float64(txnHotnessScore))
			}

			// Check for PKID-specific multipliers for the poster and the interactor.
			// Either PKID may be missing, in which case no multiplier applies.
			if posterPKID != nil {
				if posterMultiplier, ok := pkidsToMultipliers[*posterPKID]; ok && posterMultiplier != nil {
					txnHotnessScore = uint64(
						posterMultiplier.PostsMultiplier * float64(txnHotnessScore))
				}
			}
			if interactionPKID != nil {
				if interactorMultiplier, ok := pkidsToMultipliers[*interactionPKID]; ok && interactorMultiplier != nil {
					txnHotnessScore = uint64(
						interactorMultiplier.InteractionMultiplier * float64(txnHotnessScore))
				}
			}

			// Check for overflow just in case. The first clause matters: both
			// operands are unsigned, so subtracting a txn score above the cap
			// would wrap around and let the addition below overflow unnoticed.
			if txnHotnessScore > maxHotnessScore ||
				prevHotnessInfo.HotnessScore > maxHotnessScore-txnHotnessScore {
				continue
			}

			// Finally, make sure the post scored isn't a comment or repost.
			postEntryScored := utxoView.GetPostEntryForPostHash(postHashScored)
			if postEntryScored == nil {
				// Without the post entry we can neither classify nor tag the post.
				continue
			}
			if len(postEntryScored.ParentStakeID) > 0 || lib.IsVanillaRepost(postEntryScored) {
				continue
			}

			// Before parsing the text body, first check to see if this post has been processed and cached prior.
			if _, alreadyTagged := fes.PostHashToPostTagsMap[*postHashScored]; !alreadyTagged {
				// Parse tags from post entry.
				tags, err := ParseTagsFromPost(postEntryScored)
				if err != nil {
					return nil, err
				}
				// Cache processed post in map.
				fes.PostHashToPostTagsMap[*postHashScored] = tags
				// Add each tagged post to the tag:postEntries map
				for _, tag := range tags {
					// If a post hash set already exists, add to it,
					// otherwise create a new set and add it to the map.
					postHashSet, hasSet := fes.PostTagToPostHashesMap[tag]
					if !hasSet {
						postHashSet = make(map[lib.BlockHash]bool)
						fes.PostTagToPostHashesMap[tag] = postHashSet
					}
					postHashSet[*postHashScored] = true
				}
			}

			// Update the hotness score.
			prevHotnessInfo.HotnessScore += txnHotnessScore
		}
	}

	return hotnessInfoMap, nil
}
// SaveOrderedFeedForTags ranks posts on a tag-by-tag basis and stores each
// tag's ordered list in PostTagToOrderedEntries, which is also returned.
//
// For every tag in fes.PostTagToPostHashesMap, only posts that have an entry in
// fes.HotFeedPostHashToTagScoreMap are included. If sortByHotness is true the
// posts are sorted by hotness score (highest first), otherwise by block age
// (newest first). Ties are broken by PostHashHex, the same tie-breaker the
// global hot feed list uses, so the order is stable from one run to the next.
//
// A nil PostTagToOrderedEntries is replaced with a fresh map.
func (fes *APIServer) SaveOrderedFeedForTags(sortByHotness bool, PostTagToOrderedEntries map[string][]*HotFeedEntry) map[string][]*HotFeedEntry {
	if PostTagToOrderedEntries == nil {
		// Writing to a nil map would panic.
		PostTagToOrderedEntries = make(map[string][]*HotFeedEntry)
	}

	for tag, tagPostHashes := range fes.PostTagToPostHashesMap {
		entriesWithAge := make([]*HotFeedEntryTimeSortable, 0, len(tagPostHashes))

		// Loop through every tagged post for the tag in question.
		for tagPostHashKey := range tagPostHashes {
			postHotnessInfo, ok := fes.HotFeedPostHashToTagScoreMap[tagPostHashKey]
			if !ok {
				continue
			}
			// Copy the key so each entry points at its own hash.
			tagPostHash := tagPostHashKey
			entriesWithAge = append(entriesWithAge, &HotFeedEntryTimeSortable{
				PostHash:     &tagPostHash,
				PostHashHex:  hex.EncodeToString(tagPostHash[:]),
				HotnessScore: postHotnessInfo.HotnessScore,
				PostBlockAge: postHotnessInfo.PostBlockAge,
			})
		}

		// Sort posts based on specified criteria, either age (asc) or hotness (desc).
		// sort.Slice is not stable and the entries come from map iteration, so an
		// explicit tie-breaker is needed for a deterministic order.
		sort.Slice(entriesWithAge, func(ii, jj int) bool {
			left, right := entriesWithAge[ii], entriesWithAge[jj]
			if sortByHotness {
				if left.HotnessScore != right.HotnessScore {
					return left.HotnessScore > right.HotnessScore
				}
			} else if left.PostBlockAge != right.PostBlockAge {
				return left.PostBlockAge < right.PostBlockAge
			}
			return left.PostHashHex > right.PostHashHex
		})

		// Remove age from entry to save space.
		PostTagToOrderedEntries[tag] = removeAgeFromSortedHotFeedEntries(entriesWithAge)
	}
	return PostTagToOrderedEntries
}
// removeAgeFromSortedHotFeedEntries converts a sorted list of age-carrying
// entries into plain HotFeedEntry values, keeping the order and dropping the
// age field so the stored entries are smaller. The result is always non-nil.
func removeAgeFromSortedHotFeedEntries(sortedHotFeedEntries []*HotFeedEntryTimeSortable) []*HotFeedEntry {
	stripped := make([]*HotFeedEntry, len(sortedHotFeedEntries))
	for idx := range sortedHotFeedEntries {
		withAge := sortedHotFeedEntries[idx]
		stripped[idx] = &HotFeedEntry{
			PostHash:     withAge.PostHash,
			PostHashHex:  withAge.PostHashHex,
			HotnessScore: withAge.HotnessScore,
		}
	}
	return stripped
}
// GetHotFeedParamFromGlobalState reads the value stored under prefix in global
// state and decodes it as a uint64. An empty or missing value yields 0 with a
// nil error; a failed read yields 0 and the error from GlobalState.Get.
func (fes *APIServer) GetHotFeedParamFromGlobalState(prefix []byte) (uint64, error) {
	rawValue, err := fes.GlobalState.Get(prefix)
	switch {
	case err != nil:
		return 0, err
	case len(rawValue) == 0:
		// Nothing stored yet; callers treat zero as "unset".
		return 0, nil
	}
	return lib.DecodeUint64(rawValue), nil
}
// GetHotFeedConstantsFromGlobalState loads the hot feed algorithm constants
// from global state: the global and tag interaction caps, the global and tag
// time-decay block counts, and the per-transaction-type multiplier map.
//
// Constants that have never been stored come back as zero (and the map as
// empty). If any read or the map decoding fails, zero values and a non-nil
// error are returned. The error must not be swallowed here, because the caller
// interprets zero constants with a nil error as "not configured yet" and
// overwrites global state with defaults.
func (fes *APIServer) GetHotFeedConstantsFromGlobalState() (
	_interactionCap uint64, _interactionTagCap uint64, _timeDecayBlocks uint64, _timeDecayTagBlocks uint64, _tnxTypeMultiplierMap map[lib.TxnType]uint64, _err error,
) {
	interactionCap, err := fes.GetHotFeedParamFromGlobalState(_GlobalStatePrefixForHotFeedInteractionCap)
	if err != nil {
		return 0, 0, 0, 0, nil, fmt.Errorf("getting hot feed interaction cap: %w", err)
	}
	interactionCapTag, err := fes.GetHotFeedParamFromGlobalState(_GlobalStatePrefixForHotFeedTagInteractionCap)
	if err != nil {
		return 0, 0, 0, 0, nil, fmt.Errorf("getting hot feed tag interaction cap: %w", err)
	}
	timeDecayBlocks, err := fes.GetHotFeedParamFromGlobalState(_GlobalStatePrefixForHotFeedTimeDecayBlocks)
	if err != nil {
		return 0, 0, 0, 0, nil, fmt.Errorf("getting hot feed time decay blocks: %w", err)
	}
	timeDecayBlocksTag, err := fes.GetHotFeedParamFromGlobalState(_GlobalStatePrefixForHotFeedTagTimeDecayBlocks)
	if err != nil {
		return 0, 0, 0, 0, nil, fmt.Errorf("getting hot feed tag time decay blocks: %w", err)
	}

	txnTypeMultiplierMapBytes, err := fes.GlobalState.Get(_GlobalStatePrefixHotFeedTxnTypeMultiplierBasisPoints)
	if err != nil {
		return 0, 0, 0, 0, nil, fmt.Errorf("getting hot feed txn type multiplier map: %w", err)
	}
	// An absent value decodes to an empty (non-nil) map.
	txnTypeMultiplierMap := make(map[lib.TxnType]uint64)
	if len(txnTypeMultiplierMapBytes) > 0 {
		if err = gob.NewDecoder(bytes.NewReader(txnTypeMultiplierMapBytes)).Decode(&txnTypeMultiplierMap); err != nil {
			return 0, 0, 0, 0, nil, fmt.Errorf("Error decoding txnTypeMultiplierMapBytes to map: %v", err)
		}
	}

	return interactionCap, interactionCapTag, timeDecayBlocks, timeDecayBlocksTag, txnTypeMultiplierMap, nil
}
// CheckTxnForCreatePost reports whether txn is a submit-post transaction that
// creates a brand new post, i.e. one with an empty PostHashToModify. When it
// is, the hash of the new post is returned as well; otherwise the hash is nil.
func CheckTxnForCreatePost(txn *lib.MsgDeSoTxn) (
	_isCreatePostTxn bool, _postHashCreated *lib.BlockHash) {
	if txn.TxnMeta.GetTxnType() != lib.TxnTypeSubmitPost {
		return false, nil
	}
	submitPostMeta := txn.TxnMeta.(*lib.SubmitPostMetadata)
	if len(submitPostMeta.PostHashToModify) != 0 {
		// This submit-post modifies an existing post rather than creating one.
		return false, nil
	}
	// The post hash of a brand new post is the same as its txn hash.
	return true, txn.Hash()
}
// CheckTxnForEditPost reports whether txn is a submit-post transaction that
// modifies an existing post, i.e. one with a non-empty PostHashToModify. When
// it is, the hash of the post being modified is returned; otherwise the hash is
// nil.
func CheckTxnForEditPost(txn *lib.MsgDeSoTxn) (
	_isEditPostTxn bool, _postHashCreated *lib.BlockHash) {
	if txn.TxnMeta.GetTxnType() != lib.TxnTypeSubmitPost {
		return false, nil
	}
	submitPostMeta := txn.TxnMeta.(*lib.SubmitPostMetadata)
	if len(submitPostMeta.PostHashToModify) == 0 {
		// A brand new post has nothing to modify, so this is not an edit.
		return false, nil
	}
	// The edited post is identified by the hash carried in the metadata.
	return true, lib.NewBlockHash(submitPostMeta.PostHashToModify)
}
// GetPostHashToScoreForTxn determines which post, if any, a transaction should
// contribute hotness to, and looks up a poster PKID for it.
//
// Relevant transactions are:
//   - Like: the liked post.
//   - BasicTransfer carrying a diamond post hash in ExtraData: the diamonded post.
//   - SubmitPost with a non-empty PostHashToModify whose post entry is a repost,
//     a quoted repost, or has a ParentStakeID: the reposted post or parent respectively.
//
// For every other transaction, and whenever a required post entry or PKID entry
// cannot be found in utxoView, it returns (nil, nil). Callers must check the
// returned hash for nil before dereferencing it.
func GetPostHashToScoreForTxn(txn *lib.MsgDeSoTxn,
	utxoView *lib.UtxoView) (_postHashScored *lib.BlockHash, _posterPKID *lib.PKID) {
	// Figure out which post this transaction should affect.
	interactionPostHash := &lib.BlockHash{}
	var interactionPostEntry *lib.PostEntry

	switch txn.TxnMeta.GetTxnType() {
	case lib.TxnTypeLike:
		txMeta := txn.TxnMeta.(*lib.LikeMetadata)
		if txMeta.LikedPostHash == nil {
			// A like without a post hash cannot be attributed to any post.
			return nil, nil
		}
		interactionPostHash = txMeta.LikedPostHash

	case lib.TxnTypeBasicTransfer:
		// Check for a post being diamonded.
		diamondPostHashBytes, hasDiamondPostHash := txn.ExtraData[lib.DiamondPostHashKey]
		if !hasDiamondPostHash {
			// If this basic transfer doesn't have a diamond, it is irrelevant.
			return nil, nil
		}
		copy(interactionPostHash[:], diamondPostHashBytes)

	case lib.TxnTypeSubmitPost:
		txMeta := txn.TxnMeta.(*lib.SubmitPostMetadata)
		// If this is a transaction creating a brand new post, we can ignore it.
		if len(txMeta.PostHashToModify) == 0 {
			return nil, nil
		}
		postHash := &lib.BlockHash{}
		copy(postHash[:], txMeta.PostHashToModify)
		interactionPostEntry = utxoView.GetPostEntryForPostHash(postHash)
		if interactionPostEntry == nil {
			// The view does not know the post being modified; there is nothing to score.
			return nil, nil
		}
		// For posts we must process three cases: Reposts, Quoted Reposts, and Comments.
		if lib.IsVanillaRepost(interactionPostEntry) || lib.IsQuotedRepost(interactionPostEntry) {
			copy(interactionPostHash[:], txn.ExtraData[lib.RepostedPostHash])
		} else if len(interactionPostEntry.ParentStakeID) > 0 {
			copy(interactionPostHash[:], interactionPostEntry.ParentStakeID)
		} else {
			return nil, nil
		}

	default:
		// This transaction is not relevant, bail.
		return nil, nil
	}

	// If we haven't gotten the post entry yet, make sure we fetch it.
	// NOTE(review): in the SubmitPost case the entry fetched above is the post being
	// modified, not the post identified by interactionPostHash, so the PKID returned
	// below belongs to the modified post's author. Confirm this is intended.
	if interactionPostEntry == nil {
		interactionPostEntry = utxoView.GetPostEntryForPostHash(interactionPostHash)
	}
	if interactionPostEntry == nil {
		// The interaction references a post that the view cannot find.
		return nil, nil
	}

	// At this point, we have a post hash to return so look up the posterPKID as well.
	posterPKIDEntry := utxoView.GetPKIDForPublicKey(interactionPostEntry.PosterPublicKey)
	if posterPKIDEntry == nil {
		return nil, nil
	}
	return interactionPostHash, posterPKIDEntry.PKID
}
// GetHotnessScoreInfoForTxn returns the post hash that a txn is relevant to, the PKID of
// the user who made the txn, and the amount that the txn should contribute to that
// post's hotness score.
//
// The postInteractionMap is used to ensure that each PKID only gets one interaction per
// post: it records the largest txn type multiplier applied so far for each (PKID, post)
// pair, and a later txn only contributes the difference over that multiplier. The map is
// updated in place.
//
// blockAge is the number of blocks this txn is from the block tip (not a block height).
// isTagFeed selects the tag-feed interaction cap and time decay instead of the main ones.
//
// It returns (nil, nil, 0) when the txn is not relevant to any post, when the
// transactor's PKID cannot be resolved, when a larger multiplier was already applied for
// this pair, or when the transactor's balance cannot be read.
func (fes *APIServer) GetHotnessScoreInfoForTxn(
	txn *lib.MsgDeSoTxn,
	blockAge int, // Number of blocks this txn is from the blockTip. Not block height.
	postInteractionMap map[HotFeedInteractionKey]uint64,
	utxoView *lib.UtxoView,
	isTagFeed bool,
) (_postHashScored *lib.BlockHash, _interactionPKID *lib.PKID, _hotnessScore uint64,
) {
	// Figure out who is responsible for the transaction.
	interactionPKIDEntry := utxoView.GetPKIDForPublicKey(txn.PublicKey)
	if interactionPKIDEntry == nil || interactionPKIDEntry.PKID == nil {
		return nil, nil, 0
	}

	// GetPostHashToScoreForTxn returns a nil hash for every txn that does not affect a
	// post. Bail before dereferencing it below.
	interactionPostHash, _ := GetPostHashToScoreForTxn(txn, utxoView)
	if interactionPostHash == nil {
		return nil, nil, 0
	}

	// Check to see if we've seen this interaction pair before. Log an interaction if not.
	interactionKey := HotFeedInteractionKey{
		InteractionPKID:     *interactionPKIDEntry.PKID,
		InteractionPostHash: *interactionPostHash,
	}

	// Transaction type multiplier for current transaction.
	multiplier := fes.getTxnMultiplier(txn)
	// Get previously applied multiplier for post, if post has been counted already for this user.
	if prevMultiplier, exists := postInteractionMap[interactionKey]; exists {
		// If the previously applied multiplier is greater, skip this transaction.
		if prevMultiplier > multiplier {
			return nil, nil, 0
		}
		postInteractionMap[interactionKey] = multiplier
		// We want to count the difference of the new multiplier and the previously counted multiplier.
		multiplier -= prevMultiplier
	} else {
		postInteractionMap[interactionKey] = multiplier
	}

	// The base score is the transactor's DeSo balance...
	interactionProfile := utxoView.GetProfileEntryForPKID(interactionPKIDEntry.PKID)
	interactionUserBalance, err := utxoView.GetDeSoBalanceNanosForPublicKey(txn.PublicKey)
	if err != nil {
		return nil, nil, 0
	}
	hotnessScore := interactionUserBalance
	// ...plus the DeSo locked in their creator coin, when they have a live profile.
	// It is possible for the profile to be nil since you don't need a profile for diamonds.
	if interactionProfile != nil && !interactionProfile.IsDeleted() {
		hotnessScore += interactionProfile.CreatorCoinEntry.DeSoLockedNanos
	}

	// Apply transaction type multiplier.
	// Multipliers are defined in basis points, so the resulting product is divided by 10,000.
	// The score is split into whole multiples of 10,000 and a remainder before multiplying:
	// this equals floor(score * multiplier / 10000) exactly, but the intermediate product
	// no longer wraps around uint64 for large balances.
	hotnessScore = (hotnessScore/10000)*multiplier + (hotnessScore%10000)*multiplier/10000

	// Pick the cap and decay constants for the feed being scored.
	interactionCap, timeDecayBlocks := fes.HotFeedInteractionCap, fes.HotFeedTimeDecayBlocks
	if isTagFeed {
		interactionCap, timeDecayBlocks = fes.HotFeedTagInteractionCap, fes.HotFeedTagTimeDecayBlocks
	}
	if hotnessScore > interactionCap {
		hotnessScore = interactionCap
	}

	// Halve the score once for every timeDecayBlocks blocks of age.
	hotnessScoreTimeDecayed := uint64(float64(hotnessScore) *
		math.Pow(0.5, float64(blockAge)/float64(timeDecayBlocks)))
	return interactionPostHash, interactionPKIDEntry.PKID, hotnessScoreTimeDecayed
}
// PruneHotFeedApprovedPostsMap removes from hotFeedApprovedPosts every post hash that is
// a key of fes.HotFeedApprovedPostsToMultipliers but is not a key of hotFeedPosts.
// hotFeedApprovedPosts is modified in place; hotFeedPosts is only read.
//
// NOTE(review): the candidates come from the server's HotFeedApprovedPostsToMultipliers
// field, not from the hotFeedApprovedPosts argument, so entries present only in the
// argument are never pruned. Confirm this is intended.
func (fes *APIServer) PruneHotFeedApprovedPostsMap(
	hotFeedPosts map[lib.BlockHash]*HotnessPostInfo, hotFeedApprovedPosts map[lib.BlockHash]float64,
) {
	for approvedPostHash := range fes.HotFeedApprovedPostsToMultipliers {
		_, stillInHotFeed := hotFeedPosts[approvedPostHash]
		if stillInHotFeed {
			continue
		}
		delete(hotFeedApprovedPosts, approvedPostHash)
	}
}
// getTxnMultiplier looks up the hot feed multiplier, in basis points, configured for the
// type of the given transaction. Transaction types without an entry in
// HotFeedTxnTypeMultiplierMap get 10000, i.e. a 1x multiplier.
func (fes *APIServer) getTxnMultiplier(txn *lib.MsgDeSoTxn) uint64 {
	configuredMultiplier, isConfigured := fes.HotFeedTxnTypeMultiplierMap[txn.TxnMeta.GetTxnType()]
	if !isConfigured {
		// No multiplier defined for this txn type: multiply by 1x (in basis points).
		return 10000
	}
	return configuredMultiplier
}
// HotFeedPageRequest is the JSON request body decoded by HandleHotFeedPageRequest when a
// client asks for a page of the hot feed.
type HotFeedPageRequest struct {
// Base58Check-encoded public key of the reader. May be empty, in which case no reader
// public key is decoded for the request.
ReaderPublicKeyBase58Check string
// Since the hot feed is constantly changing, we pass a list of posts that have already
// been seen in order to send a more accurate next page. Entries are compared against
// each hot feed entry's PostHashHex, and matching posts are skipped.
SeenPosts []string
// Number of post entry responses to return. A value of zero disables the limit check.
ResponseLimit int
// If defined, only get the hot feed for posts tagged with this tag.
Tag string
// If true, sort by new instead of by hotness. Only applies to queries where "Tag" is defined.
SortByNew bool
}
// HotFeedPageResponse holds one page of hot feed results.
type HotFeedPageResponse struct {
// The post entry responses that make up the page.
HotFeedPage []PostEntryResponse
}
// AdminGetUnfilteredHotFeed is an HTTP handler that serves a hot feed page by delegating
// to HandleHotFeedPageRequest with approvedPostsOnly=false, so unapproved posts are not
// skipped, and addMultiplierBool=true.
func (fes *APIServer) AdminGetUnfilteredHotFeed(ww http.ResponseWriter, req *http.Request) {
fes.HandleHotFeedPageRequest(ww, req, false /*approvedPostsOnly*/, true /*addMultiplierBool*/)
}
// GetHotFeed is an HTTP handler that serves a hot feed page by delegating to
// HandleHotFeedPageRequest with approvedPostsOnly=false and addMultiplierBool=false.
//
// NOTE(review): approvedPostsOnly is false here, the same value AdminGetUnfilteredHotFeed
// passes, so this endpoint does not restrict results to approved posts. Confirm that is
// intended.
func (fes *APIServer) GetHotFeed(ww http.ResponseWriter, req *http.Request) {
fes.HandleHotFeedPageRequest(ww, req, false /*approvedPostsOnly*/, false /*addMultiplierBool*/)
}
func (fes *APIServer) HandleHotFeedPageRequest(
ww http.ResponseWriter,
req *http.Request,
approvedPostsOnly bool,
addMultiplierBool bool,
) {
decoder := json.NewDecoder(io.LimitReader(req.Body, MaxRequestBodySizeBytes))
requestData := HotFeedPageRequest{}
if err := decoder.Decode(&requestData); err != nil {
_AddBadRequestError(ww, fmt.Sprintf("HandleHotFeedPageRequest: Problem parsing request body: %v", err))
return
}
var readerPublicKeyBytes []byte
var err error
if requestData.ReaderPublicKeyBase58Check != "" {
readerPublicKeyBytes, _, err = lib.Base58CheckDecode(requestData.ReaderPublicKeyBase58Check)
if err != nil {
_AddBadRequestError(ww, fmt.Sprintf("HandleHotFeedPageRequest: Problem decoding reader public key: %v", err))
return
}
}
// Get a view.
utxoView, err := fes.backendServer.GetMempool().GetAugmentedUniversalView()
if err != nil {
_AddBadRequestError(ww, fmt.Sprintf("HandleHotFeedPageRequest: Error getting utxoView: %v", err))
return
}
// Make the lists of posts a user has already seen into a map.
seenPostsMap := make(map[string][]byte)
for _, postHashHex := range requestData.SeenPosts {
seenPostsMap[postHashHex] = []byte{}
}
hotFeed := []PostEntryResponse{}
// The list of posts that will be iterated on
var hotFeedOrderedList []*HotFeedEntry
// Only process posts tagged with a particular tag if specified in the request
if requestData.Tag != "" {
// Choose the map with the lists sorted in the manner specified by the user (hotness or newness).
var tagMap map[string][]*HotFeedEntry
if requestData.SortByNew {
tagMap = fes.PostTagToOrderedNewestEntries
} else {
tagMap = fes.PostTagToOrderedHotFeedEntries
}
// Check to make sure key exists in map. If not, return an empty list.
if orderedEntriesForTag, ok := tagMap[requestData.Tag]; ok {
hotFeedOrderedList = orderedEntriesForTag
} else {
hotFeedOrderedList = []*HotFeedEntry{}
}
} else {
hotFeedOrderedList = fes.HotFeedOrderedList
}
for _, hotFeedEntry := range hotFeedOrderedList {
if requestData.ResponseLimit != 0 && len(hotFeed) > requestData.ResponseLimit {
break
}
// Skip posts that have already been seen.
if _, alreadySeen := seenPostsMap[hotFeedEntry.PostHashHex]; alreadySeen {
continue
}
// Skip posts that aren't approved yet, if requested.
if _, isApproved := fes.HotFeedApprovedPostsToMultipliers[*hotFeedEntry.PostHash]; approvedPostsOnly && !isApproved {
continue
}
postEntry := utxoView.GetPostEntryForPostHash(hotFeedEntry.PostHash)
postEntryResponse, err := fes._postEntryToResponse(
postEntry, true, fes.Params, utxoView, readerPublicKeyBytes, 1)