Skip to content

Commit

Permalink
Merge pull request CM-Well#285 from hochgi/cmwell-266-qstream-length
Browse files Browse the repository at this point in the history
CM-Well#266 - length in qstream + boyscouting
  • Loading branch information
Gilad Hoch committed Nov 12, 2017
2 parents 4a7ab3c + 4de4f35 commit 6d08a94
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
Expand Up @@ -91,7 +91,11 @@ object InfotonSerializer extends LazyLogging {

def setLastModified(lastModified: String): Unit = {
if(this.lastModified eq null) this.lastModified = fmt.parseDateTime(lastModified)
else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]")
else {
val newLastModified = fmt.parseDateTime(lastModified)
if (this.lastModified.compareTo(newLastModified) == 0) logger.warn(s"Been called twice on lastModified with same (parsed) value [${this.lastModified.toString(fmt)},$lastModified]")
else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]")
}
}

def setDc(dc: String): Unit = {
Expand Down
6 changes: 4 additions & 2 deletions server/cmwell-ws/app/cmwell/ws/Streams.scala
Expand Up @@ -106,9 +106,11 @@ object Streams extends LazyLogging {
.mapAsyncUnordered(parallelism)(str => crudServiceFS.getInfotonByUuidAsync(str.uuid, nbg))
.collect{ case FullBox(i) => i}

def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] = Flow[SearchThinResults]
val searchThinResultsFlattened: Flow[SearchThinResults,SearchThinResult,NotUsed] = Flow[SearchThinResults]
.mapConcat { case SearchThinResults(_, _, _, str, _) => str.toList }
.via(searchThinResultToFatInfoton(nbg,crudServiceFS))

def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] =
searchThinResultsFlattened.via(searchThinResultToFatInfoton(nbg,crudServiceFS))

def iterationResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[IterationResults,Infoton,NotUsed] = Flow[IterationResults]
.collect { case IterationResults(_, _, Some(iSeq), _, _) => iSeq}
Expand Down
32 changes: 17 additions & 15 deletions server/cmwell-ws/app/controllers/Application.scala
Expand Up @@ -24,7 +24,7 @@ import javax.inject._
import actions._
import akka.NotUsed
import akka.pattern.AskTimeoutException
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.util.{ByteString, Timeout}
import cmwell.common.file.MimeTypeIdentifier
import cmwell.domain.{BagOfInfotons, CompoundInfoton, DeletedInfoton, FString, PaginationInfo, SearchResponse, SearchResults, _}
Expand Down Expand Up @@ -912,8 +912,7 @@ callback=< [URL] >

val indexTime: Long = request.getQueryString("index-time").flatMap(asLong).getOrElse(0L)
val withMeta = request.queryString.keySet("with-meta")
//TODO: length should determine the overall length (cutoff iteratee)
//val length = request.getQueryString("length").flatMap(asLong)
val length = request.getQueryString("length").flatMap(asLong)
val lengthHint = request.getQueryString("length-hint").flatMap(asInt).getOrElse(100)
val normalizedPath = normalizePath(request.path)
val qpOpt = request.getQueryString("qp")
Expand Down Expand Up @@ -976,17 +975,20 @@ callback=< [URL] >

val src = streams.qStream(firstTimeStamp,path,history,deleted,descendants,lengthHint,fieldFilters,nbg)

// if (!withData) source.flatMapConcat(identity)
// //we already retry internally in IRW, but apparently, sometimes it's not enough: http://gitlab:8082/cm-well/cm-well/issues/136
// else source.flatMapConcat(_.mapAsync(getAvailableProcessors)(thinfoton => retry(8,50.millis,2)(CRUDServiceFS.getInfotonByUuidAsync(thinfoton.uuid).map {
// case FullBox(i) => i.indexTime.fold(cmwell.domain.addIndexTime(i, Some(thinfoton.indexTime)))(_ => i)
// case EmptyBox => throw new NoSuchElementException(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.")
// case BoxedFailure(e) => throw new Exception(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.",e)
// })))

val ss: Source[ByteString,NotUsed] = {
if (withData.isEmpty) src.via(Flows.searchThinResultToByteString(formatter))
else src.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS)).via(Flows.infotonToByteString(formatter))
val ss: Source[ByteString,NotUsed] = length.fold{
if (withData.isEmpty)
src.via(Flows.searchThinResultToByteString(formatter))
else src
.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS))
.via(Flows.infotonToByteString(formatter))
}{ l =>
if (withData.isEmpty) src
.take(l)
.via(Flows.searchThinResultToByteString(formatter))
else src
.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS))
.take(l)
.via(Flows.infotonToByteString(formatter))
}

val contentType = {
Expand All @@ -996,7 +998,7 @@ callback=< [URL] >
overrideMimetype(formatType.mimetype, request)._2
}

Ok.chunked(ss).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)`
Ok.chunked(ss.batch(128,identity)(_ ++ _)).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)`
}
}.recover(errorHandler)
}
Expand Down

0 comments on commit 6d08a94

Please sign in to comment.