Skip to content
Permalink
Browse files
Started work on keeping track of which events have been sent when.
  • Loading branch information
mifosio-04-04-2018 committed May 15, 2017
1 parent 5a34f48 commit 123a0db3f1b5a1ce86ff460f019275f9f8448a5c
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 8 deletions.
@@ -0,0 +1,55 @@
/*
* Copyright 2017 The Mifos Initiative.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mifos.rhythm.service.internal.command;

import java.time.LocalDateTime;

/**
* @author Myrle Krantz
*/
public class CheckPublishBeatCommand {
private final String tenantIdentifier;
private final String applicationName;
private final String beatIdentifier;
private final LocalDateTime publishedSince;

public CheckPublishBeatCommand(
final String tenantIdentifier,
final String applicationName,
final String beatIdentifier,
final LocalDateTime publishedSince) {
this.tenantIdentifier = tenantIdentifier;
this.applicationName = applicationName;
this.beatIdentifier = beatIdentifier;
this.publishedSince = publishedSince;
}

public String getTenantIdentifier() {
return tenantIdentifier;
}

public String getApplicationName() {
return applicationName;
}

public String getBeatIdentifier() {
return beatIdentifier;
}

public LocalDateTime getPublishedSince() {
return publishedSince;
}
}
@@ -0,0 +1,100 @@
/*
* Copyright 2017 The Mifos Initiative.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mifos.rhythm.service.internal.command.handler;

import io.mifos.core.command.annotation.Aggregate;
import io.mifos.core.command.annotation.CommandHandler;
import io.mifos.core.lang.ServiceException;
import io.mifos.rhythm.service.config.RhythmProperties;
import io.mifos.rhythm.service.internal.command.CheckPublishBeatCommand;
import io.mifos.rhythm.service.internal.repository.BeatEntity;
import io.mifos.rhythm.service.internal.repository.BeatRepository;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Nullable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.stream.Stream;

/**
* @author Myrle Krantz
*/
@SuppressWarnings("unused")
@Aggregate
public class PublishBeatCommandHandler {

private final BeatRepository publishedBeatRepository;
private final RhythmProperties properties;
private final Logger logger;

public PublishBeatCommandHandler(
final BeatRepository publishedBeatRepository,
final RhythmProperties properties,
final Logger logger) {
this.publishedBeatRepository = publishedBeatRepository;
this.properties = properties;
this.logger = logger;
}

@CommandHandler
@Transactional
public void process(final CheckPublishBeatCommand checkPublishBeatCommand) {
final BeatEntity beat
= this.publishedBeatRepository.findByTenantIdentifierAndApplicationNameAndBeatIdentifier(
checkPublishBeatCommand.getTenantIdentifier(),
checkPublishBeatCommand.getApplicationName(),
checkPublishBeatCommand.getBeatIdentifier())
.orElseThrow(
() -> ServiceException.notFound("Could not publish the beat because it was not found. Tenant '{}', Application '{}', Beat '{}'",
checkPublishBeatCommand.getTenantIdentifier(),
checkPublishBeatCommand.getApplicationName(),
checkPublishBeatCommand.getBeatIdentifier()));


getTimesNeedingEvents(beat.getLastPublishedOn(), checkPublishBeatCommand.getPublishedSince(), beat.getAlignmentHour())
.forEach(x -> {
//TODO: Actually send the event.
logger.info("This is where the beat {} should be published with timestamp {} under user {}.", beat, x, properties.getUser());
beat.setLastPublishedOn(LocalDateTime.now(ZoneId.of("UTC")));
this.publishedBeatRepository.save(beat);
});
}

static Stream<LocalDateTime> getTimesNeedingEvents(
final @Nullable LocalDateTime lastEventTime,
final LocalDateTime mostRecentEventRequired,
final Integer alignmentHour) {
if (lastEventTime == null) {
return Stream.of(mostRecentEventRequired);
}

if (lastEventTime.compareTo(mostRecentEventRequired) >= 0) {
return Stream.empty();
}

final long days = lastEventTime.until(mostRecentEventRequired, ChronoUnit.DAYS);
return Stream.iterate(incrementToAlignment(lastEventTime, alignmentHour),
(lastPublishDate) -> incrementToAlignment(lastPublishDate, alignmentHour))
.limit(days);
}

static LocalDateTime incrementToAlignment(final LocalDateTime toIncrement, final Integer alignmentHour)
{
return toIncrement.plusDays(1).truncatedTo(ChronoUnit.DAYS).plusHours(alignmentHour);
}
}
@@ -15,7 +15,10 @@
*/
package io.mifos.rhythm.service.internal.repository;

import io.mifos.core.mariadb.util.LocalDateTimeConverter;

import javax.persistence.*;
import java.time.LocalDateTime;
import java.util.Objects;

/**
@@ -30,18 +33,22 @@ public class BeatEntity {
@Column(name = "id")
private Long id;

@Column(name = "beat_identifier")
@Column(name = "beat_identifier", nullable = false)
private String beatIdentifier;

@Column(name = "tenant_identifier")
@Column(name = "tenant_identifier", nullable = false)
private String tenantIdentifier;

@Column(name = "application_name")
@Column(name = "application_name", nullable = false)
private String applicationName;

@Column(name = "alignment_hour")
@Column(name = "alignment_hour", nullable = false)
private Integer alignmentHour;

@Column(name = "last_published_on")
@Convert(converter = LocalDateTimeConverter.class)
private LocalDateTime lastPublishedOn;

public BeatEntity() {
super();
}
@@ -86,17 +93,38 @@ public void setAlignmentHour(Integer alignmentHour) {
this.alignmentHour = alignmentHour;
}

public LocalDateTime getLastPublishedOn() {
return lastPublishedOn;
}

public void setLastPublishedOn(LocalDateTime lastPublishedOn) {
this.lastPublishedOn = lastPublishedOn;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof BeatEntity)) return false;
if (o == null || getClass() != o.getClass()) return false;
BeatEntity that = (BeatEntity) o;
return Objects.equals(getBeatIdentifier(), that.getBeatIdentifier()) &&
Objects.equals(getApplicationName(), that.getApplicationName());
return Objects.equals(beatIdentifier, that.beatIdentifier) &&
Objects.equals(tenantIdentifier, that.tenantIdentifier) &&
Objects.equals(applicationName, that.applicationName);
}

@Override
public int hashCode() {
return Objects.hash(getBeatIdentifier(), getApplicationName());
return Objects.hash(beatIdentifier, tenantIdentifier, applicationName);
}

@Override
public String toString() {
return "BeatEntity{" +
"id=" + id +
", beatIdentifier='" + beatIdentifier + '\'' +
", tenantIdentifier='" + tenantIdentifier + '\'' +
", applicationName='" + applicationName + '\'' +
", alignmentHour=" + alignmentHour +
", lastPublishedOn=" + lastPublishedOn +
'}';
}
}
@@ -20,6 +20,7 @@

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
* @author Myrle Krantz
@@ -29,4 +30,5 @@ public interface BeatRepository extends JpaRepository<BeatEntity, Long> {
void deleteByTenantIdentifierAndApplicationName(String tenantIdentifier, String applicationName);
List<BeatEntity> findByTenantIdentifierAndApplicationName(String tenantIdentifier, String applicationName);
Optional<BeatEntity> findByTenantIdentifierAndApplicationNameAndBeatIdentifier(String tenantIdentifier, String applicationName, String identifier);
Stream<BeatEntity> findByAlignmentHour(int alignmentHour);
}
@@ -0,0 +1,61 @@
/*
* Copyright 2017 The Mifos Initiative.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mifos.rhythm.service.internal.scheduler;

import io.mifos.core.command.gateway.CommandGateway;
import io.mifos.rhythm.service.internal.command.CheckPublishBeatCommand;
import io.mifos.rhythm.service.internal.repository.BeatEntity;
import io.mifos.rhythm.service.internal.repository.BeatRepository;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.stream.Stream;

/**
* @author Myrle Krantz
*/
@SuppressWarnings("unused")
@Component
public class Drummer {
private final BeatRepository beatRepository;
private final CommandGateway commandGateway;

public Drummer(
final BeatRepository beatRepository,
final CommandGateway commandGateway) {
this.beatRepository = beatRepository;
this.commandGateway = commandGateway;
}

//@Scheduled(fixedRate = 300_000) //TimeUnit.MINUTES.toMillis(5)
@Scheduled(fixedRate = 500)
public void checkForBeatsNeeded() {
final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
int alignmentHour = now.getHour();
final Stream<BeatEntity> beats = beatRepository.findByAlignmentHour(alignmentHour);
beats.forEach((beat) -> publishBeat(beat, now));
}

private void publishBeat(final BeatEntity beat, final LocalDateTime now) {
final LocalDateTime topOfToday = now.truncatedTo(ChronoUnit.DAYS);
final LocalDateTime publishedSince = topOfToday.plusHours(beat.getAlignmentHour());
commandGateway.process(
new CheckPublishBeatCommand(beat.getTenantIdentifier(), beat.getApplicationName(), beat.getBeatIdentifier(), publishedSince));
}
}
@@ -23,6 +23,7 @@ CREATE TABLE khepri_beats (
application_name VARCHAR(64) NOT NULL,
beat_identifier VARCHAR(32) NOT NULL,
alignment_hour INT NOT NULL,
last_published_on TIMESTAMP(3) NULL,
CONSTRAINT khepri_beats_uq UNIQUE (tenant_identifier, application_name, beat_identifier),
CONSTRAINT khepri_beats_pk PRIMARY KEY (id)
);
@@ -0,0 +1,51 @@
/*
* Copyright 2017 The Mifos Initiative.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.mifos.rhythm.service.internal.command.handler;

import org.junit.Assert;
import org.junit.Test;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.stream.Stream;

/**
* @author Myrle Krantz
*/
public class PublishBeatCommandHandlerTest {
@Test
public void incrementToAlignment() {
final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
final LocalDateTime tomorrow = PublishBeatCommandHandler.incrementToAlignment(now, 3);

Assert.assertEquals(tomorrow.minusDays(1).truncatedTo(ChronoUnit.DAYS), now.truncatedTo(ChronoUnit.DAYS));
Assert.assertEquals(3, tomorrow.getHour());
}

@Test
public void getTimesNeedingEvents() {
final LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
final Stream<LocalDateTime> noStartTime = PublishBeatCommandHandler.getTimesNeedingEvents(null, now.plus(3, ChronoUnit.DAYS), 0);
Assert.assertEquals(1, noStartTime.count());

final Stream<LocalDateTime> threeDaysStartTime = PublishBeatCommandHandler.getTimesNeedingEvents(now, now.plus(3, ChronoUnit.DAYS), 0);
Assert.assertEquals(3, threeDaysStartTime.count());

final Stream<LocalDateTime> eventsAlreadyDone = PublishBeatCommandHandler.getTimesNeedingEvents(now, now.minus(1, ChronoUnit.DAYS), 0);
Assert.assertEquals(0, eventsAlreadyDone.count());
}
}

0 comments on commit 123a0db

Please sign in to comment.