merged conflicted sources

commit ed6ddd359ecb91f3f310f673a8ac93adf727999c (2 parents: f5b8c7d + 5a021db)
@baepiff authored
Showing with 16,474 additions and 1,621 deletions.
  1. +5 −2 .classpath
  2. +1 −0  .gitignore
  3. +4 −4 .settings/org.eclipse.jdt.core.prefs
  4. +7 −0 META-INF/MANIFEST.MF
  5. +87 −39 bin/generate_cluster_xml.py
  6. +93 −0 bin/repeat-junit-test.sh
  7. +75 −0 bin/repeat-junit.sh
  8. +6 −1 build.properties
  9. +51 −8 build.xml
  10. +4 −0 clients/python/voldemort/client.py
  11. +94 −5 clients/python/voldemort/protocol/voldemort_admin_pb2.py
  12. +39 −32 config/single_node_cluster/config/stores.xml
  13. +5 −5 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
  14. +2 −1  contrib/hadoop-store-builder/perf/voldemort/contrib/batchindexer/performance/BdbBuildPerformanceTest.java
  15. +2 −1  contrib/hadoop-store-builder/perf/voldemort/contrib/batchindexer/performance/MysqlBuildPerformanceTest.java
  16. +339 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java
  17. +358 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java
  18. +40 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java
  19. +59 −37 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
  20. +20 −88 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java
  21. +255 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java
  22. +150 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java
  23. +115 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducer.java
  24. +114 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducerPerBucket.java
  25. +71 −18 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java
  26. +18 −237 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducer.java
  27. +17 −248 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducerPerBucket.java
  28. +16 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderUtils.java
  29. +16 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreJobRunner.java
  30. +16 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HdfsDataFileChunk.java
  31. +36 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonMapper.java
  32. +38 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonReducer.java
  33. +92 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java
  34. +89 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/VoldemortStoreBuilderMapper.java
  35. +284 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractHadoopJob.java
  36. +132 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractVoldemortBatchCopyJob.java
  37. +75 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/Job.java
  38. +29 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/StoreBuilderTransformation.java
  39. +27 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/UndefinedPropertyException.java
  40. +421 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBatchIndexJob.java
  41. +886 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java
  42. +850 −0 ...b/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortMultiStoreBuildAndPushJob.java
  43. +132 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortRollbackJob.java
  44. +469 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortStoreBuilderJob.java
  45. +216 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortSwapJob.java
  46. +88 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortSwapperUtils.java
  47. +92 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonConfigurable.java
  48. +115 −0 ...rib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonDeserializerComparator.java
  49. +75 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonMapper.java
  50. +60 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonOutputCollector.java
  51. +97 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonReducer.java
  52. +126 −0 ...ib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonSequenceFileInputFormat.java
  53. +109 −0 ...b/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonSequenceFileOutputFormat.java
  54. +121 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/AvroUtils.java
  55. +203 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/EmailMessage.java
  56. +1,001 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/HadoopUtils.java
  57. +65 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/JsonSchema.java
  58. +56 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
  59. +153 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java
  60. +138 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/VoldemortUtils.java
  61. +10 −3 contrib/krati/src/java/voldemort/store/krati/KratiStorageConfiguration.java
  62. +1 −1  contrib/krati/src/java/voldemort/store/krati/KratiStorageEngine.java
  63. BIN  lib/azkaban-common-0.05.jar
  64. BIN  lib/joda-time-1.6.jar
  65. BIN  lib/mail-1.4.1.jar
  66. +109 −1 release_notes.txt
  67. +3 −2 src/java/log4j.properties
  68. +534 −182 src/java/voldemort/VoldemortAdminTool.java
  69. +29 −14 src/java/voldemort/VoldemortClientShell.java
  70. +150 −27 src/java/voldemort/client/AbstractStoreClientFactory.java
  71. +13 −12 src/java/voldemort/client/CachingStoreClientFactory.java
  72. +262 −1 src/java/voldemort/client/ClientConfig.java
  73. +244 −0 src/java/voldemort/client/ClientInfo.java
  74. +11 −9 src/java/voldemort/client/DefaultStoreClient.java
  75. +3 −4 src/java/voldemort/client/HttpStoreClientFactory.java
  76. +26 −0 src/java/voldemort/client/LazyStoreClient.java
  77. +0 −1  src/java/voldemort/client/MockStoreClientFactory.java
  78. +23 −13 src/java/voldemort/client/SocketStoreClientFactory.java
  79. +0 −3  src/java/voldemort/client/StoreClientFactory.java
  80. +220 −0 src/java/voldemort/client/SystemStore.java
  81. +65 −0 src/java/voldemort/client/SystemStoreRepository.java
  82. +66 −0 src/java/voldemort/client/TimeoutConfig.java
  83. +276 −0 src/java/voldemort/client/ZenStoreClient.java
  84. +370 −29 src/java/voldemort/client/protocol/admin/AdminClient.java
  85. +2 −2 src/java/voldemort/client/protocol/admin/SocketPool.java
  86. +809 −69 src/java/voldemort/client/protocol/pb/VAdminProto.java
  87. +1 −1  src/java/voldemort/client/protocol/vold/VoldemortNativeClientRequestFormat.java
  88. +3 −0  src/java/voldemort/client/rebalance/RebalanceController.java
  89. +204 −0 src/java/voldemort/client/scheduler/AsyncMetadataVersionManager.java
  90. +95 −0 src/java/voldemort/client/scheduler/ClientRegistryRefresher.java
  91. +6 −0 src/java/voldemort/cluster/Node.java
  92. +45 −16 src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java
  93. +1 −1  src/java/voldemort/cluster/failuredetector/AsyncRecoveryFailureDetector.java
  94. +2 −1  src/java/voldemort/cluster/failuredetector/BannagePeriodFailureDetector.java
  95. +4 −0 src/java/voldemort/cluster/failuredetector/BasicStoreVerifier.java
  96. +6 −0 src/java/voldemort/cluster/failuredetector/ClientStoreVerifier.java
  97. +36 −2 src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java
  98. +4 −0 src/java/voldemort/cluster/failuredetector/ServerStoreVerifier.java
  99. +5 −0 src/java/voldemort/cluster/failuredetector/StoreVerifier.java
  100. +1 −1  src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java
  101. +53 −0 src/java/voldemort/common/OpTimeMap.java
  102. +1 −1  src/java/voldemort/{serialization → common}/VoldemortOpCode.java
  103. +1 −1  src/java/voldemort/{server → common/service}/AbstractService.java
  104. +19 −4 src/java/voldemort/{server/scheduler → common/service}/SchedulerService.java
  105. +1 −1  src/java/voldemort/{server → common/service}/ServiceType.java
  106. +2 −1  src/java/voldemort/{server → common/service}/VoldemortService.java
  107. +42 −0 src/java/voldemort/routing/RouteToAllLocalPrefStrategy.java
  108. +2 −0  src/java/voldemort/routing/RoutingStrategyFactory.java
  109. +1 −0  src/java/voldemort/routing/RoutingStrategyType.java
  110. +19 −1 src/java/voldemort/serialization/DefaultSerializerFactory.java
  111. +1 −0  src/java/voldemort/serialization/VoldemortOperation.java
  112. +171 −0 src/java/voldemort/serialization/avro/versioned/AvroVersionedGenericSerializer.java
  113. +874 −0 src/java/voldemort/serialization/avro/versioned/SchemaEvolutionValidator.java
  114. +3 −0  src/java/voldemort/server/AbstractSocketService.java
  115. +114 −3 src/java/voldemort/server/VoldemortConfig.java
  116. +19 −5 src/java/voldemort/server/VoldemortServer.java
  117. +3 −3 src/java/voldemort/server/gossip/GossipService.java
  118. +2 −2 src/java/voldemort/server/http/HttpService.java
  119. +1 −1  src/java/voldemort/server/http/StoreServlet.java
  120. +1 −1  src/java/voldemort/server/http/gui/ReadOnlyStoreManagementServlet.java
  121. +1 −1  src/java/voldemort/server/http/gui/StatusServlet.java
  122. +3 −12 src/java/voldemort/server/jmx/JmxService.java
  123. +47 −3 src/java/voldemort/server/niosocket/AsyncRequestHandler.java
  124. +26 −2 src/java/voldemort/server/niosocket/NioSelectorManager.java
  125. +26 −3 src/java/voldemort/server/niosocket/NioSocketService.java
  126. +1 −0  src/java/voldemort/server/protocol/AbstractRequestHandler.java
  127. +132 −54 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
  128. +3 −3 src/java/voldemort/server/protocol/admin/AsyncOperationService.java
  129. +12 −1 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
  130. +123 −1 src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java
  131. +1 −1  src/java/voldemort/server/rebalance/Rebalancer.java
  132. +3 −3 src/java/voldemort/server/rebalance/RebalancerService.java
  133. +69 −34 src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java
  134. +58 −63 src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java
  135. +25 −8 src/java/voldemort/server/scheduler/DataCleanupJob.java
  136. +5 −5 src/java/voldemort/server/scheduler/slop/BlockingSlopPusherJob.java
  137. +4 −4 src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
  138. +1 −1  src/java/voldemort/server/socket/SocketService.java
  139. +13 −13 src/java/voldemort/server/storage/RepairJob.java
  140. +88 −0 src/java/voldemort/server/storage/ScanPermitWrapper.java
  141. +330 −21 src/java/voldemort/server/storage/StorageService.java
  142. +9 −2 src/java/voldemort/store/StorageConfiguration.java
  143. +43 −11 src/java/voldemort/store/StoreDefinition.java
  144. +48 −24 src/java/voldemort/store/StoreDefinitionBuilder.java
  145. +15 −3 src/java/voldemort/store/bdb/BdbRuntimeConfig.java
  146. +150 −23 src/java/voldemort/store/bdb/BdbStorageConfiguration.java
  147. +81 −2 src/java/voldemort/store/bdb/BdbStorageEngine.java
  148. +131 −35 src/java/voldemort/store/bdb/stats/BdbEnvironmentStats.java
  149. +70 −0 src/java/voldemort/store/bdb/stats/SpaceUtilizationStats.java
  150. +55 −0 src/java/voldemort/store/configuration/FileBackedCachingStorageConfiguration.java
  151. +336 −0 src/java/voldemort/store/configuration/FileBackedCachingStorageEngine.java
  152. +2 −3 src/java/voldemort/store/logging/LoggingStore.java
  153. +8 −2 src/java/voldemort/store/memory/CacheStorageConfiguration.java
  154. +8 −2 src/java/voldemort/store/memory/InMemoryStorageConfiguration.java
  155. +51 −5 src/java/voldemort/store/metadata/MetadataStore.java
  156. +7 −2 src/java/voldemort/store/mysql/MysqlStorageConfiguration.java
  157. +2 −2 src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java
  158. +11 −4 src/java/voldemort/store/readonly/ReadOnlyStorageConfiguration.java
  159. +2 −2 src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
  160. +14 −0 src/java/voldemort/store/routed/PipelineData.java
  161. +110 −0 src/java/voldemort/store/routed/PipelineRoutedStats.java
  162. +334 −42 src/java/voldemort/store/routed/PipelineRoutedStore.java
  163. +4 −3 src/java/voldemort/store/routed/RoutedStore.java
  164. +33 −7 src/java/voldemort/store/routed/RoutedStoreFactory.java
  165. +24 −9 src/java/voldemort/store/routed/ThreadPoolRoutedStore.java
  166. +21 −4 src/java/voldemort/store/routed/action/AbstractReadRepair.java
  167. +147 −0 src/java/voldemort/store/routed/action/ConfigureNodesByZone.java
  168. +87 −0 src/java/voldemort/store/routed/action/ConfigureNodesDefault.java
  169. +91 −0 src/java/voldemort/store/routed/action/ConfigureNodesLocalHost.java
  170. +88 −0 src/java/voldemort/store/routed/action/ConfigureNodesLocalHostByZone.java
  171. +4 −3 src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java
  172. +3 −1 src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java
  173. +21 −17 src/java/voldemort/store/routed/action/PerformParallelPutRequests.java
  174. +17 −0 src/java/voldemort/store/routed/action/PerformParallelRequests.java
  175. +49 −18 src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java
  176. +37 −5 src/java/voldemort/store/routed/action/PerformSerialPutRequests.java
  177. +35 −1 src/java/voldemort/store/routed/action/PerformSerialRequests.java
  178. +32 −11 src/java/voldemort/store/slop/HintedHandoff.java
Note: the entire diff could not be displayed because it was too big; only the first files' diffs are shown below.
7 .classpath
@@ -51,8 +51,11 @@
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
<classpathentry kind="lib" path="lib/compress-lzf-0.9.1.jar"/>
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
- <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar" />
- <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar" />
+ <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar"/>
+ <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+ <classpathentry kind="lib" path="lib/joda-time-1.6.jar"/>
+ <classpathentry kind="lib" path="lib/mail-1.4.1.jar"/>
+ <classpathentry kind="lib" path="lib/azkaban-common-0.05.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
1  .gitignore
@@ -13,3 +13,4 @@ server.state
.version
.temp
.idea
+data/
8 .settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Fri Dec 30 14:37:10 PST 2011
+#Thu Aug 30 10:43:57 PDT 2012
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -11,9 +11,9 @@ org.eclipse.jdt.core.codeComplete.staticFieldSuffixes=
org.eclipse.jdt.core.codeComplete.staticFinalFieldPrefixes=
org.eclipse.jdt.core.codeComplete.staticFinalFieldSuffixes=
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.5
+org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
@@ -77,7 +77,7 @@ org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disa
org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.5
+org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=82
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=82
7 META-INF/MANIFEST.MF
@@ -0,0 +1,7 @@
+Manifest-Version: 1.0
+Ant-Version: Apache Ant 1.7.1
+Created-By: 20.2-b06 (Sun Microsystems Inc.)
+Implementation-Title: Voldemort
+Implementation-Version: 1.0.0
+Implementation-Vendor: LinkedIn
+
126 bin/generate_cluster_xml.py
@@ -1,42 +1,90 @@
+#!/usr/bin/python
+
import sys
import random
+import argparse
+
+# Get a random seed
+rseed = int(random.randint(00000000001,99999999999))
+
+# Set up the argument parser
+parser = argparse.ArgumentParser(description='Build a voldemort cluster.xml.')
+# Add supported arguments
+parser.add_argument('-N', '--name', type=str, default='voldemort', dest='name',
+ help='the name you want to give the cluster')
+parser.add_argument('-n', '--nodes', type=int, default=2, dest='nodes',
+ help='the number of nodes in the cluster')
+parser.add_argument('-p', '--partitions', type=int, default=300,
+ dest='partitions', help='number of partitions per node')
+parser.add_argument('-s', '--socket-port', type=int, default=6666,
+ dest='sock_port', help='socket port number')
+parser.add_argument('-a', '--admin-port', type=int, default=6667,
+ dest='admin_port', help='admin port number')
+parser.add_argument('-H', '--http-port', type=int, default=6665,
+ dest='http_port', help='http port number')
+genType = parser.add_mutually_exclusive_group()
+genType.add_argument('-S', '--seed', type=int, default=rseed, dest='seed',
+ help='seed for randomizing partition distribution')
+genType.add_argument('-l', '--loops', type=int, default=1000, dest='loops',
+ help='loop n times, using a different random seed every \
+ time (Note: not currently supported)')
+parser.add_argument('-z', '--zones', type=int, dest='zones',
+ help='if using zones, the number of zones you will have\
+ (Note: you must add your own <zone> fields \
+ manually)')
+
+# Parse arguments
+args = parser.parse_args()
+
+# Check args
+if args.zones:
+ zones = args.zones
+ if (args.nodes % zones) != 0:
+ print "Number of nodes must be evenly divisible by number of zones"
+ sys.exit(1)
+
+# Store arguments
+nodes = args.nodes
+partitions = args.partitions
+name = args.name
+http_port = args.http_port
+sock_port = args.sock_port
+admin_port = args.admin_port
+seed = args.seed
+
+# Generate the full list of partition IDs
+part_ids = range(nodes * partitions)
+# Generate full list of zone IDs
+if args.zones:
+ zone_ids = range(zones)
+ zone_id = 0
+
+# Shuffle up the partitions
+random.seed(seed)
+random.shuffle(part_ids)
+
+# Printing cluster.xml
+print "<!-- Partition distribution generated using seed [%d] -->" % seed
+print "<cluster>"
+print " <name>%s</name>" % name
+
+for i in xrange(nodes):
+ node_partitions = ", ".join(str(p) for p in sorted(part_ids[i*partitions:(i+1)*partitions]))
+
+ print " <server>"
+ print " <id>%d</id>" % i
+ print " <host>host%d</host>" % i
+ print " <http-port>%d</http-port>" % http_port
+ print " <socket-port>%d</socket-port>" % sock_port
+ print " <admin-port>%d</admin-port>" % admin_port
+ print " <partitions>%s</partitions>" % node_partitions
+ # If zones are being used, assign a zone-id
+ if args.zones:
+ print " <zone-id>%d</zone-id>" % zone_id
+ if zone_id == (zones - 1):
+ zone_id = 0
+ else:
+ zone_id += 1
+ print " </server>"
-if len(sys.argv) != 3:
- print >> sys.stderr, "USAGE: python generate_partitions.py <nodes_file> <partitions_per_node>"
- sys.exit()
-
-FORMAT_WIDTH = 10
-
-nodes = 0
-for line in open(sys.argv[1],'r'):
- nodes+=1
-
-partitions = int(sys.argv[2])
-
-ids = range(nodes * partitions)
-
-# use known seed so this is repeatable
-random.seed(92873498274)
-random.shuffle(ids)
-
-print '<cluster>'
-print '<name>prodcluster</name>'
-id = 0
-for host in open(sys.argv[1],'r'):
- print '<server>'
- print " <id>%d</id>" % id
- print " <host>%s</host>" % host.strip()
- print ' <http-port>8081</http-port>'
- print ' <socket-port>6666</socket-port>'
- print ' <partitions>',
- node_ids = sorted(ids[id*partitions:(id+1)*partitions])
- for j in xrange(len(node_ids)):
- print str(node_ids[j]) + ',',
- if j % FORMAT_WIDTH == FORMAT_WIDTH - 1:
- print ' ',
- print ' </partitions>'
- print '</server>'
- id += 1
-print '</cluster>'
-
-
+print "</cluster>"
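For reference, a hypothetical invocation of the rewritten generator, using the flags it defines above (the cluster name, node/zone counts, and output path are illustrative; the seed is the one hardcoded in the old version):

    # Build a 6-node, 2-zone cluster with 300 partitions per node,
    # pinned to a fixed seed so the partition layout is reproducible
    bin/generate_cluster_xml.py --name mycluster --nodes 6 --zones 2 \
        --partitions 300 --seed 92873498274 > cluster.xml

The script writes the XML to stdout, so redirecting to a file captures it; note that --nodes must be evenly divisible by --zones, and any <zone> elements still have to be added by hand.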
93 bin/repeat-junit-test.sh
@@ -0,0 +1,93 @@
+#!/bin/bash -e
+
+# Copyright 2012 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+usage() {
+ echo
+ echo "Usage:"
+ echo "bin/repeat-junit-test.sh test_name num_times"
+ echo
+ cat <<EOF
+Invoke bin/repeat-junit-test.sh from the root of a Voldemort
+checkout. bin/repeat-junit-test.sh invokes 'ant junit-test' num_times
+for test test_name.
+
+The argument num_times must be an integer. The argument test_name must
+be a class name suitable for 'ant junit-test'. I.e., a fully qualified
+java class name. Remember, the class name does not include the .java
+extension. An example test_name is voldemort.utils.ServerTestUtilsTest.
+
+The pretty html junit output that ends up in dist/junit-single-reports
+on a single invocation of 'ant junit-test' is collected in a temp
+directory. This circumvents the normal behavior of ant in which
+dist/junit-single-reports is overwritten with each invocation of 'ant
+junit-test'.
+
+bin/repeat-junit-test.sh is useful to run after adding a new test
+case, or when trying to reproduce intermittent failures of a specific
+test.
+EOF
+}
+
+if [ $# != 2 ]; then
+ echo "ERROR: Incorrect number of arguments: $# provided, 2 needed." >&2
+ usage
+ exit 1
+fi
+
+TESTNAME=$1
+# Hackish test that makes sure some java file exists for given
+# testname. No guarantee that junit-test can run the specified test,
+# but at least protects against typos.
+FILENAME=`echo $TESTNAME | sed 's/.*\.//g'`.java
+FINDFILE=`find . -name "$FILENAME" | wc -l`
+if [[ $FINDFILE == 0 ]]
+then
+ echo "ERROR: Did not find an appropriate file (with name $FILENAME), given test name $TESTNAME." >&2
+ usage
+ exit 1
+fi
+
+
+NUMTIMES=$2
+if [[ ! $NUMTIMES == +([0-9]) ]]
+then
+ echo "ERROR: argument num_times is not an integer: $NUMTIMES." >&2
+ usage
+ exit 1
+fi
+
+TMPDIR=`mktemp -d -p '/tmp/'`
+
+for ((i=1;i<=$NUMTIMES;i++)); do
+ echo
+ echo "STARTING ITERATION $i"
+ echo
+
+ # Run junit-test and capture stdout to .out and stderr to .err
+ junitiout="$TMPDIR/TEST-$TESTNAME-$i.out"
+ junitierr="$TMPDIR/TEST-$TESTNAME-$i.err"
+ ant junit-test -Dtest.name=$TESTNAME > >(tee $junitiout) 2> >(tee $junitierr >&2)
+
+ # Collect results
+ junitidir="$TMPDIR/junit-single-report-$TESTNAME-$i"
+ echo
+ echo "COLLECTING RESULTS OF ITERATION $i IN $junitidir"
+ cp -r dist/junit-single-reports $junitidir
+ mv $junitiout $junitidir
+ mv $junitierr $junitidir
+done
+
+
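A sketch of the intended use, reusing the example test class from the usage text above:

    # Run one test class 10 times; per-iteration reports land in a mktemp dir
    bin/repeat-junit-test.sh voldemort.utils.ServerTestUtilsTest 10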
75 bin/repeat-junit.sh
@@ -0,0 +1,75 @@
+#!/bin/bash -e
+
+# Copyright 2012 LinkedIn, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+usage() {
+ echo
+ echo "Usage:"
+ echo "bin/repeat-junit.sh num_times"
+ echo
+ cat <<EOF
+Invoke bin/repeat-junit.sh from the root of a Voldemort
+checkout. bin/repeat-junit.sh invokes 'ant junit' num_times.
+
+The argument num_times must be an integer.
+
+The pretty html junit output that ends up in dist/junit-reports on a
+single invocation of 'ant junit' is collected in a temp
+directory. This circumvents the normal behavior of ant in which
+dist/junit-reports is overwritten with each invocation of 'ant
+junit'.
+
+bin/repeat-junit.sh is useful to run after making some substantial
+changes, or when trying to track down intermittent failures (that
+occur more on your local box than on a Hudson test machine...).
+EOF
+}
+
+if [ $# != 1 ]; then
+ echo "ERROR: Incorrect number of arguments: $# provided, 1 needed." >&2
+ usage
+ exit 1
+fi
+
+NUMTIMES=$1
+if [[ ! $NUMTIMES == +([0-9]) ]]
+then
+ echo "ERROR: argument num_times is not an integer: $NUMTIMES." >&2
+ usage
+ exit 1
+fi
+
+TMPDIR=`mktemp -d -p '/tmp/'`
+
+for ((i=1;i<=$NUMTIMES;i++)); do
+ echo
+ echo "STARTING ITERATION $i"
+ echo
+
+ # Run junit and capture stdout to .out and stderr to .err
+ junitiout="$TMPDIR/junit-$i.out"
+ junitierr="$TMPDIR/junit-$i.err"
+ ant junit > >(tee $junitiout) 2> >(tee $junitierr >&2)
+
+ # Collect results
+ junitidir="$TMPDIR/junit-reports-$i"
+ echo
+ echo "COLLECTING RESULTS OF ITERATION $i IN $junitidir"
+ cp -r dist/junit-reports $junitidir
+ mv $junitiout $junitidir
+ mv $junitierr $junitidir
+done
+
+
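The full-suite variant takes only the iteration count, e.g.:

    # Run 'ant junit' 5 times, archiving dist/junit-reports after each run
    bin/repeat-junit.sh 5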
7 build.properties
@@ -9,10 +9,15 @@ classes.dir=dist/classes
resources.dir=dist/resources
commontestsrc.dir=test/common
unittestsrc.dir=test/unit
+longtestsrc.dir=test/long
inttestsrc.dir=test/integration
testclasses.dir=dist/testclasses
testreport.dir=dist/junit-reports
testhtml.dir=dist/junit-reports/html
+singletestreport.dir=dist/junit-single-reports
+singletesthtml.dir=dist/junit-single-reports/html
+longtestreport.dir=dist/junit-long-reports
+longtesthtml.dir=dist/junit-long-reports/html
## Contrib
contrib.root.dir=contrib
@@ -34,4 +39,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort
## Release
-curr.release=0.90.1
+curr.release=1.0.0
59 build.xml
@@ -5,7 +5,7 @@
<property name="name" value="voldemort" />
<property name="display.name" value="Voldemort" />
- <property name="author" value="Jay Kreps, Roshan Sumbaly, Alex Feinberg, Bhupesh Bansal, Lei Gao" />
+ <property name="author" value="Jay Kreps, Roshan Sumbaly, Alex Feinberg, Bhupesh Bansal, Lei Gao, Chinmay Soman, Vinoth Chandar, Zhongjie Wu" />
<property environment="env" />
<path id="main-classpath">
@@ -27,6 +27,7 @@
</condition>
<path id="contrib-classpath">
+ <pathelement path="${resources.dir}" />
<fileset dir="${dist.dir}">
<include name="${name}-${curr.release}.jar" />
</fileset>
@@ -37,6 +38,7 @@
</path>
<path id="test-classpath">
+ <pathelement path="${resources.dir}" />
<pathelement path="${env.VOLD_TEST_JARS}" />
<path refid="main-classpath" />
<pathelement path="${testclasses.dir}" />
@@ -76,6 +78,12 @@
<exclude name="**/log4j.properties" />
</fileset>
</copy>
+ <replace-dir dir="META-INF" />
+ <manifest file="META-INF/MANIFEST.MF">
+ <attribute name="Implementation-Title" value="Voldemort" />
+ <attribute name="Implementation-Version" value="${curr.release}" />
+ <attribute name="Implementation-Vendor" value="LinkedIn" />
+ </manifest>
<!-- place to put log4j.properties -->
<replace-dir dir="${resources.dir}"/>
<copy file="${java.dir}/log4j.properties" todir="${resources.dir}"/>
@@ -103,6 +111,7 @@
<src path="${unittestsrc.dir}" />
<src path="${inttestsrc.dir}" />
<src path="${commontestsrc.dir}" />
+ <src path="${longtestsrc.dir}" />
<classpath refid="main-classpath" />
</javac>
</target>
@@ -140,6 +149,9 @@
<fileset dir="${java.dir}">
<include name="**/*.xsd" />
</fileset>
+ <fileset dir=".">
+ <include name="META-INF/MANIFEST.MF" />
+ </fileset>
</jar>
</target>
@@ -210,7 +222,7 @@
<target name="contrib-junit" depends="contrib-jar" description="Run contrib junit tests except EC2 and Krati tests.">
<replace-dir dir="${contribtestreport.dir}" />
<replace-dir dir="${contribtesthtml.dir}" />
- <junit printsummary="yes" maxmemory="1024m" showoutput="true" failureProperty="test.failure">
+ <junit printsummary="yes" maxmemory="2048m" showoutput="true" failureProperty="test.failure">
<classpath refid="contrib-test-classpath" />
<formatter type="xml" />
<batchtest fork="yes" todir="${contribtestreport.dir}">
@@ -237,7 +249,7 @@
</copy>
<replace-dir dir="${contribtestreport.dir}" />
<replace-dir dir="${contribtesthtml.dir}" />
- <junit printsummary="yes" maxmemory="1024m" showoutput="true" failureProperty="test.failure">
+ <junit printsummary="yes" maxmemory="2048m" showoutput="true" failureProperty="test.failure">
<syspropertyset>
<propertyref prefix="ec2" />
</syspropertyset>
@@ -369,7 +381,7 @@
<target name="junit" depends="build, buildtest" description="Run junit tests.">
<replace-dir dir="${testreport.dir}" />
<replace-dir dir="${testhtml.dir}" />
- <junit printsummary="yes" showoutput="true" maxmemory="1024m">
+ <junit printsummary="yes" showoutput="true" maxmemory="2048m" timeout="1200000">
<classpath refid="test-classpath" />
<formatter type="xml" />
<batchtest fork="yes" todir="${testreport.dir}">
@@ -387,14 +399,45 @@
</junitreport>
</target>
- <target name="junit-test" depends="build, buildtest, contrib-jar" description="Run single junit test with -Dtest.name=">
- <junit printsummary="yes" showoutput="true" maxmemory="1024m">
+ <target name="junit-long" depends="build, buildtest, junit" description="Run long junit tests that use larger data sets than normal junit tests.">
+ <replace-dir dir="${longtestreport.dir}" />
+ <replace-dir dir="${longtesthtml.dir}" />
+ <junit printsummary="yes" showoutput="true" maxmemory="2048m" fork="yes" timeout="5400000">
+ <classpath refid="test-classpath" />
+ <formatter type="xml" />
+ <batchtest todir="${longtestreport.dir}">
+ <fileset dir="${longtestsrc.dir}">
+ <include name="**/*Test.java" />
+ </fileset>
+ </batchtest>
+ </junit>
+ <junitreport todir="${longtesthtml.dir}">
+ <fileset dir="${longtestreport.dir}">
+ <include name="TEST-*.xml" />
+ </fileset>
+ <report todir="${longtesthtml.dir}" format="frames" />
+ </junitreport>
+ </target>
+
+ <target name="junit-test" depends="build, buildtest, contrib-jar" description="Run single junit test for class ClassName with -Dtest.name=[ClassName] (Note: Use the class name, not the file name with the .java extension)">
+ <replace-dir dir="${singletestreport.dir}" />
+ <replace-dir dir="${singletesthtml.dir}" />
+ <junit printsummary="on" showoutput="true" maxmemory="2048m">
<classpath refid="contrib-test-classpath" />
- <test name="${test.name}" />
+ <classpath path="${log4j.properties.dir}" />
+ <formatter type="plain" />
+ <formatter type="xml" />
+ <test name="${test.name}" todir="${singletestreport.dir}"/>
</junit>
+ <junitreport todir="${singletesthtml.dir}">
+ <fileset dir="${singletestreport.dir}">
+ <include name="TEST-*.xml" />
+ </fileset>
+ <report todir="${singletesthtml.dir}" format="frames" />
+ </junitreport>
</target>
- <target name="junit-all" depends="junit, contrib-junit" description="Run All junit tests including contrib.">
+ <target name="junit-all" depends="junit-long, contrib-junit" description="Run All junit tests including contrib.">
</target>
<macrodef name="make-javadocs">
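Taken together, the reworked targets can be driven as sketched below (the test class name is illustrative):

    # Single test, now with xml/plain reports under dist/junit-single-reports
    ant junit-test -Dtest.name=voldemort.utils.ServerTestUtilsTest
    # New long-running suite; depends on (and therefore first runs) 'junit'
    ant junit-long
    # 'junit-all' now chains junit-long and contrib-junit
    ant junit-all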
4 clients/python/voldemort/client.py
@@ -239,6 +239,9 @@ def _send_request(self, connection, req_bytes):
## read a response from the connection
def _receive_response(self, connection):
size_bytes = connection.recv(4)
+ if not size_bytes:
+ raise VoldemortException('Connection closed')
+
size = struct.unpack('>i', size_bytes)[0]
bytes_read = 0
@@ -252,6 +255,7 @@ def _receive_response(self, connection):
return ''.join(data)
+
## Bootstrap cluster metadata from a list of urls of nodes in the cluster.
## The urls are tuples in the form (host, port).
## A dictionary of node_id => node is returned.
99 clients/python/voldemort/protocol/voldemort_admin_pb2.py
@@ -10,7 +10,7 @@
DESCRIPTOR = descriptor.FileDescriptor(
name='voldemort-admin.proto',
package='voldemort',
- serialized_pb='\n\x15voldemort-admin.proto\x12\tvoldemort\x1a\x16voldemort-client.proto\"!\n\x12GetMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\"]\n\x13GetMetadataResponse\x12%\n\x07version\x18\x01 \x01(\x0b\x32\x14.voldemort.Versioned\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x15UpdateMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"9\n\x16UpdateMetadataResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"7\n\tFileEntry\x12\x11\n\tfile_name\x18\x01 \x02(\t\x12\x17\n\x0f\x66ile_size_bytes\x18\x02 \x02(\x03\"F\n\x0ePartitionEntry\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"\x8e\x01\n\x1dUpdatePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x32\n\x0fpartition_entry\x18\x02 \x02(\x0b\x32\x19.voldemort.PartitionEntry\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"A\n\x1eUpdatePartitionEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"-\n\x0fVoldemortFilter\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"\xaf\x01\n\x18UpdateSlopEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x0b\n\x03key\x18\x02 \x02(\x0c\x12\'\n\x07version\x18\x03 \x02(\x0b\x32\x16.voldemort.VectorClock\x12,\n\x0crequest_type\x18\x04 \x02(\x0e\x32\x16.voldemort.RequestType\x12\r\n\x05value\x18\x05 \x01(\x0c\x12\x11\n\ttransform\x18\x06 \x01(\x0c\"<\n\x19UpdateSlopEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"d\n\x1a\x46\x65tchPartitionFilesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\"\xd7\x01\n\x1c\x46\x65tchPartitionEntriesRequest\x12\x37\n\x14replica_to_partition\x18\x01 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x14\n\x0c\x66\x65tch_values\x18\x04 \x01(\x08\x12\x14\n\x0cskip_records\x18\x05 \x01(\x03\x12\x17\n\x0finitial_cluster\x18\x06 \x01(\t\"\x81\x01\n\x1d\x46\x65tchPartitionEntriesResponse\x12\x32\n\x0fpartition_entry\x18\x01 \x01(\x0b\x32\x19.voldemort.PartitionEntry\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x1f\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x10.voldemort.Error\"\xac\x01\n\x1d\x44\x65letePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x17\n\x0finitial_cluster\x18\x04 \x01(\t\"P\n\x1e\x44\x65letePartitionEntriesResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x03\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\xcf\x01\n\x1dInitiateFetchAndUpdateRequest\x12\x0f\n\x07node_id\x18\x01 \x02(\x05\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x37\n\x14replica_to_partition\x18\x04 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12\x17\n\x0finitial_cluster\x18\x05 \x01(\t\x12\x10\n\x08optimize\x18\x06 \x01(\x08\"1\n\x1b\x41syncOperationStatusRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"/\n\x19\x41syncOperationStopRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"=\n\x1a\x41syncOperationStopResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"2\n\x19\x41syncOperationListRequest\x12\x15\n\rshow_complete\x18\x02 
\x02(\x08\"R\n\x1a\x41syncOperationListResponse\x12\x13\n\x0brequest_ids\x18\x01 \x03(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\":\n\x0ePartitionTuple\x12\x14\n\x0creplica_type\x18\x01 \x02(\x05\x12\x12\n\npartitions\x18\x02 \x03(\x05\"e\n\x16PerStorePartitionTuple\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\"\xf8\x01\n\x19RebalancePartitionInfoMap\x12\x12\n\nstealer_id\x18\x01 \x02(\x05\x12\x10\n\x08\x64onor_id\x18\x02 \x02(\x05\x12\x0f\n\x07\x61ttempt\x18\x03 \x02(\x05\x12\x43\n\x18replica_to_add_partition\x18\x04 \x03(\x0b\x32!.voldemort.PerStorePartitionTuple\x12\x46\n\x1breplica_to_delete_partition\x18\x05 \x03(\x0b\x32!.voldemort.PerStorePartitionTuple\x12\x17\n\x0finitial_cluster\x18\x06 \x02(\t\"f\n\x1cInitiateRebalanceNodeRequest\x12\x46\n\x18rebalance_partition_info\x18\x01 \x02(\x0b\x32$.voldemort.RebalancePartitionInfoMap\"m\n#InitiateRebalanceNodeOnDonorRequest\x12\x46\n\x18rebalance_partition_info\x18\x01 \x03(\x0b\x32$.voldemort.RebalancePartitionInfoMap\"\x8a\x01\n\x1c\x41syncOperationStatusResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\t\x12\x10\n\x08\x63omplete\x18\x04 \x01(\x08\x12\x1f\n\x05\x65rror\x18\x05 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x16TruncateEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\":\n\x17TruncateEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"*\n\x0f\x41\x64\x64StoreRequest\x12\x17\n\x0fstoreDefinition\x18\x01 \x02(\t\"3\n\x10\x41\x64\x64StoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x12\x44\x65leteStoreRequest\x12\x11\n\tstoreName\x18\x01 \x02(\t\"6\n\x13\x44\x65leteStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"P\n\x11\x46\x65tchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\x12\x14\n\x0cpush_version\x18\x03 \x01(\x03\"9\n\x10SwapStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"P\n\x11SwapStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\x12\x1a\n\x12previous_store_dir\x18\x02 \x01(\t\"@\n\x14RollbackStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x14\n\x0cpush_version\x18\x02 \x02(\x03\"8\n\x15RollbackStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"&\n\x10RepairJobRequest\x12\x12\n\nstore_name\x18\x01 \x01(\t\"4\n\x11RepairJobResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"=\n\x14ROStoreVersionDirMap\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"/\n\x19GetROMaxVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROMaxVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"3\n\x1dGetROCurrentVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"}\n\x1eGetROCurrentVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"/\n\x19GetROStorageFormatRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROStorageFormatResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 
\x01(\x0b\x32\x10.voldemort.Error\"@\n\x17\x46\x61iledFetchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\";\n\x18\x46\x61iledFetchStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\xe6\x01\n\x1bRebalanceStateChangeRequest\x12K\n\x1drebalance_partition_info_list\x18\x01 \x03(\x0b\x32$.voldemort.RebalancePartitionInfoMap\x12\x16\n\x0e\x63luster_string\x18\x02 \x02(\t\x12\x0f\n\x07swap_ro\x18\x03 \x02(\x08\x12\x1f\n\x17\x63hange_cluster_metadata\x18\x04 \x02(\x08\x12\x1e\n\x16\x63hange_rebalance_state\x18\x05 \x02(\x08\x12\x10\n\x08rollback\x18\x06 \x02(\x08\"?\n\x1cRebalanceStateChangeResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"G\n DeleteStoreRebalanceStateRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x0f\n\x07node_id\x18\x02 \x02(\x05\"D\n!DeleteStoreRebalanceStateResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"h\n\x13NativeBackupRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x12\n\nbackup_dir\x18\x02 \x02(\t\x12\x14\n\x0cverify_files\x18\x03 \x02(\x08\x12\x13\n\x0bincremental\x18\x04 \x02(\x08\"\xb7\x0e\n\x15VoldemortAdminRequest\x12)\n\x04type\x18\x01 \x02(\x0e\x32\x1b.voldemort.AdminRequestType\x12\x33\n\x0cget_metadata\x18\x02 \x01(\x0b\x32\x1d.voldemort.GetMetadataRequest\x12\x39\n\x0fupdate_metadata\x18\x03 \x01(\x0b\x32 .voldemort.UpdateMetadataRequest\x12J\n\x18update_partition_entries\x18\x04 \x01(\x0b\x32(.voldemort.UpdatePartitionEntriesRequest\x12H\n\x17\x66\x65tch_partition_entries\x18\x05 \x01(\x0b\x32\'.voldemort.FetchPartitionEntriesRequest\x12J\n\x18\x64\x65lete_partition_entries\x18\x06 \x01(\x0b\x32(.voldemort.DeletePartitionEntriesRequest\x12K\n\x19initiate_fetch_and_update\x18\x07 \x01(\x0b\x32(.voldemort.InitiateFetchAndUpdateRequest\x12\x46\n\x16\x61sync_operation_status\x18\x08 \x01(\x0b\x32&.voldemort.AsyncOperationStatusRequest\x12H\n\x17initiate_rebalance_node\x18\t \x01(\x0b\x32\'.voldemort.InitiateRebalanceNodeRequest\x12\x42\n\x14\x61sync_operation_stop\x18\n \x01(\x0b\x32$.voldemort.AsyncOperationStopRequest\x12\x42\n\x14\x61sync_operation_list\x18\x0b \x01(\x0b\x32$.voldemort.AsyncOperationListRequest\x12;\n\x10truncate_entries\x18\x0c \x01(\x0b\x32!.voldemort.TruncateEntriesRequest\x12-\n\tadd_store\x18\r \x01(\x0b\x32\x1a.voldemort.AddStoreRequest\x12\x33\n\x0c\x64\x65lete_store\x18\x0e \x01(\x0b\x32\x1d.voldemort.DeleteStoreRequest\x12\x31\n\x0b\x66\x65tch_store\x18\x0f \x01(\x0b\x32\x1c.voldemort.FetchStoreRequest\x12/\n\nswap_store\x18\x10 \x01(\x0b\x32\x1b.voldemort.SwapStoreRequest\x12\x37\n\x0erollback_store\x18\x11 \x01(\x0b\x32\x1f.voldemort.RollbackStoreRequest\x12\x44\n\x16get_ro_max_version_dir\x18\x12 \x01(\x0b\x32$.voldemort.GetROMaxVersionDirRequest\x12L\n\x1aget_ro_current_version_dir\x18\x13 \x01(\x0b\x32(.voldemort.GetROCurrentVersionDirRequest\x12\x44\n\x15\x66\x65tch_partition_files\x18\x14 \x01(\x0b\x32%.voldemort.FetchPartitionFilesRequest\x12@\n\x13update_slop_entries\x18\x16 \x01(\x0b\x32#.voldemort.UpdateSlopEntriesRequest\x12>\n\x12\x66\x61iled_fetch_store\x18\x18 \x01(\x0b\x32\".voldemort.FailedFetchStoreRequest\x12\x43\n\x15get_ro_storage_format\x18\x19 \x01(\x0b\x32$.voldemort.GetROStorageFormatRequest\x12\x46\n\x16rebalance_state_change\x18\x1a \x01(\x0b\x32&.voldemort.RebalanceStateChangeRequest\x12/\n\nrepair_job\x18\x1b \x01(\x0b\x32\x1b.voldemort.RepairJobRequest\x12X\n initiate_rebalance_node_on_donor\x18\x1c 
\x01(\x0b\x32..voldemort.InitiateRebalanceNodeOnDonorRequest\x12Q\n\x1c\x64\x65lete_store_rebalance_state\x18\x1d \x01(\x0b\x32+.voldemort.DeleteStoreRebalanceStateRequest\x12\x35\n\rnative_backup\x18\x1e \x01(\x0b\x32\x1e.voldemort.NativeBackupRequest*\xb4\x05\n\x10\x41\x64minRequestType\x12\x10\n\x0cGET_METADATA\x10\x00\x12\x13\n\x0fUPDATE_METADATA\x10\x01\x12\x1c\n\x18UPDATE_PARTITION_ENTRIES\x10\x02\x12\x1b\n\x17\x46\x45TCH_PARTITION_ENTRIES\x10\x03\x12\x1c\n\x18\x44\x45LETE_PARTITION_ENTRIES\x10\x04\x12\x1d\n\x19INITIATE_FETCH_AND_UPDATE\x10\x05\x12\x1a\n\x16\x41SYNC_OPERATION_STATUS\x10\x06\x12\x1b\n\x17INITIATE_REBALANCE_NODE\x10\x07\x12\x18\n\x14\x41SYNC_OPERATION_STOP\x10\x08\x12\x18\n\x14\x41SYNC_OPERATION_LIST\x10\t\x12\x14\n\x10TRUNCATE_ENTRIES\x10\n\x12\r\n\tADD_STORE\x10\x0b\x12\x10\n\x0c\x44\x45LETE_STORE\x10\x0c\x12\x0f\n\x0b\x46\x45TCH_STORE\x10\r\x12\x0e\n\nSWAP_STORE\x10\x0e\x12\x12\n\x0eROLLBACK_STORE\x10\x0f\x12\x1a\n\x16GET_RO_MAX_VERSION_DIR\x10\x10\x12\x1e\n\x1aGET_RO_CURRENT_VERSION_DIR\x10\x11\x12\x19\n\x15\x46\x45TCH_PARTITION_FILES\x10\x12\x12\x17\n\x13UPDATE_SLOP_ENTRIES\x10\x14\x12\x16\n\x12\x46\x41ILED_FETCH_STORE\x10\x16\x12\x19\n\x15GET_RO_STORAGE_FORMAT\x10\x17\x12\x1a\n\x16REBALANCE_STATE_CHANGE\x10\x18\x12\x0e\n\nREPAIR_JOB\x10\x19\x12$\n INITIATE_REBALANCE_NODE_ON_DONOR\x10\x1a\x12 \n\x1c\x44\x45LETE_STORE_REBALANCE_STATE\x10\x1b\x12\x11\n\rNATIVE_BACKUP\x10\x1c\x42-\n\x1cvoldemort.client.protocol.pbB\x0bVAdminProtoH\x01')
+ serialized_pb='\n\x15voldemort-admin.proto\x12\tvoldemort\x1a\x16voldemort-client.proto\"!\n\x12GetMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\"]\n\x13GetMetadataResponse\x12%\n\x07version\x18\x01 \x01(\x0b\x32\x14.voldemort.Versioned\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"M\n\x15UpdateMetadataRequest\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"9\n\x16UpdateMetadataResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"7\n\tFileEntry\x12\x11\n\tfile_name\x18\x01 \x02(\t\x12\x17\n\x0f\x66ile_size_bytes\x18\x02 \x02(\x03\"F\n\x0ePartitionEntry\x12\x0b\n\x03key\x18\x01 \x02(\x0c\x12\'\n\tversioned\x18\x02 \x02(\x0b\x32\x14.voldemort.Versioned\"\x8e\x01\n\x1dUpdatePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x32\n\x0fpartition_entry\x18\x02 \x02(\x0b\x32\x19.voldemort.PartitionEntry\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\"A\n\x1eUpdatePartitionEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"-\n\x0fVoldemortFilter\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"\xaf\x01\n\x18UpdateSlopEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x0b\n\x03key\x18\x02 \x02(\x0c\x12\'\n\x07version\x18\x03 \x02(\x0b\x32\x16.voldemort.VectorClock\x12,\n\x0crequest_type\x18\x04 \x02(\x0e\x32\x16.voldemort.RequestType\x12\r\n\x05value\x18\x05 \x01(\x0c\x12\x11\n\ttransform\x18\x06 \x01(\x0c\"<\n\x19UpdateSlopEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"d\n\x1a\x46\x65tchPartitionFilesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\"\xd7\x01\n\x1c\x46\x65tchPartitionEntriesRequest\x12\x37\n\x14replica_to_partition\x18\x01 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x14\n\x0c\x66\x65tch_values\x18\x04 \x01(\x08\x12\x14\n\x0cskip_records\x18\x05 \x01(\x03\x12\x17\n\x0finitial_cluster\x18\x06 \x01(\t\"\x81\x01\n\x1d\x46\x65tchPartitionEntriesResponse\x12\x32\n\x0fpartition_entry\x18\x01 \x01(\x0b\x32\x19.voldemort.PartitionEntry\x12\x0b\n\x03key\x18\x02 \x01(\x0c\x12\x1f\n\x05\x65rror\x18\x03 \x01(\x0b\x32\x10.voldemort.Error\"\xac\x01\n\x1d\x44\x65letePartitionEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x17\n\x0finitial_cluster\x18\x04 \x01(\t\"P\n\x1e\x44\x65letePartitionEntriesResponse\x12\r\n\x05\x63ount\x18\x01 \x01(\x03\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"\xcf\x01\n\x1dInitiateFetchAndUpdateRequest\x12\x0f\n\x07node_id\x18\x01 \x02(\x05\x12\r\n\x05store\x18\x02 \x02(\t\x12*\n\x06\x66ilter\x18\x03 \x01(\x0b\x32\x1a.voldemort.VoldemortFilter\x12\x37\n\x14replica_to_partition\x18\x04 \x03(\x0b\x32\x19.voldemort.PartitionTuple\x12\x17\n\x0finitial_cluster\x18\x05 \x01(\t\x12\x10\n\x08optimize\x18\x06 \x01(\x08\"1\n\x1b\x41syncOperationStatusRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"/\n\x19\x41syncOperationStopRequest\x12\x12\n\nrequest_id\x18\x01 \x02(\x05\"=\n\x1a\x41syncOperationStopResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"2\n\x19\x41syncOperationListRequest\x12\x15\n\rshow_complete\x18\x02 
\x02(\x08\"R\n\x1a\x41syncOperationListResponse\x12\x13\n\x0brequest_ids\x18\x01 \x03(\x05\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\":\n\x0ePartitionTuple\x12\x14\n\x0creplica_type\x18\x01 \x02(\x05\x12\x12\n\npartitions\x18\x02 \x03(\x05\"e\n\x16PerStorePartitionTuple\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x37\n\x14replica_to_partition\x18\x02 \x03(\x0b\x32\x19.voldemort.PartitionTuple\"\xf8\x01\n\x19RebalancePartitionInfoMap\x12\x12\n\nstealer_id\x18\x01 \x02(\x05\x12\x10\n\x08\x64onor_id\x18\x02 \x02(\x05\x12\x0f\n\x07\x61ttempt\x18\x03 \x02(\x05\x12\x43\n\x18replica_to_add_partition\x18\x04 \x03(\x0b\x32!.voldemort.PerStorePartitionTuple\x12\x46\n\x1breplica_to_delete_partition\x18\x05 \x03(\x0b\x32!.voldemort.PerStorePartitionTuple\x12\x17\n\x0finitial_cluster\x18\x06 \x02(\t\"f\n\x1cInitiateRebalanceNodeRequest\x12\x46\n\x18rebalance_partition_info\x18\x01 \x02(\x0b\x32$.voldemort.RebalancePartitionInfoMap\"m\n#InitiateRebalanceNodeOnDonorRequest\x12\x46\n\x18rebalance_partition_info\x18\x01 \x03(\x0b\x32$.voldemort.RebalancePartitionInfoMap\"\x8a\x01\n\x1c\x41syncOperationStatusResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\t\x12\x10\n\x08\x63omplete\x18\x04 \x01(\x08\x12\x1f\n\x05\x65rror\x18\x05 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x16TruncateEntriesRequest\x12\r\n\x05store\x18\x01 \x02(\t\":\n\x17TruncateEntriesResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"*\n\x0f\x41\x64\x64StoreRequest\x12\x17\n\x0fstoreDefinition\x18\x01 \x02(\t\"3\n\x10\x41\x64\x64StoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\'\n\x12\x44\x65leteStoreRequest\x12\x11\n\tstoreName\x18\x01 \x02(\t\"6\n\x13\x44\x65leteStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"P\n\x11\x46\x65tchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\x12\x14\n\x0cpush_version\x18\x03 \x01(\x03\"9\n\x10SwapStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"P\n\x11SwapStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\x12\x1a\n\x12previous_store_dir\x18\x02 \x01(\t\"@\n\x14RollbackStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x14\n\x0cpush_version\x18\x02 \x02(\x03\"8\n\x15RollbackStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"&\n\x10RepairJobRequest\x12\x12\n\nstore_name\x18\x01 \x01(\t\"4\n\x11RepairJobResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"=\n\x14ROStoreVersionDirMap\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\"/\n\x19GetROMaxVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROMaxVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"3\n\x1dGetROCurrentVersionDirRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"}\n\x1eGetROCurrentVersionDirResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x10.voldemort.Error\"/\n\x19GetROStorageFormatRequest\x12\x12\n\nstore_name\x18\x01 \x03(\t\"y\n\x1aGetROStorageFormatResponse\x12:\n\x11ro_store_versions\x18\x01 \x03(\x0b\x32\x1f.voldemort.ROStoreVersionDirMap\x12\x1f\n\x05\x65rror\x18\x02 
\x01(\x0b\x32\x10.voldemort.Error\"@\n\x17\x46\x61iledFetchStoreRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x11\n\tstore_dir\x18\x02 \x02(\t\";\n\x18\x46\x61iledFetchStoreResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\xe6\x01\n\x1bRebalanceStateChangeRequest\x12K\n\x1drebalance_partition_info_list\x18\x01 \x03(\x0b\x32$.voldemort.RebalancePartitionInfoMap\x12\x16\n\x0e\x63luster_string\x18\x02 \x02(\t\x12\x0f\n\x07swap_ro\x18\x03 \x02(\x08\x12\x1f\n\x17\x63hange_cluster_metadata\x18\x04 \x02(\x08\x12\x1e\n\x16\x63hange_rebalance_state\x18\x05 \x02(\x08\x12\x10\n\x08rollback\x18\x06 \x02(\x08\"?\n\x1cRebalanceStateChangeResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"G\n DeleteStoreRebalanceStateRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x0f\n\x07node_id\x18\x02 \x02(\x05\"D\n!DeleteStoreRebalanceStateResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"h\n\x13NativeBackupRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x12\n\nbackup_dir\x18\x02 \x02(\t\x12\x14\n\x0cverify_files\x18\x03 \x02(\x08\x12\x13\n\x0bincremental\x18\x04 \x02(\x08\">\n\x14ReserveMemoryRequest\x12\x12\n\nstore_name\x18\x01 \x02(\t\x12\x12\n\nsize_in_mb\x18\x02 \x02(\x03\"8\n\x15ReserveMemoryResponse\x12\x1f\n\x05\x65rror\x18\x01 \x01(\x0b\x32\x10.voldemort.Error\"\xf0\x0e\n\x15VoldemortAdminRequest\x12)\n\x04type\x18\x01 \x02(\x0e\x32\x1b.voldemort.AdminRequestType\x12\x33\n\x0cget_metadata\x18\x02 \x01(\x0b\x32\x1d.voldemort.GetMetadataRequest\x12\x39\n\x0fupdate_metadata\x18\x03 \x01(\x0b\x32 .voldemort.UpdateMetadataRequest\x12J\n\x18update_partition_entries\x18\x04 \x01(\x0b\x32(.voldemort.UpdatePartitionEntriesRequest\x12H\n\x17\x66\x65tch_partition_entries\x18\x05 \x01(\x0b\x32\'.voldemort.FetchPartitionEntriesRequest\x12J\n\x18\x64\x65lete_partition_entries\x18\x06 \x01(\x0b\x32(.voldemort.DeletePartitionEntriesRequest\x12K\n\x19initiate_fetch_and_update\x18\x07 \x01(\x0b\x32(.voldemort.InitiateFetchAndUpdateRequest\x12\x46\n\x16\x61sync_operation_status\x18\x08 \x01(\x0b\x32&.voldemort.AsyncOperationStatusRequest\x12H\n\x17initiate_rebalance_node\x18\t \x01(\x0b\x32\'.voldemort.InitiateRebalanceNodeRequest\x12\x42\n\x14\x61sync_operation_stop\x18\n \x01(\x0b\x32$.voldemort.AsyncOperationStopRequest\x12\x42\n\x14\x61sync_operation_list\x18\x0b \x01(\x0b\x32$.voldemort.AsyncOperationListRequest\x12;\n\x10truncate_entries\x18\x0c \x01(\x0b\x32!.voldemort.TruncateEntriesRequest\x12-\n\tadd_store\x18\r \x01(\x0b\x32\x1a.voldemort.AddStoreRequest\x12\x33\n\x0c\x64\x65lete_store\x18\x0e \x01(\x0b\x32\x1d.voldemort.DeleteStoreRequest\x12\x31\n\x0b\x66\x65tch_store\x18\x0f \x01(\x0b\x32\x1c.voldemort.FetchStoreRequest\x12/\n\nswap_store\x18\x10 \x01(\x0b\x32\x1b.voldemort.SwapStoreRequest\x12\x37\n\x0erollback_store\x18\x11 \x01(\x0b\x32\x1f.voldemort.RollbackStoreRequest\x12\x44\n\x16get_ro_max_version_dir\x18\x12 \x01(\x0b\x32$.voldemort.GetROMaxVersionDirRequest\x12L\n\x1aget_ro_current_version_dir\x18\x13 \x01(\x0b\x32(.voldemort.GetROCurrentVersionDirRequest\x12\x44\n\x15\x66\x65tch_partition_files\x18\x14 \x01(\x0b\x32%.voldemort.FetchPartitionFilesRequest\x12@\n\x13update_slop_entries\x18\x16 \x01(\x0b\x32#.voldemort.UpdateSlopEntriesRequest\x12>\n\x12\x66\x61iled_fetch_store\x18\x18 \x01(\x0b\x32\".voldemort.FailedFetchStoreRequest\x12\x43\n\x15get_ro_storage_format\x18\x19 \x01(\x0b\x32$.voldemort.GetROStorageFormatRequest\x12\x46\n\x16rebalance_state_change\x18\x1a 
\x01(\x0b\x32&.voldemort.RebalanceStateChangeRequest\x12/\n\nrepair_job\x18\x1b \x01(\x0b\x32\x1b.voldemort.RepairJobRequest\x12X\n initiate_rebalance_node_on_donor\x18\x1c \x01(\x0b\x32..voldemort.InitiateRebalanceNodeOnDonorRequest\x12Q\n\x1c\x64\x65lete_store_rebalance_state\x18\x1d \x01(\x0b\x32+.voldemort.DeleteStoreRebalanceStateRequest\x12\x35\n\rnative_backup\x18\x1e \x01(\x0b\x32\x1e.voldemort.NativeBackupRequest\x12\x37\n\x0ereserve_memory\x18\x1f \x01(\x0b\x32\x1f.voldemort.ReserveMemoryRequest*\xc8\x05\n\x10\x41\x64minRequestType\x12\x10\n\x0cGET_METADATA\x10\x00\x12\x13\n\x0fUPDATE_METADATA\x10\x01\x12\x1c\n\x18UPDATE_PARTITION_ENTRIES\x10\x02\x12\x1b\n\x17\x46\x45TCH_PARTITION_ENTRIES\x10\x03\x12\x1c\n\x18\x44\x45LETE_PARTITION_ENTRIES\x10\x04\x12\x1d\n\x19INITIATE_FETCH_AND_UPDATE\x10\x05\x12\x1a\n\x16\x41SYNC_OPERATION_STATUS\x10\x06\x12\x1b\n\x17INITIATE_REBALANCE_NODE\x10\x07\x12\x18\n\x14\x41SYNC_OPERATION_STOP\x10\x08\x12\x18\n\x14\x41SYNC_OPERATION_LIST\x10\t\x12\x14\n\x10TRUNCATE_ENTRIES\x10\n\x12\r\n\tADD_STORE\x10\x0b\x12\x10\n\x0c\x44\x45LETE_STORE\x10\x0c\x12\x0f\n\x0b\x46\x45TCH_STORE\x10\r\x12\x0e\n\nSWAP_STORE\x10\x0e\x12\x12\n\x0eROLLBACK_STORE\x10\x0f\x12\x1a\n\x16GET_RO_MAX_VERSION_DIR\x10\x10\x12\x1e\n\x1aGET_RO_CURRENT_VERSION_DIR\x10\x11\x12\x19\n\x15\x46\x45TCH_PARTITION_FILES\x10\x12\x12\x17\n\x13UPDATE_SLOP_ENTRIES\x10\x14\x12\x16\n\x12\x46\x41ILED_FETCH_STORE\x10\x16\x12\x19\n\x15GET_RO_STORAGE_FORMAT\x10\x17\x12\x1a\n\x16REBALANCE_STATE_CHANGE\x10\x18\x12\x0e\n\nREPAIR_JOB\x10\x19\x12$\n INITIATE_REBALANCE_NODE_ON_DONOR\x10\x1a\x12 \n\x1c\x44\x45LETE_STORE_REBALANCE_STATE\x10\x1b\x12\x11\n\rNATIVE_BACKUP\x10\x1c\x12\x12\n\x0eRESERVE_MEMORY\x10\x1d\x42-\n\x1cvoldemort.client.protocol.pbB\x0bVAdminProtoH\x01')
_ADMINREQUESTTYPE = descriptor.EnumDescriptor(
name='AdminRequestType',
@@ -126,11 +126,15 @@
name='NATIVE_BACKUP', index=26, number=28,
options=None,
type=None),
+ descriptor.EnumValueDescriptor(
+ name='RESERVE_MEMORY', index=27, number=29,
+ options=None,
+ type=None),
],
containing_type=None,
options=None,
- serialized_start=6792,
- serialized_end=7484,
+ serialized_start=6971,
+ serialized_end=7683,
)
@@ -161,6 +165,7 @@
INITIATE_REBALANCE_NODE_ON_DONOR = 26
DELETE_STORE_REBALANCE_STATE = 27
NATIVE_BACKUP = 28
+RESERVE_MEMORY = 29
@@ -2124,6 +2129,69 @@
)
+_RESERVEMEMORYREQUEST = descriptor.Descriptor(
+ name='ReserveMemoryRequest',
+ full_name='voldemort.ReserveMemoryRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ descriptor.FieldDescriptor(
+ name='store_name', full_name='voldemort.ReserveMemoryRequest.store_name', index=0,
+ number=1, type=9, cpp_type=9, label=2,
+ has_default_value=False, default_value=unicode("", "utf-8"),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ descriptor.FieldDescriptor(
+ name='size_in_mb', full_name='voldemort.ReserveMemoryRequest.size_in_mb', index=1,
+ number=2, type=3, cpp_type=2, label=2,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ extension_ranges=[],
+ serialized_start=4941,
+ serialized_end=5003,
+)
+
+
+_RESERVEMEMORYRESPONSE = descriptor.Descriptor(
+ name='ReserveMemoryResponse',
+ full_name='voldemort.ReserveMemoryResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ descriptor.FieldDescriptor(
+ name='error', full_name='voldemort.ReserveMemoryResponse.error', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ options=None,
+ is_extendable=False,
+ extension_ranges=[],
+ serialized_start=5005,
+ serialized_end=5061,
+)
+
+
_VOLDEMORTADMINREQUEST = descriptor.Descriptor(
name='VoldemortAdminRequest',
full_name='voldemort.VoldemortAdminRequest',
@@ -2327,6 +2395,13 @@
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
+ descriptor.FieldDescriptor(
+ name='reserve_memory', full_name='voldemort.VoldemortAdminRequest.reserve_memory', index=28,
+ number=31, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None),
],
extensions=[
],
@@ -2336,8 +2411,8 @@
options=None,
is_extendable=False,
extension_ranges=[],
- serialized_start=4942,
- serialized_end=6789,
+ serialized_start=5064,
+ serialized_end=6968,
)
import voldemort_client_pb2
@@ -2387,6 +2462,7 @@
_REBALANCESTATECHANGEREQUEST.fields_by_name['rebalance_partition_info_list'].message_type = _REBALANCEPARTITIONINFOMAP
_REBALANCESTATECHANGERESPONSE.fields_by_name['error'].message_type = voldemort_client_pb2._ERROR
_DELETESTOREREBALANCESTATERESPONSE.fields_by_name['error'].message_type = voldemort_client_pb2._ERROR
+_RESERVEMEMORYRESPONSE.fields_by_name['error'].message_type = voldemort_client_pb2._ERROR
_VOLDEMORTADMINREQUEST.fields_by_name['type'].enum_type = _ADMINREQUESTTYPE
_VOLDEMORTADMINREQUEST.fields_by_name['get_metadata'].message_type = _GETMETADATAREQUEST
_VOLDEMORTADMINREQUEST.fields_by_name['update_metadata'].message_type = _UPDATEMETADATAREQUEST
@@ -2415,6 +2491,7 @@
_VOLDEMORTADMINREQUEST.fields_by_name['initiate_rebalance_node_on_donor'].message_type = _INITIATEREBALANCENODEONDONORREQUEST
_VOLDEMORTADMINREQUEST.fields_by_name['delete_store_rebalance_state'].message_type = _DELETESTOREREBALANCESTATEREQUEST
_VOLDEMORTADMINREQUEST.fields_by_name['native_backup'].message_type = _NATIVEBACKUPREQUEST
+_VOLDEMORTADMINREQUEST.fields_by_name['reserve_memory'].message_type = _RESERVEMEMORYREQUEST
class GetMetadataRequest(message.Message):
__metaclass__ = reflection.GeneratedProtocolMessageType
@@ -2746,6 +2823,18 @@ class NativeBackupRequest(message.Message):
# @@protoc_insertion_point(class_scope:voldemort.NativeBackupRequest)
+class ReserveMemoryRequest(message.Message):
+ __metaclass__ = reflection.GeneratedProtocolMessageType
+ DESCRIPTOR = _RESERVEMEMORYREQUEST
+
+ # @@protoc_insertion_point(class_scope:voldemort.ReserveMemoryRequest)
+
+class ReserveMemoryResponse(message.Message):
+ __metaclass__ = reflection.GeneratedProtocolMessageType
+ DESCRIPTOR = _RESERVEMEMORYRESPONSE
+
+ # @@protoc_insertion_point(class_scope:voldemort.ReserveMemoryResponse)
+
class VoldemortAdminRequest(message.Message):
__metaclass__ = reflection.GeneratedProtocolMessageType
DESCRIPTOR = _VOLDEMORTADMINREQUEST
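
Taken together, the generated-code changes above add a ReserveMemoryRequest/ReserveMemoryResponse pair, a reserve_memory field (tag 31) on VoldemortAdminRequest, and the RESERVE_MEMORY enum value (29). As a minimal sketch only — assuming protoc-generated Java bindings under voldemort.client.protocol.pb with outer class VAdminProto, as the options at the end of the descriptor blob indicate; the store name and size are placeholders:

import voldemort.client.protocol.pb.VAdminProto;

// Hypothetical usage; field names follow the descriptor above.
VAdminProto.VoldemortAdminRequest request =
    VAdminProto.VoldemortAdminRequest.newBuilder()
        .setType(VAdminProto.AdminRequestType.RESERVE_MEMORY)
        .setReserveMemory(VAdminProto.ReserveMemoryRequest.newBuilder()
                              .setStoreName("test")   // required string, field 1
                              .setSizeInMb(1024L))    // required int64, field 2
        .build();
byte[] payload = request.toByteArray();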
71 config/single_node_cluster/config/stores.xml
@@ -1,32 +1,39 @@
-<stores>
- <store>
- <name>test</name>
- <persistence>bdb</persistence>
- <description>Test store</description>
- <owners> harry@hogwarts.edu, hermoine@hogwarts.edu </owners>
- <routing>client</routing>
- <replication-factor>1</replication-factor>
- <required-reads>1</required-reads>
- <required-writes>1</required-writes>
- <key-serializer>
- <type>string</type>
- </key-serializer>
- <value-serializer>
- <type>string</type>
- </value-serializer>
- </store>
- <view>
- <name>test-view</name>
- <view-of>test</view-of>
- <owners> ron@hogwarts.edu </owners>
- <view-class>
- voldemort.store.views.UpperCaseView
- </view-class>
- <value-serializer>
- <type>string</type>
- </value-serializer>
- <transforms-serializer>
- <type>string</type>
- </transforms-serializer>
- </view>
-</stores>
+<stores>
+ <store>
+ <name>test</name>
+ <persistence>bdb</persistence>
+ <description>Test store</description>
+ <owners>harry@hogwarts.edu, hermoine@hogwarts.edu</owners>
+ <routing-strategy>consistent-routing</routing-strategy>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>string</type>
+ </key-serializer>
+ <value-serializer>
+ <type>string</type>
+ </value-serializer>
+ </store>
+ <store>
+ <name>test-evolution</name>
+ <persistence>bdb</persistence>
+ <description>Test store</description>
+ <owners>harry@hogwarts.edu, hermoine@hogwarts.edu</owners>
+ <routing-strategy>consistent-routing</routing-strategy>
+ <routing>client</routing>
+ <replication-factor>1</replication-factor>
+ <required-reads>1</required-reads>
+ <required-writes>1</required-writes>
+ <key-serializer>
+ <type>string</type>
+ </key-serializer>
+ <value-serializer>
+ <type>avro-generic-versioned</type>
+ <schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
+ <schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}</schema-info>
+ </value-serializer>
+ </store>
+
+</stores>
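
The new test-evolution store exercises versioned Avro serialization: schema version 1 adds a field with a default value, so records written under version 0 still resolve when read with version 1. A self-contained Avro sketch of that resolution step, outside Voldemort entirely — note the field is renamed new_field here because current Avro releases reject hyphens in field names:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionSketch {
    public static void main(String[] args) throws Exception {
        Schema v0 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"myrec\",\"fields\":"
            + "[{\"name\":\"original\",\"type\":\"string\"}]}");
        Schema v1 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"myrec\",\"fields\":"
            + "[{\"name\":\"original\",\"type\":\"string\"},"
            + "{\"name\":\"new_field\",\"type\":\"string\",\"default\":\"\"}]}");

        // Encode a record against the writer (v0) schema.
        GenericRecord rec = new GenericData.Record(v0);
        rec.put("original", "hello");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v0).write(rec, enc);
        enc.flush();

        // Decode with (writer, reader) schemas; the added field takes its default.
        Decoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord evolved = new GenericDatumReader<GenericRecord>(v0, v1)
            .read(null, dec);
        System.out.println(evolved.get("new_field")); // "" (the default)
    }
}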
10 contrib/ec2-testing/test/voldemort/utils/Ec2FailureDetectorTest.java
@@ -150,7 +150,7 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));
// 2. Stop all the nodes, then test enough that we can cause the nodes
@@ -159,19 +159,19 @@ public void testAllNodesOffline() throws Exception {
test(store);
assertEquals(0, failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertFalse(failureDetector.isAvailable(n));
// 3. Now start the cluster up, test, and make sure everything's OK.
startClusterAsync(hostNames, ec2FailureDetectorTestConfig, nodeIds);
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
failureDetector.waitForAvailability(n);
test(store);
assertEquals(hostNamePairs.size(), failureDetector.getAvailableNodeCount());
- for(Node n: failureDetector.getConfig().getNodes())
+ for(Node n: failureDetector.getConfig().getCluster().getNodes())
assertTrue(failureDetector.isAvailable(n));
}
@@ -252,7 +252,7 @@ private Node getNodeByHostName(String hostName, FailureDetector failureDetector)
throws Exception {
Integer offlineNodeId = nodeIds.get(hostName);
- for(Node n: failureDetector.getConfig().getNodes()) {
+ for(Node n: failureDetector.getConfig().getCluster().getNodes()) {
if(offlineNodeId.equals(n.getId()))
return n;
}
3  ...tore-builder/perf/voldemort/contrib/batchindexer/performance/BdbBuildPerformanceTest.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
@@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];
- final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
+ final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
final AtomicInteger obsoletes = new AtomicInteger(0);
3  ...re-builder/perf/voldemort/contrib/batchindexer/performance/MysqlBuildPerformanceTest.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import voldemort.TestUtils;
import voldemort.performance.PerformanceTest;
import voldemort.server.VoldemortConfig;
import voldemort.store.Store;
@@ -51,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];
- final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(storeName);
+ final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
final AtomicInteger obsoletes = new AtomicInteger(0);
339 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.store.readonly.disk;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.store.readonly.checksum.CheckSum;
+import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
+import voldemort.utils.ByteUtils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+// The default Voldemort keyvalue writer
+// generates index and data files
+public class HadoopStoreWriter implements KeyValueWriter<BytesWritable, BytesWritable> {
+
+ private static final Logger logger = Logger.getLogger(HadoopStoreWriter.class);
+
+ private DataOutputStream indexFileStream = null;
+ private DataOutputStream valueFileStream = null;
+ private int position;
+ private String taskId = null;
+
+ private int nodeId = -1;
+ private int partitionId = -1;
+ private int chunkId = -1;
+ private int replicaType = -1;
+
+ private Path taskIndexFileName;
+ private Path taskValueFileName;
+
+ private JobConf conf;
+ private CheckSumType checkSumType;
+ private CheckSum checkSumDigestIndex;
+ private CheckSum checkSumDigestValue;
+
+ private String outputDir;
+
+ private FileSystem fs;
+
+ private int numChunks;
+ private Cluster cluster;
+ private StoreDefinition storeDef;
+ private boolean saveKeys;
+ private boolean reducerPerBucket;
+
+ public Cluster getCluster() {
+ checkNotNull(cluster);
+ return cluster;
+ }
+
+ public boolean getSaveKeys() {
+ return this.saveKeys;
+ }
+
+ public boolean getReducerPerBucket() {
+ return this.reducerPerBucket;
+ }
+
+ public StoreDefinition getStoreDef() {
+ checkNotNull(storeDef);
+ return storeDef;
+ }
+
+ public String getStoreName() {
+ checkNotNull(storeDef);
+ return storeDef.getName();
+ }
+
+ private final void checkNotNull(Object o) {
+ if(o == null)
+ throw new VoldemortException("Not configured yet!");
+ }
+
+ public int getNumChunks() {
+ return this.numChunks;
+ }
+
+ @Override
+ public void conf(JobConf job) {
+
+ conf = job;
+ try {
+
+ this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
+ List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));
+ if(storeDefs.size() != 1)
+ throw new IllegalStateException("Expected to find only a single store, but found multiple!");
+ this.storeDef = storeDefs.get(0);
+
+ this.numChunks = conf.getInt("num.chunks", -1);
+ if(this.numChunks < 1)
+ throw new VoldemortException("num.chunks not specified in the job conf.");
+ this.saveKeys = conf.getBoolean("save.keys", false);
+ this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
+ this.conf = job;
+ this.position = 0;
+ this.outputDir = job.get("final.output.dir");
+ this.taskId = job.get("mapred.task.id");
+ this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
+ this.checkSumDigestIndex = CheckSum.getInstance(checkSumType);
+ this.checkSumDigestValue = CheckSum.getInstance(checkSumType);
+
+ this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
+ + "."
+ + this.taskId
+ + ".index");
+ this.taskValueFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
+ + "."
+ + this.taskId
+ + ".data");
+
+ if(this.fs == null)
+ this.fs = this.taskIndexFileName.getFileSystem(job);
+
+ this.indexFileStream = fs.create(this.taskIndexFileName);
+ this.valueFileStream = fs.create(this.taskValueFileName);
+
+ logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName
+ + " for writing.");
+
+ } catch(IOException e) {
+ throw new RuntimeException("Failed to open Input/OutputStream", e);
+ }
+
+ }
+
+ @Override
+ public void write(BytesWritable key, Iterator<BytesWritable> iterator, Reporter reporter)
+ throws IOException {
+
+ // Write key and position
+ this.indexFileStream.write(key.get(), 0, key.getSize());
+ this.indexFileStream.writeInt(this.position);
+
+ // Run key through checksum digest
+ if(this.checkSumDigestIndex != null) {
+ this.checkSumDigestIndex.update(key.get(), 0, key.getSize());
+ this.checkSumDigestIndex.update(this.position);
+ }
+
+ short numTuples = 0;
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream valueStream = new DataOutputStream(stream);
+
+ while(iterator.hasNext()) {
+ BytesWritable writable = iterator.next();
+ byte[] valueBytes = writable.get();
+ int offsetTillNow = 0;
+
+ // Read node Id
+ if(this.nodeId == -1)
+ this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow);
+ offsetTillNow += ByteUtils.SIZE_OF_INT;
+
+ // Read partition id
+ if(this.partitionId == -1)
+ this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow);
+ offsetTillNow += ByteUtils.SIZE_OF_INT;
+
+ // Read chunk id
+ if(this.chunkId == -1)
+ this.chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
+
+ // Read replica type
+ if(getSaveKeys()) {
+ if(this.replicaType == -1)
+ this.replicaType = (int) ByteUtils.readBytes(valueBytes,
+ offsetTillNow,
+ ByteUtils.SIZE_OF_BYTE);
+ offsetTillNow += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ int valueLength = writable.getSize() - offsetTillNow;
+ if(getSaveKeys()) {
+ // Write ( key_length, value_length, key,
+ // value )
+ valueStream.write(valueBytes, offsetTillNow, valueLength);
+ } else {
+ // Write (value_length + value)
+ valueStream.writeInt(valueLength);
+ valueStream.write(valueBytes, offsetTillNow, valueLength);
+ }
+
+ numTuples++;
+
+ // If we have multiple values for this md5 that is a collision,
+ // throw an exception--either the data itself has duplicates, there
+ // are trillions of keys, or someone is attempting something
+ // malicious ( We obviously expect collisions when we save keys )
+ if(!getSaveKeys() && numTuples > 1)
+ throw new VoldemortException("Duplicate keys detected for md5 sum "
+ + ByteUtils.toHexString(ByteUtils.copy(key.get(),
+ 0,
+ key.getSize())));
+
+ }
+
+ if(numTuples < 0) {
+ // Overflow
+ throw new VoldemortException("Found too many collisions: chunk " + chunkId
+ + " has exceeded " + Short.MAX_VALUE + " collisions.");
+ } else if(numTuples > 1) {
+ // Update number of collisions + max keys per collision
+ reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1);
+
+ long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter();
+ if(numTuples > numCollisions) {
+ reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions);
+ }
+ }
+
+ // Flush the value
+ valueStream.flush();
+ byte[] value = stream.toByteArray();
+
+ // Start writing to file now
+ // First, if save keys flag set the number of keys
+ if(getSaveKeys()) {
+
+ this.valueFileStream.writeShort(numTuples);
+ this.position += ByteUtils.SIZE_OF_SHORT;
+
+ if(this.checkSumDigestValue != null) {
+ this.checkSumDigestValue.update(numTuples);
+ }
+ }
+
+ this.valueFileStream.write(value);
+ this.position += value.length;
+
+ if(this.checkSumDigestValue != null) {
+ this.checkSumDigestValue.update(value);
+ }
+
+ if(this.position < 0)
+ throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
+ + " has exceeded " + Integer.MAX_VALUE + " bytes.");
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ this.indexFileStream.close();
+ this.valueFileStream.close();
+
+ if(this.nodeId == -1 || this.chunkId == -1 || this.partitionId == -1) {
+ // Issue 258 - No data was read in the reduce phase, do not create
+ // any output
+ return;
+ }
+
+ // If the replica type read was not valid, shout out
+ if(getSaveKeys() && this.replicaType == -1) {
+ throw new RuntimeException("Could not read the replica type correctly for node "
+ + nodeId + " ( partition - " + this.partitionId + " )");
+ }
+
+ String fileNamePrefix = null;
+ if(getSaveKeys()) {
+ fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
+ + Integer.toString(this.replicaType) + "_"
+ + Integer.toString(this.chunkId));
+ } else {
+ fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
+ + Integer.toString(this.chunkId));
+ }
+
+ // Initialize the node directory
+ Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId);
+
+ // Create output directory, if it doesn't exist
+ FileSystem outputFs = nodeDir.getFileSystem(this.conf);
+ outputFs.mkdirs(nodeDir);
+
+ // Write the checksum and output files
+ if(this.checkSumType != CheckSumType.NONE) {
+
+ if(this.checkSumDigestIndex != null && this.checkSumDigestValue != null) {
+ Path checkSumIndexFile = new Path(nodeDir, fileNamePrefix + ".index.checksum");
+ Path checkSumValueFile = new Path(nodeDir, fileNamePrefix + ".data.checksum");
+
+ FSDataOutputStream output = outputFs.create(checkSumIndexFile);
+ output.write(this.checkSumDigestIndex.getCheckSum());
+ output.close();
+
+ output = outputFs.create(checkSumValueFile);
+ output.write(this.checkSumDigestValue.getCheckSum());
+ output.close();
+ } else {
+ throw new RuntimeException("Failed to open checksum digest for node " + nodeId
+ + " ( partition - " + this.partitionId + ", chunk - "
+ + chunkId + " )");
+ }
+ }
+
+ // Generate the final chunk files
+ Path indexFile = new Path(nodeDir, fileNamePrefix + ".index");
+ Path valueFile = new Path(nodeDir, fileNamePrefix + ".data");
+
+ logger.info("Moving " + this.taskIndexFileName + " to " + indexFile);
+ outputFs.rename(taskIndexFileName, indexFile);
+ logger.info("Moving " + this.taskValueFileName + " to " + valueFile);
+ outputFs.rename(this.taskValueFileName, valueFile);
+ }
+
+}
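
From the write() and close() paths above, the per-chunk layout this writer produces can be read off directly: the .index file is a sequence of (key bytes, 4-byte offset) pairs, and the .data file holds at each offset an optional 2-byte tuple count (only when save.keys is on) followed by the value records, each stored as (value_length, value) when save.keys is off. A toy reader for the first index entry — file names and the 16-byte md5 key width are assumptions (the collision message in write() implies keys are md5 digests), so treat this as illustrative only:

import java.io.DataInputStream;
import java.io.FileInputStream;

public class ChunkReadSketch {
    public static void main(String[] args) throws Exception {
        // Assumes save.keys is off and the partition_chunk naming from close().
        DataInputStream index = new DataInputStream(new FileInputStream("0_0.index"));
        byte[] md5Key = new byte[16];         // assumed key width
        index.readFully(md5Key);              // key bytes as written by write()
        int dataOffset = index.readInt();     // position into 0_0.data
        index.close();

        DataInputStream data = new DataInputStream(new FileInputStream("0_0.data"));
        data.skipBytes(dataOffset);
        int valueLength = data.readInt();     // (value_length, value) record
        byte[] value = new byte[valueLength];
        data.readFully(value);
        data.close();
    }
}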
358 ...doop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.store.readonly.disk;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.store.readonly.checksum.CheckSum;
+import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
+import voldemort.utils.ByteUtils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+public class HadoopStoreWriterPerBucket implements KeyValueWriter<BytesWritable, BytesWritable> {
+
+ private static final Logger logger = Logger.getLogger(HadoopStoreWriterPerBucket.class);
+
+ private DataOutputStream[] indexFileStream = null;
+ private DataOutputStream[] valueFileStream = null;
+ private int[] position;
+ private String taskId = null;
+
+ private int nodeId = -1;
+ private int partitionId = -1;
+ private int replicaType = -1;
+
+ private Path[] taskIndexFileName;
+ private Path[] taskValueFileName;
+
+ private JobConf conf;
+ private CheckSumType checkSumType;
+ private CheckSum[] checkSumDigestIndex;
+ private CheckSum[] checkSumDigestValue;
+
+ private String outputDir;
+
+ private FileSystem fs;
+
+ @Override
+ public void conf(JobConf job) {
+
+ JobConf conf = job;
+ try {
+
+ this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
+ List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));
+ if(storeDefs.size() != 1)
+ throw new IllegalStateException("Expected to find only a single store, but found multiple!");
+ this.storeDef = storeDefs.get(0);
+
+ this.numChunks = conf.getInt("num.chunks", -1);
+ if(this.numChunks < 1)
+ throw new VoldemortException("num.chunks not specified in the job conf.");
+
+ this.saveKeys = conf.getBoolean("save.keys", false);
+ this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
+ this.conf = job;
+ this.outputDir = job.get("final.output.dir");
+ this.taskId = job.get("mapred.task.id");
+ this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
+
+ this.checkSumDigestIndex = new CheckSum[getNumChunks()];
+ this.checkSumDigestValue = new CheckSum[getNumChunks()];
+ this.position = new int[getNumChunks()];
+ this.taskIndexFileName = new Path[getNumChunks()];
+ this.taskValueFileName = new Path[getNumChunks()];
+ this.indexFileStream = new DataOutputStream[getNumChunks()];
+ this.valueFileStream = new DataOutputStream[getNumChunks()];
+
+ for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
+
+ this.checkSumDigestIndex[chunkId] = CheckSum.getInstance(checkSumType);
+ this.checkSumDigestValue[chunkId] = CheckSum.getInstance(checkSumType);
+ this.position[chunkId] = 0;
+
+ this.taskIndexFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job),
+ getStoreName() + "."
+ + Integer.toString(chunkId)
+ + "_" + this.taskId + ".index");
+ this.taskValueFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job),
+ getStoreName() + "."
+ + Integer.toString(chunkId)
+ + "_" + this.taskId + ".data");
+
+ if(this.fs == null)
+ this.fs = this.taskIndexFileName[chunkId].getFileSystem(job);
+
+ this.indexFileStream[chunkId] = fs.create(this.taskIndexFileName[chunkId]);
+ this.valueFileStream[chunkId] = fs.create(this.taskValueFileName[chunkId]);
+
+ logger.info("Opening " + this.taskIndexFileName[chunkId] + " and "
+ + this.taskValueFileName[chunkId] + " for writing.");
+ }
+
+ } catch(IOException e) {
+ // throw new RuntimeException("Failed to open Input/OutputStream",
+ // e);
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void write(BytesWritable key, Iterator<BytesWritable> iterator, Reporter reporter)
+ throws IOException {
+
+ // Read chunk id
+ int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
+
+ // Write key and position
+ this.indexFileStream[chunkId].write(key.get(), 0, key.getSize());
+ this.indexFileStream[chunkId].writeInt(this.position[chunkId]);
+
+ // Run key through checksum digest
+ if(this.checkSumDigestIndex[chunkId] != null) {
+ this.checkSumDigestIndex[chunkId].update(key.get(), 0, key.getSize());
+ this.checkSumDigestIndex[chunkId].update(this.position[chunkId]);
+ }
+
+ short numTuples = 0;
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream valueStream = new DataOutputStream(stream);
+
+ while(iterator.hasNext()) {
+ BytesWritable writable = iterator.next();
+ byte[] valueBytes = writable.get();
+ int offsetTillNow = 0;
+
+ // Read node Id
+ if(this.nodeId == -1)
+ this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow);
+ offsetTillNow += ByteUtils.SIZE_OF_INT;
+
+ // Read partition id
+ if(this.partitionId == -1)
+ this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow);
+ offsetTillNow += ByteUtils.SIZE_OF_INT;
+
+ // Read replica type
+ if(getSaveKeys()) {
+ if(this.replicaType == -1)
+ this.replicaType = (int) ByteUtils.readBytes(valueBytes,
+ offsetTillNow,
+ ByteUtils.SIZE_OF_BYTE);
+ offsetTillNow += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ int valueLength = writable.getSize() - offsetTillNow;
+ if(getSaveKeys()) {
+ // Write ( key_length, value_length, key,
+ // value )
+ valueStream.write(valueBytes, offsetTillNow, valueLength);
+ } else {
+ // Write (value_length + value)
+ valueStream.writeInt(valueLength);
+ valueStream.write(valueBytes, offsetTillNow, valueLength);
+ }
+
+ numTuples++;
+
+ // If we have multiple values for this md5 that is a collision,
+ // throw an exception--either the data itself has duplicates, there
+ // are trillions of keys, or someone is attempting something
+ // malicious ( We obviously expect collisions when we save keys )
+ if(!getSaveKeys() && numTuples > 1)
+ throw new VoldemortException("Duplicate keys detected for md5 sum "
+ + ByteUtils.toHexString(ByteUtils.copy(key.get(),
+ 0,
+ key.getSize())));
+
+ }
+
+ if(numTuples < 0) {
+ // Overflow
+ throw new VoldemortException("Found too many collisions: chunk " + chunkId
+ + " has exceeded " + Short.MAX_VALUE + " collisions.");
+ } else if(numTuples > 1) {
+ // Update number of collisions + max keys per collision
+ reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1);
+
+ long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter();
+ if(numTuples > numCollisions) {
+ reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions);
+ }
+ }
+
+ // Flush the value
+ valueStream.flush();
+ byte[] value = stream.toByteArray();
+
+ // Start writing to file now
+ // First, if save keys flag set the number of keys
+ if(getSaveKeys()) {
+
+ this.valueFileStream[chunkId].writeShort(numTuples);
+ this.position[chunkId] += ByteUtils.SIZE_OF_SHORT;
+
+ if(this.checkSumDigestValue[chunkId] != null) {
+ this.checkSumDigestValue[chunkId].update(numTuples);
+ }
+ }
+
+ this.valueFileStream[chunkId].write(value);
+ this.position[chunkId] += value.length;
+
+ if(this.checkSumDigestValue[chunkId] != null) {
+ this.checkSumDigestValue[chunkId].update(value);
+ }
+
+ if(this.position[chunkId] < 0)
+ throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
+ + " has exceeded " + Integer.MAX_VALUE + " bytes.");
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
+ this.indexFileStream[chunkId].close();
+ this.valueFileStream[chunkId].close();
+ }
+
+ if(this.nodeId == -1 || this.partitionId == -1) {
+ // Issue 258 - No data was read in the reduce phase, do not create
+ // any output
+ return;
+ }
+
+ // If the replica type read was not valid, shout out
+ if(getSaveKeys() && this.replicaType == -1) {
+ throw new RuntimeException("Could not read the replica type correctly for node "
+ + nodeId + " ( partition - " + this.partitionId + " )");
+ }
+
+ String fileNamePrefix = null;
+ if(getSaveKeys()) {
+ fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"
+ + Integer.toString(this.replicaType) + "_");
+ } else {
+ fileNamePrefix = new String(Integer.toString(this.partitionId) + "_");
+ }
+
+ // Initialize the node directory
+ Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId);
+
+ // Create output directory, if it doesn't exist
+ FileSystem outputFs = nodeDir.getFileSystem(this.conf);
+ outputFs.mkdirs(nodeDir);
+
+ // Write the checksum and output files
+ for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) {
+
+ String chunkFileName = fileNamePrefix + Integer.toString(chunkId);
+ if(this.checkSumType != CheckSumType.NONE) {
+
+ if(this.checkSumDigestIndex[chunkId] != null
+ && this.checkSumDigestValue[chunkId] != null) {
+ Path checkSumIndexFile = new Path(nodeDir, chunkFileName + ".index.checksum");
+ Path checkSumValueFile = new Path(nodeDir, chunkFileName + ".data.checksum");
+
+ FSDataOutputStream output = outputFs.create(checkSumIndexFile);
+ output.write(this.checkSumDigestIndex[chunkId].getCheckSum());
+ output.close();
+
+ output = outputFs.create(checkSumValueFile);
+ output.write(this.checkSumDigestValue[chunkId].getCheckSum());
+ output.close();
+ } else {
+ throw new RuntimeException("Failed to open checksum digest for node " + nodeId
+ + " ( partition - " + this.partitionId
+ + ", chunk - " + chunkId + " )");
+ }
+ }
+
+ // Generate the final chunk files
+ Path indexFile = new Path(nodeDir, chunkFileName + ".index");
+ Path valueFile = new Path(nodeDir, chunkFileName + ".data");
+
+ logger.info("Moving " + this.taskIndexFileName[chunkId] + " to " + indexFile);
+ fs.rename(taskIndexFileName[chunkId], indexFile);
+ logger.info("Moving " + this.taskValueFileName[chunkId] + " to " + valueFile);
+ fs.rename(this.taskValueFileName[chunkId], valueFile);
+
+ }
+
+ }
+
+ private int numChunks;
+ private Cluster cluster;
+ private StoreDefinition storeDef;
+ private boolean saveKeys;
+ private boolean reducerPerBucket;
+
+ public Cluster getCluster() {
+ checkNotNull(cluster);
+ return cluster;
+ }
+
+ public boolean getSaveKeys() {
+ return this.saveKeys;
+ }
+
+ public boolean getReducerPerBucket() {
+ return this.reducerPerBucket;
+ }
+
+ public StoreDefinition getStoreDef() {
+ checkNotNull(storeDef);
+ return storeDef;
+ }
+
+ public String getStoreName() {
+ checkNotNull(storeDef);
+ return storeDef.getName();
+ }
+
+ private final void checkNotNull(Object o) {
+ if(o == null)
+ throw new VoldemortException("Not configured yet!");
+ }
+
+ public int getNumChunks() {
+ return this.numChunks;
+ }
+
+}
40 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.store.readonly.disk;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+// Interface used by reducers to lay out the data on disk
+public interface KeyValueWriter<K, V> {
+
+ public static enum CollisionCounter {
+
+ NUM_COLLISIONS,
+ MAX_COLLISIONS;
+ }
+
+ public void conf(JobConf job);
+
+ public void write(K key, Iterator<V> iterator, Reporter reporter) throws IOException;
+
+ public void close() throws IOException;
+
+}
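
This interface is the seam the commit introduces between the reducers and the on-disk format: per the file list, HadoopStoreBuilderReducer and HadoopStoreBuilderReducerPerBucket shrink to thin shells while the layout logic moves into HadoopStoreWriter and HadoopStoreWriterPerBucket. A hedged sketch of how a reducer can drive a KeyValueWriter — the class name and wiring here are illustrative, not the actual reducer code:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import voldemort.store.readonly.disk.HadoopStoreWriter;
import voldemort.store.readonly.disk.KeyValueWriter;

public class SketchReducer extends MapReduceBase
        implements Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {

    private KeyValueWriter<BytesWritable, BytesWritable> writer;

    @Override
    public void configure(JobConf job) {
        writer = new HadoopStoreWriter();    // or HadoopStoreWriterPerBucket
        writer.conf(job);                    // reads cluster.xml, stores.xml, num.chunks...
    }

    public void reduce(BytesWritable key, Iterator<BytesWritable> values,
                       OutputCollector<BytesWritable, BytesWritable> out,
                       Reporter reporter) throws IOException {
        writer.write(key, values, reporter); // writer owns the index/data layout
    }

    @Override
    public void close() throws IOException {
        writer.close();                      // renames task files into node-<id> dirs
    }
}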
96 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
@@ -16,6 +16,7 @@
package voldemort.store.readonly.fetcher;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -67,6 +68,7 @@
private EventThrottler throttler = null;
private long minBytesPerSecond = 0;
private DynamicThrottleLimit globalThrottleLimit = null;
+ private static final int NUM_RETRIES = 3;
public HdfsFetcher(VoldemortConfig config) {
this(config.getMaxBytesPerSecond(),
@@ -281,46 +283,66 @@ private void copyFileWithCheckSum(FileSystem fs,
logger.info("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
- try {
- input = fs.open(source);
- output = new FileOutputStream(dest);
- byte[] buffer = new byte[bufferSize];
- while(true) {
- int read = input.read(buffer);
- if(read < 0) {
- break;
- } else if(read < bufferSize) {
- buffer = ByteUtils.copy(buffer, 0, read);
- }
- output.write(buffer);
- if(fileCheckSumGenerator != null)
- fileCheckSumGenerator.update(buffer);
- if(throttler != null)
- throttler.maybeThrottle(read);
- stats.recordBytes(read);
- if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
- NumberFormat format = NumberFormat.getNumberInstance();
- format.setMaximumFractionDigits(2);
- logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
- + format.format(stats.getBytesPerSecond() / (1024 * 1024))
- + " MB/sec - " + format.format(stats.getPercentCopied())
- + " % complete");
- if(this.status != null) {
- this.status.setStatus(stats.getTotalBytesCopied()
- / (1024 * 1024)
- + " MB copied at "
- + format.format(stats.getBytesPerSecond()
- / (1024 * 1024)) + " MB/sec - "
- + format.format(stats.getPercentCopied())
- + " % complete");
+ for(int attempt = 0; attempt < NUM_RETRIES; attempt++) {
+ boolean success = true;
+ try {
+
+ input = fs.open(source);
+ output = new BufferedOutputStream(new FileOutputStream(dest));
+ byte[] buffer = new byte[bufferSize];
+ while(true) {
+ int read = input.read(buffer);
+ if(read < 0) {
+ break;
+ } else {
+ output.write(buffer, 0, read);
}
- stats.reset();
+
+ if(fileCheckSumGenerator != null)
+ fileCheckSumGenerator.update(buffer, 0, read);
+ if(throttler != null)
+ throttler.maybeThrottle(read);
+ stats.recordBytes(read);
+ if(stats.getBytesSinceLastReport() > reportingIntervalBytes) {
+ NumberFormat format = NumberFormat.getNumberInstance();
+ format.setMaximumFractionDigits(2);
+ logger.info(stats.getTotalBytesCopied() / (1024 * 1024) + " MB copied at "
+ + format.format(stats.getBytesPerSecond() / (1024 * 1024))
+ + " MB/sec - " + format.format(stats.getPercentCopied())
+ + " % complete, destination:" + dest);
+ if(this.status != null) {
+ this.status.setStatus(stats.getTotalBytesCopied()
+ / (1024 * 1024)
+ + " MB copied at "
+ + format.format(stats.getBytesPerSecond()
+ / (1024 * 1024)) + " MB/sec - "
+ + format.format(stats.getPercentCopied())
+ + " % complete, destination:" + dest);
+ }
+ stats.reset();
+ }
+ }
+ logger.info("Completed copy of " + source + " to " + dest);
+
+ } catch(IOException ioe) {
+ success = false;
+ logger.error("Error during copying file ", ioe);
+ ioe.printStackTrace();
+ if(attempt < NUM_RETRIES - 1) {
+ logger.info("retrying copying");
+ } else {
+ throw ioe;
+ }
+
+ } finally {
+ IOUtils.closeQuietly(output);
+ IOUtils.closeQuietly(input);
+ if(success) {
+ break;
}
+
}
- logger.info("Completed copy of " + source + " to " + dest);
- } finally {
- IOUtils.closeQuietly(output);
- IOUtils.closeQuietly(input);
+
}
}
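
Besides the retry loop, the rewrite fixes the copy mechanics: it writes and digests exactly the `read` bytes returned by each read, rather than shrinking the buffer with ByteUtils.copy on every short read, and it wraps the destination in a BufferedOutputStream. The essential pattern, reduced to a sketch (stream and digest setup assumed; CheckSum is the project's own digest wrapper):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import voldemort.store.readonly.checksum.CheckSum;

// Partial-read-safe copy: touch only the first `read` bytes per iteration.
static void copy(InputStream input, OutputStream output, CheckSum digest)
        throws IOException {
    byte[] buffer = new byte[64 * 1024];
    int read;
    while((read = input.read(buffer)) >= 0) {
        output.write(buffer, 0, read);
        if(digest != null)
            digest.update(buffer, 0, read);  // digest the bytes actually read
    }
}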
108 ...-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
@@ -34,6 +33,7 @@
import voldemort.serialization.SerializerFactory;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
+import voldemort.store.readonly.mr.utils.MapperKeyValueWriter;
import voldemort.utils.ByteUtils;
/**
@@ -79,96 +79,28 @@ public void map(K key,
byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));
- // Compress key and values if required
- if(keySerializerDefinition.hasCompression()) {
- keyBytes = keyCompressor.deflate(keyBytes);
- }
-
- if(valueSerializerDefinition.hasCompression()) {
- valBytes = valueCompressor.deflate(valBytes);
- }
-
- // Get the output byte arrays ready to populate
- byte[] outputValue;
- BytesWritable outputKey;
-
- // Leave initial offset for (a) node id (b) partition id
- // since they are written later
- int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;
-
- if(getSaveKeys()) {
-
- // In order - 4 ( for node id ) + 4 ( partition id ) + 1 ( replica
- // type - primary | secondary | tertiary... ] + 4 ( key size )
- // size ) + 4 ( value size ) + key + value
- outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4