Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Spring boot sample #45

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 66 additions & 0 deletions akka-sample-spring-extension-java/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.typesafe.akka.samples</groupId>
<artifactId>akka-sample-spring-extension-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>akka-sample-spring-extension-java</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<akka.version>2.5.6</akka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also show the alpakka reactive-streams integration? https://akka.io/blog/news/2017/10/23/native-akka-streams-in-spring-web-and-boot-via-alpakka

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm currently learning Akka Streams for Kafka integration, I could add those in a few days :)

<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>


</project>
@@ -0,0 +1,29 @@
package org.example;

import org.example.akka.di.SpringExtension;
import org.example.akka.message.Message;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

@SpringBootApplication
public class App {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(App.class, args);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't format the code using tabs -- follow the same style as other examples do, which is 2 spaces

ActorSystem system = context.getBean("actorSystem", ActorSystem.class);
SpringExtension springExtension = context.getBean("springExtension", SpringExtension.class);

// Using constructor with parameters.
ActorRef myActor1 = system.actorOf(springExtension.props("myActor", "Parametrized", 100), "MyActor1");
myActor1.tell(new Message(), ActorRef.noSender());

// Using constructor with no parameters
ActorRef myActor2 = system.actorOf(springExtension.props("myActor"), "MyActor2");
myActor2.tell(new Message(), ActorRef.noSender());

}
}
@@ -0,0 +1,33 @@
package org.example.akka;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.example.akka.di.SpringExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AkkaConfig {

@Autowired
private ApplicationContext applicationContext;

@Autowired
private SpringExtension springExtension;

@Bean(destroyMethod = "terminate")
public ActorSystem actorSystem() {
ActorSystem actorSystem = ActorSystem.create("akka-system", akkaConfiguration());
springExtension.initialize(applicationContext);
return actorSystem;
}

@Bean
public Config akkaConfiguration() {
return ConfigFactory.load();
}
}
@@ -0,0 +1,38 @@
package org.example.akka.actor;

import javax.inject.Named;

import org.example.akka.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;

import akka.actor.AbstractActor;

@Named("myActor")
@Scope("prototype")
public class MyActor extends AbstractActor {

@Autowired
private MyActorService myActorService;

private String attribute;
private Integer attribute2;

public MyActor(String attribute, Integer attribute2) {
super();
this.attribute = attribute;
this.attribute2 = attribute2;
}

public MyActor() {
this.attribute = "Default value";
this.attribute2 = Integer.MAX_VALUE;
}

@Override
public Receive createReceive() {
return receiveBuilder().match(Message.class, msg -> {
myActorService.printService(getSelf().path() + " " + attribute + " " + attribute2);
}).build();
}
}
@@ -0,0 +1,22 @@
package org.example.akka.actor;

import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

@Service
public class MyActorService {

public MyActorService() {

}

@PostConstruct
public void init() {
System.out.println("Created service as bean");
}

public void printService(String string) {
System.out.println("Printed with bean: " + string);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a typical spring "Service" contain mutable state and be injected in multiple actors? In that case please add a comment here that the service must be thread safe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a state because it's not an attribute. The service just "processes" the input string, in this case by printing it to console. A very simple example to show that the Autowired annotation works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I got that the sample is stateless, was more thinking of how it would be in a real(tm) spring app. People tend to copy paste these sample and build whole applications so some friendly comment-nudging in the right direction is often good.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sure, I'll do that.

@@ -0,0 +1,38 @@
package org.example.akka.di;

import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

public class SpringActorProducer implements IndirectActorProducer {

final private ApplicationContext applicationContext;
final private String actorBeanName;
final private Object[] args;

public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.args = null;
}

public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object... args) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.args = args;
}

@Override
public Actor produce() {
if (args == null) {
return (Actor) applicationContext.getBean(actorBeanName);
} else {
return (Actor) applicationContext.getBean(actorBeanName, args);
}
}

@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
@@ -0,0 +1,24 @@
package org.example.akka.di;

import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class SpringExtension implements Extension {

private ApplicationContext applicationContext;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extensions are responsible to be thread safe, given that this field must be volatile or an AtomicReference else calls to the props methods (from different threads) may not see that it was initialized with any value

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is making the Props methods Thread-safe with the "synchronized" keyword enough? Also, I'm reading from the docs that the thread safety is important in case multiple Actor Systems use the Extension.

"Since this Extension is a shared instance per ActorSystem we need to be threadsafe"

Does this mean that for a single actor system, the thread safety is not necessary? What about for cluster sharding?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that is an incorrect interpretation of the docs.

An instance of an extension that is bound to a single actor system, the actor system ensure there is only one instance for the actor system, but the thread safety is not guaranteed (like with actors), in that sense it is just a normal object, and since the methods of the extension is called from multiple threads this means they must be thread safe.

Volatile or atomic reference is better than synchronized as they to don't cause any locking.


public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

public Props props(String actorBeanName) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName);
}

public Props props(String actorBeanName, Object... args) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
}
}
@@ -0,0 +1,9 @@
package org.example.akka.message;

import java.io.Serializable;

public class Message implements Serializable {

private static final long serialVersionUID = 713662864902548777L;

}