Skip to content

Commit

Permalink
Merge pull request #2 from pingww/main
Browse files Browse the repository at this point in the history
Repository Init
  • Loading branch information
RongtongJin committed Mar 8, 2022
2 parents a1a67f8 + 4dedf82 commit 9ca7914
Show file tree
Hide file tree
Showing 128 changed files with 12,695 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
*.iml
target/
75 changes: 74 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,74 @@
# rocketmq-mqtt
## Apache RocketMQ MQTT
A new MQTT protocol architecture model, based on which RocketMQ can better support messages from terminals such as IoT devices and Mobile APP. Based on the RocketMQ message unified storage engine, it supports both MQTT terminal and server message sending and receiving.

## Architecture
The relevant architecture design is introduced in [RIP-33](https://docs.google.com/document/d/1AD1GkV9mqE_YFA97uVem4SmB8ZJSXiJZvzt7-K6Jons/edit#)


## Get Started

### Prerequisites
The queue model of MQTT needs to be based on the lightweight queue feature ([RIP-28](https://github.com/apache/rocketmq/pull/3694)) of RocketMQ. RocketMQ has only supported this feature since version 4.9.3. Please ensure that the installed version of RocketMQ already supports this feature.

1. Clone
```shell
git clone https://github.com/apache/rocketmq-mqtt
```
2. Build the package
```shell
cd rocketmq-mqtt
mvn clean package -DskipTests=true assembly:assembly
```
3. Config
```shell
cp -r target/rocketmq-mqtt ~
cd ~/rocketmq-mqtt
cd conf
```
Some important configuration items in the **service.conf** configuration file

**Config Key** | **Instruction**
----- | ----
username | used for auth
secretKey | used for auth
NAMESRV_ADDR | specify namesrv address
eventNotifyRetryTopic | notify event retry topic
clientRetryTopic | client retry topic

4. CreateTopic

create all first-level topics, including **eventNotifyRetryTopic** and **clientRetryTopic** in the configuration file above.
```shell
sh mqadmin updatetopic -c {cluster} -t {topic} -n {namesrv}
```
5. Initialize Meta
- Configure Gateway Node List
```shell
sh mqadmin updateKvConfig -s LMQ -k LMQ_CONNECT_NODES -v {ip1,ip2} -n {namesrv}
```
- Configure the first-level topic list
```shell
sh mqadmin updateKvConfig -s LMQ -k ALL_FIRST_TOPICS -v {topic1,topic2} -n {namesrv}
```
- Configure a list of wildcard characters under each first-level topic
```shell
sh mqadmin updateKvConfig -s LMQ -k {topic} -v {topic/+} -n {namesrv}
```
6. Start Process
```shell
cd bin
sh mqtt.sh start
```
### Example
The mqtt-example module has written basic usage example code, which can be used for reference

## Protocol Version
The currently supported protocol version is [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf), but the will and retain features are not supported yet

## Authentication
At present, an implementation based on the HmacSHA1 signature algorithm is provided by default, Refer to **AuthManagerSample**. Users can customize other implementations to meet the needs of businesses to flexibly verify resources and identities.
## Meta Persistence
At present, meta data storage and management is simply implemented through the kvconfig mechanism of namesrv by default, Refer to **MetaPersistManagerSample**. Users can customize other implementations.

## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation.
37 changes: 37 additions & 0 deletions assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<fileSets>
<fileSet>
<includes>
<include>bin/*</include>
<include>conf/**</include>
</includes>
<excludes>
<exclude>**/src/**</exclude>
<exclude>**/target/**</exclude>
<exclude>**/.*/**</exclude>
</excludes>
</fileSet>
</fileSets>
<moduleSets>
<moduleSet>
<includes>
<include>org.apache.rocketmq:mqtt-cs</include>
<include>org.apache.rocketmq:mqtt-ds</include>
</includes>
<binaries>
<outputDirectory>lib</outputDirectory>
<unpack>false</unpack>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
</dependencySet>
</dependencySets>
</binaries>
</moduleSet>
</moduleSets>
</assembly>
93 changes: 93 additions & 0 deletions bin/mqtt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/bin/sh

#
# /*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
#

if [ -z "$ROCKETMQ_MQTT_HOME" ]; then
## resolve links - $0 may be a link to maven's home
PRG="$0"

# need this for relative symlinks
while [ -h "$PRG" ]; do
ls=$(ls -ld "$PRG")
link=$(expr "$ls" : '.*-> \(.*\)$')
if expr "$link" : '/.*' >/dev/null; then
PRG="$link"
else
PRG="$(dirname "$PRG")/$link"
fi
done

saveddir=$(pwd)

ROCKETMQ_MQTT_HOME=$(dirname "$PRG")/..

# make it fully qualified
ROCKETMQ_MQTT_HOME=$(cd "$ROCKETMQ_MQTT_HOME" && pwd)

cd "$saveddir"
fi

export ROCKETMQ_MQTT_HOME

BASEDIR=$HOME
mkdir -p $BASEDIR/logs

mainClass="org.apache.rocketmq.mqtt.cs.starter.Startup"

function startup() {
pid=`ps aux|grep $mainClass|grep -v grep |awk '{print $2}'`
if [ ! -z "$pid" ]; then
echo "java is runing..."
exit 1
fi
nohup sh ${ROCKETMQ_MQTT_HOME}/bin/runserver.sh $mainClass $@ >$BASEDIR/logs/start_out.log 2>&1 &
}

function stop() {
pid=`ps aux|grep $mainClass|grep -v grep |awk '{print $2}'`
if [ -z "$pid" ]; then
echo "no java to kill"
fi
printf 'stop...'
kill $pid
sleep 3
pid=`ps aux|grep $mainClass|grep -v grep |awk '{print $2}'`

if [ ! -z $pid ]; then
kill -9 $pid
fi
}

case "$1" in
start)
startup $@
;;
stop)
stop
;;
restart)
stop
startup
;;
*)
printf "Usage: sh $0 %s {start|stop|restart}\n"
exit 1
;;
esac
93 changes: 93 additions & 0 deletions bin/runserver.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600

choose_gc_log_directory()
{
case "`uname`" in
Darwin)
if [ ! -d "/Volumes/RAMDisk" ]; then
# create ram disk on Darwin systems as gc-log directory
DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null
diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null
echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."
fi
GC_LOG_DIR="/Volumes/RAMDisk"
;;
*)
# check if /dev/shm exists on other systems
if [ -d "/dev/shm" ]; then
GC_LOG_DIR="/dev/shm"
else
GC_LOG_DIR=${BASE_DIR}
fi
;;
esac
}

choose_gc_options()
{
# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
# '1' means releases befor Java 9
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
else
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
fi
}

choose_gc_log_directory
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

JAVA_OPT="${JAVA_OPT} -Dlogback.configurationFile=${BASE_DIR}/conf/logback.xml"

$JAVA ${JAVA_OPT} $@
18 changes: 18 additions & 0 deletions conf/connect.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


mqttPort=1883

Loading

0 comments on commit 9ca7914

Please sign in to comment.