Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#266 - length in qstream + boyscouting #285

Merged
merged 2 commits into from Nov 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -91,7 +91,11 @@ object InfotonSerializer extends LazyLogging {

def setLastModified(lastModified: String): Unit = {
if(this.lastModified eq null) this.lastModified = fmt.parseDateTime(lastModified)
else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]")
else {
val newLastModified = fmt.parseDateTime(lastModified)
if (this.lastModified.compareTo(newLastModified) == 0) logger.warn(s"Been called twice on lastModified with same (parsed) value [${this.lastModified.toString(fmt)},$lastModified]")
else throw new IllegalStateException(s"lastModified was already set for uuid [$uuidHint] [${this.lastModified.toString(fmt)},$lastModified]")
}
}

def setDc(dc: String): Unit = {
Expand Down
6 changes: 4 additions & 2 deletions server/cmwell-ws/app/cmwell/ws/Streams.scala
Expand Up @@ -106,9 +106,11 @@ object Streams extends LazyLogging {
.mapAsyncUnordered(parallelism)(str => crudServiceFS.getInfotonByUuidAsync(str.uuid, nbg))
.collect{ case FullBox(i) => i}

def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] = Flow[SearchThinResults]
val searchThinResultsFlattened: Flow[SearchThinResults,SearchThinResult,NotUsed] = Flow[SearchThinResults]
.mapConcat { case SearchThinResults(_, _, _, str, _) => str.toList }
.via(searchThinResultToFatInfoton(nbg,crudServiceFS))

def searchThinResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[SearchThinResults,Infoton,NotUsed] =
searchThinResultsFlattened.via(searchThinResultToFatInfoton(nbg,crudServiceFS))

def iterationResultsToFatInfotons(nbg: Boolean, crudServiceFS: CRUDServiceFS): Flow[IterationResults,Infoton,NotUsed] = Flow[IterationResults]
.collect { case IterationResults(_, _, Some(iSeq), _, _) => iSeq}
Expand Down
32 changes: 17 additions & 15 deletions server/cmwell-ws/app/controllers/Application.scala
Expand Up @@ -24,7 +24,7 @@ import javax.inject._
import actions._
import akka.NotUsed
import akka.pattern.AskTimeoutException
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.util.{ByteString, Timeout}
import cmwell.common.file.MimeTypeIdentifier
import cmwell.domain.{BagOfInfotons, CompoundInfoton, DeletedInfoton, FString, PaginationInfo, SearchResponse, SearchResults, _}
Expand Down Expand Up @@ -912,8 +912,7 @@ callback=< [URL] >

val indexTime: Long = request.getQueryString("index-time").flatMap(asLong).getOrElse(0L)
val withMeta = request.queryString.keySet("with-meta")
//TODO: length should determine the overall length (cutoff iteratee)
//val length = request.getQueryString("length").flatMap(asLong)
val length = request.getQueryString("length").flatMap(asLong)
val lengthHint = request.getQueryString("length-hint").flatMap(asInt).getOrElse(100)
val normalizedPath = normalizePath(request.path)
val qpOpt = request.getQueryString("qp")
Expand Down Expand Up @@ -976,17 +975,20 @@ callback=< [URL] >

val src = streams.qStream(firstTimeStamp,path,history,deleted,descendants,lengthHint,fieldFilters,nbg)

// if (!withData) source.flatMapConcat(identity)
// //we already retry internally in IRW, but apparently, sometimes it's not enough: http://gitlab:8082/cm-well/cm-well/issues/136
// else source.flatMapConcat(_.mapAsync(getAvailableProcessors)(thinfoton => retry(8,50.millis,2)(CRUDServiceFS.getInfotonByUuidAsync(thinfoton.uuid).map {
// case FullBox(i) => i.indexTime.fold(cmwell.domain.addIndexTime(i, Some(thinfoton.indexTime)))(_ => i)
// case EmptyBox => throw new NoSuchElementException(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.")
// case BoxedFailure(e) => throw new Exception(s"could not retrieve uuid [${thinfoton.uuid}] from cassandra.",e)
// })))

val ss: Source[ByteString,NotUsed] = {
if (withData.isEmpty) src.via(Flows.searchThinResultToByteString(formatter))
else src.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS)).via(Flows.infotonToByteString(formatter))
val ss: Source[ByteString,NotUsed] = length.fold{
if (withData.isEmpty)
src.via(Flows.searchThinResultToByteString(formatter))
else src
.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS))
.via(Flows.infotonToByteString(formatter))
}{ l =>
if (withData.isEmpty) src
.take(l)
.via(Flows.searchThinResultToByteString(formatter))
else src
.via(Flows.searchThinResultToFatInfoton(nbg,crudServiceFS))
.take(l)
.via(Flows.infotonToByteString(formatter))
}

val contentType = {
Expand All @@ -996,7 +998,7 @@ callback=< [URL] >
overrideMimetype(formatType.mimetype, request)._2
}

Ok.chunked(ss).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)`
Ok.chunked(ss.batch(128,identity)(_ ++ _)).as(contentType) //TODO: `.withHeaders("X-CM-WELL-N" -> total.toString)`
}
}.recover(errorHandler)
}
Expand Down