
Commit

Merge eeb2289 into adcf8d8
pvighi committed Dec 3, 2018
2 parents adcf8d8 + eeb2289 commit aec23cb
Showing 15 changed files with 662 additions and 162 deletions.
105 changes: 92 additions & 13 deletions handlers/sf-datalake-export/cfn.yaml
@@ -12,11 +12,29 @@ Parameters:
Mappings:
StageMap:
CODE:
-destBuckets: "arn:aws:s3:::gu-salesforce-export-test/CODE/*"
+destBuckets: "arn:aws:s3:::gu-salesforce-export-code*"
+localbucketName: "gu-salesforce-export-code"
PROD:
-destBuckets: ["arn:aws:s3:::gu-salesforce-export-test/PROD/*", "arn:aws:s3:::ophan-raw-salesforce-customer-data-*"]
+destBuckets: ["arn:aws:s3:::gu-salesforce-export-prod*", "arn:aws:s3:::ophan-raw-salesforce-customer-data-*"]
+localbucketName: "gu-salesforce-export-prod"
Resources:
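# Bucket that holds the export results for this stage: server-side encrypted (AES256),
# with a lifecycle rule that expires objects tagged housekeeping=delete after 2 days.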
localBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !FindInMap [StageMap, !Ref Stage, localbucketName]
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
LifecycleConfiguration:
Rules:
- TagFilters:
- Key: housekeeping
Value: delete
ExpirationInDays: 2
Status: Enabled
StartJobRole:
Type: AWS::IAM::Role
Properties:
@@ -149,17 +167,46 @@ Resources:
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:DeleteObject
- s3:GetObject
- s3:GetObjectAcl
- s3:GetBucketAcl
- s3:ListBucket
- s3:PutObject
- s3:GetObjectVersion
- s3:DeleteObjectVersion
- s3:PutObjectAcl
Resource: !FindInMap [StageMap, !Ref Stage, destBuckets]
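# Execution role for the CleanBucket lambda: write access to its CloudWatch log group
# plus list, read and delete permissions on the stage's destination buckets.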
CleanBucketRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: LambdaPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- lambda:InvokeFunction
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/sf-export-clean-bucket-${Stage}:log-stream:*"
- PolicyName: DestBucket
PolicyDocument:
Statement:
- Effect: Allow
Action:
- s3:DeleteObject
- s3:GetObject
- s3:GetObjectAcl
- s3:GetBucketAcl
- s3:ListBucket
- s3:GetObjectVersion
- s3:DeleteObjectVersion
Resource: !FindInMap [StageMap, !Ref Stage, destBuckets]

DownloadBatch:
Type: AWS::Lambda::Function
Properties:
@@ -209,6 +256,26 @@ Resources:
- Effect: Allow
Action: s3:GetObject
Resource: !Sub "arn:aws:s3:::gu-reader-revenue-private/membership/support-service-lambdas/${Stage}/sfExportAuth-${Stage}*.json"
CleanBucket:
Type: AWS::Lambda::Function
Properties:
Description: clean old results from the bucket before uploading new ones
FunctionName:
!Sub sf-export-clean-bucket-${Stage}
Code:
S3Bucket: zuora-auto-cancel-dist
S3Key: !Sub membership/${Stage}/sf-datalake-export/sf-datalake-export.jar
Handler: com.gu.sf_datalake_export.handlers.CleanBucketHandler::apply
Environment:
Variables:
Stage: !Ref Stage
Role:
!GetAtt CleanBucketRole.Arn
MemorySize: 1536
Runtime: java8
Timeout: 900
DependsOn:
- CleanBucketRole
EndJob:
Type: AWS::Lambda::Function
Properties:
@@ -291,10 +358,20 @@ Resources:
"Choices": [{
"Variable": "$.jobStatus",
"StringEquals": "Completed",
"Next": "downloadBatch"
"Next": "cleanBucket"
}],
"Default": "WaitSomeTime"
},
"cleanBucket": {
"Type": "Task",
"Resource": "${cleanBucketArn}",
"Next": "downloadBatch",
"Retry": [{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 30,
"MaxAttempts": 3
}]
},
"downloadBatch": {
"Type": "Task",
"Resource": "${downloadBatchArn}",
@@ -334,5 +411,7 @@ Resources:
startJobArn: !GetAtt [ StartJob, Arn ],
getBatchesArn: !GetAtt [ GetBatchesLambda, Arn ],
downloadBatchArn: !GetAtt [ DownloadBatch, Arn ],
-endJobArn: !GetAtt [ EndJob, Arn ]
+endJobArn: !GetAtt [ EndJob, Arn ],
+cleanBucketArn: !GetAtt [ CleanBucket, Arn ]

}
1 change: 1 addition & 0 deletions handlers/sf-datalake-export/riff-raff.yaml
@@ -19,4 +19,5 @@ deployments:
- sf-get-batch-state-
- sf-download-batch-
- sf-end-export-job-
- sf-export-clean-bucket-
dependencies: [cfn]
@@ -0,0 +1,84 @@
package com.gu.sf_datalake_export.handlers

import java.io.{InputStream, OutputStream}

import com.amazonaws.services.lambda.runtime.Context
import com.gu.effects._
import com.gu.sf_datalake_export.handlers.DownloadBatchHandler.WireState
import com.gu.sf_datalake_export.handlers.StartJobHandler.ShouldUploadToDataLake
import com.gu.sf_datalake_export.salesforce_bulk_api.BulkApiParams.ObjectName
import com.gu.sf_datalake_export.salesforce_bulk_api.GetBatchResult.JobName
import com.gu.sf_datalake_export.util.ExportS3Path
import com.gu.util.Logging
import com.gu.util.apigateway.ApiGatewayHandler.LambdaIO
import com.gu.util.config.Stage
import com.gu.util.handlers.JsonHandler

import scala.util.{Success, Try}

object CleanBucketHandler extends Logging {

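  // Lambda entry point: wires the real effects (deployment stage, S3 list and delete) and hands
  // them to JsonHandler, which decodes the incoming WireState, runs the operation and encodes the result.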
def apply(inputStream: InputStream, outputStream: OutputStream, context: Context): Unit = {

val wiredOperation: WireState => Try[WireState] = wireOperation(RawEffects.stage, ListS3Objects.listObjectsWithPrefix _, DeleteS3Objects.deleteObjects _)

JsonHandler(
lambdaIO = LambdaIO(inputStream, outputStream, context),
operation = wiredOperation
)
}

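  // Builds the concrete cleanBucket operation for the current stage (via ExportS3Path) and the
  // injected S3 list/delete functions, then adapts it to the wire-state representation.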
def wireOperation(
stage: Stage,
listS3ForPrefix: S3Path => Try[List[Key]],
deleteS3Objects: (BucketName, List[Key]) => Try[Unit]
)(state: WireState): Try[WireState] = {

val wiredCleanBucket = cleanBucket(
ExportS3Path(stage) _,
listS3ForPrefix,
deleteS3Objects
) _

handleCleanBucket(wiredCleanBucket)(state)
}

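  // Converts the wire state into typed values, runs the clean operation and, on success,
  // passes the state through unchanged so the next step-function state receives it.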
def handleCleanBucket(
cleanBucketOp: (ObjectName, JobName, ShouldUploadToDataLake) => Try[Unit]
)(state: WireState): Try[WireState] = {
val objectName = ObjectName(state.objectName)
val jobName = JobName(state.jobName)
val shouldUploadToDataLake = ShouldUploadToDataLake(state.uploadToDataLake)
cleanBucketOp(
objectName,
jobName,
shouldUploadToDataLake
) map (_ => state)
}

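  // Deletes everything previously written for this job: resolves the base path for the object and
  // destination, narrows it to the job's prefix, lists the matching keys and deletes them
  // (a no-op when nothing matches the prefix).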
def cleanBucket(
basePathFor: (ObjectName, ShouldUploadToDataLake) => S3Path,
listObjectsWithPrefix: S3Path => Try[List[Key]],
deleteObjects: (BucketName, List[Key]) => Try[Unit]
)(
objectName: ObjectName,
jobName: JobName,
shouldUploadToDataLake: ShouldUploadToDataLake
): Try[Unit] = {
val basePath = basePathFor(objectName, shouldUploadToDataLake)
logger.info(s"cleaning ${basePath}")
val prefixPath = S3Path.appendToPrefix(basePath, jobName.value)
for {
keysWithPrefix <- listObjectsWithPrefix(prefixPath)
_ <- keysWithPrefix match {
case Nil =>
logger.info("nothing to delete")
Success(())
case nonEmptyKeyList =>
logger.info(s"deleting $nonEmptyKeyList")
deleteObjects(prefixPath.bucketName, nonEmptyKeyList)
}
} yield ()
}

}
