Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Cloud CommandRouter #214

Merged
merged 18 commits into from
Dec 6, 2016
Merged

Spring Cloud CommandRouter #214

merged 18 commits into from
Dec 6, 2016

Conversation

smcvb
Copy link
Member

@smcvb smcvb commented Nov 25, 2016

An implementation of the CommandRouter based on Spring-Cloud-Commons.
When a Spring-Cloud compatible server is present and additional applications annotated with @EnableDiscoveryClient are fired up, ServiceInstance will be created.
The SpringCloudCommandRouter updates the LocalServiceInstance it's Metadata with the load factor and command filter. The SpringCloudCommandRouter has an @eventlistener annotated method to listen to every HearthbeatEvent and will check whether it's set of Axon-nodes has to be updated and will if necessary.

Additionally a SpringHttpCommandBusConnector has been addded for command sending/receiving based on the RestTemplate and RestController.

stevenb added 14 commits November 21, 2016 15:29
Added a basic Spring Cloud Command Router implementation which uses the Spring Cloud
Discovery Client to update it's local command-routing information and retrieve command-routing
information from all other services on 'findDestination(...)'.
This class still needs improvements
Serialize and deserialize the CommandNameFilter Predicate rather than building a comma separated list
and update the ConsistentHash on 'updateMembership(...)' and HearthBeat events
Extract the DispatchMessage and CallbackMessage classes from their JGroups parent classes (respectively DispatchMessage.java and ReplyMessage.java),
because their internals can be reused for a Spring Web specific CommandBusConnector for the dispatch/reply message serialization logic.
As such the DispatchMessage will be renamed to JGroupsDispatchMessage and the ReplyMessage will be renamed to JGroupsReplyMessage
Spring Http implementation of the DispatchMessage and ReplyMessage to be used in the SpringHttpCommandBusConnector
A crude, working implementation of the SpringHttpCommandBusConnector.
It will use a RestTemplate to send commands and will be annotated with @RestController to be able to receive commands through a SpringHttpDispatchMessage.
The callbacks will be implemented through the use of returning a ComparableFuture containing a SpringHttpReplyMessage.
Create a more sustainable and cleaner implementation  of the SpringHttpCommandBusConnector by deduplicating some code and by adding sensible logging/exception
Add some generics to the SpringHttpReplyMessage to have the ReplyMessage return a generic result.
Clean up the private class SpringHttpFutureCallback by removing redundant overrides and renaming it to SpringHttpReplyFutureCallback.
Clean up the SpringCloudCommandRouter by:
- Instantiate ConsistentHash at global level
- Removing redundant logging
- Move filtering of already registered members to ConsistentHash.with() function
Create a test for the SpringCloudCommandRouterTest
Remove the useless split statics for the path for the commands.
Make the SpringHttpDispatchMessage generic to coop with errors in the SpringHttpCommandBusConnector
Make the SpringHttpReplyFutureCallback extends ComparableFuture<SpringHttpReplyMessage> and implement FutureCallback<C, R> rather than extending FutureCallback<C, SpringHttpReplyMessage>  to solve generic errors in the receiveCommands(...) method.
Return ComparableFuture<?> rather than ComparableFuture<SpringHttpReplyMessage<R>> since receiveCommand may return several different ComparableFutures
Return null for a successful dispatch without a callback
Write a test class for the SpringHttpCommandBusConnector
Update the SpringCloudCommandRouter to filter for ServiceInstances which do not have any of the CommandRouter specific metadata fields.
Update the SpringCloudCommandRouterTest accordingly.
Apparently a Spring RestController doesn't like it when you return null, hence we return "" instead when a dispatch without callback has succeeded.
Additionally change the 'receiveCommand(...)' method to return a CompletableFuture.completedFuture() in every if/else/catch block rather than creating a
CompletableFuture at the start of the function and calling 'complet(...)' on it
Add a static setFieldValue() function to ReflectionUtils for use in the SpringCloudCommandRouterTest
Replace all occurrences of spring.ReflectionTestUtils in favour of axon.ReflectionUtils
Update the distributed-commandbus pom to include some dependencies in the banned dependencies set up
and exclude two dependencies from the spring-web dependency
@smcvb smcvb closed this Nov 25, 2016
@sganslandt
Copy link
Contributor

Interesting stuff. We've built something very similar both for Axon 2 and 3, and it would definitely be beneficial to something like this being part of the core offering. Why was the PR closed?

stevenb added 2 commits November 25, 2016 14:14
…odules

Split the JGroups specific and Spring Cloud specific implementation of the CommandRouter and CommandBusConnector into two separate modules.
This will help spring starter projects to auto-implement the distributed implementation based on which of both is found on the classpath.
Following to this, all the corresponding distributed CommandBus code has to be moved to axon-core/commandhandling/distributed to have a single point of entrance for the interfaces e.g.
…ndRouter

Wrap the ConsistentHash in an AtomicReference the ensure no 2+ threads can access the ConsistentHash at the same time to overcome potential issues.
@smcvb smcvb reopened this Nov 25, 2016
@smcvb
Copy link
Member Author

smcvb commented Nov 25, 2016

No worries @sganslandt, has been opened up again!
I was trying out some things.

stevenb added 2 commits November 25, 2016 15:08
Solve some code smells as a follow up of the issues codacy has showed based on the pull request for this branch
@abuijze abuijze merged commit e4f9221 into master Dec 6, 2016
@jorgheymans
Copy link
Contributor

@smcvb IIUC this is an alternative to jgroups based command routing, based on for example zookeeper instead ?

@smcvb
Copy link
Member Author

smcvb commented Dec 6, 2016

@jorgheymans yes it is. For testing this stuff I've created an Axon application through Spring Boot which has Netflix's Eureka Client on the pom, together with a Eureka Server running to regulate it all

@nickdk
Copy link

nickdk commented Jan 17, 2017

I know I've asked this in other places already (axon mailing list) but would be neat if we could have some example project or some documentation. I just tried to get an example working myself with a Eureka server and a few clients. Everything seems to work except that I can never get the updateMemberships to trigger when new clients register themselves, causing the whole consistentHash to only refer to one member. I'm new to the whole Eureka thing so I might be doing something wrong there, I'm gonna keep trying to get it working but again it would be lovely if someone has some extra info or a working example :-)

@smcvb
Copy link
Member Author

smcvb commented Jan 17, 2017

@nickdk I'm actually planning to send out a example project for this, maybe through a blog, but cannot give you a ETA for that right now.

How I tested it, was by running a Spring Boot Eureka Server and running several Spring Boot applications which where Eureka clients and Axon nodes. What did you do Axon configuration-wise?

@alexey-krylov
Copy link

alexey-krylov commented Jan 17, 2017 via email

@nickdk
Copy link

nickdk commented Jan 17, 2017

Thanks already guys. I was able to get some progress, the @eventlistener on the SpringCloudCommandRouter wasn't getting picked up by Spring scanning, fixed that. Now I'm in the situation where I have one ConfigServer running (that also serves as an AxonClient) and two separate AxonClient instances. All running localhost on different ports (server 8761 and clients 8762 and 8763). These two classes:

@EnableEurekaServer
@SpringBootApplication(scanBasePackages = "org.axonframework")
public class ConfigServer {

	@Autowired
	private DiscoveryClient discoveryClient;

	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(ConfigServer.class, args);
	}

	@Bean
	public CommandGateway commandGateway() {
		return new DefaultCommandGateway(commandBus());
	}

	@Bean(initMethod = "start")
	public SubscribingEventProcessor eventProcessor(){
		EventStore eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
		Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

		new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus());

		return new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
			ItemCreated itemCreated = (ItemCreated) event.getPayload();
			System.out.println("Received an ItemCreated event with id: " + itemCreated.getItemId());
		}), eventStore);
	}

	@Bean
	public SpringHttpCommandBusConnector springHttpCommandBusConnector() {
		CommandBus commandBus = new SimpleCommandBus();

		RestTemplate restTemplate = new RestTemplate();
		return new SpringHttpCommandBusConnector(commandBus, restTemplate, new XStreamSerializer());
	}

	@Bean
	public SpringCloudCommandRouter commandRouter() {
		return new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy(), new XStreamSerializer());
	}

	@Bean
	public CommandBus commandBus() {
		return new DistributedCommandBus(commandRouter(), springHttpCommandBusConnector());
	}

}
@EnableDiscoveryClient
@SpringBootApplication(scanBasePackages = "org.axonframework")
public class AxonClient {

	@Autowired
	private DiscoveryClient discoveryClient;

	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(AxonClient.class, args);
	}

	@Bean
	public CommandGateway commandGateway() {
		return new DefaultCommandGateway(commandBus());
	}

	@Bean(initMethod = "start")
	public SubscribingEventProcessor eventProcessor(){
		EventStore eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
		Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);

		new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus());

		return new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
			ItemCreated itemCreated = (ItemCreated) event.getPayload();
			System.out.println("Received an ItemCreated event with id: " + itemCreated.getItemId());
		}), eventStore);
	}

	@Bean
	public SpringHttpCommandBusConnector springHttpCommandBusConnector() {
		CommandBus commandBus = new SimpleCommandBus();

		RestTemplate restTemplate = new RestTemplate();
		return new SpringHttpCommandBusConnector(commandBus, restTemplate, new XStreamSerializer());
	}

	@Bean
	public CommandBus commandBus() {
		SpringCloudCommandRouter commandRouter = new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy(), new XStreamSerializer());

		return new DistributedCommandBus(commandRouter, springHttpCommandBusConnector());
	}
}

I also have a simple RestController -> commandGateway that I can use to push some commands through a HTTP endpoint. When I push commands the commands are always executed on the instance I target with the port.

When I debug I see that the atomicConsistentHash of the router only contains one member, I then tried debugging the updateMemberships I noticed something odd:

image

You can see that the code evaluation sees 3 instances (these are the correct 8761, 8762 and 8763 instances). Yet in the breakpoint I can see that only 1 of them is passed to the private updateMemberships that actually adds the instances to the atomicConsistentHash. So seems like the 3 instances get reduced to 1 by the stream() chain.

Maybe I'm doing something wrong configuration wise? Anyway, gonna see if I can progress tomorrow, if not I'll have a look at your example @alexey-krylov, thanks!

@smcvb
Copy link
Member Author

smcvb commented Jan 18, 2017

The only diff I currently can point out, compared to your set up, is that my Eureka server was really just a Eureka server, nothing more, whilst yours is server and client in one.

Have you tried debugging in the ConsistentHash class? There is a call on the ConsistentHash.with(...) function that check whether the member is already contained in the set. If so, it just returns the ConsistentHash as is instead of adding a new member. Although your debug screen cap shows three instances, it does not tell me their contents so I'm not sure if they are equal when they're converted to a ConsistentHashMember.

@nickdk
Copy link

nickdk commented Jan 18, 2017

Yeah it indeed seems that for some reason they are seen as one member, but they already get reduced from 3 to 1 even before the ConsistentHash gets into play. Anyway, I hopefully should have some free hours this afternoon, I'll keep you posted ;-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants