Permalink
Browse files

Add the ability to replay events

  • Loading branch information...
dadepo committed Aug 27, 2016
1 parent 288374a commit adecd71ca5d8c9542d26ffe82231d5c27d93bfa9
@@ -4,15 +4,30 @@
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.contextsupport.spring.AnnotationDriven;
import org.axonframework.eventhandling.SimpleEventBus;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.ClassNamePrefixClusterSelector;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.ClusterSelector;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventBusTerminal;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.replay.DiscardingIncomingMessageHandler;
import org.axonframework.eventhandling.replay.IncomingMessageHandler;
import org.axonframework.eventhandling.replay.ReplayingCluster;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.fs.FileSystemEventStore;
import org.axonframework.eventstore.fs.SimpleEventFileResolver;
import org.axonframework.eventstore.jdbc.JdbcEventStore;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.Repository;
import org.axonframework.unitofwork.NoTransactionManager;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
* Created by Dadepo Aderemi.
@@ -21,54 +36,143 @@
@AnnotationDriven
public class AppConfiguration {

@Bean
public DataSource dataSource() {
return DataSourceBuilder
.create()
.username("sa")
.password("")
.url("jdbc:h2:mem:exploredb")
.driverClassName("org.h2.Driver")
.build();
}

/**
* An event sourcing implementation needs a place to store events. i.e. The event Store.
* In our use case we will be storing our events in database, so we configure
* the JdbcEventStore as our EventStore implementation
*
* It should be noted that Axon allows storing the events
* in other persistent mechanism...jdbc, jpa, filesystem etc
*
* @return the {@link EventStore}
*/
@Bean
public EventStore jdbcEventStore() {
return new JdbcEventStore(dataSource());
}

@Bean
public SimpleCommandBus commandBus() {
SimpleCommandBus simpleCommandBus = new SimpleCommandBus();
return simpleCommandBus;
}

/**
* The simple command bus, an implementation of an EventBus
* mostly appropriate in a single JVM, single threaded use case.
* @return the {@link SimpleEventBus}
* A cluster which can be used to "cluster" together event handlers. This implementation is based on
* {@link SimpleCluster} and it would be used to cluster event handlers that would listen to events thrown
* normally within the application.
*
* @return an instance of {@link SimpleCluster}
*/
@Bean
public SimpleEventBus eventBus() {
return new SimpleEventBus();
public Cluster normalCluster() {
SimpleCluster simpleCluster = new SimpleCluster("simpleCluster");
return simpleCluster;
}

/**
* A cluster which can be used to "cluster" together event handlers. This implementation is based on
* {@link SimpleCluster} and it would be used to cluster event handlers that would listen to replayed events.
*
* As can be seen, the bean is just a simple implementation of {@link SimpleCluster} there is nothing about
* it that says it would be able to handle replayed events. The bean definition #replayCluster is what makes
* this bean able to handle replayed events.
*
* @return an instance of {@link SimpleCluster}
*/
@Bean
public DefaultCommandGateway commandGateway() {
return new DefaultCommandGateway(commandBus());
public Cluster replay() {
SimpleCluster simpleCluster = new SimpleCluster("replayCluster");
return simpleCluster;
}

/**
* An event sourcing implementation needs a place to store events. i.e. The event Store.
* In our use case we will be storing our events in a file system, so we configure
* the FileSystemEventStore as our EventStore implementation
* Takes the #replay() cluster and wraps it with a Replaying Cluser, turning the event handlers that are registered
* to be able to pick up events when events are replayed.
*
* It should be noted that Axon allows storing the events
* in other persistent mechanism...jdbc, jpa etc
* @return an instance of {@link ReplayingCluster}
*/
@Bean
public ReplayingCluster replayCluster() {
IncomingMessageHandler incomingMessageHandler = new DiscardingIncomingMessageHandler();
EventStoreManagement eventStore = (EventStoreManagement) jdbcEventStore();
return new ReplayingCluster(replay(), eventStore, new NoTransactionManager(),0,incomingMessageHandler);
}

/**
* This configuration registers event handlers with the two defined clusters
*
* @return the {@link EventStore}
* @return an instance of {@link ClusterSelector}
*/
@Bean
public ClusterSelector clusterSelector() {
Map<String, Cluster> clusterMap = new HashMap<>();
clusterMap.put("exploringaxon.eventhandler", normalCluster());
clusterMap.put("exploringaxon.replay", replayCluster());
return new ClassNamePrefixClusterSelector(clusterMap);
}


/**
* This replaces the simple event bus that was initially used. The clustering event bus is needed to be able
* to route events to event handlers in the clusters. It is configured with a {@link EventBusTerminal} defined
* by #terminal(). The EventBusTerminal contains the configuration rules which determines which cluster gets an
* incoming event
*
* @return a {@link ClusteringEventBus} implementation of {@link EventBus}
*/
@Bean
public EventStore eventStore() {
EventStore eventStore = new FileSystemEventStore(new SimpleEventFileResolver(new File("./events")));
return eventStore;
public EventBus clusteringEventBus() {
ClusteringEventBus clusteringEventBus = new ClusteringEventBus(clusterSelector(), terminal());
return clusteringEventBus;
}

/**
* An {@link EventBusTerminal} which publishes application domain events onto the normal cluster
*
* @return an instance of {@link EventBusTerminal}
*/
@Bean
public EventBusTerminal terminal() {
return new EventBusTerminal() {
@Override
public void publish(EventMessage... events) {
normalCluster().publish(events);
}
@Override
public void onClusterCreated(Cluster cluster) {

}
};
}

@Bean
public DefaultCommandGateway commandGateway() {
return new DefaultCommandGateway(commandBus());
}

/**
* Our aggregate root is now created from stream of events and not from a representation in a persistent mechanism,
* thus we need a repository that can handle the retrieving of our aggregate root from the stream of events.
*
* We configure the EventSourcingRepository which does exactly this. We supply it with the event store
* @return {@link EventSourcingRepository}
* @return a {@link EventSourcingRepository} implementation of {@link Repository}
*/
@Bean
public EventSourcingRepository eventSourcingRepository() {
EventSourcingRepository eventSourcingRepository = new EventSourcingRepository(Account.class, eventStore());
eventSourcingRepository.setEventBus(eventBus());
public Repository<Account> eventSourcingRepository() {
EventSourcingRepository eventSourcingRepository = new EventSourcingRepository(Account.class, jdbcEventStore());
eventSourcingRepository.setEventBus(clusteringEventBus());
return eventSourcingRepository;
}
}
@@ -1,5 +1,8 @@
package exploringaxon.api.event;

import java.time.LocalDateTime;
import java.time.ZoneId;

/**
* Event Class that communicates that an account has been credited
*
@@ -10,11 +13,14 @@
private final String accountNo;
private final Double amountCredited;
private final Double balance;
private final long timeStamp;

public AccountCreditedEvent(String accountNo, Double amountCredited, Double balance) {
this.accountNo = accountNo;
this.amountCredited = amountCredited;
this.balance = balance;
ZoneId zoneId = ZoneId.systemDefault();
this.timeStamp = LocalDateTime.now().atZone(zoneId).toEpochSecond();
}

public String getAccountNo() {
@@ -28,4 +34,8 @@ public Double getAmountCredited() {
public Double getBalance() {
return balance;
}

public long getTimeStamp() {
return timeStamp;
}
}
@@ -1,5 +1,8 @@
package exploringaxon.api.event;

import java.time.LocalDateTime;
import java.time.ZoneId;

/**
* Event Class that communicates that an account has been debited
*
@@ -9,11 +12,14 @@
private final String accountNo;
private final Double amountDebited;
private final Double balance;
private final long timeStamp;

public AccountDebitedEvent(String accountNo, Double amountDebited, Double balance) {
this.accountNo = accountNo;
this.amountDebited = amountDebited;
this.balance = balance;
ZoneId zoneId = ZoneId.systemDefault();
this.timeStamp = LocalDateTime.now().atZone(zoneId).toEpochSecond();
}

public String getAccountNo() {
@@ -27,4 +33,8 @@ public Double getAmountDebited() {
public Double getBalance() {
return balance;
}

public long getTimeStamp() {
return timeStamp;
}
}
@@ -0,0 +1,51 @@
package exploringaxon.replay;

import exploringaxon.api.event.AccountCreditedEvent;
import exploringaxon.api.event.AccountDebitedEvent;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventhandling.replay.ReplayAware;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

@Component
public class AccountCreditedReplayEventHandler implements ReplayAware {

List<String> audit = new ArrayList<>();

@EventHandler
public void handle(AccountCreditedEvent event) {
String auditMsg = String.format("%s credited to account with account no {%s} on %s",
event.getAmountCredited(), event.getAccountNo(), formatTimestampToString(event.getTimeStamp()));
audit.add(auditMsg);
}

@EventHandler
public void handle(AccountDebitedEvent event) {
String auditMsg = String.format("%s debited from account with account no {%s} on %s",
event.getAmountDebited(), event.getAccountNo(), formatTimestampToString(event.getTimeStamp()));
audit.add(auditMsg);
}

public List<String> getAudit() {
return audit;
}

@Override
public void beforeReplay() {
audit.clear();
}

@Override
public void afterReplay() {
}

@Override
public void onReplayFailed(Throwable cause) {}

private String formatTimestampToString(long timestamp) {
return new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(timestamp * 1000);
}
}
@@ -2,8 +2,11 @@

import exploringaxon.api.command.CreditAccountCommand;
import exploringaxon.api.command.DebitAccountCommand;
import exploringaxon.replay.AccountCreditedReplayEventHandler;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.eventhandling.replay.ReplayingCluster;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.ui.Model;
@@ -17,6 +20,14 @@
@Controller
public class IndexController {

@Autowired
@Qualifier("replayCluster")
ReplayingCluster replayCluster;

@Autowired
AccountCreditedReplayEventHandler replayEventHandler;


@Autowired
private CommandGateway commandGateway;

@@ -26,6 +37,11 @@ public String index(Model model) {
return "index";
}

@RequestMapping("/about")
public String about() {
return "about";
}


@RequestMapping("/debit")
@Transactional
@@ -42,4 +58,11 @@ public void doCredit(@RequestParam("acc") String accountNumber, @RequestParam("a
CreditAccountCommand creditAccountCommandCommand = new CreditAccountCommand(accountNumber, amount);
commandGateway.send(creditAccountCommandCommand);
}

@RequestMapping("/events")
public String doReplay(Model model) {
replayCluster.startReplay();
model.addAttribute("events",replayEventHandler.getAudit());
return "events";
}
}
@@ -0,0 +1,28 @@
CREATE TABLE domainevententry
(
aggregateidentifier VARCHAR(255) NOT NULL,
sequencenumber BIGINT NOT NULL,
type VARCHAR(255) NOT NULL,
eventidentifier VARCHAR(255) NOT NULL,
metadata BYTEA,
payload BYTEA,
payloadrevision VARCHAR(255),
payloadtype VARCHAR(255) NOT NULL,
timestamp VARCHAR(255) NOT NULL,
CONSTRAINT newdomainevententry_pkey PRIMARY KEY (aggregateidentifier, sequencenumber, type)
);


CREATE TABLE snapshotevententry
(
aggregateidentifier VARCHAR(255) NOT NULL,
sequencenumber BIGINT NOT NULL,
type VARCHAR(255) NOT NULL,
eventidentifier VARCHAR(255) NOT NULL,
payloadrevision VARCHAR(255),
payloadtype VARCHAR(255) NOT NULL,
timestamp VARCHAR(255) NOT NULL,
metadata BYTEA,
payload BYTEA NOT NULL,
CONSTRAINT snapshotevententry_pkey1 PRIMARY KEY (aggregateidentifier, sequencenumber, type)
);
Oops, something went wrong.

0 comments on commit adecd71

Please sign in to comment.