Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

ZOOKEEPER-1038. Move bookkeeper and hedwig code in subversion

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1087134 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit c1c8bef6125cb70ed2da68e92bb49e45f4ea64a9 1 parent 9ea3777
@breed breed authored
Showing with 26,701 additions and 0 deletions.
  1. +202 −0 LICENSE.txt
  2. +2 −0  NOTICE.txt
  3. +3 −0  README
  4. +137 −0 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/MySqlClient.java
  5. +252 −0 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
  6. +62 −0 bookkeeper/README.txt
  7. +545 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
  8. +81 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
  9. +168 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
  10. +487 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
  11. +124 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
  12. +536 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
  13. +133 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
  14. +151 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
  15. +147 −0 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java
  16. +126 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
  17. +249 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/BKException.java
  18. +410 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
  19. +204 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
  20. +50 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
  21. +184 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/DigestManager.java
  22. +61 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
  23. +167 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
  24. +80 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
  25. +83 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
  26. +547 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
  27. +198 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
  28. +140 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
  29. +178 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
  30. +67 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
  31. +138 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
  32. +170 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
  33. +87 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
  34. +85 −0 bookkeeper/src/main/java/org/apache/bookkeeper/client/SyncCounter.java
  35. +178 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
  36. +75 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
  37. +209 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
  38. +57 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
  39. +521 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
  40. +573 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
  41. +148 −0 bookkeeper/src/main/java/org/apache/bookkeeper/proto/ServerStats.java
  42. +173 −0 bookkeeper/src/main/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
  43. +147 −0 bookkeeper/src/main/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
  44. +763 −0 bookkeeper/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
  45. +209 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
  46. +54 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/Main.java
  47. +38 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/MathUtils.java
  48. +98 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
  49. +38 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/SafeRunnable.java
  50. +94 −0 bookkeeper/src/main/java/org/apache/bookkeeper/util/StringUtils.java
  51. +256 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
  52. +176 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
  53. +232 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
  54. +305 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
  55. +720 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
  56. +400 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java
  57. +74 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/CloseTest.java
  58. +178 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
  59. +163 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
  60. +88 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/LedgerRecoveryTest.java
  61. +117 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
  62. +60 −0 bookkeeper/src/test/java/org/apache/bookkeeper/test/NIOServerFactoryTest.java
  63. +7 −0 conf/hw_client_sample.conf
  64. +10 −0 conf/hw_server_sample.conf
  65. +72 −0 conf/log4j.properties
  66. +146 −0 doc/build.txt
  67. +338 −0 doc/dev.txt
  68. +17 −0 doc/doc.txt
  69. +252 −0 doc/user.txt
  70. +286 −0 formatter.xml
  71. +86 −0 hedwig-client/pom.xml
  72. +29 −0 hedwig-client/src/main/cpp/Makefile.am
  73. +186 −0 hedwig-client/src/main/cpp/aminclude.am
  74. +1,252 −0 hedwig-client/src/main/cpp/c-doc.Doxyfile
  75. +56 −0 hedwig-client/src/main/cpp/config.h.in
  76. +40 −0 hedwig-client/src/main/cpp/configure.ac
  77. +30 −0 hedwig-client/src/main/cpp/hedwig-0.1.pc.in
  78. +45 −0 hedwig-client/src/main/cpp/inc/hedwig/callback.h
  79. +80 −0 hedwig-client/src/main/cpp/inc/hedwig/client.h
  80. +51 −0 hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
  81. +61 −0 hedwig-client/src/main/cpp/inc/hedwig/publish.h
  82. +52 −0 hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
  83. +32 −0 hedwig-client/src/main/cpp/lib/Makefile.am
  84. +420 −0 hedwig-client/src/main/cpp/lib/channel.cpp
  85. +156 −0 hedwig-client/src/main/cpp/lib/channel.h
  86. +57 −0 hedwig-client/src/main/cpp/lib/client.cpp
  87. +376 −0 hedwig-client/src/main/cpp/lib/clientimpl.cpp
  88. +150 −0 hedwig-client/src/main/cpp/lib/clientimpl.h
  89. +166 −0 hedwig-client/src/main/cpp/lib/data.cpp
  90. +99 −0 hedwig-client/src/main/cpp/lib/data.h
  91. +72 −0 hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
  92. +44 −0 hedwig-client/src/main/cpp/lib/eventdispatcher.h
  93. +27 −0 hedwig-client/src/main/cpp/lib/exceptions.cpp
  94. +83 −0 hedwig-client/src/main/cpp/lib/publisherimpl.cpp
  95. +54 −0 hedwig-client/src/main/cpp/lib/publisherimpl.h
  96. +434 −0 hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
  97. +166 −0 hedwig-client/src/main/cpp/lib/subscriberimpl.h
  98. +141 −0 hedwig-client/src/main/cpp/lib/util.cpp
  99. +86 −0 hedwig-client/src/main/cpp/lib/util.h
  100. +49 −0 hedwig-client/src/main/cpp/log4cpp.conf
  101. +111 −0 hedwig-client/src/main/cpp/m4/ax_boost_asio.m4
  102. +252 −0 hedwig-client/src/main/cpp/m4/ax_boost_base.m4
  103. +149 −0 hedwig-client/src/main/cpp/m4/ax_boost_thread.m4
  104. +533 −0 hedwig-client/src/main/cpp/m4/ax_doxygen.m4
  105. +49 −0 hedwig-client/src/main/cpp/scripts/log4cxx.conf
  106. +64 −0 hedwig-client/src/main/cpp/scripts/network-delays.sh
  107. +49 −0 hedwig-client/src/main/cpp/scripts/server-control.sh
  108. +95 −0 hedwig-client/src/main/cpp/scripts/tester.sh
  109. +26 −0 hedwig-client/src/main/cpp/test/Makefile.am
  110. +78 −0 hedwig-client/src/main/cpp/test/main.cpp
  111. +286 −0 hedwig-client/src/main/cpp/test/publishtest.cpp
  112. +47 −0 hedwig-client/src/main/cpp/test/pubsubdatatest.cpp
  113. +373 −0 hedwig-client/src/main/cpp/test/pubsubtest.cpp
  114. +184 −0 hedwig-client/src/main/cpp/test/servercontrol.cpp
  115. +66 −0 hedwig-client/src/main/cpp/test/servercontrol.h
  116. +238 −0 hedwig-client/src/main/cpp/test/subscribetest.cpp
  117. +21 −0 hedwig-client/src/main/cpp/test/test.sh
  118. +145 −0 hedwig-client/src/main/cpp/test/util.h
  119. +90 −0 hedwig-client/src/main/cpp/test/utiltest.cpp
  120. +48 −0 hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java
  121. +63 −0 hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java
  122. +237 −0 hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java
  123. +133 −0 hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
  124. +136 −0 hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
  125. +176 −0 hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
  126. +46 −0 hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
  127. +127 −0 hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
  128. +148 −0 hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
  129. +58 −0 hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
  130. +149 −0 hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
  131. +74 −0 hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
  132. +37 −0 hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
  133. +38 −0 hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
  134. +39 −0 hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
  135. +95 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
  136. +87 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
  137. +70 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
  138. +113 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeReconnectCallback.java
  139. +329 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
  140. +83 −0 hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java
  141. +58 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/ClientChannelPipelineFactory.java
  142. +122 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/ConnectCallback.java
  143. +359 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClient.java
  144. +224 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
  145. +585 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
  146. +365 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
  147. +98 −0 hedwig-client/src/main/java/org/apache/hedwig/client/netty/WriteCallback.java
  148. +41 −0 hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslClientContextFactory.java
  149. +65 −0 hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java
  150. +45 −0 hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java
  151. +47 −0 hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java
  152. +185 −0 hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java
  153. +49 −0 hedwig-client/src/main/java/org/apache/hedwig/util/ConcurrencyUtils.java
  154. +50 −0 hedwig-client/src/main/java/org/apache/hedwig/util/Either.java
  155. +97 −0 hedwig-client/src/main/java/org/apache/hedwig/util/FileUtils.java
  156. +138 −0 hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java
  157. +43 −0 hedwig-client/src/main/java/org/apache/hedwig/util/Option.java
  158. +42 −0 hedwig-client/src/main/java/org/apache/hedwig/util/Pair.java
  159. +56 −0 hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java
  160. +32 −0 hedwig-client/src/main/resources/log4j.properties
  161. +51 −0 hedwig-client/src/test/java/org/apache/hedwig/client/AppTest.java
  162. +41 −0 hedwig-client/src/test/java/org/apache/hedwig/util/TestFileUtils.java
  163. +104 −0 hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java
  164. +54 −0 hedwig-client/src/test/java/org/apache/hedwig/util/TestPathUtils.java
  165. +26 −0 hedwig-protocol/Makefile
  166. +77 −0 hedwig-protocol/pom.xml
  167. +162 −0 hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java
  168. +153 −0 hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/MessageIdUtils.java
  169. +43 −0 hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/PubSubResponseUtils.java
  170. +41 −0 hedwig-protocol/src/main/java/org/apache/hedwig/protoextensions/SubscriptionStateUtils.java
Sorry, we could not display the entire diff because it was too big.
View
202 LICENSE.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
View
2  NOTICE.txt
@@ -0,0 +1,2 @@
+Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+
View
3  README
@@ -0,0 +1,3 @@
+Hedwig is a large-scale pub/sub system built on top of ZooKeeper and BookKeeper.
+
+For documentation on building, setting up, and using Hedwig see the `doc` directory.
View
137 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/MySqlClient.java
@@ -0,0 +1,137 @@
+package org.apache.bookkeeper.benchmark;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileOutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.QuorumEngine;
+import org.apache.log4j.Logger;
+
+
+import org.apache.zookeeper.KeeperException;
+
+public class MySqlClient {
+ static Logger LOG = Logger.getLogger(QuorumEngine.class);
+
+ BookKeeper x;
+ LedgerHandle lh;
+ Integer entryId;
+ HashMap<Integer, Integer> map;
+
+ FileOutputStream fStream;
+ FileOutputStream fStreamLocal;
+ long start, lastId;
+ Connection con;
+ Statement stmt;
+
+
+ public MySqlClient(String hostport, String user, String pass)
+ throws ClassNotFoundException {
+ entryId = 0;
+ map = new HashMap<Integer, Integer>();
+ Class.forName("com.mysql.jdbc.Driver");
+ // database is named "bookkeeper"
+ String url = "jdbc:mysql://" + hostport + "/bookkeeper";
+ try {
+ con = DriverManager.getConnection(url, user, pass);
+ stmt = con.createStatement();
+ // drop table and recreate it
+ stmt.execute("DROP TABLE IF EXISTS data;");
+ stmt.execute("create table data(transaction_id bigint PRIMARY KEY AUTO_INCREMENT, content TEXT);");
+ LOG.info("Database initialization terminated");
+ } catch (SQLException e) {
+
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void closeHandle() throws KeeperException, InterruptedException, SQLException{
+ con.close();
+ }
+ /**
+ * First parameter is an integer defining the length of the message
+ * Second parameter is the number of writes
+ * Third parameter is host:port
+ * Fourth parameter is username
+ * Fifth parameter is password
+ * @param args
+ * @throws ClassNotFoundException
+ * @throws SQLException
+ */
+ public static void main(String[] args) throws ClassNotFoundException, SQLException {
+ int lenght = Integer.parseInt(args[1]);
+ StringBuilder sb = new StringBuilder();
+ while(lenght-- > 0){
+ sb.append('a');
+ }
+ try {
+ MySqlClient c = new MySqlClient(args[2], args[3], args[4]);
+ c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[0]));
+ c.writeSameEntry(sb.toString().getBytes(), Integer.parseInt(args[0]));
+ c.closeHandle();
+ } catch (NumberFormatException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Adds data entry to the DB
+ * @param data the entry to be written, given as a byte array
+ * @param times the number of times the entry should be written on the DB */
+ void writeSameEntryBatch(byte[] data, int times) throws InterruptedException, SQLException{
+ start = System.currentTimeMillis();
+ int count = times;
+ String content = new String(data);
+ System.out.println("Data: " + content + ", " + data.length);
+ while(count-- > 0){
+ stmt.addBatch("insert into data(content) values(\"" + content + "\");");
+ }
+ LOG.info("Finished writing batch SQL command in ms: " + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+ stmt.executeBatch();
+ System.out.println("Finished " + times + " writes in ms: " + (System.currentTimeMillis() - start));
+ LOG.info("Ended computation");
+ }
+
+ void writeSameEntry(byte[] data, int times) throws InterruptedException, SQLException{
+ start = System.currentTimeMillis();
+ int count = times;
+ String content = new String(data);
+ System.out.println("Data: " + content + ", " + data.length);
+ while(count-- > 0){
+ stmt.executeUpdate("insert into data(content) values(\"" + content + "\");");
+ }
+ System.out.println("Finished " + times + " writes in ms: " + (System.currentTimeMillis() - start));
+ LOG.info("Ended computation");
+ }
+
+}
View
252 bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/TestClient.java
@@ -0,0 +1,252 @@
+package org.apache.bookkeeper.benchmark;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.QuorumEngine;
+import org.apache.bookkeeper.client.ReadCallback;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is a simple test program to compare the performance of writing to
+ * BookKeeper and to the local file system.
+ *
+ */
+
+public class TestClient
+ implements AddCallback, ReadCallback{
+ private static final Logger LOG = Logger.getLogger(TestClient.class);
+
+ BookKeeper x;
+ LedgerHandle lh;
+ Integer entryId;
+ HashMap<Integer, Integer> map;
+
+ FileOutputStream fStream;
+ FileOutputStream fStreamLocal;
+ long start, lastId;
+
+ public TestClient() {
+ entryId = 0;
+ map = new HashMap<Integer, Integer>();
+ }
+
+ public TestClient(String servers) throws KeeperException, IOException, InterruptedException{
+ this();
+ x = new BookKeeper(servers);
+ try{
+ lh = x.createLedger(new byte[] {'a', 'b'});
+ } catch (BKException e) {
+ LOG.error(e.toString());
+ }
+ }
+
+ public TestClient(String servers, int ensSize, int qSize)
+ throws KeeperException, IOException, InterruptedException{
+ this();
+ x = new BookKeeper(servers);
+ try{
+ lh = x.createLedger(ensSize, qSize, QMode.VERIFIABLE, new byte[] {'a', 'b'});
+ } catch (BKException e) {
+ LOG.error(e.toString());
+ }
+ }
+
+ public TestClient(FileOutputStream fStream)
+ throws FileNotFoundException {
+ this.fStream = fStream;
+ this.fStreamLocal = new FileOutputStream("./local.log");
+ }
+
+
+ public Integer getFreshEntryId(int val){
+ ++this.entryId;
+ synchronized (map) {
+ map.put(this.entryId, val);
+ }
+ return this.entryId;
+ }
+
+ public boolean removeEntryId(Integer id){
+ boolean retVal = false;
+ synchronized (map) {
+ map.remove(id);
+ retVal = true;
+
+ if(map.size() == 0) map.notifyAll();
+ else{
+ if(map.size() < 4)
+ LOG.error(map.toString());
+ }
+ }
+ return retVal;
+ }
+
+ public void closeHandle() throws KeeperException, InterruptedException{
+ x.closeLedger(lh);
+ }
+ /**
+ * First says if entries should be written to BookKeeper (0) or to the local
+ * disk (1). Second parameter is an integer defining the length of a ledger entry.
+ * Third parameter is the number of writes.
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ int lenght = Integer.parseInt(args[1]);
+ StringBuilder sb = new StringBuilder();
+ while(lenght-- > 0){
+ sb.append('a');
+ }
+
+ Integer selection = Integer.parseInt(args[0]);
+ switch(selection){
+ case 0:
+ StringBuilder servers_sb = new StringBuilder();
+ for (int i = 4; i < args.length; i++){
+ servers_sb.append(args[i] + " ");
+ }
+
+ String servers = servers_sb.toString().trim().replace(' ', ',');
+ try {
+ TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4]));
+ c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2]));
+ //c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0]));
+ c.closeHandle();
+ } catch (NumberFormatException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ break;
+ case 1:
+
+ try{
+ TestClient c = new TestClient(new FileOutputStream(args[2]));
+ c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3]));
+ } catch(FileNotFoundException e){
+ LOG.error(e);
+ }
+ break;
+ case 2:
+ break;
+ }
+ }
+
+ void writeSameEntryBatch(byte[] data, int times) throws InterruptedException{
+ start = System.currentTimeMillis();
+ int count = times;
+ LOG.debug("Data: " + new String(data) + ", " + data.length);
+ while(count-- > 0){
+ x.asyncAddEntry(lh, data, this, this.getFreshEntryId(2));
+ }
+ LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
+ synchronized (map) {
+ if(map.size() != 0)
+ map.wait();
+ }
+ LOG.debug("Finished processing in ms: " + (System.currentTimeMillis() - start));
+
+ LOG.debug("Ended computation");
+ }
+
+ void writeConsecutiveEntriesBatch(int times) throws InterruptedException{
+ start = System.currentTimeMillis();
+ int count = times;
+ while(count-- > 0){
+ byte[] write = new byte[2];
+ int j = count%100;
+ int k = (count+1)%100;
+ write[0] = (byte) j;
+ write[1] = (byte) k;
+ x.asyncAddEntry(lh, write, this, this.getFreshEntryId(2));
+ }
+ LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
+ synchronized (map) {
+ if(map.size() != 0)
+ map.wait();
+ }
+ LOG.debug("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
+
+ Integer mon = Integer.valueOf(0);
+ synchronized(mon){
+ try{
+ x.asyncReadEntries(lh, 1, times - 1, this, mon);
+ mon.wait();
+ } catch (BKException e){
+ LOG.error(e);
+ }
+ }
+ LOG.error("Ended computation");
+ }
+
+ void writeSameEntryBatchFS(byte[] data, int times) {
+ int count = times;
+ LOG.debug("Data: " + data.length + ", " + times);
+ try{
+ start = System.currentTimeMillis();
+ while(count-- > 0){
+ fStream.write(data);
+ fStreamLocal.write(data);
+ fStream.flush();
+ }
+ fStream.close();
+ System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
+ } catch(IOException e){
+ LOG.error(e);
+ }
+ }
+
+
+ public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+ this.removeEntryId((Integer) ctx);
+ }
+
+ public void readComplete(int rc, long ledgerId, Enumeration<LedgerEntry> seq, Object ctx){
+ System.out.println("Read callback: " + rc);
+ while(seq.hasMoreElements()){
+ LedgerEntry le = seq.nextElement();
+ LOG.debug(new String(le.getEntry()));
+ }
+ synchronized(ctx){
+ ctx.notify();
+ }
+ }
+}
View
62 bookkeeper/README.txt
@@ -0,0 +1,62 @@
+BookKeeper README
+
+1- Overview
+BookKeeper is a highly available logging service. As many critical services rely upon write-ahead logs to provide persistence along with high performance, an alternative to make such a service highly available despite the failures of individual servers is to offload write-ahead logs to an external service.
+
+This is exactly what BookKeeper provides. With BookKeeper, a service (or application) writes to a set of servers dedicated to storing such logs. An example of such an application is the Namenode of the Hadoop Distributed File System.
+
+The main components of BookKeeper are:
+* Client: Applications interact with BookKeeper through the interface of a BookKeeper client;
+* Ledger: A ledger is our equivalent to a log file. Clients read entries from and write entries to ledgers;
+* Bookie: Bookies are BookKeeper servers and they store the content of ledgers. Typically there are multiple bookies implementing a ledger.
+
+2- How to compile
+Run "ant" from "trunk/contrib/bookkeeper". This will generate the bookkeeper jar in "trunk/build/contrib/bookkeeper".
+
+3- Setting up
+
+A typical BookKeeper configuration includes a set of bookies and a ZooKeeper ensemble, where the ZooKeeper instance stores metadata for BookKeeper. As an example of such metadata, BookKeeper clients learn about available bookies by consulting a ZooKeeper service.
+
+To set up BookKeeper, follow these steps:
+* Once bookies and ZooKeeper servers are running, create two znodes: "/ledgers" and "/ledgers/available".
+* To run a bookie, run the java class "org.apache.bookkeeper.proto.BookieServer". It takes 3 parameters: a port, one directory path for transaction logs, and one directory path for indexes and data. Here is an example: java -cp .:bookkeeper.jar:../ZooKeeper/zookeeper-<version>.jar:/usr/local/apache-log4j-1.2.15/log4j-1.2.15.jar -Dlog4j.configuration=log4j.properties org.apache.bookkeeper.proto.BookieServer 3181 /disk1/bk/ /disk2/bk/
+* For each bookie b, if <host> is the host name of b and <port> is the bookie port, then create a znode "/ledgers/available/<host>:<port>".
+* It is ready to run!
+
+For test purposes, there is a class named "org.apache.bookkeeper.util.LocalBookKeeper" which runs a custom number of BookKeeper servers, along with a ZooKeeper server, on a single node. A typical invocation would be:
+java -cp <classpath> org.apache.bookkeeper.util.LocalBookKeeper <number-of-bookies>
+
+4- Developing applications
+
+BookKeeper is written in Java. When implementing an application that uses BookKeeper, follow these steps:
+
+a. Instantiate a BookKeeper object. The single parameter to the BookKeeper constructor is a list of ZooKeeper servers;
+b. Once we have a BookKeeper object, we can create a ledger with createLedger. The default call to createLedger takes a single parameter, which is supposed to be for password authentication, but currently it has no effect. A call to createLedger returns a ledger handle (type LedgerHandle);
+c. Once we have a ledger, we can write to the ledger by calling either addEntry or asyncAddEntry. The first call is synchronous, whereas the second call is asynchronous, and both write byte arrays as entries. To use the asynchronous version, the application has to implement the AddCallback interface;
+d. Ideally, once the application finishes writing to the ledger, it should close it by calling close on the ledger handle. If it doesn't then BookKeeper will try to recover the ledger when a client tries to open it. By closing the ledger properly, we avoid this recovery step, which is recommended but not mandatory;
+e. Before reading from a ledger, a client has to open it by calling openLedger on a BookKeeper object, and then call readEntries or asyncReadEntries to read entries. Both read calls take as input two entry numbers, n1 and n2, and return all entries from n1 through n2.
+
+Here is a simple example of a method that creates a BookKeeper object, creates a ledger, writes an entry to the ledger, and closes it:
+
+BookKeeper bk;
+LedgerHandle lh;
+
+public void allInOne(String servers) throws KeeperException, IOException, InterruptedException{
+ bk = new BookKeeper(servers);
+ try{
+ lh = bk.createLedger(new byte[] {'a', 'b'});
+ bk.addEntry(lh, new byte[]{'a', 'b'});
+ bk.close(lh);
+ } catch (BKException e) {
+ e.printStackTrace();
+ }
+ }
+
+5- Selecting quorum mode and number of bookies (advanced)
+
+There are two methods to store ledgers with BookKeeper:
+
+a. Self-verifying: Each entry includes a digest that is used to guarantee that upon a read, the value read is the same as the one written. This mode requires n > 2t bookies, and quorums of size t + 1. By default, a call to createLedger uses this method and 3 servers;
+b. Generic: Entries do not include a digest, and it requires more replicas: n > 3t and quorums of size 2t + 1.
+
+The quorum mode and number of bookies can be selected through the createLedger method.
View
545 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -0,0 +1,545 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+
+
+/**
+ * Implements a bookie.
+ *
+ */
+
+public class Bookie extends Thread {
+ HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
+ static Logger LOG = Logger.getLogger(Bookie.class);
+
+ final File journalDirectory;
+
+ final File ledgerDirectories[];
+
+ // ZK registration path for this bookie
+ static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
+
+ // ZooKeeper client instance for the Bookie
+ ZooKeeper zk;
+
+ // Running flag
+ private volatile boolean running = false;
+
+ /**
+  * Thrown when a ledger that was requested read-only is not known to this bookie.
+  */
+ public static class NoLedgerException extends IOException {
+     private static final long serialVersionUID = 1L;
+     private long ledgerId;
+     public NoLedgerException(long ledgerId) {
+         // Carry a message, as NoEntryException does, so logs identify the ledger.
+         super("Ledger " + ledgerId + " not found");
+         this.ledgerId = ledgerId;
+     }
+     /** @return the id of the ledger that could not be found */
+     public long getLedgerId() {
+         return ledgerId;
+     }
+ }
+ /**
+  * Signals that an entry could not be found in a ledger; carries both ids.
+  */
+ public static class NoEntryException extends IOException {
+     private static final long serialVersionUID = 1L;
+
+     private long ledgerId;
+     private long entryId;
+
+     public NoEntryException(long ledgerId, long entryId) {
+         super("Entry " + entryId + " not found in " + ledgerId);
+         this.entryId = entryId;
+         this.ledgerId = ledgerId;
+     }
+
+     /** @return the id of the ledger that was searched */
+     public long getLedger() {
+         return this.ledgerId;
+     }
+
+     /** @return the id of the entry that was not found */
+     public long getEntry() {
+         return this.entryId;
+     }
+ }
+
+ EntryLogger entryLogger;
+ LedgerCache ledgerCache;
+ /**
+  * Background thread that, roughly every 100 ms, flushes the ledger cache
+  * and the entry logger if something was written, and then persists the
+  * journal mark taken just before the flush.
+  */
+ class SyncThread extends Thread {
+     volatile boolean running = true;
+     public SyncThread() {
+         super("SyncThread");
+     }
+     @Override
+     public void run() {
+         while(running) {
+             synchronized(this) {
+                 try {
+                     wait(100);
+                     if (!entryLogger.testAndClearSomethingWritten()) {
+                         continue;
+                     }
+                 } catch (InterruptedException e) {
+                     // Restore the flag and stop. Looping again would make
+                     // wait() throw immediately and spin the CPU.
+                     Thread.currentThread().interrupt();
+                     return;
+                 }
+             }
+             // Snapshot the journal position before flushing.
+             lastLogMark.markLog();
+             try {
+                 ledgerCache.flushLedger(true);
+                 entryLogger.flush();
+             } catch (IOException e) {
+                 // Do not advance the persisted mark past data that failed to
+                 // reach disk. A later round will retry.
+                 LOG.error("Problems flushing ledger data, journal mark not advanced", e);
+                 continue;
+             }
+             lastLogMark.rollLog();
+         }
+     }
+ }
+ SyncThread syncThread = new SyncThread();
+ /**
+  * Creates a bookie, replays the journal files starting at the last
+  * persisted mark (if any), and starts the journal and sync threads.
+  *
+  * @param port port used in the ZooKeeper registration node
+  * @param zkServers ZooKeeper servers, or null to skip registration
+  * @param journalDirectory directory holding the ".txn" journal files
+  * @param ledgerDirectories directories passed to EntryLogger and LedgerCache
+  * @throws IOException if registration fails, the journal directory cannot
+  *         be listed, the marked journal file is missing, or replay fails
+  */
+ public Bookie(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
+     instantiateZookeeperClient(port, zkServers);
+     this.journalDirectory = journalDirectory;
+     this.ledgerDirectories = ledgerDirectories;
+     entryLogger = new EntryLogger(ledgerDirectories, this);
+     ledgerCache = new LedgerCache(ledgerDirectories);
+     lastLogMark.readLog();
+     final long markedLogId = lastLogMark.txnLogId;
+     if (markedLogId > 0) {
+         File logFiles[] = journalDirectory.listFiles();
+         if (logFiles == null) {
+             // listFiles() returns null when the path is not a readable directory.
+             throw new IOException("Cannot list journal directory " + journalDirectory);
+         }
+         ArrayList<Long> logs = new ArrayList<Long>();
+         for(File f: logFiles) {
+             String name = f.getName();
+             if (!name.endsWith(".txn")) {
+                 continue;
+             }
+             String idString = name.split("\\.")[0];
+             long id = Long.parseLong(idString, 16);
+             if (id < markedLogId) {
+                 continue;
+             }
+             logs.add(id);
+         }
+         Collections.sort(logs);
+         if (logs.size() == 0 || logs.get(0) != markedLogId) {
+             throw new IOException("Recovery log " + markedLogId + " is missing");
+         }
+         // TODO: When reading in the journal logs that need to be synced, we
+         // should use BufferedChannels instead to minimize the amount of
+         // system calls done.
+         ByteBuffer lenBuff = ByteBuffer.allocate(4);
+         ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
+         for(Long id: logs) {
+             FileChannel recLog = openChannel(id);
+             try {
+                 while(true) {
+                     lenBuff.clear();
+                     fullRead(recLog, lenBuff);
+                     if (lenBuff.remaining() != 0) {
+                         break;
+                     }
+                     lenBuff.flip();
+                     int len = lenBuff.getInt();
+                     if (len == 0) {
+                         // Reached the zero-filled, preallocated tail of the journal.
+                         break;
+                     }
+                     if (len < 0) {
+                         throw new IOException("Invalid record length " + len + " in recovery log "
+                                 + Long.toHexString(id));
+                     }
+                     recBuff.clear();
+                     if (recBuff.remaining() < len) {
+                         recBuff = ByteBuffer.allocate(len);
+                     }
+                     recBuff.limit(len);
+                     if (fullRead(recLog, recBuff) != len) {
+                         // This seems scary, but it just means that this is where we
+                         // left off writing
+                         break;
+                     }
+                     recBuff.flip();
+                     long ledgerId = recBuff.getLong();
+                     // XXX we need to make sure we set the master keys appropriately!
+                     LedgerDescriptor handle = getHandle(ledgerId, false);
+                     try {
+                         recBuff.rewind();
+                         handle.addEntry(recBuff);
+                     } finally {
+                         putHandle(handle);
+                     }
+                 }
+             } finally {
+                 // Release the file descriptor of every replayed journal file.
+                 recLog.close();
+             }
+         }
+     }
+     setDaemon(true);
+     LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
+     start();
+     syncThread.start();
+ }
+
+ /**
+  * Instantiate the ZooKeeper client for the Bookie and register this bookie
+  * as an ephemeral node under BOOKIE_REGISTRATION_PATH. When zkServers is
+  * null no client is created and zk is left null.
+  *
+  * @param port port appended to the local host address in the node name
+  * @param zkServers ZooKeeper connect string, or null
+  * @throws IOException if the client cannot be created or registration fails
+  */
+ private void instantiateZookeeperClient(int port, String zkServers) throws IOException {
+     if (zkServers == null) {
+         LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
+         zk = null;
+         return;
+     }
+     // Create the ZooKeeper client instance
+     zk = new ZooKeeper(zkServers, 10000, new Watcher() {
+         @Override
+         public void process(WatchedEvent event) {
+             // TODO: handle session disconnects and expires
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Process: " + event.getType() + " " + event.getPath());
+             }
+         }
+     });
+     // Create the ZK ephemeral node for this Bookie.
+     try {
+         zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
+                 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+     } catch (Exception e) {
+         LOG.fatal("ZK exception registering ephemeral Znode for Bookie!", e);
+         if (e instanceof InterruptedException) {
+             Thread.currentThread().interrupt();
+         }
+         // The constructor is about to fail, so nobody will call shutdown().
+         // Close the client here so its session and threads are released.
+         try {
+             zk.close();
+         } catch (InterruptedException ie) {
+             Thread.currentThread().interrupt();
+         }
+         // Throw an IOException back up. This will cause the Bookie
+         // constructor to error out. Alternatively, we could do a System
+         // exit here as this is a fatal error.
+         throw new IOException(e);
+     }
+ }
+
+ private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
+ int total = 0;
+ while(bb.remaining() > 0) {
+ int rc = fc.read(bb);
+ if (rc <= 0) {
+ return total;
+ }
+ total += rc;
+ }
+ return total;
+ }
+ /**
+ * Releases a reference taken by getHandle: decrements the handle's
+ * reference count while holding the ledgers lock.
+ */
+ private void putHandle(LedgerDescriptor handle) {
+ synchronized (ledgers) {
+ handle.decRef();
+ }
+ }
+
+ /**
+  * Looks up the descriptor of a ledger and takes a reference on it. An
+  * unknown ledger is created and given the master key, unless readonly is
+  * set, in which case NoLedgerException is thrown. The master key is only
+  * applied when the descriptor is created here.
+  */
+ private LedgerDescriptor getHandle(long ledgerId, boolean readonly, byte[] masterKey) throws IOException {
+     synchronized (ledgers) {
+         LedgerDescriptor descriptor = ledgers.get(ledgerId);
+         if (descriptor == null) {
+             if (readonly) {
+                 throw new NoLedgerException(ledgerId);
+             }
+             descriptor = createHandle(ledgerId, readonly);
+             ledgers.put(ledgerId, descriptor);
+             descriptor.setMasterKey(ByteBuffer.wrap(masterKey));
+         }
+         descriptor.incRef();
+         return descriptor;
+     }
+ }
+
+ /**
+  * Looks up the descriptor of a ledger and takes a reference on it. An
+  * unknown ledger is created without a master key, unless readonly is set,
+  * in which case NoLedgerException is thrown.
+  */
+ private LedgerDescriptor getHandle(long ledgerId, boolean readonly) throws IOException {
+     synchronized (ledgers) {
+         LedgerDescriptor descriptor = ledgers.get(ledgerId);
+         if (descriptor == null) {
+             if (readonly) {
+                 throw new NoLedgerException(ledgerId);
+             }
+             descriptor = createHandle(ledgerId, readonly);
+             ledgers.put(ledgerId, descriptor);
+         }
+         descriptor.incRef();
+         return descriptor;
+     }
+ }
+
+
+ /**
+ * Builds a new LedgerDescriptor for the ledger on top of this bookie's
+ * entryLogger and ledgerCache. The readOnly argument is not used.
+ */
+ private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
+ return new LedgerDescriptor(ledgerId, entryLogger, ledgerCache);
+ }
+
+ /**
+  * An entry waiting to be written to the journal, together with the
+  * callback to invoke once it has been flushed.
+  */
+ static class QueueEntry {
+     ByteBuffer entry;
+
+     long ledgerId;
+
+     long entryId;
+
+     WriteCallback cb;
+
+     Object ctx;
+
+     QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
+                WriteCallback cb, Object ctx) {
+         // duplicate() gives this queue entry its own position and limit
+         // over the same bytes.
+         this.entry = entry.duplicate();
+         this.ledgerId = ledgerId;
+         this.entryId = entryId;
+         this.cb = cb;
+         this.ctx = ctx;
+     }
+ }
+
+ LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
+
+ public final static long preAllocSize = 4*1024*1024;
+
+ public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+
+ /**
+  * Tracks the journal position (log id and offset) that has been flushed by
+  * the journal thread, and persists a snapshot of it in a "lastMark" file
+  * in every ledger directory.
+  */
+ class LastLogMark {
+     long txnLogId;
+     long txnLogPosition;
+     // Snapshot taken by markLog(). This is what rollLog() persists.
+     LastLogMark lastMark;
+     LastLogMark(long logId, long logPosition) {
+         this.txnLogId = logId;
+         this.txnLogPosition = logPosition;
+     }
+     synchronized void setLastLogMark(long logId, long logPosition) {
+         txnLogId = logId;
+         txnLogPosition = logPosition;
+     }
+     /** Snapshots the current journal position. */
+     synchronized void markLog() {
+         lastMark = new LastLogMark(txnLogId, txnLogPosition);
+     }
+     /**
+      * Writes the position snapshotted by markLog() to every ledger
+      * directory. The live position may have advanced since the snapshot,
+      * so persisting it could point past ledger data not yet flushed. If
+      * markLog() was never called the live position is used.
+      */
+     synchronized void rollLog() {
+         LastLogMark mark = (lastMark != null) ? lastMark : this;
+         byte buff[] = new byte[16];
+         ByteBuffer bb = ByteBuffer.wrap(buff);
+         bb.putLong(mark.txnLogId);
+         bb.putLong(mark.txnLogPosition);
+         for(File dir: ledgerDirectories) {
+             File file = new File(dir, "lastMark");
+             FileOutputStream fos = null;
+             try {
+                 fos = new FileOutputStream(file);
+                 fos.write(buff);
+                 fos.getChannel().force(true);
+             } catch (IOException e) {
+                 LOG.error("Problems writing to " + file, e);
+             } finally {
+                 if (fos != null) {
+                     try {
+                         fos.close();
+                     } catch (IOException e) {
+                         LOG.error("Problems writing to " + file, e);
+                     }
+                 }
+             }
+         }
+     }
+     /**
+      * Reads the "lastMark" file of every ledger directory and keeps the
+      * largest log id and the largest position seen. Missing, unreadable
+      * or truncated files are logged and skipped.
+      */
+     synchronized void readLog() {
+         byte buff[] = new byte[16];
+         ByteBuffer bb = ByteBuffer.wrap(buff);
+         for(File dir: ledgerDirectories) {
+             File file = new File(dir, "lastMark");
+             FileInputStream fis = null;
+             try {
+                 fis = new FileInputStream(file);
+                 // read() may return fewer bytes than requested, so loop.
+                 int filled = 0;
+                 while (filled < buff.length) {
+                     int rc = fis.read(buff, filled, buff.length - filled);
+                     if (rc < 0) {
+                         break;
+                     }
+                     filled += rc;
+                 }
+                 if (filled < buff.length) {
+                     LOG.error("Problems reading from " + file + ": file is truncated");
+                     continue;
+                 }
+                 bb.clear();
+                 long i = bb.getLong();
+                 long p = bb.getLong();
+                 if (i > txnLogId) {
+                     txnLogId = i;
+                 }
+                 if (p > txnLogPosition) {
+                     txnLogPosition = p;
+                 }
+             } catch (IOException e) {
+                 LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie)");
+             } finally {
+                 if (fis != null) {
+                     try {
+                         fis.close();
+                     } catch (IOException e) {
+                         LOG.error("Problems closing " + file, e);
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+ private LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+ /**
+ * Returns the running flag, which run() sets to true once the journal file
+ * has been opened and to false when the journal loop exits.
+ */
+ public boolean isRunning(){
+ return running;
+ }
+
+ @Override
+ public void run() {
+ LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+ ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ try {
+ long logId = System.currentTimeMillis();
+ FileChannel logFile = openChannel(logId);
+ BufferedChannel bc = new BufferedChannel(logFile, 65536);
+ zeros.clear();
+ long nextPrealloc = preAllocSize;
+ long lastFlushPosition = 0;
+ logFile.write(zeros, nextPrealloc);
+ running = true;
+ // TODO: Currently, when we roll over the journal logs, the older
+ // ones are never garbage collected. We should remove a journal log
+ // once all of its entries have been synced with the entry logs.
+ while (true) {
+ QueueEntry qe = null;
+ if (toFlush.isEmpty()) {
+ qe = queue.take();
+ } else {
+ qe = queue.poll();
+ if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
+ //logFile.force(false);
+ bc.flush(true);
+ lastFlushPosition = bc.position();
+ lastLogMark.setLastLogMark(logId, lastFlushPosition);
+ for (QueueEntry e : toFlush) {
+ e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
+ }
+ toFlush.clear();
+ }
+ }
+ if (qe == null) {
+ continue;
+ }
+ lenBuff.clear();
+ lenBuff.putInt(qe.entry.remaining());
+ lenBuff.flip();
+ //
+ // we should be doing the following, but then we run out of
+ // direct byte buffers
+ // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+ bc.write(lenBuff);
+ bc.write(qe.entry);
+ if (bc.position() > nextPrealloc) {
+ nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
+ zeros.clear();
+ logFile.write(zeros, nextPrealloc);
+ }
+ toFlush.add(qe);
+ }
+ } catch (Exception e) {
+ LOG.fatal("Bookie thread exiting", e);
+ }
+ running = false;
+ }
+
+ private FileChannel openChannel(long logId) throws FileNotFoundException {
+ FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
+ Long.toHexString(logId) + ".txn"),
+ "rw").getChannel();
+ return logFile;
+ }
+
+ /**
+  * Stops the bookie: closes the ZooKeeper client (if any), interrupts and
+  * joins the journal thread, stops and joins the sync thread, closes every
+  * ledger descriptor and finally shuts down the entry logger.
+  */
+ public void shutdown() throws InterruptedException {
+     // Shutdown the ZK client
+     if (zk != null) {
+         zk.close();
+     }
+     // Journal thread first, then the sync thread.
+     interrupt();
+     join();
+     syncThread.running = false;
+     syncThread.join();
+     for (LedgerDescriptor descriptor : ledgers.values()) {
+         descriptor.close();
+     }
+     // Shutdown the EntryLogger which has the GarbageCollector Thread running
+     entryLogger.shutdown();
+ }
+
+ /**
+  * Adds an entry to the ledger whose id is stored in the first eight bytes
+  * of the buffer, and queues it for the journal. The callback is invoked
+  * by the journal thread once the entry has been flushed.
+  *
+  * @param entry the entry, starting with the ledger id
+  * @param cb callback to invoke when the entry is in the journal
+  * @param ctx opaque context handed back to the callback
+  * @param masterKey key that must match the ledger's master key
+  * @throws IOException if the ledger cannot be accessed or written
+  * @throws BookieException if the master key does not match
+  */
+ public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+         throws IOException, BookieException {
+     long ledgerId = entry.getLong();
+     LedgerDescriptor handle = getHandle(ledgerId, false, masterKey);
+     try {
+         // The check runs inside the try block so the reference taken by
+         // getHandle is released even when access is denied.
+         if (!handle.cmpMasterKey(ByteBuffer.wrap(masterKey))) {
+             throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+         }
+         entry.rewind();
+         long entryId = handle.addEntry(entry);
+         entry.rewind();
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Adding " + entryId + "@" + ledgerId);
+         }
+         queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx));
+     } finally {
+         putHandle(handle);
+     }
+ }
+
+ /**
+  * Reads an entry from an existing ledger.
+  *
+  * @throws IOException if the read fails; a NoLedgerException (an
+  *         IOException) is thrown when the ledger is not known
+  */
+ public ByteBuffer readEntry(long ledgerId, long entryId) throws IOException {
+     final LedgerDescriptor descriptor = getHandle(ledgerId, true);
+     try {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Reading " + entryId + "@" + ledgerId);
+         }
+         final ByteBuffer result = descriptor.readEntry(entryId);
+         return result;
+     } finally {
+         // Always give back the reference taken by getHandle.
+         putHandle(descriptor);
+     }
+ }
+
+ // The rest of the code is test stuff
+ /**
+  * Write callback that counts outstanding writes and lets a caller block
+  * until all of them have completed.
+  */
+ static class CounterCallback implements WriteCallback {
+     int count;
+
+     /** Records one completed write and wakes waiters when none is left. */
+     public synchronized void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
+         if (--count == 0) {
+             notifyAll();
+         }
+     }
+
+     /** Registers one more outstanding write. */
+     public synchronized void incCount() {
+         count += 1;
+     }
+
+     /** Blocks until the number of outstanding writes is zero or less. */
+     public synchronized void waitZero() throws InterruptedException {
+         while (count > 0) {
+             wait();
+         }
+     }
+ }
+
+ /**
+  * Small write benchmark: starts a bookie on /tmp without ZooKeeper, adds
+  * 100000 entries of 1024 bytes to ledger 1 and prints the elapsed time.
+  *
+  * @param args
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public static void main(String[] args) throws IOException,
+         InterruptedException, BookieException {
+     final File tmpDir = new File("/tmp");
+     Bookie b = new Bookie(5000, null, tmpDir, new File[] { new File("/tmp") });
+     CounterCallback cb = new CounterCallback();
+     final long start = System.currentTimeMillis();
+     int i = 0;
+     while (i < 100000) {
+         ByteBuffer buff = ByteBuffer.allocate(1024);
+         // Entry layout: ledger id followed by entry id.
+         buff.putLong(1);
+         buff.putLong(i);
+         buff.limit(1024);
+         buff.position(0);
+         cb.incCount();
+         b.addEntry(buff, cb, null, new byte[0]);
+         i++;
+     }
+     cb.waitZero();
+     final long elapsed = System.currentTimeMillis() - start;
+     System.out.println("Took " + elapsed + "ms");
+ }
+}
View
81 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -0,0 +1,81 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+ import java.lang.Exception;
+
+ /**
+  * Base class of the exceptions raised by a bookie. Every exception carries
+  * one of the numeric codes defined in {@link Code}.
+  */
+ @SuppressWarnings("serial")
+public abstract class BookieException extends Exception {
+
+    private int code;
+    public BookieException(int code){
+        this.code = code;
+    }
+
+    /**
+     * Creates the exception matching the given code. Unknown codes map to
+     * {@link BookieIllegalOpException}.
+     */
+    public static BookieException create(int code){
+        switch(code){
+        case Code.UnauthorizedAccessException:
+            return new BookieUnauthorizedAccessException();
+        default:
+            return new BookieIllegalOpException();
+        }
+    }
+
+    /** Numeric codes carried by bookie exceptions. */
+    public interface Code {
+        int OK = 0;
+        int UnauthorizedAccessException = -1;
+
+        int IllegalOpException = -100;
+    }
+
+    public void setCode(int code){
+        this.code = code;
+    }
+
+    public int getCode(){
+        return this.code;
+    }
+
+    /** Returns a fixed description for the given code. */
+    public String getMessage(int code){
+        switch(code){
+        case Code.OK:
+            return "No problem";
+        case Code.UnauthorizedAccessException:
+            return "Error while reading ledger";
+        default:
+            return "Invalid operation";
+        }
+    }
+
+    public static class BookieUnauthorizedAccessException extends BookieException {
+        public BookieUnauthorizedAccessException(){
+            super(Code.UnauthorizedAccessException);
+        }
+    }
+
+    public static class BookieIllegalOpException extends BookieException {
+        public BookieIllegalOpException(){
+            // Report the illegal-operation code, not the unauthorized-access one.
+            super(Code.IllegalOpException);
+        }
+    }
+}
View
168 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Provides a buffering layer in front of a FileChannel.
+ */
+public class BufferedChannel
+{
+    ByteBuffer writeBuffer;
+    ByteBuffer readBuffer;
+    private FileChannel bc;
+    // Logical position: the channel position plus whatever is still buffered.
+    long position;
+    int capacity;
+    long readBufferStartPosition;
+    long writeBufferStartPosition;
+
+    /**
+     * Wraps the channel, starting at its current position. Buffers of the
+     * given capacity are allocated on first write and first read.
+     */
+    BufferedChannel(FileChannel bc, int capacity) throws IOException {
+        this.bc = bc;
+        this.capacity = capacity;
+        position = bc.position();
+        writeBufferStartPosition = position;
+    }
+
+    /**
+     * Appends the remaining bytes of src, writing the buffer out to the
+     * channel every time it fills up.
+     *
+     * @return the number of bytes taken from src
+     */
+    synchronized public int write(ByteBuffer src) throws IOException {
+        int copied = 0;
+        if (writeBuffer == null) {
+            writeBuffer = ByteBuffer.allocateDirect(capacity);
+        }
+        while(src.remaining() > 0) {
+            // Temporarily shrink src so that it fits into the write buffer.
+            int truncated = 0;
+            if (writeBuffer.remaining() < src.remaining()) {
+                truncated = src.remaining() - writeBuffer.remaining();
+                src.limit(src.limit()-truncated);
+            }
+            copied += src.remaining();
+            writeBuffer.put(src);
+            src.limit(src.limit()+truncated);
+            if (writeBuffer.remaining() == 0) {
+                drainWriteBuffer();
+            }
+        }
+        position += copied;
+        return copied;
+    }
+
+    /**
+     * Writes the whole content of the write buffer to the channel and
+     * resets the buffer. The caller must hold this object's lock.
+     */
+    private void drainWriteBuffer() throws IOException {
+        writeBuffer.flip();
+        // A single write() call is not guaranteed to consume the buffer.
+        while (writeBuffer.hasRemaining()) {
+            bc.write(writeBuffer);
+        }
+        writeBuffer.clear();
+        writeBufferStartPosition = bc.position();
+    }
+
+    public long position() {
+        return position;
+    }
+
+    /**
+     * Retrieve the current size of the underlying FileChannel
+     *
+     * @return FileChannel size measured in bytes
+     *
+     * @throws IOException if some I/O error occurs reading the FileChannel
+     */
+    public long size() throws IOException {
+        return bc.size();
+    }
+
+    /**
+     * Writes buffered data to the channel and, if sync is set, forces the
+     * channel content to storage. Does nothing when nothing was ever written
+     * through this object.
+     */
+    public void flush(boolean sync) throws IOException {
+        synchronized(this) {
+            if (writeBuffer == null) {
+                return;
+            }
+            drainWriteBuffer();
+        }
+        if (sync) {
+            bc.force(false);
+        }
+    }
+
+    /**
+     * Fills buff with the bytes starting at file position pos, serving them
+     * from the write buffer, the read buffer or the channel.
+     *
+     * @return the number of bytes read, i.e. the initial remaining space of buff
+     * @throws IOException if pos lies beyond the written data or the channel
+     *         cannot supply a full read buffer
+     */
+    synchronized public int read(ByteBuffer buff, long pos) throws IOException {
+        if (readBuffer == null) {
+            readBuffer = ByteBuffer.allocateDirect(capacity);
+            readBufferStartPosition = Long.MIN_VALUE;
+        }
+        int rc = buff.remaining();
+        while(buff.remaining() > 0) {
+            // check if it is in the write buffer
+            if (writeBuffer != null && writeBufferStartPosition <= pos) {
+                long positionInBuffer = pos - writeBufferStartPosition;
+                long bytesToCopy = writeBuffer.position()-positionInBuffer;
+                if (bytesToCopy > buff.remaining()) {
+                    bytesToCopy = buff.remaining();
+                }
+                // Zero means exactly at the end. Negative means past it.
+                if (bytesToCopy <= 0) {
+                    throw new IOException("Read past EOF");
+                }
+                ByteBuffer src = writeBuffer.duplicate();
+                src.position((int) positionInBuffer);
+                src.limit((int) (positionInBuffer+bytesToCopy));
+                buff.put(src);
+                pos+= bytesToCopy;
+            // first check if there is anything we can grab from the readBuffer
+            } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition+readBuffer.capacity()) {
+                long positionInBuffer = pos - readBufferStartPosition;
+                long bytesToCopy = readBuffer.capacity()-positionInBuffer;
+                if (bytesToCopy > buff.remaining()) {
+                    bytesToCopy = buff.remaining();
+                }
+                ByteBuffer src = readBuffer.duplicate();
+                src.position((int) positionInBuffer);
+                src.limit((int) (positionInBuffer+bytesToCopy));
+                buff.put(src);
+                pos += bytesToCopy;
+            // let's read it
+            } else {
+                readBufferStartPosition = pos;
+                readBuffer.clear();
+                // make sure that we don't overlap with the write buffer
+                if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
+                    readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
+                    if (readBufferStartPosition < 0) {
+                        // The window starts before the file: pad the front with zeros.
+                        readBuffer.put(LedgerEntryPage.zeroPage, 0, (int)-readBufferStartPosition);
+                    }
+                }
+                while(readBuffer.remaining() > 0) {
+                    if (bc.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
+                        throw new IOException("Short read");
+                    }
+                }
+                readBuffer.clear();
+            }
+        }
+        return rc;
+    }
+}
View
487 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -0,0 +1,487 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * This class manages the writing of the bookkeeper entries. All the new
+ * entries are written to a common log. The LedgerCache will have pointers
+ * into files created by this class with offsets into the files to find
+ * the actual ledger entry. The entry log files created by this class are
+ * identified by a long.
+ */
+public class EntryLogger {
+ /** Logger shared by all EntryLogger instances. */
+ private static final Logger LOG = Logger.getLogger(EntryLogger.class);
+ /** Directories that hold the entry log files and their "lastId" markers. */
+ private File[] dirs;
+ // Handle to the owning Bookie instance. The garbage collector reaches the
+ // LedgerCache and the ZooKeeper client handle through it.
+ private final Bookie bookie;
+
+ /** Id of the entry log file that is currently being written to. */
+ private long logId;
+ /**
+  * The maximum size of an entry log file in bytes. Taken from the
+  * "logSizeLimit" system property, defaulting to 2 GiB.
+  */
+ static final long LOG_SIZE_LIMIT = Long.getLong("logSizeLimit", 2 * 1024 * 1024 * 1024L);
+ /** Buffered channel over the entry log file currently open for appends. */
+ private volatile BufferedChannel logChannel;
+ /**
+  * Size of the 1K block at the head of each entry log file. The block
+  * carries the fingerprint and is reserved for (future) meta-data.
+  */
+ static final int LOGFILE_HEADER_SIZE = 1024;
+ /** Per-instance header buffer; the constructor writes the fingerprint into it. */
+ final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+
+ // Indicates that a write has happened since the flag was last cleared:
+ // set by addEntry(), reset by testAndClearSomethingWritten().
+ private volatile boolean somethingWritten = false;
+
+ // String constants describing the ledger layout in ZooKeeper.
+ static final String LEDGERS_PATH = "/ledgers";
+ static final String LEDGER_NODE_PREFIX = "L";
+ static final String AVAILABLE_NODE = "available";
+
+ // For each entry log id, the set of ledger ids that have entries in that
+ // file. The map is used as a set: only the keys of the inner map matter.
+ private ConcurrentMap<Long, ConcurrentHashMap<Long, Boolean>> entryLogs2LedgersMap = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>>();
+ // Background thread that garbage collects the entry logs that do not
+ // contain any active ledgers.
+ GarbageCollectorThread gcThread = new GarbageCollectorThread();
+ // Pause between two garbage collection passes, in milliseconds. Taken from
+ // the "gcWaitTime" system property, defaulting to 1000 ms (1 second).
+ static final int gcWaitTime = Integer.getInteger("gcWaitTime", 1000);
+
+ /**
+  * Creates an EntryLogger that stores its log files in the given
+  * directories. A fresh entry log is opened with an id one past the largest
+  * id found in the directories' "lastId" files, and the garbage collector
+  * thread is started.
+  *
+  * @param dirs directories that hold the entry log files
+  * @param bookie the Bookie instance this logger belongs to
+  * @throws IOException if the new entry log cannot be created
+  */
+ public EntryLogger(File dirs[], Bookie bookie) throws IOException {
+     this.bookie = bookie;
+     this.dirs = dirs;
+     // The header buffer is deliberately not static: unit tests run several
+     // Bookies (and thus EntryLoggers) in one JVM, and a shared buffer could
+     // be cleared by one instance while another is writing it into a newly
+     // rolled log file.
+     LOGFILE_HEADER.put("BKLO".getBytes());
+     // Move logId past the largest id recorded in any directory.
+     for (File dir : dirs) {
+         long recordedId = getLastLogId(dir);
+         if (recordedId < logId) {
+             continue;
+         }
+         logId = recordedId + 1;
+     }
+     createLogId(logId);
+     // Start the garbage collector thread that prunes unneeded entry logs.
+     gcThread.start();
+ }
+
+ /**
+  * Maps entry log ids to the open channels of their files.
+  */
+ private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+
+ /**
+  * This is the garbage collector thread that runs in the background to
+  * remove any entry log files that no longer contain any active ledger.
+  * Each pass syncs with ZooKeeper, reads the children of LEDGERS_PATH,
+  * drops the locally known ledgers that are missing there, and then deletes
+  * every entry log whose set of ledgers has become empty.
+  */
+ class GarbageCollectorThread extends Thread {
+     // Cleared by shutdown() so that run() leaves its loop.
+     volatile boolean running = true;
+
+     public GarbageCollectorThread() {
+         super("GarbageCollectorThread");
+     }
+
+     @Override
+     public void run() {
+         while (running) {
+             synchronized (this) {
+                 try {
+                     wait(gcWaitTime);
+                 } catch (InterruptedException e) {
+                     // Restore the interrupt status and stop. Looping again with
+                     // the status set would make wait() throw immediately on
+                     // every iteration, spinning for as long as running is true.
+                     Thread.currentThread().interrupt();
+                     return;
+                 }
+             }
+             // Initialization check. No need to run any logic if we are still starting up.
+             if (entryLogs2LedgersMap.isEmpty() || bookie.ledgerCache == null
+                     || bookie.ledgerCache.activeLedgers == null) {
+                 continue;
+             }
+             // First sync ZK to make sure we're reading the latest active/available ledger nodes.
+             bookie.zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
+                 @Override
+                 public void processResult(int rc, String path, Object ctx) {
+                     if (rc != Code.OK.intValue()) {
+                         LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
+                                 .create(KeeperException.Code.get(rc), path));
+                         return;
+                     }
+                     // Sync has completed successfully so now we can poll ZK
+                     // and read in the latest set of active ledger nodes.
+                     List<String> ledgerNodes;
+                     try {
+                         ledgerNodes = bookie.zk.getChildren(LEDGERS_PATH, null);
+                     } catch (Exception e) {
+                         LOG.error("Error polling ZK for the available ledger nodes: ", e);
+                         // We should probably wait a certain amount of time before retrying in case of temporary issues.
+                         return;
+                     }
+                     if (LOG.isDebugEnabled()) {
+                         LOG.debug("Retrieved current set of ledger nodes: " + ledgerNodes);
+                     }
+                     removeInactiveLedgers(parseLedgerIds(ledgerNodes));
+                     deleteUnusedEntryLogs();
+                 }
+             }, null);
+         }
+     }
+
+     // Converts the ZK ledger node names into the set of ledger ids they encode.
+     private HashSet<Long> parseLedgerIds(List<String> ledgerNodes) {
+         HashSet<Long> ledgerIds = new HashSet<Long>(ledgerNodes.size(), 1.0f);
+         for (String ledgerNode : ledgerNodes) {
+             // The available node is also stored in this path so ignore that.
+             // That node is the path for the set of available Bookie Servers.
+             if (ledgerNode.equals(AVAILABLE_NODE)) {
+                 continue;
+             }
+             String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
+             // split() returns an empty array for a name made up only of the
+             // prefix (e.g. "L"); treat that like any other malformed node.
+             if (parts.length == 0) {
+                 LOG.fatal("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+                 continue;
+             }
+             try {
+                 ledgerIds.add(Long.parseLong(parts[parts.length - 1]));
+             } catch (NumberFormatException e) {
+                 // This is a pretty bad error as it indicates a ledger node in ZK
+                 // has an incorrect format. For now just continue and consider
+                 // this as a non-existent ledger.
+                 LOG.fatal("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+             }
+         }
+         return ledgerIds;
+     }
+
+     // Drops every locally active ledger that is absent from ZK, along with its index file.
+     private void removeInactiveLedgers(HashSet<Long> allActiveLedgers) {
+         ConcurrentMap<Long, Boolean> curActiveLedgers = bookie.ledgerCache.activeLedgers;
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("All active ledgers from ZK: " + allActiveLedgers);
+             LOG.debug("Current active ledgers from Bookie: " + curActiveLedgers.keySet());
+         }
+         for (Long ledger : curActiveLedgers.keySet()) {
+             if (allActiveLedgers.contains(ledger)) {
+                 continue;
+             }
+             // Remove it from the current active ledgers set and also from all
+             // LedgerCache data references to the ledger, i.e. the physical ledger index file.
+             LOG.info("Removing a non-active/deleted ledger: " + ledger);
+             curActiveLedgers.remove(ledger);
+             try {
+                 bookie.ledgerCache.deleteLedger(ledger);
+             } catch (IOException e) {
+                 LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+             }
+         }
+     }
+
+     // Prunes inactive ledgers from each entry log's set and deletes the logs left without any.
+     private void deleteUnusedEntryLogs() {
+         for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
+             ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
+             for (Long entryLogLedger : entryLogLedgers.keySet()) {
+                 // Remove the entry log ledger from the set if it isn't active.
+                 if (!bookie.ledgerCache.activeLedgers.containsKey(entryLogLedger)) {
+                     entryLogLedgers.remove(entryLogLedger);
+                 }
+             }
+             if (!entryLogLedgers.isEmpty()) {
+                 continue;
+             }
+             // This means the entry log is not associated with any active ledgers anymore.
+             // We can remove this entry log file now.
+             LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
+             File entryLogFile;
+             try {
+                 entryLogFile = findFile(entryLogId);
+             } catch (FileNotFoundException e) {
+                 // Entry log files are named with the hexadecimal form of their id.
+                 LOG.error("Trying to delete an entryLog file that could not be found: "
+                         + Long.toHexString(entryLogId) + ".log");
+                 continue;
+             }
+             if (!entryLogFile.delete()) {
+                 LOG.warn("Could not delete entry log file " + entryLogFile);
+             }
+             channels.remove(entryLogId);
+             entryLogs2LedgersMap.remove(entryLogId);
+         }
+     }
+ }
+
+ /**
+  * Creates a new log file with the given id in a randomly chosen directory
+  * and makes it the current write target. The previous log channel, if any,
+  * is flushed first. The new id is recorded in the "lastId" file of every
+  * directory, and the ledger ids of the older entry logs are extracted.
+  *
+  * @param logId id of the entry log file to create
+  * @throws IOException if the file cannot be created or written
+  */
+ private void createLogId(long logId) throws IOException {
+     // Shuffle a copy of the array: Arrays.asList() returns a view, so
+     // shuffling it directly would reorder the dirs array handed to us by
+     // the caller.
+     List<File> list = Arrays.asList(dirs.clone());
+     Collections.shuffle(list);
+     File firstDir = list.get(0);
+     if (logChannel != null) {
+         logChannel.flush(true);
+     }
+     logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
+     logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
+     channels.put(logId, logChannel);
+     for(File f: dirs) {
+         setLastLogId(f, logId);
+     }
+     // Extract all of the ledger ID's that comprise all of the entry logs
+     // (except for the current new one which is still being written to).
+     extractLedgersFromEntryLogs();
+ }
+
+ /**
+  * Writes the given id, in hexadecimal followed by a newline, to the
+  * "lastId" file in the given directory.
+  *
+  * @param dir directory whose "lastId" file is rewritten
+  * @param logId id to record
+  * @throws IOException if the file cannot be opened or written
+  */
+ private void setLastLogId(File dir, long logId) throws IOException {
+     FileOutputStream out = new FileOutputStream(new File(dir, "lastId"));
+     BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+     try {
+         String line = Long.toHexString(logId) + "\n";
+         writer.write(line);
+         writer.flush();
+     } finally {
+         try {
+             out.close();
+         } catch (IOException e) {
+             // Failure to close is ignored; the data was already flushed above.
+         }
+     }
+ }
+
+ /**
+  * Reads the id from the "lastId" file in the given directory.
+  *
+  * @param f directory that contains the "lastId" file
+  * @return the recorded id, or -1 if the file is missing, empty, unreadable
+  *         or does not hold a hexadecimal number
+  */
+ private long getLastLogId(File f) {
+     FileInputStream fis;
+     try {
+         fis = new FileInputStream(new File(f, "lastId"));
+     } catch (FileNotFoundException e) {
+         return -1;
+     }
+     BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+     try {
+         String lastIdString = br.readLine();
+         if (lastIdString == null) {
+             // Empty file: nothing has been recorded.
+             return -1;
+         }
+         // setLastLogId() stores the id with Long.toHexString(), so it must be
+         // parsed as base 16. Parsing it as decimal rejects any id containing
+         // a-f and misreads the others (e.g. "10" would be 10 instead of 16).
+         return Long.parseLong(lastIdString, 16);
+     } catch (IOException e) {
+         return -1;
+     } catch(NumberFormatException e) {
+         return -1;
+     } finally {
+         try {
+             fis.close();
+         } catch (IOException e) {
+             // Failure to close a stream we only read from is ignored.
+         }
+     }
+ }
+
+ /**
+  * Rolls over to a new entry log file whose id is one past the current one.
+  *
+  * @throws IOException if the new entry log cannot be created
+  */
+ private void openNewChannel() throws IOException {
+     logId += 1;
+     createLogId(logId);
+ }
+
+ /**
+  * Flushes the current log channel, if one is open.
+  *
+  * @throws IOException if the flush fails
+  */
+ synchronized void flush() throws IOException {
+     if (logChannel == null) {
+         return;
+     }
+     logChannel.flush(true);
+ }
+ synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
+ openNewChannel();
+ }
+ ByteBuffer buff = ByteBuffer.allocate(4);
+ buff.putInt(entry.remaining());
+ buff.flip();
+ logChannel.write(buff);
+ long pos = logChannel.position();
+ logChannel.write(entry);
+ //logChannel.flush(false);
+ somethingWritten = true;
+ return (logId << 32L) | pos;
+ }
+
+ /**
+  * Reads the entry stored at the given location and checks that it belongs
+  * to the expected ledger and has the expected entry id.
+  *
+  * @param ledgerId ledger the entry is expected to belong to
+  * @param entryId id the entry is expected to have
+  * @param location entry log id in the upper 32 bits, position of the entry
+  *        data within that log in the lower 32 bits
+  * @return the entry bytes, starting with the ledger id and the entry id
+  * @throws IOException if the entry log cannot be found, the read is short,
+  *         the recorded size is invalid, or the ids do not match
+  */
+ byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
+     long entryLogId = location >> 32L;
+     long pos = location & 0xffffffffL;
+     ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+     pos -= 4; // step back onto the length prefix that precedes the entry
+     BufferedChannel fc;
+     try {
+         fc = getChannelForLogId(entryLogId);
+     } catch (FileNotFoundException e) {
+         FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
+         newe.setStackTrace(e.getStackTrace());
+         throw newe;
+     }
+     if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+         throw new IOException("Short read from entrylog " + entryLogId);
+     }
+     pos += 4;
+     sizeBuff.flip();
+     int entrySize = sizeBuff.getInt();
+     // entrySize does not include the length prefix itself
+     if (entrySize > 1024*1024) {
+         LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
+     }
+     // The entry must at least hold the ledger id and the entry id (two
+     // longs). Rejecting smaller or negative sizes here reports corruption
+     // as an IOException instead of a NegativeArraySizeException or a
+     // BufferUnderflowException further down.
+     if (entrySize < 16) {
+         throw new IOException("Invalid entry size " + entrySize + " for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos);
+     }
+     byte data[] = new byte[entrySize];
+     ByteBuffer buff = ByteBuffer.wrap(data);
+     int rc = fc.read(buff, pos);
+     if ( rc != data.length) {
+         throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
+     }
+     buff.flip();
+     long thisLedgerId = buff.getLong();
+     if (thisLedgerId != ledgerId) {
+         throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
+     }
+     long thisEntryId = buff.getLong();
+     if (thisEntryId != entryId) {
+         throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry is " + thisEntryId + " not " + entryId);
+     }
+
+     return data;
+ }
+
+ /**
+  * Returns the buffered channel for the given entry log, opening the file
+  * and caching the channel on first use.
+  *
+  * @param entryLogId id of the entry log
+  * @return the cached or newly opened channel
+  * @throws IOException if the file cannot be found or opened
+  */
+ private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
+     BufferedChannel cached = channels.get(entryLogId);
+     if (cached == null) {
+         FileChannel opened = new RandomAccessFile(findFile(entryLogId), "rw").getChannel();
+         // The file already exists, so position the FileChannel at its end
+         // before layering a BufferedChannel on top; the write buffer then
+         // knows where to start.
+         opened.position(opened.size());
+         synchronized (channels) {
+             // Look again under the lock: another thread may have
+             // registered a channel in the meantime.
+             cached = channels.get(entryLogId);
+             if (cached == null) {
+                 cached = new BufferedChannel(opened, 8192);
+                 channels.put(entryLogId, cached);
+             } else {
+                 opened.close();
+             }
+         }
+     }
+     return cached;
+ }
+
+ private File findFile(long logId) throws FileNotFoundException {
+ for(File d: dirs) {
+ File f = new File(d, Long.toHexString(logId)+".log");
+ if (f.exists()) {
+ return f;
+ }
+ }
+ throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
+ }
+
+ synchronized public boolean testAndClearSomethingWritten() {
+ try {
+ return somethingWritten;
+ } finally {
+ somethingWritten = false;
+ }
+ }
+
+ /**
+  * Method to read in all of the entry logs (those that we haven't done so yet),
+  * and find the set of ledger ID's that make up each entry log file.
+  *
+  * @throws IOException if an entry log is truncated or holds an invalid
+  *         entry size
+  */
+ private void extractLedgersFromEntryLogs() throws IOException {
+     // Extract it for every entry log except for the current one.
+     // Entry Log ID's are just a long value that starts at 0 and increments
+     // by 1 when the log fills up and we roll to a new one.
+     ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+     BufferedChannel bc;
+     for (long entryLogId = 0; entryLogId < logId; entryLogId++) {
+         // Comb the current entry log file if it has not already been extracted.
+         if (entryLogs2LedgersMap.containsKey(entryLogId)) {
+             continue;
+         }
+         LOG.info("Extracting the ledgers from entryLogId: " + entryLogId);
+         // Get the BufferedChannel for the current entry log file
+         try {
+             bc = getChannelForLogId(entryLogId);
+         } catch (FileNotFoundException e) {
+             // If we can't find the entry log file, just log a warning message and continue.
+             // This could be a deleted/garbage collected entry log.
+             // Entry log files are named with the hexadecimal form of their id.
+             LOG.warn("Entry Log file not found in log directories: " + Long.toHexString(entryLogId) + ".log");
+             continue;
+         }
+         // Start the read position in the current entry log file to be after
+         // the header where all of the ledger entries are.
+         long pos = LOGFILE_HEADER_SIZE;
+         ConcurrentHashMap<Long, Boolean> entryLogLedgers = new ConcurrentHashMap<Long, Boolean>();
+         // Read through the entry log file and extract the ledger ID's,
+         // stopping once the end of the file is reached.
+         while (pos < bc.size()) {
+             // Reset the reused size buffer before every read.
+             sizeBuff.clear();
+             if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+                 throw new IOException("Short read from entrylog " + entryLogId);
+             }
+             pos += 4;
+             sizeBuff.flip();
+             int entrySize = sizeBuff.getInt();
+             if (entrySize > 1024 * 1024) {
+                 LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
+                         + entryLogId);
+             }
+             // The entry must at least hold the ledger id (one long).
+             // Rejecting smaller or negative sizes reports corruption as an
+             // IOException instead of a NegativeArraySizeException or a
+             // BufferUnderflowException.
+             if (entrySize < 8) {
+                 throw new IOException("Invalid entry size " + entrySize + " in entryLog " + entryLogId + "@" + pos);
+             }
+             byte data[] = new byte[entrySize];
+             ByteBuffer buff = ByteBuffer.wrap(data);
+             int rc = bc.read(buff, pos);
+             if (rc != data.length) {
+                 throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!="
+                         + data.length + ")");
+             }
+             buff.flip();
+             long ledgerId = buff.getLong();
+             entryLogLedgers.put(ledgerId, true);
+             // Advance position to the next entry.
+             pos += entrySize;
+         }
+         LOG.info("Retrieved all ledgers that comprise entryLogId: " + entryLogId + ", values: " + entryLogLedgers);
+         entryLogs2LedgersMap.put(entryLogId, entryLogLedgers);
+     }
+ }
+
+ /**
+  * Stops the garbage collector thread and waits for it to finish.
+  *
+  * @throws InterruptedException if the calling thread is interrupted while
+  *         waiting for the garbage collector thread to exit
+  */
+ public void shutdown() throws InterruptedException {
+     final GarbageCollectorThread collector = gcThread;
+     // Clear the flag before interrupting so the thread sees it when it wakes.
+     collector.running = false;
+     collector.interrupt();
+     collector.join();
+ }
+
+}
View
124 bookkeeper/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * This is the file handle for a ledger's index file that maps entry ids to location.
+ * It is used by LedgerCache.
+ */
+class FileInfo {
+ private FileChannel fc;
+ private final File lf;
+ /**
+ * The fingerprint of a ledger index file
+ */
+ private byte header[] = "BKLE\0\0\0\0".getBytes();
+ static final long START_OF_DATA = 1024;
+ private long size;
+ private int useCount;
+ private boolean isClosed;
+ public FileInfo(File lf) throws IOException {
+ this.lf = lf;
+ fc = new RandomAccessFile(lf, "rws").getChannel();
+ size = fc.size();
+ if (size == 0) {
+ fc.write(ByteBuffer.wrap(header));
+ }
+ }
+
+ synchronized public long size() {
+ long rc = size-START_OF_DATA;
+ if (rc < 0) {
+ rc = 0;
+ }
+ return rc;
+ }
+
+ synchronized public int read(ByteBuffer bb, long position) throws IOException {
+ int total = 0;
+ while(bb.remaining() > 0) {
+ int rc = fc.read(bb, position+START_OF_DATA);
+ if (rc <= 0) {
+ throw new IOException("Short read");
+ }
+ total += rc;
+ }
+ return total;
+ }
+
+ synchronized public void close() throws IOException {
+ isClosed = true;
+ if (useCount == 0) {
+ fc.close();
+ }
+ }
+
+ synchronized public long write(ByteBuffer[] buffs, long position) throws IOException {
+ long total = 0;
+ try {
+ fc.position(position+START_OF_DATA);
+ while(buffs[buffs.length-1].remaining() > 0) {
+ long rc = fc.write(buffs);
+ if (rc <= 0) {
+ throw new IOException("Short write");
+ }
+ total += rc;
+ }
+ } finally {
+ long newsize = position+START_OF_DATA+total;
+ if (newsize > size) {
+ size = newsize;
+ }
+ }
+ return total;
+ }
+
+ synchronized public void use() {
+ useCount++;
+ }
+
+ synchronized public void release() {
+ useCount--;
+ if (isClosed && useCount == 0) {
+ try {
+ fc.close();
+ } catch (IO