/
CacheDownloader.kt
231 lines (189 loc) · 10.4 KB
/
CacheDownloader.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package com.opennxt.tools.impl.cachedownloader
import com.github.ajalt.clikt.parameters.options.default
import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.types.int
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.opennxt.Constants
import com.opennxt.ext.getCrc32
import com.opennxt.filesystem.ChecksumTable
import com.opennxt.filesystem.Container
import com.opennxt.filesystem.Filesystem
import com.opennxt.filesystem.ReferenceTable
import com.opennxt.filesystem.sqlite.SqliteFilesystem
import com.opennxt.tools.Tool
import java.lang.Thread.sleep
import java.nio.ByteBuffer
import java.util.concurrent.*
import kotlin.system.exitProcess
/**
 * Tool that synchronizes the local cache with Jagex' live JS5 content servers.
 *
 * High-level flow (see [runTool]):
 *  1. Open the local SQLite-backed filesystem and a pool of js5/http clients.
 *  2. Download the checksum table (index 255, archive 255).
 *  3. Create any indices the local cache is missing.
 *  4. Re-download reference tables whose CRC or version disagrees with the checksum table.
 *  5. Spawn per-index completion checkers that enqueue archive downloads, then poll
 *     until all js5, http and IO work is drained, and terminate the process.
 *
 * NOTE(review): failures call [exitProcess] rather than throwing, so this tool is
 * intended to run as a standalone process, not be embedded in a larger application.
 */
class CacheDownloader : Tool("cache-downloader", "Updates / downloads the cache from Jagex' JS5 servers") {
    // --- CLI options (clikt); defaults target the live RuneScape content servers. ---
    private val ip by option(help = "Live js5 server ip").default("content.runescape.com")
    private val port by option(help = "Live js5 server port").int().default(43594)
    private val numJs5Clients by option(help = "The amount of concurrent js5 connections").int().default(1)
    private val numHttpClients by option(help = "The max amount of concurrent HTTP connections").int().default(3)
    private val ioThreads by option(help = "The number of I/O threads for cache-related operations").int().default(8)
    private val checkThreads by option(help = "The number of I/O threads for checking which files require updating").int()
        .default(4)

    // Initialized inside runTool(): construction needs the parsed CLI option values
    // and the downloaded checksum table, so these cannot be plain vals.
    private lateinit var cache: Filesystem
    private lateinit var checkerExecutor: ExecutorService
    private lateinit var clientPool: Js5ClientPool
    private lateinit var checksumTable: ChecksumTable
    private lateinit var requestHandler: Js5RequestHandler

    /**
     * Builds an [Js5RequestHandler.ArchiveRequest] for the given index/archive pair.
     *
     * Only index 255 (the meta index holding checksum/reference tables) is supported;
     * any other index hits the [TODO] and throws [NotImplementedError].
     *
     * NOTE(review): the request is only constructed here — it is not submitted to
     * [clientPool] or [requestHandler]; callers appear to be responsible for that.
     */
    fun request(priority: Boolean, index: Int, archive: Int): Js5RequestHandler.ArchiveRequest {
        if (index == 255) {
            val request = Js5RequestHandler.ArchiveRequest(index, archive, priority)
            return request
        } else {
            TODO("Non-255 requests")
        }
    }

    /**
     * Downloads and decodes the master checksum table (index 255, archive 255),
     * storing the result in [checksumTable].
     *
     * Exits the process if the request cannot be queued or does not complete
     * within 30 seconds.
     */
    private fun downloadChecksumTable() {
        val request = clientPool.addRequest(true, 255, 255)
            ?: throw IllegalStateException("failed to request [255,255]")
        if (!request.awaitCompletion(30, TimeUnit.SECONDS)) {
            logger.error { "Took more than 30 seconds to download the checksum table. Exiting." }
            exitProcess(1)
        }
        logger.info { "Finished downloading checksum table!" }
        // buffer is non-null after a successful awaitCompletion; the raw bytes are a
        // Container wrapping the encoded ChecksumTable.
        checksumTable = ChecksumTable.decode(ByteBuffer.wrap(Container.decode(request.buffer!!).data))
        checksumTable.entries.forEachIndexed { index, entry ->
            logger.info { "checksum for index $index = $entry" }
        }
    }

    /**
     * Grows the local cache so it has one index per checksum-table entry.
     * Existing indices are never removed or modified — only missing ones are created.
     */
    private fun createNewIndices() {
        if (checksumTable.entries.size > cache.numIndices()) {
            logger.info { "Need to expand cache! Got ${cache.numIndices()} indices, need ${checksumTable.entries.size}" }
            for (i in cache.numIndices() until checksumTable.entries.size) {
                cache.createIndex(i)
                logger.info { "Creating index $i in the cache" }
            }
        } else {
            logger.info { "Cache is the correct size (${checksumTable.entries.size} indices)" }
        }
    }

    /**
     * Brings every per-index reference table up to date with [checksumTable].
     *
     * Pass 1: for each index, queue a (priority) download if the local reference table
     * is missing, has a CRC mismatch against the raw bytes, or decodes to a different
     * version than the checksum table advertises.
     * Pass 2: await each queued download (30s timeout per table), re-validate CRC and
     * version against the checksum table, and persist the verified table to [cache].
     *
     * Any timeout or validation failure exits the process.
     */
    private fun updateReferenceTables() {
        val pending = HashSet<Js5RequestHandler.ArchiveRequest>()
        checksumTable.entries.forEachIndexed { index, entry ->
            // crc==0 && version==0 marks an unused/empty index slot — nothing to fetch.
            if (entry.crc == 0 && entry.version == 0) return@forEachIndexed
            val existingRaw = cache.readReferenceTable(index)
            if (existingRaw == null) {
                logger.info { "Reference table for index $index missing, adding to downloads..." }
                pending += clientPool.addRequest(true, 255, index)
                    ?: throw IllegalStateException("Failed to add reference table request $index")
                return@forEachIndexed
            }
            // CRC is computed over the raw (still-compressed) container bytes.
            val crc = existingRaw.getCrc32()
            if (entry.crc != crc) {
                logger.info { "CRC mismatch in reference table for index $index, adding to downloads..." }
                pending += clientPool.addRequest(true, 255, index)
                    ?: throw IllegalStateException("Failed to add reference table request $index")
                return@forEachIndexed
            }
            // CRC matches; decode and compare the version number too.
            val existing = ReferenceTable(cache, index)
            existing.decode(ByteBuffer.wrap(Container.decode(existingRaw).data))
            if (existing.version != entry.version) {
                logger.info { "Version mismatch in reference table for index $index, adding to downloads..." }
                pending += clientPool.addRequest(true, 255, index)
                    ?: throw IllegalStateException("Failed to add reference table request $index")
                return@forEachIndexed
            }
            logger.info { "Reference table for index $index is up-to-date." }
        }
        pending.forEach { request ->
            if (!request.awaitCompletion(30, TimeUnit.SECONDS)) {
                logger.error { "Took more than 30 seconds to download reference table for index ${request.archive}. Exiting." }
                exitProcess(1)
            }
            val buffer = request.buffer ?: throw NullPointerException("Buffer for completed archive request is null")
            val referenceTable = ReferenceTable(cache, request.archive)
            val crc = buffer.getCrc32()
            val entry = checksumTable.entries[request.archive]
            // Validate the freshly downloaded bytes before persisting anything.
            if (crc != entry.crc) {
                logger.error { "CRC mismatch in downloaded reference table for index ${request.archive}. Exiting." }
                exitProcess(1)
            }
            val container = Container.decode(buffer)
            referenceTable.decode(ByteBuffer.wrap(container.data))
            if (referenceTable.version != entry.version) {
                logger.error { "Version mismatch in downloaded reference table for index ${request.archive}. Exiting." }
                exitProcess(1)
            }
            cache.writeReferenceTable(request.archive, buffer.array(), container.version, crc)
            logger.info { "Finished downloading & saving reference table for index ${request.archive}." }
        }
    }

    /**
     * Tool entry point: wires everything together and then polls until all work is done.
     *
     * Shutdown handshake in the polling loop: js5 work is considered finished once all
     * completion checkers are done and the request handler has nothing pending or in
     * flight, at which point the client pool is closed; http work is finished once the
     * pool is closed, the music checker is done and no http downloads remain; the
     * process exits once both are done and no IO operations are pending.
     */
    override fun runTool() {
        check(numJs5Clients > 0) { "num-js5-clients must be greater than 0" }
        check(numHttpClients > 0) { "num-http-clients must be greater than 0" }
        logger.info { "Starting download from $ip:$port" }
        logger.info { "Opening filesystem from ${Constants.CACHE_PATH}" }
        cache = SqliteFilesystem(Constants.CACHE_PATH)
        logger.info { "Setting up client pool with $numJs5Clients js5 clients and $numHttpClients http clients" }
        clientPool = Js5ClientPool(numJs5Clients, numHttpClients, ip, port)
        logger.info { "Grabbing checksum and reference tables..." }
        // Only one connection is needed for the bootstrap (checksum + reference tables).
        clientPool.openConnections(amount = 1)
        val client = clientPool.getClient()
        try {
            if (!client.awaitConnected(30, TimeUnit.SECONDS)) {
                logger.error { "Took more than 30 seconds to successfully connect to js5 servers. Exiting." }
                exitProcess(1)
            }
        } catch (e: InterruptedException) {
            logger.error { "Lock on connection got interrupted. Exiting." }
            exitProcess(1)
        }
        logger.info { "Connected! Requesting checksum table now..." }
        downloadChecksumTable()
        createNewIndices()
        logger.info { "Setting up request handler" }
        requestHandler = Js5RequestHandler(clientPool, cache, ioThreads)
        logger.info { "Checking tables" }
        updateReferenceTables()
        logger.info { "Starting table checks" }
        checkerExecutor = Executors.newFixedThreadPool(checkThreads, ThreadFactoryBuilder()
            .setNameFormat("table-checker-%d")
            .setUncaughtExceptionHandler { t, e ->
                logger.error { "Uncaught exception in thread ${t.name}: $e" }
                e.printStackTrace()
            }
            .build())
        // start music first, big archive over http we can download first for faster overall downloads
        val musicChecker = IndexCompletionChecker(cache, 40, requestHandler)
        checkerExecutor.submit(musicChecker)
        // musicChecker (index 40) is deliberately kept out of this set: it gates the
        // http-done condition below, while this set gates the js5-done condition.
        val completionCheckers = HashSet<IndexCompletionChecker>()
        checksumTable.entries.forEachIndexed { index, entry ->
            if (index == 40 || (entry.crc == 0 && entry.version == 0)) return@forEachIndexed
            val checker = IndexCompletionChecker(cache, index, requestHandler)
            completionCheckers.add(checker)
            checkerExecutor.submit(checker)
        }
        // Other clients in the pool will automatically be opened in the request handler
        Thread(requestHandler, "js5-request-handler").start()
        var doneJs5 = false
        var doneHttp = false
        while (true) {
            val snapshot = requestHandler.createSnapshot()
            // Close the pool exactly once: the !closed guard prevents re-closing.
            if (!clientPool.closed && completionCheckers.all { it.completed } && snapshot.pendingCount == 0 && snapshot.processingCount == 0) {
                logger.info { "All js5 operations are done, closing js5 client pool" }
                clientPool.close()
                doneJs5 = true
            }
            if (clientPool.closed && musicChecker.completed && snapshot.pendingHttpCount == 0) {
                logger.info { "All http operations are done, preparing to shutdown" }
                doneHttp = true
            }
            // NOTE(review): "pendingIOOPerations" typo comes from the snapshot type's
            // property name declared elsewhere — cannot be fixed from this file alone.
            if (doneJs5 && doneHttp && snapshot.pendingIOOPerations == 0) {
                logger.info { "No more pending IO operations, shutting down remaining things" }
                logger.error { "We can't close the cache yet. Should probably support that." }
                exitProcess(0)
            }
            // Once-per-second progress report: queue sizes, http backlog, throughput.
            logger.info {
                "Unassigned: ${snapshot.pendingCount} (~${snapshot.pendingSize / 1024L / 1024L}MB). Assigned: ${snapshot.processingCount} (~${snapshot.processingSize / 1024L / 1024L}MB). Http pending: ${snapshot.pendingHttpCount} (~${snapshot.pendingHttpSize / 1024L / 1024L}MB). Js5 Bandwidth: ${
                    "%.2f".format(
                        Js5ClientPipeline.getReadThroughput().toDouble() / 1024.0 / 1024.0
                    )
                }MB/s (excludes http). Pending IO ops: ${snapshot.pendingIOOPerations}. Last worker tick: ${System.currentTimeMillis()-snapshot.lastTick}ms ago"
            }
            sleep(1000)
        }
    }
}