Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,60 @@ public class OzoneClientConfig {
// Class-wide SLF4J logger for client-configuration diagnostics.
private static final Logger LOG =
LoggerFactory.getLogger(OzoneClientConfig.class);

// Short-circuit read is off by default; enabling it requires the Hadoop
// native library and a secured domain socket path on each datanode.
public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
// Config key for the UNIX domain socket path shared by client and datanode.
public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
// Default socket path. The socket file is created by the DomainSocket
// process, so it must NOT already exist, and the Hadoop native library
// imposes its own permission requirements on the path and its parent
// directories (see the Hadoop SocketPathSecurity wiki page).
public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use existing directories like ozone.metadata.dirs or hdds.metadata.dir

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The OZONE_DOMAIN_SOCKET_PATH_DEFAULT file will be created by the DomainSocket process, so it cannot be an existing file. Also, the Hadoop native library imposes its own permission requirements on this path.

https://cwiki.apache.org/confluence/display/HADOOP2/SocketPathSecurity

// Common prefix for the short-circuit read configuration keys below.
public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
// Version and magic byte exchanged over the domain socket.
// NOTE(review): these appear to mirror HDFS data-transfer framing
// constants — confirm against the peer/datanode implementation.
public static final short DATA_TRANSFER_VERSION = 28;
public static final byte DATA_TRANSFER_MAGIC_CODE = 99;

// Master switch for short-circuit (local domain-socket) reads.
// The key is the bare feature toggle "read.short-circuit" — deliberately
// without the trailing dot that SHORT_CIRCUIT_PREFIX carries.
@Config(key = "read.short-circuit",
defaultValue = "false",
type = ConfigType.BOOLEAN,
description = "Whether read short-circuit is enabled or not",
tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;

// Buffer size for the domain-socket reader/writer. The channel only
// carries getBlock requests (~500 bytes) and responses (~100KB for a
// 256MB block / 4MB chunk / 16KB checksum layout, per the sizing in the
// review discussion), so 128KB suffices; larger buffers showed no
// measurable performance benefit.
// The field initializer (128 * 1024) matches defaultValue "128KB".
@Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
defaultValue = "128KB",
type = ConfigType.SIZE,
description = "Buffer size of reader/writer.",
tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
private int shortCircuitBufferSize = 128 * 1024;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HDFS has default short circuit buffer size of 1MB.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the different values. Bigger buffer size doesn't help to improve the performance.
The short-circuit channel only exchanges the getBlock request and response. The value is determined by how big a request and a response can be for a block with a 256MB block size, 4MB chunk size, and 16KB checksum size. The request is around 500 bytes, and the response is around
30 + 53 * 64 + 6 * 16384 ~ 100k
block size (exclude chunks) - 30 bytes
chunk size (one checksums) - 53 bytes
one checksum size - 6 bytes


// Cool-down period (seconds) after an unknown domain-socket I/O error,
// during which short-circuit read stays disabled and reads fall back to
// the normal path. The field initializer (60 * 10 = 600) matches
// defaultValue "600".
@Config(key = SHORT_CIRCUIT_PREFIX + "disable.interval",
defaultValue = "600",
type = ConfigType.LONG,
description = "If some unknown IO error happens on Domain socket read, short circuit read will be disabled " +
"temporarily for this period of time(seconds).",
tags = { ConfigTag.CLIENT })
private long shortCircuitReadDisableInterval = 60 * 10;

/**
 * @return true if short-circuit (domain-socket) read is enabled.
 */
public boolean isShortCircuitEnabled() {
return shortCircuitEnabled;
}

/**
 * Enables or disables short-circuit read.
 * NOTE(review): name is asymmetric with {@link #isShortCircuitEnabled()};
 * a {@code setShortCircuitEnabled} alias would read better, but renaming
 * here would break existing callers.
 *
 * @param enabled whether short-circuit read should be enabled
 */
public void setShortCircuit(boolean enabled) {
shortCircuitEnabled = enabled;
}


/**
 * @return the domain-socket reader/writer buffer size in bytes.
 */
public int getShortCircuitBufferSize() {
return shortCircuitBufferSize;
}

/**
 * Sets the domain-socket reader/writer buffer size.
 *
 * @param size buffer size in bytes
 */
public void setShortCircuitBufferSize(int size) {
this.shortCircuitBufferSize = size;
}

/**
 * @return how long (seconds) short-circuit read stays disabled after an
 *     unknown domain-socket I/O error.
 */
public long getShortCircuitReadDisableInterval() {
return shortCircuitReadDisableInterval;
}

/**
 * Sets the short-circuit read disable interval.
 *
 * @param value interval in seconds
 */
public void setShortCircuitReadDisableInterval(long value) {
shortCircuitReadDisableInterval = value;
}

/**
* Enum for indicating what mode to use when combining chunk and block
* checksums to define an aggregate FileChecksum. This should be considered
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;

import org.apache.hadoop.net.unix.DomainSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;

/**
* Represents a peer that we communicate with by using blocking I/O
* on a UNIX domain socket.
*/
public class DomainPeer implements Closeable {
  public static final Logger LOG = LoggerFactory.getLogger(DomainPeer.class);

  /** The underlying UNIX domain socket; all I/O is delegated to it. */
  private final DomainSocket sock;
  /** Stream and channel views of {@link #sock}, fetched once at construction. */
  private final InputStream input;
  private final OutputStream output;
  private final ReadableByteChannel readChannel;

  /**
   * Wraps the given domain socket, caching its stream and channel views.
   *
   * @param socket the connected UNIX domain socket to wrap
   */
  public DomainPeer(DomainSocket socket) {
    this.sock = socket;
    this.input = socket.getInputStream();
    this.output = socket.getOutputStream();
    this.readChannel = socket.getChannel();
  }

  /** @return the readable byte channel backed by the socket's input side. */
  public ReadableByteChannel getInputStreamChannel() {
    return readChannel;
  }

  /**
   * Sets the receive timeout on the underlying socket.
   *
   * @param timeoutMs timeout in milliseconds
   * @throws IOException if the socket attribute cannot be set
   */
  public void setReadTimeout(int timeoutMs) throws IOException {
    sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
  }

  /**
   * @return the socket's receive buffer size
   * @throws IOException if the socket attribute cannot be read
   */
  public int getReceiveBufferSize() throws IOException {
    return sock.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
  }

  /**
   * Sets the send timeout on the underlying socket.
   *
   * @param timeoutMs timeout in milliseconds
   * @throws IOException if the socket attribute cannot be set
   */
  public void setWriteTimeout(int timeoutMs) throws IOException {
    sock.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
  }

  /** @return true once the underlying socket is no longer open. */
  public boolean isClosed() {
    return !sock.isOpen();
  }

  /** Closes the underlying socket and logs the closure. */
  @Override
  public void close() throws IOException {
    sock.close();
    LOG.info("{} is closed", sock);
  }

  /** @return a "unix:{...}" description of the remote endpoint. */
  public String getRemoteAddressString() {
    return "unix:{" + sock.toString() + "}";
  }

  /** @return a fixed placeholder; domain sockets have no local address. */
  public String getLocalAddressString() {
    return "<local>";
  }

  /** @return the socket's input stream (cached at construction). */
  public InputStream getInputStream() throws IOException {
    return input;
  }

  /** @return the socket's output stream (cached at construction). */
  public OutputStream getOutputStream() throws IOException {
    return output;
  }

  @Override
  public String toString() {
    return "DomainPeer(" + getRemoteAddressString() + ")";
  }

  /** @return the wrapped domain socket. */
  public DomainSocket getDomainSocket() {
    return sock;
  }

  /**
   * @return always true; see the rationale below.
   */
  public boolean hasSecureChannel() {
    // Traffic over a UNIX domain socket never crosses a network, and the
    // privileges allowed on the socket inode and its parent directories
    // are tightly controlled — see
    // org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0.
    // Short of running as root or as the user that launched the service,
    // a man-in-the-middle attack on this traffic is not possible.
    return true;
  }
}
Loading