From ef2beb820f9191631826eff48dcbeee0b4984e03 Mon Sep 17 00:00:00 2001 From: Gilad Hoch Date: Sun, 12 Nov 2017 10:13:18 +0200 Subject: [PATCH 1/2] #266 - length in qstream + boyscouting --- .../cmwell/domain/InfotonSerializer.scala | 6 +++- server/cmwell-ws/app/cmwell/ws/Streams.scala | 6 ++-- .../app/controllers/Application.scala | 32 ++++++++++--------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/server/cmwell-domain/src/main/scala/cmwell/domain/InfotonSerializer.scala b/server/cmwell-domain/src/main/scala/cmwell/domain/InfotonSerializer.scala index bdb016e5c..25cc410e7 100644 --- a/server/cmwell-domain/src/main/scala/cmwell/domain/InfotonSerializer.scala +++ b/server/cmwell-domain/src/main/scala/cmwell/domain/InfotonSerializer.scala @@ -91,7 +91,11 @@ object InfotonSerializer extends LazyLogging { def setLastModified(lastModified: String): Unit = { if(this.lastModified eq null) this.lastModified = fmt.parseDateTime(lastModified) - else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]") + else { + val newLastModified = fmt.parseDateTime(lastModified) + if (this.lastModified.compareTo(newLastModified) == 0) logger.warn(s"Been called twice on lastModified with same (parsed) value [${this.lastModified.toString(fmt)},$lastModified]") + else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]") + } } def setDc(dc: String): Unit = { diff --git a/server/cmwell-ws/app/cmwell/ws/Streams.scala b/server/cmwell-ws/app/cmwell/ws/Streams.scala index f12d39dd8..b863fdd57 100644 --- a/server/cmwell-ws/app/cmwell/ws/Streams.scala +++ b/server/cmwell-ws/app/cmwell/ws/Streams.scala @@ -106,9 +106,11 @@ object Streams extends LazyLogging { .mapAsyncUnordered(parallelism)(str => crudServiceFS.getInfotonByUuidAsync(str.uuid, nbg)) .collect{ case FullBox(i) => i} - def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] = Flow[SearchThinResults] + def searchThinResultsFlattened: Flow[SearchThinResults,SearchThinResult,NotUsed] = Flow[SearchThinResults] .mapConcat { case SearchThinResults(_, _, _, str, _) => str.toList } - .via(searchThinResultToFatInfoton(nbg,crudServiceFS)) + + def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] = + searchThinResultsFlattened.via(searchThinResultToFatInfoton(nbg,crudServiceFS)) def iterationResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[IterationResults,Infoton,NotUsed] = Flow[IterationResults] .collect { case IterationResults(_, _, Some(iSeq), _, _) => iSeq} diff --git a/server/cmwell-ws/app/controllers/Application.scala b/server/cmwell-ws/app/controllers/Application.scala index 6ad02b012..718c96b47 100644 --- a/server/cmwell-ws/app/controllers/Application.scala +++ b/server/cmwell-ws/app/controllers/Application.scala @@ -24,7 +24,7 @@ import javax.inject._ import actions._ import akka.NotUsed import akka.pattern.AskTimeoutException -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{GraphDSL, Source} import akka.util.{ByteString, Timeout} import cmwell.common.file.MimeTypeIdentifier import cmwell.domain.{BagOfInfotons, CompoundInfoton, DeletedInfoton, FString, PaginationInfo, SearchResponse, SearchResults, _} @@ -912,8 +912,7 @@ callback=< [URL] > val indexTime: Long = request.getQueryString("index-time").flatMap(asLong).getOrElse(0L) val withMeta = request.queryString.keySet("with-meta") - //TODO: length should determine the overall length (cutoff iteratee) - //val length = request.getQueryString("length").flatMap(asLong) + val length = request.getQueryString("length").flatMap(asLong) val lengthHint = request.getQueryString("length-hint").flatMap(asInt).getOrElse(100) val normalizedPath = normalizePath(request.path) val qpOpt = request.getQueryString("qp") @@ -976,17 +975,20 @@ callback=< [URL] > val src = streams.qStream(firstTimeStamp,path,history,deleted,descendants,lengthHint,fieldFilters,nbg) -// if (!withData) source.flatMapConcat(identity) -// //we already retry internally in IRW, but apparently, sometimes it's not enough: http://gitlab:8082/cm-well/cm-well/issues/136 -// else source.flatMapConcat(_.mapAsync(getAvailableProcessors)(thinfoton => retry(8,50.millis,2)(CRUDServiceFS.getInfotonByUuidAsync(thinfoton.uuid).map { -// case FullBox(i) => i.indexTime.fold(cmwell.domain.addIndexTime(i, Some(thinfoton.indexTime)))(_ => i) -// case EmptyBox => throw new NoSuchElementException(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.") -// case BoxedFailure(e) => throw new Exception(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.",e) -// }))) - - val ss: Source[ByteString,NotUsed] = { - if (withData.isEmpty) src.via(Flows.searchThinResultToByteString(formatter)) - else src.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS)).via(Flows.infotonToByteString(formatter)) + val ss: Source[ByteString,NotUsed] = length.fold{ + if (withData.isEmpty) + src.via(Flows.searchThinResultToByteString(formatter)) + else src + .via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS)) + .via(Flows.infotonToByteString(formatter)) + }{ l => + if (withData.isEmpty) src + .take(l) + .via(Flows.searchThinResultToByteString(formatter)) + else src + .via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS)) + .take(l) + .via(Flows.infotonToByteString(formatter)) } val contentType = { @@ -996,7 +998,7 @@ callback=< [URL] > overrideMimetype(formatType.mimetype, request)._2 } - Ok.chunked(ss).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)` + Ok.chunked(ss.batch(128,identity)(_ ++ _)).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)` } }.recover(errorHandler) } From 4de4f35c8406e1ff39e97de6cac8583fa4e3b379 Mon Sep 17 00:00:00 2001 From: Gilad Hoch Date: Sun, 12 Nov 2017 14:13:40 +0200 Subject: [PATCH 2/2] changes per review --- server/cmwell-ws/app/cmwell/ws/Streams.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/cmwell-ws/app/cmwell/ws/Streams.scala b/server/cmwell-ws/app/cmwell/ws/Streams.scala index b863fdd57..13902b658 100644 --- a/server/cmwell-ws/app/cmwell/ws/Streams.scala +++ b/server/cmwell-ws/app/cmwell/ws/Streams.scala @@ -106,7 +106,7 @@ object Streams extends LazyLogging { .mapAsyncUnordered(parallelism)(str => crudServiceFS.getInfotonByUuidAsync(str.uuid, nbg)) .collect{ case FullBox(i) => i} - def searchThinResultsFlattened: Flow[SearchThinResults,SearchThinResult,NotUsed] = Flow[SearchThinResults] + val searchThinResultsFlattened: Flow[SearchThinResults,SearchThinResult,NotUsed] = Flow[SearchThinResults] .mapConcat { case SearchThinResults(_, _, _, str, _) => str.toList } def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] =