Update from FB trunk on 4/8/11

commit fe528a0e9b8e732707846e542cd0acbf2c8a870f (1 parent: b6449e4)
authored by jgray
Showing with 1,594 additions and 409 deletions.
  1. +1 −1  CHANGES.txt
  2. +2 −3 NOTICE.txt
  3. +9 −5 bin/hadoop
  4. +8 −4 bin/hadoop-daemon.sh
  5. +8 −0 build.xml
  6. +2 −2 ivy/libraries.properties
  7. +0 −1  src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
  8. +1 −0  src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
  9. +9 −1 src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/resourceutilization/Collector.java
  10. +8 −0 src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/resourceutilization/CollectorCached.java
  11. +47 −1 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/DistributedAvatarFileSystem.java
  12. +10 −3 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeProtocols.java
  13. +9 −1 src/contrib/hmon/src/java/org/apache/hadoop/mapred/UtilizationCollector.java
  14. +8 −0 src/contrib/hmon/src/java/org/apache/hadoop/mapred/UtilizationCollectorCached.java
  15. +7 −0 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
  16. +17 −0 src/core/org/apache/hadoop/fs/DU.java
  17. +32 −13 src/core/org/apache/hadoop/fs/Trash.java
  18. +25 −25 src/core/org/apache/hadoop/io/compress/CodecPool.java
  19. +1 −0  src/core/org/apache/hadoop/ipc/Client.java
  20. +157 −33 src/core/org/apache/hadoop/ipc/RPC.java
  21. +11 −0 src/core/org/apache/hadoop/ipc/VersionedProtocol.java
  22. +17 −20 src/core/org/apache/hadoop/metrics/util/MetricsRegistry.java
  23. +29 −0 src/core/org/apache/hadoop/util/DataChecksum.java
  24. +343 −67 src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
  25. +17 −6 src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
  26. +6 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
  27. +26 −10 src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  28. +20 −23 src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
  29. +4 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
  30. +7 −2 src/hdfs/org/apache/hadoop/hdfs/protocol/ProtocolCompatible.java
  31. +11 −0 src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
  32. +4 −4 src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
  33. +20 −6 src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  34. +27 −2 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  35. +43 −6 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  36. +4 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
  37. +6 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
  38. +29 −2 src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
  39. +4 −0 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  40. +54 −6 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  41. +124 −20 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  42. +93 −31 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  43. +2 −0  src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
  44. +0 −2  src/hdfs/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
  45. +15 −4 src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
  46. +28 −4 src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  47. +3 −1 src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  48. +1 −1  src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
  49. +19 −11 src/hdfs/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
  50. +5 −1 src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java
  51. +10 −6 src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
  52. +24 −11 src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java
  53. +7 −0 src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
  54. +7 −0 src/mapred/org/apache/hadoop/mapred/JobTracker.java
  55. +15 −2 src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
  56. +7 −0 src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  57. +13 −0 src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  58. +7 −10 src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
  59. +12 −1 src/test/org/apache/hadoop/hdfs/TestFileCorruption.java
  60. +37 −0 src/test/org/apache/hadoop/hdfs/TestFileCreation.java
  61. +44 −38 src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
  62. +5 −0 src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  63. +46 −16 src/test/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
  64. +1 −1  src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  65. +18 −0 src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
  66. +6 −0 src/test/org/apache/hadoop/ipc/TestRPC.java
  67. +2 −2 src/test/org/apache/hadoop/net/TestInetSocketAddressFactory.java
2  CHANGES.txt
@@ -1,7 +1,7 @@
Hadoop Change Log
Titan
-
+ HDFS-1539 Prevent data loss when a cluster suffers a power loss.
HDFS-200. In HDFS, sync() not yet guarantees data available to the new
readers
5 NOTICE.txt
@@ -1,7 +1,6 @@
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).
-This product includes software developed by Yahoo! Inc.,
-powering the largest Hadoop clusters in the Universe!
-(http://developer.yahoo.com/hadoop).
+This product includes software developed by Facebook.
+(http://github.com/facebook).
14 bin/hadoop
@@ -180,19 +180,23 @@ if [ "$HADOOP_POLICYFILE" = "" ]; then
HADOOP_POLICYFILE="hadoop-policy.xml"
fi
+if [ "$HADOOP_GC_LOG_OPTS" != "" ]; then
+ HADOOP_GC_LOG_OPTS="${HADOOP_GC_LOG_OPTS}$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$COMMAND-gc.log"
+fi
+
# restore ordinary behaviour
unset IFS
# figure out which class to run
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS $HADOOP_NAMENODE_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
elif [ "$COMMAND" = "datanode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode'
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_DATANODE_OPTS"
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS $HADOOP_DATANODE_OPTS"
elif [ "$COMMAND" = "fs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
@@ -216,10 +220,10 @@ elif [ "$COMMAND" = "jmxget" ] ; then
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "jobtracker" ] ; then
CLASS=org.apache.hadoop.mapred.JobTracker
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOBTRACKER_OPTS"
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS $HADOOP_JOBTRACKER_OPTS"
elif [ "$COMMAND" = "tasktracker" ] ; then
CLASS=org.apache.hadoop.mapred.TaskTracker
- HADOOP_OPTS="$HADOOP_OPTS $HADOOP_TASKTRACKER_OPTS"
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_GC_LOG_OPTS $HADOOP_TASKTRACKER_OPTS"
elif [ "$COMMAND" = "job" ] ; then
CLASS=org.apache.hadoop.mapred.JobClient
elif [ "$COMMAND" = "queue" ] ; then
12 bin/hadoop-daemon.sh
@@ -49,18 +49,18 @@ shift
hadoop_rotate_log ()
{
- log=$1;
+ rlog=$1;
num=5;
if [ -n "$2" ]; then
num=$2
fi
- if [ -f "$log" ]; then # rotate logs
+ if [ -f "$rlog" ]; then # rotate logs
while [ $num -gt 1 ]; do
prev=`expr $num - 1`
- [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+ [ -f "$rlog.$prev" ] && mv "$rlog.$prev" "$rlog.$num"
num=$prev
done
- mv "$log" "$log.$num";
+ mv "$rlog" "$rlog.$num";
fi
}
@@ -93,6 +93,7 @@ export HADOOP_LOGFILE=hadoop-$HADOOP_IDENT_STRING-$command-$HOSTNAME.log
export HADOOP_ROOT_LOGGER="INFO,DRFA"
log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-$HOSTNAME.out
pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pid
+gc_log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-gc.log
# Set default scheduling priority
if [ "$HADOOP_NICENESS" = "" ]; then
@@ -118,6 +119,7 @@ case $startStop in
fi
hadoop_rotate_log $log
+ hadoop_rotate_log $gc_log
echo starting $command, logging to $log
cd "$HADOOP_HOME"
nohup nice -n $HADOOP_NICENESS "$HADOOP_HOME"/bin/hadoop --config $HADOOP_CONF_DIR $command "$@" > "$log" 2>&1 < /dev/null &
@@ -133,6 +135,8 @@ case $startStop in
kill `cat $pid`
else
echo no $command to stop
+ #we found a pidfile, but cant kill -0 the process, nuke the file.
+ rm $pid
fi
else
echo no $command to stop
8 build.xml
@@ -1277,6 +1277,14 @@
<delete dir="${build.dir}"/>
<delete dir="${docs.src}/build"/>
<delete file="${jdiff.xml.dir}/hadoop_${version}.xml"/>
+ <delete file="${conf.dir}/hdfs-site.xml"/>
+ <delete file="${conf.dir}/core-site.xml"/>
+ <delete file="${conf.dir}/hadoop-policy.xml"/>
+ <delete file="${conf.dir}/capacity-scheduler.xml"/>
+ <delete file="${conf.dir}/mapred-site.xml"/>
+ <delete file="${conf.dir}/mapred-queue-acls.xml"/>
+ <delete file="${conf.dir}/slaves"/>
+ <delete file="${conf.dir}/masters"/>
</target>
<!-- ================================================================== -->
4 ivy/libraries.properties
@@ -46,9 +46,9 @@ jasper.version=5.5.12
jsp.version=2.1
jsp-api.version=5.5.12
jets3t.version=0.6.1
-jetty.version=6.1.24
+jetty.version=6.1.26
jetty.jsp.version=6.1.14
-jetty-util.version=6.1.24
+jetty-util.version=6.1.26
junit.version=4.5
jdiff.version=1.0.9
json.version=1.0
1  src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
@@ -392,7 +392,6 @@ boolean hasSpeculativeTask(long currentTime, double averageProgress) {
return false;
}
- @Override
public boolean isRunning() {
return !activeTasks.isEmpty();
}
1  src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
10 ...ontrib/fairscheduler/src/java/org/apache/hadoop/mapred/resourceutilization/Collector.java
@@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
@@ -465,6 +466,13 @@ public long getProtocolVersion(String protocol,
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* main program to run on the Collector server
*/
@@ -480,4 +488,4 @@ public static void main(String argv[]) throws Exception {
System.exit(-1);
}
}
-}
+}
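
The getProtocolSignature override above is repeated across nearly every RPC server this commit touches (CollectorCached, UtilizationCollector, RaidNode, DatanodeProtocols, and so on). A minimal sketch of that pattern, assuming the org.apache.hadoop.ipc.ProtocolSignature helper introduced alongside these changes; the ExampleServer class itself is hypothetical:

import java.io.IOException;

import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.VersionedProtocol;

// Hypothetical server showing the override added throughout this commit.
public class ExampleServer implements VersionedProtocol {
  public static final long versionID = 1L;

  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    return versionID;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    // Delegates to the shared helper, which compares the client's method
    // fingerprint against this server's methods and returns a signature.
    return ProtocolSignature.getProtocolSignature(
        this, protocol, clientVersion, clientMethodsHash);
  }
}
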
8 .../fairscheduler/src/java/org/apache/hadoop/mapred/resourceutilization/CollectorCached.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.Daemon;
@@ -220,4 +221,11 @@ public long getProtocolVersion(String protocol,
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
}
48 ...contrib/highavailability/src/java/org/apache/hadoop/hdfs/DistributedAvatarFileSystem.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC.VersionIncompatible;
import org.apache.hadoop.security.AccessControlException;
import org.apache.zookeeper.WatchedEvent;
@@ -176,6 +177,16 @@ public synchronized boolean isDown() {
}
@Override
+ public int getDataTransferProtocolVersion() throws IOException {
+ return (new ImmutableFSCaller<Integer>() {
+ @Override
+ Integer call() throws IOException {
+ return namenode.getDataTransferProtocolVersion();
+ }
+ }).callFS();
+ }
+
+ @Override
public void abandonBlock(final Block b, final String src,
final String holder) throws IOException {
(new MutableFSCaller<Boolean>() {
@@ -629,6 +640,16 @@ Boolean call(int r) throws IOException {
}
@Override
+ public void saveNamespace(final boolean force) throws IOException {
+ (new MutableFSCaller<Boolean>() {
+ Boolean call(int r) throws IOException {
+ namenode.saveNamespace(force);
+ return true;
+ }
+ }).callFS();
+ }
+
+ @Override
public void setOwner(final String src, final String username,
final String groupname) throws IOException {
(new MutableFSCaller<Boolean>() {
@@ -708,6 +729,20 @@ Long call() throws IOException {
}
@Override
+ public ProtocolSignature getProtocolSignature(final String protocol,
+ final long clientVersion, final int clientMethodsHash) throws IOException {
+ return (new ImmutableFSCaller<ProtocolSignature>() {
+
+ @Override
+ ProtocolSignature call() throws IOException {
+ return namenode.getProtocolSignature(
+ protocol, clientVersion, clientMethodsHash);
+ }
+
+ }).callFS();
+ }
+
+ @Override
public void recoverLease(final String src, final String clientName)
throws IOException {
// Treating this as immutable
@@ -718,8 +753,19 @@ Boolean call() throws IOException {
}
}).callFS();
}
+
+ @Override
+ public boolean closeRecoverLease(final String src, final String clientName)
+ throws IOException {
+ // Treating this as immutable
+ return (new ImmutableFSCaller<Boolean>() {
+ Boolean call() throws IOException {
+ return namenode.closeRecoverLease(src, clientName);
+ }
+ }).callFS();
+ }
+
}
-
private boolean shouldHandleException(IOException ex) {
if (ex.getMessage().contains("java.io.EOFException")) {
return true;
13 ...b/highavailability/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeProtocols.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ProtocolSignature;
/**********************************************************************
* Protocol that a DFS datanode uses to communicate with the NameNode.
@@ -95,7 +96,13 @@ public long getProtocolVersion(String protocol,
return lastProt; // all objects have the same version
}
-
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* This method should not be invoked on the composite
* DatanodeProtocols object. You can call these on the individual
@@ -189,11 +196,11 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
}
/** {@inheritDoc} */
- public long nextGenerationStamp(Block block) throws IOException {
+ public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
IOException last = new IOException("No DatanodeProtocol found.");
for (int i = 0; i < numProtocol; i++) {
try {
- return node[i].nextGenerationStamp(block);
+ return node[i].nextGenerationStamp(block, fromNN);
} catch (IOException e) {
last = e;
LOG.info("Server " + node[i] + " " +
10 src/contrib/hmon/src/java/org/apache/hadoop/mapred/UtilizationCollector.java
@@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.net.NetUtils;
@@ -466,6 +467,13 @@ public long getProtocolVersion(String protocol,
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* main program to run on the Collector server
*/
@@ -481,4 +489,4 @@ public static void main(String argv[]) throws Exception {
System.exit(-1);
}
}
-}
+}
8 src/contrib/hmon/src/java/org/apache/hadoop/mapred/UtilizationCollectorCached.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.Daemon;
@@ -221,4 +222,11 @@ public long getProtocolVersion(String protocol,
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
}
7 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -184,6 +184,13 @@ public long getProtocolVersion(String protocol,
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* Wait for service to finish.
* (Normally, it runs forever.)
17 src/core/org/apache/hadoop/fs/DU.java
@@ -64,6 +64,23 @@ public DU(File path, Configuration conf) throws IOException {
//10 minutes default refresh interval
}
+ private long getUsedSpace(File root) {
+ long result = 0;
+ if (root.isFile())
+ return root.length();
+
+ for (File sub : root.listFiles()) {
+ result += getUsedSpace(sub);
+ }
+ return result;
+ }
+
+ @Override
+ protected void run() throws IOException {
+ File rootDir = new File(getDirPath());
+ this.used.set(getUsedSpace(rootDir));
+ }
+
/**
* This thread refreshes the "used" variable.
*
45 src/core/org/apache/hadoop/fs/Trash.java
@@ -47,13 +47,14 @@
private static final FsPermission PERMISSION =
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
- private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm");
+ private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
private static final int MSECS_PER_MINUTE = 60*1000;
private final FileSystem fs;
private final Path trash;
private final Path current;
- private final long interval;
+ private final long deletionInterval;
+ private final Path homesParent;
/** Construct a trash can accessor.
* @param conf a Configuration
@@ -69,16 +70,20 @@ public Trash(FileSystem fs, Configuration conf) throws IOException {
super(conf);
this.fs = fs;
this.trash = new Path(fs.getHomeDirectory(), TRASH);
+ this.homesParent = fs.getHomeDirectory().getParent();
this.current = new Path(trash, CURRENT);
- this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
+ this.deletionInterval = (long) (conf.getFloat("fs.trash.interval", 60) *
+ MSECS_PER_MINUTE);
}
private Trash(Path home, Configuration conf) throws IOException {
super(conf);
this.fs = home.getFileSystem(conf);
this.trash = new Path(home, TRASH);
+ this.homesParent = home.getParent();
this.current = new Path(trash, CURRENT);
- this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
+ this.deletionInterval = (long) (conf.getFloat("fs.trash.interval", 60) *
+ MSECS_PER_MINUTE);
}
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
@@ -89,7 +94,7 @@ private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
* @return false if the item is already in the trash or trash is disabled
*/
public boolean moveToTrash(Path path) throws IOException {
- if (interval == 0)
+ if (deletionInterval == 0)
return false;
if (!path.isAbsolute()) // make path absolute
@@ -186,7 +191,7 @@ public void expunge() throws IOException {
continue;
}
- if ((now - interval) > time) {
+ if ((now - deletionInterval) > time) {
if (fs.delete(path, true)) {
LOG.info("Deleted trash checkpoint: "+dir);
} else {
@@ -211,26 +216,40 @@ public Runnable getEmptier() throws IOException {
return new Emptier(getConf());
}
- private static class Emptier implements Runnable {
+ private class Emptier implements Runnable {
private Configuration conf;
private FileSystem fs;
- private long interval;
+ private long emptierInterval;
public Emptier(Configuration conf) throws IOException {
this.conf = conf;
- this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
- this.fs = FileSystem.get(conf);
+ this.emptierInterval = (long)
+ (conf.getFloat("fs.trash.checkpoint.interval", 0) *
+ MSECS_PER_MINUTE);
+ if (this.emptierInterval > deletionInterval ||
+ this.emptierInterval == 0) {
+ LOG.warn("The configured interval for checkpoint is " +
+ this.emptierInterval + " minutes." +
+ " Using interval of " + deletionInterval +
+ " minutes that is used for deletion instead");
+ this.emptierInterval = deletionInterval;
+ }
+ // This ensuers that deletion coming from Emptier will
+ // skip deleteUsingTrash
+ Configuration newConf = new Configuration(conf);
+ newConf.set("fs.shell.delete.classname", "Emptier");
+ this.fs = FileSystem.get(newConf);
}
public void run() {
- if (interval == 0)
+ if (emptierInterval == 0)
return; // trash disabled
long now = System.currentTimeMillis();
long end;
while (true) {
- end = ceiling(now, interval);
+ end = ceiling(now, emptierInterval);
try { // sleep for interval
Thread.sleep(end - now);
} catch (InterruptedException e) {
@@ -243,7 +262,7 @@ public void run() {
FileStatus[] homes = null;
try {
- homes = fs.listStatus(HOMES); // list all home dirs
+ homes = fs.listStatus(homesParent); // list all home dirs
} catch (IOException e) {
LOG.warn("Trash can't list homes: "+e+" Sleeping.");
continue;
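
The Trash changes above split the single fs.trash.interval setting into a deletion interval and a separate checkpoint (emptier) interval, both now read as floats in minutes. A small sketch of how the two values are derived under that scheme; the TrashIntervals wrapper is illustrative only, while the property names and the clamping rule follow the diff:

import org.apache.hadoop.conf.Configuration;

// Illustrative helper; property names and clamping behaviour follow the diff.
class TrashIntervals {
  private static final int MSECS_PER_MINUTE = 60 * 1000;

  // How long a trash checkpoint is kept before it is deleted.
  static long deletionInterval(Configuration conf) {
    return (long) (conf.getFloat("fs.trash.interval", 60) * MSECS_PER_MINUTE);
  }

  // How often the Emptier checkpoints; unset (0) or values larger than the
  // deletion interval fall back to the deletion interval.
  static long emptierInterval(Configuration conf, long deletionInterval) {
    long interval = (long)
        (conf.getFloat("fs.trash.checkpoint.interval", 0) * MSECS_PER_MINUTE);
    if (interval == 0 || interval > deletionInterval) {
      interval = deletionInterval;
    }
    return interval;
  }
}
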
50 src/core/org/apache/hadoop/io/compress/CodecPool.java
@@ -20,7 +20,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,51 +37,51 @@
* A global compressor pool used to save the expensive
* construction/destruction of (possibly native) decompression codecs.
*/
- private static final Map<Class<Compressor>, List<Compressor>> compressorPool =
- new HashMap<Class<Compressor>, List<Compressor>>();
+ private static final ConcurrentHashMap<Class<Compressor>,
+ List<Compressor>> compressorPool =
+ new ConcurrentHashMap<Class<Compressor>, List<Compressor>>();
/**
* A global decompressor pool used to save the expensive
* construction/destruction of (possibly native) decompression codecs.
*/
- private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool =
- new HashMap<Class<Decompressor>, List<Decompressor>>();
+ private static final ConcurrentHashMap<Class<Decompressor>,
+ List<Decompressor>> decompressorPool =
+ new ConcurrentHashMap<Class<Decompressor>, List<Decompressor>>();
- private static <T> T borrow(Map<Class<T>, List<T>> pool,
+ private static <T> T borrow(ConcurrentHashMap<Class<T>, List<T>> pool,
Class<? extends T> codecClass) {
T codec = null;
// Check if an appropriate codec is available
- synchronized (pool) {
- if (pool.containsKey(codecClass)) {
- List<T> codecList = pool.get(codecClass);
+ List<T> codecList = pool.get(codecClass);
- if (codecList != null) {
- synchronized (codecList) {
- if (!codecList.isEmpty()) {
- codec = codecList.remove(codecList.size()-1);
- }
- }
+ if (codecList != null) {
+ synchronized (codecList) {
+ int size = codecList.size();
+ if (size != 0) {
+ codec = codecList.remove(size-1);
}
}
}
-
return codec;
}
- private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+ private static <T> void payback(ConcurrentHashMap<Class<T>, List<T>> pool,
+ T codec) {
if (codec != null) {
Class<T> codecClass = ReflectionUtils.getClass(codec);
- synchronized (pool) {
- if (!pool.containsKey(codecClass)) {
- pool.put(codecClass, new ArrayList<T>());
- }
-
- List<T> codecList = pool.get(codecClass);
- synchronized (codecList) {
- codecList.add(codec);
+ List<T> codecList = pool.get(codecClass);
+ if (codecList == null) {
+ codecList = new ArrayList<T>();
+ List<T> old = pool.putIfAbsent(codecClass, codecList);
+ if (old != null) {
+ codecList = old;
}
}
+ synchronized (codecList) {
+ codecList.add(codec);
+ }
}
}
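
The CodecPool rewrite above drops the coarse synchronized (pool) blocks in favor of a ConcurrentHashMap plus putIfAbsent, keeping synchronization only on the per-codec lists. A generic sketch of that pattern with illustrative names (only the per-list locking and putIfAbsent race handling mirror the diff):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative generic pool showing the putIfAbsent insert pattern from the diff.
class PoolSketch<K, V> {
  private final ConcurrentHashMap<K, List<V>> pool =
      new ConcurrentHashMap<K, List<V>>();

  // Return an instance to the pool without locking the whole map.
  void payback(K key, V value) {
    List<V> list = pool.get(key);
    if (list == null) {
      list = new ArrayList<V>();
      // Another thread may have installed a list first; if so, use theirs.
      List<V> old = pool.putIfAbsent(key, list);
      if (old != null) {
        list = old;
      }
    }
    synchronized (list) { // ArrayList itself is not thread-safe
      list.add(value);
    }
  }

  // Borrow an instance, or return null if none is pooled for this key.
  V borrow(K key) {
    List<V> list = pool.get(key);
    if (list == null) {
      return null;
    }
    synchronized (list) {
      int size = list.size();
      return size == 0 ? null : list.remove(size - 1);
    }
  }
}
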
1  src/core/org/apache/hadoop/ipc/Client.java
@@ -585,6 +585,7 @@ private void receiveResponse() {
} else if (state == Status.ERROR.state) {
call.setException(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
+ calls.remove(id);
} else if (state == Status.FATAL.state) {
// Close the connection
markClosed(new RemoteException(WritableUtils.readString(in),
190 src/core/org/apache/hadoop/ipc/RPC.java
@@ -202,13 +202,16 @@ private void stopClient(Client client) {
private Client client;
private boolean isClosed = false;
final private int rpcTimeout;
+ final private Class<?> protocol;
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) {
+ Configuration conf, SocketFactory factory, int rpcTimeout,
+ Class<?> protocol) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
+ this.protocol = protocol;
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -221,7 +224,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
- method.getDeclaringClass(), ticket, rpcTimeout);
+ protocol, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -321,12 +324,43 @@ public VersionedProtocol getProxy() {
}
}
- public static VersionedProtocol waitForProxy(Class<?> protocol,
+ public static <T extends VersionedProtocol> T waitForProxy(
+ Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
) throws IOException {
- return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
+ }
+
+ public static <T extends VersionedProtocol> ProtocolProxy<T> waitForProtocolProxy(
+ Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf
+ ) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+ }
+
+ public static <T extends VersionedProtocol> T waitForProxy(
+ Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf,
+ long timeout
+ ) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf, timeout).
+ getProxy();
+ }
+
+ public static <T extends VersionedProtocol> ProtocolProxy<T> waitForProtocolProxy(
+ Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf,
+ long timeout
+ ) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf, timeout, 0);
}
/**
@@ -336,15 +370,18 @@ public static VersionedProtocol waitForProxy(Class<?> protocol,
* @param addr remote address
* @param conf configuration to use
* @param connTimeout time in milliseconds before giving up
+ * @param rpcTimeout timeout for each RPC
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
- static VersionedProtocol waitForProxy(Class<?> protocol,
+ static <T extends VersionedProtocol> T waitForProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
- long connTimeout) throws IOException {
- return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ long connTimeout,
+ int rpcTimeout) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf,
+ connTimeout, rpcTimeout).getProxy();
}
/**
@@ -358,11 +395,14 @@ static VersionedProtocol waitForProxy(Class<?> protocol,
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
- public static VersionedProtocol waitForProxy(Class<?> protocol,
- long clientVersion,
- InetSocketAddress addr, Configuration conf,
- int rpcTimeout,
- long timeout) throws IOException {
+ static <T extends VersionedProtocol> ProtocolProxy<T> waitForProtocolProxy(
+ Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf,
+ long timeout,
+ int rpcTimeout
+ ) throws IOException {
long startTime = System.currentTimeMillis();
UserGroupInformation ugi = null;
try {
@@ -373,9 +413,9 @@ public static VersionedProtocol waitForProxy(Class<?> protocol,
IOException ioe;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr,
- ugi, conf,
- NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+ return getProtocolProxy(protocol, clientVersion, addr,
+ ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
ioe = se;
@@ -399,7 +439,19 @@ public static VersionedProtocol waitForProxy(Class<?> protocol,
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static VersionedProtocol getProxy(Class<?> protocol,
+ public static <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol,
+ long clientVersion, InetSocketAddress addr,
+ Configuration conf, SocketFactory factory) throws IOException {
+ return getProtocolProxy(protocol, clientVersion,
+ addr, conf, factory).getProxy();
+ }
+
+ /** Construct a client-side protocol proxy that contains a set of server
+ * methods and a proxy object implementing the named protocol,
+ * talking to a server at the named address. */
+ public static <T extends VersionedProtocol> ProtocolProxy<T> getProtocolProxy(
+ Class<T> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = null;
@@ -408,15 +460,27 @@ public static VersionedProtocol getProxy(Class<?> protocol,
} catch (LoginException le) {
throw new RuntimeException("Couldn't login!");
}
- return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
+ return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory);
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static VersionedProtocol getProxy(Class<?> protocol,
+ public static <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol,
+ long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory) throws IOException {
+ return getProtocolProxy(protocol, clientVersion,
+ addr, ticket, conf, factory).getProxy();
+ }
+
+ /** Construct a client-side protocol proxy that contains a set of server
+ * methods and a proxy object implementing the named protocol,
+ * talking to a server at the named address. */
+ public static <T extends VersionedProtocol> ProtocolProxy<T> getProtocolProxy(
+ Class<T> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+ return getProtocolProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
}
/**
@@ -433,24 +497,65 @@ public static VersionedProtocol getProxy(Class<?> protocol,
* @return the proxy
* @throws IOException if any error occurs
*/
- public static VersionedProtocol getProxy(Class<?> protocol,
+ public static <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
+ return getProtocolProxy(protocol, clientVersion, addr, ticket,
+ conf, factory, rpcTimeout).getProxy();
+ }
+
+ /**
+ * Construct a client-side proxy that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol protocol
+ * @param clientVersion client's version
+ * @param addr server address
+ * @param ticket security ticket
+ * @param conf configuration
+ * @param factory socket factory
+ * @param rpcTimeout max time for each rpc; 0 means no timeout
+ * @return the proxy
+ * @throws IOException if any error occurs
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends VersionedProtocol> ProtocolProxy<T> getProtocolProxy(
+ Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Configuration conf,
+ SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ T proxy = (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(addr, ticket, conf, factory, rpcTimeout));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- } else {
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion, proxy);
+ new Invoker(addr, ticket, conf, factory, rpcTimeout, protocol));
+ String protocolName = protocol.getName();
+
+ try {
+ ProtocolSignature serverInfo = proxy
+ .getProtocolSignature(protocolName, clientVersion,
+ ProtocolSignature.getFingerprint(protocol.getMethods()));
+ return new ProtocolProxy<T>(protocol, proxy, serverInfo.getMethods());
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException(IOException.class);
+ if (ioe.getMessage().startsWith(IOException.class.getName() + ": "
+ + NoSuchMethodException.class.getName())) {
+ // Method getProtocolSignature not supported
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
+ if (serverVersion == clientVersion) {
+ return new ProtocolProxy<T>(protocol, proxy, null);
+ }
+ throw new VersionMismatch(protocolName, clientVersion,
+ serverVersion, proxy);
+ }
+ throw re;
}
}
@@ -464,19 +569,38 @@ public static VersionedProtocol getProxy(Class<?> protocol,
* @return a proxy instance
* @throws IOException
*/
- public static VersionedProtocol getProxy(Class<?> protocol,
+ public static <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf)
+ throws IOException {
+ return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
+ }
+
+ /**
+ * Construct a client-side proxy object with the default SocketFactory
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @return a proxy instance
+ * @throws IOException
+ */
+ public static <T extends VersionedProtocol> ProtocolProxy<T> getProtocolProxy(
+ Class<T> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
- return getProxy(protocol, clientVersion, addr, conf, NetUtils
+ return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf));
}
/**
* Stop this proxy and release its invoker's resource
+ * @param <T>
* @param proxy the proxy to be stopped
*/
- public static void stopProxy(VersionedProtocol proxy) {
+ public static <T extends VersionedProtocol> void stopProxy(T proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
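
The RPC changes above introduce getProtocolProxy, which probes the server with getProtocolSignature and falls back to the old getProtocolVersion check when the server does not support it. A hedged sketch of how a client can use the returned ProtocolProxy to choose between old and new RPCs; the proxy calls come from this commit, the lease-recovery choice mirrors the DFSClient changes later in this diff, and the wrapper class and assumed package of ProtocolProxy are illustrative:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;

// Hypothetical helper; the proxy calls come from this commit, the wrapper does not.
class LeaseRecoveryExample {
  static void recoverLease(InetSocketAddress nameNodeAddr, Configuration conf,
      String src, String clientName) throws IOException {
    ProtocolProxy<ClientProtocol> proxy = RPC.getProtocolProxy(
        ClientProtocol.class, ClientProtocol.versionID, nameNodeAddr, conf);
    ClientProtocol namenode = proxy.getProxy();
    if (proxy.isMethodSupported("closeRecoverLease", String.class, String.class)) {
      // Newer namenodes: a single RPC recovers the lease and closes the file.
      namenode.closeRecoverLease(src, clientName);
    } else {
      // Older namenodes: fall back to the pre-existing recoverLease call.
      namenode.recoverLease(src, clientName);
    }
  }
}
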
11 src/core/org/apache/hadoop/ipc/VersionedProtocol.java
@@ -38,4 +38,15 @@
*/
public long getProtocolVersion(String protocol, long clientVersion)
throws RPC.VersionIncompatible, IOException;
+
+ /**
+ * Return protocol version corresponding to protocol interface.
+ * @param protocol The classname of the protocol interface
+ * @param clientVersion The version of the protocol that the client speaks
+ * @param clientMethodsHash the hashcode of client protocol methods
+ * @return the version that the server will speak
+ */
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion,
+ int clientMethodsHash) throws IOException;
}
37 src/core/org/apache/hadoop/metrics/util/MetricsRegistry.java
@@ -18,68 +18,65 @@
package org.apache.hadoop.metrics.util;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
- *
+ *
* This is the registry for metrics.
* Related set of metrics should be declared in a holding class and registered
* in a registry for those metrics which is also stored in the the holding class.
*
*/
public class MetricsRegistry {
- private Map<String, MetricsBase> metricsList = new HashMap<String, MetricsBase>();
+ private ConcurrentHashMap<String, MetricsBase> metricsList =
+ new ConcurrentHashMap<String, MetricsBase>();
public MetricsRegistry() {
}
-
+
/**
- *
+ *
* @return number of metrics in the registry
*/
public int size() {
return metricsList.size();
}
-
+
/**
* Add a new metrics to the registry
* @param metricsName - the name
* @param theMetricsObj - the metrics
* @throws IllegalArgumentException if a name is already registered
*/
- public synchronized void add(final String metricsName, final MetricsBase theMetricsObj) {
- if (metricsList.containsKey(metricsName)) {
+ public void add(final String metricsName, final MetricsBase theMetricsObj) {
+ if (metricsList.putIfAbsent(metricsName, theMetricsObj) != null) {
throw new IllegalArgumentException("Duplicate metricsName:" + metricsName);
}
- metricsList.put(metricsName, theMetricsObj);
}
-
/**
- *
+ *
* @param metricsName
* @return the metrics if there is one registered by the supplied name.
* Returns null if none is registered
*/
- public synchronized MetricsBase get(final String metricsName) {
+ public MetricsBase get(final String metricsName) {
return metricsList.get(metricsName);
}
-
-
+
/**
- *
+ *
* @return the list of metrics names
*/
- public synchronized Collection<String> getKeyList() {
+ public Collection<String> getKeyList() {
return metricsList.keySet();
}
-
+
/**
- *
+ *
* @return the list of metrics
*/
- public synchronized Collection<MetricsBase> getMetricsList() {
+ public Collection<MetricsBase> getMetricsList() {
return metricsList.values();
}
}
29 src/core/org/apache/hadoop/util/DataChecksum.java
@@ -57,6 +57,24 @@ public static DataChecksum newDataChecksum( int type, int bytesPerChecksum ) {
return null;
}
}
+
+ // This constructor uses the specified summer instance
+ public static DataChecksum newDataChecksum( int type, int bytesPerChecksum, Checksum sum ) {
+ if ( bytesPerChecksum <= 0 ) {
+ return null;
+ }
+
+ switch ( type ) {
+ case CHECKSUM_NULL :
+ return new DataChecksum( CHECKSUM_NULL, new ChecksumNull(),
+ CHECKSUM_NULL_SIZE, bytesPerChecksum );
+ case CHECKSUM_CRC32 :
+ return new DataChecksum( CHECKSUM_CRC32, sum,
+ CHECKSUM_CRC32_SIZE, bytesPerChecksum );
+ default:
+ return null;
+ }
+ }
/**
* Creates a DataChecksum from HEADER_LEN bytes from arr[offset].
@@ -90,6 +108,17 @@ public static DataChecksum newDataChecksum( DataInputStream in )
}
return summer;
}
+ public static DataChecksum newDataChecksum( DataInputStream in, Checksum sum )
+ throws IOException {
+ int type = in.readByte();
+ int bpc = in.readInt();
+ DataChecksum summer = newDataChecksum( type, bpc, sum);
+ if ( summer == null ) {
+ throw new IOException( "Could not create DataChecksum of type " +
+ type + " with bytesPerChecksum " + bpc );
+ }
+ return summer;
+ }
/**
* Writes the checksum header to the output stream <i>out</i>.
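
The new overloads above let callers supply their own Checksum implementation instead of the default java.util.zip.CRC32; the DFSClient changes later in this commit pass in a PureJavaCrc32. A brief sketch of that use, assuming PureJavaCrc32 lives in org.apache.hadoop.util; the ChecksumExample wrapper is illustrative only:

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;

// Illustrative wrapper; the overload it calls is the one added above.
class ChecksumExample {
  static DataChecksum readHeader(DataInputStream in) throws IOException {
    // Reads the checksum type and bytes-per-checksum from the stream header,
    // but backs CRC32 computation with the pure-Java implementation rather
    // than java.util.zip.CRC32.
    return DataChecksum.newDataChecksum(in, new PureJavaCrc32());
  }
}
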
410 src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
+import org.apache.hadoop.hdfs.metrics.DFSClientMetrics;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -73,7 +74,9 @@
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
public ClientProtocol namenode;
- private final ClientProtocol rpcNamenode;
+ private ClientProtocol rpcNamenode;
+ // Namenode proxy that supports method-based compatibility
+ private ProtocolProxy<ClientProtocol> namenodeProtocolProxy = null;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
Random r = new Random();
@@ -98,6 +101,11 @@
*/
private volatile boolean serverSupportsHdfs630 = true;
private long namenodeVersion = ClientProtocol.versionID;
+ private DFSClientMetrics metrics = new DFSClientMetrics();
+ private boolean shortCircuitLocalReads = false;
+ private final InetAddress localHost;
+
+ private Integer dataTransferVersion = -1;
/**
* The locking hierarchy is to first acquire lock on DFSClient object, followed by
@@ -111,7 +119,7 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
try {
return createNamenode(createRPCNamenode(nameNodeAddr, conf,
- UnixUserGroupInformation.login(conf, true)));
+ UnixUserGroupInformation.login(conf, true)).getProxy());
} catch (LoginException e) {
throw (IOException)(new IOException().initCause(e));
}
@@ -126,12 +134,13 @@ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
* @param ugi ticket
* @return a NameNode proxy that's compatible with the client
*/
- private ClientProtocol createRPCNamenodeIfCompatible(
+ private void createRPCNamenodeIfCompatible(
InetSocketAddress nameNodeAddr,
Configuration conf,
UnixUserGroupInformation ugi) throws IOException {
try {
- return createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.namenodeProtocolProxy = createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.rpcNamenode = this.namenodeProtocolProxy.getProxy();
} catch (RPC.VersionMismatch e) {
long clientVersion = e.getClientVersion();
namenodeVersion = e.getServerVersion();
@@ -141,14 +150,15 @@ private ClientProtocol createRPCNamenodeIfCompatible(
throw new RPC.VersionIncompatible(
ClientProtocol.class.getName(), clientVersion, namenodeVersion);
}
- return (ClientProtocol)e.getProxy();
+ this.rpcNamenode = (ClientProtocol)e.getProxy();
}
}
- private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
+ private static ProtocolProxy<ClientProtocol> createRPCNamenode(
+ InetSocketAddress nameNodeAddr,
Configuration conf, UnixUserGroupInformation ugi)
throws IOException {
- return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
+ return RPC.getProtocolProxy(ClientProtocol.class,
ClientProtocol.versionID, nameNodeAddr, ugi, conf,
NetUtils.getSocketFactory(conf, ClientProtocol.class));
}
@@ -256,6 +266,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
+ this.localHost = InetAddress.getLocalHost();
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -273,7 +284,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
defaultReplication = (short) conf.getInt("dfs.replication", 3);
if (nameNodeAddr != null && rpcNamenode == null) {
- this.rpcNamenode = createRPCNamenodeIfCompatible(nameNodeAddr, conf, ugi);
+ createRPCNamenodeIfCompatible(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
@@ -283,6 +294,8 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
+ // read directly from the block file if configured.
+ this.shortCircuitLocalReads = conf.getBoolean("dfs.read.shortcircuit", false);
}
static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -688,24 +701,90 @@ public OutputStream create(String src,
/**
* Recover a file's lease
+ *
* @param src a file's path
+ * @return if lease recovery completes
* @throws IOException
*/
- void recoverLease(String src) throws IOException {
+ boolean recoverLease(String src) throws IOException {
checkOpen();
+ if (this.namenodeProtocolProxy == null) {
+ return versionBasedRecoverLease(src);
+ }
+ return methodBasedRecoverLease(src);
+ }
+
+ /** recover lease based on version */
+ private boolean versionBasedRecoverLease(String src) throws IOException {
+
if (namenodeVersion < ClientProtocol.RECOVER_LEASE_VERSION) {
- OutputStream out = append(src,
- conf.getInt("io.file.buffer.size", 4096), null);
+ OutputStream out;
+ try {
+ out = append(src, conf.getInt("io.file.buffer.size", 4096), null);
+ } catch (RemoteException re) {
+ IOException e = re.unwrapRemoteException(AlreadyBeingCreatedException.class);
+ if (e instanceof AlreadyBeingCreatedException) {
+ return false;
+ }
+ throw re;
+ }
out.close();
+ return true;
+ } else if (namenodeVersion < ClientProtocol.CLOSE_RECOVER_LEASE_VERSION){
+ try {
+ namenode.recoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class);
+ }
+ return !namenode.getBlockLocations(src, 0, Long.MAX_VALUE).isUnderConstruction();
} else {
try {
+ return namenode.closeRecoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class);
+ }
+ }
+ }
+
+ /** recover lease based on method name */
+ private boolean methodBasedRecoverLease(String src) throws IOException {
+ // check if closeRecoverLease is supported
+ if (namenodeProtocolProxy.isMethodSupported(
+ "closeRecoverLease", String.class, String.class)) {
+ try {
+ return namenode.closeRecoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class);
+ }
+ }
+ // check if recoverLease is supported
+ if (namenodeProtocolProxy.isMethodSupported(
+ "recoverLease", String.class, String.class)) {
+ try {
namenode.recoverLease(src, clientName);
} catch (RemoteException re) {
throw re.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class);
}
+ return !namenode.getBlockLocations(src, 0, Long.MAX_VALUE).isUnderConstruction();
}
+ // now use append
+ OutputStream out;
+ try {
+ out = append(src, conf.getInt("io.file.buffer.size", 4096), null);
+ } catch (RemoteException re) {
+ IOException e = re.unwrapRemoteException(AlreadyBeingCreatedException.class);
+ if (e instanceof AlreadyBeingCreatedException) {
+ return false;
+ }
+ throw re;
+ }
+ out.close();
+ return true;
}
private void closeFile(String src) throws IOException {
@@ -877,6 +956,7 @@ public boolean isDirectory(String src) throws IOException {
*/
public FileStatus[] listPaths(String src) throws IOException {
checkOpen();
+ metrics.incLsCalls();
try {
return namenode.getListing(src);
} catch(RemoteException re) {
@@ -901,7 +981,8 @@ public FileStatus getFileInfo(String src) throws IOException {
*/
MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+ return getFileChecksum(getDataTransferProtocolVersion(),
+ src, namenode, socketFactory, socketTimeout);
}
/**
@@ -909,7 +990,8 @@ MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
* @param src The file path
* @return The checksum
*/
- public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ public static MD5MD5CRC32FileChecksum getFileChecksum(
+ int dataTransferVersion, String src,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
) throws IOException {
//get all block locations
@@ -950,7 +1032,7 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ DataTransferProtocol.OP_BLOCK_CHECKSUM +
", block=" + block);
}
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+ out.writeShort(dataTransferVersion);
out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
out.writeLong(block.getBlockId());
out.writeLong(block.getGenerationStamp());
@@ -1113,9 +1195,39 @@ public boolean setSafeMode(SafeModeAction action) throws IOException {
*
* @see ClientProtocol#saveNamespace()
*/
- void saveNamespace() throws AccessControlException, IOException {
+ void saveNamespace(boolean force) throws AccessControlException, IOException {
+ if (this.namenodeProtocolProxy == null) {
+ versionBasedSaveNamespace(force);
+ } else {
+ methodBasedSaveNamespace(force);
+ }
+ }
+
+ /** save namespace based on version */
+ private void versionBasedSaveNamespace(boolean force)
+ throws AccessControlException, IOException {
+ try {
+ if (namenodeVersion >= ClientProtocol.SAVENAMESPACE_FORCE) {
+ namenode.saveNamespace(force);
+ } else {
+ namenode.saveNamespace();
+ }
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class);
+ }
+ }
+
+ /** save namespace based on method name */
+ private void methodBasedSaveNamespace(boolean force)
+ throws AccessControlException, IOException {
+
try {
- namenode.saveNamespace();
+ if (this.namenodeProtocolProxy.isMethodSupported(
+ "saveNamespace", boolean.class)) {
+ namenode.saveNamespace(force);
+ } else {
+ namenode.saveNamespace();
+ }
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
}
@@ -1254,7 +1366,19 @@ private DatanodeInfo bestNode(DatanodeInfo nodes[],
}
}
}
- throw new IOException("No live nodes contain current block");
+ StringBuilder errMsgr = new StringBuilder(
+ "No live nodes contain current block ");
+ errMsgr.append("Block locations:");
+ for (DatanodeInfo datanode : nodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ errMsgr.append(" Dead nodes: ");
+ for (DatanodeInfo datanode : deadNodes.values()) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ throw new IOException(errMsgr.toString());
}
boolean isLeaseCheckerStarted() {
@@ -1413,16 +1537,16 @@ public String toString() {
private Socket dnSock; //for now just sending checksumOk.
private DataInputStream in;
- private DataChecksum checksum;
- private long lastChunkOffset = -1;
- private long lastChunkLen = -1;
+ protected DataChecksum checksum;
+ protected long lastChunkOffset = -1;
+ protected long lastChunkLen = -1;
private long lastSeqNo = -1;
- private long startOffset;
- private long firstChunkOffset;
- private int bytesPerChecksum;
- private int checksumSize;
- private boolean gotEOS = false;
+ protected long startOffset;
+ protected long firstChunkOffset;
+ protected int bytesPerChecksum;
+ protected int checksumSize;
+ protected boolean gotEOS = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
@@ -1454,12 +1578,11 @@ public synchronized int read(byte[] buf, int off, int len)
throw new IOException("Could not skip required number of bytes");
}
}
-
boolean eosBefore = gotEOS;
int nRead = super.read(buf, off, len);
// if gotEOS was set in the previous read and checksum is enabled :
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
+ if (dnSock != null && gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
//checksum is verified and there are no errors.
checksumOk(dnSock);
}
@@ -1637,23 +1760,45 @@ private BlockReader( String file, long blockId, DataInputStream in,
checksumSize = this.checksum.getChecksumSize();
}
- public static BlockReader newBlockReader(Socket sock, String file, long blockId,
+ /**
+ * Public constructor
+ */
+ BlockReader(Path file, int numRetries) {
+ super(file, numRetries);
+ }
+
+ protected BlockReader(Path file, int numRetries, DataChecksum checksum, boolean verifyChecksum) {
+ super(file,
+ numRetries,
+ verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+ }
+
+
+ public static BlockReader newBlockReader(int dataTransferVersion,
+ Socket sock, String file, long blockId,
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
- return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
+ return newBlockReader(dataTransferVersion,
+ sock, file, blockId, genStamp, startOffset, len, bufferSize,
true);
}
/** Java Doc required */
- public static BlockReader newBlockReader( Socket sock, String file, long blockId,
+ public static BlockReader newBlockReader(int dataTransferVersion,
+ Socket sock, String file, long blockId,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum)
throws IOException {
- return newBlockReader(sock, file, blockId, genStamp, startOffset,
+ return newBlockReader(dataTransferVersion,
+ sock, file, blockId, genStamp, startOffset,
len, bufferSize, verifyChecksum, "");
}
- public static BlockReader newBlockReader( Socket sock, String file,
+ public static BlockReader newBlockReader(int dataTransferVersion,
+ Socket sock, String file,
long blockId,
long genStamp,
long startOffset, long len,
@@ -1665,7 +1810,7 @@ public static BlockReader newBlockReader( Socket sock, String file,
new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
//write the header.
- out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
+ out.writeShort( dataTransferVersion );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
@@ -1689,7 +1834,7 @@ public static BlockReader newBlockReader( Socket sock, String file,
" for file " + file +
" for block " + blockId);
}
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ DataChecksum checksum = DataChecksum.newDataChecksum( in , new PureJavaCrc32());
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
@@ -1751,7 +1896,7 @@ private void checksumOk(Socket sock) {
private long prefetchSize = 10 * defaultBlockSize;
private BlockReader blockReader = null;
private boolean verifyChecksum;
- private LocatedBlocks locatedBlocks = null;
+ private DFSLocatedBlocks locatedBlocks = null;
private DatanodeInfo currentNode = null;
private Block currentBlock = null;
private long pos = 0;
@@ -1820,13 +1965,8 @@ synchronized void openInfo() throws IOException {
if (newInfo.isUnderConstruction() && newInfo.locatedBlockCount() > 0) {
LocatedBlock last = newInfo.get(newInfo.locatedBlockCount()-1);
if (last.getLocations().length > 0) {
- ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = last.getLocations()[0];
try {
- primary = createClientDatanodeProtocolProxy(primaryNode, conf,
- socketTimeout);
- Block newBlock = null;
- newBlock = primary.getBlockInfo(last.getBlock());
+ Block newBlock = getBlockInfo(last);
// only if the block has data (not null)
if (newBlock != null) {
long newBlockSize = newBlock.getNumBytes();
@@ -1843,16 +1983,61 @@ synchronized void openInfo() throws IOException {
} catch (IOException e) {
LOG.debug("DFSClient file " + src +
" is being concurrently append to" +
- " but datanode " + primaryNode.getHostName() +
- " probably does not have block " + last.getBlock(),
+ " but datanodes probably does not have block " +
+ last.getBlock(),
e);
}
}
}
- this.locatedBlocks = newInfo;
+ this.locatedBlocks = new DFSLocatedBlocks(newInfo);
this.currentNode = null;
}
+ /** Get block info from a datanode */
+ private Block getBlockInfo(LocatedBlock locatedblock) throws IOException {
+ if (locatedblock == null || locatedblock.getLocations().length == 0) {
+ return null;
+ }
+ int replicaNotFoundCount = locatedblock.getLocations().length;
+
+ for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ ClientDatanodeProtocol cdp = null;
+
+ try {
+ cdp = createClientDatanodeProtocolProxy(datanode, conf, socketTimeout);
+
+ final Block newBlock = cdp.getBlockInfo(locatedblock.getBlock());
+
+ if (newBlock == null) {
+ // special case: the replica might not be on this DN; treat it as zero length
+ replicaNotFoundCount--;
+ } else {
+ return newBlock;
+ }
+ }
+ catch(IOException ioe) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Failed to getBlockInfo from datanode "
+ + datanode + " for block " + locatedblock.getBlock(), ioe);
+ }
+ } finally {
+ if (cdp != null) {
+ RPC.stopProxy(cdp);
+ }
+ }
+ }
+
+ // The namenode told us about these locations, but none of them knows about
+ // the replica: we hit the race between pipeline creation start and end.
+ // We only treat the block as missing when every location reported
+ // "replica not found"; if any datanode failed with an exception it might
+ // actually have the replica, so we report that error instead.
+ if (replicaNotFoundCount == 0) {
+ return null;
+ }
+
+ throw new IOException("Cannot obtain block info for " + locatedblock);
+ }
+
/**
* Returns whether the file opened is under construction.
*/
@@ -1860,8 +2045,8 @@ public synchronized boolean isUnderConstruction() {
return locatedBlocks.isUnderConstruction();
}
- public synchronized long getFileLength() {
- return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+ public long getFileLength() {
+ return locatedBlocks.getFileLength();
}
/**
@@ -1925,8 +2110,7 @@ private LocatedBlock getBlockAt(long offset, boolean updatePosition)
* @return consecutive segment of located blocks
* @throws IOException
*/
- private synchronized List<LocatedBlock> getBlockRange(long offset,
- long length)
+ private List<LocatedBlock> getBlockRange(long offset, long length)
throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
@@ -1992,21 +2176,49 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
+ // try reading the block locally. if this fails, then go via
+ // the datanode
+ Block blk = targetBlock.getBlock();
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blockSeekTo shortCircuitLocalReads " + shortCircuitLocalReads +
+ " localhost " + localHost +
+ " targetAddr " + targetAddr);
+ }
+ if (shortCircuitLocalReads && localHost != null &&
+ (targetAddr.getAddress().equals(localHost) ||
+ targetAddr.getHostName().startsWith("localhost"))) {
+ blockReader = BlockReaderLocal.newBlockReader(conf, src, blk,
+ chosenNode,
+ offsetIntoBlock,
+ blk.getNumBytes() - offsetIntoBlock,
+ socketTimeout,
+ metrics,
+ this.verifyChecksum);
+ return chosenNode;
+ }
+ } catch (IOException ex) {
+ LOG.info("Failed to read block " + targetBlock.getBlock() +
+ " on local machine " + localHost +
+ ". Try via the datanode on " + targetAddr + ":"
+ + StringUtils.stringifyException(ex));
+ }
+
try {
s = socketFactory.createSocket();
NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
- Block blk = targetBlock.getBlock();
- blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
+ blockReader = BlockReader.newBlockReader(
+ getDataTransferProtocolVersion(),
+ s, src, blk.getBlockId(),
blk.getGenerationStamp(),
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, clientName);
return chosenNode;
} catch (IOException ex) {
// Put chosen node into dead list, continue
- LOG.debug("Failed to connect to " + targetAddr + ":"
- + StringUtils.stringifyException(ex));
+ LOG.warn("Failed to connect to " + targetAddr, ex);
addToDeadNodes(chosenNode);
if (s != null) {
try {
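The hunk above is the short-circuit read path in blockSeekTo: when the chosen datanode is the local machine, the client reads the block file directly through BlockReaderLocal and only falls back to the socket-based BlockReader on failure. A minimal sketch of the locality test it relies on (localHost is assumed to be the client's address resolved once at startup, as in the diff):

import java.net.InetAddress;
import java.net.InetSocketAddress;

class LocalReadCheck {
  /** Is this datanode address the local machine? */
  static boolean isLocalDatanode(InetSocketAddress targetAddr, InetAddress localHost) {
    return localHost != null
        && (targetAddr.getAddress().equals(localHost)
            || targetAddr.getHostName().startsWith("localhost"));
  }
}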
@@ -2056,7 +2268,6 @@ public synchronized int read() throws IOException {
private synchronized int readBuffer(byte buf[], int off, int len)
throws IOException {
IOException ioe;
-
/* we retry current node only once. So this is set to true only here.
* Intention is to handle one common case of an error that is not a
* failure on datanode or client : when DataNode closes the connection
@@ -2068,7 +2279,7 @@ private synchronized int readBuffer(byte buf[], int off, int len)
while (true) {
// retry as many times as seekToNewSource allows.
try {
- return blockReader.read(buf, off, len);
+ return blockReader.read(buf, off, len);
} catch ( ChecksumException ce ) {
LOG.warn("Found Checksum error for " + currentBlock + " from " +
currentNode.getName() + " at " + ce.getPos());
@@ -2110,6 +2321,8 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
+ long start = System.currentTimeMillis();
+ long startCpuTime = metrics.getCurrentThreadCpuTime();
failures = 0;
if (pos < getFileLength()) {
@@ -2131,6 +2344,10 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
if (stats != null && result != -1) {
stats.incrementBytesRead(result);
}
+ long timeval = System.currentTimeMillis() - start;
+ metrics.incReadTime(timeval);
+ metrics.incReadSize(result);
+ metrics.incReadCpu(metrics.getCurrentThreadCpuTime() - startCpuTime);
return result;
} catch (ChecksumException ce) {
throw ce;
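The read path above now records per-call wall-clock time, bytes returned, and thread CPU time. The metrics object's getCurrentThreadCpuTime() belongs to this tree; a JDK-only sketch of the same measurement (backing it with ThreadMXBean is an assumption, not necessarily what the patch does):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

class ReadTimingSketch {
  private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

  /** Returns {wall-clock millis, thread CPU nanos} spent inside the runnable. */
  static long[] timeRead(Runnable read) {
    boolean cpuSupported = THREADS.isCurrentThreadCpuTimeSupported();
    long startWall = System.currentTimeMillis();
    long startCpu = cpuSupported ? THREADS.getCurrentThreadCpuTime() : 0L;
    read.run();
    long wallMs = System.currentTimeMillis() - startWall;
    long cpuNs = cpuSupported ? THREADS.getCurrentThreadCpuTime() - startCpu : 0L;
    return new long[] { wallMs, cpuNs };
  }
}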
@@ -2185,7 +2402,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
// expanded to 9000ms.
double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
- LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+ LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.", ie);
Thread.sleep((long)waitTime);
} catch (InterruptedException iex) {
}
@@ -2214,19 +2431,40 @@ private void fetchBlockByteRange(LocatedBlock block, long start,
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
BlockReader reader = null;
+ int len = (int) (end - start + 1);
try {
- dn = socketFactory.createSocket();
- NetUtils.connect(dn, targetAddr, socketTimeout);
- dn.setSoTimeout(socketTimeout);
-
- int len = (int) (end - start + 1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fetchBlockByteRange shortCircuitLocalReads " +
+ shortCircuitLocalReads +
+ " localhst " + localHost +
+ " targetAddr " + targetAddr);
+ }
+ // first try reading the block locally.
+ if (shortCircuitLocalReads && localHost != null &&
+ (targetAddr.getAddress().equals(localHost) ||
+ targetAddr.getHostName().startsWith("localhost"))) {
+ reader = BlockReaderLocal.newBlockReader(conf, src,
+ block.getBlock(),
+ chosenNode,
+ start,
+ len,
+ socketTimeout,
+ metrics,
+ verifyChecksum);
+ } else {
+ // go to the datanode
+ dn = socketFactory.createSocket();
+ NetUtils.connect(dn, targetAddr, socketTimeout);
+ dn.setSoTimeout(socketTimeout);
- reader = BlockReader.newBlockReader(dn, src,
+ reader = BlockReader.newBlockReader(getDataTransferProtocolVersion(),
+ dn, src,
block.getBlock().getBlockId(),
block.getBlock().getGenerationStamp(),
start, len, buffersize,
verifyChecksum, clientName);
+ }
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -2264,12 +2502,14 @@ private void fetchBlockByteRange(LocatedBlock block, long start,
*/
@Override
public int read(long position, byte[] buffer, int offset, int length)
- throws IOException {
+ throws IOException {
// sanity checks
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
+ long start = System.currentTimeMillis();
+ long startCpuTime = metrics.getCurrentThreadCpuTime();
failures = 0;
long filelen = getFileLength();
if ((position < 0) || (position >= filelen)) {
@@ -2297,6 +2537,10 @@ public int read(long position, byte[] buffer, int offset, int length)
if (stats != null) {
stats.incrementBytesRead(realLen);
}
+ long timeval = System.currentTimeMillis() - start;
+ metrics.incPreadTime(timeval);
+ metrics.incPreadSize(realLen);
+ metrics.incPreadCpu(metrics.getCurrentThreadCpuTime() - startCpuTime);
return realLen;
}
@@ -2873,7 +3117,7 @@ public void run() {
// process responses from datanodes.
try {
// read an ack from the pipeline
- ack.readFields(blockReplyStream);
+ ack.readFields(blockReplyStream, targets.length);
if (LOG.isDebugEnabled()) {
LOG.debug("DFSClient for block " + block + " " + ack);
}
@@ -3107,7 +3351,7 @@ private boolean processDatanodeError(boolean hasError, boolean isAppend) {
}
private void isClosed() throws IOException {
- if (closed && lastException != null) {
+ if ((closed || !clientRunning) && lastException != null) {
throw lastException;
}
}
@@ -3147,7 +3391,8 @@ private DFSOutputStream(String src, long blockSize, Progressable progress,
}
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
- bytesPerChecksum);
+ bytesPerChecksum,
+ new PureJavaCrc32());
}
/**
@@ -3298,7 +3543,7 @@ private void computePacketChunkSize(int psize, int csize) {
success = createBlockOutputStream(nodes, clientName, false);
if (!success) {
- LOG.info("Abandoning block " + block);
+ LOG.info("Abandoning block " + block + " for file " + src);
namenode.abandonBlock(block, src, clientName);
if (errorIndex < nodes.length) {
@@ -3379,7 +3624,9 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
} catch (IOException ie) {
- LOG.info("Exception in createBlockOutputStream " + ie);
+ LOG.info("Exception in createBlockOutputStream " + nodes[0].getName() +
+ " for file " + src + ": " + ie);
// find the datanode that matches
if (firstBadLink.length() != 0) {
@@ -3571,6 +3818,7 @@ private synchronized void enqueueCurrentPacket() {
* datanode. Block allocations are persisted on namenode.
*/
public void sync() throws IOException {
+ long start = System.currentTimeMillis();
try {
long toWaitFor;
synchronized (this) {
@@ -3627,6 +3875,8 @@ public void sync() throws IOException {
if (willPersist) {
namenode.fsync(src, clientName);
}
+ long timeval = System.currentTimeMillis() - start;
+ metrics.incSyncTime(timeval);
} catch (IOException e) {
lastException = new IOException("IOException flush:" + e);
closed = true;
@@ -3851,6 +4101,32 @@ void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
}
}
+ /**
+ * Get the data transfer protocol version supported in the cluster
+ * assuming all the datanodes have the same version.
+ *
+ * @return the data transfer protocol version supported in the cluster
+ */
+ int getDataTransferProtocolVersion() throws IOException {
+ synchronized (dataTransferVersion) {
+ if (dataTransferVersion == -1) {
+ // Get the version number from NN
+ try {
+ dataTransferVersion = namenode.getDataTransferProtocolVersion();
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException(IOException.class);
+ if (ioe.getMessage().startsWith(IOException.class.getName() + ": " +
+ NoSuchMethodException.class.getName())) {
+ dataTransferVersion = 17; // last version not supporting this RPC
+ } else {
+ throw ioe;
+ }
+ }
+ }
+ return dataTransferVersion;
+ }
+ }
+
/** {@inheritDoc} */
public String toString() {
return getClass().getSimpleName() + "[clientName=" + clientName
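The getDataTransferProtocolVersion() method added above caches the cluster's wire version and, when the namenode is too old to implement the RPC, recognizes the wrapped NoSuchMethodException and pins the version at 17. A minimal sketch of that probe-and-fall-back pattern against a hypothetical versioned service (ClusterService and LEGACY_VERSION are placeholders, not names from this patch):

import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException;

class VersionProbeSketch {
  static final int LEGACY_VERSION = 17;  // placeholder: last version without the RPC
  private int cachedVersion = -1;

  /** Placeholder for the versioned RPC interface the client talks to. */
  interface ClusterService {
    int getDataTransferProtocolVersion() throws IOException;
  }

  /** Probes the RPC once, caches the answer, and falls back when unsupported. */
  synchronized int protocolVersion(ClusterService service) throws IOException {
    if (cachedVersion == -1) {
      try {
        cachedVersion = service.getDataTransferProtocolVersion();
      } catch (RemoteException re) {
        IOException ioe = re.unwrapRemoteException(IOException.class);
        String msg = ioe.getMessage();
        if (msg != null && msg.contains(NoSuchMethodException.class.getName())) {
          cachedVersion = LEGACY_VERSION;  // older server, assume the legacy format
        } else {
          throw ioe;
        }
      }
    }
    return cachedVersion;
  }
}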
23 src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.util.PathValidator;
import org.apache.hadoop.security.AccessControlException;