Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #22378: Generate policies for campaigns before it starts officially, delete them after it stops (1 hour delay each) #4666

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ object MainCampaignService {
}
}

class MainCampaignService(repo: CampaignEventRepository, campaignRepo: CampaignRepository, uuidGen: StringUuidGenerator) {
class MainCampaignService(
repo: CampaignEventRepository,
campaignRepo: CampaignRepository,
uuidGen: StringUuidGenerator,
startDelay: Int,
endDelay: Int
) {

private[this] var services: List[CampaignHandler] = Nil
def registerService(s: CampaignHandler) = {
Expand Down Expand Up @@ -174,14 +180,15 @@ class MainCampaignService(repo: CampaignEventRepository, campaignRepo: CampaignR
event.copy(state = Skipped(s"Event was cancelled because campaign is archived. ${reasonMessage}"))
)
case Enabled =>
if (event.start.isAfter(now)) {
val effectiveStart = event.start.minusHours(startDelay)
if (effectiveStart.isAfter(now)) {
for {
_ <-
CampaignLogger.debug(
s"Scheduled Campaign event ${event.id.value} put to sleep until it should start, on ${DateFormaterService
.serialize(event.start)}"
.serialize(effectiveStart)}, ${startDelay} hour${if (startDelay > 1) "s" else ""} before official start date, to ensure policies are correctly dispatched, nothing will be applied on the node"
)
_ <- ZIO.sleep(Duration.fromMillis(event.start.getMillis - now.getMillis))
_ <- ZIO.sleep(Duration.fromMillis(effectiveStart.getMillis - now.getMillis))
} yield {
().succeed
}
Expand All @@ -192,30 +199,35 @@ class MainCampaignService(repo: CampaignEventRepository, campaignRepo: CampaignR
}
}
case Running =>
if (event.start.isAfter(now)) {
val effectiveStart = event.start.minusHours(startDelay)
val effectiveEnd = event.end.plusHours(endDelay)
if (effectiveStart.isAfter(now)) {
for {
// Campaign should be planned, not running
_ <-
CampaignLogger.warn(
s"Campaign event ${event.id.value} was considered Running but we are before it start date, setting state to Schedule and wait for event to start, on ${DateFormaterService
.serialize(event.start)}"
s"Campaign event ${event.id.value} was considered Running but we are before its start date, setting state to Schedule and wait for event to start, on ${DateFormaterService
.serialize(effectiveStart)}, ${startDelay} hour${if (startDelay > 1) "s" else ""} before official start date, to ensure policies are correctly dispatched, nothing will be applied on the node"
)
_ <- repo.saveCampaignEvent(event.copy(state = Scheduled))
_ <- CampaignLogger.debug(
s"Scheduled Campaign event ${event.id.value} put to sleep until it should start, on ${DateFormaterService
.serialize(event.start)}"
)
_ <- ZIO.sleep(Duration.fromMillis(event.start.getMillis - now.getMillis))
_ <-
CampaignLogger.debug(
s"Scheduled Campaign event ${event.id.value} put to sleep until it should start, on ${DateFormaterService
.serialize(effectiveStart)}, ${startDelay} hour${if (startDelay > 1) "s" else ""} before official start date, to ensure policies are correctly dispatched, nothing will be applied on the node"
)

_ <- ZIO.sleep(Duration.fromMillis(effectiveStart.getMillis - now.getMillis))
} yield {
()
}
} else if (event.end.isAfter(now)) {
} else if (effectiveEnd.isAfter(now)) {
for {
_ <- CampaignLogger.debug(
s"Running Campaign event ${event.id.value} put to sleep until it should end, on ${DateFormaterService
.serialize(event.end)}"
)
_ <- ZIO.sleep(Duration.fromMillis(event.end.getMillis - now.getMillis))
_ <-
CampaignLogger.debug(
s"Running Campaign event ${event.id.value} put to sleep until it should end, on ${DateFormaterService
.serialize(effectiveEnd)}, ${endDelay} hour${if (endDelay > 1) "s" else ""} after official end date, so that we can gather results"
)
_ <- ZIO.sleep(Duration.fromMillis(effectiveEnd.getMillis - now.getMillis))
} yield {
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ object RunHooks {
case Some(ok) =>
ok.succeed
case None =>
val msg = s"Hook ${cmdInfo} timed out after ${killTimeout.asJava.toString}"
val msg = s"Hook ${cmdInfo} timed out after ${killTimeout.render}"
PureHooksLogger.LongExecLogger.error(msg) *> Unexpected(msg).fail
}
.untraced
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3195,6 +3195,6 @@ class MockCampaign() {
}
}

val mainCampaignService = new MainCampaignService(dumbCampaignEventRepository, repo, new StringUuidGeneratorImpl())
val mainCampaignService = new MainCampaignService(dumbCampaignEventRepository, repo, new StringUuidGeneratorImpl(), 0, 0)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2920,7 +2920,7 @@ object RudderConfig extends Loggable {
.make(campaignSerializer, campaignPath, campaignEventRepo)
.runOrDie(err => new RuntimeException(s"Error during initialization of campaign repository: " + err.fullMsg))

val mainCampaignService = new MainCampaignService(campaignEventRepo, campaignRepo, uuidGen)
val mainCampaignService = new MainCampaignService(campaignEventRepo, campaignRepo, uuidGen, 1, 1)
lazy val jsonReportsAnalyzer = JSONReportsAnalyser(reportsRepository, propertyRepository)

// todo: scheduler interval should be a property
Expand Down