Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.netty.server;

import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Map: {@link ChannelId} -> {@link ClientInvocationId}s. */
class ChannelMap {
private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();

void add(ChannelId channelId, ClientInvocationId clientInvocationId) {
map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>())
.put(clientInvocationId, clientInvocationId);
}

void remove(ChannelId channelId, ClientInvocationId clientInvocationId) {
Optional.ofNullable(map.get(channelId))
.ifPresent((ids) -> ids.remove(clientInvocationId));
}

Set<ClientInvocationId> remove(ChannelId channelId) {
return Optional.ofNullable(map.remove(channelId))
.map(Map::keySet)
.orElse(Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -219,52 +216,10 @@ private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
}
}

static class StreamMap {
private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();

StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
final StreamInfo info = map.computeIfAbsent(key, function);
LOG.debug("computeIfAbsent({}) returns {}", key, info);
return info;
}

StreamInfo get(ClientInvocationId key) {
final StreamInfo info = map.get(key);
LOG.debug("get({}) returns {}", key, info);
return info;
}

StreamInfo remove(ClientInvocationId key) {
final StreamInfo info = map.remove(key);
LOG.debug("remove({}) returns {}", key, info);
return info;
}
}

public static class ChannelMap {
private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();

public void add(ChannelId channelId,
ClientInvocationId clientInvocationId) {
map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
}

public void remove(ChannelId channelId,
ClientInvocationId clientInvocationId) {
Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> ids.remove(clientInvocationId));
}

public Set<ClientInvocationId> remove(ChannelId channelId) {
return Optional.ofNullable(map.remove(channelId))
.map(Map::keySet)
.orElse(Collections.emptySet());
}
}

private final RaftServer server;
private final String name;

private final StreamMap streams = new StreamMap();
private final StreamMap<StreamInfo> streams = new StreamMap<>();
private final ChannelMap channels;
private final ExecutorService requestExecutor;
private final ExecutorService writeExecutor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.netty.server;

import org.apache.ratis.protocol.ClientInvocationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
* Map: {@link ClientInvocationId} -> {@link STREAM}.
*
* @param <STREAM> the stream type.
*/
class StreamMap<STREAM> {
public static final Logger LOG = LoggerFactory.getLogger(StreamMap.class);

private final ConcurrentMap<ClientInvocationId, STREAM> map = new ConcurrentHashMap<>();

STREAM computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, STREAM> function) {
final STREAM info = map.computeIfAbsent(key, function);
LOG.debug("computeIfAbsent({}) returns {}", key, info);
return info;
}

STREAM get(ClientInvocationId key) {
final STREAM info = map.get(key);
LOG.debug("get({}) returns {}", key, info);
return info;
}

STREAM remove(ClientInvocationId key) {
final STREAM info = map.remove(key);
LOG.debug("remove({}) returns {}", key, info);
return info;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,10 @@
import org.apache.ratis.protocol.RaftPeer;

import java.io.Closeable;
import java.net.InetSocketAddress;

/**
* A server interface handling incoming streams
* Relays those streams to other servers after persisting
*/
public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
/**
* start server
*/
void start();

/** @return the address where this RPC server is listening to. */
InetSocketAddress getInetSocketAddress();
public interface DataStreamServerRpc extends ServerRpc, RaftPeer.Add, Closeable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,13 @@
import org.apache.ratis.util.JavaUtils;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;

/**
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer.Add, Closeable {
/** Start the RPC service. */
void start() throws IOException;

/** @return the address where this RPC server is listening */
InetSocketAddress getInetSocketAddress();

public interface RaftServerRpc extends RaftServerProtocol, ServerRpc, RpcType.Get, RaftPeer.Add, Closeable {
/** @return the address where this RPC server is listening for client requests */
default InetSocketAddress getClientServerAddress() {
return getInetSocketAddress();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;

/**
* A general server interface.
*/
public interface ServerRpc extends Closeable {
/** Start the RPC service. */
void start() throws IOException;

/** @return the address where this RPC server is listening to. */
InetSocketAddress getInetSocketAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ RaftServer getRaftServer() {
return raftServer;
}

void start() {
void start() throws IOException {
dataStreamServer.getServerRpc().start();
}

Expand All @@ -90,7 +90,7 @@ Server getPrimaryServer() {
return servers.get(0);
}

void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) throws Exception {
raftGroup = RaftGroup.valueOf(groupId, peers);
this.peers = peers;
servers = new ArrayList<>(peers.size());
Expand Down