Skip to content

Commit

Permalink
RATIS-1116. Add DataStreamType. (#238)
Browse files Browse the repository at this point in the history
* RATIS-1116. Add DataStreamType.

* Fix a bug and change the cast(..) method to newInstance(..).
  • Loading branch information
szetszwo committed Oct 29, 2020
1 parent c498bc1 commit af35841
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 46 deletions.
Expand Up @@ -18,22 +18,23 @@

package org.apache.ratis.client;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.datastream.DataStreamType;
import org.apache.ratis.protocol.RaftPeer;

/**
* A factory to create streaming client.
*/
public interface DataStreamClientFactory extends DataStreamFactory {

static DataStreamClientFactory cast(DataStreamFactory dataStreamFactory) {
static DataStreamClientFactory newInstance(DataStreamType type, Parameters parameters) {
final DataStreamFactory dataStreamFactory = type.newClientFactory(parameters);
if (dataStreamFactory instanceof DataStreamClientFactory) {
return (DataStreamClientFactory) dataStreamFactory;
}
throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
+ " to " + ClientFactory.class
+ "; stream type is " + dataStreamFactory.getDataStreamType());
+ " to " + DataStreamClientFactory.class + "; stream type is " + type);
}

DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties);
Expand Down
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;

/** A stream factory that does nothing when data stream is disabled. */
public class DisabledDataStreamClientFactory implements DataStreamClientFactory {
public DisabledDataStreamClientFactory(Parameters parameters) {}

@Override
public SupportedDataStreamType getDataStreamType() {
return SupportedDataStreamType.DISABLED;
}

@Override
public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
return new DataStreamClientRpc() {
@Override
public void startClient() {}

@Override
public void closeClient() {}
};
}
}
Expand Up @@ -60,7 +60,7 @@ public DataStreamClientImpl(RaftPeer server, RaftProperties properties, Paramete
this.raftServer = Objects.requireNonNull(server, "server == null");

final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
this.dataStreamClientRpc = DataStreamClientFactory.cast(type.newFactory(parameters))
this.dataStreamClientRpc = DataStreamClientFactory.newInstance(type, parameters)
.newDataStreamClientRpc(raftServer, properties);

this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
Expand Down
Expand Up @@ -15,9 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ratis.datastream;

public interface DataStreamFactory {
SupportedDataStreamType getDataStreamType();
public interface DataStreamFactory extends DataStreamType.Get {
}
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.datastream;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;

/** The type of data stream implementations. */
public interface DataStreamType {
/**
* Parse the given string as a {@link SupportedDataStreamType}
* or a user-defined {@link DataStreamType}.
*
* @param dataStreamType The string representation of an {@link DataStreamType}.
* @return a {@link SupportedDataStreamType} or a user-defined {@link DataStreamType}.
*/
static DataStreamType valueOf(String dataStreamType) {
final Throwable fromSupportedRpcType;
try { // Try parsing it as a SupportedRpcType
return SupportedDataStreamType.valueOfIgnoreCase(dataStreamType);
} catch (Throwable t) {
fromSupportedRpcType = t;
}

try {
// Try using it as a class name
return ReflectionUtils.newInstance(ReflectionUtils.getClass(dataStreamType, DataStreamType.class));
} catch(Throwable t) {
final IllegalArgumentException iae = new IllegalArgumentException(
"Invalid " + DataStreamType.class.getSimpleName() + ": \"" + dataStreamType + "\" "
+ " cannot be used as a user-defined " + DataStreamType.class.getSimpleName()
+ " and it is not a " + SupportedDataStreamType.class.getSimpleName() + ".");
iae.addSuppressed(t);
iae.addSuppressed(fromSupportedRpcType);
throw iae;
}
}

/** @return the name of the rpc type. */
String name();

/** @return a new client factory created using the given parameters. */
DataStreamFactory newClientFactory(Parameters parameters);

/** @return a new server factory created using the given parameters. */
DataStreamFactory newServerFactory(Parameters parameters);

/** An interface to get {@link DataStreamType}. */
interface Get {
/** @return the {@link DataStreamType}. */
DataStreamType getDataStreamType();
}
}
Expand Up @@ -20,30 +20,40 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.util.ReflectionUtils;

public enum SupportedDataStreamType implements DataStreamFactory {
DISABLED("org.apache.ratis.server.impl.DisabledDataStreamFactory"),
public enum SupportedDataStreamType implements DataStreamType {
DISABLED("org.apache.ratis.client.DisabledDataStreamClientFactory",
"org.apache.ratis.server.DisabledDataStreamServerFactory"),
NETTY("org.apache.ratis.netty.NettyDataStreamFactory");

private final String factoryClassName;

private static final Class<?>[] ARG_CLASSES = {Parameters.class};

public static SupportedDataStreamType valueOfIgnoreCase(String s) {
return valueOf(s.toUpperCase());
}

private final String clientFactoryClassName;
private final String serverFactoryClassName;

SupportedDataStreamType(String clientFactoryClassName, String serverFactoryClassName) {
this.clientFactoryClassName = clientFactoryClassName;
this.serverFactoryClassName = serverFactoryClassName;
}

SupportedDataStreamType(String factoryClassName) {
this.factoryClassName = factoryClassName;
this(factoryClassName, factoryClassName);
}

@Override
public SupportedDataStreamType getDataStreamType() {
return valueOf(this.factoryClassName.toUpperCase());
public DataStreamFactory newClientFactory(Parameters parameters) {
final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
clientFactoryClassName, DataStreamFactory.class);
return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}

public DataStreamFactory newFactory(Parameters parameters) {
@Override
public DataStreamFactory newServerFactory(Parameters parameters) {
final Class<? extends DataStreamFactory> clazz = ReflectionUtils.getClass(
factoryClassName, DataStreamFactory.class);
serverFactoryClassName, DataStreamFactory.class);
return ReflectionUtils.newInstance(clazz, ARG_CLASSES, parameters);
}
}
4 changes: 2 additions & 2 deletions ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -55,7 +55,7 @@ static RpcType valueOf(String rpcType) {
/** @return the name of the rpc type. */
String name();

/** @return a new factory created using the given properties and parameters. */
/** @return a new factory created using the given parameters. */
RpcFactory newFactory(Parameters parameters);

/** An interface to get {@link RpcType}. */
Expand Down
Expand Up @@ -17,22 +17,22 @@
*/
package org.apache.ratis.server;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.datastream.DataStreamType;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.statemachine.StateMachine;

/** A {@link DataStreamFactory} to create server-side objects. */
public interface DataStreamServerFactory extends DataStreamFactory {

static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
static DataStreamServerFactory newInstance(DataStreamType type, Parameters parameters) {
final DataStreamFactory dataStreamFactory = type.newServerFactory(parameters);
if (dataStreamFactory instanceof DataStreamServerFactory) {
return (DataStreamServerFactory)dataStreamFactory;
}
throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
+ " to " + ServerFactory.class
+ "; rpc type is " + dataStreamFactory.getDataStreamType());
+ " to " + DataStreamServerFactory.class + "; rpc type is " + type);
}

/** Create a new {@link DataStreamServerRpc}. */
Expand Down
Expand Up @@ -15,35 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
package org.apache.ratis.server;

import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;

import java.util.Collection;

/** A stream factory that does nothing when data stream is disabled. */
public class DisabledDataStreamFactory implements DataStreamServerFactory, DataStreamClientFactory {
public DisabledDataStreamFactory(Parameters parameters) {}

@Override
public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
return new DataStreamClientRpc() {
@Override
public void startClient() {}

@Override
public void closeClient() {}
};
}
public class DisabledDataStreamServerFactory implements DataStreamServerFactory {
public DisabledDataStreamServerFactory(Parameters parameters) {}

@Override
public DataStreamServerRpc newDataStreamServerRpc(
Expand Down
Expand Up @@ -41,15 +41,15 @@ public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);

this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
.newDataStreamServerRpc(server, stateMachine, properties);
}

public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
RaftProperties properties, Parameters parameters){
final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);

this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
.newDataStreamServerRpc(server, stateMachine, properties);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ratis.datastream;

import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DisabledDataStreamClientFactory;
import org.apache.ratis.conf.RaftProperties;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -41,7 +42,8 @@ public void testDataStreamDisabled() throws Exception {
setupServer();
setupClient();
exception.expect(UnsupportedOperationException.class);
exception.expectMessage("org.apache.ratis.server.impl.DisabledDataStreamFactory$1 does not support streamAsync");
exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
+ "$1 does not support streamAsync");
// stream() will create a header request, thus it will hit UnsupportedOperationException due to
// DisabledDataStreamFactory.
client.stream();
Expand Down

0 comments on commit af35841

Please sign in to comment.