Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add rabbitmq license

  • Loading branch information...
commit 5c1ab49ab6db9a68ff9adbbd0344afdab92aab2c 1 parent 469ea2f
@erylee authored
Showing with 10,319 additions and 45 deletions.
  1. +1 −1  .gitignore
  2. +455 −0 LICENSE-MPL-RabbitMQ
  3. +10 −1 README.md
  4. +186 −0 rel/files/emqtt
  5. +56 −0 rel/files/emqtt.cmd
  6. +34 −0 rel/files/erl
  7. +138 −0 rel/files/nodetool
  8. +39 −0 rel/files/start_erl.cmd
  9. +38 −0 rel/files/sys.config
  10. +33 −0 rel/files/vm.args
  11. +6 −2 rel/reltool.config
  12. +35 −0 src/apps/emqtt/ebin/emqtt.app
  13. +32 −1 src/apps/emqtt/include/emqtt.hrl
  14. +28 −0 src/apps/emqtt/include/gm_specs.hrl
  15. +35 −12 src/apps/emqtt/src/emqtt.app.src
  16. +357 −0 src/apps/emqtt/src/emqtt.erl
  17. +112 −0 src/apps/emqtt/src/emqtt_client.erl
  18. +38 −8 src/apps/emqtt/src/emqtt_client_sup.erl
  19. +57 −0 src/apps/emqtt/src/emqtt_connection_sup.erl
  20. +282 −0 src/apps/emqtt/src/emqtt_file.erl
  21. +134 −0 src/apps/emqtt/src/emqtt_log.erl
  22. +731 −0 src/apps/emqtt/src/emqtt_misc.erl
  23. +314 −0 src/apps/emqtt/src/emqtt_mnesia.erl
  24. +143 −0 src/apps/emqtt/src/emqtt_net.erl
  25. +382 −0 src/apps/emqtt/src/emqtt_networking.erl
  26. +253 −6 src/apps/emqtt/src/emqtt_packet.erl
  27. +169 −0 src/apps/emqtt/src/emqtt_reader.erl
  28. +42 −0 src/apps/emqtt/src/emqtt_restartable_sup.erl
  29. +92 −0 src/apps/emqtt/src/emqtt_router.erl
  30. +65 −14 src/apps/emqtt/src/emqtt_sup.erl
  31. +67 −0 src/apps/emqtt/src/emqtt_types.erl
  32. +1,201 −0 src/apps/emqtt/src/file_handle_cache.erl
  33. +1,195 −0 src/apps/emqtt/src/gen_server2.erl
  34. +1,388 −0 src/apps/emqtt/src/gm.erl
  35. +90 −0 src/apps/emqtt/src/lqueue.erl
  36. +213 −0 src/apps/emqtt/src/pg_local.erl
  37. +194 −0 src/apps/emqtt/src/priority_queue.erl
  38. +1,071 −0 src/apps/emqtt/src/supervisor2.erl
  39. +106 −0 src/apps/emqtt/src/tcp_acceptor.erl
  40. +39 −0 src/apps/emqtt/src/tcp_acceptor_sup.erl
  41. +92 −0 src/apps/emqtt/src/tcp_listener.erl
  42. +66 −0 src/apps/emqtt/src/tcp_listener_sup.erl
  43. +141 −0 src/apps/emqtt/src/worker_pool.erl
  44. +53 −0 src/apps/emqtt/src/worker_pool_sup.erl
  45. +106 −0 src/apps/emqtt/src/worker_pool_worker.erl
View
2  .gitignore
@@ -1,2 +1,2 @@
-rel/files
+rel/emqtt/*
src/apps/emqtt/ebin/*.beam
View
455 LICENSE-MPL-RabbitMQ
@@ -0,0 +1,455 @@
+ MOZILLA PUBLIC LICENSE
+ Version 1.1
+
+ ---------------
+
+1. Definitions.
+
+ 1.0.1. "Commercial Use" means distribution or otherwise making the
+ Covered Code available to a third party.
+
+ 1.1. "Contributor" means each entity that creates or contributes to
+ the creation of Modifications.
+
+ 1.2. "Contributor Version" means the combination of the Original
+ Code, prior Modifications used by a Contributor, and the Modifications
+ made by that particular Contributor.
+
+ 1.3. "Covered Code" means the Original Code or Modifications or the
+ combination of the Original Code and Modifications, in each case
+ including portions thereof.
+
+ 1.4. "Electronic Distribution Mechanism" means a mechanism generally
+ accepted in the software development community for the electronic
+ transfer of data.
+
+ 1.5. "Executable" means Covered Code in any form other than Source
+ Code.
+
+ 1.6. "Initial Developer" means the individual or entity identified
+ as the Initial Developer in the Source Code notice required by Exhibit
+ A.
+
+ 1.7. "Larger Work" means a work which combines Covered Code or
+ portions thereof with code not governed by the terms of this License.
+
+ 1.8. "License" means this document.
+
+ 1.8.1. "Licensable" means having the right to grant, to the maximum
+ extent possible, whether at the time of the initial grant or
+ subsequently acquired, any and all of the rights conveyed herein.
+
+ 1.9. "Modifications" means any addition to or deletion from the
+ substance or structure of either the Original Code or any previous
+ Modifications. When Covered Code is released as a series of files, a
+ Modification is:
+ A. Any addition to or deletion from the contents of a file
+ containing Original Code or previous Modifications.
+
+ B. Any new file that contains any part of the Original Code or
+ previous Modifications.
+
+ 1.10. "Original Code" means Source Code of computer software code
+ which is described in the Source Code notice required by Exhibit A as
+ Original Code, and which, at the time of its release under this
+ License is not already Covered Code governed by this License.
+
+ 1.10.1. "Patent Claims" means any patent claim(s), now owned or
+ hereafter acquired, including without limitation, method, process,
+ and apparatus claims, in any patent Licensable by grantor.
+
+ 1.11. "Source Code" means the preferred form of the Covered Code for
+ making modifications to it, including all modules it contains, plus
+ any associated interface definition files, scripts used to control
+ compilation and installation of an Executable, or source code
+ differential comparisons against either the Original Code or another
+ well known, available Covered Code of the Contributor's choice. The
+ Source Code can be in a compressed or archival form, provided the
+ appropriate decompression or de-archiving software is widely available
+ for no charge.
+
+ 1.12. "You" (or "Your") means an individual or a legal entity
+ exercising rights under, and complying with all of the terms of, this
+ License or a future version of this License issued under Section 6.1.
+ For legal entities, "You" includes any entity which controls, is
+ controlled by, or is under common control with You. For purposes of
+ this definition, "control" means (a) the power, direct or indirect,
+ to cause the direction or management of such entity, whether by
+ contract or otherwise, or (b) ownership of more than fifty percent
+ (50%) of the outstanding shares or beneficial ownership of such
+ entity.
+
+2. Source Code License.
+
+ 2.1. The Initial Developer Grant.
+ The Initial Developer hereby grants You a world-wide, royalty-free,
+ non-exclusive license, subject to third party intellectual property
+ claims:
+ (a) under intellectual property rights (other than patent or
+ trademark) Licensable by Initial Developer to use, reproduce,
+ modify, display, perform, sublicense and distribute the Original
+ Code (or portions thereof) with or without Modifications, and/or
+ as part of a Larger Work; and
+
+ (b) under Patents Claims infringed by the making, using or
+ selling of Original Code, to make, have made, use, practice,
+ sell, and offer for sale, and/or otherwise dispose of the
+ Original Code (or portions thereof).
+
+ (c) the licenses granted in this Section 2.1(a) and (b) are
+ effective on the date Initial Developer first distributes
+ Original Code under the terms of this License.
+
+ (d) Notwithstanding Section 2.1(b) above, no patent license is
+ granted: 1) for code that You delete from the Original Code; 2)
+ separate from the Original Code; or 3) for infringements caused
+ by: i) the modification of the Original Code or ii) the
+ combination of the Original Code with other software or devices.
+
+ 2.2. Contributor Grant.
+ Subject to third party intellectual property claims, each Contributor
+ hereby grants You a world-wide, royalty-free, non-exclusive license
+
+ (a) under intellectual property rights (other than patent or
+ trademark) Licensable by Contributor, to use, reproduce, modify,
+ display, perform, sublicense and distribute the Modifications
+ created by such Contributor (or portions thereof) either on an
+ unmodified basis, with other Modifications, as Covered Code
+ and/or as part of a Larger Work; and
+
+ (b) under Patent Claims infringed by the making, using, or
+ selling of Modifications made by that Contributor either alone
+ and/or in combination with its Contributor Version (or portions
+ of such combination), to make, use, sell, offer for sale, have
+ made, and/or otherwise dispose of: 1) Modifications made by that
+ Contributor (or portions thereof); and 2) the combination of
+ Modifications made by that Contributor with its Contributor
+ Version (or portions of such combination).
+
+ (c) the licenses granted in Sections 2.2(a) and 2.2(b) are
+ effective on the date Contributor first makes Commercial Use of
+ the Covered Code.
+
+ (d) Notwithstanding Section 2.2(b) above, no patent license is
+ granted: 1) for any code that Contributor has deleted from the
+ Contributor Version; 2) separate from the Contributor Version;
+ 3) for infringements caused by: i) third party modifications of
+ Contributor Version or ii) the combination of Modifications made
+ by that Contributor with other software (except as part of the
+ Contributor Version) or other devices; or 4) under Patent Claims
+ infringed by Covered Code in the absence of Modifications made by
+ that Contributor.
+
+3. Distribution Obligations.
+
+ 3.1. Application of License.
+ The Modifications which You create or to which You contribute are
+ governed by the terms of this License, including without limitation
+ Section 2.2. The Source Code version of Covered Code may be
+ distributed only under the terms of this License or a future version
+ of this License released under Section 6.1, and You must include a
+ copy of this License with every copy of the Source Code You
+ distribute. You may not offer or impose any terms on any Source Code
+ version that alters or restricts the applicable version of this
+ License or the recipients' rights hereunder. However, You may include
+ an additional document offering the additional rights described in
+ Section 3.5.
+
+ 3.2. Availability of Source Code.
+ Any Modification which You create or to which You contribute must be
+ made available in Source Code form under the terms of this License
+ either on the same media as an Executable version or via an accepted
+ Electronic Distribution Mechanism to anyone to whom you made an
+ Executable version available; and if made available via Electronic
+ Distribution Mechanism, must remain available for at least twelve (12)
+ months after the date it initially became available, or at least six
+ (6) months after a subsequent version of that particular Modification
+ has been made available to such recipients. You are responsible for
+ ensuring that the Source Code version remains available even if the
+ Electronic Distribution Mechanism is maintained by a third party.
+
+ 3.3. Description of Modifications.
+ You must cause all Covered Code to which You contribute to contain a
+ file documenting the changes You made to create that Covered Code and
+ the date of any change. You must include a prominent statement that
+ the Modification is derived, directly or indirectly, from Original
+ Code provided by the Initial Developer and including the name of the
+ Initial Developer in (a) the Source Code, and (b) in any notice in an
+ Executable version or related documentation in which You describe the
+ origin or ownership of the Covered Code.
+
+ 3.4. Intellectual Property Matters
+ (a) Third Party Claims.
+ If Contributor has knowledge that a license under a third party's
+ intellectual property rights is required to exercise the rights
+ granted by such Contributor under Sections 2.1 or 2.2,
+ Contributor must include a text file with the Source Code
+ distribution titled "LEGAL" which describes the claim and the
+ party making the claim in sufficient detail that a recipient will
+ know whom to contact. If Contributor obtains such knowledge after
+ the Modification is made available as described in Section 3.2,
+ Contributor shall promptly modify the LEGAL file in all copies
+ Contributor makes available thereafter and shall take other steps
+ (such as notifying appropriate mailing lists or newsgroups)
+ reasonably calculated to inform those who received the Covered
+ Code that new knowledge has been obtained.
+
+ (b) Contributor APIs.
+ If Contributor's Modifications include an application programming
+ interface and Contributor has knowledge of patent licenses which
+ are reasonably necessary to implement that API, Contributor must
+ also include this information in the LEGAL file.
+
+ (c) Representations.
+ Contributor represents that, except as disclosed pursuant to
+ Section 3.4(a) above, Contributor believes that Contributor's
+ Modifications are Contributor's original creation(s) and/or
+ Contributor has sufficient rights to grant the rights conveyed by
+ this License.
+
+ 3.5. Required Notices.
+ You must duplicate the notice in Exhibit A in each file of the Source
+ Code. If it is not possible to put such notice in a particular Source
+ Code file due to its structure, then You must include such notice in a
+ location (such as a relevant directory) where a user would be likely
+ to look for such a notice. If You created one or more Modification(s)
+ You may add your name as a Contributor to the notice described in
+ Exhibit A. You must also duplicate this License in any documentation
+ for the Source Code where You describe recipients' rights or ownership
+ rights relating to Covered Code. You may choose to offer, and to
+ charge a fee for, warranty, support, indemnity or liability
+ obligations to one or more recipients of Covered Code. However, You
+ may do so only on Your own behalf, and not on behalf of the Initial
+ Developer or any Contributor. You must make it absolutely clear than
+ any such warranty, support, indemnity or liability obligation is
+ offered by You alone, and You hereby agree to indemnify the Initial
+ Developer and every Contributor for any liability incurred by the
+ Initial Developer or such Contributor as a result of warranty,
+ support, indemnity or liability terms You offer.
+
+ 3.6. Distribution of Executable Versions.
+ You may distribute Covered Code in Executable form only if the
+ requirements of Section 3.1-3.5 have been met for that Covered Code,
+ and if You include a notice stating that the Source Code version of
+ the Covered Code is available under the terms of this License,
+ including a description of how and where You have fulfilled the
+ obligations of Section 3.2. The notice must be conspicuously included
+ in any notice in an Executable version, related documentation or
+ collateral in which You describe recipients' rights relating to the
+ Covered Code. You may distribute the Executable version of Covered
+ Code or ownership rights under a license of Your choice, which may
+ contain terms different from this License, provided that You are in
+ compliance with the terms of this License and that the license for the
+ Executable version does not attempt to limit or alter the recipient's
+ rights in the Source Code version from the rights set forth in this
+ License. If You distribute the Executable version under a different
+ license You must make it absolutely clear that any terms which differ
+ from this License are offered by You alone, not by the Initial
+ Developer or any Contributor. You hereby agree to indemnify the
+ Initial Developer and every Contributor for any liability incurred by
+ the Initial Developer or such Contributor as a result of any such
+ terms You offer.
+
+ 3.7. Larger Works.
+ You may create a Larger Work by combining Covered Code with other code
+ not governed by the terms of this License and distribute the Larger
+ Work as a single product. In such a case, You must make sure the
+ requirements of this License are fulfilled for the Covered Code.
+
+4. Inability to Comply Due to Statute or Regulation.
+
+ If it is impossible for You to comply with any of the terms of this
+ License with respect to some or all of the Covered Code due to
+ statute, judicial order, or regulation then You must: (a) comply with
+ the terms of this License to the maximum extent possible; and (b)
+ describe the limitations and the code they affect. Such description
+ must be included in the LEGAL file described in Section 3.4 and must
+ be included with all distributions of the Source Code. Except to the
+ extent prohibited by statute or regulation, such description must be
+ sufficiently detailed for a recipient of ordinary skill to be able to
+ understand it.
+
+5. Application of this License.
+
+ This License applies to code to which the Initial Developer has
+ attached the notice in Exhibit A and to related Covered Code.
+
+6. Versions of the License.
+
+ 6.1. New Versions.
+ Netscape Communications Corporation ("Netscape") may publish revised
+ and/or new versions of the License from time to time. Each version
+ will be given a distinguishing version number.
+
+ 6.2. Effect of New Versions.
+ Once Covered Code has been published under a particular version of the
+ License, You may always continue to use it under the terms of that
+ version. You may also choose to use such Covered Code under the terms
+ of any subsequent version of the License published by Netscape. No one
+ other than Netscape has the right to modify the terms applicable to
+ Covered Code created under this License.
+
+ 6.3. Derivative Works.
+ If You create or use a modified version of this License (which you may
+ only do in order to apply it to code which is not already Covered Code
+ governed by this License), You must (a) rename Your license so that
+ the phrases "Mozilla", "MOZILLAPL", "MOZPL", "Netscape",
+ "MPL", "NPL" or any confusingly similar phrase do not appear in your
+ license (except to note that your license differs from this License)
+ and (b) otherwise make it clear that Your version of the license
+ contains terms which differ from the Mozilla Public License and
+ Netscape Public License. (Filling in the name of the Initial
+ Developer, Original Code or Contributor in the notice described in
+ Exhibit A shall not of themselves be deemed to be modifications of
+ this License.)
+
+7. DISCLAIMER OF WARRANTY.
+
+ COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS,
+ WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+ WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF
+ DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING.
+ THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE COVERED CODE
+ IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE IN ANY RESPECT,
+ YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUME THE
+ COST OF ANY NECESSARY SERVICING, REPAIR OR CORRECTION. THIS DISCLAIMER
+ OF WARRANTY CONSTITUTES AN ESSENTIAL PART OF THIS LICENSE. NO USE OF
+ ANY COVERED CODE IS AUTHORIZED HEREUNDER EXCEPT UNDER THIS DISCLAIMER.
+
+8. TERMINATION.
+
+ 8.1. This License and the rights granted hereunder will terminate
+ automatically if You fail to comply with terms herein and fail to cure
+ such breach within 30 days of becoming aware of the breach. All
+ sublicenses to the Covered Code which are properly granted shall
+ survive any termination of this License. Provisions which, by their
+ nature, must remain in effect beyond the termination of this License
+ shall survive.
+
+ 8.2. If You initiate litigation by asserting a patent infringement
+ claim (excluding declatory judgment actions) against Initial Developer
+ or a Contributor (the Initial Developer or Contributor against whom
+ You file such action is referred to as "Participant") alleging that:
+
+ (a) such Participant's Contributor Version directly or indirectly
+ infringes any patent, then any and all rights granted by such
+ Participant to You under Sections 2.1 and/or 2.2 of this License
+ shall, upon 60 days notice from Participant terminate prospectively,
+ unless if within 60 days after receipt of notice You either: (i)
+ agree in writing to pay Participant a mutually agreeable reasonable
+ royalty for Your past and future use of Modifications made by such
+ Participant, or (ii) withdraw Your litigation claim with respect to
+ the Contributor Version against such Participant. If within 60 days
+ of notice, a reasonable royalty and payment arrangement are not
+ mutually agreed upon in writing by the parties or the litigation claim
+ is not withdrawn, the rights granted by Participant to You under
+ Sections 2.1 and/or 2.2 automatically terminate at the expiration of
+ the 60 day notice period specified above.
+
+ (b) any software, hardware, or device, other than such Participant's
+ Contributor Version, directly or indirectly infringes any patent, then
+ any rights granted to You by such Participant under Sections 2.1(b)
+ and 2.2(b) are revoked effective as of the date You first made, used,
+ sold, distributed, or had made, Modifications made by that
+ Participant.
+
+ 8.3. If You assert a patent infringement claim against Participant
+ alleging that such Participant's Contributor Version directly or
+ indirectly infringes any patent where such claim is resolved (such as
+ by license or settlement) prior to the initiation of patent
+ infringement litigation, then the reasonable value of the licenses
+ granted by such Participant under Sections 2.1 or 2.2 shall be taken
+ into account in determining the amount or value of any payment or
+ license.
+
+ 8.4. In the event of termination under Sections 8.1 or 8.2 above,
+ all end user license agreements (excluding distributors and resellers)
+ which have been validly granted by You or any distributor hereunder
+ prior to termination shall survive termination.
+
+9. LIMITATION OF LIABILITY.
+
+ UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT
+ (INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL
+ DEVELOPER, ANY OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED CODE,
+ OR ANY SUPPLIER OF ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR
+ ANY INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY
+ CHARACTER INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF GOODWILL,
+ WORK STOPPAGE, COMPUTER FAILURE OR MALFUNCTION, OR ANY AND ALL OTHER
+ COMMERCIAL DAMAGES OR LOSSES, EVEN IF SUCH PARTY SHALL HAVE BEEN
+ INFORMED OF THE POSSIBILITY OF SUCH DAMAGES. THIS LIMITATION OF
+ LIABILITY SHALL NOT APPLY TO LIABILITY FOR DEATH OR PERSONAL INJURY
+ RESULTING FROM SUCH PARTY'S NEGLIGENCE TO THE EXTENT APPLICABLE LAW
+ PROHIBITS SUCH LIMITATION. SOME JURISDICTIONS DO NOT ALLOW THE
+ EXCLUSION OR LIMITATION OF INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO
+ THIS EXCLUSION AND LIMITATION MAY NOT APPLY TO YOU.
+
+10. U.S. GOVERNMENT END USERS.
+
+ The Covered Code is a "commercial item," as that term is defined in
+ 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer
+ software" and "commercial computer software documentation," as such
+ terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48
+ C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 (June 1995),
+ all U.S. Government End Users acquire Covered Code with only those
+ rights set forth herein.
+
+11. MISCELLANEOUS.
+
+ This License represents the complete agreement concerning subject
+ matter hereof. If any provision of this License is held to be
+ unenforceable, such provision shall be reformed only to the extent
+ necessary to make it enforceable. This License shall be governed by
+ California law provisions (except to the extent applicable law, if
+ any, provides otherwise), excluding its conflict-of-law provisions.
+ With respect to disputes in which at least one party is a citizen of,
+ or an entity chartered or registered to do business in the United
+ States of America, any litigation relating to this License shall be
+ subject to the jurisdiction of the Federal Courts of the Northern
+ District of California, with venue lying in Santa Clara County,
+ California, with the losing party responsible for costs, including
+ without limitation, court costs and reasonable attorneys' fees and
+ expenses. The application of the United Nations Convention on
+ Contracts for the International Sale of Goods is expressly excluded.
+ Any law or regulation which provides that the language of a contract
+ shall be construed against the drafter shall not apply to this
+ License.
+
+12. RESPONSIBILITY FOR CLAIMS.
+
+ As between Initial Developer and the Contributors, each party is
+ responsible for claims and damages arising, directly or indirectly,
+ out of its utilization of rights under this License and You agree to
+ work with Initial Developer and Contributors to distribute such
+ responsibility on an equitable basis. Nothing herein is intended or
+ shall be deemed to constitute any admission of liability.
+
+13. MULTIPLE-LICENSED CODE.
+
+ Initial Developer may designate portions of the Covered Code as
+ "Multiple-Licensed". "Multiple-Licensed" means that the Initial
+ Developer permits you to utilize portions of the Covered Code under
+ Your choice of the NPL or the alternative licenses, if any, specified
+ by the Initial Developer in the file described in Exhibit A.
+
+EXHIBIT A -Mozilla Public License.
+
+ ``The contents of this file are subject to the Mozilla Public License
+ Version 1.1 (the "License"); you may not use this file except in
+ compliance with the License. You may obtain a copy of the License at
+ http://www.mozilla.org/MPL/
+
+ Software distributed under the License is distributed on an "AS IS"
+ basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ License for the specific language governing rights and limitations
+ under the License.
+
+ The Original Code is RabbitMQ Visualiser.
+
+ The Initial Developer of the Original Code is VMware, Inc.
+ Copyright (c) 2011-2011 VMware, Inc. All rights reserved.''
+
+ [NOTE: The text of this Exhibit A may differ slightly from the text of
+ the notices in the Source Code files of the Original Code. You should
+ use the text of this Exhibit A rather than the text found in the
+ Original Code Source Code for Your Modifications.]
View
11 README.md
@@ -1,9 +1,16 @@
# eMQTT
-Erlang MQTT Server
+Erlang MQTT Server is based on RabbitMQ Network Framework.
# Build
+NOTICE: should install rebar first
+
+git clone https://github.com/basho/rebar
+cd rebar
+./bootstrap
+cp rebar .
+
make
# Run
@@ -20,3 +27,5 @@ cd emqtt
The first release is based on mqtt4erl.
+www.rabbitmq.com
+
View
186 rel/files/emqtt
@@ -0,0 +1,186 @@
+#!/bin/sh
+# -*- tab-width:4;indent-tabs-mode:nil -*-
+# ex: ts=4 sw=4 et
+
+RUNNER_SCRIPT_DIR=$(cd ${0%/*} && pwd)
+
+RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*}
+RUNNER_ETC_DIR=$RUNNER_BASE_DIR/etc
+RUNNER_LOG_DIR=$RUNNER_BASE_DIR/log
+# Note the trailing slash on $PIPE_DIR/
+PIPE_DIR=/tmp/$RUNNER_BASE_DIR/
+RUNNER_USER=
+
+# Make sure this script is running as the appropriate user
+if [ ! -z "$RUNNER_USER" ] && [ `whoami` != "$RUNNER_USER" ]; then
+ exec sudo -u $RUNNER_USER -i $0 $@
+fi
+
+# Make sure CWD is set to runner base dir
+cd $RUNNER_BASE_DIR
+
+# Make sure log directory exists
+mkdir -p $RUNNER_LOG_DIR
+# Identify the script name
+SCRIPT=`basename $0`
+
+# Parse out release and erts info
+START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data`
+ERTS_VSN=${START_ERL% *}
+APP_VSN=${START_ERL#* }
+
+# Use releases/VSN/vm.args if it exists otherwise use etc/vm.args
+if [ -e "$RUNNER_BASE_DIR/releases/$APP_VSN/vm.args" ]; then
+ VMARGS_PATH="$RUNNER_BASE_DIR/releases/$APP_VSN/vm.args"
+else
+ VMARGS_PATH="$RUNNER_ETC_DIR/vm.args"
+fi
+
+# Use releases/VSN/sys.config if it exists otherwise use etc/app.config
+if [ -e "$RUNNER_BASE_DIR/releases/$APP_VSN/sys.config" ]; then
+ CONFIG_PATH="$RUNNER_BASE_DIR/releases/$APP_VSN/sys.config"
+else
+ CONFIG_PATH="$RUNNER_ETC_DIR/app.config"
+fi
+
+# Extract the target node name from node.args
+NAME_ARG=`egrep '^-s?name' $VMARGS_PATH`
+if [ -z "$NAME_ARG" ]; then
+ echo "vm.args needs to have either -name or -sname parameter."
+ exit 1
+fi
+
+# Extract the target cookie
+COOKIE_ARG=`grep '^-setcookie' $VMARGS_PATH`
+if [ -z "$COOKIE_ARG" ]; then
+ echo "vm.args needs to have a -setcookie parameter."
+ exit 1
+fi
+
+# Add ERTS bin dir to our path
+ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin
+
+# Setup command to control the node
+NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG"
+
+# Check the first argument for instructions
+case "$1" in
+ start)
+ # Make sure there is not already a node running
+ RES=`$NODETOOL ping`
+ if [ "$RES" = "pong" ]; then
+ echo "Node is already running!"
+ exit 1
+ fi
+ HEART_COMMAND="$RUNNER_BASE_DIR/bin/$SCRIPT start"
+ export HEART_COMMAND
+ mkdir -p $PIPE_DIR
+ shift # remove $1
+ $ERTS_PATH/run_erl -daemon $PIPE_DIR $RUNNER_LOG_DIR "exec $RUNNER_BASE_DIR/bin/$SCRIPT console $@" 2>&1
+ ;;
+
+ stop)
+ # Wait for the node to completely stop...
+ case `uname -s` in
+ Linux|Darwin|FreeBSD|DragonFly|NetBSD|OpenBSD)
+ # PID COMMAND
+ PID=`ps ax -o pid= -o command=|\
+ grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
+ ;;
+ SunOS)
+ # PID COMMAND
+ PID=`ps -ef -o pid= -o args=|\
+ grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
+ ;;
+ CYGWIN*)
+ # UID PID PPID TTY STIME COMMAND
+ PID=`ps -efW|grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $2}'`
+ ;;
+ esac
+ $NODETOOL stop
+ ES=$?
+ if [ "$ES" -ne 0 ]; then
+ exit $ES
+ fi
+ while `kill -0 $PID 2>/dev/null`;
+ do
+ sleep 1
+ done
+ ;;
+
+ restart)
+ ## Restart the VM without exiting the process
+ $NODETOOL restart
+ ES=$?
+ if [ "$ES" -ne 0 ]; then
+ exit $ES
+ fi
+ ;;
+
+ reboot)
+ ## Restart the VM completely (uses heart to restart it)
+ $NODETOOL reboot
+ ES=$?
+ if [ "$ES" -ne 0 ]; then
+ exit $ES
+ fi
+ ;;
+
+ ping)
+ ## See if the VM is alive
+ $NODETOOL ping
+ ES=$?
+ if [ "$ES" -ne 0 ]; then
+ exit $ES
+ fi
+ ;;
+
+ attach)
+ # Make sure a node IS running
+ RES=`$NODETOOL ping`
+ ES=$?
+ if [ "$ES" -ne 0 ]; then
+ echo "Node is not running!"
+ exit $ES
+ fi
+
+ shift
+ exec $ERTS_PATH/to_erl $PIPE_DIR
+ ;;
+
+ console|console_clean)
+ # .boot file typically just $SCRIPT (ie, the app name)
+ # however, for debugging, sometimes start_clean.boot is useful:
+ case "$1" in
+ console) BOOTFILE=$SCRIPT ;;
+ console_clean) BOOTFILE=start_clean ;;
+ esac
+ # Setup beam-required vars
+ ROOTDIR=$RUNNER_BASE_DIR
+ BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin
+ EMU=beam
+ PROGNAME=`echo $0 | sed 's/.*\\///'`
+ CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$BOOTFILE -mode embedded -config $CONFIG_PATH -args_file $VMARGS_PATH -- ${1+"$@"}"
+ export EMU
+ export ROOTDIR
+ export BINDIR
+ export PROGNAME
+
+ # Dump environment info for logging purposes
+ echo "Exec: $CMD"
+ echo "Root: $ROOTDIR"
+
+ # Log the startup
+ logger -t "$SCRIPT[$$]" "Starting up"
+
+ # Start the VM
+ exec $CMD
+ ;;
+
+ *)
+ echo "Usage: $SCRIPT {start|stop|restart|reboot|ping|console|console_clean|attach}"
+ exit 1
+ ;;
+esac
+
+exit 0
View
56 rel/files/emqtt.cmd
@@ -0,0 +1,56 @@
+@setlocal
+
+@set node_name=emqtt
+
+@rem Get the abolute path to the parent directory,
+@rem which is assumed to be the node root.
+@for /F "delims=" %%I in ("%~dp0..") do @set node_root=%%~fI
+
+@set releases_dir=%node_root%\releases
+
+@rem Parse ERTS version and release version from start_erl.data
+@for /F "tokens=1,2" %%I in (%releases_dir%\start_erl.data) do @(
+ @call :set_trim erts_version %%I
+ @call :set_trim release_version %%J
+)
+
+@set erts_bin=%node_root%\erts-%erts_version%\bin
+
+@set service_name=%node_name%_%release_version%
+
+@if "%1"=="install" @goto install
+@if "%1"=="uninstall" @goto uninstall
+@if "%1"=="start" @goto start
+@if "%1"=="stop" @goto stop
+@if "%1"=="restart" @call :stop && @goto start
+@if "%1"=="console" @goto console
+@rem TODO: attach, ping, restart and reboot
+
+:usage
+@echo Usage: %0 {install|uninstall|start|stop|restart|console}
+@goto :EOF
+
+:install
+@%erts_bin%\erlsrv.exe add %service_name% -c "Erlang node %node_name% in %node_root%" -w %node_root% -m %node_root%\bin\start_erl.cmd -args " ++ %node_name% ++ %node_root%" -stopaction "init:stop()."
+@goto :EOF
+
+:uninstall
+@%erts_bin%\erlsrv.exe remove %service_name%
+@%erts_bin%\epmd.exe -kill
+@goto :EOF
+
+:start
+@%erts_bin%\erlsrv.exe start %service_name%
+@goto :EOF
+
+:stop
+@%erts_bin%\erlsrv.exe stop %service_name%
+@goto :EOF
+
+:console
+@start %erts_bin%\werl.exe -boot %releases_dir%\%release_version%\%node_name%
+@goto :EOF
+
+:set_trim
+@set %1=%2
+@goto :EOF
View
34 rel/files/erl
@@ -0,0 +1,34 @@
+#!/bin/sh
+
+## This script replaces the default "erl" in erts-VSN/bin. This is necessary
+## as escript depends on erl and in turn, erl depends on having access to a
+## bootscript (start.boot). Note that this script is ONLY invoked as a side-effect
+## of running escript -- the embedded node bypasses erl and uses erlexec directly
+## (as it should).
+##
+## Note that this script makes the assumption that there is a start_clean.boot
+## file available in $ROOTDIR/release/VSN.
+
+# Determine the abspath of where this script is executing from.
+ERTS_BIN_DIR=$(cd ${0%/*} && pwd)
+
+# Now determine the root directory -- this script runs from erts-VSN/bin,
+# so we simply need to strip off two dirs from the end of the ERTS_BIN_DIR
+# path.
+ROOTDIR=${ERTS_BIN_DIR%/*/*}
+
+# Parse out release and erts info
+START_ERL=`cat $ROOTDIR/releases/start_erl.data`
+ERTS_VSN=${START_ERL% *}
+APP_VSN=${START_ERL#* }
+
+BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin
+EMU=beam
+PROGNAME=`echo $0 | sed 's/.*\\///'`
+CMD="$BINDIR/erlexec"
+export EMU
+export ROOTDIR
+export BINDIR
+export PROGNAME
+
+exec $CMD -boot $ROOTDIR/releases/$APP_VSN/start_clean ${1+"$@"}
View
138 rel/files/nodetool
@@ -0,0 +1,138 @@
+%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+%% -------------------------------------------------------------------
+%%
+%% nodetool: Helper Script for interacting with live nodes
+%%
+%% -------------------------------------------------------------------
+
+main(Args) ->
+ ok = start_epmd(),
+ %% Extract the args
+ {RestArgs, TargetNode} = process_args(Args, [], undefined),
+
+ %% See if the node is currently running -- if it's not, we'll bail
+ case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
+ {true, pong} ->
+ ok;
+ {_, pang} ->
+ io:format("Node ~p not responding to pings.\n", [TargetNode]),
+ halt(1)
+ end,
+
+ case RestArgs of
+ ["ping"] ->
+ %% If we got this far, the node already responsed to a ping, so just dump
+ %% a "pong"
+ io:format("pong\n");
+ ["stop"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]);
+ ["restart"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, restart, [], 60000)]);
+ ["reboot"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, reboot, [], 60000)]);
+ ["rpc", Module, Function | RpcArgs] ->
+ case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
+ [RpcArgs], 60000) of
+ ok ->
+ ok;
+ {badrpc, Reason} ->
+ io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
+ halt(1);
+ _ ->
+ halt(1)
+ end;
+ ["rpcterms", Module, Function, ArgsAsString] ->
+ case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
+ consult(ArgsAsString), 60000) of
+ {badrpc, Reason} ->
+ io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
+ halt(1);
+ Other ->
+ io:format("~p\n", [Other])
+ end;
+ Other ->
+ io:format("Other: ~p\n", [Other]),
+ io:format("Usage: nodetool {ping|stop|restart|reboot}\n")
+ end,
+ net_kernel:stop().
+
+process_args([], Acc, TargetNode) ->
+ {lists:reverse(Acc), TargetNode};
+process_args(["-setcookie", Cookie | Rest], Acc, TargetNode) ->
+ erlang:set_cookie(node(), list_to_atom(Cookie)),
+ process_args(Rest, Acc, TargetNode);
+process_args(["-name", TargetName | Rest], Acc, _) ->
+ ThisNode = append_node_suffix(TargetName, "_maint_"),
+ {ok, _} = net_kernel:start([ThisNode, longnames]),
+ process_args(Rest, Acc, nodename(TargetName));
+process_args(["-sname", TargetName | Rest], Acc, _) ->
+ ThisNode = append_node_suffix(TargetName, "_maint_"),
+ {ok, _} = net_kernel:start([ThisNode, shortnames]),
+ process_args(Rest, Acc, nodename(TargetName));
+process_args([Arg | Rest], Acc, Opts) ->
+ process_args(Rest, [Arg | Acc], Opts).
+
+
+start_epmd() ->
+ [] = os:cmd(epmd_path() ++ " -daemon"),
+ ok.
+
+epmd_path() ->
+ ErtsBinDir = filename:dirname(escript:script_name()),
+ Name = "epmd",
+ case os:find_executable(Name, ErtsBinDir) of
+ false ->
+ case os:find_executable(Name) of
+ false ->
+ io:format("Could not find epmd.~n"),
+ halt(1);
+ GlobalEpmd ->
+ GlobalEpmd
+ end;
+ Epmd ->
+ Epmd
+ end.
+
+
+nodename(Name) ->
+ case string:tokens(Name, "@") of
+ [_Node, _Host] ->
+ list_to_atom(Name);
+ [Node] ->
+ [_, Host] = string:tokens(atom_to_list(node()), "@"),
+ list_to_atom(lists:concat([Node, "@", Host]))
+ end.
+
+append_node_suffix(Name, Suffix) ->
+ case string:tokens(Name, "@") of
+ [Node, Host] ->
+ list_to_atom(lists:concat([Node, Suffix, os:getpid(), "@", Host]));
+ [Node] ->
+ list_to_atom(lists:concat([Node, Suffix, os:getpid()]))
+ end.
+
+
+%%
+%% Given a string or binary, parse it into a list of terms, ala file:consult/0
+%%
+consult(Str) when is_list(Str) ->
+ consult([], Str, []);
+consult(Bin) when is_binary(Bin)->
+ consult([], binary_to_list(Bin), []).
+
+consult(Cont, Str, Acc) ->
+ case erl_scan:tokens(Cont, Str, 0) of
+ {done, Result, Remaining} ->
+ case Result of
+ {ok, Tokens, _} ->
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ consult([], Remaining, [Term | Acc]);
+ {eof, _Other} ->
+ lists:reverse(Acc);
+ {error, Info, _} ->
+ {error, Info}
+ end;
+ {more, Cont1} ->
+ consult(Cont1, eof, Acc)
+ end.
View
39 rel/files/start_erl.cmd
@@ -0,0 +1,39 @@
+@setlocal
+
+@rem Parse arguments. erlsrv.exe prepends erl arguments prior to first ++.
+@rem Other args are position dependent.
+@set args="%*"
+@for /F "delims=++ tokens=1,2,3" %%I in (%args%) do @(
+ @set erl_args=%%I
+ @call :set_trim node_name %%J
+ @call :set_trim node_root %%K
+)
+
+@set releases_dir=%node_root%\releases
+
+@rem parse ERTS version and release version from start_erl.dat
+@for /F "tokens=1,2" %%I in (%releases_dir%\start_erl.data) do @(
+ @call :set_trim erts_version %%I
+ @call :set_trim release_version %%J
+)
+
+@set erl_exe=%node_root%\erts-%erts_version%\bin\erl.exe
+@set boot_file=%releases_dir%\%release_version%\%node_name%
+
+@if exist %releases_dir%\%release_version%\sys.config (
+ @set app_config=%releases_dir%\%release_version%\sys.config
+) else (
+ @set app_config=%node_root%\etc\app.config
+)
+
+@if exist %releases_dir%\%release_version%\vm.args (
+ @set vm_args=%releases_dir%\%release_version%\vm.args
+) else (
+ @set vm_args=%node_root%\etc\vm.args
+)
+
+@%erl_exe% %erl_args% -boot %boot_file% -config %app_config% -args_file %vm_args%
+
+:set_trim
+@set %1=%2
+@goto :EOF
View
38 rel/files/sys.config
@@ -0,0 +1,38 @@
+[
+ %% SASL config
+ {sasl, [
+ {sasl_error_logger, {file, "log/sasl-error.log"}},
+ {errlog_type, error},
+ {error_logger_mf_dir, "log/sasl"}, % Log directory
+ {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size
+ {error_logger_mf_maxfiles, 5} % 5 files max
+ ]},
+ {mnesia, [
+ {dir, "var/data"}
+ ]},
+
+ {emqtt, [{tcp_listeners, [1883]},
+ {vm_memory_high_watermark, 0.4},
+ {frame_max, 131072},
+ {msg_store_file_size_limit, 16777216},
+ {queue_index_max_journal_entries, 262144},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_user_tags, [administrator]},
+ {default_vhost, <<"/">>},
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {cluster_nodes, []},
+ {server_properties, []},
+ {collect_statistics, none},
+ {collect_statistics_interval, 5000},
+ {delegate_count, 16},
+ {trace_vhosts, []},
+ {tcp_listen_options, [binary,
+ {packet, raw},
+ {reuseaddr, true},
+ {backlog, 128},
+ {nodelay, true},
+ {exit_on_close, false}]}
+ ]}
+].
+
View
33 rel/files/vm.args
@@ -0,0 +1,33 @@
+## Name of the node
+-sname emqtt
+
+## Cookie for distributed erlang
+-setcookie emqtt-secret-cookie
+
+##erlang args
++K true
++A 30
++P 1048576
+
+-kernel inet_default_connect_options [{nodelay,true}]
+
+## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
+## (Disabled by default..use with caution!)
+##-heart
+
+## Enable kernel poll and a few async threads
+##+K true
+##+A 5
+
+## Increase number of concurrent ports/sockets
+##-env ERL_MAX_PORTS 4096
+
+## Tweak GC to run more often
+##-env ERL_FULLSWEEP_AFTER 10
+
+-os_mon start_cpu_sup true
+
+-os_mon start_disksup false
+
+-os_mon start_memsup false
+
View
8 rel/reltool.config
@@ -2,11 +2,13 @@
{lib_dirs, ["../src/apps"]},
{erts, [{mod_cond, derived}, {app_file, strip}]},
{app_file, strip},
- {rel, "emqtt", "0.1",
+ {rel, "emqtt", "0.1.0",
[
kernel,
stdlib,
sasl,
+ mnesia,
+ os_mon,
emqtt
]},
{rel, "start_clean", "",
@@ -15,7 +17,7 @@
stdlib
]},
{boot_rel, "emqtt"},
- {profile, embedded},
+ %{profile, embedded},
{incl_cond, exclude},
{excl_archive_filters, [".*"]}, %% Do not archive built libs
{excl_sys_filters, ["^bin/.*", "^erts.*/doc", "^erts.*/src",
@@ -26,6 +28,8 @@
{app, sasl, [{incl_cond, include}]},
{app, stdlib, [{incl_cond, include}]},
{app, kernel, [{incl_cond, include}]},
+ {app, mnesia, [{incl_cond, include}]},
+ {app, os_mon, [{incl_cond, include}]},
{app, emqtt, [{incl_cond, include}]}
]}.
View
35 src/apps/emqtt/ebin/emqtt.app
@@ -0,0 +1,35 @@
+{application,emqtt,
+ [{description,"Erlang MQTT Server"},
+ {id,"eMQTT"},
+ {vsn,"0.1.0"},
+ {modules,
+ [emqtt,emqtt_app,emqtt_client,emqtt_client_sup,emqtt_connection_sup,
+ emqtt_file,emqtt_log,emqtt_misc,emqtt_mnesia,emqtt_net,
+ emqtt_networking,emqtt_packet,emqtt_reader,emqtt_restartable_sup,
+ emqtt_router,emqtt_sup,emqtt_types,file_handle_cache,gen_server2,gm,
+ lqueue,pg_local,priority_queue,supervisor2,tcp_acceptor,
+ tcp_acceptor_sup,tcp_listener,tcp_listener_sup,worker_pool,
+ worker_pool_sup,worker_pool_worker]},
+ {registered,[emqtt_log,emqtt_router,emqtt_sup,emqtt_tcp_client_sup]},
+ {applications,[kernel,stdlib,sasl,mnesia,os_mon]},
+ {mod,{emqtt,[]}},
+ {env,
+ [{hipe_compile,false},
+ {tcp_listeners,[1883]},
+ {vm_memory_high_watermark,0.4},
+ {frame_max,131072},
+ {msg_store_file_size_limit,16777216},
+ {queue_index_max_journal_entries,262144},
+ {cluster_nodes,[]},
+ {server_properties,[]},
+ {collect_statistics,none},
+ {collect_statistics_interval,5000},
+ {delegate_count,16},
+ {trace_vhosts,[]},
+ {tcp_listen_options,
+ [binary,
+ {packet,raw},
+ {reuseaddr,true},
+ {backlog,128},
+ {nodelay,true},
+ {exit_on_close,false}]}]}]}.
View
33 src/apps/emqtt/include/emqtt.hrl
@@ -26,6 +26,10 @@
-define(PINGRESP, 13).
-define(DISCONNECT, 14).
+-record(ssl_socket, {tcp, ssl}).
+
+-record(listener, {node, protocol, host, ip_address, port}).
+
-record(mqtt_client, {
socket,
id_pid,
@@ -53,7 +57,7 @@
dup = 0,
qos = 0,
retain = 0,
- args
+ arg
}).
-record(sub, {
@@ -72,3 +76,30 @@
publish_options = #publish_options{}
}).
+
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
+-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
+-define(ERTS_MINIMUM, "5.6.3").
+
+-define(MAX_WAIT, 16#ffffffff).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
+-define(DELETED_HEADER, <<"BCC">>).
+
+-ifdef(debug).
+-define(LOGDEBUG0(F), rabbit_log:debug(F)).
+-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
+-define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
+-else.
+-define(LOGDEBUG0(F), ok).
+-define(LOGDEBUG(F,A), ok).
+-define(LOGMESSAGE(D,C,M,Co), ok).
+-endif.
+
+-define(LOG(Msg), io:format("{~p:~p ~p}: ~p~n", [?MODULE, ?LINE, self(), Msg])).
+
+
View
28 src/apps/emqtt/include/gm_specs.hrl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-ifdef(use_specs).
+
+-type(callback_result() :: 'ok' | {'stop', any()} | {'become', atom(), args()}).
+-type(args() :: any()).
+-type(members() :: [pid()]).
+
+-spec(joined/2 :: (args(), members()) -> callback_result()).
+-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
+-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
+-spec(terminate/2 :: (args(), term()) -> any()).
+
+-endif.
View
47 src/apps/emqtt/src/emqtt.app.src
@@ -1,12 +1,35 @@
-{application, emqtt,
- [
- {description, "Erlang MQTT Server"},
- {vsn, "0.1"},
- {registered, []},
- {applications, [
- kernel,
- stdlib
- ]},
- {mod, { emqtt_app, []}},
- {env, []}
- ]}.
+{application,emqtt,
+ [{description,"Erlang MQTT Server"},
+ {id,"eMQTT"},
+ {vsn,"0.1.0"},
+ {modules,
+ [emqtt,emqtt_app,emqtt_client,emqtt_client_sup,emqtt_connection_sup,
+ emqtt_file,emqtt_log,emqtt_misc,emqtt_mnesia,emqtt_net,
+ emqtt_networking,emqtt_packet,emqtt_reader,emqtt_restartable_sup,
+ emqtt_router,emqtt_sup,emqtt_types,file_handle_cache,gen_server2,gm,
+ lqueue,pg_local,priority_queue,supervisor2,tcp_acceptor,
+ tcp_acceptor_sup,tcp_listener,tcp_listener_sup,worker_pool,
+ worker_pool_sup,worker_pool_worker]},
+ {registered,[emqtt_log,emqtt_router,emqtt_sup,emqtt_tcp_client_sup]},
+ {applications,[kernel,stdlib,sasl,mnesia,os_mon]},
+ {mod,{emqtt,[]}},
+ {env,
+ [{hipe_compile,false},
+ {tcp_listeners,[1883]},
+ {vm_memory_high_watermark,0.4},
+ {frame_max,131072},
+ {msg_store_file_size_limit,16777216},
+ {queue_index_max_journal_entries,262144},
+ {cluster_nodes,[]},
+ {server_properties,[]},
+ {collect_statistics,none},
+ {collect_statistics_interval,5000},
+ {delegate_count,16},
+ {trace_vhosts,[]},
+ {tcp_listen_options,
+ [binary,
+ {packet,raw},
+ {reuseaddr,true},
+ {backlog,128},
+ {nodelay,true},
+ {exit_on_close,false}]}]}]}.
View
357 src/apps/emqtt/src/emqtt.erl
@@ -1 +1,358 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
-module(emqtt).
+
+-behaviour(application).
+
+-export([start/0, stop/0, stop_and_halt/0,
+ status/0, is_running/0, is_running/1, environment/0]).
+
+-export([start/2, stop/1]).
+
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-emqtt_boot_step({pre_boot, [{description, "emqtt boot start"}]}).
+
+-emqtt_boot_step({database,
+ [{mfa, {emqtt_mnesia, init, []}},
+ {requires, file_handle_cache},
+ {enables, external_infrastructure}]}).
+
+-emqtt_boot_step({file_handle_cache,
+ [{description, "file handle cache server"},
+ {mfa, {emqtt_sup, start_restartable_child,
+ [file_handle_cache]}},
+ {requires, pre_boot},
+ {enables, worker_pool}]}).
+
+-emqtt_boot_step({worker_pool,
+ [{description, "worker pool"},
+ {mfa, {emqtt_sup, start_child, [worker_pool_sup]}},
+ {requires, pre_boot},
+ {enables, external_infrastructure}]}).
+
+-emqtt_boot_step({external_infrastructure,
+ [{description, "external infrastructure ready"}]}).
+
+-emqtt_boot_step({emqtt_log,
+ [{description, "logging server"},
+ {mfa, {emqtt_sup, start_restartable_child,
+ [emqtt_log]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
+-emqtt_boot_step({kernel_ready,
+ [{description, "kernel ready"},
+ {requires, external_infrastructure}]}).
+
+-emqtt_boot_step({core_initialized,
+ [{description, "core initialized"},
+ {requires, kernel_ready}]}).
+
+-emqtt_boot_step({networking,
+ [{mfa, {emqtt_networking, boot, []}},
+ {requires, core_initialized}]}).
+
+
+%%---------------------------------------------------------------------------
+
+-include("emqtt.hrl").
+
+-define(APPS, [sasl, os_mon, mnesia, emqtt]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+%% this really should be an abstract type
+
+-spec(start/0 :: () -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_halt/0 :: () -> no_return()).
+-spec(status/0 ::
+ () -> [{pid, integer()} |
+ {running_applications, [{atom(), string(), string()}]} |
+ {os, {atom(), atom()}} |
+ {erlang_version, string()} |
+ {memory, any()}]).
+-spec(is_running/0 :: () -> boolean()).
+-spec(is_running/1 :: (node()) -> boolean()).
+-spec(environment/0 :: () -> [{atom() | term()}]).
+
+-spec(start/2 :: ('normal',[]) ->
+ {'error',
+ {'erlang_version_too_old',
+ {'found',[any()]},
+ {'required',[any(),...]}}} |
+ {'ok',pid()}).
+-spec(stop/1 :: (_) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ emqtt_misc:start_applications(application_load_order()).
+
+stop() ->
+ emqtt_log:info("Stopping Rabbit~n"),
+ ok = emqtt_misc:stop_applications(application_load_order()).
+
+stop_and_halt() ->
+ try
+ stop()
+ after
+ emqtt_misc:local_info_msg("Halting Erlang VM~n", []),
+ init:stop()
+ end,
+ ok.
+
+status() ->
+ [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications(infinity)},
+ {os, os:type()},
+ {erlang_version, erlang:system_info(system_version)},
+ {memory, erlang:memory()}].
+
+is_running() -> is_running(node()).
+
+is_running(Node) ->
+ case rpc:call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(emqtt, Apps)
+ end.
+
+environment() ->
+ lists:keysort(
+ 1, [P || P = {K, _} <- application:get_all_env(emqtt),
+ K =/= default_pass]).
+
+%%--------------------------------------------------------------------
+
+start(normal, []) ->
+ case erts_version_check() of
+ ok ->
+ {ok, SupPid} = emqtt_sup:start_link(),
+ true = register(emqtt, self()),
+
+ print_banner(),
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
+ io:format("~nbroker running~n"),
+ {ok, SupPid};
+ Error ->
+ Error
+ end.
+
+stop(_State) ->
+ ok.
+
+%%---------------------------------------------------------------------------
+%% application life cycle
+
+application_load_order() ->
+ ok = load_applications(),
+ {ok, G} = emqtt_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ [{App, app_dependencies(App)} ||
+ {App, _Desc, _Vsn} <- application:loaded_applications()]),
+ true = digraph:del_vertices(
+ G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)),
+ Result = digraph_utils:topsort(G),
+ true = digraph:delete(G),
+ Result.
+
+load_applications() ->
+ load_applications(queue:from_list(?APPS), sets:new()).
+
+load_applications(Worklist, Loaded) ->
+ case queue:out(Worklist) of
+ {empty, _WorkList} ->
+ ok;
+ {{value, App}, Worklist1} ->
+ case sets:is_element(App, Loaded) of
+ true -> load_applications(Worklist1, Loaded);
+ false -> case application:load(App) of
+ ok -> ok;
+ {error, {already_loaded, App}} -> ok;
+ Error -> throw(Error)
+ end,
+ load_applications(
+ queue:join(Worklist1,
+ queue:from_list(app_dependencies(App))),
+ sets:add_element(App, Loaded))
+ end
+ end.
+
+app_dependencies(App) ->
+ case application:get_key(App, applications) of
+ undefined -> [];
+ {ok, Lst} -> Lst
+ end.
+
+%%---------------------------------------------------------------------------
+%% boot step logic
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("-- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-60s ...", [Description]),
+ [try
+ apply(M,F,A)
+ catch
+ _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
+ [Reason, erlang:get_stacktrace()])
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ sort_boot_steps(emqtt_misc:all_module_attributes(emqtt_boot_step)).
+
+vertices(_Module, Steps) ->
+ [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+
+edges(_Module, Steps) ->
+ [case Key of
+ requires -> {StepName, OtherStep};
+ enables -> {OtherStep, StepName}
+ end || {StepName, Atts} <- Steps,
+ {Key, OtherStep} <- Atts,
+ Key =:= requires orelse Key =:= enables].
+
+sort_boot_steps(UnsortedSteps) ->
+ case emqtt_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error(
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end])
+ end.
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+
+%%---------------------------------------------------------------------------
+%% misc
+
+erts_version_check() ->
+ FoundVer = erlang:system_info(version),
+ case emqtt_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
+ true -> ok;
+ false -> {error, {erlang_version_too_old,
+ {found, FoundVer}, {required, ?ERTS_MINIMUM}}}
+ end.
+
+print_banner() ->
+ {ok, Product} = application:get_key(id),
+ {ok, Version} = application:get_key(vsn),
+ ProductLen = string:len(Product),
+ io:format("~n"
+ "+---+ +---+~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| +---+ +-------+~n"
+ "| |~n"
+ "| ~s +---+ |~n"
+ "| | | |~n"
+ "| ~s +---+ |~n"
+ "| |~n"
+ "+-------------------+~n"
+ "~s~n~s~n~s~n~n",
+ [Product, string:right([$v|Version], ProductLen),
+ ?PROTOCOL_VERSION,
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ Settings = [{"node", node()},
+ {"app descriptor", app_location()},
+ {"home dir", home_dir()},
+ {"config file(s)", config_files()},
+ {"cookie hash", emqtt_misc:cookie_hash()},
+ {"database dir", emqtt_mnesia:dir()},
+ {"erlang version", erlang:system_info(version)}],
+ DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
+ Format = fun (K, V) ->
+ io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ [K, V])
+ end,
+ lists:foreach(fun ({"config file(s)" = K, []}) ->
+ Format(K, "(none)");
+ ({"config file(s)" = K, [V0 | Vs]}) ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ ({K, V}) ->
+ Format(K, V)
+ end, Settings),
+ io:nl().
+
+app_location() ->
+ {ok, Application} = application:get_application(),
+ filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+
+home_dir() ->
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> Home;
+ Other -> Other
+ end.
+
+config_files() ->
+ case init:get_argument(config) of
+ {ok, Files} -> [filename:absname(
+ filename:rootname(File, ".config") ++ ".config") ||
+ File <- Files];
+ error -> []
+ end.
View
112 src/apps/emqtt/src/emqtt_client.erl
@@ -1,3 +1,115 @@
-module(emqtt_client).
+-include("emqtt.hrl").
+
+-export([start_link/1]).
+
+-behaviour(gen_server2).
+
+-record(state, {sock}).
+
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ %handle_pre_hibernate/1,
+ %prioritise_call/3,
+ %prioritise_cast/2,
+ %prioritise_info/2
+ ]).
+
+
+-spec start_link(term())->{ok,pid()} | ignore | {error,term()}.
+
+start_link(Sock) ->
+ gen_server2:start_link(?MODULE, [Sock], []).
+
+init([Sock]) ->
+ {ok, #state{sock = Sock}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(_Req, _From, State) ->
+ {reply, {error, badreq}, State}.
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(#mqtt_packet{type = ?CONNECT}, #state{sock = Sock} = State) ->
+ Reply = #mqtt_packet{type = ?CONNACK, arg = 0},
+ send(Reply, Sock),
+ {noreply, State};
+
+handle_cast(Msg, State) ->
+ io:format("badmsg: ~p~n", [Msg]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+send(#mqtt_packet{} = Message, Socket) ->
+%%?LOG({mqtt_core, send, pretty(Message)}),
+ {VariableHeader, Payload} = emqtt_packet:encode_message(Message),
+ ok = send(emqtt_packet:encode_fixed_header(Message), Socket),
+ ok = send_length(size(VariableHeader) + size(Payload), Socket),
+ ok = send(VariableHeader, Socket),
+ ok = send(Payload, Socket);
+send(<<>>, _Socket) ->
+%%?LOG({send, no_bytes}),
+ ok;
+send(Bytes, Socket) when is_binary(Bytes) ->
+%%?LOG({send,bytes,binary_to_list(Bytes)}),
+ case gen_tcp:send(Socket, Bytes) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ ?LOG({send, socket, error, Reason}),
+ exit(Reason)
+ end.
+
+
+
+send_length(Length, Socket) when Length div 128 > 0 ->
+ Digit = Length rem 128,
+ send(<<1:1, Digit:7/big>>, Socket),
+ send_length(Length div 128, Socket);
+send_length(Length, Socket) ->
+ Digit = Length rem 128,
+ send(<<0:1, Digit:7/big>>, Socket).
+
+
View
46 src/apps/emqtt/src/emqtt_client_sup.erl
@@ -1,17 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
-module(emqtt_client_sup).
--behavior(supervisor).
+-behaviour(supervisor2).
--export([start_link/0]).
+-export([start_link/1, start_link/2]).
-export([init/1]).
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+-include("emqtt.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (mfa()) -> emqtt_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+ emqtt_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
-init([]) ->
- {ok, {{simple_one_for_one, 3, 1},
- [{undefined, {emqtt_client, start_link, []},
- transient, 10, worker, [emqtt_client]}]}}.
+start_link(Callback) ->
+ supervisor2:start_link(?MODULE, Callback).
+start_link(SupName, Callback) ->
+ supervisor2:start_link(SupName, ?MODULE, Callback).
+init({M,F,A}) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
View
57 src/apps/emqtt/src/emqtt_connection_sup.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(emqtt_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/1, reader/1]).
+
+-export([init/1]).
+
+-include("emqtt.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (port()) -> {'ok', pid(), pid()}).
+
+-spec(reader/1 :: (pid()) -> pid()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+start_link(Sock) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ClientPid} =
+ supervisor2:start_child(
+ SupPid,
+ {client, {emqtt_client, start_link, [Sock]},
+ intrinsic, ?MAX_WAIT, worker, [emqtt_client]}),
+ {ok, ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {emqtt_reader, start_link, [Sock, ClientPid]},
+ intrinsic, ?MAX_WAIT, worker, [emqtt_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
View
282 src/apps/emqtt/src/emqtt_file.erl
@@ -0,0 +1,282 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(emqtt_file).
+
+-include_lib("kernel/include/file.hrl").
+
+-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]).
+-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
+-export([append_file/2, ensure_parent_dirs_exist/1]).
+-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
+-export([lock_file/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ok_or_error() :: emqtt_types:ok_or_error(any())).
+
+-spec(is_file/1 :: ((file:filename())) -> boolean()).
+-spec(is_dir/1 :: ((file:filename())) -> boolean()).
+-spec(file_size/1 :: ((file:filename())) -> non_neg_integer()).
+-spec(ensure_dir/1 :: ((file:filename())) -> ok_or_error()).
+-spec(wildcard/2 :: (string(), file:filename()) -> [file:filename()]).
+-spec(list_dir/1 :: (file:filename()) -> emqtt_types:ok_or_error2(
+ [file:filename()], any())).
+-spec(read_term_file/1 ::
+ (file:filename()) -> {'ok', [any()]} | emqtt_types:error(any())).
+-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
+-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
+-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
+-spec(rename/2 ::
+ (file:filename(), file:filename()) -> ok_or_error()).
+-spec(delete/1 :: ([file:filename()]) -> ok_or_error()).
+-spec(recursive_delete/1 ::
+ ([file:filename()])
+ -> emqtt_types:ok_or_error({file:filename(), any()})).
+-spec(recursive_copy/2 ::
+ (file:filename(), file:filename())
+ -> emqtt_types:ok_or_error({file:filename(), file:filename(), any()})).
+-spec(lock_file/1 :: (file:filename()) -> emqtt_types:ok_or_error('eexist')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+is_file(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{type=regular}} -> true;
+ {ok, #file_info{type=directory}} -> true;
+ _ -> false
+ end.
+
+is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
+
+is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
+
+is_dir_internal({ok, #file_info{type=directory}}) -> true;
+is_dir_internal(_) -> false.
+
+file_size(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{size=Size}} -> Size;
+ _ -> 0
+ end.
+
+ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end).
+
+ensure_dir_internal("/") ->
+ ok;
+ensure_dir_internal(File) ->
+ Dir = filename:dirname(File),
+ case is_dir_no_handle(Dir) of
+ true -> ok;
+ false -> ensure_dir_internal(Dir),
+ prim_file:make_dir(Dir)
+ end.
+
+wildcard(Pattern, Dir) ->
+ {ok, Files} = list_dir(Dir),
+ {ok, RE} = re:compile(Pattern, [anchored]),
+ [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])].
+
+list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
+
+read_file_info(File) ->
+ with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
+
+with_fhc_handle(Fun) ->
+ ok = file_handle_cache:obtain(),
+ try Fun()
+ after ok = file_handle_cache:release()
+ end.
+
+read_term_file(File) ->
+ try
+ {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ {ok, [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups]}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
+group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)].
+
+group_tokens([], []) -> [];
+group_tokens(Cur, []) -> [Cur];
+group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
+group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
+
+write_term_file(File, Terms) ->
+ write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
+write_file(Path, Data) -> write_file(Path, Data, []).
+
+%% write_file/3 and make_binary/1 are both based on corresponding
+%% functions in the kernel/file.erl module of the Erlang R14B02
+%% release, which is licensed under the EPL. That implementation of
+%% write_file/3 does not do an fsync prior to closing the file, hence
+%% the existence of this version. APIs are otherwise identical.
+write_file(Path, Data, Modes) ->
+ Modes1 = [binary, write | (Modes -- [binary, write])],
+ case make_binary(Data) of
+ Bin when is_binary(Bin) ->
+ with_fhc_handle(
+ fun () -> case prim_file:open(Path, Modes1) of
+ {ok, Hdl} -> try prim_file:write(Hdl, Bin) of
+ ok -> prim_file:sync(Hdl);
+ {error, _} = E -> E
+ after
+ prim_file:close(Hdl)
+ end;
+ {error, _} = E -> E
+ end
+ end);
+ {error, _} = E -> E
+ end.
+
+make_binary(Bin) when is_binary(Bin) ->
+ Bin;
+make_binary(List) ->
+ try
+ iolist_to_binary(List)
+ catch error:Reason ->
+ {error, Reason}
+ end.
+
+
+append_file(File, Suffix) ->
+ case read_file_info(File) of
+ {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
+ {error, enoent} -> append_file(File, 0, Suffix);
+ Error -> Error
+ end.
+
+append_file(_, _, "") ->
+ ok;
+append_file(File, 0, Suffix) ->
+ with_fhc_handle(fun () ->
+ case prim_file:open([File, Suffix], [append]) of
+ {ok, Fd} -> prim_file:close(Fd);
+ Error -> Error
+ end
+ end);
+append_file(File, _, Suffix) ->
+ case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
+ {ok, Data} -> write_file([File, Suffix], Data, [append]);
+ Error -> Error
+ end.
+
+ensure_parent_dirs_exist(Filename) ->
+ case ensure_dir(Filename) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_parent_dirs, Filename, Reason}})
+ end.
+
+rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end).
+
+delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end).
+
+recursive_delete(Files) ->
+ with_fhc_handle(
+ fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files)
+ end).
+
+recursive_delete1(Path) ->
+ case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of
+ false -> case prim_file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case prim_file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case prim_file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+is_symlink_no_handle(File) ->
+ case prim_file:read_link(File) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+recursive_copy(Src, Dest) ->
+ %% Note that this uses the 'file' module and, hence, shouldn't be
+ %% run on many processes at once.
+ case is_dir(Src) of
+ false -> case file:copy(Src, Dest) of
+ {ok, _Bytes} -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Src, Dest, Err}}
+ end;
+ true -> case file:list_dir(Src) of
+ {ok, FileNames} ->
+ case file:make_dir(Dest) of
+ ok ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_copy(
+ filename:join(Src, FileName),
+ filename:join(Dest, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end;
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end
+ end.
+
+%% TODO: When we stop supporting Erlang prior to R14, this should be
+%% replaced with file:open [write, exclusive]
+lock_file(Path) ->
+ case is_file(Path) of
+ true -> {error, eexist};
+ false -> with_fhc_handle(
+ fun () -> {ok, Lock} = prim_file:open(Path, [write]),
+ ok = prim_file:close(Lock)
+ end)
+ end.
View
134 src/apps/emqtt/src/emqtt_log.erl
@@ -0,0 +1,134 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License