Skip to content

Commit

Permalink
Merge d4f7101 into 0bcb694
Browse files Browse the repository at this point in the history
  • Loading branch information
zomboc0m committed Oct 27, 2017
2 parents 0bcb694 + d4f7101 commit ac478f0
Show file tree
Hide file tree
Showing 41 changed files with 232 additions and 235 deletions.
Expand Up @@ -5,7 +5,7 @@ import akka.stream.scaladsl.Flow
import com.amazonaws.services.cloudformation.{AmazonCloudFormationAsync, model aws}
import com.monsanto.arch.awsutil.cloudformation.model.AwsConverters._
import com.monsanto.arch.awsutil.cloudformation.model.{DeleteStackRequest, ValidatedTemplate}
import com.monsanto.arch.awsutil.{AWSFlow, AWSFlowAdapter}
import com.monsanto.arch.awsutil._

import scala.collection.JavaConverters._

Expand Down
Expand Up @@ -3,9 +3,8 @@ package com.monsanto.arch.awsutil
import akka.NotUsed
import akka.stream.scaladsl.Flow
import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest}
import com.monsanto.arch.awsutil.impl.{AWSGraphStage, Macros, SimpleAWSFlowAdapter}

import scala.language.experimental.macros
import com.monsanto.arch.awsutil.impl.{AWSGraphStage, SimpleAWSFlowAdapter}
import AWSFlowAdapter.{nextTokenFlowAdapter,markerFlowAdapter,nextMarkerFlowAdapter}

/** Utilities for generating Akka streams that interact with AWS. */
object AWSFlow {
Expand All @@ -22,44 +21,53 @@ object AWSFlow {
/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getNextToken` and `Request.withNextToken`. Amazon exceptions will cause the flow to fail.
*/
def pagedByNextToken[Request <: AmazonWebServiceRequest, Result]
def pagedByNextToken[Request <: AmazonWebServiceRequest: TakesNextToken, Result: HasNextToken]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Result, NotUsed] =
macro Macros.pagedByNextToken[Request,Result]
pagedByNextTokenEither(asyncCall).map {
case Left(exception) => throw exception
case Right(result) => result
}

/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getNextToken` and `Request.withNextToken`.
*/
def pagedByNextTokenEither[Request <: AmazonWebServiceRequest, Result]
def pagedByNextTokenEither[Request <: AmazonWebServiceRequest: TakesNextToken, Result: HasNextToken]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Either[AmazonClientException,Result], NotUsed] =
macro Macros.pagedByNextTokenEither[Request,Result]
akka.stream.scaladsl.Flow.fromGraph(new AWSGraphStage(nextTokenFlowAdapter(asyncCall)))

/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getNextMarker` and `Request.withMarker`.
*/
def pagedByNextMarker[Request <: AmazonWebServiceRequest, Result]
def pagedByNextMarker[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasNextMarker]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Result, NotUsed] =
macro Macros.pagedByNextMarker[Request,Result]
pagedByNextMarkerEither(asyncCall).map {
case Left(exception) => throw exception
case Right(result) => result
}

/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getNextMarker` and `Request.withMarker`. Amazon exceptions will cause the flow to fail.
*/
def pagedByNextMarkerEither[Request <: AmazonWebServiceRequest, Result]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Result, NotUsed] =
macro Macros.pagedByNextMarkerEither[Request,Result]
def pagedByNextMarkerEither[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasNextMarker]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Either[AmazonClientException,Result], NotUsed] =
akka.stream.scaladsl.Flow.fromGraph(new AWSGraphStage(nextMarkerFlowAdapter(asyncCall)))

/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getMarker` and `Request.withMarker`.
*/
def pagedByMarker[Request <: AmazonWebServiceRequest, Result]
def pagedByMarker[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasMarker]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Result, NotUsed] =
macro Macros.pagedByMarker[Request,Result]
pagedByMarkerEither(asyncCall).map {
case Left(exception) => throw exception
case Right(result) => result
}

/** A paged flow may return more than one result for each request, in this case pagination is managed by
* `Result.getMarker` and `Request.withMarker`. Amazon exceptions will cause the flow to fail.
*/
def pagedByMarkerEither[Request <: AmazonWebServiceRequest, Result]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Result, NotUsed] =
macro Macros.pagedByMarkerEither[Request,Result]
def pagedByMarkerEither[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasMarker]
(asyncCall: AWSAsyncCall[Request,Result]): Flow[Request, Either[AmazonClientException,Result], NotUsed] =
akka.stream.scaladsl.Flow.fromGraph(new AWSGraphStage(markerFlowAdapter(asyncCall)))

/** Adds the `toResult` operation to AWS flows. */
implicit class FlowOps[Request <: AmazonWebServiceRequest,Result]
Expand Down
Expand Up @@ -4,9 +4,6 @@ import java.util.concurrent.{Future ⇒ JFuture, TimeUnit}

import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.handlers.AsyncHandler
import com.monsanto.arch.awsutil.impl.Macros

import scala.language.experimental.macros

/** Type class used for paging through asynchronous AWS requests. */
trait AWSFlowAdapter[Request <: AmazonWebServiceRequest, Result] {
Expand All @@ -28,23 +25,35 @@ object AWSFlowAdapter {
/** Generates a flow adapter from an async method handle. The implementation depends on the fact that
* paging is handled by calling `Request.withNextToken` and `Response.getNextToken`.
*/
def nextTokenFlowAdapter[Request <: AmazonWebServiceRequest, Result]
def nextTokenFlowAdapter[Request <: AmazonWebServiceRequest: TakesNextToken, Result: HasNextToken]
(asyncCall: AWSAsyncCall[Request,Result]): AWSFlowAdapter[Request,Result] =
macro Macros.nextTokenFlowAdapter[Request, Result]
new AWSFlowAdapter[Request,Result] {
override def processRequest: RequestProcessor = asyncCall
override def getToken(result: Result): Option[String] = HasNextToken[Result].getToken(result)
override def withToken(request: Request, token: String): Request = TakesNextToken[Request].withToken(request,token)
}

/** Generates a flow adapter from an async method handle. The implementation depends on the fact that
* paging is handled by calling `Request.withMarker` and `Response.getNextMarker`.
*/
def nextMarkerFlowAdapter[Request <: AmazonWebServiceRequest, Result]
def nextMarkerFlowAdapter[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasNextMarker]
(asyncCall: AWSAsyncCall[Request,Result]): AWSFlowAdapter[Request,Result] =
macro Macros.nextMarkerFlowAdapter[Request, Result]
new AWSFlowAdapter[Request,Result] {
override def processRequest: RequestProcessor = asyncCall
override def getToken(result: Result): Option[String] = HasNextMarker[Result].getNextMarker(result)
override def withToken(request: Request, token: String): Request = TakesMarker[Request].withMarker(request,token)
}

/** Generates a flow adapter from an async method handle. The implementation depends on the fact that
* paging is handled by calling `Request.withMarker` and `Response.getMarker`.
*/
def markerFlowAdapter[Request <: AmazonWebServiceRequest, Result]
def markerFlowAdapter[Request <: AmazonWebServiceRequest: TakesMarker, Result: HasMarker]
(asyncCall: AWSAsyncCall[Request,Result]): AWSFlowAdapter[Request,Result] =
macro Macros.markerFlowAdapter[Request, Result]
new AWSFlowAdapter[Request,Result] {
override def processRequest: RequestProcessor = asyncCall
override def getToken(result: Result): Option[String] = HasMarker[Result].getMarker(result)
override def withToken(request: Request, token: String): Request = TakesMarker[Request].withMarker(request,token)
}

/** A utility for wrapping an asynchronous AWS call that returns a useless POJO. This utility will make the result
* of the call be the request that was passed in.
Expand Down
@@ -1,137 +1,32 @@
package com.monsanto.arch.awsutil.impl

import com.amazonaws.AmazonWebServiceRequest
import com.monsanto.arch.awsutil.{AWSAsyncCall, AWSFlowAdapter}
import scala.reflect.macros.blackbox

/** Contains macro implementations. */
object Macros {
def nextTokenFlowAdapter[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val adapterType = weakTypeOf[AWSFlowAdapter[Request,Result]]
q"""
new $adapterType {
def processRequest: RequestProcessor = $asyncCall
def getToken(result: $resultType) = Option(result.getNextToken).filter(_.nonEmpty)
def withToken(request: $requestType, token: String) = request.withNextToken(token)
}
"""
}

def pagedByNextToken[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.nextTokenFlowAdapter($asyncCall)))
.map {
case Left(exception) ⇒ throw exception
case Right(result) ⇒ result
}
"""
}

def pagedByNextTokenEither[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.nextTokenFlowAdapter($asyncCall)))
"""
}

def nextMarkerFlowAdapter[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val adapterType = weakTypeOf[AWSFlowAdapter[Request,Result]]
q"""
new $adapterType {
def processRequest: RequestProcessor = $asyncCall
def getToken(result: $resultType) = Option(result.getNextMarker).filter(_.nonEmpty)
def withToken(request: $requestType, token: String) = request.withMarker(token)
}
"""
}

def pagedByNextMarker[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
def hasNextTokenImpl[Result](c: blackbox.Context)(implicit resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.nextMarkerFlowAdapter($asyncCall)))
.map {
case Left(exception) ⇒ throw exception
case Right(result) ⇒ result
}
"""
q"new HasNextToken[$resultType] { def getToken(res: $resultType) = Option(res.getNextToken).filter(_.nonEmpty) }"
}

def pagedByNextMarkerEither[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
def takesNextTokenImpl[Request](c: blackbox.Context)(implicit requestType: c.WeakTypeTag[Request]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.nextMarkerFlowAdapter($asyncCall)))
"""
q"new TakesNextToken[$requestType] { def withToken(req: $requestType, tok: String) = req.withNextToken(tok) }"
}

def markerFlowAdapter[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
def hasNextMarkerImpl[Result](c: blackbox.Context)(implicit resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val adapterType = weakTypeOf[AWSFlowAdapter[Request,Result]]
q"""
new $adapterType {
def processRequest: RequestProcessor = $asyncCall
def getToken(result: $resultType) = Option(result.getMarker).filter(_.nonEmpty)
def withToken(request: $requestType, token: String) = request.withMarker(token)
}
"""
q"new HasNextMarker[$resultType] { def getNextMarker(res: $resultType) = Option(res.getNextMarker).filter(_.nonEmpty) }"
}

def pagedByMarker[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
def hasMarkerImpl[Result](c: blackbox.Context)(implicit resultType: c.WeakTypeTag[Result]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.markerFlowAdapter($asyncCall)))
.map {
case Left(exception) ⇒ throw exception
case Right(result) ⇒ result
}
"""
q"new HasMarker[$resultType] { def getMarker(res: $resultType) = Option(res.getMarker).filter(_.nonEmpty) }"
}

def pagedByMarkerEither[Request <: AmazonWebServiceRequest, Result]
(c: blackbox.Context)
(asyncCall: c.Expr[AWSAsyncCall[Request,Result]])
(implicit requestType: c.WeakTypeTag[Request], resultType: c.WeakTypeTag[Result]): c.Tree = {
def takesMarkerImpl[Request](c: blackbox.Context)(implicit requestType: c.WeakTypeTag[Request]): c.Tree = {
import c.universe._
val stageType = weakTypeTag[AWSGraphStage[Request,Result]]
val flowAdapterObject = weakTypeTag[AWSFlowAdapter[Request,Result]].tpe.companion
q"""
akka.stream.scaladsl.Flow.fromGraph(new $stageType($flowAdapterObject.markerFlowAdapter($asyncCall)))
"""
q"new TakesMarker[$requestType] { def withMarker(req: $requestType, marker:String) = req.withMarker(marker) }"
}
}
Expand Up @@ -4,7 +4,93 @@ import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.handlers.AsyncHandler
import java.util.concurrent.{Future => JFuture}

import scala.annotation.implicitNotFound
import scala.language.experimental.macros
import com.monsanto.arch.awsutil.impl.Macros

package object awsutil {
/** Handy type alias for an asynchronous AWS call. */
type AWSAsyncCall[Request <: AmazonWebServiceRequest, Result] = (Request, AsyncHandler[Request,Result]) => JFuture[Result]

/** HasNextToken is a typeclass representing classes with a 'getNextToken' method */
@implicitNotFound("Cannot prove that type ${R} has a getNextToken method")
trait HasNextToken[R] {
def getToken(res: R): Option[String]
}

object HasNextToken {
def apply[R: HasNextToken]: HasNextToken[R] = implicitly

implicit def instance[R]: HasNextToken[R] = macro Macros.hasNextTokenImpl[R]

final implicit class HTSyntax[R](val ht: R) extends AnyVal {
def getToken(implicit r: HasNextToken[R]): Option[String] = r.getToken(ht)
}
}

/** TakesNextToken is a typeclass representing classes with a 'withNextToken' method */
@implicitNotFound("Cannot prove that type ${R} has a withNextToken method")
trait TakesNextToken[R] {
def withToken(req: R, tok: String): R
}

object TakesNextToken {
def apply[R: TakesNextToken]: TakesNextToken[R] = implicitly

implicit def instance[R]: TakesNextToken[R] = macro Macros.takesNextTokenImpl[R]

final implicit class TTSyntax[R](val tt: R) extends AnyVal {
def withToken(tok: String)(implicit r: TakesNextToken[R]): R = r.withToken(tt,tok)
}
}

/** HasNextMarker is a typeclass representing classes with a 'getNextMarker' method */
@implicitNotFound("Cannot prove that type ${R} has a getNextMarker method")
trait HasNextMarker[R] {
def getNextMarker(res: R): Option[String]
}

object HasNextMarker {
def apply[R: HasNextMarker]: HasNextMarker[R] = implicitly

implicit def instance[R]: HasNextMarker[R] = macro Macros.hasNextMarkerImpl[R]

final implicit class HNMSyntax[R](val hnm: R) extends AnyVal {
def getNextMarker(implicit r: HasNextMarker[R]): Option[String] = r.getNextMarker(hnm)
}
}

/** HasMarker is a typeclass representing classes with a 'getMarker' method */
@implicitNotFound("Cannot prove that type ${R} has a getMarker method")
trait HasMarker[R] {
def getMarker(res: R): Option[String]
}

object HasMarker {
def apply[R: HasMarker]: HasMarker[R] = implicitly

implicit def instance[R]: HasMarker[R] = macro Macros.hasMarkerImpl[R]

final implicit class HMSyntax[R](val hm: R) extends AnyVal {
def getMarker(implicit r: HasMarker[R]): Option[String] = r.getMarker(hm)
}
}

/**
* TakesMarker is a typeclass representing classes with a 'withMarker' method
*/
@implicitNotFound("Cannot prove that type ${R} has a withMarker method")
trait TakesMarker[R] {
def withMarker(req: R, marker: String): R
}

object TakesMarker {
def apply[R: TakesMarker]: TakesMarker[R] = implicitly

implicit def instance[R]: TakesMarker[R] = macro Macros.takesMarkerImpl[R]

final implicit class TMSyntax[R](val tm: R) extends AnyVal {
def withMarker(marker: String)(implicit r: TakesMarker[R]): R = r.withMarker(tm,marker)
}
}
}
Expand Up @@ -9,7 +9,7 @@ import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest}
import com.monsanto.arch.awsutil.impl.AWSGraphStageSpec._
import com.monsanto.arch.awsutil.{AWSAsyncCall, AWSFlow}
import com.monsanto.arch.awsutil._
import com.typesafe.config.ConfigFactory
import org.scalamock.scalatest.MockFactory
import org.scalatest.Matchers._
Expand Down

0 comments on commit ac478f0

Please sign in to comment.