diff --git a/.travis.yml b/.travis.yml index 47de7bd..2ddba20 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ language: scala scala: - 2.11.8 - - 2.12.0 + - 2.12.1 script: - sbt ++$TRAVIS_SCALA_VERSION test diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpClient.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpClient.scala new file mode 100644 index 0000000..c5446df --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpClient.scala @@ -0,0 +1,158 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import scala.collection.JavaConverters._ +import java.io.{ File, UnsupportedEncodingException } +import java.nio.charset.{ Charset, StandardCharsets } +import scala.concurrent.{ Future, Promise, ExecutionContext } +import akka.actor.{ Actor, ActorSystem, Props } +import akka.stream.{ OverflowStrategy, Materializer } +import akka.stream.scaladsl.{ FileIO, SourceQueueWithComplete } +import akka.http.scaladsl.{ Http => AkkaHttp } +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, ResponseEntity, HttpEntity, Uri, + StatusCodes, HttpMethod, HttpMethods, HttpHeader } +import akka.http.scaladsl.model.ws.WebSocketRequest + +class AkkaHttpClient(config: Config, system: ActorSystem)(implicit fm: Materializer) extends HttpClient { + private val akkaHttp = AkkaHttp(system) + + def underlying[A]: A = akkaHttp.asInstanceOf[A] + + /** Closes this client, and releases underlying resources. */ + def close(): Unit = () + + /** Runs the request and return a Future of FullResponse. Errors on non-OK response. */ + def run(request: Request): Future[FullResponse] = run(request, identity) + + /** Runs the request and return a Future of A. Errors on non-OK response. */ + def run[A](request: Request, f: FullResponse => A): Future[A] = process(request, OkHandler(f)) + + /** Runs the request and return a Future of Either a FullResponse or a Throwable. Errors on non-OK response. */ + def run[A](request: Request, lifter: FutureLifter[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = + lifter.run(run(request)) + + /** Downloads the request to the file. Errors on non-OK response. */ + def download(request: Request, file: File): Future[File] = + process(request, AkkaHttpDownloadHandler(file)) + + /** Executes the request and return a Future of FullResponse. Does not error on non-OK response. */ + def process(request: Request): Future[FullResponse] = process(request, identity[FullResponse] _) + + /** Executes the request and return a Future of A. Does not error on non-OK response. */ + def process[A](request: Request, f: FullResponse => A): Future[A] = + process(request, FunctionHandler(f)) + + /** Executes the request and return a Future of Either a FullResponse or a Throwable. Does not error on non-OK response. */ + def process[A](request: Request, lifter: FutureLifter[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = + lifter.run(process(request)) + + /** Executes the request. Does not error on non-OK response. */ + def process[A](request: Request, handler: AkkaHttpCompletionHandler[A]): Future[A] = + { + implicit val ec = system.dispatcher + def processInitialResponse(response: HttpResponse): Future[Unit] = + { + val p = Promise[Unit]() + val s1 = handler.onStatusReceived(response.status) + if (s1 == State.Abort) { + response.entity.discardBytes(fm) + p.failure { StatusError(response.status.intValue) } + } + else p.success() + p.future + } + for { + response <- akkaHttp.singleRequest(buildRequest(request)) + _ <- processInitialResponse(response) + result <- handler.onPartialResponse(response, config) + } yield result + } + + def buildRequest(request: Request): HttpRequest = + HttpRequest(method = buildMethod(request), + uri = buildUri(request), + headers = buildHeaders(request)) + + def buildWsRequest(request: Request): WebSocketRequest = + WebSocketRequest(uri = buildUri(request), + extraHeaders = buildHeaders(request)) + + private def buildMethod(request: Request): HttpMethod = + request.method match { + case HttpVerbs.GET => HttpMethods.GET + case HttpVerbs.POST => HttpMethods.POST + case HttpVerbs.PUT => HttpMethods.PUT + case HttpVerbs.PATCH => HttpMethods.PATCH + case HttpVerbs.DELETE => HttpMethods.DELETE + case HttpVerbs.HEAD => HttpMethods.HEAD + case HttpVerbs.OPTIONS => HttpMethods.OPTIONS + } + + private def buildHeaders(request: Request): List[HttpHeader] = + for { + (k, vs) <- request.headers.toList + v <- vs.toList + x <- HttpHeader.parse(k, v) match { + case HttpHeader.ParsingResult.Ok(header, _) => List(header) + case _ => Nil + } + } yield x + + private def buildUri(request: Request): Uri = + { + import request._ + // queries + val qs = for { + (key, values) <- queryString + value <- values + } yield (key, value) + Uri(url).withQuery(Uri.Query(qs)) + } + + /** Open a websocket connection. */ + def websocket(request: Request)(handler: PartialFunction[WebSocketEvent, Unit]): Future[WebSocket] = + { + implicit val ec = system.dispatcher + // http://doc.akka.io/docs/akka-http/current/scala/http/client-side/websocket-support.html + val xrequest = buildWsRequest(request) + import akka.stream.scaladsl._ + import akka.Done + import akka.http.scaladsl.model.ws.Message + val listener = new WebSocketListener(handler, system) + val wsSink: Sink[Message, Future[Done]] = listener.sink + val wsSource = listener.source + val flow: Flow[Message, Message, Future[Done]] = + Flow.fromSinkAndSourceMat(wsSink, wsSource)(Keep.left) + // upgradeResponse is a Future[WebSocketUpgradeResponse] that + // completes or fails when the connection succeeds or fails + // and closed is a Future[Done] representing the stream completion from above + val (upgradeResponse, closed) = akkaHttp.singleWebSocketRequest(xrequest, flow) + val connected = upgradeResponse.map { upgrade => + // just like a regular http request we can access response status which is available via upgrade.response.status + // status code 101 (Switching Protocols) indicates that server support WebSockets + if (upgrade.response.status == StatusCodes.SwitchingProtocols) { + Done + } else { + throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") + } + } + val result = listener.result + result.future + } +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpCompletionHandler.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpCompletionHandler.scala new file mode 100644 index 0000000..a98d292 --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpCompletionHandler.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import scala.concurrent.{ Future, ExecutionContext } +import akka.http.scaladsl.model.{ HttpResponse, StatusCode, HttpHeader } +import akka.stream.Materializer + +abstract class AkkaHttpCompletionHandler[A] extends CompletionHandler[A] { + def onStatusReceived(status: StatusCode): State = State.Continue + def onHeadersReceived(headers: Seq[HttpHeader]): State = State.Continue + def onCompleted(response: FullResponse): A + def onPartialResponse(httpResponse: HttpResponse, config: Config)(implicit fm: Materializer, ec: ExecutionContext): Future[A] = + for { + entity <- httpResponse.entity.toStrict(config.requestTimeout) + } yield onCompleted(new AkkaHttpFullResponse(httpResponse, entity)) +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpDownloadHandler.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpDownloadHandler.scala new file mode 100644 index 0000000..7b3829f --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpDownloadHandler.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import java.io.File +import scala.concurrent.{ Future, ExecutionContext } +import akka.stream.Materializer +import akka.http.scaladsl.model.HttpResponse +import akka.stream.scaladsl.FileIO + +abstract class AkkaHttpDownloadHandler(file: File) extends OkHandler[File](_ => file) { + // This will not be called + override def onCompleted(response: FullResponse): File = sys.error("Unexpected call to onCompleted") + override def onPartialResponse(httpResponse: HttpResponse, config: Config)(implicit fm: Materializer, ec: ExecutionContext): Future[File] = + { + val source = httpResponse.entity.dataBytes + for { + _ <- source.runWith(FileIO.toPath(file.toPath)) + } yield file + } +} + +object AkkaHttpDownloadHandler { + def apply(file: File): AkkaHttpDownloadHandler = + new AkkaHttpDownloadHandler(file) {} +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpFullResponse.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpFullResponse.scala new file mode 100644 index 0000000..a2b9f2c --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/AkkaHttpFullResponse.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import scala.collection.JavaConverters._ +import java.nio.charset.Charset +import java.nio.ByteBuffer +import scala.collection.immutable.TreeMap +import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling.{ Unmarshal, FromResponseUnmarshaller } +import scala.concurrent.{ Future, ExecutionContext, Await } +import scala.concurrent.duration._ +import akka.stream.Materializer + +class AkkaHttpFullResponse(akkaHttpResponse: HttpResponse, entity: HttpEntity.Strict)(implicit val fm: Materializer) extends FullResponse { + /** + * @return The underlying entity object. + */ + def underlying[A] = entity.asInstanceOf[A] + + /** + * @return The underlying response object. + */ + def underlyingResponse[A] = akkaHttpResponse.asInstanceOf[A] + + /** + * The response body as a `ByteBuffer`. + */ + def bodyAsByteBuffer: ByteBuffer = entity.data.asByteBuffer + + /** + * The response body as String. + */ + lazy val bodyAsString: String = { + // RFC-2616#3.7.1 states that any text/* mime type should default to ISO-8859-1 charset if not + // explicitly set, while Plays default encoding is UTF-8. So, use UTF-8 if charset is not explicitly + // set and content type is not text/*, otherwise default to ISO-8859-1 + val contentType = entity.contentType + val charset = contentType.charsetOption getOrElse HttpCharsets.`UTF-8` + entity.data.decodeString(charset.value) + } + + /** + * Return the headers of the response as a case-insensitive map + */ + lazy val allHeaders: Map[String, List[String]] = + TreeMap[String, List[String]]() ++ + akkaHttpResponse.headers.groupBy(_.name).mapValues(vs => vs.toList map { _.value }) + + /** + * The response status code. + */ + def status: Int = akkaHttpResponse.status.intValue + + /** + * The response status message. + */ + def statusText: String = akkaHttpResponse.status.reason + + /** + * Get a response header. + */ + def header(key: String): Option[String] = + akkaHttpResponse.headers.find(_.name == key) map { _.value } +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/FunctionHandler.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/FunctionHandler.scala new file mode 100644 index 0000000..066ccc3 --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/FunctionHandler.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import scala.concurrent.Future +import akka.http.scaladsl.model.HttpResponse + +abstract class FunctionHandler[A](f: FullResponse => A) extends AkkaHttpCompletionHandler[A] { + override def onCompleted(response: FullResponse): A = f(response) +} + +object FunctionHandler { + def apply[A](f: FullResponse => A): FunctionHandler[A] = + new FunctionHandler[A](f) {} +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/Gigahorse.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/Gigahorse.scala new file mode 100644 index 0000000..4433aae --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/Gigahorse.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import java.nio.charset.Charset +import akka.actor.{ Actor, ActorSystem } +import akka.stream.{ Materializer, ActorMaterializer } +import scala.concurrent.Future + +abstract class Gigahorse extends GigahorseSupport { + /** Returns HttpClient. You must call `close` when you're done. */ + def http(config: Config, system: ActorSystem)(implicit fm: Materializer): HttpClient = new AkkaHttpClient(config, system) + + def withHttp[A](config: Config)(f: HttpClient => A): A = + { + implicit val system = ActorSystem("gigahorse-akka-http") + implicit val materializer = ActorMaterializer() + val client: HttpClient = http(config, system) + try { + f(client) + } + finally { + system.shutdown() + } + } + def withHttp[A](f: HttpClient => A): A = + withHttp(config)(f) +} + +object Gigahorse extends Gigahorse diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/MessageForwarder.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/MessageForwarder.scala new file mode 100644 index 0000000..c16381c --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/MessageForwarder.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import akka.actor._ +import akka.stream.actor._ +import akka.stream.scaladsl._ +import akka.http.scaladsl.model.ws.Message + +object MessageForwarder { + def props : Props = Props[MessageForwarder] +} + +class MessageForwarder extends Actor with ActorPublisher[Message] { + var items:List[Message] = List.empty + import ActorPublisherMessage._ + def receive = { + case m: Message => + if (totalDemand == 0) items = items :+ m + else onNext(m) + case Request(demand) => + if (demand > items.size){ + items foreach (onNext) + items = List.empty + } + else { + val (send, keep) = items.splitAt(demand.toInt) + items = keep + send foreach (onNext) + } + } +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/OkHandler.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/OkHandler.scala new file mode 100644 index 0000000..c8ec0ab --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/OkHandler.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import akka.http.scaladsl.model.StatusCode +import akka.http.scaladsl.model.HttpResponse + +abstract class OkHandler[A](f: FullResponse => A) extends FunctionHandler[A](f) { + override def onStatusReceived(status: StatusCode): State = + { + if (status.isFailure) State.Abort + else super.onStatusReceived(status) + } +} + +object OkHandler { + def apply[A](f: FullResponse => A): OkHandler[A] = + new OkHandler[A](f) {} +} diff --git a/akka-http/src/main/scala/gigahorse/support/akkahttp/WebSocketListener.scala b/akka-http/src/main/scala/gigahorse/support/akkahttp/WebSocketListener.scala new file mode 100644 index 0000000..0d4faf2 --- /dev/null +++ b/akka-http/src/main/scala/gigahorse/support/akkahttp/WebSocketListener.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.akkahttp + +import akka.{ Done, NotUsed } +import akka.util.ByteString +import akka.stream.scaladsl._ +import scala.util.Success +import scala.concurrent.{ Future, Promise, ExecutionContext } +import akka.actor.{ ActorRef, ActorSystem, Props, PoisonPill } +import akka.stream.actor.ActorPublisher +import akka.http.scaladsl.model.ws.{ Message, TextMessage => XTextMessage, BinaryMessage => XBinaryMessage } +import WebSocketEvent._ + +// http://doc.akka.io/api/akka-http/current/akka/index.html +// http://doc.akka.io/api/akka/2.4.16/ + +class WebSocketListener( + handler: PartialFunction[WebSocketEvent, Unit], + system: ActorSystem) { self => + protected var ws: WebSocket = null + protected var open: Boolean = true + val result = Promise[WebSocket]() + val forwarder = system.actorOf(Props[MessageForwarder]) + val publisher = ActorPublisher[Message](forwarder) + val source: Source[Message, NotUsed] = Source.fromPublisher(publisher) + val sink: Sink[Message, Future[Done]] = + Sink.foreach { + case message: XTextMessage.Strict => + broadcast(TextMessage(ws, message.text)) + case message: XBinaryMessage.Strict => + broadcast(BinaryMessage(ws, message.data.toArray)) + case _ => + } + ws = new WebSocket { + def underlying[A]: A = self.asInstanceOf[A] + override def isOpen: Boolean = open + override def sendMessage(message: Array[Byte]): WebSocket = { + forwarder ! XBinaryMessage(ByteString(message)) + this + } + override def sendMessage(message: String): WebSocket = { + forwarder ! XTextMessage(message) + this + } + override def close(): Unit = + { + forwarder ! PoisonPill + open = false + } + override def sendPing(payload: Array[Byte]): WebSocket = ??? + override def sendPong(payload: Array[Byte]): WebSocket = ??? + override def sendFragment(fragment: Array[Byte], last: Boolean): WebSocket = ??? + override def sendFragment(fragment: String, last: Boolean): WebSocket = ??? + } + result.tryComplete(Success(ws)) + private def broadcast(event: WebSocketEvent): Unit = + if (handler.isDefinedAt(event)) handler(event) + else () +} diff --git a/akka-http/src/test/scala/gigahorsetest/AkkaHttpClientSpec.scala b/akka-http/src/test/scala/gigahorsetest/AkkaHttpClientSpec.scala new file mode 100644 index 0000000..be9c93e --- /dev/null +++ b/akka-http/src/test/scala/gigahorsetest/AkkaHttpClientSpec.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorsetest + +import org.scalatest._ +import scala.concurrent.Future +import akka.actor.{ Actor, ActorSystem } +import akka.stream.{ Materializer, ActorMaterializer } + +class AkkaHttpClientSpec extends BaseHttpClientSpec { + // custom loan pattern + override def withHttp(testCode: gigahorse.HttpClient => Future[Assertion]): Future[Assertion] = + { + import gigahorse.support.akkahttp.Gigahorse + implicit val system = ActorSystem("gigahorse-akka-http") + implicit val materializer = ActorMaterializer() + val http: gigahorse.HttpClient = Gigahorse.http(Gigahorse.config, system) + complete { + testCode(http) + } lastly { + system.shutdown() + } + } +} diff --git a/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcCompletionHandler.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcCompletionHandler.scala new file mode 100644 index 0000000..3fa2607 --- /dev/null +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcCompletionHandler.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.asynchttpclient + +import org.asynchttpclient.{ Response => XResponse, _ } + +abstract class AhcCompletionHandler[A] extends CompletionHandler[A] { + val builder = new XResponse.ResponseBuilder + + def onBodyPartReceived(content: HttpResponseBodyPart): State = { + builder.accumulate(content) + State.Continue + } + + def onStatusReceived(status: HttpResponseStatus): State = { + builder.reset() + builder.accumulate(status) + State.Continue + } + + def onHeadersReceived(headers: HttpResponseHeaders): State = { + builder.accumulate(headers) + State.Continue + } + + def onCompleted(response: FullResponse): A +} diff --git a/core/src/main/scala/gigahorse/AhcConfig.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcConfig.scala similarity index 99% rename from core/src/main/scala/gigahorse/AhcConfig.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcConfig.scala index 5120038..92ec15b 100644 --- a/core/src/main/scala/gigahorse/AhcConfig.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcConfig.scala @@ -16,6 +16,7 @@ */ package gigahorse +package support.asynchttpclient import org.asynchttpclient._ diff --git a/core/src/main/scala/gigahorse/AhcResponse.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcFullResponse.scala similarity index 76% rename from core/src/main/scala/gigahorse/AhcResponse.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcFullResponse.scala index 2c6ed40..6b1a924 100644 --- a/core/src/main/scala/gigahorse/AhcResponse.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcFullResponse.scala @@ -16,28 +16,32 @@ */ package gigahorse +package support.asynchttpclient import scala.collection.JavaConverters._ import org.asynchttpclient.{ Response => XResponse, _ } import org.asynchttpclient.util.HttpUtils import java.nio.charset.Charset +import java.nio.ByteBuffer import scala.collection.immutable.TreeMap +import scala.concurrent.{ Future, Promise } -class AhcResponse(ahcResponse: XResponse) extends Response { +class AhcFullResponse(ahcResponse: XResponse) extends FullResponse { /** * @return The underlying response object. */ def underlying[A] = ahcResponse.asInstanceOf[A] /** - * The response body as a byte array. + * The response body as a `ByteBuffer`. */ - def bodyAsBytes: Array[Byte] = ahcResponse.getResponseBodyAsBytes + override def bodyAsByteBuffer: ByteBuffer = + ahcResponse.getResponseBodyAsByteBuffer /** * The response body as String. */ - lazy val body: String = { + override lazy val bodyAsString: String = { // RFC-2616#3.7.1 states that any text/* mime type should default to ISO-8859-1 charset if not // explicitly set, while Plays default encoding is UTF-8. So, use UTF-8 if charset is not explicitly // set and content type is not text/*, otherwise default to ISO-8859-1 @@ -52,22 +56,22 @@ class AhcResponse(ahcResponse: XResponse) extends Response { /** * Return the headers of the response as a case-insensitive map */ - lazy val allHeaders: Map[String, List[String]] = + override lazy val allHeaders: Map[String, List[String]] = TreeMap[String, List[String]]() ++ ahcResponse.getHeaders.asScala.toList.groupBy(_.getKey).mapValues(_.map(_.getValue)) /** * The response status code. */ - def status: Int = ahcResponse.getStatusCode + override def status: Int = ahcResponse.getStatusCode /** * The response status message. */ - def statusText: String = ahcResponse.getStatusText + override def statusText: String = ahcResponse.getStatusText /** * Get a response header. */ - def header(key: String): Option[String] = Option(ahcResponse.getHeader(key)) + override def header(key: String): Option[String] = Option(ahcResponse.getHeader(key)) } diff --git a/core/src/main/scala/gigahorse/AhcHttpClient.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcHttpClient.scala similarity index 91% rename from core/src/main/scala/gigahorse/AhcHttpClient.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcHttpClient.scala index bb60d80..400fcec 100644 --- a/core/src/main/scala/gigahorse/AhcHttpClient.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/AhcHttpClient.scala @@ -16,6 +16,7 @@ */ package gigahorse +package support.asynchttpclient import scala.collection.JavaConverters._ import java.io.{ File, UnsupportedEncodingException } @@ -40,15 +41,15 @@ class AhcHttpClient(config: AsyncHttpClientConfig) extends HttpClient { def this(config: Config) = this(AhcConfig.buildConfig(config)) - /** Runs the request and return a Future of Response. */ - def run(request: Request): Future[Response] = - process(request, OkHandler[Response](identity)) + /** Runs the request and return a Future of FullResponse. */ + def run(request: Request): Future[FullResponse] = + process(request, OkHandler[FullResponse](identity)) /** Runs the request and return a Future of A. */ - def run[A](request: Request, f: Response => A): Future[A] = + def run[A](request: Request, f: FullResponse => A): Future[A] = process(request, OkHandler[A](f)) - /** Runs the request and return a Future of Either a Response or a Throwable. */ + /** Runs the request and return a Future of Either a FullResponse or a Throwable. */ def run[A](request: Request, lifter: FutureLifter[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = lifter.run(run(request)) @@ -60,18 +61,18 @@ class AhcHttpClient(config: AsyncHttpClientConfig) extends HttpClient { out.write(content.getBodyByteBuffer) State.Continue } - override def onCompleted(response: Response) = { + override def onCompleted(response: FullResponse) = { out.close() file } }) - /** Executes the request and return a Future of Response. Does not error on non-OK response. */ - def process(request: Request): Future[Response] = - process(request, FunctionHandler[Response](identity)) + /** Executes the request and return a Future of FullResponse. Does not error on non-OK response. */ + def process(request: Request): Future[FullResponse] = + process(request, FunctionHandler[FullResponse](identity)) /** Executes the request and return a Future of A. Does not error on non-OK response. */ - def process[A](request: Request, f: Response => A): Future[A] = + def process[A](request: Request, f: FullResponse => A): Future[A] = process(request, FunctionHandler[A](f)) /** Executes the request and return a Future of Either a Response or a Throwable. Does not error on non-OK response. */ @@ -79,7 +80,7 @@ class AhcHttpClient(config: AsyncHttpClientConfig) extends HttpClient { lifter.run(process(request)) /** Executes the request. Does not error on non-OK response. */ - def process[A](request: Request, handler: CompletionHandler[A]): Future[A] = + def process[A](request: Request, handler: AhcCompletionHandler[A]): Future[A] = { val result = Promise[A]() val xrequest = buildRequest(request) @@ -97,7 +98,7 @@ class AhcHttpClient(config: AsyncHttpClientConfig) extends HttpClient { onCompleted(handler.builder.build()) } def onCompleted(response: XResponse): XResponse = { - result.success(handler.onCompleted(new AhcResponse(response))) + result.success(handler.onCompleted(new AhcFullResponse(response))) response } override def onThrowable(t: Throwable): Unit = { @@ -113,7 +114,8 @@ class AhcHttpClient(config: AsyncHttpClientConfig) extends HttpClient { val result = Promise[WebSocket]() val xrequest = buildRequest(request) val upgradeHandler = new WebSocketUpgradeHandler.Builder() - asyncHttpClient.executeRequest(xrequest, upgradeHandler.addWebSocketListener(new WebSocketListener(handler, result)).build()) + asyncHttpClient.executeRequest(xrequest, upgradeHandler.addWebSocketListener( + new WebSocketListener(handler, result)).build()) result.future } diff --git a/core/src/main/scala/gigahorse/FunctionHandler.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/FunctionHandler.scala similarity index 69% rename from core/src/main/scala/gigahorse/FunctionHandler.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/FunctionHandler.scala index 0a20af8..8e883d8 100644 --- a/core/src/main/scala/gigahorse/FunctionHandler.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/FunctionHandler.scala @@ -15,11 +15,12 @@ */ package gigahorse +package support.asynchttpclient -abstract class FunctionHandler[A](f: Response => A) extends CompletionHandler[A] { - override def onCompleted(response: Response): A = f(response) +abstract class FunctionHandler[A](f: FullResponse => A) extends AhcCompletionHandler[A] { + override def onCompleted(response: FullResponse): A = f(response) } object FunctionHandler { - def apply[A](f: Response => A): FunctionHandler[A] = new FunctionHandler[A](f) {} + def apply[A](f: FullResponse => A): FunctionHandler[A] = new FunctionHandler[A](f) {} } diff --git a/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/Gigahorse.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/Gigahorse.scala new file mode 100644 index 0000000..0ccb51d --- /dev/null +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/Gigahorse.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.asynchttpclient + +import java.nio.charset.Charset +import scala.concurrent.Future + +abstract class Gigahorse extends GigahorseSupport { + def withHttp[A](config: Config)(f: HttpClient => A): A = + { + val client: HttpClient = http(config) + try { + f(client) + } + finally { + client.close() + } + } + + def withHttp[A](f: HttpClient => A): A = + withHttp(config)(f) + + /** Returns HttpClient. You must call `close` when you're done. */ + def http(config: Config): HttpClient = new AhcHttpClient(config) +} + +object Gigahorse extends Gigahorse diff --git a/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/OkHandler.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/OkHandler.scala new file mode 100644 index 0000000..3643e9f --- /dev/null +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/OkHandler.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorse +package support.asynchttpclient + +import org.asynchttpclient.{ Response => XResponse, _ } + +abstract class OkHandler[A](f: FullResponse => A) extends FunctionHandler[A](f) { + override def onStatusReceived(status: HttpResponseStatus): State = { + val code = status.getStatusCode + if (code / 100 == 2) super.onStatusReceived(status) + else throw StatusError(code) + } +} + +object OkHandler { + def apply[A](f: FullResponse => A): OkHandler[A] = new OkHandler[A](f) {} +} diff --git a/core/src/main/scala/gigahorse/WebSocketListener.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/WebSocketListener.scala similarity index 98% rename from core/src/main/scala/gigahorse/WebSocketListener.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/WebSocketListener.scala index 8135709..087337f 100644 --- a/core/src/main/scala/gigahorse/WebSocketListener.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/WebSocketListener.scala @@ -15,6 +15,7 @@ */ package gigahorse +package support.asynchttpclient import org.asynchttpclient.ws.{ WebSocketByteListener, WebSocketPingListener, WebSocketPongListener, WebSocketTextListener, WebSocket => XWebSocket, WebSocketListener => XWebSocketListener } diff --git a/core/src/main/scala/gigahorse/package.scala b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/package.scala similarity index 90% rename from core/src/main/scala/gigahorse/package.scala rename to asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/package.scala index 3ff6829..47565e6 100644 --- a/core/src/main/scala/gigahorse/package.scala +++ b/asynchttpclient/src/main/scala/gigahorse/support/asynchttpclient/package.scala @@ -14,6 +14,9 @@ * limitations under the License. */ -package object gigahorse { +package gigahorse +package support + +package object asynchttpclient { type AsyncHandler[A] = org.asynchttpclient.AsyncHandler[A] } diff --git a/core/src/test/resources/application.conf b/asynchttpclient/src/test/resources/application.conf similarity index 100% rename from core/src/test/resources/application.conf rename to asynchttpclient/src/test/resources/application.conf diff --git a/asynchttpclient/src/test/scala/gigahorsetest/AhcHttpClientSpec.scala b/asynchttpclient/src/test/scala/gigahorsetest/AhcHttpClientSpec.scala new file mode 100644 index 0000000..062d4ff --- /dev/null +++ b/asynchttpclient/src/test/scala/gigahorsetest/AhcHttpClientSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2016 by Eugene Yokota + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gigahorsetest + +import org.scalatest._ +import scala.concurrent.Future + +class AhcClientSpec extends BaseHttpClientSpec { + import gigahorse.support.asynchttpclient.Gigahorse + // custom loan pattern + override def withHttp(testCode: gigahorse.HttpClient => Future[Assertion]): Future[Assertion] = + { + val http = Gigahorse.http(Gigahorse.config) + complete { + testCode(http) + } lastly { + http.close() + } + } +} diff --git a/build.sbt b/build.sbt index ab08b35..674bfc9 100644 --- a/build.sbt +++ b/build.sbt @@ -1,12 +1,12 @@ import Dependencies._ lazy val root = (project in file(".")). - aggregate(core). + aggregate(core, akkaHttp, asynchttpclient). dependsOn(core). settings(inThisBuild(List( organization := "com.eed3si9n", scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.11.8", "2.12.0"), + crossScalaVersions := scalaBoth, organizationName := "eed3si9n", organizationHomepage := Some(url("http://eed3si9n.com/")), homepage := Some(url("https://github.com/eed3si9n/gigahorse")), @@ -36,8 +36,33 @@ lazy val core = (project in file("core")). settings( commonSettings, name := "gigahorse-core", - libraryDependencies ++= Seq(ahc, sslConfig, scalatest % Test), + libraryDependencies ++= Seq(sslConfig, scalatest % Test), sourceManaged in (Compile, generateDatatypes) := (sourceDirectory in Compile).value / "scala", // You need this otherwise you get X is already defined as class. sources in Compile := (sources in Compile).value.toList.distinct ) + +lazy val commonTest = (project in file("common-test")). + dependsOn(core). + settings( + libraryDependencies ++= Seq(scalatest), + publish := (), + publishLocal := () + ) + +lazy val asynchttpclient = (project in file("asynchttpclient")). + dependsOn(core, commonTest % Test). + settings( + commonSettings, + name := "gigahorse-asynchttpclient", + libraryDependencies ++= Seq(ahc, scalatest % Test) + ) + +lazy val akkaHttp = (project in file("akka-http")). + dependsOn(core, commonTest % Test). + settings( + commonSettings, + name := "gigahorse-akka-http", + libraryDependencies ++= Seq(akkaHttpCore, akkaHttpExperimental, scalatest % Test), + dependencyOverrides += sslConfig + ) diff --git a/core/src/test/scala/gigahorsetest/HttpClientSpec.scala b/common-test/src/main/scala/gigahorsetest/BaseHttpClientSpec.scala similarity index 79% rename from core/src/test/scala/gigahorsetest/HttpClientSpec.scala rename to common-test/src/main/scala/gigahorsetest/BaseHttpClientSpec.scala index 3f78ce9..db4dcf2 100644 --- a/core/src/test/scala/gigahorsetest/HttpClientSpec.scala +++ b/common-test/src/main/scala/gigahorsetest/BaseHttpClientSpec.scala @@ -17,18 +17,36 @@ package gigahorsetest import org.scalatest._ - +import scala.util.{ Failure, Success } import scala.concurrent._ +import scala.concurrent.duration._ import java.io.File +import gigahorse.{ HeaderNames, WebSocketEvent } -import gigahorse.WebSocketEvent - -import scala.util.{ Failure, Success } +abstract class BaseHttpClientSpec extends AsyncFlatSpec with Matchers { + // custom loan pattern + def withHttp(testCode: gigahorse.HttpClient => Future[Assertion]): Future[Assertion] + private[this] val Gigahorse = gigahorse.GigahorseSupport -class HttpClientSpec extends AsyncFlatSpec { - import gigahorse.Gigahorse + "http.run(r)" should "retrieve a resource from Wikipedia" in { + withHttp { http => + val r = Gigahorse.url("https://en.wikipedia.org/w/api.php"). + addQueryString( + "action" -> "query", + "format" -> "json", + "titles" -> "Mad_Max" + ).get. + addHeaders( + HeaderNames.ACCEPT -> "application/json" + ) + val f = http.run(r) + f map { res => + assert(res.bodyAsString contains "Mad Max") + } + } + } - "http.run(r)" should "retrieve a resource" in + it should "retrieve a resource from Duckduckgo.com" in withHttp { http => val r = Gigahorse.url("http://api.duckduckgo.com"). addQueryString( @@ -37,20 +55,21 @@ class HttpClientSpec extends AsyncFlatSpec { ).get val f = http.run(r) f map { res => - assert(res.body contains "2 (number)") + assert(res.bodyAsString contains "2 (number)") } } "http.run(r, Gigahorse.asString)" should "retrieve a resource as String" in withHttp { http => - val r = Gigahorse.url("http://api.duckduckgo.com"). + val r = Gigahorse.url("https://en.wikipedia.org/w/api.php"). addQueryString( - "q" -> "1 + 1", - "format" -> "json" + "action" -> "query", + "format" -> "json", + "titles" -> "Mad_Max" ).get val f = http.run(r, Gigahorse.asString) f map { s => - assert(s contains "2 (number)") + assert(s contains "Mad Max") } } @@ -111,10 +130,9 @@ class HttpClientSpec extends AsyncFlatSpec { "http.process(r)" should "preserve an error response" in withHttp { http => val r = Gigahorse.url("http://getstatuscode.com/500") - val f = http.process(r) - f map { res => - assert(res.body contains "500 HTTP Status Code") - } + for { + res <- http.process(r) + } yield assert(res.bodyAsString contains "500 HTTP Status Code") } "http.process(r, Gigahorse.asEither)" should "preserve an error response and convert to Right given 404" in @@ -122,18 +140,7 @@ class HttpClientSpec extends AsyncFlatSpec { val r = Gigahorse.url("http://getstatuscode.com/404") val f = http.process(r, Gigahorse.asEither) f map { either => - assert(either.right.get.body contains "404 HTTP Status Code") - } - } - - // custom loan pattern - def withHttp(testCode: gigahorse.HttpClient => Future[Assertion]): Future[Assertion] = - { - val http = Gigahorse.http(Gigahorse.config) - complete { - testCode(http) - } lastly { - http.close() + assert(either.right.get.bodyAsString contains "404 HTTP Status Code") } } diff --git a/core/src/main/datatype/gigahorse.json b/core/src/main/datatype/gigahorse.json index c09b9b0..58d03b0 100644 --- a/core/src/main/datatype/gigahorse.json +++ b/core/src/main/datatype/gigahorse.json @@ -17,7 +17,7 @@ }, { "name": "requestTimeout", - "type": "scala.concurrent.duration.Duration", + "type": "scala.concurrent.duration.FiniteDuration", "doc": [ "The maximum time an `HttpClient` waits until the response is completed. (Default: 120s)" ], diff --git a/core/src/main/scala/gigahorse/CompletionHandler.scala b/core/src/main/scala/gigahorse/CompletionHandler.scala index 5422bf0..da8fa9e 100644 --- a/core/src/main/scala/gigahorse/CompletionHandler.scala +++ b/core/src/main/scala/gigahorse/CompletionHandler.scala @@ -16,26 +16,6 @@ package gigahorse -import org.asynchttpclient.{ Response => XResponse, _ } - abstract class CompletionHandler[A] { - val builder = new XResponse.ResponseBuilder - - def onBodyPartReceived(content: HttpResponseBodyPart): State = { - builder.accumulate(content) - State.Continue - } - - def onStatusReceived(status: HttpResponseStatus): State = { - builder.reset() - builder.accumulate(status) - State.Continue - } - - def onHeadersReceived(headers: HttpResponseHeaders): State = { - builder.accumulate(headers) - State.Continue - } - - def onCompleted(response: Response): A + def onCompleted(response: FullResponse): A } diff --git a/core/src/main/scala/gigahorse/Config.scala b/core/src/main/scala/gigahorse/Config.scala index e5a1b49..8fc9cf5 100644 --- a/core/src/main/scala/gigahorse/Config.scala +++ b/core/src/main/scala/gigahorse/Config.scala @@ -8,7 +8,7 @@ final class Config( /** The maximum time an `HttpClient` can wait when connecting to a remote host. (Default: 120s) */ val connectTimeout: scala.concurrent.duration.Duration, /** The maximum time an `HttpClient` waits until the response is completed. (Default: 120s) */ - val requestTimeout: scala.concurrent.duration.Duration, + val requestTimeout: scala.concurrent.duration.FiniteDuration, /** The maximum time an `HttpClient` can stay idle. (Default: 120s) */ val readTimeout: scala.concurrent.duration.Duration, /** Is HTTP redirect enabled. (Default: `true`) */ @@ -58,7 +58,7 @@ final class Config( def withAuth(username: String, password: String): Config = copy(authOpt = Some(Realm(username = username, password = password))) def withAuth(username: String, password: String, scheme: AuthScheme): Config = copy(authOpt = Some(Realm(username = username, password = password, scheme = scheme))) def this() = this(ConfigDefaults.defaultConnectTimeout, ConfigDefaults.defaultRequestTimeout, ConfigDefaults.defaultReadTimeout, ConfigDefaults.defaultFollowRedirects, ConfigDefaults.defaultMaxRedirects, ConfigDefaults.defaultCompressionEnforced, ConfigDefaults.defaultUserAgentOpt, ConfigDefaults.defaultAuthOpt, ConfigDefaults.defaultSslConfig, ConfigDefaults.defaultMaxRequestRetry, ConfigDefaults.defaultDisableUrlEncoding, ConfigDefaults.defaultUseProxyProperties, ConfigDefaults.defaultKeepAlive, ConfigDefaults.defaultPooledConnectionIdleTimeout, ConfigDefaults.defaultConnectionTtl, ConfigDefaults.defaultMaxConnections, ConfigDefaults.defaultMaxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) - def this(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.Duration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int) = this(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) + def this(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.FiniteDuration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int) = this(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) override def equals(o: Any): Boolean = o match { case x: Config => (this.connectTimeout == x.connectTimeout) && (this.requestTimeout == x.requestTimeout) && (this.readTimeout == x.readTimeout) && (this.followRedirects == x.followRedirects) && (this.maxRedirects == x.maxRedirects) && (this.compressionEnforced == x.compressionEnforced) && (this.userAgentOpt == x.userAgentOpt) && (this.authOpt == x.authOpt) && (this.ssl == x.ssl) && (this.maxRequestRetry == x.maxRequestRetry) && (this.disableUrlEncoding == x.disableUrlEncoding) && (this.useProxyProperties == x.useProxyProperties) && (this.keepAlive == x.keepAlive) && (this.pooledConnectionIdleTimeout == x.pooledConnectionIdleTimeout) && (this.connectionTtl == x.connectionTtl) && (this.maxConnections == x.maxConnections) && (this.maxConnectionsPerHost == x.maxConnectionsPerHost) && (this.webSocketMaxFrameSize == x.webSocketMaxFrameSize) @@ -70,13 +70,13 @@ final class Config( override def toString: String = { "Config(" + connectTimeout + ", " + requestTimeout + ", " + readTimeout + ", " + followRedirects + ", " + maxRedirects + ", " + compressionEnforced + ", " + userAgentOpt + ", " + authOpt + ", " + ssl + ", " + maxRequestRetry + ", " + disableUrlEncoding + ", " + useProxyProperties + ", " + keepAlive + ", " + pooledConnectionIdleTimeout + ", " + connectionTtl + ", " + maxConnections + ", " + maxConnectionsPerHost + ", " + webSocketMaxFrameSize + ")" } - private[this] def copy(connectTimeout: scala.concurrent.duration.Duration = connectTimeout, requestTimeout: scala.concurrent.duration.Duration = requestTimeout, readTimeout: scala.concurrent.duration.Duration = readTimeout, followRedirects: Boolean = followRedirects, maxRedirects: Int = maxRedirects, compressionEnforced: Boolean = compressionEnforced, userAgentOpt: Option[String] = userAgentOpt, authOpt: Option[Realm] = authOpt, ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings = ssl, maxRequestRetry: Int = maxRequestRetry, disableUrlEncoding: Boolean = disableUrlEncoding, useProxyProperties: Boolean = useProxyProperties, keepAlive: Boolean = keepAlive, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration = pooledConnectionIdleTimeout, connectionTtl: scala.concurrent.duration.Duration = connectionTtl, maxConnections: Int = maxConnections, maxConnectionsPerHost: Int = maxConnectionsPerHost, webSocketMaxFrameSize: Int = webSocketMaxFrameSize): Config = { + private[this] def copy(connectTimeout: scala.concurrent.duration.Duration = connectTimeout, requestTimeout: scala.concurrent.duration.FiniteDuration = requestTimeout, readTimeout: scala.concurrent.duration.Duration = readTimeout, followRedirects: Boolean = followRedirects, maxRedirects: Int = maxRedirects, compressionEnforced: Boolean = compressionEnforced, userAgentOpt: Option[String] = userAgentOpt, authOpt: Option[Realm] = authOpt, ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings = ssl, maxRequestRetry: Int = maxRequestRetry, disableUrlEncoding: Boolean = disableUrlEncoding, useProxyProperties: Boolean = useProxyProperties, keepAlive: Boolean = keepAlive, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration = pooledConnectionIdleTimeout, connectionTtl: scala.concurrent.duration.Duration = connectionTtl, maxConnections: Int = maxConnections, maxConnectionsPerHost: Int = maxConnectionsPerHost, webSocketMaxFrameSize: Int = webSocketMaxFrameSize): Config = { new Config(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, webSocketMaxFrameSize) } def withConnectTimeout(connectTimeout: scala.concurrent.duration.Duration): Config = { copy(connectTimeout = connectTimeout) } - def withRequestTimeout(requestTimeout: scala.concurrent.duration.Duration): Config = { + def withRequestTimeout(requestTimeout: scala.concurrent.duration.FiniteDuration): Config = { copy(requestTimeout = requestTimeout) } def withReadTimeout(readTimeout: scala.concurrent.duration.Duration): Config = { @@ -130,6 +130,6 @@ final class Config( } object Config { def apply(): Config = new Config(ConfigDefaults.defaultConnectTimeout, ConfigDefaults.defaultRequestTimeout, ConfigDefaults.defaultReadTimeout, ConfigDefaults.defaultFollowRedirects, ConfigDefaults.defaultMaxRedirects, ConfigDefaults.defaultCompressionEnforced, ConfigDefaults.defaultUserAgentOpt, ConfigDefaults.defaultAuthOpt, ConfigDefaults.defaultSslConfig, ConfigDefaults.defaultMaxRequestRetry, ConfigDefaults.defaultDisableUrlEncoding, ConfigDefaults.defaultUseProxyProperties, ConfigDefaults.defaultKeepAlive, ConfigDefaults.defaultPooledConnectionIdleTimeout, ConfigDefaults.defaultConnectionTtl, ConfigDefaults.defaultMaxConnections, ConfigDefaults.defaultMaxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) - def apply(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.Duration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int): Config = new Config(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) - def apply(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.Duration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int, webSocketMaxFrameSize: Int): Config = new Config(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, webSocketMaxFrameSize) + def apply(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.FiniteDuration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int): Config = new Config(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, ConfigDefaults.defaultWebSocketMaxFrameSize) + def apply(connectTimeout: scala.concurrent.duration.Duration, requestTimeout: scala.concurrent.duration.FiniteDuration, readTimeout: scala.concurrent.duration.Duration, followRedirects: Boolean, maxRedirects: Int, compressionEnforced: Boolean, userAgentOpt: Option[String], authOpt: Option[Realm], ssl: com.typesafe.sslconfig.ssl.SSLConfigSettings, maxRequestRetry: Int, disableUrlEncoding: Boolean, useProxyProperties: Boolean, keepAlive: Boolean, pooledConnectionIdleTimeout: scala.concurrent.duration.Duration, connectionTtl: scala.concurrent.duration.Duration, maxConnections: Int, maxConnectionsPerHost: Int, webSocketMaxFrameSize: Int): Config = new Config(connectTimeout, requestTimeout, readTimeout, followRedirects, maxRedirects, compressionEnforced, userAgentOpt, authOpt, ssl, maxRequestRetry, disableUrlEncoding, useProxyProperties, keepAlive, pooledConnectionIdleTimeout, connectionTtl, maxConnections, maxConnectionsPerHost, webSocketMaxFrameSize) } diff --git a/core/src/main/scala/gigahorse/ConfigExtra.scala b/core/src/main/scala/gigahorse/ConfigExtra.scala index 4234aa1..5de0b41 100644 --- a/core/src/main/scala/gigahorse/ConfigExtra.scala +++ b/core/src/main/scala/gigahorse/ConfigExtra.scala @@ -20,7 +20,7 @@ package gigahorse import java.nio.charset.Charset import java.net.URI import com.typesafe.config.{ Config => XConfig } -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{ Duration, FiniteDuration } import com.typesafe.sslconfig.ssl.SSLConfigFactory object ConfigParser { @@ -39,9 +39,9 @@ object ConfigParser { if (config.hasPath("auth")) Some(parseRealm(config.getConfig("auth"))) else defaultAuthOpt Config( - connectTimeout = config.getDuration("connectTimeout", defaultConnectTimeout), - requestTimeout = config.getDuration("requestTimeout", defaultRequestTimeout), - readTimeout = config.getDuration("readTimeout", defaultReadTimeout), + connectTimeout = config.getFiniteDuration("connectTimeout", defaultConnectTimeout), + requestTimeout = config.getFiniteDuration("requestTimeout", defaultRequestTimeout), + readTimeout = config.getFiniteDuration("readTimeout", defaultReadTimeout), followRedirects = config.getBoolean("followRedirects", defaultFollowRedirects), maxRedirects = config.getInt("maxRedirects", defaultMaxRedirects), compressionEnforced = config.getBoolean("compressionEnforced", defaultCompressionEnforced), @@ -93,9 +93,9 @@ object ConfigParser { } object ConfigDefaults { - val defaultConnectTimeout = Duration("120s") - val defaultRequestTimeout = Duration("120s") - val defaultReadTimeout = Duration("120s") + val defaultConnectTimeout = FiniteDuration(120, "s") + val defaultRequestTimeout = FiniteDuration(120, "s") + val defaultReadTimeout = FiniteDuration(120, "s") val defaultFollowRedirects = true val defaultMaxRedirects = 5 val defaultCompressionEnforced = false @@ -129,4 +129,11 @@ class RichXConfig(config: XConfig) { def getDuration(path: String, fallback: Duration): Duration = if (config.hasPath(path)) Duration(config.getString(path)) else fallback + def getFiniteDuration(path: String, fallback: FiniteDuration): FiniteDuration = + if (config.hasPath(path)) { + val d = Duration(config.getString(path)) + if (d.isFinite) FiniteDuration(d.toMillis, "ms") + else sys.error(s"A FiniteDuration is required for $path") + } + else fallback } diff --git a/core/src/main/scala/gigahorse/Response.scala b/core/src/main/scala/gigahorse/FullResponse.scala similarity index 82% rename from core/src/main/scala/gigahorse/Response.scala rename to core/src/main/scala/gigahorse/FullResponse.scala index fad2be9..b6e9c6c 100644 --- a/core/src/main/scala/gigahorse/Response.scala +++ b/core/src/main/scala/gigahorse/FullResponse.scala @@ -17,7 +17,12 @@ package gigahorse -abstract class Response { +import scala.concurrent._ +import java.nio.ByteBuffer + +/** Represents a completed response. + */ +abstract class FullResponse { /** * Return the current headers of the request being constructed */ @@ -26,12 +31,12 @@ abstract class Response { /** * The response body as String. */ - def body: String + def bodyAsString: String /** - * The response body as a byte array. + * The response body as a `ByteBuffer`. */ - def bodyAsBytes: Array[Byte] + def bodyAsByteBuffer: ByteBuffer /** * The response status code. diff --git a/core/src/main/scala/gigahorse/FutureLifter.scala b/core/src/main/scala/gigahorse/FutureLifter.scala index f53f333..25232c4 100644 --- a/core/src/main/scala/gigahorse/FutureLifter.scala +++ b/core/src/main/scala/gigahorse/FutureLifter.scala @@ -21,8 +21,8 @@ import scala.concurrent.{ Future, ExecutionContext } /** * Lifts Future[A] into Future[Either[Throwable, A]] */ -final class FutureLifter[A](f: Response => A) { - def run(value: Future[Response])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = +final class FutureLifter[A](f: FullResponse => A) { + def run(value: Future[FullResponse])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = value map { r => Right[Throwable, A](f(r)) } recoverWith { case e => Future.successful(Left[Throwable, A](e)) } @@ -31,5 +31,5 @@ final class FutureLifter[A](f: Response => A) { } object FutureLifter { - def asEither: FutureLifter[Response] = new FutureLifter[Response](identity) + def asEither: FutureLifter[FullResponse] = new FutureLifter[FullResponse](identity) } diff --git a/core/src/main/scala/gigahorse/Gigahorse.scala b/core/src/main/scala/gigahorse/GigahorseSupport.scala similarity index 68% rename from core/src/main/scala/gigahorse/Gigahorse.scala rename to core/src/main/scala/gigahorse/GigahorseSupport.scala index bb13ab4..347d895 100644 --- a/core/src/main/scala/gigahorse/Gigahorse.scala +++ b/core/src/main/scala/gigahorse/GigahorseSupport.scala @@ -17,8 +17,11 @@ package gigahorse import java.nio.charset.Charset +import scala.concurrent.Future -abstract class Gigahorse { +/** Common interface for Gigahorse backends. + */ +abstract class GigahorseSupport { /** * Generates a request. * @@ -27,19 +30,6 @@ abstract class Gigahorse { */ def url(url: String): Request = Request(url) - def withHttp[A](config: Config)(f: HttpClient => A): A = - { - val client: HttpClient = http(config) - try { - f(client) - } - finally { - client.close() - } - } - def withHttp[A](f: HttpClient => A): A = - withHttp(config)(f) - /** Returns default configuration using `application.conf` if present. */ def config: Config = { @@ -49,17 +39,14 @@ abstract class Gigahorse { else Config() } - /** Returns HttpClient. You must call `close` when you're done. */ - def http(config: Config): HttpClient = new AhcHttpClient(config) - /** Function from `Response` to `String` */ - lazy val asString: Response => String = _.body + lazy val asString: FullResponse => String = _.bodyAsString /** Lifts Future[Reponse] result to Future[Either[Throwable, Reponse]] */ - lazy val asEither: FutureLifter[Response] = FutureLifter.asEither + lazy val asEither: FutureLifter[FullResponse] = FutureLifter.asEither /** UTF-8. */ val utf8 = Charset.forName("UTF-8") } -object Gigahorse extends Gigahorse +object GigahorseSupport extends GigahorseSupport diff --git a/core/src/main/scala/gigahorse/HttpClient.scala b/core/src/main/scala/gigahorse/HttpClient.scala index cce8813..b5f49f1 100644 --- a/core/src/main/scala/gigahorse/HttpClient.scala +++ b/core/src/main/scala/gigahorse/HttpClient.scala @@ -25,29 +25,29 @@ abstract class HttpClient extends AutoCloseable { /** Closes this client, and releases underlying resources. */ def close(): Unit - /** Runs the request and return a Future of Response. Errors on non-OK response. */ - def run(request: Request): Future[Response] + /** Runs the request and return a Future of FullResponse. Errors on non-OK response. */ + def run(request: Request): Future[FullResponse] /** Runs the request and return a Future of A. Errors on non-OK response. */ - def run[A](request: Request, f: Response => A): Future[A] + def run[A](request: Request, f: FullResponse => A): Future[A] - /** Runs the request and return a Future of Either a Response or a Throwable. Errors on non-OK response. */ + /** Runs the request and return a Future of Either a FullResponse or a Throwable. Errors on non-OK response. */ def run[A](request: Request, lifter: FutureLifter[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] /** Downloads the request to the file. Errors on non-OK response. */ def download(request: Request, file: File): Future[File] - /** Executes the request and return a Future of Response. Does not error on non-OK response. */ - def process(request: Request): Future[Response] + /** Executes the request and return a Future of FullResponse. Does not error on non-OK response. */ + def process(request: Request): Future[FullResponse] /** Executes the request and return a Future of A. Does not error on non-OK response. */ - def process[A](request: Request, f: Response => A): Future[A] + def process[A](request: Request, f: FullResponse => A): Future[A] - /** Executes the request and return a Future of Either a Response or a Throwable. Does not error on non-OK response. */ + /** Executes the request and return a Future of Either a FullResponse or a Throwable. Does not error on non-OK response. */ def process[A](request: Request, lifter: FutureLifter[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] /** Executes the request. Does not error on non-OK response. */ - def process[A](request: Request, handler: CompletionHandler[A]): Future[A] + // def process[A](request: Request, handler: CompletionHandler[A]): Future[A] /** Open a websocket connection. */ def websocket(request: Request)(handler: PartialFunction[WebSocketEvent, Unit]): Future[WebSocket] diff --git a/core/src/main/scala/gigahorse/OkHandler.scala b/core/src/main/scala/gigahorse/StatusError.scala similarity index 71% rename from core/src/main/scala/gigahorse/OkHandler.scala rename to core/src/main/scala/gigahorse/StatusError.scala index defe354..b2e3042 100644 --- a/core/src/main/scala/gigahorse/OkHandler.scala +++ b/core/src/main/scala/gigahorse/StatusError.scala @@ -16,20 +16,6 @@ package gigahorse -import org.asynchttpclient._ - -abstract class OkHandler[A](f: Response => A) extends FunctionHandler[A](f) { - override def onStatusReceived(status: HttpResponseStatus): State = { - val code = status.getStatusCode - if (code / 100 == 2) super.onStatusReceived(status) - else throw StatusError(code) - } -} - -object OkHandler { - def apply[A](f: Response => A): OkHandler[A] = new OkHandler[A](f) {} -} - final class StatusError(val status: Int) extends RuntimeException("Unexpected status: " + status.toString) { override def equals(o: Any): Boolean = o match { case x: StatusError => (this.status == x.status) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6c30b2e..99a679b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,13 @@ import sbt._ object Dependencies { + val scala211 = "2.11.8" + val scala212 = "2.12.1" + val scalaBoth = Seq(scala211, scala212) val ahc = "org.asynchttpclient" % "async-http-client" % "2.0.19" val scalatest = "org.scalatest" %% "scalatest" % "3.0.0" val sslConfig = "com.typesafe" %% "ssl-config-core" % "0.2.1" + val akkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % "10.0.1" + val akkaHttpExperimental = "com.typesafe.akka" %% "akka-http" % "10.0.1" + val sbtIo = "org.scala-sbt" %% "io" % "1.0.0-M7" }