Skip to content
Permalink
Browse files
Clockoffset now shifts, by tenant, the time at which beats are publis…
…hed.
  • Loading branch information
myrle-krantz committed Dec 18, 2017
1 parent a3c200e commit b9d163ae0e21d88f35277def942d5b5afb580418
Showing 8 changed files with 158 additions and 63 deletions.
@@ -25,6 +25,7 @@
import io.mifos.core.test.listener.EventRecorder;
import io.mifos.rhythm.api.v1.client.RhythmManager;
import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.api.v1.domain.ClockOffset;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.service.config.RhythmConfiguration;
@@ -68,7 +69,7 @@
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
classes = {AbstractRhythmTest.TestConfiguration.class},
properties = {"rhythm.user=homer", "rhythm.beatCheckRate=500"}
properties = {"rhythm.user=homer", "rhythm.beatCheckRate=1000"}
)
public class AbstractRhythmTest {

@@ -205,11 +206,20 @@ Beat createBeat(
}

LocalDateTime getExpectedBeatTimestamp(final LocalDateTime fromTime, final Integer alignmentHour) {
return getExpectedBeatTimestamp(fromTime, alignmentHour, new ClockOffset());
}

LocalDateTime getExpectedBeatTimestamp(
final LocalDateTime fromTime,
final Integer alignmentHour,
final ClockOffset clockOffset) {
final LocalDateTime midnight = fromTime.truncatedTo(ChronoUnit.DAYS);
return midnight.plusHours(alignmentHour);
return midnight.plusHours(alignmentHour + clockOffset.getHours())
.plusMinutes(clockOffset.getMinutes())
.plusSeconds(clockOffset.getSeconds());
}

private LocalDateTime getNextTimeStamp(final LocalDateTime fromTime) {
return fromTime.plusDays(1);
}
}
}
@@ -133,7 +133,30 @@ public void twentyFourBeats() throws InterruptedException {
}

@Test
public void shouldChangeTheTenantClockOffset() throws InterruptedException {
public void shouldBeatForMissingDays() throws InterruptedException {
final String applicationIdentifier = "funnybusiness-v6";
final String beatIdentifier = "fiddlebeat";
createBeatForThisHour(applicationIdentifier, beatIdentifier);

final int daysAgo = 10;
final LocalDateTime nextBeat = setBack(applicationIdentifier, beatIdentifier, daysAgo);

for (int i = daysAgo; i > 0; i--) {
Mockito.verify(beatPublisherServiceMock, Mockito.timeout(4_000).times(1))
.publishBeat(
beatIdentifier,
tenantDataStoreContext.getTenantName(),
applicationIdentifier,
nextBeat.minusDays(daysAgo));
}
}

@Test
public void clockOffsetShouldEffectBeatTiming() throws InterruptedException {
final String tenantIdentifier = tenantDataStoreContext.getTenantName();
final String applicationIdentifier = "funnybusiness-v7";
final String beatIdentifier = "fiddlebeat0";

final ClockOffset initialClockOffset = this.testSubject.getClockOffset();
Assert.assertEquals(Integer.valueOf(0), initialClockOffset.getHours());
Assert.assertEquals(Integer.valueOf(0), initialClockOffset.getMinutes());
@@ -147,21 +170,27 @@ public void shouldChangeTheTenantClockOffset() throws InterruptedException {

final ClockOffset changedClockOffset = this.testSubject.getClockOffset();
Assert.assertEquals(offsetToNow, changedClockOffset);
}

@Test
public void shouldBeatForMissingDays() throws InterruptedException {
final String applicationIdentifier = "funnybusiness-v6";
final String beatIdentifier = "fiddlebeat";
createBeatForThisHour(applicationIdentifier, beatIdentifier);
final Beat beat = new Beat();
beat.setIdentifier(beatIdentifier);
beat.setAlignmentHour(0);

final int daysAgo = 10;
final LocalDateTime nextBeat = setBack(applicationIdentifier, beatIdentifier, daysAgo);
final LocalDateTime expectedBeatTimestamp = getExpectedBeatTimestamp(now, 0, offsetToNow);

for (int i = daysAgo; i > 0; i--) {
Mockito.verify(beatPublisherServiceMock, Mockito.timeout(4_000).times(1))
.publishBeat(beatIdentifier, tenantDataStoreContext.getTenantName(), applicationIdentifier, nextBeat.minusDays(daysAgo));
}
Mockito.doReturn(Optional.of("boop")).when(beatPublisherServiceMock)
.requestPermissionForBeats(Matchers.eq(tenantIdentifier), Matchers.eq(applicationIdentifier));
Mockito.when(beatPublisherServiceMock
.publishBeat(beatIdentifier, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp))
.thenReturn(true);

this.testSubject.createBeat(applicationIdentifier, beat);

Assert.assertTrue(this.eventRecorder.wait(EventConstants.POST_BEAT, new BeatEvent(applicationIdentifier, beat.getIdentifier())));

Mockito.verify(beatPublisherServiceMock, Mockito.timeout(10_000).times(1)).publishBeat(beatIdentifier, tenantIdentifier, applicationIdentifier, expectedBeatTimestamp);

this.testSubject.setClockOffset(initialClockOffset);
Assert.assertTrue(this.eventRecorder.wait(EventConstants.PUT_CLOCKOFFSET, initialClockOffset));
}

@Transactional
@@ -18,6 +18,7 @@
import io.mifos.core.command.annotation.Aggregate;
import io.mifos.core.command.annotation.CommandHandler;
import io.mifos.core.command.annotation.CommandLogLevel;
import io.mifos.rhythm.api.v1.domain.ClockOffset;
import io.mifos.rhythm.api.v1.events.BeatEvent;
import io.mifos.rhythm.api.v1.events.EventConstants;
import io.mifos.rhythm.service.ServiceConstants;
@@ -26,6 +27,7 @@
import io.mifos.rhythm.service.internal.mapper.BeatMapper;
import io.mifos.rhythm.service.internal.repository.BeatEntity;
import io.mifos.rhythm.service.internal.repository.BeatRepository;
import io.mifos.rhythm.service.internal.service.ClockOffsetService;
import io.mifos.rhythm.service.internal.service.IdentityPermittableGroupService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,18 +42,21 @@
public class BeatCommandHandler {
private final IdentityPermittableGroupService identityPermittableGroupService;
private final BeatRepository beatRepository;
private final ClockOffsetService clockOffsetService;
private final EventHelper eventHelper;
private final Logger logger;

@Autowired
public BeatCommandHandler(
final IdentityPermittableGroupService identityPermittableGroupService,
final BeatRepository beatRepository,
final EventHelper eventHelper,
@Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
final IdentityPermittableGroupService identityPermittableGroupService,
final BeatRepository beatRepository,
final ClockOffsetService clockOffsetService,
final EventHelper eventHelper,
@Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
super();
this.identityPermittableGroupService = identityPermittableGroupService;
this.beatRepository = beatRepository;
this.clockOffsetService = clockOffsetService;
this.eventHelper = eventHelper;
this.logger = logger;
}
@@ -70,18 +75,20 @@ public void process(final CreateBeatCommand createBeatCommand) {
//stuff that should happen in the transaction.
@SuppressWarnings("WeakerAccess")
@Transactional
public void processCreateBeatCommand(CreateBeatCommand createBeatCommand) {
public void processCreateBeatCommand(final CreateBeatCommand createBeatCommand) {
final boolean applicationHasRequestForAccessPermission = identityPermittableGroupService.checkThatApplicationHasRequestForAccessPermission(
createBeatCommand.getTenantIdentifier(), createBeatCommand.getApplicationIdentifier());
createBeatCommand.getTenantIdentifier(), createBeatCommand.getApplicationIdentifier());
if (!applicationHasRequestForAccessPermission) {
logger.info("Rhythm needs permission to publish beats to application, but couldn't request that permission for tenant '{}' and application '{}'.",
createBeatCommand.getTenantIdentifier(), createBeatCommand.getApplicationIdentifier());
createBeatCommand.getTenantIdentifier(), createBeatCommand.getApplicationIdentifier());
}
final ClockOffset clockOffset = clockOffsetService.findByTenantIdentifier(createBeatCommand.getTenantIdentifier());

final BeatEntity entity = BeatMapper.map(
createBeatCommand.getTenantIdentifier(),
createBeatCommand.getApplicationIdentifier(),
createBeatCommand.getInstance());
createBeatCommand.getTenantIdentifier(),
createBeatCommand.getApplicationIdentifier(),
createBeatCommand.getInstance(),
clockOffset);
this.beatRepository.save(entity);
}

@@ -16,10 +16,11 @@
package io.mifos.rhythm.service.internal.mapper;

import io.mifos.rhythm.api.v1.domain.Beat;
import io.mifos.rhythm.api.v1.domain.ClockOffset;
import io.mifos.rhythm.service.internal.repository.BeatEntity;

import java.time.Clock;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
@@ -42,14 +43,22 @@ static List<Beat> map(final List<BeatEntity> entities) {
return ret;
}

static BeatEntity map(final String tenantIdentifier, final String applicationIdentifier, final Beat instance) {
static BeatEntity map(
final String tenantIdentifier,
final String applicationIdentifier,
final Beat instance,
final ClockOffset offset) {
final BeatEntity ret = new BeatEntity();
ret.setBeatIdentifier(instance.getIdentifier());
ret.setTenantIdentifier(tenantIdentifier);
ret.setApplicationIdentifier(applicationIdentifier);
ret.setAlignmentHour(instance.getAlignmentHour());
//First beat is today. If it's in the past, it will be created nearly immediately.
ret.setNextBeat(LocalDateTime.now(ZoneId.of("UTC")).truncatedTo(ChronoUnit.DAYS).plusHours(instance.getAlignmentHour()));
//First beat is today. If it's in the past, it will be published nearly immediately.
ret.setNextBeat(LocalDateTime.now(Clock.systemUTC())
.truncatedTo(ChronoUnit.DAYS)
.plusHours(instance.getAlignmentHour() + offset.getHours())
.plusMinutes(offset.getMinutes())
.plusSeconds(offset.getSeconds()));
return ret;
}
}
@@ -33,9 +33,9 @@ public ClockOffsetService(final ClockOffsetRepository clockOffsetRepository) {
this.clockOffsetRepository = clockOffsetRepository;
}

public ClockOffset findByTenant(final String tenantIdentifier) {
public ClockOffset findByTenantIdentifier(final String tenantIdentifier) {
return clockOffsetRepository.findByTenantIdentifier(tenantIdentifier)
.map(ClockOffsetMapper::map)
.orElseGet(ClockOffset::new);
.orElseGet(ClockOffset::new); //If none is set, use 0,0,0
}
}
@@ -15,6 +15,7 @@
*/
package io.mifos.rhythm.service.internal.service;

import io.mifos.rhythm.api.v1.domain.ClockOffset;
import io.mifos.rhythm.service.ServiceConstants;
import io.mifos.rhythm.service.internal.repository.BeatEntity;
import io.mifos.rhythm.service.internal.repository.BeatRepository;
@@ -27,8 +28,8 @@
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Nonnull;
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.function.Predicate;
@@ -37,23 +38,25 @@
/**
* @author Myrle Krantz
*/
@SuppressWarnings({"unused", "WeakerAccess"})
@Component
public class Drummer {
private final IdentityPermittableGroupService identityPermittableGroupService;
private final BeatPublisherService beatPublisherService;
private final BeatRepository beatRepository;
private final ClockOffsetService clockOffsetService;
private final Logger logger;

@Autowired
public Drummer(
final IdentityPermittableGroupService identityPermittableGroupService,
final BeatPublisherService beatPublisherService,
final BeatRepository beatRepository,
final ClockOffsetService clockOffsetService,
@Qualifier(ServiceConstants.LOGGER_NAME) final Logger logger) {
this.identityPermittableGroupService = identityPermittableGroupService;
this.beatPublisherService = beatPublisherService;
this.beatRepository = beatRepository;
this.clockOffsetService = clockOffsetService;
this.logger = logger;
}

@@ -63,7 +66,7 @@ public synchronized void checkForBeatsNeeded() {
//In it's current form this function cannot be run in multiple instances of the same service. We need to get
//locking on selected entries corrected here, before this will work.
try {
final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
final LocalDateTime now = LocalDateTime.now(Clock.systemUTC());
//Get beats from the last two hours in case restart/start happens close to hour begin.
final Stream<BeatEntity> beats = beatRepository.findByNextBeatBefore(now);
beats.forEach((beat) -> {
@@ -86,6 +89,7 @@ public synchronized void checkForBeatsNeeded() {
beat.setNextBeat(y);
beatRepository.save(beat);
});
logger.info("Beat updated to {}.", beat);
}
});

@@ -96,14 +100,15 @@ public synchronized void checkForBeatsNeeded() {
}
}

public Optional<LocalDateTime> checkBeatForPublish(
final LocalDateTime now,
final String beatIdentifier,
final String tenantIdentifier,
final String applicationIdentifier,
final Integer alignmentHour,
final LocalDateTime nextBeat) {
return checkBeatForPublishHelper(now, alignmentHour, nextBeat,
private Optional<LocalDateTime> checkBeatForPublish(
final LocalDateTime now,
final String beatIdentifier,
final String tenantIdentifier,
final String applicationIdentifier,
final Integer alignmentHour,
final LocalDateTime nextBeat) {
final ClockOffset clockOffset = clockOffsetService.findByTenantIdentifier(tenantIdentifier);
return checkBeatForPublishHelper(now, alignmentHour, nextBeat, clockOffset,
x -> beatPublisherService.publishBeat(beatIdentifier, tenantIdentifier, applicationIdentifier, x));
}

@@ -112,21 +117,22 @@ static Optional<LocalDateTime> checkBeatForPublishHelper(
final LocalDateTime now,
final Integer alignmentHour,
final LocalDateTime nextBeat,
final ClockOffset clockOffset,
final Predicate<LocalDateTime> publishSucceeded) {
final long numberOfBeatPublishesNeeded = getNumberOfBeatPublishesNeeded(now, nextBeat);
if (numberOfBeatPublishesNeeded == 0)
return Optional.empty();

final Optional<LocalDateTime> firstFailedBeat = Stream.iterate(nextBeat,
x -> incrementToAlignment(x, alignmentHour))
x -> incrementToAlignment(x, alignmentHour, clockOffset))
.limit(numberOfBeatPublishesNeeded)
.filter(x -> !publishSucceeded.test(x))
.findFirst();

if (firstFailedBeat.isPresent())
return firstFailedBeat;
else
return Optional.of(incrementToAlignment(now, alignmentHour));
return Optional.of(incrementToAlignment(now, alignmentHour, clockOffset));
}

static long getNumberOfBeatPublishesNeeded(final LocalDateTime now, final @Nonnull LocalDateTime nextBeat) {
@@ -136,8 +142,15 @@ static long getNumberOfBeatPublishesNeeded(final LocalDateTime now, final @Nonnu
return Math.max(1, nextBeat.until(now, ChronoUnit.DAYS));
}

static LocalDateTime incrementToAlignment(final LocalDateTime toIncrement, final Integer alignmentHour)
static LocalDateTime incrementToAlignment(
final LocalDateTime toIncrement,
final Integer alignmentHour,
final ClockOffset clockOffset)
{
return toIncrement.plusDays(1).truncatedTo(ChronoUnit.DAYS).plusHours(alignmentHour);
return toIncrement.truncatedTo(ChronoUnit.DAYS)
.plusDays(1)
.plusHours(alignmentHour + clockOffset.getHours())
.plusMinutes(clockOffset.getMinutes())
.plusSeconds(clockOffset.getSeconds());
}
}
@@ -57,7 +57,7 @@ public ClockOffsetRestController(
public
@ResponseBody
ResponseEntity<ClockOffset> getClockOffset(@RequestHeader(TENANT_HEADER) final String tenantIdentifier) {
return ResponseEntity.ok(this.clockOffsetService.findByTenant(tenantIdentifier));
return ResponseEntity.ok(this.clockOffsetService.findByTenantIdentifier(tenantIdentifier));
}

@Permittable(value = AcceptedTokenType.SYSTEM)

0 comments on commit b9d163a

Please sign in to comment.