Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1116. Add DataStreamType. #238

Merged
merged 2 commits into from Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;

/** A stream factory that does nothing when data stream is disabled. */
public class DisabledDataStreamClientFactory implements DataStreamClientFactory {
public DisabledDataStreamClientFactory(Parameters parameters) {}

@Override
public SupportedDataStreamType getDataStreamType() {
return SupportedDataStreamType.DISABLED;
}

@Override
public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
return new DataStreamClientRpc() {
@Override
public void startClient() {}

@Override
public void closeClient() {}
};
}
}
Expand Up @@ -60,7 +60,7 @@ public DataStreamClientImpl(RaftPeer server, RaftProperties properties, Paramete
this.raftServer = Objects.requireNonNull(server, "server == null");

final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
this.dataStreamClientRpc = DataStreamClientFactory.cast(type.newFactory(parameters))
this.dataStreamClientRpc = DataStreamClientFactory.cast(type.newClientFactory(parameters))
.newDataStreamClientRpc(raftServer, properties);

this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
Expand Down
Expand Up @@ -15,9 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ratis.datastream;

public interface DataStreamFactory {
SupportedDataStreamType getDataStreamType();
public interface DataStreamFactory extends DataStreamType.Get {
}
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.datastream;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;

/** The type of data stream implementations. */
public interface DataStreamType {
/**
* Parse the given string as a {@link SupportedDataStreamType}
* or a user-defined {@link DataStreamType}.
*
* @param dataStreamType The string representation of an {@link DataStreamType}.
* @return a {@link SupportedDataStreamType} or a user-defined {@link DataStreamType}.
*/
static DataStreamType valueOf(String dataStreamType) {
final Throwable fromSupportedRpcType;
try { // Try parsing it as a SupportedRpcType
return SupportedDataStreamType.valueOfIgnoreCase(dataStreamType);
} catch (Throwable t) {
fromSupportedRpcType = t;
}

try {
// Try using it as a class name
return ReflectionUtils.newInstance(ReflectionUtils.getClass(dataStreamType, DataStreamType.class));
} catch(Throwable t) {
final IllegalArgumentException iae = new IllegalArgumentException(
"Invalid " + DataStreamType.class.getSimpleName() + ": \"" + dataStreamType + "\" "
+ " cannot be used as a user-defined " + DataStreamType.class.getSimpleName()
+ " and it is not a " + SupportedDataStreamType.class.getSimpleName() + ".");
iae.addSuppressed(t);
iae.addSuppressed(fromSupportedRpcType);
throw iae;
}
}

/** @return the name of the rpc type. */
String name();

/** @return a new client factory created using the given parameters. */
DataStreamFactory newClientFactory(Parameters parameters);

/** @return a new server factory created using the given parameters. */
DataStreamFactory newServerFactory(Parameters parameters);

/** An interface to get {@link DataStreamType}. */
interface Get {
/** @return the {@link DataStreamType}. */
DataStreamType getDataStreamType();
}
}
Expand Up @@ -20,30 +20,40 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;

public enum SupportedDataStreamType implements DataStreamFactory {
DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
public enum SupportedDataStreamType implements DataStreamType {
DISABLED("org.apache.ratis.client.DisabledDataStreamClientFactory",
"org.apache.ratis.server.DisabledDataStreamServerFactory"),
NETTY("org.apache.ratis.netty.NettyDataStreamFactory");
runzhiwang marked this conversation as resolved.
Show resolved Hide resolved

private final String factoryClassName;

private static final Class<?>[] ARG_CLASSES = {Parameters.class};

public static SupportedDataStreamType valueOfIgnoreCase(String s) {
return valueOf(s.toUpperCase());
}

private final String clientFactoryClassName;
private final String serverFactoryClassName;

SupportedDataStreamType(String clientFactoryClassName, String serverFactoryClassName) {
this.clientFactoryClassName = clientFactoryClassName;
this.serverFactoryClassName = serverFactoryClassName;
}

SupportedDataStreamType(String factoryClassName) {
this.factoryClassName = factoryClassName;
this(factoryClassName, factoryClassName);
}

@Override
public SupportedDataStreamType getDataStreamType() {
return valueOf(this.factoryClassName.toUpperCase());
public DataStreamFactory newClientFactory(Parameters parameters) {
final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
clientFactoryClassName, DataStreamFactory.class);
return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}

public DataStreamFactory newFactory(Parameters parameters) {
@Override
public DataStreamFactory newServerFactory(Parameters parameters) {
final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
factoryClassName, DataStreamFactory.class);
serverFactoryClassName, DataStreamFactory.class);
return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}
}
4 changes: 2 additions & 2 deletions ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -55,7 +55,7 @@ static RpcType valueOf(String rpcType) {
/** @return the name of the rpc type. */
String name();

/** @return a new factory created using the given properties and parameters. */
/** @return a new factory created using the given parameters. */
RpcFactory newFactory(Parameters parameters);

/** An interface to get {@link RpcType}. */
Expand Down
Expand Up @@ -15,35 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
package org.apache.ratis.server;

import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;

import java.util.Collection;

/** A stream factory that does nothing when data stream is disabled. */
public class DisabledDataStreamFactory implements DataStreamServerFactory, DataStreamClientFactory {
public DisabledDataStreamFactory(Parameters parameters) {}

@Override
public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
return new DataStreamClientRpc() {
@Override
public void startClient() {}

@Override
public void closeClient() {}
};
}
public class DisabledDataStreamServerFactory implements DataStreamServerFactory {
public DisabledDataStreamServerFactory(Parameters parameters) {}

@Override
public DataStreamServerRpc newDataStreamServerRpc(
Expand Down
Expand Up @@ -41,15 +41,15 @@ public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);

this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
this.serverRpc = DataStreamServerFactory.cast(type.newServerFactory(parameters))
.newDataStreamServerRpc(server, stateMachine, properties);
}

public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);

this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
this.serverRpc = DataStreamServerFactory.cast(type.newClientFactory(parameters))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newClientFactory -> newServerFactory ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It should be newServerFactory.

.newDataStreamServerRpc(server, stateMachine, properties);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ratis.datastream;

import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DisabledDataStreamClientFactory;
import org.apache.ratis.conf.RaftProperties;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -41,7 +42,8 @@ public void testDataStreamDisabled() throws Exception {
setupServer();
setupClient();
exception.expect(UnsupportedOperationException.class);
exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1 does not support streamAsync");
exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 this is a better test.

+ "$1 does not support streamAsync");
// stream() will create a header request, thus it will hit UnsupportedOperationException due to
// DisabledDataStreamFactory.
client.stream();
Expand Down