Skip to content

Commit

Permalink
add recover away field
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 10, 2018
1 parent 3fcd3d0 commit 3f19a79
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 159 deletions.
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,25 +1181,25 @@ func (c *Client) subscribeCmd(cmd *proto.SubscribeRequest) (*proto.SubscribeResp
res.Recovered = false
} else {
res.Publications = publications
res.Recovered = false
res.Recovered = time.Duration(cmd.Away)*time.Second+time.Second < time.Duration(chOpts.HistoryLifetime)*time.Second && len(publications) < chOpts.HistorySize
}
} else {
publications, recovered, err := c.node.recoverHistory(channel, cmd.Last)
publications, found, err := c.node.recoverHistory(channel, cmd.Last)
if err != nil {
c.node.logger.log(newLogEntry(LogLevelError, "error recovering", map[string]interface{}{"channel": channel, "user": c.user, "client": c.uid, "error": err.Error()}))
res.Publications = nil
res.Recovered = false
} else {
res.Publications = publications
res.Recovered = recovered
res.Recovered = found || (time.Duration(cmd.Away)*time.Second+time.Second < time.Duration(chOpts.HistoryLifetime)*time.Second && len(publications) < chOpts.HistorySize)
}
}
} else {
// Client don't want to recover messages yet (fresh connect), we just return last
// publication uid here so it could recover later.
lastPubUID, err := c.node.lastPublicationUID(channel)
if err != nil {
c.node.logger.log(newLogEntry(LogLevelError, "error getting last message ID for channel", map[string]interface{}{"channel": channel, "user": c.user, "client": c.uid, "error": err.Error()}))
c.node.logger.log(newLogEntry(LogLevelError, "error getting last publication ID for channel", map[string]interface{}{"channel": channel, "user": c.user, "client": c.uid, "error": err.Error()}))
} else {
res.Last = lastPubUID
}
Expand Down
73 changes: 47 additions & 26 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,35 +279,56 @@ func TestClientSubscribeLast(t *testing.T) {
assert.Equal(t, "9", result.Last)
}

func TestClientSubscribeRecover(t *testing.T) {
node := nodeWithMemoryEngine()
node.config.HistorySize = 10
node.config.HistoryLifetime = 60
node.config.HistoryRecover = true

transport := newTestTransport()
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, _ := newClient(newCtx, node, transport)
var recoverTests = []struct {
Name string
HistorySize int
HistoryLifetime int
NumPublications int
Last string
Away uint32
NumRecovered int
Recovered bool
}{
{"from_last_uid", 10, 60, 10, "7", 0, 2, true},
{"empty_last_uid_full_history", 10, 60, 10, "", 0, 10, false},
{"empty_last_uid_short_disconnect", 10, 60, 9, "", 19, 9, true},
{"empty_last_uid_long_disconnect", 10, 60, 9, "", 119, 9, false},
}

for i := 0; i < 10; i++ {
node.Publish("test", &Publication{
UID: strconv.Itoa(i),
Data: []byte(`{}`),
func TestClientSubscribeRecover(t *testing.T) {
for _, tt := range recoverTests {
t.Run(tt.Name, func(t *testing.T) {
node := nodeWithMemoryEngine()
node.config.HistorySize = tt.HistorySize
node.config.HistoryLifetime = tt.HistoryLifetime
node.config.HistoryRecover = true

transport := newTestTransport()
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, _ := newClient(newCtx, node, transport)

for i := 0; i < tt.NumPublications; i++ {
node.Publish("test", &Publication{
UID: strconv.Itoa(i),
Data: []byte(`{}`),
})
}

connectClient(t, client)

subscribeResp, disconnect := client.subscribeCmd(&proto.SubscribeRequest{
Channel: "test",
Recover: true,
Last: tt.Last,
Away: tt.Away,
})
assert.Nil(t, disconnect)
assert.Nil(t, subscribeResp.Error)
assert.Equal(t, tt.NumRecovered, len(subscribeResp.Result.Publications))
assert.Equal(t, tt.Recovered, subscribeResp.Result.Recovered)
})
}

connectClient(t, client)

subscribeResp, disconnect := client.subscribeCmd(&proto.SubscribeRequest{
Channel: "test",
Recover: true,
Last: "7",
})
assert.Nil(t, disconnect)
assert.Nil(t, subscribeResp.Error)
assert.Equal(t, 2, len(subscribeResp.Result.Publications))
assert.True(t, subscribeResp.Result.Recovered)
}

func TestClientUnsubscribe(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type Engine interface {
// recoverHistory allows to recover missed messages starting
// from last seen Publication UID provided by client. This method
// should return as many Publications as possible and boolean value
// indicating whether all Publications were successfully restored
// or not. The case when publications can not be fully restored
// indicating whether lastUID was found in publications or not
// The case when publications can not be fully restored
// can happen if old Publications already removed from history
// due to size or lifetime limits.
recoverHistory(ch string, lastUID string) ([]*Publication, bool, error)
Expand Down
14 changes: 7 additions & 7 deletions engine_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,14 @@ func (h *historyHub) recover(ch string, last string) ([]*Publication, bool, erro
}
}
if position > -1 {
// Last uid provided found in history. Set recovered flag which means that
// server thinks missed messages fully recovered.
// Provided last UID found in history. In this case we can be
// sure that all missed messages will be recovered.
return publications[0:position], true, nil
}
// Provided last UID not found in history messages. This means that client
// most probably missed too many messages (or maybe wrong last uid provided but
// it's not a normal case). So we try to compensate as many as we can. But
// recovered flag stays false so we do not give a guarantee all missed messages
// recovered successfully.
// Provided last UID not found in history messages. This means that
// client most probably missed too many messages or publication with
// last UID already expired (or maybe wrong last uid provided but
// it's not a normal case). So we try to compensate as many as we
// can but get caller know about missing UID.
return publications, false, nil
}
13 changes: 6 additions & 7 deletions engine_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,15 +1270,14 @@ func (e *shard) RecoverHistory(ch string, last string) ([]*Publication, bool, er
}
}
if position > -1 {
// Last uid found in history. Set recovered flag which means that
// server thinks missed messages fully recovered.
// Last uid found in history.
return publications[0:position], true, nil
}
// Provided last UID not found in history messages. This means that client
// most probably missed too many messages (or maybe wrong last uid provided but
// it's not a normal case). So we try to compensate as many as we can. But
// recovered flag stays false so we do not give a guarantee all missed messages
// recovered successfully.
// Provided last UID not found in history messages. This means that
// client most probably missed too many messages or publication with
// last UID already expired (or maybe wrong last uid provided but
// it's not a normal case). So we try to compensate as many as we
// can but get caller know about missing UID.
return publications, false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion examples/websocket_grpc_json/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
input[type="text"] { width: 300px; }
.muted {color: #CCCCCC; font-size: 10px;}
</style>
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<script type="text/javascript" src="jquery.min.js"></script>
<script type="text/javascript" src="http://localhost:2000/centrifuge.js"></script>
<script type="text/javascript">
// helper functions to work with escaping html.
Expand Down
4 changes: 4 additions & 0 deletions examples/websocket_grpc_json/jquery.min.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/websocket_grpc_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func main() {
JoinLeave: true,
HistoryLifetime: 60,
HistorySize: 10,
HistoryRecover: true,
},
},
}
Expand Down
Loading

0 comments on commit 3f19a79

Please sign in to comment.