Skip to content

Commit

Permalink
Add support for tracking of resent messages using opaque data. This w…
Browse files Browse the repository at this point in the history
…ill make it possible for us to actually deal with resends in a better way.
  • Loading branch information
olabini committed Sep 1, 2016
1 parent ec3a4e0 commit 0f02445
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 35 deletions.
4 changes: 2 additions & 2 deletions data_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func Test_genDataMsg_setsLastMessageWhenNewMessageIsPlaintext(t *testing.T) {
c.genDataMsg(msg)

assertDeepEquals(t, c.resend.pending(),
[]MessagePlaintext{
MessagePlaintext(msg),
[]messageToResend{
messageToResend{MessagePlaintext(msg), nil},
})
}

Expand Down
4 changes: 2 additions & 2 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newConversation(v otrVersion, rand io.Reader) *Conversation {
func (c *Conversation) expectMessageEvent(t *testing.T, f func(), expectedEvent MessageEvent, expectedMessage []byte, expectedError error) {
called := false

c.messageEventHandler = dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error) {
c.messageEventHandler = dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error, trace ...interface{}) {
assertDeepEquals(t, event, expectedEvent)
assertDeepEquals(t, message, expectedMessage)
assertDeepEquals(t, err, expectedError)
Expand All @@ -140,7 +140,7 @@ func (c *Conversation) expectMessageEvent(t *testing.T, f func(), expectedEvent
}

func (c *Conversation) doesntExpectMessageEvent(t *testing.T, f func()) {
c.messageEventHandler = dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error) {
c.messageEventHandler = dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error, trace ...interface{}) {
t.Errorf("Didn't expect a message event, but got: %v with msg %v and error %#v", event, message, err)
}}

Expand Down
20 changes: 10 additions & 10 deletions message_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ const (
// MessageEventHandler handles MessageEvents
type MessageEventHandler interface {
// HandleMessageEvent should handle and send the appropriate message(s) to the sender/recipient depending on the message events
HandleMessageEvent(event MessageEvent, message []byte, err error)
HandleMessageEvent(event MessageEvent, message []byte, err error, trace ...interface{})
}

type dynamicMessageEventHandler struct {
eh func(event MessageEvent, message []byte, err error)
eh func(event MessageEvent, message []byte, err error, trace ...interface{})
}

func (d dynamicMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error) {
d.eh(event, message, err)
func (d dynamicMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error, trace ...interface{}) {
d.eh(event, message, err, trace...)
}

func (c *Conversation) messageEvent(e MessageEvent) {
func (c *Conversation) messageEvent(e MessageEvent, trace ...interface{}) {
if c.messageEventHandler != nil {
c.messageEventHandler.HandleMessageEvent(e, nil, nil)
c.messageEventHandler.HandleMessageEvent(e, nil, nil, trace...)
}
}

Expand Down Expand Up @@ -129,10 +129,10 @@ type combinedMessageEventHandler struct {
handlers []MessageEventHandler
}

func (c combinedMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error) {
func (c combinedMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error, trace ...interface{}) {
for _, h := range c.handlers {
if h != nil {
h.HandleMessageEvent(event, message, err)
h.HandleMessageEvent(event, message, err, trace...)
}
}
}
Expand All @@ -147,6 +147,6 @@ func CombineMessageEventHandlers(handlers ...MessageEventHandler) MessageEventHa
type DebugMessageEventHandler struct{}

// HandleMessageEvent dumps all message events
func (DebugMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error) {
fmt.Fprintf(standardErrorOutput, "%sHandleMessageEvent(%s, message: %#v, error: %v)\n", debugPrefix, event, string(message), err)
func (DebugMessageEventHandler) HandleMessageEvent(event MessageEvent, message []byte, err error, trace ...interface{}) {
fmt.Fprintf(standardErrorOutput, "%sHandleMessageEvent(%s, message: %#v, error: %v, trace: %v)\n", debugPrefix, event, string(message), err, trace)
}
8 changes: 4 additions & 4 deletions message_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func Test_MessageEvent_hasValidStringImplementation(t *testing.T) {

func Test_combinedMessageEventHandler_callsAllErrorMessageHandlersGiven(t *testing.T) {
var called1, called2, called3 bool
f1 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error) {
f1 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error, trace ...interface{}) {
called1 = true
}}
f2 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error) {
f2 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error, trace ...interface{}) {
called2 = true
}}
f3 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error) {
f3 := dynamicMessageEventHandler{func(event MessageEvent, message []byte, err error, trace ...interface{}) {
called3 = true
}}
d := CombineMessageEventHandlers(f1, f2, nil, f3)
Expand All @@ -44,5 +44,5 @@ func Test_debugMessageEventHandler_writesTheEventToStderr(t *testing.T) {
ss := captureStderr(func() {
DebugMessageEventHandler{}.HandleMessageEvent(MessageEventLogHeartbeatSent, []byte("A message"), newOtrError("hello world"))
})
assertEquals(t, ss, "[DEBUG] HandleMessageEvent(MessageEventLogHeartbeatSent, message: \"A message\", error: otr: hello world)\n")
assertEquals(t, ss, "[DEBUG] HandleMessageEvent(MessageEventLogHeartbeatSent, message: \"A message\", error: otr: hello world, trace: [])\n")
}
30 changes: 20 additions & 10 deletions resend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,37 @@ const (
retransmitExact
)

type messageToResend struct {
m MessagePlaintext
opaque []interface{}
}

type resendContext struct {
mayRetransmit retransmitFlag
messageTransform func([]byte) []byte

messages struct {
m []MessagePlaintext
m []messageToResend
sync.RWMutex
}
}

func (r *resendContext) later(msg MessagePlaintext) {
func (r *resendContext) later(msg MessagePlaintext, opaque ...interface{}) {
r.messages.Lock()
defer r.messages.Unlock()

if r.messages.m == nil {
r.messages.m = make([]MessagePlaintext, 0, 5)
r.messages.m = make([]messageToResend, 0, 5)
}

r.messages.m = append(r.messages.m, msg)
r.messages.m = append(r.messages.m, messageToResend{msg, opaque})
}

func (r *resendContext) pending() []MessagePlaintext {
func (r *resendContext) pending() []messageToResend {
r.messages.RLock()
defer r.messages.RUnlock()

ret := make([]MessagePlaintext, len(r.messages.m))
ret := make([]messageToResend, len(r.messages.m))
copy(ret, r.messages.m)

return ret
Expand Down Expand Up @@ -70,8 +75,8 @@ func (c *Conversation) resendMessageTransformer() func([]byte) []byte {
return c.resend.messageTransform
}

func (c *Conversation) lastMessage(msg MessagePlaintext) {
c.resend.later(msg)
func (c *Conversation) lastMessage(msg MessagePlaintext, opaque ...interface{}) {
c.resend.later(msg, opaque...)
}

func (c *Conversation) updateMayRetransmitTo(f retransmitFlag) {
Expand All @@ -98,7 +103,8 @@ func (c *Conversation) retransmit() ([]messageWithHeader, error) {

resending := c.resend.mayRetransmit == retransmitWithPrefix

for _, msg := range msgs {
for _, msgx := range msgs {
msg := msgx.m
if resending {
msg = c.resendMessageTransformer()(msg)
}
Expand All @@ -115,7 +121,11 @@ func (c *Conversation) retransmit() ([]messageWithHeader, error) {
}

if resending {
c.messageEvent(MessageEventMessageResent)
for _, msgx := range msgs {
v := []interface{}{msgx.m}
v = append(v, msgx.opaque...)
c.messageEvent(MessageEventMessageResent, v...)
}
}

c.updateLastSent()
Expand Down
10 changes: 5 additions & 5 deletions send.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// Send takes a human readable message from the local user, possibly encrypts
// it and returns zero or more messages to send to the peer.
func (c *Conversation) Send(m ValidMessage) ([]ValidMessage, error) {
func (c *Conversation) Send(m ValidMessage, trace ...interface{}) ([]ValidMessage, error) {
message := makeCopy(m)
defer wipeBytes(message)

Expand All @@ -22,7 +22,7 @@ func (c *Conversation) Send(m ValidMessage) ([]ValidMessage, error) {

switch c.msgState {
case plainText:
return c.withInjections(c.sendMessageOnPlaintext(message))
return c.withInjections(c.sendMessageOnPlaintext(message, trace...))
case encrypted:
return c.withInjections(c.sendMessageOnEncrypted(message))
case finished:
Expand All @@ -33,12 +33,12 @@ func (c *Conversation) Send(m ValidMessage) ([]ValidMessage, error) {
return c.withInjections(nil, newOtrError("cannot send message in current state"))
}

func (c *Conversation) sendMessageOnPlaintext(message ValidMessage) ([]ValidMessage, error) {
func (c *Conversation) sendMessageOnPlaintext(message ValidMessage, trace ...interface{}) ([]ValidMessage, error) {
if c.Policies.has(requireEncryption) {
c.messageEvent(MessageEventEncryptionRequired)
c.messageEvent(MessageEventEncryptionRequired, trace...)
c.updateLastSent()
c.updateMayRetransmitTo(retransmitExact)
c.lastMessage(MessagePlaintext(makeCopy(message)))
c.lastMessage(MessagePlaintext(makeCopy(message)), trace...)
return []ValidMessage{c.QueryMessage()}, nil
}

Expand Down
21 changes: 19 additions & 2 deletions send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,25 @@ func Test_Send_saveLastMessageWhenMsgIsPlainTextAndEncryptedIsExpected(t *testin
c.Send(m)

assertDeepEquals(t, c.resend.pending(),
[]MessagePlaintext{
MessagePlaintext(m),
[]messageToResend{
messageToResend{MessagePlaintext(m), nil},
})
}

func Test_Send_saveLastMessageWhenMsgIsPlainTextAndEncryptedIsExpected_AndAddsAnOpaqueValueForEachMessage(t *testing.T) {
m := []byte("hello")
m2 := []byte("hello again?")
c := bobContextAfterAKE()
c.msgState = plainText
c.Policies = policies(allowV3 | requireEncryption)

c.Send(m, 42, "hello")
c.Send(m2, 15, "something")

assertDeepEquals(t, c.resend.pending(),
[]messageToResend{
messageToResend{MessagePlaintext(m), []interface{}{42, "hello"}},
messageToResend{MessagePlaintext(m2), []interface{}{15, "something"}},
})
}

Expand Down

0 comments on commit 0f02445

Please sign in to comment.