Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add registration cleaning worker #239

Merged
merged 3 commits into from Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Expand Up @@ -339,7 +339,8 @@ lazy val notificationworkerlambda = project
riffRaffUploadManifestBucket := Option("riffraff-builds"),
riffRaffManifestProjectName := s"mobile-n10n:${name.value}",
riffRaffArtifactResources += (baseDirectory.value / "cfn.yaml", s"ios-notification-worker-cfn/cfn.yaml"),
riffRaffArtifactResources += (baseDirectory.value / "cfn.yaml", s"android-notification-worker-cfn/cfn.yaml")
riffRaffArtifactResources += (baseDirectory.value / "cfn.yaml", s"android-notification-worker-cfn/cfn.yaml"),
riffRaffArtifactResources += (baseDirectory.value / "cfn.yaml", s"registration-cleaning-worker-cfn/cfn.yaml")
)

lazy val fcmworker = project
Expand Down
11 changes: 9 additions & 2 deletions notificationworkerlambda/riff-raff.yaml
Expand Up @@ -7,10 +7,10 @@ deployments:
app: notificationworkerlambda
parameters:
bucket: mobile-notifications-dist
functionNames: [ios-notification-worker-, android-notification-worker-]
functionNames: [ios-notification-worker-, android-notification-worker-, registration-cleaning-worker-]
fileName: notificationworkerlambda.jar
prefixStack: false
dependencies: [ios-notification-worker-cfn, android-notification-worker-cfn]
dependencies: [ios-notification-worker-cfn, android-notification-worker-cfn, registration-cleaning-worker-cfn]
ios-notification-worker-cfn:
type: cloud-formation
app: ios-notification-worker
Expand All @@ -24,4 +24,11 @@ deployments:
parameters:
prependStackToCloudFormationStackName: false
cloudFormationStackName: android-notification-worker-cfn
templatePath: cfn.yaml
registration-cleaning-worker-cfn:
type: cloud-formation
app: registration-cleaning-worker
parameters:
prependStackToCloudFormationStackName: false
cloudFormationStackName: registration-cleaning-worker-cfn
templatePath: cfn.yaml
Expand Up @@ -12,6 +12,7 @@ sealed trait WorkerConfiguration {
}
case class ApnsWorkerConfiguration(jdbcConfig: JdbcConfig, apnsConfig: ApnsConfig) extends WorkerConfiguration
case class FcmWorkerConfiguration(jdbcConfig: JdbcConfig, fcmConfig: FcmConfig) extends WorkerConfiguration
case class CleanerConfiguration(jdbcConfig: JdbcConfig) extends WorkerConfiguration

object Configuration {

Expand Down Expand Up @@ -57,4 +58,11 @@ object Configuration {
)
}

def fetchCleaner(): CleanerConfiguration = {
val config = fetchConfiguration()
CleanerConfiguration(
jdbcConfig(config.getConfig("cleaner"))
)
}

}
@@ -0,0 +1,71 @@
package com.gu.notifications.worker

import cats.effect.{ContextShift, IO}
import cats.effect.IO._
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.gu.notifications.worker.models.InvalidTokens
import db.{DatabaseConfig, RegistrationService}
import doobie.util.transactor.Transactor
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
import fs2.Stream

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

class RegistrationCleaningWorker extends RequestHandler[SQSEvent, Unit] {

implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
val config: CleanerConfiguration = Configuration.fetchCleaner()
val transactor: Transactor[IO] = DatabaseConfig.transactor[IO](config.jdbcConfig)
val registrationService = RegistrationService(transactor)

case class CleaningResult(
deletedRegistrations: Int,
deletedRows: Int,
failures: Int) {
def combined(cleaningResult: CleaningResult): CleaningResult = this.copy(
deletedRegistrations = cleaningResult.deletedRegistrations + deletedRegistrations,
deletedRows = cleaningResult.deletedRows + deletedRows,
failures = cleaningResult.failures + failures
)
}

override def handleRequest(input: SQSEvent, context: Context): Unit = {

def deleteAndSwallowError(token: String): IO[CleaningResult] = {
registrationService.removeAllByToken(token)
.map(deletedRows => CleaningResult(1, deletedRows, 0))
.handleErrorWith {
case NonFatal(e) =>
logger.error(s"Unable to delete token $token", e)
IO.pure(CleaningResult(0, 0, 1))
}
}

def printResult(result: CleaningResult): IO[Unit] = IO {
logger.info(s"Deleted ${result.deletedRows} rows, deleted ${result.deletedRegistrations} registration and failed ${result.failures} time(s)")
}

val tokens = input
.getRecords.asScala
.map(_.getBody)
.map(Json.parse)
.flatMap(_.validate[InvalidTokens].asOpt)
.flatMap(_.tokens)
.toList

Stream
.emits(tokens).covary[IO]
.evalMap(deleteAndSwallowError)
.reduce(_ combined _)
.evalTap(printResult)
.compile
.drain
.unsafeRunSync()
}
Copy link
Contributor

@TBonnin TBonnin Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will keep using streams to handle the logic. It make it quite easy to follow the logic flow:

    def printResult(result: CleaningResult): IO[Unit] = IO(
      logger.info(s"Deleted ${result.deletedRows} rows, deleted ${result.deletedRegistrations} registration and failed ${result.failures} time(s)")
    )

    val tokens: List[String] = input
      .getRecords.asScala
      .map(_.getBody)
      .map(Json.parse)
      .flatMap(_.validate[InvalidTokens].asOpt)
      .flatMap(_.tokens)
      .toList

    Stream
      .emits(tokens).covary[IO]
      .evalMap(deleteAndSwallowError) // or .parEvalMapUnordered(awsApiConcurrencyLimit)(deleteAndSwallowError) for parallel execution
      .reduce(_ combined _)
      .evalTap(printResult)
      .compile
      .drain
      .unsafeRunSync()

Copy link
Contributor Author

@alexduf alexduf Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, Streams still look like alien technology to me but I'll get used to it.

}
@@ -0,0 +1,11 @@
package com.gu.notifications.worker.models

import play.api.libs.json.{Format, Json}

case class InvalidTokens(
tokens: List[String]
)

object InvalidTokens {
implicit val invalidTokensJF: Format[InvalidTokens] = Json.format[InvalidTokens]
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

APNS provides a token invalidation timestamp that could be use to avoid discarding a token that would have been refreshed since last time it was fetched. I am not sure we need to handle this edge case though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I'm hesitating to implement that, I'm worried it might introduce more edge cases