/
CleanupCron.scala
99 lines (80 loc) · 4.18 KB
/
CleanupCron.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//: ----------------------------------------------------------------------------
//: Copyright (C) 2017 Verizon. All Rights Reserved.
//:
//: Licensed under the Apache License, Version 2.0 (the "License");
//: you may not use this file except in compliance with the License.
//: You may obtain a copy of the License at
//:
//: http://www.apache.org/licenses/LICENSE-2.0
//:
//: Unless required by applicable law or agreed to in writing, software
//: distributed under the License is distributed on an "AS IS" BASIS,
//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//: See the License for the specific language governing permissions and
//: limitations under the License.
//:
//: ----------------------------------------------------------------------------
package nelson
package cleanup
import nelson.Datacenter.Deployment
import nelson.routing.RoutingTable
import cats.syntax.apply._
import cats.effect.{Effect, IO}
import nelson.CatsHelpers._
import fs2.{Scheduler, Stream}
import java.time.Instant
import journal.Logger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scalaz._
import Scalaz._
/** Snapshot of a deployment as seen by the cleanup pipeline: the deployment itself,
  * its current status, and its expiration time (`None` when no expiration is
  * recorded in storage — see `StoreOp.findDeploymentExpiration`). */
final case class DeploymentCtx(deployment: Deployment, status: DeploymentStatus, exp: Option[Instant])
/** Periodic cleanup job. Walks every datacenter and namespace, applies the
  * expiration policy to routable deployments, and pushes expired deployments
  * through the garbage-collection mark phase and on to the reaper. */
object CleanupCron {
import nelson.storage.{run => runs, StoreOp,StoreOpF}
import nelson.Datacenter.Namespace
private val log = Logger[CleanupCron.type]
// exclude Terminated deployments because they don't need to be cleaned up
private val status =
DeploymentStatus.all.toList.filter(_ != DeploymentStatus.Terminated).toNel.yolo("can't be empty")
// statuses considered "routable" — deployments in these states get the
// expiration-policy treatment in process() below
private val routables = DeploymentStatus.routable.toList
/* Gets all deployments (excluding terminated) and routing graph for a namespace */
def getDeploymentsForNamespace(ns: Namespace): StoreOpF[Vector[(Namespace,DeploymentCtx)]] =
for {
dps <- StoreOp.listDeploymentsForNamespaceByStatus(ns.id, status)
// pair each deployment with its (optional) expiration timestamp
dcx <- dps.toVector.traverse {
case (d,s) => StoreOp.findDeploymentExpiration(d.id).map(DeploymentCtx(d,s,_))
}
} yield dcx.map(d => (ns,d))
/* Gets all deployment (excluding terminated) and routing graph for each namespace within a Datacenter. */
def getDeploymentsForDatacenter(dc: Datacenter): StoreOpF[Vector[CleanupRow]] =
for {
_ <- log.debug(s"cleanup cron running for ${dc.name}").point[StoreOpF]
ns <- StoreOp.listNamespacesForDatacenter(dc.name)
gr <- RoutingTable.generateRoutingTables(dc.name)
ds <- ns.toVector.traverseM(ns => getDeploymentsForNamespace(ns))
// NOTE: a namespace with no entry in the routing tables is silently dropped
// here (gr.find yields None), so its deployments are skipped this cycle
} yield ds.flatMap { case (ns, dcx) => gr.find(_._1 == ns).map(x => (dc,ns,dcx,x._2)) }
/* Gets all deployments (excluding terminated) for all datacenters and namespaces */
def getDeployments(cfg: NelsonConfig): IO[Vector[CleanupRow]] =
runs(cfg.storage, cfg.datacenters.toVector.traverseM(dc => getDeploymentsForDatacenter(dc)))
/** True when the deployment's status is one of the routable statuses. */
def routable(d: DeploymentCtx): Boolean =
routables.exists(_ == d.status)
/** One cleanup pass: fetch all candidate deployments, apply the expiration
  * policy to routable ones only (via the Right branch of the Either split),
  * then emit the deployments that have actually expired into the GC mark phase. */
def process(cfg: NelsonConfig): Stream[IO, CleanupRow] =
Stream.eval(getDeployments(cfg)).flatMap(rs => Stream.emits(rs).covary[IO])
.map(a => if (routable(a._3)) Right(a) else Left(a))
.throughO(ExpirationPolicyProcess.expirationProcess(cfg)) // only apply expiration policy to the rhs
.map(_.fold(identity, identity))
.filter(x => GarbageCollector.expired(x._3)) // mark only expired deployments
.through(GarbageCollector.mark(cfg))
/** Infinite stream that emits one tick per cleanup cycle. cleanupDelay is
  * re-read on every cycle (repeatEval), and `.head` takes a single tick from
  * each awakeEvery, so a changed delay takes effect on the next cycle. */
def refresh(cfg: NelsonConfig): Stream[IO,Duration] =
Stream.repeatEval(IO(cfg.cleanup.cleanupDelay)).flatMap(d =>
Scheduler.fromScheduledExecutorService(cfg.pools.schedulingPool).awakeEvery[IO](d)(Effect[IO], cfg.pools.schedulingExecutor).head)
/*
 * This is the entry point for the cleanup pipeline. The pipeline is run at
 * a predetermined cadence and is responsible for expiration policy management,
 * marking deployments as garbage and running the destroy workflow
 * to decommission deployments.
 */
// Each refresh tick triggers one process() pass; errors are captured by
// `.attempt` and routed to the auditor's error sink (observeW/stripW), while
// successful rows flow into Reaper.reap for decommissioning.
def pipeline(cfg: NelsonConfig)(implicit ec: ExecutionContext): Stream[IO, Unit] =
(refresh(cfg) *> process(cfg).attempt).observeW(cfg.auditor.errorSink).stripW.to(Reaper.reap(cfg))
}