-
Notifications
You must be signed in to change notification settings - Fork 1
/
SubBatcher.kt
105 lines (93 loc) · 3.7 KB
/
SubBatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.dluvian.voyage.data.nostr
import android.util.Log
import com.dluvian.nostr_kt.RelayUrl
import com.dluvian.voyage.core.DEBOUNCE
import com.dluvian.voyage.core.EventIdHex
import com.dluvian.voyage.core.PubkeyHex
import com.dluvian.voyage.core.createReplyAndVoteFilters
import com.dluvian.voyage.core.launchIO
import com.dluvian.voyage.core.syncedPutOrAdd
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import rust.nostr.protocol.EventId
import rust.nostr.protocol.Filter
import rust.nostr.protocol.PublicKey
import rust.nostr.protocol.Timestamp
import java.util.concurrent.atomic.AtomicBoolean
// Tag attached to every log message emitted from this file.
private const val TAG = "SubBatcher"
// Pause between two queue drains of the processing loop: twice the shared DEBOUNCE value.
// It is handed straight to delay(), so it is presumably in milliseconds — confirm against
// the definition of DEBOUNCE.
private const val BATCH_DELAY = 2 * DEBOUNCE
/**
 * Collects reply/vote subscription requests per relay and opens them in batches.
 *
 * Callers enqueue event ids and vote pubkeys through [submitVotesAndReplies]. One background
 * coroutine wakes up every [BATCH_DELAY], drains both queues, builds the filters for each
 * relay that has queued event ids and passes them to [subCreator].
 *
 * @param subCreator receives the batched filters, one `subscribe` call per relay.
 */
class SubBatcher(private val subCreator: SubscriptionCreator) {
    // Pending event ids and vote pubkeys, grouped by the relay they should be requested from.
    private val idQueue = mutableMapOf<RelayUrl, MutableSet<EventIdHex>>()
    private val votePubkeyQueue = mutableMapOf<RelayUrl, MutableSet<PubkeyHex>>()

    // True while a processing coroutine is alive; guards against starting a second one.
    private val isProcessingSubs = AtomicBoolean(false)

    // SupervisorJob keeps the scope usable after the processing coroutine fails. With a plain
    // Job, an exception escaping the loop would cancel the whole scope, and the restart that
    // the completion handler enables would launch into an already-cancelled scope.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    init {
        startProcessingJob()
    }

    /**
     * Queues [eventIds] and [votePubkeys] for [relayUrl] to be included in the next batch.
     *
     * Nothing is queued when [eventIds] is empty; [votePubkeys] is ignored in that case.
     * Also (re)starts the processing loop if it is not currently running.
     */
    fun submitVotesAndReplies(
        relayUrl: RelayUrl,
        eventIds: List<EventIdHex>,
        votePubkeys: List<PubkeyHex>
    ) {
        if (eventIds.isEmpty()) return

        idQueue.syncedPutOrAdd(relayUrl, eventIds)
        votePubkeyQueue.syncedPutOrAdd(relayUrl, votePubkeys)
        startProcessingJob()
    }

    /**
     * Launches the endless drain-and-subscribe loop unless one is already running.
     *
     * When the coroutine completes for any reason, the running flag is reset so that a later
     * call can start a fresh loop.
     */
    private fun startProcessingJob() {
        // Atomically claim the "running" flag; bail out if another loop already holds it.
        if (!isProcessingSubs.compareAndSet(false, true)) return
        Log.i(TAG, "Start job")

        scope.launchIO {
            while (true) {
                delay(BATCH_DELAY)

                // Take a snapshot of each queue and empty it under that queue's own lock.
                // NOTE(review): the two queues are drained under separate locks, so the ids
                // and pubkeys of one submit call may end up in different batches. Presumably
                // acceptable; this also assumes syncedPutOrAdd locks on the map — confirm.
                val idsByRelay = mutableMapOf<RelayUrl, Set<EventIdHex>>()
                val votePubkeys = mutableMapOf<RelayUrl, Set<PubkeyHex>>()
                synchronized(idQueue) {
                    idsByRelay.putAll(idQueue)
                    idQueue.clear()
                }
                synchronized(votePubkeyQueue) {
                    votePubkeys.putAll(votePubkeyQueue)
                    votePubkeyQueue.clear()
                }

                // Filters are only built for relays with queued ids, so an empty id snapshot
                // means there is nothing to subscribe to in this round.
                if (idsByRelay.isEmpty()) continue

                val replyAndVoteFilters = getReplyAndVoteFilters(
                    idsByRelay = idsByRelay,
                    votePubkeys = votePubkeys,
                    timestamp = Timestamp.now()
                )
                replyAndVoteFilters.forEach { (relay, filters) ->
                    Log.d(TAG, "Sub ${filters.size} filters in $relay")
                    subCreator.subscribe(relayUrl = relay, filters = filters)
                }
            }
        }.invokeOnCompletion {
            Log.w(TAG, "Processing job completed", it)
            isProcessingSubs.set(false)
        }
    }

    /**
     * Builds the reply and vote filters for every relay in [idsByRelay].
     *
     * Pubkeys in [votePubkeys] that belong to a relay without queued ids are not used.
     *
     * @param idsByRelay hex event ids to request, grouped by relay.
     * @param votePubkeys hex pubkeys for the vote filters, grouped by relay.
     * @param timestamp passed unchanged to [createReplyAndVoteFilters] for each relay.
     * @return the filters to subscribe to, keyed by relay.
     */
    private fun getReplyAndVoteFilters(
        idsByRelay: Map<RelayUrl, Set<EventIdHex>>,
        votePubkeys: Map<RelayUrl, Set<PubkeyHex>>,
        timestamp: Timestamp,
    ): Map<RelayUrl, List<Filter>> {
        // Per-call caches so a hex string shared by several relays is parsed only once.
        val convertedIds = mutableMapOf<EventIdHex, EventId>()
        val convertedPubkeys = mutableMapOf<PubkeyHex, PublicKey>()

        return idsByRelay.mapValues { (relay, ids) ->
            val eventIds = ids.map { hex ->
                convertedIds.getOrPut(hex) { EventId.fromHex(hex) }
            }
            val publicKeys = votePubkeys[relay].orEmpty().map { hex ->
                convertedPubkeys.getOrPut(hex) { PublicKey.fromHex(hex) }
            }

            createReplyAndVoteFilters(
                ids = eventIds,
                votePubkeys = publicKeys,
                timestamp = timestamp
            )
        }
    }
}