Skip to content

Commit

Permalink
Enable POST /package/add to accept requests for Universe packages (#545)
Browse files Browse the repository at this point in the history
- RequestValidators.selectedBody allows any endpoint to accept multiple
  Content-Types
- Replace AddResult with OperationInProgress exception
- Implement DefaultUniverseInstaller for the operation queue processor
- LocalObjectStorage now writes objects atomically
- Update ScalaCheck to latest version supported by ScalaTest (1.12.6)
  • Loading branch information
cruhland committed Dec 1, 2016
1 parent 0fee690 commit 4fd9038
Show file tree
Hide file tree
Showing 51 changed files with 1,183 additions and 456 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.twitter.bijection.Bijection
import com.twitter.bijection.Conversion
import com.twitter.bijection.Conversion.asMethod
import com.twitter.bijection.Injection
import java.time.Instant
import scala.util.Failure
import scala.util.Success
import scala.util.Try
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.mesosphere.cosmos.finch

import com.mesosphere.cosmos.http.MediaType
import com.twitter.util.Throw
import com.twitter.util.Try
import io.finch.Error
import io.finch.items
import java.nio.charset.StandardCharsets

/** Allows a [[com.mesosphere.cosmos.http.MediaType]] to select a request body parsing function. */
final class DispatchingMediaTypedBodyParser[A] private(
private[this] val parsers: Map[MediaType, Array[Byte] => Try[A]]
) extends (MediaType => Option[Array[Byte] => Try[A]]) {

def apply(mediaType: MediaType): Option[Array[Byte] => Try[A]] = parsers.get(mediaType)

def mediaTypes: Set[MediaType] = parsers.keySet

}

object DispatchingMediaTypedBodyParser {

def apply[A](
parsers: (MediaType, Array[Byte] => Try[A])*
): DispatchingMediaTypedBodyParser[A] = {
new DispatchingMediaTypedBodyParser(parsers.toMap)
}

def parserFromDecoder[A](implicit
accepts: MediaTypedRequestDecoder[A]
): Array[Byte] => Try[A] = { bodyBytes =>
val bodyString = new String(bodyBytes, StandardCharsets.UTF_8)
accepts.decoder(bodyString).rescue { case t =>
Throw(Error.NotParsed(items.BodyItem, accepts.classTag, t))
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.mesosphere.cosmos.finch

import com.mesosphere.cosmos.http.MediaType
import com.twitter.finagle.http.Fields
import com.twitter.finagle.http.Status
import io.circe.JsonObject
import io.circe.syntax._
Expand All @@ -19,4 +20,11 @@ case class IncompatibleAcceptHeader(available: Set[MediaType], specified: Set[Me
)))

val getHeaders: Map[String, String] = Map.empty

override def getMessage: String = {
val specifiedStr = specified.map(_.show).mkString(", ")
val availableStr = available.map(_.show).mkString(", ")
s"${Fields.Accept} header was [$specifiedStr] but should be one of [$availableStr]"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.mesosphere.cosmos.finch

import com.mesosphere.cosmos.http.MediaType
import com.twitter.finagle.http.Fields
import com.twitter.finagle.http.Status
import io.circe.JsonObject
import io.circe.syntax._

case class IncompatibleContentTypeHeader(available: Set[MediaType], specified: MediaType)
extends RequestError {
val errType: String = "not_valid"
val status: Status = Status.BadRequest

val getData: Option[JsonObject] = Some(JsonObject.fromMap(Map(
"invalidItem" -> JsonObject.fromMap(Map(
"type" -> "header".asJson,
"name" -> Fields.ContentType.asJson
)).asJson,
"specified" -> specified.show.asJson,
"available" -> available.map(_.show).asJson
)))

val getHeaders: Map[String, String] = Map.empty

override def getMessage: String = {
val validChoices = available.map(_.show).mkString(", ")
s"${Fields.ContentType} header was ${specified.show}, but should be one of: $validChoices"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import com.mesosphere.cosmos.http.Authorization
import com.mesosphere.cosmos.http.CompoundMediaType
import com.mesosphere.cosmos.http.MediaType
import com.mesosphere.cosmos.http.RequestSession
import com.twitter.concurrent.AsyncStream
import com.twitter.io.Buf
import com.twitter.util.Future
import com.twitter.finagle.http.Fields
import io.finch._
import shapeless.::
import shapeless.HNil
Expand All @@ -26,44 +24,54 @@ object RequestValidators {
accepts: MediaTypedRequestDecoder[Req],
produces: DispatchingMediaTypedEncoder[Res]
): Endpoint[EndpointContext[Req, Res]] = {
val a = baseValidator(produces)
val h = header("Content-Type").as[MediaType].should(beTheExpectedType(accepts.mediaTypedDecoder.mediaType))
val b = body.as[Req](accepts.decoder, accepts.classTag)
val c = a :: h :: b
c.map { case authorization :: responseEncoder :: contentType :: req :: HNil =>
EndpointContext(req, RequestSession(authorization, Some(contentType)), responseEncoder)
val contentTypeRule = beTheExpectedType(accepts.mediaTypedDecoder.mediaType)
val contentTypeValidator = header(Fields.ContentType).as[MediaType].should(contentTypeRule)

val bodyValidator = body.as[Req](accepts.decoder, accepts.classTag)

val allValidators = baseValidator(produces) :: contentTypeValidator :: bodyValidator
allValidators.map {
case authorization :: responseEncoder :: contentType :: requestBody :: HNil =>
val session = RequestSession(authorization, Some(contentType))
EndpointContext(requestBody, session, responseEncoder)
}
}

// TODO package-add: Include standard request validation, e.g. Content-Type/Accept, etc.
def streamed[Req, Res](toReq: (AsyncStream[Buf], Long) => Req)(implicit
resEncoder: MediaTypedEncoder[Res]
// TODO package-add: Unit tests in RequestValidatorsSpec
def selectedBody[Req, Res](implicit
accepts: DispatchingMediaTypedBodyParser[Req],
produces: DispatchingMediaTypedEncoder[Res]
): Endpoint[EndpointContext[Req, Res]] = {
val sessionValidator = header("Content-Type").as[MediaType].map { contentType =>
RequestSession(authorization = None, contentType = Some(contentType))
}
val contentTypeValidator = header(Fields.ContentType)
.as[MediaType]
.map { contentType =>
accepts(contentType) match {
case Some(bodyParser) => contentType :: bodyParser :: HNil
case _ => throw IncompatibleContentTypeHeader(accepts.mediaTypes, contentType)
}
}

val validators = asyncBody :: header("X-Dcos-Content-Length") :: sessionValidator
validators.map { case bufStream :: bodySize :: session :: HNil =>
// TODO package-add: Better error handling for request data extraction (e.g. toLong)
EndpointContext(toReq(bufStream, bodySize.toLong), session, resEncoder)
val allValidators = baseValidator(produces) :: contentTypeValidator :: binaryBody
allValidators.map {
case authorization :: responseEncoder :: contentType :: bodyParser :: bodyBytes :: HNil =>
val requestBody = bodyParser(bodyBytes).get // Exceptions will be caught by Endpoint.map()
val session = RequestSession(authorization, Some(contentType))
EndpointContext(requestBody, session, responseEncoder)
}
}

private[this] def baseValidator[Res](
produces: DispatchingMediaTypedEncoder[Res]
): Endpoint[Option[Authorization] :: MediaTypedEncoder[Res] :: HNil] = {
val accept = header("Accept")
.as[CompoundMediaType]
.mapAsync { accept =>
produces(accept) match {
case Some(x) =>
Future.value(x)
case None =>
Future.exception(IncompatibleAcceptHeader(produces.mediaTypes, accept.mediaTypes))
}
}
val auth = headerOption("Authorization").map(_.map(Authorization))
val accept = header(Fields.Accept)
.as[CompoundMediaType]
.map { accept =>
produces(accept) match {
case Some(x) => x
case None => throw IncompatibleAcceptHeader(produces.mediaTypes, accept.mediaTypes)
}
}
val auth = headerOption(Fields.Authorization).map(_.map(Authorization))
auth :: accept
}
}
Loading

0 comments on commit 4fd9038

Please sign in to comment.