HDFS-9924 POC
Apache9 committed Jun 9, 2017
1 parent 7c9694c commit aa1623c
Showing 11 changed files with 991 additions and 1 deletion.
1 change: 0 additions & 1 deletion hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -66,7 +66,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
-     <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mock-server</groupId>
106 changes: 106 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDFSClient.java
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.ipc.HdfsRpcController;
import org.apache.hadoop.hdfs.ipc.RpcClient;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * A proof-of-concept asynchronous HDFS client: each NameNode operation
 * returns a {@link CompletableFuture} instead of blocking.
 */
@InterfaceAudience.Private
public class AsyncDFSClient implements Closeable {

  private final RpcClient rpcClient;

  private final ClientNamenodeProtocol.Interface stub;

  private final DfsClientConf conf;

  public AsyncDFSClient(Configuration conf, InetSocketAddress addr) throws IOException {
    this.conf = new DfsClientConf(conf);
    rpcClient = new RpcClient();
    stub = ClientNamenodeProtocol.newStub(rpcClient.createRpcChannel(ClientNamenodeProtocolPB.class,
        addr, UserGroupInformation.getCurrentUser()));
  }

  private FsPermission applyUMaskDir(FsPermission permission) {
    if (permission == null) {
      permission = FsPermission.getDirDefault();
    }
    return FsCreateModes.applyUMask(permission, conf.getUMask());
  }

  public CompletableFuture<Boolean> mkdirs(String src, FsPermission permission,
      boolean createParent) {
    FsPermission masked = applyUMaskDir(permission);
    HdfsRpcController controller = new HdfsRpcController();
    MkdirsRequestProto.Builder builder = MkdirsRequestProto.newBuilder().setSrc(src)
        .setMasked(PBHelperClient.convert(masked)).setCreateParent(createParent);
    FsPermission unmasked = masked.getUnmasked();
    if (unmasked != null) {
      builder.setUnmasked(PBHelperClient.convert(unmasked));
    }
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    stub.mkdirs(controller, builder.build(), resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getException());
      } else {
        future.complete(resp.getResult());
      }
    });
    return future;
  }

  public CompletableFuture<Optional<HdfsFileStatus>> getFileInfo(String src) {
    HdfsRpcController controller = new HdfsRpcController();
    CompletableFuture<Optional<HdfsFileStatus>> future = new CompletableFuture<>();
    stub.getFileInfo(controller, GetFileInfoRequestProto.newBuilder().setSrc(src).build(), resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getException());
      } else {
        future.complete(
            resp.hasFs() ? Optional.of(PBHelperClient.convert(resp.getFs())) : Optional.empty());
      }
    });
    return future;
  }

  @Override
  public void close() throws IOException {
    rpcClient.close();
  }
}
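
A hedged usage sketch, not part of this commit, showing how the returned
futures are meant to be consumed. The NameNode address and the target path
are made-up values.

import java.net.InetSocketAddress;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AsyncDFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

public class AsyncDFSClientSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical endpoint; any reachable NameNode RPC address works.
    try (AsyncDFSClient client =
        new AsyncDFSClient(conf, new InetSocketAddress("localhost", 8020))) {
      // mkdirs() returns immediately; get() blocks here only for the demo.
      boolean created = client.mkdirs("/tmp/async-poc",
          FsPermission.getDirDefault(), true).get();
      Optional<HdfsFileStatus> status = client.getFileInfo("/tmp/async-poc").get();
      System.out.println("created: " + created + ", exists: " + status.isPresent());
    }
  }
}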
102 changes: 102 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * A {@link ChannelDuplexHandler} that buffers RPC calls written before the
 * connection setup finishes, then flushes or fails them all at once when a
 * {@link BufferCallEvent} arrives.
 */
@InterfaceAudience.Private
public class BufferCallBeforeInitHandler extends ChannelDuplexHandler {

  private enum BufferCallAction {
    FLUSH, FAIL
  }

  public static final class BufferCallEvent {

    public final BufferCallAction action;

    public final IOException error;

    private BufferCallEvent(BufferCallAction action, IOException error) {
      this.action = action;
      this.error = error;
    }

    public static BufferCallEvent success() {
      return SUCCESS_EVENT;
    }

    public static BufferCallEvent fail(IOException error) {
      return new BufferCallEvent(BufferCallAction.FAIL, error);
    }
  }

  private static final BufferCallEvent SUCCESS_EVENT =
      new BufferCallEvent(BufferCallAction.FLUSH, null);

  private final Map<Integer, Call> id2Call = new HashMap<>();

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    if (msg instanceof Call) {
      Call call = (Call) msg;
      id2Call.put(call.id, call);
      // The call is already tracked in id2Call, so mark the write as a
      // success here; the call will be failed directly if it can not be
      // written out later.
      promise.trySuccess();
    } else {
      ctx.write(msg, promise);
    }
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof BufferCallEvent) {
      BufferCallEvent bcEvt = (BufferCallEvent) evt;
      switch (bcEvt.action) {
        case FLUSH:
          for (Call call : id2Call.values()) {
            ctx.write(call);
          }
          break;
        case FAIL:
          for (Call call : id2Call.values()) {
            call.setException(bcEvt.error);
          }
          break;
      }
      ctx.flush();
      ctx.pipeline().remove(this);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}
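
A hedged sketch, not part of this commit, of how a connection class might
drive this handler: install it before the handshake starts, then fire a
BufferCallEvent once the handshake either succeeds or fails. The helper
class and method names here are assumptions.

import java.io.IOException;

import io.netty.channel.Channel;

import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler;
import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;

final class ConnectionSetupSketch {

  // Install the buffering handler before the handshake starts, so that
  // calls written in the meantime are held rather than sent.
  static void installBuffering(Channel ch) {
    ch.pipeline().addLast("buffer", new BufferCallBeforeInitHandler());
  }

  // When the handshake finishes, flush or fail the buffered calls; the
  // handler removes itself from the pipeline in both cases.
  static void handshakeDone(Channel ch, IOException error) {
    if (error == null) {
      ch.pipeline().fireUserEventTriggered(BufferCallEvent.success());
    } else {
      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(error));
    }
  }
}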
102 changes: 102 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;

import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * Represents a single RPC call: the request message plus, once the call
 * completes, either the response message or the error.
 */
@InterfaceAudience.Private
class Call {

  final int id;

  final String protocolName;

  final long protocolVersion;

  final String methodName;

  final Message param;

  final Message responseDefaultType;

  Message response;

  IOException error;

  boolean done;

  final RpcCallback<Call> callback;

  public Call(int id, String protocolName, long protocolVersion,
      String methodName, Message param, Message responseDefaultType,
      RpcCallback<Call> callback) {
    this.id = id;
    this.protocolName = protocolName;
    this.protocolVersion = protocolVersion;
    this.methodName = methodName;
    this.param = param;
    this.responseDefaultType = responseDefaultType;
    this.callback = callback;
  }

  private void callComplete() {
    callback.run(this);
  }

  /**
   * Set the exception when there is an error. Notify the caller that the
   * call is done.
   *
   * @param error exception thrown by the call; either local or remote
   */
  public void setException(IOException error) {
    synchronized (this) {
      if (done) {
        return;
      }
      this.done = true;
      this.error = error;
    }
    callComplete();
  }

  /**
   * Set the return value when there is no error. Notify the caller that the
   * call is done.
   *
   * @param response return value of the call.
   */
  public void setResponse(Message response) {
    synchronized (this) {
      if (done) {
        return;
      }
      this.done = true;
      this.response = response;
    }
    callComplete();
  }
}
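
One property worth noting: completion is idempotent, so a timeout can safely
race the real response. A hedged sketch, not part of this commit; it would
have to live in org.apache.hadoop.hdfs.ipc since Call is package-private,
and the call id, protocol version, and messages are made-up values.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;

final class CallRaceSketch {

  static void timeoutRace() {
    Call call = new Call(1, "org.apache.hadoop.hdfs.protocol.ClientProtocol", 1L,
        "getFileInfo", GetFileInfoRequestProto.newBuilder().setSrc("/").build(),
        GetFileInfoResponseProto.getDefaultInstance(),
        c -> System.out.println(c.error != null ? "failed: " + c.error : "done"));
    call.setException(new IOException("rpc timeout")); // first completion wins
    call.setResponse(GetFileInfoResponseProto.getDefaultInstance()); // no-op now
  }
}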
63 changes: 63 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.ipc;

import java.net.InetSocketAddress;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * The identity of a connection: the (user, protocol, address) triple, with
 * value-based equality so it can be used as a cache key.
 */
@InterfaceAudience.Private
class ConnectionId {

  private static final int PRIME = 16777619;

  final UserGroupInformation ticket;
  final String protocolName;
  final InetSocketAddress address;

  public ConnectionId(UserGroupInformation ticket, String protocolName,
      InetSocketAddress address) {
    this.ticket = ticket;
    this.protocolName = protocolName;
    this.address = address;
  }

  @Override
  public int hashCode() {
    int h = ticket == null ? 0 : ticket.hashCode();
    h = PRIME * h + protocolName.hashCode();
    h = PRIME * h + address.hashCode();
    return h;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj instanceof ConnectionId) {
      ConnectionId id = (ConnectionId) obj;
      return address.equals(id.address) &&
          ((ticket != null && ticket.equals(id.ticket)) || (ticket == id.ticket)) &&
          protocolName.equals(id.protocolName);
    }
    return false;
  }
}
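
The value-based equals()/hashCode() exist so the triple can key a connection
cache. A hedged pooling sketch, not part of this commit; it would live in
org.apache.hadoop.hdfs.ipc since ConnectionId is package-private, and C
stands in for whatever connection type the RPC client actually pools.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ConnectionPoolSketch<C> {

  private final Map<ConnectionId, C> connections = new HashMap<>();

  private final Function<ConnectionId, C> factory;

  ConnectionPoolSketch(Function<ConnectionId, C> factory) {
    this.factory = factory;
  }

  C getConnection(ConnectionId remoteId) {
    // The same (user, protocol, address) triple resolves to the same
    // pooled connection.
    return connections.computeIfAbsent(remoteId, factory);
  }
}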
