Skip to content

Commit

Permalink
feat: 优化多源搜索和书源搜索功能
Browse files Browse the repository at this point in the history
  • Loading branch information
hectorqin committed Apr 11, 2022
1 parent 9fc3d66 commit f0531af
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 49 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
# ports:
# - 4395:8080
# volumes:
# - /home/reader/log:/log
# - /home/reader/logs:/logs
# - /home/reader/storage:/storage
# environment:
# - SPRING_PROFILES_ACTIVE=prod
Expand All @@ -32,7 +32,7 @@ services:
ports:
- 4396:8080 # 4396端口映射可自行修改
volumes:
- /home/reader/log:/log # log映射目录 /home/reader/log 可自行修改
- /home/reader/logs:/logs # logs映射目录 /home/reader/logs 可自行修改
- /home/reader/storage:/storage # 数据映射目录 /home/reader/storage 可自行修改
environment:
- SPRING_PROFILES_ACTIVE=prod
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/htmake/reader/api/YueduApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class YueduApi : RestVerticle() {
// 搜索其它来源
router.get("/reader3/searchBookSource").coroutineHandler { bookController.searchBookSource(it) }
router.get("/reader3/getBookSource").coroutineHandler { bookController.getBookSource(it) }
router.get("/reader3/searchBookSourceSSE").coroutineHandlerWithoutRes { bookController.searchBookSourceSSE(it) }

// 换源
router.get("/reader3/saveBookSource").coroutineHandler { bookController.saveBookSource(it) }
Expand Down
106 changes: 87 additions & 19 deletions src/main/java/com/htmake/reader/api/controller/BaseController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import kotlinx.coroutines.withContext
import kotlinx.coroutines.async
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import io.legado.app.help.coroutine.Coroutine

private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -305,36 +307,102 @@ open class BaseController(override val coroutineContext: CoroutineContext): Coro

suspend fun limitConcurrent(concurrentCount: Int, startIndex: Int, endIndex: Int, handler: suspend CoroutineScope.(Int) -> Any, needContinue: (ArrayList<Any>, Int) -> Boolean) {
var lastIndex = startIndex
var loopCount = 0;
var loopCount = 0
var resultCount = 0
var loopStart = System.currentTimeMillis()
var costTime = 0L
var deferredList = arrayListOf<Deferred<Any>>()
while(true) {
var deferredList = arrayListOf<Deferred<Any>>()
var croutineCount = 0;
for(i in lastIndex until endIndex) {
croutineCount += 1;
deferredList.add(async {
handler(i)
})
var croutineCount = deferredList.size;
if (croutineCount < concurrentCount) {
for(i in lastIndex until endIndex) {
croutineCount += 1;
deferredList.add(async {
handler(i)
})

lastIndex = i
if (croutineCount >= concurrentCount) {
break;
lastIndex = i
if (croutineCount >= concurrentCount) {
break;
}
}
}
val resultList = arrayListOf<Any>()
val costTime = measureTimeMillis {
var resultList = arrayListOf<Any>()

// 等待任何一个完成
while (resultList.size <= 0) {
delay(10)
var stillDeferredList = arrayListOf<Deferred<Any>>()
for (i in 0 until deferredList.size) {
resultList.add(deferredList.get(i).await())
try {
var deferred = deferredList.get(i)
if (deferred.isCompleted) {
resultCount++
resultList.add(deferred.getCompleted())
} else if (!deferred.isCancelled) {
stillDeferredList.add(deferred)
} else {
resultCount++
}
} catch(e: Exception) {

}
}
deferredList.clear()
deferredList.addAll(stillDeferredList)
}
loopCount += 1;
logger.info("Loop: {} concurrentCount: {} lastIndex: {} costTime: {} ms", loopCount, croutineCount, lastIndex, costTime)
if (lastIndex >= endIndex) {
break;

if (resultCount / concurrentCount > loopCount) {
loopCount = resultCount / concurrentCount
costTime = System.currentTimeMillis() - loopStart
logger.info("Loop: {} concurrentCount: {} lastIndex: {} endIndex: {} costTime: {} ms deferredList size: {}", loopCount, croutineCount, lastIndex, endIndex, costTime, deferredList.size)
}
if (!needContinue(resultList, loopCount)) {

if (lastIndex >= endIndex - 1) {
// 搞完了,等待所有结束
for (i in 0 until deferredList.size) {
try {
resultList.add(deferredList.get(i).await())
} catch(e: Exception) {

}
}
deferredList.clear()
needContinue(resultList, loopCount)
break;
}
if (resultList.size > 0) {
if (!needContinue(resultList, loopCount)) {
break;
}
}
lastIndex = lastIndex + 1
}

// for (i in 0 until concurrentCount) {
// runBlocking(concurrentCount, startIndex + i , endIndex, handler, needContinue)
// }
}

suspend fun runBlocking(concurrentCount: Int, startIndex: Int, endIndex: Int, handler: suspend CoroutineScope.(Int) -> Any, needContinue: (ArrayList<Any>, Int) -> Boolean) {
var lastIndex = startIndex

Coroutine.async(this, coroutineContext) {
handler(lastIndex)
}.timeout(30000L)
.onSuccess(Dispatchers.IO) {
if (lastIndex < endIndex - concurrentCount && needContinue(arrayListOf(it), 0)) {
lastIndex += concurrentCount
runBlocking(concurrentCount, lastIndex, endIndex, handler, needContinue)
}
}
.onError(Dispatchers.IO) {
if (lastIndex < endIndex - concurrentCount) {
lastIndex += concurrentCount
runBlocking(concurrentCount, lastIndex, endIndex, handler, needContinue)
} else {
needContinue(arrayListOf(), 0)
}
}
}
}
128 changes: 116 additions & 12 deletions src/main/java/com/htmake/reader/api/controller/BookController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti

var bookInfoCache = ACache.get("bookInfoCache", 1000 * 1000 * 2L, 10000) // 缓存 2M 的书籍信息
var invalidBookSourceList = arrayListOf<Map<String, Any>>()
val concurrentLoopCount = 8

private var webClient: WebClient

Expand Down Expand Up @@ -593,8 +594,8 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti
}
}
logger.info("Loog: {} resultList.size: {}", loopCount, resultList.size)
if (loopCount >= 10) {
// 超过10轮,终止执行
if (loopCount >= concurrentLoopCount) {
// 超过最大轮次,终止执行
false
} else {
resultList.size < searchSize
Expand Down Expand Up @@ -626,35 +627,35 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti
key = context.bodyAsJson.getString("key", "")
bookSourceGroup = context.bodyAsJson.getString("bookSourceGroup", "")
lastIndex = context.bodyAsJson.getInteger("lastIndex", -1)
searchSize = context.bodyAsJson.getInteger("searchSize", 100)
searchSize = context.bodyAsJson.getInteger("searchSize", 50)
concurrentCount = context.bodyAsJson.getInteger("concurrentCount", 24)
} else {
// get 请求
key = context.queryParam("key").firstOrNull() ?: ""
bookSourceGroup = context.queryParam("bookSourceGroup").firstOrNull() ?: ""
lastIndex = context.queryParam("lastIndex").firstOrNull()?.toInt() ?: -1
searchSize = context.queryParam("searchSize").firstOrNull()?.toInt() ?: 100
searchSize = context.queryParam("searchSize").firstOrNull()?.toInt() ?: 50
concurrentCount = context.queryParam("concurrentCount").firstOrNull()?.toInt() ?: 24
}
var userNameSpace = getUserNameSpace(context)
var userBookSourceList = loadBookSourceStringList(userNameSpace, bookSourceGroup)
if (userBookSourceList.size <= 0) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setData("NEED_LOGIN").setErrorMsg("未配置书源"), false) + "\n\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("未配置书源"), false) + "\n\n")
return
}
if (key.isNullOrEmpty()) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setData("NEED_LOGIN").setErrorMsg("请输入搜索关键字"), false) + "\n\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("请输入搜索关键字"), false) + "\n\n")
return
}
if (lastIndex >= userBookSourceList.size) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setData("NEED_LOGIN").setErrorMsg("没有更多了"), false) + "\n\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("没有更多了"), false) + "\n\n")
return
}

searchSize = if(searchSize > 0) searchSize else 100
searchSize = if(searchSize > 0) searchSize else 50
concurrentCount = if(concurrentCount > 0) concurrentCount else 24
logger.info("searchBookMulti from lastIndex: {} concurrentCount: {} searchSize: {}", lastIndex, concurrentCount, searchSize)
var resultList = arrayListOf<SearchBook>()
Expand Down Expand Up @@ -684,8 +685,8 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti
response.write("data: " + jsonEncode(mapOf("lastIndex" to lastIndex, "data" to loopResult), false) + "\n\n")
logger.info("Loog: {} resultList.size: {}", loopCount, resultList.size)

if (loopCount >= 10) {
// 超过10轮,终止执行
if (loopCount >= concurrentLoopCount) {
// 超过最大轮次,终止执行
false
} else {
resultList.size < searchSize
Expand Down Expand Up @@ -763,8 +764,8 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti
resultList.addAll(it)
}
}
if (loopCount >= 10) {
// 超过10轮,终止执行
if (loopCount >= concurrentLoopCount) {
// 超过最大轮次,终止执行
false
} else {
resultList.size < searchSize
Expand All @@ -775,6 +776,109 @@ class BookController(coroutineContext: CoroutineContext): BaseController(corouti
return returnData.setData(mapOf("lastIndex" to lastIndex, "list" to resultList))
}

suspend fun searchBookSourceSSE(context: RoutingContext) {
val returnData = ReturnData()
// 返回 event-stream
val response = context.response().putHeader("Content-Type", "text/event-stream")
.putHeader("Cache-Control", "no-cache")
.setChunked(true);

if (!checkAuth(context)) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setData("NEED_LOGIN").setErrorMsg("请登录后使用"), false) + "\n\n")
return
}
val bookUrl: String
var lastIndex: Int
var searchSize: Int
var bookSourceGroup: String
if (context.request().method() == HttpMethod.POST) {
// post 请求
bookUrl = context.bodyAsJson.getString("url")
lastIndex = context.bodyAsJson.getInteger("lastIndex", -1)
searchSize = context.bodyAsJson.getInteger("searchSize", 30)
bookSourceGroup = context.bodyAsJson.getString("bookSourceGroup", "")
} else {
// get 请求
bookUrl = context.queryParam("url").firstOrNull() ?: ""
lastIndex = context.queryParam("lastIndex").firstOrNull()?.toInt() ?: -1
searchSize = context.queryParam("searchSize").firstOrNull()?.toInt() ?: 30
bookSourceGroup = context.queryParam("bookSourceGroup").firstOrNull() ?: ""
}
var userNameSpace = getUserNameSpace(context)
var userBookSourceList = loadBookSourceStringList(userNameSpace, bookSourceGroup)
if (userBookSourceList.size <= 0) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("未配置书源"), false) + "\n\n")
return
}
if (bookUrl.isNullOrEmpty()) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("请输入书籍链接"), false) + "\n\n")
return
}
if (lastIndex >= userBookSourceList.size) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("没有更多了"), false) + "\n\n")
return
}

var book = getShelfBookByURL(bookUrl, userNameSpace)
if (book == null) {
book = bookInfoCache.getAsString(bookUrl)?.toMap()?.toDataClass()
}
if (book == null) {
response.write("event: error\n")
response.end("data: " + jsonEncode(returnData.setErrorMsg("书籍信息错误"), false) + "\n\n")
return
}
// 校正 lastIndex
var bookSourceList: JsonArray? = asJsonArray(getUserStorage(userNameSpace, book.name + "_" + book.author, "bookSource"))
if (bookSourceList != null && bookSourceList.size() > 0) {
try {
val lastBookSourceUrl = bookSourceList.getJsonObject(bookSourceList.size() - 1).getString("origin")
lastIndex = Math.max(lastIndex, getBookSourceBySourceURL(lastBookSourceUrl, userNameSpace, userBookSourceList).second)
} catch(e: Exception) {
e.printStackTrace()
}
}

searchSize = if(searchSize > 0) searchSize else 30
var resultList = arrayListOf<SearchBook>()
var concurrentCount = Math.max(searchSize * 2, 24)
logger.info("searchBookMulti from lastIndex: {} concurrentCount: {} searchSize: {}", lastIndex, concurrentCount, searchSize)

limitConcurrent(concurrentCount, lastIndex + 1, userBookSourceList.size, {it->
lastIndex = it
var bookSource = userBookSourceList.get(it)
searchBookWithSource(bookSource, book)
}) {list, loopCount ->
// logger.info("list: {}", list)
val loopResult = arrayListOf<SearchBook>()
list.forEach {
val bookList = it as? Collection<SearchBook>
bookList?.let {
resultList.addAll(it)
loopResult.addAll(it)
}
}
// 返回本轮数据
response.write("data: " + jsonEncode(mapOf("lastIndex" to lastIndex, "data" to loopResult), false) + "\n\n")
logger.info("Loog: {} resultList.size: {}", loopCount, resultList.size)

if (loopCount >= concurrentLoopCount) {
// 超过最大轮次,终止执行
false
} else {
resultList.size < searchSize
}
}
saveInvalidBookSourceList()
saveBookSources(book, resultList, userNameSpace)
response.write("event: end\n")
response.end("data: " + jsonEncode(mapOf("lastIndex" to lastIndex), false) + "\n\n")
}

suspend fun searchBookWithSource(bookSourceString: String, book: Book, accurate: Boolean = true): ArrayList<SearchBook> {
var resultList = arrayListOf<SearchBook>()
var bookSource = asJsonObject(bookSourceString)?.mapTo(BookSource::class.java)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/legado/app/help/coroutine/Coroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class Coroutine<T>(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): Job {
return scope.plus(Dispatchers.Main).launch {
return scope.plus(Dispatchers.IO).launch {
try {
start?.let { dispatchVoidCallback(this, it) }
val value = executeBlock(scope, context, timeMillis ?: 0L, block)
Expand Down

0 comments on commit f0531af

Please sign in to comment.