danclive/mqtt

 
 

mqtt

mqtt provides:

  • An MQTT broker that fully implements the MQTT protocol V3.1.1.
  • A Go MQTT broker package that you can build your own broker on.
  • An MQTT protocol pack/unpack package for implementing MQTT clients or for testing.
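
To give a feel for what packing and unpacking MQTT packets involves, here is a minimal sketch of the "remaining length" field that every MQTT 3.1.1 fixed header carries. It uses only the standard library and does not show this package's API; the function names encodeRemainingLength and decodeRemainingLength are made up for the illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// maxRemainingLength is the largest value the MQTT 3.1.1 variable-length
// encoding can represent (four bytes, 7 payload bits each).
const maxRemainingLength = 268435455

// encodeRemainingLength encodes n using the MQTT variable-length scheme:
// 7 bits of data per byte, with the high bit set when more bytes follow.
// (Hypothetical helper, not part of the mqtt package.)
func encodeRemainingLength(n int) ([]byte, error) {
	if n < 0 || n > maxRemainingLength {
		return nil, errors.New("remaining length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit: more bytes follow
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeRemainingLength decodes a value from the start of buf and returns
// the value together with the number of bytes consumed.
// (Hypothetical helper, not part of the mqtt package.)
func decodeRemainingLength(buf []byte) (value, used int, err error) {
	multiplier := 1
	for i, b := range buf {
		if i == 4 {
			// The field is at most four bytes long.
			return 0, 0, errors.New("malformed remaining length")
		}
		value += int(b&0x7F) * multiplier
		if b&0x80 == 0 {
			return value, i + 1, nil
		}
		multiplier *= 128
	}
	return 0, 0, errors.New("incomplete remaining length")
}

func main() {
	enc, _ := encodeRemainingLength(321)
	val, used, _ := decodeRemainingLength(enc)
	fmt.Printf("% x -> %d (%d bytes)\n", enc, val, used)
}
```

A real client or test harness would normally rely on the pack/unpack package instead of hand-rolling this; the sketch only illustrates the wire format involved.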

Installation

$ go get -u github.com/DrmagicE/mqtt

Features

  • Provides hook methods to customize the broker's behaviour (authentication, ACL, etc.). See hooks.go for more details.
  • Supports TLS/SSL and websocket.
  • Lets users write plugins. See plugin.go and /plugin for more details.
  • Lets extensions interact with the server. See the Server interface in server.go and example_test.go for more details.
  • Provides metrics via Prometheus (plugin: prometheus).
  • Provides a RESTful API to interact with the server (plugin: management).

Limitations

  • Retained messages are not persisted when the server exits.
  • Clustering is not supported.

Get Started

Built-in MQTT broker

$ cd cmd/broker
$ go run main.go

The broker listens on port 1883 for TCP and on port 8080 for websocket. It loads the following plugins:

  • management: listens on port 8081 and provides a RESTful API service.
  • prometheus: listens on port 8082 and serves as a Prometheus exporter on the /metrics path.
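
One quick way to confirm the prometheus plugin is up is to fetch its /metrics endpoint. The sketch below uses only the standard library and assumes the built-in broker is running locally with the default ports listed above; fetchMetrics is a helper name chosen for this example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// fetchMetrics performs a GET against a Prometheus exporter endpoint and
// returns the response body as text.
// (Hypothetical helper, not part of the mqtt package.)
func fetchMetrics(url string) (string, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status: %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	// Assumption: the broker runs on this machine with its default ports.
	out, err := fetchMetrics("http://localhost:8082/metrics")
	if err != nil {
		fmt.Fprintln(os.Stderr, "fetch failed:", err)
		return
	}
	fmt.Print(out)
}
```

If the broker is not running, the program reports the connection error on stderr instead of printing metrics.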

Docker

$ docker build -t mqtt .
$ docker run -p 1883:1883 -p 8081:8081 -p 8082:8082 mqtt

Build with external source code

The built-in MQTT broker is deliberately minimal: it does not implement features such as authentication or ACL. It is easy, however, to write your own plugins to extend the broker.

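package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"go.uber.org/zap"

	// Import paths below are assumed from the install path above and the
	// /plugin directory; adjust them to match your checkout.
	"github.com/DrmagicE/mqtt"
	"github.com/DrmagicE/mqtt/plugin/management"
	"github.com/DrmagicE/mqtt/plugin/prometheus"
)

func main() {
	// TCP listener
	ln, err := net.Listen("tcp", ":1883")
	if err != nil {
		log.Fatalln(err)
	}
	// websocket server
	ws := &mqtt.WsServer{
		Server: &http.Server{Addr: ":8080"},
		Path:   "/ws",
	}

	// Use zap.NewDevelopment() instead for more verbose, human-readable logs.
	l, err := zap.NewProduction()
	if err != nil {
		log.Fatalln(err)
	}
	s := mqtt.NewServer(
		mqtt.WithTCPListener(ln),
		mqtt.WithWebsocketServer(ws),
		// Add your plugins
		mqtt.WithPlugin(management.New(":8081", nil)),
		mqtt.WithPlugin(prometheus.New(&http.Server{
			Addr: ":8082",
		}, "/metrics")),
		mqtt.WithLogger(l),
	)

	s.Run()
	// Block until interrupted, then stop the broker gracefully.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	<-signalCh
	s.Stop(context.Background())
}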

See /examples for more details.

Documentation

godoc

Hooks

mqtt implements the following hooks:

  • OnAccept (Only for tcp/ssl, not for ws/wss)
  • OnConnect
  • OnConnected
  • OnSessionCreated
  • OnSessionResumed
  • OnSessionTerminated
  • OnSubscribe
  • OnSubscribed
  • OnUnsubscribed
  • OnMsgArrived
  • OnAcked
  • OnMsgDropped
  • OnDeliver
  • OnClose
  • OnStop

See /examples/hook for more details.
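
As an illustration of the kind of logic a hook might carry, the sketch below implements a publish ACL check based on MQTT 3.1.1 topic filter matching. It does not use the hook signatures from hooks.go; wiring it into a hook that inspects incoming messages (OnMsgArrived seems the natural candidate, but that is an assumption) is left to the reader. The names topicMatches and publishACL are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether an MQTT topic name matches a topic filter.
// "+" matches exactly one level, "#" matches the parent and all levels
// below it and must be the last level of the filter.
// (Hypothetical helper, not part of the mqtt package.)
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	// Filters starting with a wildcard never match topics starting with '$'.
	if strings.HasPrefix(topic, "$") && (f[0] == "#" || f[0] == "+") {
		return false
	}
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// publishACL maps a username to the topic filters it may publish to.
// (Hypothetical type for this example.)
type publishACL map[string][]string

// allowed reports whether user may publish to topic.
func (a publishACL) allowed(user, topic string) bool {
	for _, filter := range a[user] {
		if topicMatches(filter, topic) {
			return true
		}
	}
	return false
}

func main() {
	acl := publishACL{"sensor-1": {"sensors/sensor-1/#"}}
	fmt.Println(acl.allowed("sensor-1", "sensors/sensor-1/temperature"))
	fmt.Println(acl.allowed("sensor-1", "sensors/sensor-2/temperature"))
}
```

Keeping the matching logic in a plain function like this makes it easy to unit-test independently of the broker.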

Stop the Server

Call Stop on the server, passing a context, to stop the broker gracefully. Stop performs the following steps:

  1. Closes all open TCP listeners and shuts down all open websocket servers.
  2. Closes all idle connections.
  3. Waits until all connections have been closed.
  4. Triggers OnStop().

Test

Unit Test

$ go test -race . && go test -race ./packets
$ cd packets
$ go test -race .

Integration Test

The broker passes the paho.mqtt.testing test suite.

TODO

  • Support MQTT V3 and V5.
  • Support bridge mode and maybe cluster.

Breaking changes may occur when these new features are added.

About

Gmqtt is a flexible, high-performance MQTT broker library that fully implements the MQTT protocol V3.1.1 in golang
