/
MemcacheMultiServerModule.java
66 lines (57 loc) · 2.09 KB
/
MemcacheMultiServerModule.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package memcached;
import io.activej.config.Config;
import io.activej.eventloop.Eventloop;
import io.activej.inject.annotation.Provides;
import io.activej.inject.module.AbstractModule;
import io.activej.memcache.protocol.SerializerDefSlice;
import io.activej.memcache.server.RingBuffer;
import io.activej.promise.Promise;
import io.activej.rpc.server.RpcServer;
import io.activej.serializer.SerializerBuilder;
import io.activej.worker.annotation.Worker;
import io.activej.worker.annotation.WorkerId;
import java.net.InetSocketAddress;
import static io.activej.config.converter.ConfigConverters.ofInteger;
import static io.activej.config.converter.ConfigConverters.ofMemSize;
import static io.activej.memcache.protocol.MemcacheRpcMessage.*;
/**
 * Dependency-injection module that supplies the pieces of a memcache-style RPC server.
 *
 * <p>Every provider below is annotated with {@link Worker}: an {@link Eventloop}, a listen
 * address derived from the {@link WorkerId}, a {@link RingBuffer} used as storage, and an
 * {@link RpcServer} that answers {@code GetRequest} and {@code PutRequest} messages.
 *
 * <p>Instances are obtained through {@link #create()}; the constructor is private.
 */
public class MemcacheMultiServerModule extends AbstractModule {
    /** Host name every provided listen address is created with. */
    private static final String HOST = "localhost";

    /** The worker id is added to this value to obtain the port of a provided address. */
    private static final int BASE_PORT = 9000;

    private MemcacheMultiServerModule() {
    }

    /**
     * Creates a new module instance.
     *
     * @return a fresh {@code MemcacheMultiServerModule}
     */
    public static MemcacheMultiServerModule create() {
        return new MemcacheMultiServerModule();
    }

    /**
     * Provides an event loop built with {@link Eventloop#create()}.
     *
     * @return a newly created event loop
     */
    @Provides
    @Worker
    Eventloop eventloop() {
        return Eventloop.create();
    }

    /**
     * Provides the socket address for the given worker.
     *
     * @param workerId id of the worker; it is added to the base port 9000
     * @return an address on {@code "localhost"} with port {@code 9000 + workerId}
     */
    @Provides
    @Worker
    InetSocketAddress port(@WorkerId int workerId) {
        int workerPort = BASE_PORT + workerId;
        return new InetSocketAddress(HOST, workerPort);
    }

    /**
     * Provides the ring buffer, sized from configuration.
     *
     * <p>Reads {@code memcache.buffers} as an integer and {@code memcache.bufferCapacity}
     * as a memory size, which is converted to an {@code int}.
     *
     * @param config configuration the two settings are read from
     * @return a ring buffer created from the two configured values
     */
    @Provides
    @Worker
    RingBuffer ringBuffer(Config config) {
        int bufferCount = config.get(ofInteger(), "memcache.buffers");
        int bufferCapacity = config.get(ofMemSize(), "memcache.bufferCapacity").toInt();
        return RingBuffer.create(bufferCount, bufferCapacity);
    }

    /**
     * Provides the RPC server that exposes the ring buffer.
     *
     * <p>A {@code GetRequest} is answered with a {@code GetResponse} wrapping whatever the
     * storage returns for the requested key. A {@code PutRequest} prints a line to standard
     * output, stores the request's data under its key and is answered with
     * {@code PutResponse.INSTANCE}.
     *
     * @param eventloop event loop the server is created on
     * @param storage   ring buffer that the handlers read from and write to
     * @param address   address the server listens on; its port appears in the printed line
     * @return the configured RPC server
     */
    @Provides
    @Worker
    RpcServer server(Eventloop eventloop, RingBuffer storage, InetSocketAddress address) {
        // Slice values are serialized with a dedicated serializer definition.
        SerializerBuilder sliceAwareSerializers = SerializerBuilder.create()
                .withSerializer(Slice.class, new SerializerDefSlice());

        return RpcServer.create(eventloop)
                .withHandler(GetRequest.class,
                        get -> Promise.of(new GetResponse(storage.get(get.getKey()))))
                .withHandler(PutRequest.class,
                        put -> {
                            Slice payload = put.getData();
                            System.out.println("Server on port #" + address.getPort() + " accepted message!");
                            storage.put(put.getKey(), payload.array(), payload.offset(), payload.length());
                            return Promise.of(PutResponse.INSTANCE);
                        })
                .withSerializerBuilder(sliceAwareSerializers)
                .withMessageTypes(MESSAGE_TYPES)
                .withListenAddresses(address);
    }
}