Skip to content

Commit

Permalink
convert the first lambda in the sf export state machine to make it co…
Browse files Browse the repository at this point in the history
…mpatible with the dsl stuff
  • Loading branch information
pvighi committed Nov 30, 2018
1 parent 1a08205 commit 327f93c
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 125 deletions.
@@ -1,28 +1,86 @@
package com.gu.sf_datalake_export.handlers

import java.io.{InputStream, OutputStream}
import java.time.LocalDate

import com.gu.sf_datalake_export.util.TryOps._
import com.amazonaws.services.lambda.runtime.Context
import com.gu.effects.{GetFromS3, RawEffects}
import com.gu.salesforce.SalesforceAuthenticate.{SFAuthConfig, SFExportAuthConfig}
import com.gu.salesforce.SalesforceClient
import com.gu.sf_datalake_export.handlers.StartJobHandler.{ShouldUploadToDataLake, WireRequest, WireResponse}
import com.gu.sf_datalake_export.salesforce_bulk_api.AddQueryToJob.AddQueryRequest
import com.gu.sf_datalake_export.salesforce_bulk_api.BulkApiParams.ObjectName
import com.gu.sf_datalake_export.salesforce_bulk_api.CreateJob.{CreateJobRequest, JobId}
import com.gu.sf_datalake_export.salesforce_bulk_api.GetBatchResult.JobName
import com.gu.sf_datalake_export.salesforce_bulk_api.{AddQueryToJob, BulkApiParams, CreateJob}
import com.gu.util.apigateway.ApiGatewayHandler.LambdaIO
import com.gu.sf_datalake_export.util.TryOps._
import com.gu.util.config.LoadConfigModule.StringFromS3
import com.gu.util.config.{LoadConfigModule, Stage}
import com.gu.util.handlers.{JsonHandler, LambdaException}
import com.gu.util.handlers.LambdaException
import com.gu.util.resthttp.JsonHttp
import com.gu.util.resthttp.Types.ClientFailableOp
import okhttp3.{Request, Response}
import play.api.libs.json.{Json, Reads}
import play.api.libs.json._

import scala.util.{Failure, Success, Try}

case class StartJobRequest(
objectName: ObjectName,
shouldUploadToDataLake: ShouldUploadToDataLake //todo this should be optional
)

object StartJobRequest {
implicit val format = new OFormat[StartJobRequest] {
override def writes(o: StartJobRequest): JsObject = {
val wire = WireRequest(
objectName = o.objectName.value,
uploadToDataLake = Some(o.shouldUploadToDataLake.value)
)

WireRequest.format.writes(wire)
}

override def reads(json: JsValue): JsResult[StartJobRequest] = WireRequest.format.reads(json).map { wire =>
StartJobRequest(
ObjectName(wire.objectName),
ShouldUploadToDataLake(wire.uploadToDataLake.getOrElse(true))
)
}


}

}

case class StartJobResponse(
jobId: JobId,
jobName: JobName,
objectName: ObjectName,
shouldUploadToDataLake: ShouldUploadToDataLake
)

object StartJobResponse {
implicit val format = new OFormat[StartJobResponse] {
override def writes(o: StartJobResponse): JsObject = {
val wire = WireResponse(
jobId = o.jobId.value,
jobName = o.objectName.value,
objectName = o.objectName.value,
uploadToDataLake = o.shouldUploadToDataLake.value
)
WireResponse.format.writes(wire)
}

override def reads(json: JsValue): JsResult[StartJobResponse] = WireResponse.format.reads(json).map {
wire =>
StartJobResponse(
JobId(wire.jobId),
JobName(wire.jobName),
ObjectName(wire.objectName),
ShouldUploadToDataLake(wire.uploadToDataLake)
)

}
}
}
object StartJobHandler {

case class WireRequest(
Expand All @@ -31,7 +89,7 @@ object StartJobHandler {
)

object WireRequest {
implicit val reads: Reads[WireRequest] = Json.reads[WireRequest]
implicit val format = Json.format[WireRequest]
}

case class WireResponse(
Expand All @@ -42,7 +100,7 @@ object StartJobHandler {
)

object WireResponse {
implicit val writes = Json.writes[WireResponse]
implicit val format = Json.format[WireResponse]
}

case class ShouldUploadToDataLake(value: Boolean) extends AnyVal
Expand All @@ -61,45 +119,35 @@ object StartJobHandler {
createJob: CreateJobRequest => ClientFailableOp[JobId],
addQuery: AddQueryRequest => ClientFailableOp[Unit]
)(
objectName: ObjectName,
shouldUploadToDataLake: ShouldUploadToDataLake
): Try[WireResponse] = {
request: StartJobRequest
): Try[StartJobResponse] = {
for {
sfQueryInfo <- BulkApiParams.byName.get(objectName).toTry(noneErrorMessage = s"invalid object name ${objectName.value}")
sfQueryInfo <- BulkApiParams.byName.get(request.objectName).toTry(noneErrorMessage = s"invalid object name ${request.objectName.value}")
createJobRequest = CreateJobRequest(sfQueryInfo.sfObjectName, sfQueryInfo.batchSize)
jobId <- createJob(createJobRequest).toTry
addQueryRequest = AddQueryRequest(sfQueryInfo.soql, jobId)
_ <- addQuery(addQueryRequest).toTry
jobName = s"${objectName.value}_${getCurrentDate()}"
} yield WireResponse(jobId.value, jobName, objectName.value, shouldUploadToDataLake.value)
jobName = JobName(s"${request.objectName.value}_${getCurrentDate()}")
} yield StartJobResponse(jobId, jobName, request.objectName, request.shouldUploadToDataLake)
}

def operation(
getCurrentDate: () => LocalDate,
stage: Stage,
fetchString: StringFromS3,
getResponse: Request => Response
)(request: WireRequest): Try[WireResponse] = {
)(request: StartJobRequest): Try[StartJobResponse] = {
val loadConfig = LoadConfigModule(stage, fetchString)
for {
sfConfig <- loadConfig[SFAuthConfig](SFExportAuthConfig.location, SFAuthConfig.reads).leftMap(_.error).toTry
sfClient <- SalesforceClient(getResponse, sfConfig).value.toTry
createJobOp = sfClient.wrapWith(JsonHttp.postWithHeaders).wrapWith(CreateJob.wrapper).runRequest _
addQueryToJobOp = sfClient.wrapWith(AddQueryToJob.wrapper).runRequest _
wiredSteps = steps(getCurrentDate, createJobOp, addQueryToJobOp) _
shouldUploadToDataLake <- ShouldUploadToDataLake(request.uploadToDataLake, stage)
response <- wiredSteps(ObjectName(request.objectName), shouldUploadToDataLake)
response <- wiredSteps(request)
} yield response
}

def apply(inputStream: InputStream, outputStream: OutputStream, context: Context): Unit = {
val getCurrentDate = () => RawEffects.now().toLocalDate

JsonHandler(
lambdaIO = LambdaIO(inputStream, outputStream, context),
operation = operation(getCurrentDate, RawEffects.stage, GetFromS3.fetchString, RawEffects.response)
)

}
def wiredOperation = operation(RawEffects.now().toLocalDate, RawEffects.stage, GetFromS3.fetchString, RawEffects.response) _

}
163 changes: 82 additions & 81 deletions handlers/sf-export-dsl/src/main/scala/com/gu/steps/DemoHandler.scala
@@ -1,81 +1,82 @@
package com.gu.steps

import java.io.{InputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import com.amazonaws.services.lambda.runtime.Context
import com.gu.steps.CompiledSteps.LambdaId
import play.api.libs.json.{JsValue, Json, OFormat}

import scala.util.{Failure, Success, Try}

object Handler extends App {

case class Initial(data: String)
case class NextState(moreData: String)
case class FinalState(finalData: String)

def step1(initial: Initial): NextState = {
println("RUN STEP 1")
NextState(initial.data)
}

def step2(nextState: NextState): FinalState = {
println("RUN STEP 2")
FinalState(nextState.moreData)
}

// see the s-w tests for a nicer way to compose:
// https://github.com/guardian/support-workers/blob/7faa9f7709d007bb027ae450526193687e401b92/src/test/scala/com/gu/support/workers/EndToEndSpec.scala#L26
lazy val program = TaskStep(step1, EndStep(step2))

implicit lazy val iF: OFormat[Initial] = Json.format[Initial]
implicit lazy val nF: OFormat[NextState] = Json.format[NextState]
implicit lazy val fF: OFormat[FinalState] = Json.format[FinalState]

lazy val interpretedViaJson = InterpJson[Initial].apply(program)

override def main(args: Array[String]): Unit = {

val interpretedDirectly = InterpLocal(Initial("hello"), program)
println(s"interpretedDirectly: $interpretedDirectly")

println(s"interpretedViaJson: $interpretedViaJson")

val runLocallyViaJson = CompiledSteps.runLocal(interpretedViaJson)(Initial("hello"))
println(s"runLocallyViaJson: $runLocallyViaJson")

val handlerFunctionName = this.getClass.getCanonicalName.replaceAll("""\$$""", "") + "::apply"
val cfn = CompiledSteps.toCFN(interpretedViaJson, handlerFunctionName, ENV_VAR)
println(s"CFN: $cfn")
val cfnRaw = Json.prettyPrint(Json.toJson(cfn))
Files.write(Paths.get("target/generated.cfn.json"), cfnRaw.getBytes(StandardCharsets.UTF_8))
}

lazy val ENV_VAR: String = "LAMBDA_ID"

// this is the entry point
def apply(inputStream: InputStream, outputStream: OutputStream, context: Context): Unit = {
val res = for {
envLambdaId <- Try(System.getenv(ENV_VAR))
lambdaId <- LambdaId.fromEnv(envLambdaId)
output <- CompiledSteps.runSingle(interpretedViaJson, lambdaId, Json.parse(inputStream)) match {
case None => Failure(new RuntimeException("oops probably couldn't deserialise"))
case Some(result) => Success(result)
}
} yield output
res match {
case Failure(ex) => throw ex
case Success(outputJS) => outputForAPIGateway(outputStream, outputJS)
}
}

def outputForAPIGateway(outputStream: OutputStream, jsonResponse: JsValue): Unit = {
val writer = new OutputStreamWriter(outputStream, "UTF-8")
println(s"Response will be: \n ${jsonResponse.toString}")
writer.write(Json.stringify(jsonResponse))
writer.close()
}

}
//package com.gu.steps
//
//import java.io.{InputStream, OutputStream, OutputStreamWriter}
//import java.nio.charset.StandardCharsets
//import java.nio.file.{Files, Paths}
//
//import com.amazonaws.services.lambda.runtime.Context
//import com.gu.sf_datalake_export.handlers.StartJobHandler
//import com.gu.steps.CompiledSteps.LambdaId
//import play.api.libs.json.{JsValue, Json, OFormat}
//
//import scala.util.{Failure, Success, Try}
//
//object Handler extends App {
//
// case class Initial(data: String)
// case class NextState(moreData: String)
// case class FinalState(finalData: String)
//
// def step1(initial: Initial): NextState = {
// println("RUN STEP 1")
// NextState(initial.data)
// }
//
// def step2(nextState: NextState): FinalState = {
// println("RUN STEP 2")
// FinalState(nextState.moreData)
// }
//
// // see the s-w tests for a nicer way to compose:
// // https://github.com/guardian/support-workers/blob/7faa9f7709d007bb027ae450526193687e401b92/src/test/scala/com/gu/support/workers/EndToEndSpec.scala#L26
// lazy val program = TaskStep(step1, EndStep(step2))
//
// implicit lazy val iF: OFormat[Initial] = Json.format[Initial]
// implicit lazy val nF: OFormat[NextState] = Json.format[NextState]
// implicit lazy val fF: OFormat[FinalState] = Json.format[FinalState]
//
// lazy val interpretedViaJson = InterpJson[Initial].apply(program)
//
// override def main(args: Array[String]): Unit = {
//
// val interpretedDirectly = InterpLocal(Initial("hello"), program)
// println(s"interpretedDirectly: $interpretedDirectly")
//
// println(s"interpretedViaJson: $interpretedViaJson")
//
// val runLocallyViaJson = CompiledSteps.runLocal(interpretedViaJson)(Initial("hello"))
// println(s"runLocallyViaJson: $runLocallyViaJson")
//
// val handlerFunctionName = this.getClass.getCanonicalName.replaceAll("""\$$""", "") + "::apply"
// val cfn = CompiledSteps.toCFN(interpretedViaJson, handlerFunctionName, ENV_VAR)
// println(s"CFN: $cfn")
// val cfnRaw = Json.prettyPrint(Json.toJson(cfn))
// Files.write(Paths.get("target/generated.cfn.json"), cfnRaw.getBytes(StandardCharsets.UTF_8))
// }
//
// lazy val ENV_VAR: String = "LAMBDA_ID"
//
// // this is the entry point
// def apply(inputStream: InputStream, outputStream: OutputStream, context: Context): Unit = {
// val res = for {
// envLambdaId <- Try(System.getenv(ENV_VAR))
// lambdaId <- LambdaId.fromEnv(envLambdaId)
// output <- CompiledSteps.runSingle(interpretedViaJson, lambdaId, Json.parse(inputStream)) match {
// case None => Failure(new RuntimeException("oops probably couldn't deserialise"))
// case Some(result) => Success(result)
// }
// } yield output
// res match {
// case Failure(ex) => throw ex
// case Success(outputJS) => outputForAPIGateway(outputStream, outputJS)
// }
// }
//
// def outputForAPIGateway(outputStream: OutputStream, jsonResponse: JsValue): Unit = {
// val writer = new OutputStreamWriter(outputStream, "UTF-8")
// println(s"Response will be: \n ${jsonResponse.toString}")
// writer.write(Json.stringify(jsonResponse))
// writer.close()
// }
//
//}

0 comments on commit 327f93c

Please sign in to comment.