
Initial Commit

commit 4dc27085808a8fc367acb2d84dc81911670b2a8d (0 parents), committed by @cburroughs on Dec 27, 2013
Showing with 19,876 additions and 0 deletions.
  1. +17 −0 .gitignore
  2. +177 −0 LICENSE
  3. +64 −0 NOTICE
  4. +128 −0 README.mdown
  5. +42 −0 hydra-avro/pom.xml
  6. +145 −0 hydra-avro/src/main/java/com/addthis/hydra/task/output/OutputStreamAvro.java
  7. +194 −0 hydra-avro/src/main/java/com/addthis/hydra/task/source/DataSourceAvro.java
  8. +1 −0 hydra-avro/src/main/resources/plugins/avro-input-sources.classmap
  9. +1 −0 hydra-avro/src/main/resources/plugins/avro-output-stream-formatters.classmap
  10. +100 −0 hydra-avro/src/test/java/com/addthis/hydra/task/source/DataSourceAvroTest.java
  11. +95 −0 hydra-data/pom.xml
  12. +76 −0 hydra-data/src/main/java/com/addthis/hydra/data/MakeBloom.java
  13. +53 −0 hydra-data/src/main/java/com/addthis/hydra/data/channel/BlockingBufferedConsumer.java
  14. +73 −0 hydra-data/src/main/java/com/addthis/hydra/data/channel/BlockingNullConsumer.java
  15. +657 −0 hydra-data/src/main/java/com/addthis/hydra/data/io/DiskBackedList.java
  16. +716 −0 hydra-data/src/main/java/com/addthis/hydra/data/io/DiskBackedList2.java
  17. +67 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/AbstractBufferOp.java
  18. +173 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/AbstractQueryOp.java
  19. +39 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/AbstractRowOp.java
  20. +73 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/AbstractTableOp.java
  21. +101 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/BoundedValue.java
  22. +178 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/CLIQuery.java
  23. +210 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/DiskBackedMap.java
  24. +31 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/FieldValue.java
  25. +92 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/FieldValueList.java
  26. +354 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/Query.java
  27. +38 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryChannelException.java
  28. +332 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryElement.java
  29. +151 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryElementField.java
  30. +357 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryElementNode.java
  31. +108 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryElementProperty.java
  32. +398 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryEngine.java
  33. +83 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryEngineDirectory.java
  34. +37 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryException.java
  35. +129 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryMemManager.java
  36. +35 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryMemTracker.java
  37. +70 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryOp.java
  38. +515 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryOpProcessor.java
  39. +36 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryStatusObserver.java
  40. +34 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/QueryTableOp.java
  41. +29 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/ResultAccumulator.java
  42. +47 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/ResultChannelOutput.java
  43. +49 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/ResultTable.java
  44. +120 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/ResultTableDisk.java
  45. +196 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/ResultTableTuned.java
  46. +166 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannel.java
  47. +378 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannelClient.java
  48. +213 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannelClientX.java
  49. +73 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannelRequest.java
  50. +116 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannelResponse.java
  51. +328 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryChannelServer.java
  52. +70 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/QueryHost.java
  53. +36 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/channel/RowByteWrapper.java
  54. +100 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpChangePoints.java
  55. +103 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpCompare.java
  56. +82 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpContains.java
  57. +74 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpDateFormat.java
  58. +131 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpDePivot.java
  59. +232 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpDiff.java
  60. +580 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpDiskSort.java
  61. +175 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpDisorder.java
  62. +92 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpFill.java
  63. +153 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpFold.java
  64. +210 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpFrequencyTable.java
  65. +467 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpGather.java
  66. +87 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpHistogram.java
  67. +105 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpLimit.java
  68. +80 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpMap.java
  69. +46 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpMedian.java
  70. +387 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpMerge.java
  71. +72 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpNoDup.java
  72. +306 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpNumber.java
  73. +77 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpOrder.java
  74. +103 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpOrderMap.java
  75. +90 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpPercentileDistribution.java
  76. +98 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpPercentileRank.java
  77. +549 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpPivot.java
  78. +85 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpRMap.java
  79. +48 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpRandomFail.java
  80. +67 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpRange.java
  81. +103 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpRemoveSingletons.java
  82. +49 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpReverse.java
  83. +337 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpRoll.java
  84. +78 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpSeen.java
  85. +56 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpSkip.java
  86. +61 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpSleep.java
  87. +186 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpSort.java
  88. +265 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpString.java
  89. +80 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpTitle.java
  90. +67 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/op/OpTranspose.java
  91. +32 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/AbstractQueryConsumer.java
  92. +43 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/EmptyQuerySource.java
  93. +21 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/ErrorHandlingQuerySource.java
  94. +48 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/QueryConsumer.java
  95. +19 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/QueryHandle.java
  96. +40 −0 hydra-data/src/main/java/com/addthis/hydra/data/query/source/QuerySource.java
  97. +878 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/ConcurrentTree.java
  98. +639 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/ConcurrentTreeNode.java
  99. +36 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTree.java
  100. +133 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTreeNode.java
  101. +54 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTreeNodeActor.java
  102. +19 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTreeNodeInitializer.java
  103. +30 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTreeNodeUpdater.java
  104. +29 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/DataTreeUtil.java
  105. +491 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/ReadTree.java
  106. +357 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/ReadTreeNode.java
  107. +992 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/Tree.java
  108. +76 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeCommonParameters.java
  109. +77 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeDataParameters.java
  110. +26 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeDataParent.java
  111. +546 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeNode.java
  112. +133 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeNodeData.java
  113. +18 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeNodeDataDeferredOperation.java
  114. +79 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeNodeList.java
  115. +792 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/TreeStatistics.java
  116. +132 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/prop/DataBits.java
  117. +254 −0 hydra-data/src/main/java/com/addthis/hydra/data/tree/prop/DataBloom.java
Sorry, we could not display the entire diff because too many files (1,468) changed.
17 .gitignore
@@ -0,0 +1,17 @@
+target/
+test.http.get/
+.classpath
+.project
+.settings/
+.idea/
+hydra-local/
+hydra-data/indexes/
+hydra-data/query.tmp
+hydra*.iml
+pom.xml.versionsBackup
+.#*
+docs/build/
+hydra.ipr
+hydra.iws
+hydra-uber/src/main/resources/hydra-git.properties
+*.pyc
177 LICENSE
@@ -0,0 +1,177 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
64 NOTICE
@@ -0,0 +1,64 @@
+hydra
+Copyright 2013 AddThis
+
+This product includes software developed by AddThis.
+
+
+This product also includes the following third-party components
+
+jQuery (http://jquery.org/)
+Copyright 2012 jQuery Foundation and other contributors
+
+date.js (http://www.datejs.com/)
+Copyright 2006-2007, Coolite Inc.
+
+google-code-prettify (https://code.google.com/p/google-code-prettify/)
+Copyright 2006 Google Inc.
+
+prototypejs (http://www.prototypejs.org/)
+Copyright 2005-2009 Sam Stephenson
+
+codemirror (http://codemirror.net/)
+Copyright (C) 2013 by Marijn Haverbeke <marijnh@gmail.com> and others
+
+Bootstrap (http://getbootstrap.com/)
+Copyright 2013 Twitter, Inc
+
+DataTables (http://datatables.net/)
+Copyright 2008-2012 Allan Jardine
+
+Ace (http://ace.c9.io/)
+Copyright (c) 2010, Ajax.org B.V.
+
+alertify.js (http://fabien-d.github.com/alertify.js/)
+copyright Fabien Doiron 2013
+
+Backbone.js (http://backbonejs.org)
+2010-2013 Jeremy Ashkenas, DocumentCloud Inc
+
+d3 (http://d3js.org/)
+Copyright (c) 2013, Michael Bostock
+
+RequireJS domReady (http://github.com/requirejs/domReady)
+Copyright 2010-2012 The Dojo Foundation
+
+jasmine (http://pivotal.github.io/jasmine/)
+Copyright (c) 2008-2011 Pivotal Labs
+
+jQuery resize event (http://benalman.com/projects/jquery-resize-plugin/)
+Copyright (c) 2010 "Cowboy" Ben Alman
+
+jQuery Cookie Plugin (https://github.com/carhartl/jquery-cookie)
+Copyright 2013 Klaus Hartl
+
+RequireJS (http://github.com/jrburke/requirejs)
+Copyright (c) 2010-2012, The Dojo Foundation
+
+jsSHA (http://caligatio.github.com/jsSHA/)
+Copyright Brian Turek 2008-2013
+
+Underscore.js (http://underscorejs.org)
+(c) 2009-2013 Jeremy Ashkenas, DocumentCloud and Investigative Reporters & Editors
+
+
+
128 README.mdown
@@ -0,0 +1,128 @@
+## hydra
+
+Hydra is a distributed data processing and storage system originally
+developed at [AddThis](http://www.addthis.com). It ingests streams of
+data (think log files) and builds trees that are aggregates,
+summaries, or transformations of the data. These trees can be used by
+humans to explore (tiny queries), as part of a machine learning
+pipeline (big queries), or to support live consoles on websites (lots
+of queries).
+
+You can run hydra from the command line to slice and dice that Apache
+access log you have sitting around (or that gargantuan CSV file). Or,
+if terabytes per day are your cup of tea, run a Hydra Cluster that
+supports your job with resource sharing, job management, distributed
+backups, data partitioning, and efficient bulk file transfer.
+
+## Building
+
+Assuming you have [Apache Maven](http://maven.apache.org/) installed
+and configured:
+
+ mvn package
+
+should compile and build the jars. All hydra dependencies should be
+available on Maven Central, but hydra itself is not yet published.
+
+[Berkeley DB Java Edition](http://www.oracle.com/technetwork/database/berkeleydb/overview/index-093405.html)
+is used for several core features. The Sleepycat license has strong
+copyleft properties that do not match the rest of the project. It is
+set as a non-transitive dependency to avoid inadvertently pulling it
+into downstream projects. In the future hydra should have pluggable
+storage with multiple implementations.
+
+The `hydra-uber` module builds an `exec` jar containing hydra and all
+of its dependencies. To include BDB JE when building with `mvn
+package`, use `-P bdbje`. The main class of the `exec` jar launches
+the components of a hydra cluster by name (see the build example below).
+
+## System dependencies
+
+JDK 7 is required. Hydra has been developed on Linux (CentOS 6) and
+should work on any modern Linux distro. Other Unix-like systems
+should work with minor changes but have not been tested. Mac OS X
+should work for building and running the local stack (see below).
+
+Hydra uses [rabbitmq](http://www.rabbitmq.com/) for low-volume
+command and control message exchange. On a modern Linux system,
+`apt-get install rabbitmq-server` and running with the default
+settings is adequate in most cases.
+
+To run efficiently, Hydra needs a mechanism to take copy-on-write
+backups of the output of jobs. This is currently accomplished by
+adding the [fl-cow](http://xmailserver.org/flcow.html) library to
+`LD_PRELOAD`. Other approaches, such as ZFS or `cp --reflink`, are
+under consideration.
+
+Many components assume that there is a local user called `hydra` and
+that all minion nodes can ssh as that user to each other. This is
+used most prominently for `rsync`-based replicas.
+
+
+## Components
+
+While hydra can be used for ad-hoc analysis of CSV and other local
+files, it's most commonly used in a distributed cluster. In that case
+the following components are involved:
+
+ * [ZooKeeper](http://zookeeper.apache.org/)
+ * Spawn: Job control and execution
+ * Minion: Task runner
+ * QueryMaster: Handler for queries
+ * QueryWorker: Handle scatter-gather requests from QueryMaster
+ * Meshy: File server
+
+A typical configuration is to have a cluster head with Spawn &
+QueryMaster backed by a homogeneous cluster of nodes running Minion,
+QueryWorker, and Meshy.
+
+## Local Stack
+
+For local development, all of the above components can run together
+in a single stack run out of `hydra-local`. There is a `local-stack.sh`
+script to assist with this. To run the local stack:
+
+ * You must be able to build hydra
+ * Have rabbitmq installed
+ * Allow your current user to ssh to itself
+
+The first time the script is run, a `hydra-local` directory will be created.
+
+ * `./hydra-uber/bin/local-stack.sh start` - the first run starts ZooKeeper
+ * `./hydra-uber/bin/local-stack.sh start` - run again to start spawn, querymaster, etc.
+ * `./hydra-uber/bin/local-stack.sh seed` - add some sample data
+
+You can then navigate to http://localhost:5052/ and you should see
+the spawn web interface.
+
+When done, `./hydra-uber/bin/local-stack.sh stop` will stop everything
+except ZooKeeper, and running `stop` a second time will bring that
+process down as well.
+
+There are sample job configurations located in `hydra-uber/local/sample/`.
+
+## Administrative
+
+### Discussion
+
+Mailing list: http://groups.google.com/group/hydra-oss
+
+[Freenode](http://freenode.net/) channel: `#hydra`
+
+### Versioning
+
+It's x.y.z where:
+
+ * x: Something Big Happened
+ * y: next release
+ * z: strive for bug fix only
+
+### License
+
+hydra is released under the Apache License Version 2.0. See
+[Apache](http://www.apache.org/licenses/LICENSE-2.0) or the LICENSE
+file in this distribution for details.
+
+
+
+
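To make the README's build-and-launch flow concrete, here is a minimal sketch. The exec jar filename is an assumption based on the `4.0.0-SNAPSHOT` version in the module poms, and `spawn` stands in for any component name from the Components section; check `hydra-uber/target/` for the actual artifact:

    mvn package -P bdbje
    java -jar hydra-uber/target/hydra-uber-4.0.0-SNAPSHOT-exec.jar spawn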
42 hydra-avro/pom.xml
@@ -0,0 +1,42 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.addthis.hydra</groupId>
+ <artifactId>hydra-parent</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>hydra-avro</artifactId>
+ <name>Hydra Avro Module</name>
+ <description>Avro input and output</description>
+
+ <dependencies>
+ <!-- module deps -->
+ <dependency>
+ <groupId>com.addthis.hydra</groupId>
+ <artifactId>hydra-task</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- 3rd party -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+    <!-- end 3rd party -->
+ </dependencies>
+</project>
145 hydra-avro/src/main/java/com/addthis/hydra/task/output/OutputStreamAvro.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.task.output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.addthis.basis.util.Bytes;
+
+import com.addthis.bundle.core.Bundle;
+import com.addthis.bundle.core.BundleField;
+import com.addthis.bundle.io.DataChannelCodec;
+import com.addthis.bundle.io.DataChannelCodec.ClassIndexMap;
+import com.addthis.bundle.io.DataChannelCodec.FieldIndexMap;
+import com.addthis.bundle.value.ValueArray;
+import com.addthis.bundle.value.ValueMap;
+import com.addthis.bundle.value.ValueMapEntry;
+import com.addthis.bundle.value.ValueObject;
+import com.addthis.codec.Codec;
+import com.addthis.codec.Codec.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+/**
+ * @user-reference
+ * @hydra-name avro
+ */
+public class OutputStreamAvro extends OutputStreamFormatter implements Codec.SuperCodable {
+
+ @Set(codable = true)
+ private HashSet<String> include;
+ @Set(codable = true)
+ private HashSet<String> exclude;
+ @Set(codable = true, required = true)
+ private String schema;
+ // TODO: add support for specifying schema URL
+
+ private Schema outputSchema;
+ private DatumWriter<GenericRecord> datumWriter;
+
+
+ @Override
+ public OutputStreamEmitter createEmitter() {
+ return new OutputStreamEmitter() {
+ private final ClassIndexMap classMap = DataChannelCodec.createClassIndexMap();
+ private final FieldIndexMap fieldMap = DataChannelCodec.createFieldIndexMap();
+ private BinaryEncoder encoder;
+
+ @Override
+ public void write(OutputStream out, Bundle row) throws IOException {
+ if (include != null || exclude != null) {
+ row = new FilteredBundle(row, include, exclude);
+ }
+ synchronized (this) {
+ GenericRecord outputRecord = createAvroRecordFromBundle(row);
+ encoder = EncoderFactory.get().blockingBinaryEncoder(out, encoder);
+ datumWriter.write(outputRecord, encoder);
+ Bytes.writeBytes(DataChannelCodec.encodeBundle(row, fieldMap, classMap), out);
+ }
+ }
+
+ private GenericRecord createAvroRecordFromBundle(Bundle bundle) {
+ GenericRecord genericRecord = new GenericData.Record(outputSchema);
+ for (BundleField bundleField : bundle) {
+ ValueObject value = bundle.getValue(bundleField);
+ if (value == null) {
+ continue;
+ }
+ Object val = null;
+ switch (value.getObjectType()) {
+ case ARRAY:
+ ValueArray valueArray = value.asArray();
+ List<String> list = new ArrayList<>(valueArray.size());
+ for (ValueObject valueObject : valueArray) {
+ list.add(valueObject.toString());
+ }
+ val = list;
+ break;
+ case MAP:
+ ValueMap map = value.asMap();
+ Map<String, String> avroMap = new HashMap<>();
+ for (ValueMapEntry valueMapEntry : map) {
+ avroMap.put(valueMapEntry.getKey(), valueMapEntry.getValue().toString());
+ }
+ val = avroMap;
+ break;
+ case STRING:
+ val = value.asString().getString();
+ break;
+ case INT:
+ val = value.asNumber().asLong().getLong();
+ break;
+ case FLOAT:
+ val = value.asDouble().getDouble();
+ break;
+ case BYTES:
+ val = value.asBytes().getBytes();
+ break;
+ }
+
+ genericRecord.put(bundleField.getName(), val);
+ }
+ return genericRecord;
+ }
+
+ @Override
+ public void flush(OutputStream out) throws IOException {
+ out.flush();
+ }
+ };
+ }
+
+ @Override
+ public void postDecode() {
+ outputSchema = new Schema.Parser().parse(schema);
+ datumWriter = new GenericDatumWriter<>(outputSchema);
+ }
+
+ @Override
+ public void preEncode() {
+ }
+}
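The emitter above converts each bundle into a `GenericRecord` and reuses a single `BinaryEncoder` across writes. The following is a minimal, self-contained sketch of that Avro write path; the schema and class name are stand-ins, not part of this commit:

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class AvroWritePathSketch {
        public static void main(String[] args) throws Exception {
            // stand-in schema; OutputStreamAvro parses its own in postDecode()
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"event\",\"fields\":["
                    + "{\"name\":\"string\",\"type\":\"string\"},"
                    + "{\"name\":\"long\",\"type\":\"long\"}]}");

            // populate a record the way createAvroRecordFromBundle() fills fields
            GenericRecord record = new GenericData.Record(schema);
            record.put("string", "hello");
            record.put("long", 1L);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // passing null allocates a fresh encoder; the emitter reuses one per stream
            BinaryEncoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            System.out.println(out.size() + " bytes of binary avro written");
        }
    }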
194 hydra-avro/src/main/java/com/addthis/hydra/task/source/DataSourceAvro.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.task.source;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import com.addthis.bundle.channel.DataChannelError;
+import com.addthis.bundle.core.Bundle;
+import com.addthis.bundle.core.BundleFactory;
+import com.addthis.bundle.core.list.ListBundle;
+import com.addthis.bundle.core.list.ListBundleFormat;
+import com.addthis.bundle.value.ValueFactory;
+import com.addthis.bundle.value.ValueObject;
+import com.addthis.codec.Codec;
+import com.addthis.hydra.task.run.TaskRunConfig;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+
+/**
+ * This data source <span class="hydra-summary">accepts avro streams</span>.
+ *
+ * @user-reference
+ * @hydra-name avro
+ */
+public class DataSourceAvro extends TaskDataSource implements BundleFactory {
+
+ /**
+ * This field is required.
+ */
+ @Codec.Set(codable = true, required = true)
+ protected FactoryInputStream input;
+ @Codec.Set(codable = true, required = true)
+ private String schema;
+ @Codec.Set(codable = true, required = true)
+ private HashSet<String> fields;
+
+ private GenericDatumReader<GenericRecord> datumReader;
+
+ private final ListBundleFormat format = new ListBundleFormat();
+ private GenericRecord peekRecord;
+ private Bundle peek;
+ private Decoder decoder;
+ private InputStream inputStream;
+
+ @Override
+ protected void open(TaskRunConfig config) {
+
+ setDatumReader(new GenericDatumReader<GenericRecord>(new Schema.Parser().parse(schema)));
+ try {
+ setInputStream(input.createInputStream(config));
+ setDecoder(DecoderFactory.get().binaryDecoder(inputStream, null));
+ } catch (IOException e) {
+ throw DataChannelError.promote(e);
+ }
+ }
+
+ @Override
+ public Bundle next() throws DataChannelError {
+ Bundle next = peek();
+ peek = null;
+ return next;
+ }
+
+ @Override
+ public Bundle peek() throws DataChannelError {
+ if (peek == null) {
+ try {
+ peekRecord = datumReader.read(peekRecord, decoder);
+ GenericData genericData = datumReader.getData();
+ Bundle bundle = createBundle();
+ for (String field : fields) {
+ ValueObject value = getValueObject(peekRecord, field, genericData);
+ if (value != null) {
+ bundle.setValue(bundle.getFormat().getField(field), value);
+ }
+ }
+ peek = bundle;
+ } catch (EOFException e) {
+ return null;
+ } catch (IOException e) {
+ throw DataChannelError.promote(e);
+ }
+ }
+ return peek;
+ }
+
+ protected ValueObject getValueObject(GenericRecord genericRecord, String fieldName, GenericData genericData) throws IOException {
+ Schema.Field field = genericRecord.getSchema().getField(fieldName);
+ if (field == null) {
+ return null;
+ }
+ Object recordValue = genericRecord.get(field.name());
+ if (recordValue == null) {
+ return null;
+ }
+ Schema schema = field.schema();
+ Schema.Type type = schema.getType();
+ return getValueObject(recordValue, schema, type, genericData);
+ }
+
+ private ValueObject getValueObject(Object recordValue, Schema schema, Schema.Type type, GenericData genericData) throws IOException {
+ ValueObject value = null;
+ switch (type) {
+ case ARRAY:
+ List<String> replacement = new ArrayList<>();
+ for (Utf8 av : (List<Utf8>) recordValue) {
+ replacement.add(av.toString());
+ }
+ value = ValueFactory.createValueArray(replacement);
+ break;
+ case BYTES:
+ value = ValueFactory.create((byte[]) recordValue);
+ break;
+ case ENUM:
+                // enum symbols decode as GenericEnumSymbol; use toString(),
+                // since a (double) cast would throw ClassCastException
+                value = ValueFactory.create(recordValue.toString());
+ break;
+ case FIXED:
+ throw new RuntimeException("FIXED type is not supported");
+ case FLOAT:
+ case DOUBLE:
+ value = ValueFactory.create((double) recordValue);
+ break;
+ case INT:
+ value = ValueFactory.create((int) recordValue);
+ break;
+ case LONG:
+ value = ValueFactory.create((long) recordValue);
+ break;
+ case MAP:
+ throw new IOException("MAP types are not currently supported");
+ case NULL:
+ break;
+ case STRING:
+ case BOOLEAN:
+ value = ValueFactory.create(recordValue.toString());
+ break;
+ case UNION:
+ Schema unionSchema = schema.getTypes().get(genericData.resolveUnion(schema, recordValue));
+ value = getValueObject(recordValue, unionSchema, unionSchema.getType(), genericData);
+ break;
+ default:
+ throw new IOException("Unknown schema type: " + type);
+ }
+ return value;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Bundle createBundle() {
+ return new ListBundle(format);
+ }
+
+ public void setInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ public void setDatumReader(GenericDatumReader<GenericRecord> datumReader) {
+ this.datumReader = datumReader;
+ }
+
+ public void setDecoder(Decoder decoder) {
+ this.decoder = decoder;
+ }
+
+ public void setFields(HashSet<String> fields) {
+ this.fields = fields;
+ }
+}
1 hydra-avro/src/main/resources/plugins/avro-input-sources.classmap
@@ -0,0 +1 @@
+"avro", com.addthis.hydra.task.source.DataSourceAvro
1 hydra-avro/src/main/resources/plugins/avro-output-stream-formatters.classmap
@@ -0,0 +1 @@
+"avro", com.addthis.hydra.task.output.OutputStreamAvro
100 hydra-avro/src/test/java/com/addthis/hydra/task/source/DataSourceAvroTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.task.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import com.addthis.bundle.core.Bundle;
+import com.addthis.bundle.core.BundleField;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertNotNull;
+
+public class DataSourceAvroTest {
+
+ private DataSourceAvro dataSourceAvro;
+ private final String schema = "{\n \"namespace\": \"com.addthis.camus.records\",\n \"type\": \"record\",\n \"name\": \"testEvent\",\n \"doc\": \"test\",\n \"fields\": [\n\t " +
+ "{\"name\": \"string\",\"type\": \"string\",\"default\": \"\"}," +
+ "{\"name\": \"long\",\"type\": \"long\",\"default\": 0}," +
+ "{\"name\": \"double\",\"type\": \"double\",\"default\": 0.0}," +
+ "{\"name\": \"boolean\",\"type\": \"boolean\"}," +
+ "{\"name\": \"array\", \"type\": {\"type\":\"array\", \"items\":\"string\"}}," +
+ "{\"name\": \"union\", \"type\": [{\"type\":\"string\"}, {\"type\":\"null\"}]}" +
+ "]}";
+
+ @Before
+ public void setup() {
+
+ Schema outputSchema = new Schema.Parser().parse(schema);
+ GenericRecord genericRecord = new GenericData.Record(outputSchema);
+ genericRecord.put("string", "hello");
+        genericRecord.put("long", 1L);
+ genericRecord.put("double", 2.0d);
+ genericRecord.put("boolean", true);
+ genericRecord.put("array", Arrays.asList("foo", "bar", "car"));
+ genericRecord.put("union", "union");
+
+ dataSourceAvro = new DataSourceAvro();
+ HashSet<String> fields = new HashSet<>();
+ fields.add("string");
+ fields.add("long");
+ fields.add("double");
+ fields.add("boolean");
+ fields.add("array");
+ fields.add("union");
+ dataSourceAvro.setFields(fields);
+ dataSourceAvro.setDatumReader(new GenericDatumReader<GenericRecord>(new Schema.Parser().parse(schema)));
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Encoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(outputSchema);
+ try {
+ datumWriter.write(genericRecord, encoder);
+ encoder.flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ InputStream inputStream = new ByteArrayInputStream(bos.toByteArray());
+ dataSourceAvro.setInputStream(inputStream);
+ dataSourceAvro.setDecoder(DecoderFactory.get().binaryDecoder(inputStream, null));
+
+ }
+
+ @Test
+ public void testGetValueObject() throws Exception {
+
+ Bundle bundle = dataSourceAvro.peek();
+ assertNotNull(bundle);
+ for (BundleField bundleField : bundle) {
+ System.out.println(bundleField.getName() + ":" + bundle.getValue(bundleField));
+ }
+ }
+}
95 hydra-data/pom.xml
@@ -0,0 +1,95 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.addthis.hydra</groupId>
+ <artifactId>hydra-parent</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>hydra-data</artifactId>
+ <name>Hydra Data Module</name>
+ <description>log file processing library</description>
+
+ <dependencies>
+ <!-- module deps -->
+ <dependency>
+ <groupId>com.addthis.hydra</groupId>
+ <artifactId>hydra-store</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.addthis.hydra</groupId>
+ <artifactId>hydra-filters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- addthis deps -->
+ <dependency>
+ <groupId>com.addthis</groupId>
+ <artifactId>bundle</artifactId>
+ <version>${hydra.dep.bundle.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.addthis</groupId>
+ <artifactId>codec</artifactId>
+ <version>${hydra.dep.codec.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.addthis</groupId>
+ <artifactId>meshy</artifactId>
+ <version>${hydra.dep.meshy.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.addthis</groupId>
+ <artifactId>muxy</artifactId>
+ <version>${hydra.dep.muxy.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.clearspring.analytics</groupId>
+ <artifactId>stream</artifactId>
+ <version>${hydra.dep.streamlib.version}</version>
+ </dependency>
+ <!-- end addthis deps -->
+ <!-- 3rd party -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${hydra.dep.commons-codec.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${hydra.dep.compress.snappy-java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>compress-lzf</artifactId>
+ <version>${hydra.dep.compress.compress-lzf.version}</version>
+ </dependency>
+ <!-- end 3rd party -->
+
+ <dependency>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <version>${hydra.dep.sleepycat.je.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
76 hydra-data/src/main/java/com/addthis/hydra/data/MakeBloom.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.data;
+
+import java.io.File;
+
+import com.addthis.basis.util.Files;
+
+import com.addthis.maljson.JSONArray;
+
+import com.clearspring.analytics.stream.membership.BloomFilter;
+
+import org.slf4j.Logger;
+
+import org.slf4j.LoggerFactory;
+/**
+ * <h1>MakeBloom</h1>
+ * <p/>
+ * <p>Create a Bloom Filter out of a string list and export it.</p>
+ *
+ */
+public class MakeBloom {
+
+ /** */
+ private static final Logger log = LoggerFactory.getLogger(MakeBloom.class);
+ private static final String fpRate = System.getProperty("fpRate", "0.001");
+ private static final double fp_rate = Double.parseDouble(fpRate);
+
+ // get all the quoted words from a "frag" file
+ public static String[] getWords(File in) throws java.io.IOException,
+ com.addthis.maljson.JSONException {
+ log.debug("Reading " + in.length() + " bytes from [" + in + "]");
+ JSONArray words = new JSONArray("[" + new String(Files.read(in), "utf8") + "]");
+ log.debug("Read " + words.length() + " words from [" + in + "]");
+ String[] ret = new String[words.length()];
+ for (int i = 0; i < words.length(); i++) {
+ ret[i] = words.getString(i);
+ }
+ return ret;
+ }
+
+ public static void main(String[] args) throws java.io.IOException,
+ com.addthis.maljson.JSONException {
+ if (args.length != 1 && args.length != 2) {
+ throw new IllegalArgumentException("usage: MakeBloom word-list-file [bloom-file]");
+ }
+
+ File in = new File(args[0]);
+ String[] words = getWords(in);
+
+ BloomFilter bf = new BloomFilter(words.length, fp_rate);
+ log.debug("Created: BloomFilter(" + bf.buckets() + " buckets, " +
+ bf.getHashCount() + " hashes); FP rate = " + fp_rate);
+ for (int i = 0; i < words.length; i++) {
+ bf.add(words[i]);
+ }
+ log.debug("Added words");
+
+ File out = args.length == 2 ? new File(args[1]) :
+ Files.replaceSuffix(in, "-" + words.length + "-" + fpRate + ".bloom");
+ log.debug("Writing [" + out + "]");
+ Files.write(out, org.apache.commons.codec.binary.Base64.encodeBase64(BloomFilter.serialize(bf)), false);
+ log.debug("Wrote " + out.length() + " bytes to [" + out + "]");
+ }
+}
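A hypothetical invocation (the classpath placeholder is assumed, not from this commit): the word list is a file of quoted, comma-separated strings, the `fpRate` system property overrides the 0.001 default, and the optional second argument names the Base64-encoded output file:

    java -DfpRate=0.01 -cp <hydra-data-and-deps> \
        com.addthis.hydra.data.MakeBloom words.frag words.bloom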
53 hydra-data/src/main/java/com/addthis/hydra/data/channel/BlockingBufferedConsumer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.data.channel;
+
+import com.addthis.bundle.core.Bundle;
+import com.addthis.bundle.table.DataTable;
+import com.addthis.hydra.data.query.Query;
+import com.addthis.hydra.data.query.QueryOpProcessor;
+
+
+/**
+ * table accumulator for op chain that can block until complete send or error
+ */
+public class BlockingBufferedConsumer extends BlockingNullConsumer {
+
+ private final DataTable table;
+
+ public BlockingBufferedConsumer() throws InterruptedException {
+ table = Query.createProcessor(this).createTable(0);
+ }
+
+ public BlockingBufferedConsumer(String ops) throws InterruptedException {
+ table = Query.createProcessor(this, ops).createTable(0);
+ }
+
+ public BlockingBufferedConsumer(QueryOpProcessor proc) throws InterruptedException {
+ table = proc.createTable(0);
+ }
+
+ @Override
+ public void send(Bundle row) {
+ table.append(row);
+ }
+
+ /**
+ * only callable once
+ */
+ public DataTable getTable() throws Exception {
+ waitComplete();
+ return table;
+ }
+}
73 hydra-data/src/main/java/com/addthis/hydra/data/channel/BlockingNullConsumer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.data.channel;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import com.addthis.bundle.channel.DataChannelError;
+import com.addthis.bundle.channel.DataChannelOutput;
+import com.addthis.bundle.core.Bundle;
+import com.addthis.bundle.core.list.ListBundle;
+import com.addthis.bundle.core.list.ListBundleFormat;
+import com.addthis.hydra.data.query.QueryException;
+
+/**
+ * effective /dev/null for op chain that can block until complete send or error
+ */
+public class BlockingNullConsumer implements DataChannelOutput {
+
+ private final ListBundleFormat format = new ListBundleFormat();
+ private final Semaphore gate = new Semaphore(1);
+ private QueryException exception;
+
+ public BlockingNullConsumer() {
+ try {
+ gate.acquire();
+ } catch (InterruptedException e) {
+ throw new DataChannelError(e);
+ }
+ }
+
+ public void waitComplete() throws Exception {
+ gate.acquire();
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ @Override
+ public void sendComplete() {
+ gate.release();
+ }
+
+ @Override
+ public void send(Bundle row) {
+ }
+
+ @Override
+ public void send(List<Bundle> bundles) {
+ }
+
+ @Override
+ public void sourceError(DataChannelError ex) {
+ exception = new QueryException(ex);
+ sendComplete();
+ }
+
+ @Override
+ public Bundle createBundle() {
+ return new ListBundle(format);
+ }
+}
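A minimal usage sketch for the two consumers above (the class name is a stand-in, and it assumes the hydra and bundle jars are on the classpath): a producer sends rows and signals completion, while `getTable()` blocks until `sendComplete()` or `sourceError()` fires:

    import com.addthis.bundle.core.Bundle;
    import com.addthis.bundle.table.DataTable;
    import com.addthis.hydra.data.channel.BlockingBufferedConsumer;

    public class ConsumerSketch {
        public static void main(String[] args) throws Exception {
            // no query ops: rows are simply accumulated into the backing table
            BlockingBufferedConsumer consumer = new BlockingBufferedConsumer();
            Bundle row = consumer.createBundle(); // factory from BlockingNullConsumer
            consumer.send(row);                   // appended to the table
            consumer.sendComplete();              // releases the completion gate
            DataTable table = consumer.getTable(); // blocks until complete or error
            System.out.println("got table: " + table);
        }
    }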
657 hydra-data/src/main/java/com/addthis/hydra/data/io/DiskBackedList.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.data.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * a disk-backed list in which only the index is stored in memory with
+ * objects/bytes kept on disk. sparse and delete are punted. when a
+ * value is updated, a new object is appended and the old data is left
+ * in place. compaction creates a new backing store and copies over
+ * data, deleting the old.
+ *
+ * @param <K> a codable object type
+ */
+@Deprecated
+public class DiskBackedList<K> implements List<K> {
+
+ /** */
+ public static interface ItemCodec<K> {
+
+ public K decode(byte row[]) throws IOException;
+
+ public byte[] encode(K row) throws IOException;
+ }
+
+ private static final int headerSize = 64;
+
+ private final LinkedList<DiskBackedListEntry> master = new LinkedList<DiskBackedListEntry>();
+
+ private ItemCodec<K> codec;
+ private RandomAccessFile access;
+ private AccessFileHandler accessFileHandler;
+ private File data;
+ private long nextOffset;
+ private long firstElement;
+ private int cruft;
+ private int numSeeks = 0;
+
+ public DiskBackedList(File data, ItemCodec<K> codec) throws IOException {
+ this(data, codec, 1000);
+ }
+
+ public DiskBackedList(File data, ItemCodec<K> codec, int maxReadBufferSize) throws IOException {
+ this.data = data;
+ this.codec = codec;
+ boolean create = !data.exists() || data.length() == 0;
+ access = new RandomAccessFile(data, "rw");
+ this.accessFileHandler = new AccessFileHandler(access, maxReadBufferSize);
+ if (create) {
+ clear();
+ } else {
+ readHeader();
+ System.out.println("importing " + data + " first=" + firstElement + " next=" + nextOffset);
+ if (firstElement > 0) {
+ DiskBackedListEntry e1 = getEntry(firstElement);
+ e1.read();
+ master.add(e1);
+ while ((e1 = e1.getNext()) != null) {
+ master.add(e1);
+ }
+ }
+ }
+ }
+
+ public void setCodec(ItemCodec<K> codec) {
+ this.codec = codec;
+ }
+
+ @Override
+ protected void finalize() {
+ if (access != null) {
+ System.err.println("finalizing open DiskBackedList rows=" + size() + " size=" + data.length() + " @ " + data);
+ try {
+ close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public boolean add(K element) {
+ add(master.size(), element);
+ return true;
+ }
+
+ @Override
+ public void add(int index, K element) {
+ try {
+ DiskBackedListEntry added = allocate(element, index < master.size() ? master.get(index) : null);
+ master.add(index, added);
+ if (index == 0) {
+ setFirst(added);
+ } else {
+ DiskBackedListEntry prev = master.get(index - 1);
+ prev.setNext(added);
+ prev.update();
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends K> c) {
+ for (K k : c) {
+ add(k);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(int index, Collection<? extends K> c) {
+ for (K k : c) {
+ add(index++, k);
+ }
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ master.clear();
+ firstElement = 0;
+ nextOffset = headerSize;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return indexOf(o) >= 0;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for (Iterator<?> iter = c.iterator(); iter.hasNext();) {
+ if (!contains(iter.next())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public K get(int index) {
+ return master.get(index).getObjectSafe();
+ }
+
+ @Override
+ public int indexOf(Object o) {
+ int index = 0;
+ for (DiskBackedListEntry next : master) {
+ if (next.getObjectSafe().equals(o)) {
+ return index;
+ }
+ index++;
+ }
+ return -1;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return master.size() == 0;
+ }
+
+ @Override
+ public Iterator<K> iterator() {
+ return listIterator();
+ }
+
+ @Override
+ public int lastIndexOf(Object o) {
+ int index = 0;
+ int found = -1;
+ for (DiskBackedListEntry next : master) {
+ if (next.getObjectSafe().equals(o)) {
+ found = index;
+ }
+ index++;
+ }
+ return found;
+ }
+
+ @Override
+ public ListIterator<K> listIterator() {
+ return listIterator(0);
+ }
+
+ @Override
+ public ListIterator<K> listIterator(final int index) {
+ return new ListIterator<K>() {
+
+ ListIterator<DiskBackedListEntry> listIter = master.listIterator(index);
+
+ @Override
+ public boolean hasNext() {
+ return listIter.hasNext();
+ }
+
+ @Override
+ public K next() {
+ return listIter.next().getObjectSafe();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(K e) {
+ DiskBackedList.this.add(e);
+ }
+
+ @Override
+ public boolean hasPrevious() {
+ return listIter.hasPrevious();
+ }
+
+ @Override
+ public int nextIndex() {
+ return listIter.nextIndex();
+ }
+
+ @Override
+ public K previous() {
+ return listIter.previous().getObjectSafe();
+ }
+
+ @Override
+ public int previousIndex() {
+ return listIter.previousIndex();
+ }
+
+ @Override
+ public void set(K e) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+    public boolean remove(Object o) {
+        // remove(int) already counts the new hole via cruft++, so do not
+        // double-count it here
+        return remove(indexOf(o)) != null;
+    }
+
+ @Override
+ public K remove(int index) {
+ if (index >= 0) {
+ DiskBackedListEntry e = master.remove(index);
+ try {
+ DiskBackedListEntry prev = index > 0 ? master.get(index - 1) : null;
+ DiskBackedListEntry next = index < master.size() ? master.get(index) : null;
+ if (prev != null) {
+ prev.setNext(next);
+ prev.update();
+ }
+ if (index == 0) {
+ setFirst(next);
+ }
+ cruft++;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ return e.getObjectSafe();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ boolean success = true;
+ for (Iterator<?> iter = c.iterator(); iter.hasNext();) {
+ if (!remove(iter.next())) {
+ success = false;
+ }
+ }
+        // each successful remove() above has already incremented cruft
+ return success;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public K set(int index, K element) {
+ DiskBackedListEntry e = master.get(index);
+ try {
+ DiskBackedListEntry prev = index > 0 ? master.get(index - 1) : null;
+ DiskBackedListEntry next = index + 1 < master.size() ? master.get(index + 1) : null;
+ DiskBackedListEntry swap = allocate(element, next);
+ master.set(index, swap);
+ if (prev != null) {
+ prev.setNext(swap);
+ prev.update();
+ }
+ if (index == 0) {
+ setFirst(swap);
+ }
+ cruft++;
+ return e.getObjectSafe();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public int size() {
+ return master.size();
+ }
+
+ @Override
+ public List<K> subList(int fromIndex, int toIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * write DiskBackedList magic preamble and pointers
+ */
+ private void writeHeader() throws IOException {
+ access.seek(0);
+ access.writeLong(0x12345678L);
+ access.writeLong(firstElement);
+ access.writeLong(nextOffset);
+ numSeeks += 1;
+ }
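+
+    // on-disk header layout (headerSize = 64; bytes past offset 23 are
+    // unused padding, since nextOffset starts at 64):
+    //   bytes  0-7  : magic preamble 0x12345678
+    //   bytes  8-15 : offset of the first element (0 = empty list)
+    //   bytes 16-23 : offset at which the next entry will be written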
+
+ /**
+ * read DiskBackedList magic preamble and pointers
+ */
+    private void readHeader() throws IOException {
+        access.seek(0);
+        long magic = access.readLong();
+        // validate the preamble written by writeHeader() instead of
+        // silently discarding it
+        if (magic != 0x12345678L) {
+            throw new IOException("bad DiskBackedList magic: 0x" + Long.toHexString(magic));
+        }
+        firstElement = access.readLong();
+        nextOffset = access.readLong();
+        numSeeks += 1;
+    }
+
+ /**
+ * fix pointers in Entries to align with master list. run after a sort()
+ */
+ private void updatePointers() throws IOException {
+ DiskBackedListEntry prev = null;
+ for (DiskBackedListEntry next : master) {
+ if (prev != null) {
+ prev.setNext(next);
+ prev.update();
+ }
+ prev = next;
+ }
+ if (prev != null) {
+ prev.setNext(null);
+ prev.update();
+ }
+ }
+
+ private void setFirst(DiskBackedListEntry e) {
+ if (e != null) {
+ firstElement = e.off;
+ } else {
+ firstElement = 0;
+ }
+ }
+
+ /**
+ * read Entry from disk based on an offset
+ */
+ private DiskBackedListEntry getEntry(long off) throws IOException {
+ if (off > 0) {
+ DiskBackedListEntry e = new DiskBackedListEntry(off);
+ e.read();
+ return e;
+ }
+ return null;
+ }
+
+ /**
+ * allocate a new Entry based on object data and prev/next pointers
+ */
+ private DiskBackedListEntry allocate(K val, DiskBackedListEntry next) throws Exception {
+ return allocate(codec.encode(val), next);
+ }
+
+ /**
+ * allocate a new Entry based on raw data and prev/next pointers
+ */
+ private DiskBackedListEntry allocate(byte data[], DiskBackedListEntry next) throws Exception {
+ return accessFileHandler.writeToAccess(data, next);
+ }
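+
+    // each entry occupies: next pointer (8 bytes), payload length
+    // (4 bytes), then the payload itself; hence writeToAccess() advances
+    // nextOffset by 8 + 4 + bytes.length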
+
+ /**
+ * create new DiskBackedList and copy over data compacting out holes
+ */
+    public void compact() throws Exception {
+        if (cruft > 0) {
+            File newdata = new File(data.getParentFile(), data.getName().concat(".new"));
+            DiskBackedList<K> alt = new DiskBackedList<K>(newdata, codec);
+            alt.addEncodedData(getEncodedData());
+            clear();
+            close();
+            // adopt the compacted copy's state wholesale; the entry list
+            // and file handler must follow the new file as well, otherwise
+            // reads would go through the just-closed old file
+            master.addAll(alt.master);
+            data = alt.data;
+            nextOffset = alt.nextOffset;
+            firstElement = alt.firstElement;
+            access = alt.access;
+            accessFileHandler = alt.accessFileHandler;
+            cruft = 0;
+        }
+    }
+
+ /**
+ * clean up and close
+ */
+ public void close() throws IOException {
+ if (access != null) {
+ writeHeader();
+ access.close();
+ access = null;
+ }
+ }
+
+ /**
+ * perform sort based on object comparisons
+ */
+ public void sort(final Comparator<K> comp) {
+ long starttime = System.currentTimeMillis();
+ dumbSort(comp);
+ long stoptime = System.currentTimeMillis() - starttime;
+ System.out.println("dumbSort took " + stoptime + " ms");
+ starttime = System.currentTimeMillis();
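+        // sort the entry pointers in memory; each comparison decodes its
+        // operands from disk through the read buffer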
+ Collections.sort(master, new Comparator<DiskBackedListEntry>() {
+ @Override
+ public int compare(DiskBackedListEntry o1, DiskBackedListEntry o2) {
+ return comp.compare(o1.getObjectSafe(), o2.getObjectSafe());
+ }
+ });
+ try {
+ updatePointers();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ stoptime = System.currentTimeMillis() - starttime;
+ System.out.println("existing sort took " + stoptime + " ms");
+ }
+
+    /**
+     * benchmark helper for sort(): copies the list into memory, sorts
+     * there, and prints a sample of the result
+     */
+    public void dumbSort(final Comparator<K> comp) {
+        List<K> memoryList = new ArrayList<K>();
+        for (Iterator<K> it = iterator(); it.hasNext();) {
+            memoryList.add(it.next());
+        }
+        Collections.sort(memoryList, comp);
+        StringBuilder buf = new StringBuilder();
+        // guard against lists with fewer than 20 elements
+        for (K thing : memoryList.subList(0, Math.min(20, memoryList.size()))) {
+            buf.append(thing);
+        }
+        System.out.println("Here's dumb sort: " + buf.toString());
+    }
+
+ /**
+ * consume a list of encoded list elements
+ */
+    public void addEncodedData(Iterator<byte[]> stream) throws Exception {
+        DiskBackedListEntry prev = master.size() > 0 ? master.getLast() : null;
+        while (stream.hasNext()) {
+            DiskBackedListEntry next = allocate(stream.next(), null);
+            if (prev != null) {
+                prev.setNext(next);
+                prev.update();
+            } else {
+                // anchor the chain when the list was empty; otherwise the
+                // copied data is unreachable after a reopen
+                setFirst(next);
+            }
+            // track the new entry in the in-memory list as well
+            master.add(next);
+            prev = next;
+        }
+        if (prev != null) {
+            prev.update();
+        }
+    }
+
+ /**
+ * produce a list of encoded list elements
+ */
+ public Iterator<byte[]> getEncodedData() {
+ return new Iterator<byte[]>() {
+ Iterator<DiskBackedListEntry> iter = master.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public byte[] next() {
+ try {
+ return iter.next().getData();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
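+
+    // getEncodedData() and addEncodedData() together form the raw copy
+    // path used by compact(); a minimal sketch, assuming src and dst are
+    // DiskBackedList<K> instances sharing the same codec:
+    //
+    //   dst.addEncodedData(src.getEncodedData());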
+
+ public int getSeeks() {
+ return numSeeks;
+ }
+
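+    /**
+     * wraps the RandomAccessFile with a small offset-keyed read cache so
+     * repeated decodes (e.g. during sort) avoid redundant seeks
+     */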
+ private class AccessFileHandler {
+
+ private RandomAccessFile access;
+ private int maxReadBufferSize;
+ private int maxWriteBufferSize;
+ private LinkedHashMap<Long, byte[]> readBuffer = new LinkedHashMap<Long, byte[]>();
+ public HashMap<Long, byte[]> writeBuffer = new HashMap<Long, byte[]>();
+
+ public AccessFileHandler(RandomAccessFile access, int maxReadBufferSize) {
+ this.access = access;
+ this.maxReadBufferSize = maxReadBufferSize;
+ this.maxWriteBufferSize = 1000;
+ }
+
+ public byte[] getFromAccess(Long offset) throws IOException {
+ byte[] result = readBuffer.get(offset);
+ if (result == null) {
+ access.seek(offset + 8);
+ result = new byte[access.readInt()];
+ access.readFully(result);
+ numSeeks += 1;
+ }
+ putInReadBuffer(offset, result);
+ return result;
+ }
+
+ public void putInReadBuffer(Long offset, byte[] data) {
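+            // simplistic eviction: drop the whole buffer once it fills,
+            // rather than evicting least-recently-used entries one at a time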
+ if (readBuffer.size() >= maxReadBufferSize) {
+ readBuffer.clear();
+ }
+ readBuffer.put(offset, data);
+ }
+
+ public DiskBackedListEntry writeToAccess(byte[] bytes, DiskBackedListEntry next) throws IOException {
+ putInReadBuffer(nextOffset, bytes);
+ DiskBackedListEntry e = new DiskBackedListEntry(nextOffset);
+ e.setNext(next);
+ e.write(bytes);
+ nextOffset += 8 + 4 + bytes.length;
+ return e;
+ }
+ }
+
+ /**
+ * pointer bag with reader/write utilities
+ */
+ private class DiskBackedListEntry {
+
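+        // off is this entry's byte offset in the backing file; next is the
+        // offset of the following entry, or 0 at the end of the list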
+ private long off;
+ private long next;
+ private boolean updated;
+
+ private DiskBackedListEntry(long off) {
+ this.off = off;
+ }
+
+ public void read() throws IOException {
+ access.seek(off);
+ next = access.readLong();
+ numSeeks += 1;
+ }
+
+ public void update() throws IOException {
+ if (updated) {
+ write(null);
+ updated = false;
+ }
+ }
+
+ public void write(byte data[]) throws IOException {
+ access.seek(off);
+ access.writeLong(next);
+ numSeeks += 1;
+ if (data != null) {
+ access.writeInt(data.length);
+ access.write(data);
+ }
+ }
+
+ public byte[] getData() throws IOException {
+ return accessFileHandler.getFromAccess(off);
+ }
+
+ @SuppressWarnings("unchecked")
+ public K getObject() throws Exception {
+ return codec.decode(getData());
+ }
+
+ public K getObjectSafe() {
+ try {
+ return getObject();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public DiskBackedListEntry getNext() throws IOException {
+ return getEntry(next);
+ }
+
+ public void setNext(DiskBackedListEntry next) {
+ if (next == null || next.off != this.next) {
+ this.next = next != null ? next.off : 0;
+ updated = true;
+ }
+ }
+ }
+
+}
716 hydra-data/src/main/java/com/addthis/hydra/data/io/DiskBackedList2.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.addthis.hydra.data.io;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+
+import com.addthis.basis.util.Bytes;
+import com.addthis.basis.util.Files;
+import com.addthis.basis.util.MemoryCounter;
+import com.addthis.basis.util.Parameter;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of disk-backed list tuned for good sorting performance.
+ * Incoming values are split into a number of chunks. Each chunk is small enough
+ * to fit entirely within memory.
+ *
+ * @param <K> a codable object type
+ */
+public class DiskBackedList2<K> implements List<K> {
+
+ public static interface ItemCodec<K> {
+
+ public K decode(byte row[]) throws IOException;
+
+ public byte[] encode(K row) throws IOException;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(DiskBackedList2.class);
+
+ private ItemCodec<K> codec;
+ private List<DBLChunk> chunks;
+ private DBLChunk currentChunk;
+ private int totalItems;
+ private final long maxChunkSizeBytes;
+    // note the long literal: 10 * 1000 * 1000 * 1000 overflows int before
+    // widening and would silently yield ~1.4GB instead of 10GB
+    private final long maxTotalSizeBytes = Parameter.longValue("max.total.query.size.bytes", 10L * 1000 * 1000 * 1000);
+ private static final int defaultChunkSizeBytes = Parameter.intValue("default.chunk.size.bytes", 16 * 1000 * 1000);
+ private final String filePrefix = "dbl2file-";
+ private final String fileSuffix = ".dat";
+ private final File directory;
+
+ public DiskBackedList2(ItemCodec<K> codec) throws IOException {
+ this(codec, defaultChunkSizeBytes, Files.createTempDir());
+ }
+
+ public DiskBackedList2(ItemCodec<K> codec, long maxChunkSizeBytes, File directory) throws IOException {
+ this.codec = codec;
+ this.maxChunkSizeBytes = maxChunkSizeBytes;
+ this.directory = directory;
+ this.chunks = new ArrayList<DBLChunk>();
+ this.currentChunk = addChunk();
+ this.totalItems = 0;
+ }
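+
+    // A minimal usage sketch for illustration; stringCodec stands for any
+    // ItemCodec<String> implementation and is not part of this commit:
+    //
+    //   DiskBackedList2<String> list = new DiskBackedList2<String>(stringCodec);
+    //   list.add("foo");
+    //   list.add("bar");
+    //   String first = list.get(0); // pages the owning chunk into memory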
+
+ public void setCodec(ItemCodec<K> codec) {
+ this.codec = codec;
+ }
+
+ @Override
+ protected void finalize() {
+ chunks.clear();
+ currentChunk = null;
+ }
+
+    @Override
+    public String toString() {
+ return "DBL: rows=" + totalItems + ", numChunks=" + chunks.size() + ", maxChunkSize=" + maxChunkSizeBytes;
+ }
+
+ // Which chunk is this element on and where is the element in that list?
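+    // e.g. with chunks holding 3, 3, and 2 entries, element index 4 maps
+    // to chunk 1, position 1 within that chunk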
+ private Pair<Integer, Integer> indicesForElement(int elementIndex) {
+ int chunkIndex = 0;
+ int remaining = elementIndex;
+ for (DBLChunk chunk : chunks) {
+ int numEntries = chunk.getNumEntries();
+ if (remaining >= numEntries) {
+ remaining -= numEntries;
+ chunkIndex += 1;
+ } else {
+ break;
+ }
+ }
+ return Pair.of(chunkIndex, remaining);
+ }
+
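+    /**
+     * page the requested chunk into memory, first persisting whatever
+     * chunk is currently resident
+     */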
+ private void loadChunk(int chunkIndex) throws IOException {
+ if (currentChunk == null) {
+ loadChunkFully(chunkIndex);
+ } else if (currentChunk.getIndex() != chunkIndex) {
+ currentChunk.saveToFile();
+ currentChunk.clear();
+ loadChunkFully(chunkIndex);
+ }
+ }
+
+ private void installChunk(int index, DBLChunk chunk) {
+ try {
+ while (index >= chunks.size()) {
+ chunks.add(new DBLChunk(chunks.size(), directory));
+ }
+ chunks.set(index, chunk);
+ totalItems += chunk.getNumEntries();
+ currentChunk = null;
+ } catch (IOException io) {
+ log.warn("exception during install chunk: ", io);
+ }
+ }
+
+ @Override
+ public boolean add(K element) {
+ try {
+ loadChunk(chunks.size() - 1);
+ if (!currentChunk.hasRoom()) {
+ currentChunk = addChunk();
+ }
+ currentChunk.store(element);
+ totalItems++;
+ return true;
+ } catch (IOException ex) {
+ log.warn("[disk.backed.list] exception while adding " + element, ex);
+ }
+ return false;
+ }
+
+    /**
+     * Adds an element at a particular location.
+     * TODO does not presently respect maxChunkSize
+     */
+    @Override
+    public void add(int elementIndex, K element) {
+ Pair<Integer, Integer> indices = indicesForElement(elementIndex);
+ try {
+ loadChunk(indices.getLeft());
+ currentChunk.store(indices.getRight(), element);
+ totalItems++;
+ } catch (IOException io) {
+ log.warn("[disk.backed.list] exception while adding " + element, io);
+ }
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends K> c) {
+ for (K k : c) {
+ add(k);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(int index, Collection<? extends K> c) {
+ for (K k : c) {
+ add(index++, k);
+ }
+ return true;
+ }
+
+    @Override
+    public void clear() {
+        for (DBLChunk chunk : chunks) {
+            chunk.deleteFile();
+        }
+        // reset the in-memory state too, not just the backing files
+        chunks.clear();
+        totalItems = 0;
+        try {
+            currentChunk = addChunk();
+        } catch (IOException io) {
+            log.warn("exception while resetting after clear: ", io);
+        }
+    }
+
+ @Override
+ public boolean contains(Object o) {
+ return indexOf(o) >= 0;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for (Iterator<?> iter = c.iterator(); iter.hasNext();) {
+ if (!contains(iter.next())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public K get(int elementIndex) {
+ try {
+ Pair<Integer, Integer> indices = indicesForElement(elementIndex);
+ loadChunk(indices.getLeft());
+ return currentChunk.get(indices.getRight());
+ } catch (IOException ex) {
+ log.warn("[disk.backed.list] exception while getting element " + elementIndex, ex);
+ return null;
+ }
+ }
+
+ @Override
+    public int indexOf(Object o) {
+        int index = 0;
+        // iterate explicitly: an Iterator cannot be cast to Iterable, so an
+        // enhanced-for over iterator() would fail at runtime
+        for (Iterator<K> iter = iterator(); iter.hasNext();) {
+            if (iter.next().equals(o)) {
+                return index;
+            }
+            index++;
+        }
+        return -1;
+    }
+
+ @Override
+ public boolean isEmpty() {
+        // chunks always holds at least one (possibly empty) chunk, so test
+        // the element count instead of the chunk list
+        return totalItems == 0;
+ }
+
+ @Override
+ public Iterator<K> iterator() {
+ return listIterator();
+ }
+
+ @Override
+ public int lastIndexOf(Object o) {
+ int index = 0;
+ int found = -1;
+        // same Iterator/Iterable fix as in indexOf() above
+        for (Iterator<K> iter = iterator(); iter.hasNext();) {
+            if (iter.next().equals(o)) {
+                found = index;
+            }
+            index++;
+        }
+ return found;
+ }
+
+ @Override
+ public ListIterator<K> listIterator() {
+ return listIterator(0);
+ }
+
+ @Override
+ public ListIterator<K> listIterator(final int ind) {
+ try {
+ if (currentChunk != null) {
+ currentChunk.saveToFile();
+ }
+ } catch (IOException ex) {
+ log.warn("[disk.backed.list] exception during saving", ex);
+            // returning null would merely defer the failure to the caller;
+            // surface it instead
+            throw new RuntimeException(ex);
+ }
+ return new ListIterator<K>() {
+
+ private int index = ind;
+
+ @Override
+ public boolean hasNext() {
+ return index < totalItems;
+ }
+
+ @Override
+ public K next() {
+ return get(index++);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(K e) {
+ DiskBackedList2.this.add(e);
+ }
+
+ @Override
+ public boolean hasPrevious() {
+ return index > 0;
+ }
+
+ @Override
+ public int nextIndex() {
+ return index;
+ }
+
+ @Override
+ public K previous() {
+                // step back before reading so previous() returns the
+                // element before the cursor, per the ListIterator contract
+                return get(--index);
+ }
+
+ @Override
+ public int previousIndex() {
+ return index - 1;
+ }
+
+ @Override
+ public void set(K e) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public boolean remove(Object o) {
+        int index = indexOf(o);
+        // guard against indexOf() misses; remove(-1) would misbehave
+        return index >= 0 && remove(index) != null;
+ }
+
+ @Override
+ public K remove(int elementIndex) {
+ Pair<Integer, Integer> indices = indicesForElement(elementIndex);
+ try {
+ loadChunk(indices.getLeft());
+ } catch (IOException io) {
+ log.warn("[disk.backed.list] io exception during remove operation", io);
+ return null;
+ }
+ K elt = currentChunk.remove(indices.getRight().intValue());
+ totalItems -= 1;
+ return elt;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ boolean success = true;
+ for (Iterator<?> iter = c.iterator(); iter.hasNext();) {
+ if (!remove(iter.next())) {
+ success = false;
+ }
+ }
+ return success;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public K set