[#133] feat(netty): Add Netty Utils #727

Merged
merged 8 commits into from
Mar 18, 2023
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;

/**
 * Copied from Spark in order to override the createPooledByteBufAllocator method:
 * the field DEFAULT_TINY_CACHE_SIZE does not exist in Netty > 4.1.47.
 */
public class NettyUtils {
Contributor

Is there any other way to achieve this? I believe we should avoid this kind of code duplication as much as possible unless there is no alternative.

Contributor Author

Yes, I agree. The key problem is that Spark 2.3 is not compatible with a newer version of Netty (4.1.68.Final), while the Uniffle client needs Netty 4.1.68.Final. Copying the class is a low-cost way to work around that. Do you have a better suggestion?

Contributor

So could we simply add the createPooledByteBufAllocator and its related method here?
Other methods could be reused?

Contributor

We already have a shaded Netty through gRPC:

<dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty-shaded</artifactId>
      <version>${grpc.version}</version>
</dependency>

Could we use this Netty?

Contributor Author

It doesn't seem appropriate, maybe the grpc dependency will be removed when the netty version is stable in the future.

Contributor Author

> So could we simply add the createPooledByteBufAllocator and its related method here? Other methods could be reused?

You're right.

Contributor

> It doesn't seem appropriate, maybe the grpc dependency will be removed when the netty version is stable in the future.

gRPC and Netty will coexist in the future. gRPC is very convenient and has better compatibility, so we will keep using it for RPCs that do not need high performance, such as the control plane.

Contributor Author

It looks OK: gRPC 1.47 uses Netty 4.1.72.Final. Link: https://github.com/grpc/grpc-java/blob/v1.47.x/repositories.bzl

Contributor @rickyma, Oct 25, 2023

We found a compatibility issue when using Spark 2.1.0. The following error appears in the Spark driver's log:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.NettyUtils.createEventLoop(Lorg/apache/spark/network/util/IOMode;ILjava/lang/String;)Lio/netty/channel/EventLoopGroup;
    at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:104)
    at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:89)
    at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:70)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:449)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:56)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:264)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:271)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:96)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:53)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster$$anonfun$main$1.apply$mcV$sp(SQLApplicationMaster.scala:544)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2337)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster$.main(SQLApplicationMaster.scala:543)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.main(SQLApplicationMaster.scala)

This happens because createEventLoop in Uniffle's NettyUtils returns org.apache.uniffle.io.netty.channel.EventLoopGroup (the shaded class), while createEventLoop in Spark's NettyUtils returns io.netty.channel.EventLoopGroup. When a Spark application runs, the driver loads NettyUtils from the rss-client JAR. The JVM resolves a method by its full descriptor, which includes the return type, so Spark's call site, compiled against the unshaded return type, finds no matching method and fails with NoSuchMethodError.

Any ideas? Thx a lot.
@leixm @jerqi
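
To illustrate why a differing return type alone is enough to break linkage, here is a minimal sketch. It does not use Netty or Spark: `EventLoopGroup`, `ShadedEventLoopGroup`, and the nested `NettyUtils` are hypothetical stand-ins, and `MethodHandles.Lookup.findStatic` is used only because it looks a method up by name plus full signature, much like the JVM does when linking a call site.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class DescriptorMismatchDemo {
  // Stand-in for io.netty.channel.EventLoopGroup (hypothetical, not the real class).
  public interface EventLoopGroup {}

  // Stand-in for the relocated (shaded) copy of the same interface.
  public interface ShadedEventLoopGroup {}

  // Plays the role of the NettyUtils that wins on the classpath: it returns the shaded type.
  public static class NettyUtils {
    public static ShadedEventLoopGroup createEventLoop(int numThreads, String prefix) {
      return new ShadedEventLoopGroup() {};
    }
  }

  /** Returns true if createEventLoop can be found with the given return type. */
  public static boolean resolves(Class<?> returnType) {
    try {
      MethodHandles.lookup().findStatic(
          NettyUtils.class,
          "createEventLoop",
          MethodType.methodType(returnType, int.class, String.class));
      return true;
    } catch (NoSuchMethodException | IllegalAccessException e) {
      // Same name and parameters, but a different return type: no match.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("shaded return type resolves: " + resolves(ShadedEventLoopGroup.class));
    System.out.println("unshaded return type resolves: " + resolves(EventLoopGroup.class));
  }
}
```

The second lookup fails even though the name and parameter list match, which mirrors the situation in the stack trace above: the caller asks for a method returning `Lio/netty/channel/EventLoopGroup;` and the loaded class only offers one returning the relocated type.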


  private static final int DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);

  /** Creates a new ThreadFactory which prefixes each thread with the given name. */
  public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
    return new DefaultThreadFactory(threadPoolPrefix, true);
  }

  /** Creates a Netty EventLoopGroup based on the IOMode. */
  public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
    ThreadFactory threadFactory = createThreadFactory(threadPrefix);

    switch (mode) {
      case NIO:
        return new NioEventLoopGroup(numThreads, threadFactory);
      case EPOLL:
        return new EpollEventLoopGroup(numThreads, threadFactory);
      default:
        throw new IllegalArgumentException("Unknown io mode: " + mode);
    }
  }

  /** Returns the correct (client) SocketChannel class based on IOMode. */
  public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
    switch (mode) {
      case NIO:
        return NioSocketChannel.class;
      case EPOLL:
        return EpollSocketChannel.class;
      default:
        throw new IllegalArgumentException("Unknown io mode: " + mode);
    }
  }

  /** Returns the correct ServerSocketChannel class based on IOMode. */
  public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch (mode) {
      case NIO:
        return NioServerSocketChannel.class;
      case EPOLL:
        return EpollServerSocketChannel.class;
      default:
        throw new IllegalArgumentException("Unknown io mode: " + mode);
    }
  }

  /**
   * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame.
   * This is used before all decoders.
   */
  public static TransportFrameDecoder createFrameDecoder() {
    return new TransportFrameDecoder();
  }

  /** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
  public static String getRemoteAddress(Channel channel) {
    if (channel != null && channel.remoteAddress() != null) {
      return channel.remoteAddress().toString();
    }
    return "<unknown remote>";
  }

  /**
   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
   * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
   * but released by the executor thread rather than the event loop thread. Those thread-local
   * caches actually delay the recycling of buffers, leading to larger memory usage.
   */
  public static PooledByteBufAllocator createPooledByteBufAllocator(
      boolean allowDirectBufs,
      boolean allowCache,
      int numCores) {
    if (numCores == 0) {
      numCores = Runtime.getRuntime().availableProcessors();
    }
    return new PooledByteBufAllocator(
        allowDirectBufs && PlatformDependent.directBufferPreferred(),
        Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
        Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
        getPrivateStaticField("DEFAULT_MAX_ORDER"),
        allowCache ? DEFAULT_TINY_CACHE_SIZE : 0,
        allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
        allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
    );
  }

  /** Used to get defaults from Netty's private static fields. */
  private static int getPrivateStaticField(String name) {
    try {
      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
      f.setAccessible(true);
      return f.getInt(null);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
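
The class comment notes that DEFAULT_TINY_CACHE_SIZE is missing from newer Netty versions, and getPrivateStaticField would throw if asked for a field that does not exist. As a rough sketch of the same reflection technique with a fallback value, the following uses a hypothetical stand-in class (`FakeAllocator`) instead of Netty's PooledByteBufAllocator; the method name `getPrivateStaticInt` and the fallback parameter are illustrative assumptions, not part of this PR.

```java
import java.lang.reflect.Field;

public class StaticFieldDefaults {
  // Hypothetical stand-in for a library class whose private defaults differ between versions.
  public static class FakeAllocator {
    private static final int DEFAULT_SMALL_CACHE_SIZE = 256;
    // Deliberately no DEFAULT_TINY_CACHE_SIZE, mimicking a version where the field was removed.
  }

  /** Reads a private static int field, or returns the fallback if the field does not exist. */
  public static int getPrivateStaticInt(Class<?> clazz, String name, int fallback) {
    try {
      Field f = clazz.getDeclaredField(name);
      f.setAccessible(true);
      return f.getInt(null); // null receiver because the field is static
    } catch (NoSuchFieldException e) {
      return fallback;
    } catch (IllegalAccessException e) {
      throw new IllegalStateException("Cannot read static field " + name, e);
    }
  }

  public static void main(String[] args) {
    // Field exists: its value is returned.
    System.out.println(getPrivateStaticInt(FakeAllocator.class, "DEFAULT_SMALL_CACHE_SIZE", 0));
    // Field is absent: the fallback is returned instead of throwing.
    System.out.println(getPrivateStaticInt(FakeAllocator.class, "DEFAULT_TINY_CACHE_SIZE", 512));
  }
}
```

The PR takes a different route for the tiny cache size (reading the `io.netty.allocator.tinyCacheSize` system property with a default of 512); the sketch only shows how a missing field could be tolerated by the reflection helper itself.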
5 changes: 5 additions & 0 deletions common/pom.xml
@@ -80,6 +80,11 @@
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
23 changes: 23 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/netty/IOMode.java
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty;

public enum IOMode {
  NIO,
  EPOLL
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty.protocol;

import io.netty.buffer.ByteBuf;

public interface Encodable {

  int encodedLength();

  void encode(ByteBuf buf);
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty.protocol;

import io.netty.buffer.ByteBuf;

public abstract class Message implements Encodable {

  public abstract Type type();

  public enum Type implements Encodable {
    UNKNOWN_TYPE(-1),
    RPC_RESPONSE(0);

    private final byte id;

    Type(int id) {
      assert id < 128 : "Cannot have more than 128 message types";
      this.id = (byte) id;
    }

    public byte id() {
      return id;
    }

    @Override
    public int encodedLength() {
      return 1;
    }

    @Override
    public void encode(ByteBuf buf) {
      buf.writeByte(id);
    }

    public static Type decode(ByteBuf buf) {
      byte id = buf.readByte();
      switch (id) {
        case 0:
          return RPC_RESPONSE;
        case -1:
          throw new IllegalArgumentException("User type messages cannot be decoded.");
        default:
          throw new IllegalArgumentException("Unknown message type: " + id);
      }
    }
  }

  public static Message decode(Type msgType, ByteBuf in) {
    switch (msgType) {
      case RPC_RESPONSE:
        return RpcResponse.decode(in);
      default:
        throw new IllegalArgumentException("Unexpected message type: " + msgType);
    }
  }
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty.protocol;

import io.netty.buffer.ByteBuf;

import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;

public class RpcResponse extends Message {
  private long requestId;
  private StatusCode statusCode;
  private String retMessage;

  public RpcResponse(long requestId, StatusCode statusCode) {
    this(requestId, statusCode, null);
  }

  public RpcResponse(long requestId, StatusCode statusCode, String retMessage) {
    this.requestId = requestId;
    this.statusCode = statusCode;
    this.retMessage = retMessage;
  }

  public StatusCode getStatusCode() {
    return statusCode;
  }

  public String getRetMessage() {
    return retMessage;
  }

  @Override
  public String toString() {
    return "RpcResponse{"
        + "requestId=" + requestId
        + ", statusCode=" + statusCode
        + ", retMessage='" + retMessage
        + '\'' + '}';
  }

  @Override
  public int encodedLength() {
    return Long.BYTES + Integer.BYTES + ByteBufUtils.encodedLength(retMessage);
  }

  @Override
  public void encode(ByteBuf buf) {
    buf.writeLong(requestId);
    buf.writeInt(statusCode.ordinal());
    ByteBufUtils.writeLengthAndString(buf, retMessage);
  }

  public static RpcResponse decode(ByteBuf buf) {
    long requestId = buf.readLong();
    StatusCode statusCode = StatusCode.fromCode(buf.readInt());
    String retMessage = ByteBufUtils.readLengthAndString(buf);
    return new RpcResponse(requestId, statusCode, retMessage);
  }

  public long getRequestId() {
    return requestId;
  }

  @Override
  public Type type() {
    return Type.RPC_RESPONSE;
  }
}
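
For readers who want to see the byte layout that Message.Type and RpcResponse imply, here is a rough round-trip sketch using java.nio.ByteBuffer in place of Netty's ByteBuf (both default to big-endian). Several things are assumptions rather than facts from this PR: that the one-byte type id is written immediately before the message body, that ByteBufUtils.writeLengthAndString writes a 4-byte length followed by UTF-8 bytes, and that a null message is written as length 0. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RpcResponseWireSketch {
  // Mirrors Message.Type.RPC_RESPONSE(0) from the diff.
  public static final byte RPC_RESPONSE_TYPE_ID = 0;

  /** Encodes: [type: 1 byte][requestId: 8 bytes][status: 4 bytes][msgLen: 4 bytes][msg: UTF-8]. */
  public static ByteBuffer encode(long requestId, int statusCode, String retMessage) {
    // Assumption: a null message is encoded as an empty string.
    byte[] msg = retMessage == null
        ? new byte[0]
        : retMessage.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(
        1 + Long.BYTES + Integer.BYTES + Integer.BYTES + msg.length);
    buf.put(RPC_RESPONSE_TYPE_ID);
    buf.putLong(requestId);
    buf.putInt(statusCode);
    buf.putInt(msg.length);
    buf.put(msg);
    buf.flip(); // switch the buffer from writing to reading
    return buf;
  }

  /** Decodes the layout written by encode and returns "requestId:statusCode:message". */
  public static String decode(ByteBuffer buf) {
    byte typeId = buf.get();
    if (typeId != RPC_RESPONSE_TYPE_ID) {
      throw new IllegalArgumentException("Unknown message type: " + typeId);
    }
    long requestId = buf.getLong();
    int statusCode = buf.getInt();
    byte[] msg = new byte[buf.getInt()];
    buf.get(msg);
    return requestId + ":" + statusCode + ":" + new String(msg, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ByteBuffer buf = encode(7L, 0, "ok");
    System.out.println(buf.remaining()); // 1 + 8 + 4 + 4 + 2 = 19 bytes
    System.out.println(decode(buf));     // 7:0:ok
  }
}
```

One detail worth checking in the real class: encode writes statusCode.ordinal() while decode calls StatusCode.fromCode, so the round trip is only symmetric as long as each status code's numeric value equals its ordinal.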