rebase src

Summary:

Task ID: #

Blame Rev:

Reviewers:

CC:

Test Plan:

Revert Plan:

Tags:
commit cdc4657195d262ff5b0817fc12fd66a88b0283cd (1 parent: f7e2812)
Authored by heyongqiang
Showing with 14,715 additions and 4,263 deletions.
  1. +245 −0 src/c++/libhdfs/hdfs.c
  2. +7 −1 src/c++/libhdfs/hdfsJniHelper.c
  3. +106 −0 src/c++/libhdfs/hdfs_direct.h
  4. +2 −0  src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/AllTestDriver.java
  5. +174 −0 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSDirTest.java
  6. +1 −1  src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSIOTest.java
  7. +256 −0 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSLockTest.java
  8. +30 −0 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DirConstant.java
  9. +5 −5 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/NNThroughputBenchmark.java
  10. +114 −0 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/DirMapper.java
  11. +107 −0 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/DirReduce.java
  12. +5 −5 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/MultiTaskTracker.java
  13. +106 −0 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/SleepJobRunner.java
  14. +4 −0 src/contrib/build-contrib.xml
  15. +0 −1  src/contrib/build.xml
  16. +0 −3  src/contrib/corona/src/java/org/apache/hadoop/corona/Session.java
  17. +17 −1 src/contrib/corona/src/java/org/apache/hadoop/corona/SessionDriver.java
  18. +3 −1 src/contrib/corona/src/java/org/apache/hadoop/corona/SessionNotificationCtx.java
  19. +31 −11 src/contrib/corona/src/java/org/apache/hadoop/mapred/CoronaJobInProgress.java
  20. +96 −41 src/contrib/corona/src/java/org/apache/hadoop/mapred/CoronaJobTracker.java
  21. +61 −0 src/contrib/corona/src/java/org/apache/hadoop/mapred/MultiCoronaTaskTracker.java
  22. +5 −4 src/contrib/corona/src/java/org/apache/hadoop/mapred/ResourceTracker.java
  23. +3 −0  src/contrib/corona/src/test/org/apache/hadoop/mapred/TestResourceTracker.java
  24. +41 −2 src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
  25. +9 −8 src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
  26. +44 −1 src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
  27. +1 −1  src/contrib/hdfsproxy/ivy/libraries.properties
  28. +8 −0 src/contrib/highavailability/conf/avatar-site.xml.template
  29. +1 −1  src/contrib/highavailability/ivy/libraries.properties
  30. +66 −0 src/contrib/highavailability/src/java/avatar-default.xml
  31. +43 −55 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/AvatarShell.java
  32. +123 −98 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/AvatarZKShell.java
  33. +173 −4 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/DistributedAvatarFileSystem.java
  34. +3 −1 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/protocol/AvatarConstants.java
  35. +1 −0  src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/protocol/AvatarProtocol.java
  36. +569 −356 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/datanode/AvatarDataNode.java
  37. +18 −10 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeProtocols.java
  38. +116 −30 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/datanode/OfferService.java
  39. +118 −15 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/namenode/AvatarNode.java
  40. +1 −1  src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/namenode/Ingest.java
  41. +4 −3 src/contrib/highavailability/src/java/org/apache/hadoop/hdfs/server/namenode/Standby.java
  42. +490 −228 src/contrib/highavailability/src/test/org/apache/hadoop/hdfs/MiniAvatarCluster.java
  43. +48 −7 src/contrib/highavailability/src/test/org/apache/hadoop/hdfs/TestAvatarAPI.java
  44. +95 −8 src/contrib/highavailability/src/test/org/apache/hadoop/hdfs/TestAvatarFailover.java
  45. +391 −0 ...ghavailability/src/test/org/apache/hadoop/hdfs/server/datanode/TestAvatarDataNodeMultipleRegistrations.java
  46. +118 −0 src/contrib/highavailability/src/test/org/apache/hadoop/hdfs/server/datanode/TestAvatarRefreshNamenodes.java
  47. +1 −1  src/contrib/hmon/ivy/libraries.properties
  48. +8 −2 src/contrib/raid/build.xml
  49. +1 −1  src/contrib/raid/ivy/libraries.properties
  50. +4 −1 src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
  51. +6 −6 src/contrib/raid/src/java/org/apache/hadoop/raid/BlockIntegrityMonitor.java
  52. +20 −5 src/contrib/raid/src/java/org/apache/hadoop/raid/BlockMover.java
  53. +71 −36 src/contrib/raid/src/java/org/apache/hadoop/raid/BlockReconstructor.java
  54. +68 −93 src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
  55. +49 −7 src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
  56. +8 −8 src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockIntegrityMonitor.java
  57. +1 −1  src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
  58. +1 −1  src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockIntegrityMonitor.java
  59. +110 −0 src/contrib/raid/src/java/org/apache/hadoop/raid/MissingParityFiles.java
  60. +28 −0 src/contrib/raid/src/java/org/apache/hadoop/raid/MonitoredDistRaid.java
  61. +65 −40 src/contrib/raid/src/java/org/apache/hadoop/raid/PlacementMonitor.java
  62. +2 −3 src/contrib/raid/src/java/org/apache/hadoop/raid/PurgeMonitor.java
  63. +39 −46 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
  64. +14 −5 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNodeMetrics.java
  65. +24 −7 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
  66. +6 −2 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidState.java
  67. +6 −0 src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
  68. +70 −6 src/contrib/raid/src/java/org/apache/hadoop/raid/Statistics.java
  69. +98 −17 src/contrib/raid/src/java/org/apache/hadoop/raid/StatisticsCollector.java
  70. +2 −0  src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
  71. +1 −1  src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
  72. +41 −35 src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
  73. +53 −36 src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockCopier.java
  74. +52 −66 src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
  75. +2 −2 src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
  76. +194 −0 src/contrib/raid/src/test/org/apache/hadoop/raid/TestMissingParity.java
  77. +16 −11 src/contrib/raid/src/test/org/apache/hadoop/raid/TestPlacementMonitor.java
  78. +1 −2  src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
  79. +14 −15 src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
  80. +1 −1  src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNodeMetrics.java
  81. +36 −38 src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
  82. +12 −13 src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
  83. +3 −3 src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
  84. +42 −5 src/contrib/raid/src/test/org/apache/hadoop/raid/TestStatisticsCollector.java
  85. +2 −2 src/contrib/raid/webapps/raid/raid.jsp
  86. +1 −1  src/contrib/snapshot/ivy/libraries.properties
  87. +6 −2 src/contrib/snapshot/src/java/org/apache/hadoop/hdfs/server/namenode/SnapshotNode.java
  88. +1 −0  src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
  89. +1 −0  src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
  90. +97 −0 src/core/org/apache/hadoop/conf/ConfServlet.java
  91. +94 −77 src/core/org/apache/hadoop/conf/Configuration.java
  92. +1 −1  src/core/org/apache/hadoop/filecache/DistributedCache.java
  93. +7 −1 src/core/org/apache/hadoop/fs/BufferedFSInputStream.java
  94. +8 −0 src/core/org/apache/hadoop/fs/FSDataInputStream.java
  95. +8 −2 src/core/org/apache/hadoop/fs/FSInputChecker.java
  96. +19 −0 src/core/org/apache/hadoop/fs/FSInputStream.java
  97. +5 −2 src/core/org/apache/hadoop/fs/FileStatus.java
  98. +1 −1  src/core/org/apache/hadoop/fs/FileSystem.java
  99. +4 −3 src/core/org/apache/hadoop/fs/FileUtil.java
  100. +269 −205 src/core/org/apache/hadoop/fs/FsShell.java
  101. +1 −1  src/core/org/apache/hadoop/fs/LocalDirAllocator.java
  102. +10 −0 src/core/org/apache/hadoop/fs/PositionedReadable.java
  103. +2 −0  src/core/org/apache/hadoop/fs/TrashPolicyDefault.java
  104. +66 −10 src/core/org/apache/hadoop/http/HttpServer.java
  105. +36 −0 src/core/org/apache/hadoop/io/BufferTooSmallException.java
  106. +21 −0 src/core/org/apache/hadoop/io/BytesWritable.java
  107. +21 −1 src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
  108. +5 −2 src/core/org/apache/hadoop/ipc/Client.java
  109. +10 −0 src/core/org/apache/hadoop/ipc/ProtocolProxy.java
  110. +6 −1 src/core/org/apache/hadoop/ipc/ProtocolSignature.java
  111. +13 −0 src/core/org/apache/hadoop/ipc/RPC.java
  112. +149 −141 src/core/org/apache/hadoop/ipc/Server.java
  113. +52 −12 src/core/org/apache/hadoop/metrics/file/FileContext.java
  114. +16 −0 src/core/org/apache/hadoop/net/NetUtils.java
  115. +56 −0 src/core/org/apache/hadoop/util/BeanTracker.java
  116. +1 −1  src/core/org/apache/hadoop/util/DataChecksum.java
  117. +29 −0 src/core/org/apache/hadoop/util/PulseCheckable.java
  118. +87 −0 src/core/org/apache/hadoop/util/PulseChecker.java
  119. +31 −0 src/core/org/apache/hadoop/util/PulseMBean.java
  120. +11 −4 src/core/org/apache/hadoop/util/Shell.java
  121. +115 −10 src/examples/org/apache/hadoop/examples/SleepJob.java
  122. +86 −0 src/hdfs/hdfs-default.xml
  123. +220 −0 src/hdfs/org/apache/hadoop/hdfs/BlockReaderAccelerator.java
  124. +41 −4 src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
  125. +636 −203 src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
  126. +65 −49 src/hdfs/org/apache/hadoop/hdfs/DFSLocatedBlocks.java
  127. +311 −3 src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
  128. +12 −0 src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
  129. +123 −0 src/hdfs/org/apache/hadoop/hdfs/LeaseRenewal.java
  130. +1 −1  src/hdfs/org/apache/hadoop/hdfs/LookasideCacheFileSystem.java
  131. +6 −4 src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java
  132. +71 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
  133. +140 −11 src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  134. +10 −3 src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
  135. +14 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
  136. +19 −3 src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
  137. +7 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
  138. +80 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlockWithMetaInfo.java
  139. +114 −57 src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
  140. +81 −0 src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocksWithMetaInfo.java
  141. +248 −166 src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
  142. +8 −1 src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
  143. +77 −11 src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
  144. +34 −32 src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  145. +17 −13 src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  146. +264 −194 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
  147. +360 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScannerSet.java
  148. +1,652 −782 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  149. +269 −45 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
  150. +295 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataWriter.java
  151. +210 −223 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  152. +36 −6 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
  153. +114 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverThreadPool.java
  154. +6 −6 src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
  155. +1,082 −468 src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
  156. +6 −4 src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
  157. +120 −24 src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
  158. +488 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/NameSpaceSliceStorage.java
  159. +50 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/NamespaceService.java
  160. +8 −6 src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java
  161. +5 −4 src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
  162. +213 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/VolumeMap.java
  163. +3 −0  src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
  164. +7 −0 src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
  165. +768 −0 src/hdfs/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
  166. +11 −15 src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
  167. +12 −3 src/hdfs/org/apache/hadoop/hdfs/server/namenode/DfsServlet.java
  168. +2 −17 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  169. +22 −6 src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
Note: the diff below is truncated because too many files (316) changed in this commit.
245 src/c++/libhdfs/hdfs.c
@@ -17,6 +17,7 @@
*/
#include "hdfs.h"
+#include "hdfs_direct.h"
#include "hdfsJniHelper.h"
@@ -56,6 +57,25 @@ typedef struct
JNIEnv* env;
} hdfsJniEnv;
+/**
+ * hdfsBuffer: a struct that wraps a Java ByteArray that can be reused as a
+ * destination for data.
+ */
+struct hdfsBuffer {
+ jobject jbRarray;
+ tSize size;
+ /* Small read cache to avoid crossing JNI boundary for
+ * small sequential reads. This should significantly reduce the number of
+ * JNI calls for access patterns like reading individual integers */
+ jbyte *jni_cache;
+ tSize jni_cache_size;
+ tSize jni_cache_offset; // within window, negative if cache invalid
+};
+
+// reset a buf to be reused for new data
+static void hdfsResetBuffer(hdfsBuf buf) {
+ buf->jni_cache_offset = -1;
+}
/**
@@ -2369,8 +2389,233 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
free(hdfsFileInfo);
}
+hdfsBuf hdfsCreateBuffer(tSize size, tSize jni_cache_size) {
+ if (size <= 0) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ hdfsBuf result = malloc(sizeof(*result));
+ if (!result) {
+ errno = ENOMEM;
+ return NULL;
+ }
+ result->size = size;
+ result->jni_cache_offset = -1; // contents invalid
+ result->jni_cache_size = jni_cache_size;
+ if (jni_cache_size > 0) {
+ result->jni_cache = malloc(sizeof(jbyte)*jni_cache_size);
+ if (result->jni_cache == NULL) {
+ free(result);
+ errno = ENOMEM;
+ return NULL;
+ }
+ } else {
+ result->jni_cache = NULL;
+ }
+
+ // Create byte array
+ JNIEnv* env = getJNIEnv();
+ jbyteArray locArray = (*env)->NewByteArray(env, size);
+ if (locArray == NULL) {
+ errno = EINTERNAL;
+ if (result->jni_cache) {
+ free(result->jni_cache);
+ }
+ free(result);
+ return NULL;
+ }
+ result->jbRarray = (*env)->NewGlobalRef(env, locArray);
+ destroyLocalReference(env, locArray);
+
+ if(result->jbRarray == NULL) {
+ errno = EINTERNAL;
+ if (result->jni_cache) {
+ free(result->jni_cache);
+ }
+ free(result);
+ return NULL;
+ }
+ return result;
+}
+
+tSize hdfsDestroyBuffer(hdfsBuf buf) {
+ if (buf == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+ JNIEnv* env = getJNIEnv();
+ (*env)->DeleteGlobalRef(env, buf->jbRarray);
+ if (buf->jni_cache) {
+ free(buf->jni_cache);
+ }
+ free(buf);
+ return 0;
+}
+
+tSize hdfsBufSize(hdfsBuf buf) {
+ if (buf == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+ return buf->size;
+}
+
+tSize hdfsBufRead(hdfsBuf src, tSize srcoff, void *dst, tSize len) {
+ if (src == NULL || dst == NULL || src->size - srcoff < len) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // cache for small reads
+ if (src->jni_cache_offset >= 0 && // buffer valid
+ src->jni_cache_offset <= srcoff && // read fits in buffer
+ src->jni_cache_offset + src->jni_cache_size >= srcoff + len) {
+
+ memcpy(dst, src->jni_cache + (srcoff - src->jni_cache_offset), len);
+ return len;
+ }
+
+ // get byte array region
+ JNIEnv* env = getJNIEnv();
+
+ if (len <= src->jni_cache_size) {
+ // cache the read
+ (*env)->GetByteArrayRegion(env, src->jbRarray, srcoff,
+ src->jni_cache_size, src->jni_cache);
+ src->jni_cache_offset = srcoff;
+ memcpy(dst, src->jni_cache, len);
+ } else {
+ // direct copy
+ (*env)->GetByteArrayRegion(env, src->jbRarray, srcoff, len,
+ (jbyte*)dst);
+ }
+ return len;
+}
+
+tSize hdfsRead_direct(hdfsFS fs, hdfsFile f, hdfsBuf buffer,
+ tSize off, tSize length) {
+ // JAVA EQUIVALENT:
+ // fis.read(bR, off, length);
+ //Get the JNIEnv* corresponding to current thread
+ if (length > buffer->size - off) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ //Parameters
+ jobject jInputStream = (jobject)(f ? f->file : NULL);
+ jint noReadBytes = 0;
+ jvalue jVal;
+ jthrowable jExc = NULL;
+
+ //Sanity check
+ if (!f || f->type == UNINITIALIZED) {
+ errno = EBADF;
+ return -1;
+ }
+
+ //Error checking... make sure that this file is 'readable'
+ if (f->type != INPUT) {
+ fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+ errno = EINVAL;
+ return -1;
+ }
+
+ hdfsResetBuffer(buffer);
+ //Read the requisite bytes
+ if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
+ "read", "([BII)I", buffer->jbRarray, off, length) != 0) {
+ errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+ "FSDataInputStream::read");
+ noReadBytes = -1;
+ }
+ else {
+ noReadBytes = jVal.i;
+ if (noReadBytes <= 0) {
+ //This is a valid case: there aren't any bytes left to read!
+ if (noReadBytes == 0 || noReadBytes < -1) {
+ fprintf(stderr, "WARN: FSDataInputStream.read returned invalid "
+ "return code - libhdfs returning EOF, i.e., 0: %d\n",
+ noReadBytes);
+ }
+ noReadBytes = 0;
+ }
+ errno = 0;
+ }
+
+ return noReadBytes;
+}
+
+tSize hdfsPread_direct(hdfsFS fs, hdfsFile f, tOffset position,
+ hdfsBuf buffer, tSize off, tSize length) {
+ // JAVA EQUIVALENT:
+ // fis.read(pos, bR, off, length);
+
+ if (length > buffer->size - off) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+
+ //Parameters
+ jobject jInputStream = (jobject)(f ? f->file : NULL);
+
+ jint noReadBytes = 0;
+ jvalue jVal;
+ jthrowable jExc = NULL;
+
+ //Sanity check
+ if (!f || f->type == UNINITIALIZED) {
+ errno = EBADF;
+ return -1;
+ }
+
+ //Error checking... make sure that this file is 'readable'
+ if (f->type != INPUT) {
+ fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+ errno = EINVAL;
+ return -1;
+ }
+
+ hdfsResetBuffer(buffer);
+ //Read the requisite bytes
+ if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
+ "read", "(J[BII)I", position, buffer->jbRarray, off, length) != 0) {
+ errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
+ "FSDataInputStream::read");
+ noReadBytes = -1;
+ fprintf(stderr, "invokeMethod error\n");
+ }
+ else {
+ noReadBytes = jVal.i;
+ if (noReadBytes <= 0) {
+ //This is a valid case: there aren't any bytes left to read!
+ if (noReadBytes == 0 || noReadBytes < -1) {
+ fprintf(stderr, "WARN: FSDataInputStream.read returned invalid "
+ "return code - libhdfs returning EOF, i.e., 0: %d\n",
+ noReadBytes);
+ }
+ noReadBytes = 0;
+ }
+ errno = 0;
+ }
+
+ return noReadBytes;
+}
/**
* vim: ts=4: sw=4: et:
8 src/c++/libhdfs/hdfsJniHelper.c
@@ -426,8 +426,11 @@ JNIEnv* getJNIEnv(void)
char *hadoopJvmArgs = getenv("LIBHDFS_OPTS");
char jvmArgDelims[] = " ";
if (hadoopJvmArgs != NULL) {
+ char hadoopJvmArgs_cpy[strlen(hadoopJvmArgs)+1];
+ strcpy(hadoopJvmArgs_cpy, hadoopJvmArgs);
char *result = NULL;
result = strtok( hadoopJvmArgs, jvmArgDelims );
+ result = strtok( hadoopJvmArgs_cpy, jvmArgDelims );
while ( result != NULL ) {
noArgs++;
result = strtok( NULL, jvmArgDelims);
@@ -437,11 +440,14 @@ JNIEnv* getJNIEnv(void)
options[0].optionString = optHadoopClassPath;
//fill in any specified arguments
if (hadoopJvmArgs != NULL) {
+ char hadoopJvmArgs_cpy[strlen(hadoopJvmArgs)+1];
+ strcpy(hadoopJvmArgs_cpy, hadoopJvmArgs);
char *result = NULL;
- result = strtok( hadoopJvmArgs, jvmArgDelims );
+ result = strtok( hadoopJvmArgs_cpy, jvmArgDelims );
int argNum = 1;
for (;argNum < noArgs ; argNum++) {
options[argNum].optionString = result; //optHadoopArg;
+ result = strtok( NULL, jvmArgDelims);
}
}
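
The two hunks above tokenize a copy of LIBHDFS_OPTS instead of the string returned by getenv(), because strtok() writes NUL terminators into whatever buffer it scans and would otherwise corrupt the environment value between the token-counting pass and the option-filling pass. A minimal standalone sketch of the same pattern follows; it is a hypothetical illustration, not code from the patch:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Count and print space-separated JVM options without mutating the
 * original environment value. strtok() modifies its argument, so we
 * tokenize a private copy, mirroring the hadoopJvmArgs_cpy fix above. */
int main(void) {
    const char *opts = getenv("LIBHDFS_OPTS");   /* e.g. "-Xmx256m -verbose:jni" */
    if (opts == NULL)
        return 0;

    char copy[strlen(opts) + 1];                 /* VLA copy, like hadoopJvmArgs_cpy */
    strcpy(copy, opts);

    int count = 0;
    for (char *tok = strtok(copy, " "); tok != NULL; tok = strtok(NULL, " ")) {
        printf("option %d: %s\n", ++count, tok);
    }
    printf("%d option(s); original value still intact: %s\n", count, opts);
    return 0;
}
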
106 src/c++/libhdfs/hdfs_direct.h
@@ -0,0 +1,106 @@
+#include "hdfs.h"
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_HDFSHIPERF_H
+#define LIBHDFS_HDFSHIPERF_H
+/* Low-level additions to the libhdfs API that allow optimizations that can
+ * reduce the number of JNI array allocations and copies */
+
+
+#endif /*LIBHDFS_HDFSHIPERF_H*/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* A buffer type that can be reused for multiple hdfs reads */
+typedef struct hdfsBuffer* hdfsBuf;
+
+/**
+ * hdfsCreateBuffer
+ * Creates a Java byte array to be reused as a buffer for hdfs reads.
+ * Returns NULL on error
+ * @param size The size in bytes of the buffer to create
+ * @param jni_cache_size cache reads of less than this size to avoid extra JNI
+ * calls
+ */
+hdfsBuf hdfsCreateBuffer(tSize size, tSize jni_cache_size);
+
+/**
+ * hdfsDestroyBuffer
+ * Destroys a previously created buffer
+ * returns 0 on success'
+ * @param buf A hdfsBuf previously created with hdfsCreateBuffer and not yet
+ * destroyed
+ */
+tSize hdfsDestroyBuffer(hdfsBuf buf);
+
+/**
+ * hdfsBufSize
+ * Returns the size in bytes of a previously created buffer
+ * Returns -1 if buf invalid
+ * @param buf A hdfsbuf
+ */
+tSize hdfsBufSize(hdfsBuf buf);
+
+/**
+ * hdfsBufRead
+ * Read from hdfsBuf into a character array
+ * @param src The hdfsBuf to read from
+ * @param srcoff The offset within the hdfsBuf to read from
+ * @param dst A character array to read into
+ * @param len The length of data to read (must be <= size of dst
+ * and <= hdfsBufSize(src) - srcoff)
+ * returns the number of bytes read, or -1 on error
+ */
+tSize hdfsBufRead(hdfsBuf src, tSize srcoff, void *dst, tSize len);
+
+/**
+ * hdfsRead_direct
+ * Read from a file into a hdfsBuf
+ * @param fs a distributed file system
+ * @param f an open file handle
+ * @param buffer: the buffer to read data into
+ * @param off The position in the buffer to read into.
+ * @param length the length of data to read (must be less than
+ * hdfsBufSize(buffer))
+ */
+tSize hdfsRead_direct(hdfsFS fs, hdfsFile f, hdfsBuf buffer,
+ tSize off, tSize length);
+/**
+ * hdfsPread_direct
+ * Read from a file into a hdfsBuf
+ * @param fs a distributed file system
+ * @param f an open file handle
+ * @param position the position within the file to read from
+ * @param buffer the buffer to read data into
+ * @param off The position in the buffer to read into.
+ * @param length the length of data to read (must be less than
+ * hdfsBufSize(buffer))
+ */
+tSize hdfsPread_direct(hdfsFS fs, hdfsFile f, tOffset position,
+ hdfsBuf buffer, tSize off, tSize length);
+
+#ifdef __cplusplus
+}
+#endif
+
+/**
+ * vim: ts=4: sw=4: et
+ */
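
Read together with the hdfs.c changes, this header exposes a reusable Java-side byte array (hdfsBuf) plus read calls that fill it in place, so repeated reads avoid allocating a fresh JNI array each time, and small hdfsBufRead() calls can be served from a C-side cache. A hedged usage sketch follows; the file path, buffer size, and JNI cache size are illustrative assumptions, not values taken from the commit:

/* Hedged usage sketch for the hdfsBuf "direct read" API declared above. */
#include <fcntl.h>
#include <stdio.h>
#include "hdfs.h"
#include "hdfs_direct.h"

int main(void) {
    hdfsFS fs = hdfsConnect("default", 0);
    hdfsFile in = fs ? hdfsOpenFile(fs, "/tmp/example.dat", O_RDONLY, 0, 0, 0) : NULL;
    if (!fs || !in) { fprintf(stderr, "connect/open failed\n"); return 1; }

    /* One 1 MB Java byte array, reused for every read; hdfsBufRead() calls of
     * at most 4 KB are answered from the C-side cache, so small sequential
     * reads do not each cross the JNI boundary. */
    hdfsBuf buf = hdfsCreateBuffer(1024 * 1024, 4096);
    if (!buf) { fprintf(stderr, "buffer allocation failed\n"); return 1; }

    tSize n = hdfsRead_direct(fs, in, buf, 0, hdfsBufSize(buf));
    if (n >= 8) {
        char head[8];                        /* copy a small prefix out of the buffer */
        hdfsBufRead(buf, 0, head, sizeof(head));
        printf("read %d bytes; first byte = 0x%02x\n", (int)n, (unsigned char)head[0]);
    }

    hdfsDestroyBuffer(buf);
    hdfsCloseFile(fs, in);
    hdfsDisconnect(fs);
    return 0;
}
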
2  src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/AllTestDriver.java
@@ -28,6 +28,8 @@
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
+ pgd.addClass("locktest", DFSLockTest.class, "A benchmark that spawns many threads and each thread run many configurable read/write FileSystem operations to test FSNamesystem lock's concurrency.");
+ pgd.addClass("dirtest", DFSDirTest.class, "A map/reduce benchmark that creates many jobs and each job spawns many threads and each thread create/delete many dirs.");
pgd.addClass("dfstest", DFSIOTest.class, "A map/reduce benchmark that creates many jobs and each jobs can create many files to test i/o rate per task of hadoop cluster.");
pgd.addClass("structure-gen", StructureGenerator.class, "Create a structure of files and directories as an input for data-gen");
pgd.addClass("data-gen", DataGenerator.class, "Create files and directories on cluster as inputs for load-gen");
174 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSDirTest.java
@@ -0,0 +1,174 @@
+package org.apache.hadoop.hdfs;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.ReadMapper;
+import org.apache.hadoop.mapred.DirReduce;
+import org.apache.hadoop.mapred.DirMapper;
+import org.apache.hadoop.hdfs.Constant;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+@SuppressWarnings("deprecation")
+public class DFSDirTest extends Configured implements Tool, DirConstant{
+
+ private static long nmaps;
+ private static Configuration fsConfig;
+ private static long ntasks;
+ private static long nthreads;
+ private static boolean samedir = false;
+
+ public static void printUsage() {
+ System.out.println("USAGE: bin/hadoop hadoop-*-benchmark.jar dirtest nMaps " +
+ "[-nTasks] [-nThreads] [-samedir]");
+ System.out.println("Default nTasks = " + NSUBTASKS);
+ System.out.println("Default nThreads = " + NTHREADS);
+ System.out.println("-samedir : all dirs are created under the same directory");
+ System.exit(0);
+ }
+
+ public void control(Configuration fsConfig, String fileName)
+ throws IOException {
+ String name = fileName;
+ FileSystem fs = FileSystem.get(fsConfig);
+ fs.delete(new Path(DFS_INPUT, name), true);
+
+ SequenceFile.Writer write = null;
+ for (int i = 0; i < nmaps; i++) {
+ try {
+ Path controlFile = new Path(DFS_INPUT, name + i);
+ write = SequenceFile.createWriter(fs, fsConfig, controlFile,
+ Text.class, LongWritable.class, CompressionType.NONE);
+ write.append(new Text(name + i), new LongWritable(this.nthreads));
+ } finally {
+ if (write != null)
+ write.close();
+ write = null;
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws IOException {
+
+ long startTime = System.currentTimeMillis();
+ if (args.length < 1) {
+ printUsage();
+ }
+ nmaps = Long.parseLong(args[0]);
+ ntasks = NSUBTASKS;
+ nthreads = NTHREADS;
+
+
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-nTasks")) ntasks = Long.parseLong(args[++i]); else
+ if (args[i].equals("-nThreads")) nthreads = Long.parseLong(args[++i]); else
+ if (args[i].equals("-samedir")) samedir = true; else
+ printUsage();
+ }
+
+ // running the Writting
+ fsConfig = new Configuration(getConf());
+ fsConfig.set("dfs.nmaps", String.valueOf(nmaps));
+ fsConfig.set("dfs.nTasks", String.valueOf(ntasks));
+ fsConfig.set("dfs.samedir", String.valueOf(samedir));
+ FileSystem fs = FileSystem.get(fsConfig);
+
+ if (fs.exists(new Path(DFS_OUTPUT)))
+ fs.delete(new Path(DFS_OUTPUT), true);
+ if (fs.exists(new Path(DFS_INPUT)))
+ fs.delete(new Path(DFS_INPUT), true);
+ if (fs.exists(new Path(INPUT)))
+ fs.delete(new Path(INPUT), true);
+ if (fs.exists(new Path(OUTPUT)))
+ fs.delete(new Path(OUTPUT), true);
+ fs.delete(new Path(TRASH), true);
+
+ // run the control() to set up for the FileSystem
+ control(fsConfig, "testing");
+
+ // prepare the for map reduce job
+ JobConf conf = new JobConf(fsConfig, DFSDirTest.class);
+ conf.setJobName("dirtest-writing");
+
+ // set the output and input for the map reduce
+ FileOutputFormat.setOutputPath(conf, new Path(DFS_OUTPUT + "writing"));
+ FileInputFormat.setInputPaths(conf, new Path(DFS_INPUT));
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+
+ conf.setMapperClass(DirMapper.class);
+ conf.setReducerClass(DirReduce.class);
+ conf.setNumReduceTasks(1);
+ conf.setSpeculativeExecution(false);
+ JobClient.runJob(conf);
+
+ // printout the result
+ System.out.println("-------------------");
+ System.out.println("RESULT FOR WRITING");
+ System.out.println("-------------------");
+ FSDataInputStream out = fs.open(new Path(OUTPUT,"result-writing"));
+ while (true) {
+ String temp = out.readLine();
+ if (temp == null)
+ break;
+ System.out.println(temp);
+ }
+ out.close();
+
+ long endTime = System.currentTimeMillis();
+
+ System.out.println("------------------");
+ double execTime = (endTime - startTime) / 1000.0;
+ String unit = "seconds";
+ if (execTime > 60) {
+ execTime /= 60.0;
+ unit = "mins";
+ }
+ if (execTime > 60) {
+ execTime /= 60.0;
+ unit = "hours";
+ }
+ System.out.println("Time executed :\t" + execTime + " " + unit);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new DFSDirTest(), args));
+ }
+
+}
2  src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSIOTest.java
@@ -100,7 +100,7 @@ public int run(String[] args) throws IOException {
fsConfig.set("dfs.buffer.size.write", String.valueOf(bufferLimitW));
fsConfig.set("dfs.nmaps", String.valueOf(nmaps));
fsConfig.set("dfs.nTasks", String.valueOf(ntasks));
- fsConfig.setInt("dfs.replication", 1);
+ fsConfig.setInt("dfs.replication", (int)replications);
FileSystem fs = FileSystem.get(fsConfig);
if (fs.exists(new Path(DFS_OUTPUT)))
256 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DFSLockTest.java
@@ -0,0 +1,256 @@
+package org.apache.hadoop.hdfs;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+@SuppressWarnings("deprecation")
+public class DFSLockTest extends Configured implements Tool{
+
+ private static long ntasks;
+ private static long nread;
+ private static long nwrite;
+ private static long rseed;
+ private static long startTime;
+ private static final int NSUBTASKS = 10;
+ private static final int NTHREADS = 0;
+ private static final int DEFAULT_INTERVAL = 10;
+ private static final int DEFAULT_RSEED = 10;
+ private static final String DEFAULT_WRITEOP = "0,1,2,3";
+ private static final String DEFAULT_READOP = "0,1,2";
+ private static final Path TESTDIR = new Path("/lockbench");
+ private static final FsPermission all = new FsPermission((short)0777);
+ private Configuration conf;
+
+ private enum THREADTYPE {
+ READ, WRITE;
+ }
+ private static final String[] DEFAULT_READOPS =
+ {"getFileStatus", "getContentSummary", "listStatus"};
+ private static final String[] DEFAULT_WRITEOPS =
+ {"mkdirs", "setPermission", "rename", "createEmptyFile"};
+ private static Integer[] writeOps;
+ private static Integer[] readOps;
+
+ public static void printUsage() {
+ System.out.println("USAGE: bin/hadoop hadoop-*-benchmark.jar dirtest " +
+ "[-nTasks] [-nRead] [-nWrite] [-rseed] [-readOP] [-writeOP] ");
+ System.out.println("Default nTasks = " + NSUBTASKS);
+ System.out.println("Default nRead = " + NTHREADS);
+ System.out.println("Default nWrite = " + NTHREADS);
+ System.out.println("Default rseed= " + DEFAULT_RSEED);
+ System.out.println("Default readOP= " + DEFAULT_READOP);
+ System.out.println("Default writeOP= " + DEFAULT_WRITEOP);
+ System.out.print("Read: ");
+ for (int i = 0; i < DEFAULT_READOPS.length; i++) {
+ System.out.print(i + ":" + DEFAULT_READOPS[i] + " ");
+ }
+ System.out.println();
+ System.out.print("Write: ");
+ for (int i = 0; i < DEFAULT_WRITEOPS.length; i++) {
+ System.out.print(i + ":" + DEFAULT_WRITEOPS[i] + " ");
+ }
+ System.out.println();
+ System.exit(0);
+ }
+
+ private class LockThread extends Thread {
+ THREADTYPE type;
+ int id = 0;
+ long acquiredTime; // lock acquired time
+ long releasedTime = 0; // lock release time
+ Path runPath;
+ Path renameablePath;
+ private FileSystem fs;
+ Random rand;
+ int runType;
+
+ public LockThread(Configuration conf, THREADTYPE type, int id)
+ throws IOException{
+ this.type = type;
+ this.id = id;
+ this.fs = FileSystem.newInstance(conf);
+ this.runPath = new Path(TESTDIR, Integer.toString(id));
+ this.renameablePath = new Path(runPath, "rename");
+ this.rand = new Random(id * 32 + rseed);
+ fs.mkdirs(runPath);
+ fs.mkdirs(renameablePath);
+ }
+
+ public void read() throws Exception{
+ // test read lock
+ runType = readOps[rand.nextInt(readOps.length)];
+ switch (runType) {
+ case 0:
+ FileStatus status = fs.getFileStatus(TESTDIR); break;
+ case 1:
+ ContentSummary cs = fs.getContentSummary(TESTDIR); break;
+ case 2:
+ FileStatus[] fss = fs.listStatus(TESTDIR); break;
+ }
+ }
+
+ public void write(int i) throws Exception{
+ // test write lock
+ Path newPath = new Path(runPath, Integer.toString(i));
+ runType = writeOps[rand.nextInt(writeOps.length)];
+ switch (runType) {
+ case 0:
+ fs.mkdirs(newPath); break;
+ case 1:
+ fs.setPermission(runPath, all); break;
+ case 2:
+ if (fs.rename(this.renameablePath, newPath)) {
+ this.renameablePath = newPath;
+ }
+ break;
+ case 3:
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(newPath, true);
+ out.close();
+ out = null;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ break;
+ }
+ }
+
+ public void run() {
+ try {
+ for (int i = 0; i < ntasks; i++) {
+ acquiredTime = System.currentTimeMillis();
+ String operationStr = "";
+ try {
+ switch (type) {
+ case READ:
+ read();
+ operationStr = DEFAULT_READOPS[runType];
+ break;
+ case WRITE:
+ write(i);
+ operationStr = DEFAULT_WRITEOPS[runType];
+ break;
+ }
+ } catch (Exception ioe) {
+ System.err.println(ioe.getLocalizedMessage());
+ ioe.printStackTrace();
+ }
+ releasedTime = System.currentTimeMillis();
+ System.out.println(type.name() + ": " + id + "." + operationStr + "." +
+ i + " " + (acquiredTime - startTime) + " to " + (releasedTime - startTime));
+ }
+ } catch (Exception ioe) {
+ System.err.println(ioe.getLocalizedMessage());
+ ioe.printStackTrace();
+ } finally {
+ try {
+ fs.close();
+ } catch (IOException ie) {
+ }
+ }
+ }
+ }
+
+ public void test() throws IOException {
+ // spawn ntasks threads
+ startTime = System.currentTimeMillis();
+ conf = new Configuration(getConf());
+ FileSystem fs = FileSystem.newInstance(conf);
+ fs.delete(TESTDIR, true);
+ ArrayList<LockThread> threads = new ArrayList<LockThread>((int)(nread + nwrite));
+ int i;
+ int id = 0;
+ for (i=0; i < nread; i++, id++) {
+ threads.add(new LockThread(conf, THREADTYPE.READ, id));
+ }
+ for (i=0; i < nwrite; i++, id++) {
+ threads.add(new LockThread(conf, THREADTYPE.WRITE, id));
+ }
+ Collections.shuffle(threads, new Random(rseed));
+ for (LockThread thread : threads) {
+ thread.start();
+ }
+ for (LockThread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws IOException {
+ ntasks = NSUBTASKS;
+ nread = NTHREADS;
+ nwrite = NTHREADS;
+ rseed = DEFAULT_RSEED;
+ String strWriteOP = DEFAULT_WRITEOP;
+ String strReadOP = DEFAULT_READOP;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-nTasks")) ntasks = Long.parseLong(args[++i]); else
+ if (args[i].equals("-nRead")) nread = Long.parseLong(args[++i]); else
+ if (args[i].equals("-nWrite")) nwrite = Long.parseLong(args[++i]); else
+ if (args[i].equals("-rseed")) rseed = Long.parseLong(args[++i]); else
+ if (args[i].equals("-readOP")) strReadOP = args[++i]; else
+ if (args[i].equals("-writeOP")) strWriteOP = args[++i]; else
+ printUsage();
+ }
+ String[] ops = strReadOP.split(",");
+ ArrayList<Integer> opsStr = new ArrayList<Integer>();
+ for (String op: ops) {
+ int index = Integer.parseInt(op);
+ if (index >= 0 && index < DEFAULT_READOPS.length) {
+ opsStr.add(index);
+ }
+ }
+ readOps = opsStr.toArray(new Integer[opsStr.size()]);
+ ops = strWriteOP.split(",");
+ opsStr = new ArrayList<Integer>();
+ for (String op: ops) {
+ int index = Integer.parseInt(op);
+ if (index >= 0 && index < DEFAULT_WRITEOPS.length) {
+ opsStr.add(index);
+ }
+ }
+ writeOps = opsStr.toArray(new Integer[opsStr.size()]);
+ test();
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new DFSLockTest(), args));
+ }
+}
30 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/DirConstant.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hdfs;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface DirConstant {
+ public static final long NSUBTASKS = 100;
+ public static final long NTHREADS = 20;
+
+ public static final String ROOT = "/dirbenchmark/";
+ public static final String TRASH = ROOT + ".Trash/";
+ public static final String INPUT = ROOT + "input/";
+ public static final String DFS_INPUT = ROOT + "dirtest-input/";
+ public static final String DFS_OUTPUT = ROOT + "dirtest-output/";
+ public static final String OUTPUT = ROOT + "output/";
+}
10 src/contrib/benchmark/src/java/org/apache/hadoop/hdfs/NNThroughputBenchmark.java
@@ -825,8 +825,8 @@ String getName() {
void register() throws IOException {
// get versions from the namenode
nsInfo = nameNode.versionRequest();
- dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
- DataNode.setNewStorageID(dnRegistration);
+ dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""), "");
+ dnRegistration.storageID = DataNode.createNewStorageId(dnRegistration.getPort());
// register datanode
dnRegistration = nameNode.register(dnRegistration);
}
@@ -837,7 +837,7 @@ void register() throws IOException {
void sendHeartbeat() throws IOException {
// register datanode
DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
- DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
+ DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0);
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
LOG.debug("sendHeartbeat Name-node reply: "
@@ -874,7 +874,7 @@ public int compareTo(String name) {
int replicateBlocks() throws IOException {
// register datanode
DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
- DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
+ DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0);
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -901,7 +901,7 @@ private int transferBlocks(Block blocks[], DatanodeInfo xferTargets[][])
DatanodeRegistration receivedDNReg;
receivedDNReg = new DatanodeRegistration(dnInfo.getName());
receivedDNReg.setStorageInfo(new DataStorage(nsInfo, dnInfo
- .getStorageID()));
+ .getStorageID()), dnInfo.getStorageID());
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
Block[] bi = new Block[] { blocks[i] };
nameNode.blockReceivedAndDeleted(receivedDNReg, bi);
114 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/DirMapper.java
@@ -0,0 +1,114 @@
+package org.apache.hadoop.mapred;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hdfs.DirConstant;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+@SuppressWarnings("deprecation")
+ public class DirMapper extends MapReduceBase implements
+ Mapper<Text, LongWritable, Text, Text>, DirConstant{
+
+ private Configuration conf;
+
+ public void configure(JobConf configuration) {
+ conf = configuration;
+ }
+
+ private class DirThread extends Thread {
+ private int id;
+ private long ntasks;
+ private String name;
+ private boolean samedir;
+ private FileSystem fs;
+
+ public DirThread(Configuration conf, String name, int id,
+ long ntasks, boolean samedir) throws IOException{
+ this.id = id;
+ this.name = name;
+ this.ntasks = ntasks;
+ this.samedir = samedir;
+ this.fs = FileSystem.newInstance(conf);
+ }
+
+ public void run() {
+ try {
+ Path basePath = new Path(INPUT);
+ if (samedir == false) {
+ basePath = new Path(INPUT, name + "/" + id);
+ }
+
+ for (int i = 0; i < ntasks; i++) {
+ Path tmpDir = new Path(basePath, name + "_" + id + "_" + i + "_dir");
+ Path tmpDir1 = new Path(basePath, name + "_" + id + "_" + i + "_dir1");
+ fs.mkdirs(tmpDir);
+ fs.mkdirs(tmpDir1);
+ fs.delete(tmpDir, true);
+ fs.delete(tmpDir1, true);
+ }
+ } catch (Exception ioe) {
+ System.err.println(ioe.getLocalizedMessage());
+ ioe.printStackTrace();
+ }
+ }
+
+ }
+
+ @Override
+ public void map(Text key, LongWritable value,
+ OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException{
+ // for testing
+ String taskID = conf.get("mapred.task.id");
+ String name = key.toString() + taskID;
+
+ long ntasks = Long.parseLong(conf.get("dfs.nTasks"));
+ boolean samedir = Boolean.parseBoolean(conf.get("dfs.samedir", Boolean.toString(false)));
+ long nthreads = value.get();
+ // spawn ntasks threads
+ DirThread[] threads = new DirThread[(int)nthreads];
+ for (int i=0; i<nthreads; i++) {
+ threads[i] = new DirThread(conf, name, i, ntasks, samedir);
+ threads[i].start();
+ }
+ for (DirThread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ }
+ }
+ output.collect(
+ new Text("1"),
+ new Text(String.valueOf(1)));
+ }
+
+ }
107 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/DirReduce.java
@@ -0,0 +1,107 @@
+package org.apache.hadoop.mapred;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hdfs.DirConstant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+@SuppressWarnings("deprecation")
+public class DirReduce extends MapReduceBase implements
+ Reducer<Text, Text, Text, Text>, DirConstant {
+
+ private FileSystem fs;
+ private Configuration conf;
+
+ public void configure(JobConf configuration) {
+ conf = configuration;
+ }
+
+ @Override
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output, Reporter reporter)
+ throws IOException {
+ int size = 0;
+
+ double averageIORate = 0;
+ List<Float> list = new ArrayList<Float>();
+
+ fs = FileSystem.get(conf);
+ FSDataOutputStream out;
+
+ if (fs.exists(new Path(OUTPUT, "result-writing")))
+ out = fs.create(new Path(OUTPUT, "result-reading"), true);
+ else
+ out = fs.create(new Path(OUTPUT, "result-writing"), true);
+
+ long nTasks = Long.parseLong(conf.get("dfs.nTasks"));
+ long nmaps = Long.parseLong(conf.get("dfs.nmaps"));
+
+
+ out.writeChars("-----------------------------\n");
+ out.writeChars("Number of tasks:\t" + nmaps + "\n");
+ out.writeChars("Files per task:\t\t" + nTasks + "\n");
+ float min = Float.MAX_VALUE;
+ float max = Float.MIN_VALUE;
+
+ // TODO Auto-generated method stub
+ for (; values.hasNext();) {
+ size++;
+ // tokens.nextToken(); there is only one value per line
+ double ioRate = Double.parseDouble(values.next().toString());
+ if (ioRate > max)
+ max = (float) ioRate;
+ if (ioRate < min)
+ min = (float) ioRate;
+ list.add((float) ioRate);
+ // this is for testing
+ // output.collect(new Text(String.valueOf(bufferSize)), new
+ // Text(
+ // String.valueOf(ioRate)));
+ // out.writeChars(bufferSize + " bytes\t\t" + ioRate +
+ // " Mb/s\n");
+ averageIORate += ioRate;
+ }
+ out.writeChars("Min\t\t\t" + min + "\n");
+ out.writeChars("Max\t\t\t" + max + "\n");
+ averageIORate /= size;
+ float temp = (float) 0.0;
+ for (int i = 0; i < list.size(); i++) {
+ temp += Math.pow(list.get(i) - averageIORate, 2);
+ }
+ out.writeChars("Average\t\t\t: " + averageIORate + "\n");
+ float dev = (float) Math.sqrt(temp / size);
+ out.writeChars("Std. dev\t\t: " + dev + "\n");
+ out.close();
+ }
+
+}
10 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/MultiTaskTracker.java
@@ -13,13 +13,13 @@
public static void main(String[] args) throws IOException {
int numTaskTrackers = Integer.parseInt(args[0]);
- Configuration conf = new Configuration();
- JobConf jConf = new JobConf(conf);
- jConf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
- String[] baseLocalDirs = jConf.getLocalDirs();
- List<String> localDirs = new LinkedList<String>();
List<TaskTrackerRunner> runners = new ArrayList<TaskTrackerRunner>();
for (int i = 0; i < numTaskTrackers; i++) {
+ Configuration conf = new Configuration();
+ JobConf jConf = new JobConf(conf);
+ jConf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
+ String[] baseLocalDirs = jConf.getLocalDirs();
+ List<String> localDirs = new LinkedList<String>();
localDirs.clear();
for (String localDir : baseLocalDirs) {
File baseLocalDir = new File(localDir);
106 src/contrib/benchmark/src/java/org/apache/hadoop/mapred/SleepJobRunner.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+
+public class SleepJobRunner {
+
+ public static void printUsage() {
+ System.out.println("Usage: SleepJobRunner numberOfJobs percentOfSmallJobs "
+ + "percentOfShortJobs");
+ System.out.println("numberOfJobs\tthe number of jobs to launch");
+ System.out.println("percentOfSmallJobs\tpercentage of jobs to be small: "
+ + "100 mappers, 1 reducer. The long jobs are the rest with "
+ + "5000 maps and 397 reducers");
+ System.out.println("percentOfShortJobs\tpercentage of jobs to be short: "
+ + "1 ms of wait time. The long jobas are the rest with "
+ + "60 seconds of wait time in the mapper");
+ }
+
+ public static void main(String[] args) throws Exception {
+ String[] pools = new String[15];
+ for (int i = 0; i < pools.length; i++) {
+ pools[i] = "pool" + i;
+ }
+ if (args.length != 3) {
+ printUsage();
+ }
+ int jobs = Integer.valueOf(args[0]);
+ int percentageSmall = Integer.valueOf(args[1]);
+ int percentageShort = Integer.valueOf(args[2]);
+
+ List<SleepJobRunnerThread> threads = new ArrayList<SleepJobRunnerThread>();
+ Random rand = new Random();
+
+ for (int i = 0; i < jobs; i++) {
+ Configuration conf = new Configuration();
+ conf.set("mapred.child.java.opts",
+ "-Xmx50m -Djava.net.preferIPv4Stack=true "
+ + "-XX:+UseCompressedOops");
+ conf.set("io.sort.mb", "5");
+ conf.set("mapred.fairscheduler.pool", pools[i % pools.length]);
+
+ int nMappers, nReducers, sleepTime;
+ if (rand.nextInt(100) + 1 <= percentageSmall) {
+ nMappers = 10;
+ nReducers = 1;
+ } else {
+ nMappers = 5000;
+ nReducers = 397;
+ }
+
+ if (rand.nextInt(100) + 1 <= percentageShort) {
+ sleepTime = 1;
+ } else {
+ sleepTime = 60000;
+ }
+
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ SleepJobRunnerThread t =
+ new SleepJobRunnerThread(conf, nMappers, nReducers, sleepTime);
+ threads.add(t);
+ }
+
+ for (SleepJobRunnerThread t : threads) {
+ t.start();
+ }
+
+ for (SleepJobRunnerThread t : threads) {
+ t.join();
+ }
+ }
+
+ public static class SleepJobRunnerThread extends Thread {
+
+ SleepJob jobToRun = null;
+ int nMappers = 0;
+ int nReducers = 0;
+ int sleepTime = 0;
+
+ public SleepJobRunnerThread(Configuration conf, int nMappers,
+ int nReducers, int sleepTime) {
+ super();
+ jobToRun = new SleepJob();
+ jobToRun.setConf(conf);
+ this.nMappers = nMappers;
+ this.nReducers = nReducers;
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ public void run() {
+ try {
+ jobToRun.run(nMappers, nReducers, sleepTime, 10, sleepTime, 10, false,
+ new ArrayList<String>(), new ArrayList<String>(), 10, 10,
+ new ArrayList<String>(), 1);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ }
+}
+
4 src/contrib/build-contrib.xml
@@ -104,12 +104,16 @@
<path id="contrib-classpath">
<pathelement location="${build.classes}"/>
<pathelement location="${hadoop.root}/build/tools"/>
+ <pathelement location="${hadoop.root}/build/examples"/>
<fileset refid="lib.jars"/>
<pathelement location="${hadoop.root}/build/classes"/>
<pathelement location="${hadoop.root}/build/tools"/>
<fileset dir="${hadoop.root}/lib">
<include name="**/*.jar" />
</fileset>
+ <fileset dir="${hadoop.root}/build/contrib">
+ <include name="**/*.jar" />
+ </fileset>
<path refid="${ant.project.name}.common-classpath"/>
</path>
1  src/contrib/build.xml
@@ -48,7 +48,6 @@
<subant target="test">
<fileset dir="." includes="streaming/build.xml"/>
<fileset dir="." includes="fairscheduler/build.xml"/>
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
<fileset dir="." includes="gridmix/build.xml"/>
<fileset dir="." includes="raid/build.xml"/>
<fileset dir="." includes="corona/build.xml"/>
3  src/contrib/corona/src/java/org/apache/hadoop/corona/Session.java
@@ -305,9 +305,6 @@ public void grantResource(ResourceRequest req, ResourceGrant grant) {
removeGrantedRequest(req, true);
- // the revoked grants are now pending again
- addPendingRequest(req);
-
// we have previously granted this resource, return to caller
canceledGrants.add(grant);
}
18 src/contrib/corona/src/java/org/apache/hadoop/corona/SessionDriver.java
@@ -9,9 +9,12 @@
import java.net.Socket;
import java.net.InetSocketAddress;
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
@@ -49,6 +52,18 @@ public SessionDriver(Configuration conf, SessionDriverService.Iface iface)
this(new CoronaConf(conf), iface);
}
+ private static UnixUserGroupInformation getUGI(Configuration conf)
+ throws IOException {
+ UnixUserGroupInformation ugi = null;
+ try {
+ ugi = UnixUserGroupInformation.login(conf, true);
+ } catch (LoginException e) {
+ throw (IOException)(new IOException(
+ "Failed to get the current user's information.").initCause(e));
+ }
+ return ugi;
+ }
+
public SessionDriver(CoronaConf conf, SessionDriverService.Iface iface)
throws IOException {
this.conf = conf;
@@ -63,7 +78,8 @@ public SessionDriver(CoronaConf conf, SessionDriverService.Iface iface)
LOG.info("My serverSocketPort " + serverSocket.getLocalPort());
LOG.info("My Address " + myAddress.getHost() + ":" + myAddress.getPort());
- String userName = System.getProperty("user.name");
+ UnixUserGroupInformation ugi = getUGI(conf);
+ String userName = ugi.getUserName();
String sessionName = userName + "-" + new java.util.Date().toString();
this.sessionInfo = new SessionInfo();
this.sessionInfo.setAddress(myAddress);
4 src/contrib/corona/src/java/org/apache/hadoop/corona/SessionNotificationCtx.java
@@ -89,7 +89,7 @@ public String getSessionHandle() {
}
private void dispatchCall(TBase call) throws TException {
-
+ LOG.info("Dispatching call " + call.getClass().getName());
if (call instanceof SessionDriverService.grantResource_args) {
SessionDriverService.grantResource_args args = ( SessionDriverService.grantResource_args)call;
if (!args.handle.equals(handle))
@@ -162,6 +162,8 @@ public boolean makeCalls(long now) {
}
}
+ close();
+
return true;
}
42 src/contrib/corona/src/java/org/apache/hadoop/mapred/CoronaJobInProgress.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -1257,20 +1258,34 @@ else if (state == TaskStatus.State.FAILED ||
private void processTaskResource(TaskStatus.State state,
TaskInProgress tip, TaskAttemptID taskid) {
- if (TaskStatus.TERMINATING_STATES.contains(state)) {
- Integer grant = taskLookupTable.getGrantIdForTask(taskid);
- taskLookupTable.removeTaskEntry(taskid);
- if (state == TaskStatus.State.SUCCEEDED) {
- if (shouldReuseTaskResource(tip)) {
- resourceTracker.reuseGrant(grant);
- } else {
- resourceTracker.taskDone(tip);
- }
+ if (!TaskStatus.TERMINATING_STATES.contains(state)) {
+ return;
+ }
+ Integer grant = taskLookupTable.getGrantIdForTask(taskid);
+ taskLookupTable.removeTaskEntry(taskid);
+ if (state == TaskStatus.State.SUCCEEDED || !tip.isRunnable()) {
+ assert (grant != null) : "Grant for task id " + taskid + " is null!";
+ if (shouldReuseTaskResource(tip)) {
+ resourceTracker.reuseGrant(grant);
} else {
- if (tip.isRunnable()) {
+ resourceTracker.taskDone(tip);
+ }
+ } else {
+ if (tip.isRunnable()) {
+ if (grant == null) {
+ // grant could be null if the task reached a terminating state twice,
+ // e.g. succeeded then failed due to a fetch failure. Or if a TT
+        // dies after a success
+ if (tip.isMapTask()) {
+ resourceTracker.addNewMapTask(tip);
+ } else {
+ resourceTracker.addNewReduceTask(tip);
+ }
+
+ } else {
resourceTracker.releaseAndRequestAnotherResource(grant);
}
- }
+ }
}
}
@@ -2398,4 +2413,9 @@ public boolean shouldSpeculateAllRemainingReduces() {
}
return false;
}
+
+ @Override
+ DataStatistics getRunningTaskStatistics(Phase phase) {
+ throw new RuntimeException("Not yet implemented.");
+ }
}
137 src/contrib/corona/src/java/org/apache/hadoop/mapred/CoronaJobTracker.java
@@ -60,10 +60,19 @@
extends JobTrackerTraits
implements JobSubmissionProtocol, SessionDriverService.Iface,
InterTrackerProtocol, ResourceTracker.ResourceProcessor {
-
+ static {
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("UNCAUGHT: Thread " + t.getName() + " got an uncaught exception", e);
+ System.exit(1);
+ }
+ });
+ }
public static final Log LOG = LogFactory.getLog(CoronaJobTracker.class);
static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+ public static final String TT_CONNECT_TIMEOUT_MSEC_KEY = "corona.tasktracker.connect.timeout.msec";
public static final String HEART_BEAT_INTERVAL_KEY = "corona.jobtracker.heartbeat.interval";
JobConf conf; // JT conf.
@@ -267,9 +276,9 @@ public String getAssignedTracker(TaskAttemptID attempt) {
public void run() {
while (running) {
try {
+ expireLaunchingTasks();
// Every 3 minutes check for any tasks that are overdue
Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
- expireLaunchingTasks();
} catch (InterruptedException ie) {
// ignore. if shutting down, while cond. will catch it
} catch (Exception e) {
@@ -293,26 +302,7 @@ void expireLaunchingTasks() {
long age = now - (pair.getValue()).longValue();
if (age > TASKTRACKER_EXPIRY_INTERVAL) {
LOG.info("Launching task " + taskId + " timed out.");
- TaskInProgress tip = taskLookupTable.getTIP(taskId);
- if (tip != null) {
- Integer grantId = taskLookupTable.getGrantIdForTask(taskId);
- ResourceGrant grant = resourceTracker.getGrant(grantId);
- if (grant != null) {
- String trackerName = grant.getNodeName();
- TaskTrackerStatus trackerStatus =
- getTaskTrackerStatus(trackerName);
-
- TaskStatus.Phase phase =
- tip.isMapTask()? TaskStatus.Phase.MAP:
- TaskStatus.Phase.STARTING;
- boolean isFailed = true;
- CoronaJobTracker.this.job.failedTask(
- tip, taskId, "Error launching task", phase,
- isFailed, trackerName, trackerStatus);
- } else {
- LOG.error("Task " + taskId + " is running but has no associated resource");
- }
- }
+ failTask(taskId, "Error launching task", false);
itr.remove();
}
}
@@ -347,6 +337,24 @@ public void removeTask(TaskAttemptID taskName) {
}
}
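+  /** Look up the attempt's resource grant to find its TaskTracker and report the attempt as failed to the job. */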
+ private void failTask(TaskAttemptID taskId, String reason, boolean isFailed) {
+ TaskInProgress tip = taskLookupTable.getTIP(taskId);
+ Integer grantId = taskLookupTable.getGrantIdForTask(taskId);
+ ResourceGrant grant = resourceTracker.getGrant(grantId);
+ assert grant != null : "Task " + taskId +
+ " is running but has no associated resource";
+ String trackerName = grant.getNodeName();
+ TaskTrackerStatus trackerStatus =
+ getTaskTrackerStatus(trackerName);
+
+ TaskStatus.Phase phase =
+ tip.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.STARTING;
+ CoronaJobTracker.this.job.failedTask(
+        tip, taskId, reason, phase,
+ isFailed, trackerName, trackerStatus);
+ }
+
static class ActionToSend {
String trackerHost;
int port;
@@ -613,8 +621,9 @@ public boolean processAvailableResource(ResourceGrant grant) {
org.apache.hadoop.corona.InetAddress addr =
Utilities.appInfoToAddress(grant.appInfo);
String trackerName = grant.getNodeName();
-
- Task task = getSetupAndCleanupTasks(trackerName, addr.host);
+ boolean isMapGrant =
+ grant.getType().equals(ResourceTracker.RESOURCE_TYPE_MAP);
+ Task task = getSetupAndCleanupTasks(trackerName, addr.host, isMapGrant);
if (task == null) {
TaskInProgress tip = resourceTracker.findTipForGrant(grant);
if (tip.isMapTask()) {
@@ -649,26 +658,51 @@ void assignTasks() throws InterruptedException {
}
void processGrantsToRevoke() {
- Map<Integer, TaskAttemptID> processed = new HashMap<Integer, TaskAttemptID>();
+ Map<ResourceGrant, TaskAttemptID> processed =
+ new HashMap<ResourceGrant, TaskAttemptID>();
+ Set<String> nodesOfGrants = new HashSet<String>();
synchronized(lockObject) {
- for (ResourceGrant grant: grantsToRevoke) {
+ for (ResourceGrant grant : grantsToRevoke) {
TaskAttemptID attemptId = taskLookupTable.taskForGrant(grant);
if (attemptId != null) {
+ if (removeFromTasksForLaunch(attemptId)) {
+ // Kill the task in the job since it never got launched
+ expireLaunchingTasks.failedLaunch(attemptId);
+ continue;
+ }
boolean shouldFail = false;
boolean killed = killTaskUnprotected(attemptId, shouldFail);
- processed.put(grant.getId(), attemptId);
+ processed.put(grant, attemptId);
+ nodesOfGrants.add(grant.getNodeName());
// Grant will get removed from the resource tracker
// when the kill takes effect and we get a response from TT.
- queueKillActions(grant.getNodeName());
- taskLookupTable.removeTaskEntry(attemptId);
}
}
+ for (String ttNode : nodesOfGrants) {
+ queueKillActions(ttNode);
+ }
}
- for (Map.Entry<Integer, TaskAttemptID> entry: processed.entrySet()) {
- LOG.info("Revoking resource " + entry.getKey() +
+ for (Map.Entry<ResourceGrant, TaskAttemptID> entry : processed.entrySet()) {
+ LOG.info("Revoking resource " + entry.getKey().getId() +
" task: " + entry.getValue());
+ grantsToRevoke.remove(entry.getKey());
}
}
+
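+  /** Remove any pending LaunchTaskAction for this attempt from actionsToSend; returns true if one was removed. */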
+ boolean removeFromTasksForLaunch(TaskAttemptID attempt) {
+ synchronized(actionsToSend) {
+ Iterator<ActionToSend> actionIter = actionsToSend.iterator();
+ while (actionIter.hasNext()) {
+ ActionToSend action = actionIter.next();
+ if (action.action instanceof LaunchTaskAction &&
+ ((LaunchTaskAction)action.action).getTask().getTaskID().equals(attempt)) {
+ actionIter.remove();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
void queueTaskForLaunch(Task task, String trackerName,
org.apache.hadoop.corona.InetAddress addr) {
@@ -727,9 +761,10 @@ public synchronized void resetClient(InetSocketAddress s) {
protected CoronaTaskTrackerProtocol createClient(InetSocketAddress s)
throws IOException {
LOG.info("Creating client to " + s.getHostName() + ":" + s.getPort());
+ long connectTimeout = conf.getLong(TT_CONNECT_TIMEOUT_MSEC_KEY, 10000L);
return (CoronaTaskTrackerProtocol) RPC.waitForProxy(
CoronaTaskTrackerProtocol.class,
- CoronaTaskTrackerProtocol.versionID, s, conf);
+ CoronaTaskTrackerProtocol.versionID, s, conf, connectTimeout);
}
public synchronized void clearClient(InetSocketAddress s) {
@@ -766,10 +801,18 @@ void launchTasks() throws InterruptedException {
actions.addAll(actionsToSend);
actionsToSend.clear();
}
+ Set<InetSocketAddress> badTrackers = new HashSet<InetSocketAddress>();
for (ActionToSend actionToSend: actions) {
// Get the tracker address.
InetSocketAddress trackerAddress =
new InetSocketAddress(actionToSend.trackerHost, actionToSend.port);
+ if (badTrackers.contains(trackerAddress)) {
+ LOG.info("Not sending " + actionToSend.action.getClass() + " to " +
+            actionToSend.trackerHost + " since previous communication " +
+            "in this run failed");
+ processSendingError(actionToSend);
+ continue;
+ }
// Fill in the job tracker information.
CoronaSessionInfo info = new CoronaSessionInfo(sessionId, jobTrackerAddress);
actionToSend.action.setExtensible(info);
@@ -782,15 +825,24 @@ void launchTasks() throws InterruptedException {
LOG.error("Could not send " + actionToSend.action.getClass() +
" action to " + actionToSend.trackerHost, e);
trackerClientCache.resetClient(trackerAddress);
- if (actionToSend.action instanceof LaunchTaskAction) {
- LaunchTaskAction launchTaskAction = (LaunchTaskAction) actionToSend.action;
- TaskAttemptID attempt = launchTaskAction.getTask().getTaskID();
- expireLaunchingTasks.failedLaunch(attempt);
- }
+ badTrackers.add(trackerAddress);
+ processSendingError(actionToSend);
}
}
}
+ void processSendingError(ActionToSend actionToSend) {
+ if (actionToSend.action instanceof LaunchTaskAction) {
+ LaunchTaskAction launchTaskAction = (LaunchTaskAction) actionToSend.action;
+ TaskAttemptID attempt = launchTaskAction.getTask().getTaskID();
+ expireLaunchingTasks.failedLaunch(attempt);
+ } else if (actionToSend.action instanceof KillTaskAction) {
+ KillTaskAction killTaskAction = (KillTaskAction) actionToSend.action;
+ TaskAttemptID attempt = killTaskAction.getTaskID();
+ failTask(attempt, "TaskTracker is dead", true);
+ }
+ }
+
/**
* A thread to update resource requests/releases.
*/
@@ -844,15 +896,16 @@ public void updateResources() throws IOException {
}
}
- Task getSetupAndCleanupTasks(String taskTrackerName, String hostName) {
+ Task getSetupAndCleanupTasks(String taskTrackerName, String hostName,
+ boolean isMapGrant) {
Task t = null;
- t = job.obtainJobCleanupTask(taskTrackerName, hostName, true);
+ t = job.obtainJobCleanupTask(taskTrackerName, hostName, isMapGrant);
if (t == null) {
- t = job.obtainTaskCleanupTask(taskTrackerName, true);
+ t = job.obtainTaskCleanupTask(taskTrackerName, isMapGrant);
}
if (t == null) {
- t = job.obtainJobSetupTask(taskTrackerName, hostName, true);
+ t = job.obtainJobSetupTask(taskTrackerName, hostName, isMapGrant);
}
return t;
}
@@ -1162,6 +1215,8 @@ public void revokeResource(String handle,
synchronized(lockObject) {
grantsToRevoke.addAll(revoked);
}
+ LOG.info("Giving up " + revoked.size() + " grants: " +
+ revoked.toString());
}
/////////////////////////////////////////////////////////////////////////////
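
For reference, a minimal sketch (not part of the commit) of overriding the new TaskTracker connect timeout that CoronaJobTracker.createClient() now reads. The key name and the 10000 ms default come from the diff above; the 30000 ms override and the wrapper class are illustrative assumptions.

import org.apache.hadoop.mapred.JobConf;

public class CoronaConnectTimeoutExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Key introduced in this commit; createClient() falls back to 10000 ms when it is unset.
    conf.setLong("corona.tasktracker.connect.timeout.msec", 30000L);
    long connectTimeout =
        conf.getLong("corona.tasktracker.connect.timeout.msec", 10000L);
    System.out.println("TaskTracker connect timeout (ms): " + connectTimeout);
  }
}
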
61 src/contrib/corona/src/java/org/apache/hadoop/mapred/MultiCoronaTaskTracker.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker;
+
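+/** Starts several CoronaTaskTrackers in a single JVM, each with its own mapred.local.dir subdirectory, and waits for all of them to exit. */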
+public class MultiCoronaTaskTracker {
+
+ public static void main(String[] args) throws IOException {
+ int numTaskTrackers = Integer.parseInt(args[0]);
+ List<TaskTrackerRunner> runners = new ArrayList<TaskTrackerRunner>();
+ for (int i = 0; i < numTaskTrackers; i++) {
+ Configuration conf = new Configuration();
+ JobConf jConf = new JobConf(conf);
+ jConf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
+ String[] baseLocalDirs = jConf.getLocalDirs();
+ List<String> localDirs = new LinkedList<String>();
+ localDirs.clear();
+ for (String localDir : baseLocalDirs) {
+ File baseLocalDir = new File(localDir);
+ File localDirFile = new File(baseLocalDir, "TT_" + i);
+ localDirFile.mkdirs();
+ localDirs.add(localDirFile.getAbsolutePath());
+ }
+ jConf.setStrings("mapred.local.dir",
+ localDirs.toArray(new String[localDirs.size()]));
+ CoronaTaskTracker tracker = new CoronaTaskTracker(jConf);
+ TaskTrackerRunner runner = new TaskTrackerRunner(tracker);
+ runner.setDaemon(true);
+ runners.add(runner);
+ runner.start();
+ }
+ for (TaskTrackerRunner runner : runners) {
+ try {
+ runner.join();
+ } catch (InterruptedException iex) {
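+        // Ignore the interruption and continue joining the remaining trackers.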
+ }
+ }
+ }
+
+ private static class TaskTrackerRunner extends Thread {
+
+ private TaskTracker ttToRun = null;
+
+ public TaskTrackerRunner(CoronaTaskTracker tt) {
+ super();
+ this.ttToRun = tt;
+ }
+
+ @Override
+ public void run() {
+ ttToRun.run();
+ }
+ }
+}
+
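
As a usage illustration (not part of the commit), the launcher takes a single argument: the number of in-process CoronaTaskTrackers to start. A hedged sketch of driving it programmatically, assuming the Corona classes are on the classpath; the wrapper class and the tracker count are assumptions.

import org.apache.hadoop.mapred.MultiCoronaTaskTracker;

public class MultiTrackerLauncherExample {
  public static void main(String[] args) throws Exception {
    // Starts three trackers in this JVM, each using its own mapred.local.dir
    // subdirectory (TT_0, TT_1, TT_2), then blocks until they exit.
    MultiCoronaTaskTracker.main(new String[] { "3" });
  }
}

Because the runner threads are daemons, the join() calls in MultiCoronaTaskTracker.main() are what keep the JVM alive while the trackers run.
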
9 src/contrib/corona/src/java/org/apache/hadoop/mapred/ResourceTracker.java
@@ -177,10 +177,9 @@ public void releaseAndRequestAnotherResource(Integer grantIdToRelease) {
synchronized(lockObject) {
TaskInProgress task = requestToTipMap.get(grantIdToRelease);
- if (task == null) {
- LOG.info ("releaseAndRequest for grant: " + grantIdToRelease + " has no matching task");
- return;
- }
+ assert (task != null) : ("releaseAndRequest for grant: " +
+ grantIdToRelease + " has no matching task");
+
tipId = task.getTIPId();
ResourceGrant grantToRelease = grantedResources.get(grantIdToRelease);
String excluded = grantToRelease.getAddress().getHost();
@@ -323,6 +322,8 @@ public void addNewGrants(List<ResourceGrant> grants) {
LOG.info("Request for grant " + grant.getId() + " no longer exists");
continue;
}
+ assert !grantedResources.containsKey(grant.getId()) :
+ "Grant " + grant.getId() + " has already been processed.";
updateTrackerAddressUnprotected(grant);
addGrantedResourceUnprotected(grant);
3  src/contrib/corona/src/test/org/apache/hadoop/mapred/TestResourceTracker.java
@@ -34,6 +34,9 @@ protected void setUp() {
public DataStatistics getRunningTaskStatistics(boolean isMap) { return null; }
@Override
+ DataStatistics getRunningTaskStatistics(TaskStatus.Phase phase) { return null; }
+
+ @Override
public float getSlowTaskThreshold() { return 0; }
@Override
43 src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
@@ -122,8 +122,11 @@
public static final String TASK_LIMIT_PROPERTY = "mapred.fairscheduler.total.task.limit";
public static final int DEFAULT_TOTAL_TASK_LIMIT = 800000;
+ public static final String SOFT_TASK_LIMIT_PERCENT = "mapred.fairscheduler.soft.task.limit.percent";
+ public static final float DEFAULT_SOFT_TASK_LIMIT_PERCENT = 0.8f;
private static final int SLOW_UPDATE_TASK_COUNTS_PERIOD = 30;
private int totalTaskLimit;
+ private double softTaskLimit;
private int totalTaskCount;
private int updateTaskCountsCounter = 0;
@@ -343,6 +346,9 @@ public void start() {
"mapred.fairscheduler.dump.status.period", dumpStatusPeriod);
totalTaskLimit = conf.getInt(
TASK_LIMIT_PROPERTY, DEFAULT_TOTAL_TASK_LIMIT);
+ jobInitializer.notifyTaskLimit(totalTaskLimit);
+ softTaskLimit = conf.getFloat(SOFT_TASK_LIMIT_PERCENT,
+ DEFAULT_SOFT_TASK_LIMIT_PERCENT);
if (defaultDelay == -1 &&
(localityDelayNodeLocal == -1 || localityDelayRackLocal == -1)) {
autoComputeLocalityDelay = true; // Compute from heartbeat interval
@@ -1177,12 +1183,16 @@ private void updateRunnability() {
Map<String, Integer> userJobs = new HashMap<String, Integer>();
Map<String, Integer> poolJobs = new HashMap<String, Integer>();
Map<String, Integer> poolTasks = new HashMap<String, Integer>();
+ Map<String, Integer> poolWaitingMaps = new HashMap<String, Integer>();
+ List<JobInProgress> waitingJobs = new LinkedList<JobInProgress>();
for (JobInProgress job: jobs) {
String user = job.getJobConf().getUser();
String pool = poolMgr.getPoolName(job);
int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
int poolTaskCount = poolTasks.containsKey(pool) ? poolTasks.get(pool) : 0;
+ int waitingMaps = poolWaitingMaps.containsKey(pool) ?
+ poolWaitingMaps.get(pool) : 0;
if (userCount < poolMgr.getUserMaxJobs(user) &&
poolCount < poolMgr.getPoolMaxJobs(pool) &&
poolTaskCount < poolMgr.getPoolMaxInitedTasks(pool)) {
@@ -1190,17 +1200,46 @@ private void updateRunnability() {
job.getStatus().getRunState() == JobStatus.PREP) {
userJobs.put(user, userCount + 1);
poolJobs.put(pool, poolCount + 1);
- poolTasks.put(pool, poolTaskCount + infos.get(job).totalInitedTasks);
+ poolTasks.put(pool, poolTaskCount + job.numMapTasks +
+ job.numReduceTasks);
JobInfo jobInfo = infos.get(job);
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ poolWaitingMaps.put(pool, waitingMaps + job.neededMaps());
jobInfo.runnable = true;
} else {
+ poolWaitingMaps.put(pool, waitingMaps + job.numMapTasks);
+ totalTaskCount += job.numMapTasks + job.numReduceTasks;
if (jobInfo.needsInitializing) {
jobInfo.needsInitializing = false;
jobInitializer.addJob(job);
}
}
}
+ } else {
+ waitingJobs.add(job);
+ }
+ }
+ for (JobInProgress job : waitingJobs) {
+ // Have another go at all the jobs that are waiting to be scheduled
+ String pool = poolMgr.getPoolName(job);
+ int waitingMaps = poolWaitingMaps.containsKey(pool) ?
+ poolWaitingMaps.get(pool) : 0;