Skip to content

Commit

Permalink
Merge 5fcb1c4 into 22ba8d1
Browse files Browse the repository at this point in the history
  • Loading branch information
remicorniere committed Mar 9, 2020
2 parents 22ba8d1 + 5fcb1c4 commit 66d96bf
Show file tree
Hide file tree
Showing 27 changed files with 1,822 additions and 153 deletions.
3 changes: 3 additions & 0 deletions _examples/go.sum
Expand Up @@ -99,7 +99,9 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/processone/mpg123 v1.0.0 h1:o2WOyGZRM255or1Zc/LtF/jARn51B+9aQl72Qace0GA=
github.com/processone/mpg123 v1.0.0/go.mod h1:X/FeL+h8vD1bYsG9tIWV3M2c4qNTZOficyvPVBP08go=
github.com/processone/soundcloud v1.0.0 h1:/+i6+Yveb7Y6IFGDSkesYI+HddblzcRTQClazzVHxoE=
github.com/processone/soundcloud v1.0.0/go.mod h1:kDLeWpkRtN3C8kIReQdxoiRi92P9xR6yW6qLOJnNWfY=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
Expand Down Expand Up @@ -155,6 +157,7 @@ golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190110200230-915654e7eabc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
2 changes: 1 addition & 1 deletion _examples/xmpp_echo/xmpp_echo.go
Expand Up @@ -28,7 +28,7 @@ func main() {
router := xmpp.NewRouter()
router.HandleFunc("message", handleMessage)

client, err := xmpp.NewClient(config, router, errorHandler)
client, err := xmpp.NewClient(&config, router, errorHandler)
if err != nil {
log.Fatalf("%+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/xmpp_jukebox/xmpp_jukebox.go
Expand Up @@ -54,7 +54,7 @@ func main() {
handleIQ(s, p, player)
})

client, err := xmpp.NewClient(config, router, errorHandler)
client, err := xmpp.NewClient(&config, router, errorHandler)
if err != nil {
log.Fatalf("%+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/xmpp_oauth2/xmpp_oauth2.go
Expand Up @@ -28,7 +28,7 @@ func main() {
router := xmpp.NewRouter()
router.HandleFunc("message", handleMessage)

client, err := xmpp.NewClient(config, router, errorHandler)
client, err := xmpp.NewClient(&config, router, errorHandler)
if err != nil {
log.Fatalf("%+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/xmpp_pubsub_client/xmpp_ps_client.go
Expand Up @@ -38,7 +38,7 @@ func main() {
log.Println("Received a message ! => \n" + string(data))
})

client, err := xmpp.NewClient(config, router, func(err error) { log.Println(err) })
client, err := xmpp.NewClient(&config, router, func(err error) { log.Println(err) })
if err != nil {
log.Fatalf("%+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion _examples/xmpp_websocket/xmpp_websocket.go
Expand Up @@ -26,7 +26,7 @@ func main() {
router := xmpp.NewRouter()
router.HandleFunc("message", handleMessage)

client, err := xmpp.NewClient(config, router, errorHandler)
client, err := xmpp.NewClient(&config, router, errorHandler)
if err != nil {
log.Fatalf("%+v", err)
}
Expand Down
13 changes: 12 additions & 1 deletion auth.go
Expand Up @@ -60,9 +60,20 @@ func authPlain(socket io.ReadWriter, decoder *xml.Decoder, mech string, user str
raw := "\x00" + user + "\x00" + secret
enc := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(enc, []byte(raw))
_, err := fmt.Fprintf(socket, "<auth xmlns='%s' mechanism='%s'>%s</auth>", stanza.NSSASL, mech, enc)

a := stanza.SASLAuth{
Mechanism: mech,
Value: string(enc),
}
data, err := xml.Marshal(a)
if err != nil {
return err
}
n, err := socket.Write(data)
if err != nil {
return err
} else if n == 0 {
return errors.New("failed to write authSASL nonza to socket : wrote 0 bytes")
}

// Next message should be either success or failure.
Expand Down
159 changes: 120 additions & 39 deletions client.go
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"io"
"net"
"sync"
"time"

"gosrc.io/xmpp/stanza"
Expand All @@ -14,15 +15,36 @@ import (
//=============================================================================
// EventManager

// ConnState represents the current connection state.
// SyncConnState represents the current connection state.
type SyncConnState struct {
sync.RWMutex
// Current state of the client. Please use the dedicated getter and setter for this field as they are thread safe.
state ConnState
}
type ConnState = uint8

// getState is a thread-safe getter for the current state
func (scs *SyncConnState) getState() ConnState {
var res ConnState
scs.RLock()
res = scs.state
scs.RUnlock()
return res
}

// setState is a thread-safe setter for the current
func (scs *SyncConnState) setState(cs ConnState) {
scs.Lock()
scs.state = cs
scs.Unlock()
}

// This is a the list of events happening on the connection that the
// client can be notified about.
const (
InitialPresence = "<presence/>"
StateDisconnected ConnState = iota
StateConnected
StateResuming
StateSessionEstablished
StateStreamError
StatePermanentError
Expand All @@ -31,7 +53,7 @@ const (
// Event is a structure use to convey event changes related to client state. This
// is for example used to notify the client when the client get disconnected.
type Event struct {
State ConnState
State SyncConnState
Description string
StreamError string
SMState SMState
Expand All @@ -44,7 +66,16 @@ type SMState struct {
Id string
// Inbound stanza count
Inbound uint
// TODO Store location for IP affinity

// IP affinity
preferredReconAddr string

// Error
StreamErrorGroup stanza.StanzaErrorGroup

// Track sent stanzas
*stanza.UnAckQueue

// TODO Store max and timestamp, to check if we should retry resumption or not
}

Expand All @@ -53,29 +84,35 @@ type SMState struct {
type EventHandler func(Event) error

type EventManager struct {
// Store current state
CurrentState ConnState
// Store current state. Please use "getState" and "setState" to access and/or modify this.
CurrentState SyncConnState

// Callback used to propagate connection state changes
Handler EventHandler
}

// updateState changes the CurrentState in the event manager. The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) updateState(state ConnState) {
em.CurrentState = state
em.CurrentState.setState(state)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState})
}
}

// disconnected changes the CurrentState in the event manager to "disconnected". The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) disconnected(state SMState) {
em.CurrentState = StateDisconnected
em.CurrentState.setState(StateDisconnected)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, SMState: state})
}
}

// streamError changes the CurrentState in the event manager to "streamError". The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) streamError(error, desc string) {
em.CurrentState = StateStreamError
em.CurrentState.setState(StateStreamError)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, StreamError: error, Description: desc})
}
Expand All @@ -90,7 +127,7 @@ var ErrCanOnlySendGetOrSetIq = errors.New("SendIQ can only send get and set IQ s
// server.
type Client struct {
// Store user defined options and states
config Config
config *Config
// Session gather data that can be accessed by users of this library
Session *Session
transport Transport
Expand All @@ -100,16 +137,22 @@ type Client struct {
EventManager
// Handle errors from client execution
ErrorHandler func(error)

// Post connection hook. This will be executed on first connection
PostFirstConnHook func() error

// Post resume hook. This will be executed after the client resumes a lost connection using StreamManagement (XEP-0198)
PostReconnectHook func() error
}

/*
Setting up the client / Checking the parameters
*/

// NewClient generates a new XMPP client, based on Config passed as parameters.
// If host is not specified, the DNS SRV should be used to find the host from the domainpart of the Jid.
// If host is not specified, the DNS SRV should be used to find the host from the domain part of the Jid.
// Default the port to 5222.
func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, err error) {
func NewClient(config *Config, r *Router, errorHandler func(error)) (c *Client, err error) {
if config.KeepaliveInterval == 0 {
config.KeepaliveInterval = time.Second * 30
}
Expand Down Expand Up @@ -169,26 +212,45 @@ func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, e
return
}

// Connect triggers actual TCP connection, based on previously defined parameters.
// Connect simply triggers resumption, with an empty session state.
// Connect establishes a first time connection to a XMPP server.
// It calls the PostFirstConnHook
func (c *Client) Connect() error {
var state SMState
return c.Resume(state)
err := c.connect()
if err != nil {
return err
}
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
err = c.sendWithWriter(c.transport, []byte(InitialPresence))
// Execute the post first connection hook. Typically this holds "ask for roster" and this type of actions.
if c.PostFirstConnHook != nil {
err = c.PostFirstConnHook()
if err != nil {
return err
}
}

// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
// Start the receiver go routine
go c.recv(keepaliveQuit)
return err
}

// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state.
func (c *Client) Resume(state SMState) error {
// connect establishes an actual TCP connection, based on previously defined parameters, as well as a XMPP session
func (c *Client) connect() error {
var state SMState
var err error

// This is the TCP connection
streamId, err := c.transport.Connect()
if err != nil {
return err
}
c.updateState(StateConnected)

// Client is ok, we now open XMPP session
if c.Session, err = NewSession(c.transport, c.config, state); err != nil {
// Client is ok, we now open XMPP session with TLS negotiation if possible and session resume or binding
// depending on state.
if c.Session, err = NewSession(c, state); err != nil {
// Try to get the stream close tag from the server.
go func() {
for {
Expand All @@ -212,22 +274,26 @@ func (c *Client) Resume(state SMState) error {
c.Session.StreamId = streamId
c.updateState(StateSessionEstablished)

// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
// Start the receiver go routine
state = c.Session.SMState
go c.recv(state, keepaliveQuit)

// We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
err = c.sendWithWriter(c.transport, []byte(InitialPresence))
return err
}

// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state. See XEP-0198
func (c *Client) Resume() error {
c.EventManager.updateState(StateResuming)
err := c.connect()
if err != nil {
return err
}
// Execute post reconnect hook. This can be different from the first connection hook, and not trigger roster retrival
// for example.
if c.PostReconnectHook != nil {
err = c.PostReconnectHook()
}
return err
}

// Disconnect disconnects the client from the server, sending a stream close nonza and closing the TCP connection.
func (c *Client) Disconnect() error {
if c.transport != nil {
return c.transport.Close()
Expand All @@ -252,6 +318,15 @@ func (c *Client) Send(packet stanza.Packet) error {
return errors.New("cannot marshal packet " + err.Error())
}

// Store stanza as non-acked as part of stream management
// See https://xmpp.org/extensions/xep-0198.html#scenarios
if c.config.StreamManagementEnable {
if _, ok := packet.(stanza.SMRequest); !ok {
toStore := stanza.UnAckedStz{Stz: string(data)}
c.Session.SMState.UnAckQueue.Push(&toStore)
}
}

return c.sendWithWriter(c.transport, data)
}

Expand Down Expand Up @@ -284,6 +359,12 @@ func (c *Client) SendRaw(packet string) error {
return errors.New("client is not connected")
}

// Store stanza as non-acked as part of stream management
// See https://xmpp.org/extensions/xep-0198.html#scenarios
if c.config.StreamManagementEnable {
toStore := stanza.UnAckedStz{Stz: packet}
c.Session.SMState.UnAckQueue.Push(&toStore)
}
return c.sendWithWriter(c.transport, []byte(packet))
}

Expand All @@ -297,13 +378,13 @@ func (c *Client) sendWithWriter(writer io.Writer, packet []byte) error {
// Go routines

// Loop: Receive data from server
func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
func (c *Client) recv(keepaliveQuit chan<- struct{}) {
for {
val, err := stanza.NextPacket(c.transport.GetDecoder())
if err != nil {
c.ErrorHandler(err)
close(keepaliveQuit)
c.disconnected(state)
c.disconnected(c.Session.SMState)
return
}

Expand All @@ -321,7 +402,7 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
answer := stanza.SMAnswer{XMLName: xml.Name{
Space: stanza.NSStreamManagement,
Local: "a",
}, H: state.Inbound}
}, H: c.Session.SMState.Inbound}
err = c.Send(answer)
if err != nil {
c.ErrorHandler(err)
Expand All @@ -332,7 +413,7 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
c.transport.ReceivedStreamClose()
return
default:
state.Inbound++
c.Session.SMState.Inbound++
}
// Do normal route processing in a go-routine so we can immediately
// start receiving other stanzas. This also allows route handlers to
Expand Down

0 comments on commit 66d96bf

Please sign in to comment.