New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add registration cleaning worker #239
Conversation
.map(_.getBody) | ||
.map(Json.parse) | ||
.flatMap(_.validate[InvalidTokens].asOpt) | ||
.foldLeft(List.empty[String]){ case (agg, value) => agg ++ value.tokens } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not .flatMap(_.tokens)
rather than foldLeft
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that you mention it, yeah why? 😄
val processReport = traverseAndDelete(tokens, CleaningResult(0, 0, 0)).unsafeRunSync() | ||
|
||
logger.info(s"Deleted ${processReport.deletedRows} rows, deleted ${processReport.deletedRegistrations} registration and failed ${processReport.failures} time(s)") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will keep using streams to handle the logic. It make it quite easy to follow the logic flow:
def printResult(result: CleaningResult): IO[Unit] = IO(
logger.info(s"Deleted ${result.deletedRows} rows, deleted ${result.deletedRegistrations} registration and failed ${result.failures} time(s)")
)
val tokens: List[String] = input
.getRecords.asScala
.map(_.getBody)
.map(Json.parse)
.flatMap(_.validate[InvalidTokens].asOpt)
.flatMap(_.tokens)
.toList
Stream
.emits(tokens).covary[IO]
.evalMap(deleteAndSwallowError) // or .parEvalMapUnordered(awsApiConcurrencyLimit)(deleteAndSwallowError) for parallel execution
.reduce(_ combined _)
.evalTap(printResult)
.compile
.drain
.unsafeRunSync()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, Streams still look like alien technology to me but I'll get used to it.
|
||
object InvalidTokens { | ||
implicit val invalidTokensJF: Format[InvalidTokens] = Json.format[InvalidTokens] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
APNS provides a token invalidation timestamp that could be use to avoid discarding a token that would have been refreshed since last time it was fetched. I am not sure we need to handle this edge case though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I'm hesitating to implement that, I'm worried it might introduce more edge cases
Receives events via SQS.
The events aren't sent yet, I'll add that in a subsequent PR