Skip to content

Commit 1a1fe7b

Browse files
committed
api: support IPROTO_PUSH messages
This patch adds support for receiving messages sent using box.session.push() via an iterator in the manner of asynchronous case of a Lua implementation[1]. Now the calls Future.Get() and Future.GetTyped() ignore push messages, and do not report an error. 1. https://www.tarantool.io/ru/doc/latest/reference/reference_lua/box_session/push/ Closes #67
1 parent e844c03 commit 1a1fe7b

11 files changed

+750
-218
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1111
### Added
1212

1313
- SSL support (#155)
14+
- IPROTO_PUSH messages support (#67)
1415

1516
### Changed
1617

config.lua

+8
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ local function simple_incr(a)
110110
end
111111
rawset(_G, 'simple_incr', simple_incr)
112112

113+
local function push_func(cnt)
114+
for i = 1, cnt do
115+
box.session.push(i)
116+
end
117+
return cnt
118+
end
119+
rawset(_G, 'push_func', push_func)
120+
113121
box.space.test:truncate()
114122

115123
--box.schema.user.revoke('guest', 'read,write,execute', 'universe')

connection.go

+93-28
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,9 @@ type Greeting struct {
163163

164164
// Opts is a way to configure Connection
165165
type Opts struct {
166-
// Timeout for any particular request. If Timeout is zero request, any
167-
// request can be blocked infinitely.
166+
// Timeout for response to a particular request. The timeout is reset when
167+
// push messages are received. If Timeout is zero, any request can be
168+
// blocked infinitely.
168169
// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
169170
Timeout time.Duration
170171
// Timeout between reconnect attempts. If Reconnect is zero, no
@@ -568,8 +569,8 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
568569
requests[pos].first = nil
569570
requests[pos].last = &requests[pos].first
570571
for fut != nil {
571-
fut.err = neterr
572-
fut.markReady(conn)
572+
fut.SetError(neterr)
573+
conn.markDone(fut)
573574
fut, fut.next = fut.next, nil
574575
}
575576
}
@@ -685,40 +686,61 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
685686
conn.reconnect(err, c)
686687
return
687688
}
688-
if fut := conn.fetchFuture(resp.RequestId); fut != nil {
689-
fut.resp = resp
690-
fut.markReady(conn)
689+
690+
var fut *Future = nil
691+
if resp.Code == PushCode {
692+
if fut = conn.peekFuture(resp.RequestId); fut != nil {
693+
fut.AppendPush(resp)
694+
}
691695
} else {
696+
if fut = conn.fetchFuture(resp.RequestId); fut != nil {
697+
fut.SetResponse(resp)
698+
conn.markDone(fut)
699+
}
700+
}
701+
if fut == nil {
692702
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
693703
}
694704
}
695705
}
696706

697707
func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
698-
fut = &Future{}
708+
fut = NewFuture()
699709
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
700710
select {
701711
case conn.rlimit <- struct{}{}:
702712
default:
703-
fut.err = ClientError{ErrRateLimited, "Request is rate limited on client"}
713+
fut.err = ClientError{
714+
ErrRateLimited,
715+
"Request is rate limited on client",
716+
}
717+
fut.ready = nil
718+
fut.done = nil
704719
return
705720
}
706721
}
707-
fut.ready = make(chan struct{})
708722
fut.requestId = conn.nextRequestId()
709723
fut.requestCode = requestCode
710724
shardn := fut.requestId & (conn.opts.Concurrency - 1)
711725
shard := &conn.shard[shardn]
712726
shard.rmut.Lock()
713727
switch conn.state {
714728
case connClosed:
715-
fut.err = ClientError{ErrConnectionClosed, "using closed connection"}
729+
fut.err = ClientError{
730+
ErrConnectionClosed,
731+
"using closed connection",
732+
}
716733
fut.ready = nil
734+
fut.done = nil
717735
shard.rmut.Unlock()
718736
return
719737
case connDisconnected:
720-
fut.err = ClientError{ErrConnectionNotReady, "client connection is not ready"}
738+
fut.err = ClientError{
739+
ErrConnectionNotReady,
740+
"client connection is not ready",
741+
}
721742
fut.ready = nil
743+
fut.done = nil
722744
shard.rmut.Unlock()
723745
return
724746
}
@@ -737,22 +759,38 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
737759
runtime.Gosched()
738760
select {
739761
case conn.rlimit <- struct{}{}:
740-
case <-fut.ready:
762+
case <-fut.done:
741763
if fut.err == nil {
742-
panic("fut.ready is closed, but err is nil")
764+
panic("fut.done is closed, but err is nil")
743765
}
744766
}
745767
}
746768
}
747769
return
748770
}
749771

772+
func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future {
773+
if fut.ready == nil {
774+
return fut
775+
}
776+
conn.putFuture(fut, body)
777+
return fut
778+
}
779+
780+
func (conn *Connection) failFuture(fut *Future, err error) *Future {
781+
if f := conn.fetchFuture(fut.requestId); f == fut {
782+
fut.SetError(err)
783+
conn.markDone(fut)
784+
}
785+
return fut
786+
}
787+
750788
func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
751789
shardn := fut.requestId & (conn.opts.Concurrency - 1)
752790
shard := &conn.shard[shardn]
753791
shard.bufmut.Lock()
754792
select {
755-
case <-fut.ready:
793+
case <-fut.done:
756794
shard.bufmut.Unlock()
757795
return
758796
default:
@@ -767,8 +805,8 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
767805
shard.buf.Trunc(blen)
768806
shard.bufmut.Unlock()
769807
if f := conn.fetchFuture(fut.requestId); f == fut {
770-
fut.markReady(conn)
771-
fut.err = err
808+
fut.SetError(err)
809+
conn.markDone(fut)
772810
} else if f != nil {
773811
/* in theory, it is possible. In practice, you have
774812
* to have race condition that lasts hours */
@@ -782,7 +820,7 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
782820
// packing error is more important than connection
783821
// error, because it is indication of programmer's
784822
// mistake.
785-
fut.err = err
823+
fut.SetError(err)
786824
}
787825
}
788826
return
@@ -793,15 +831,40 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
793831
}
794832
}
795833

834+
func (conn *Connection) markDone(fut *Future) {
835+
if conn.rlimit != nil {
836+
<-conn.rlimit
837+
}
838+
}
839+
840+
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
841+
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
842+
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
843+
shard.rmut.Lock()
844+
defer shard.rmut.Unlock()
845+
846+
if conn.opts.Timeout > 0 {
847+
fut = conn.getFutureImp(reqid, true)
848+
pair := &shard.requests[pos]
849+
*pair.last = fut
850+
pair.last = &fut.next
851+
fut.timeout = time.Since(epoch) + conn.opts.Timeout
852+
} else {
853+
fut = conn.getFutureImp(reqid, false)
854+
}
855+
856+
return fut
857+
}
858+
796859
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
797860
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
798861
shard.rmut.Lock()
799-
fut = conn.fetchFutureImp(reqid)
862+
fut = conn.getFutureImp(reqid, true)
800863
shard.rmut.Unlock()
801864
return fut
802865
}
803866

804-
func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
867+
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
805868
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
806869
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
807870
pair := &shard.requests[pos]
@@ -812,11 +875,13 @@ func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
812875
return nil
813876
}
814877
if fut.requestId == reqid {
815-
*root = fut.next
816-
if fut.next == nil {
817-
pair.last = root
818-
} else {
819-
fut.next = nil
878+
if fetch {
879+
*root = fut.next
880+
if fut.next == nil {
881+
pair.last = root
882+
} else {
883+
fut.next = nil
884+
}
820885
}
821886
return fut
822887
}
@@ -851,11 +916,11 @@ func (conn *Connection) timeouts() {
851916
} else {
852917
fut.next = nil
853918
}
854-
fut.err = ClientError{
919+
fut.SetError(ClientError{
855920
Code: ErrTimeouted,
856921
Msg: fmt.Sprintf("client timeout for request %d", fut.requestId),
857-
}
858-
fut.markReady(conn)
922+
})
923+
conn.markDone(fut)
859924
shard.bufmut.Unlock()
860925
}
861926
if pair.first != nil && pair.first.timeout < minNext {

const.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const (
6161
RLimitWait = 2
6262

6363
OkCode = uint32(0)
64+
PushCode = uint32(0x80)
6465
ErrorCodeBit = 0x8000
6566
PacketLengthBytes = 5
6667
ErSpaceExistsCode = 0xa

example_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,39 @@ func ExampleConnection_SelectAsync() {
126126
// Future 2 Data [[18 val 18 bla]]
127127
}
128128

129+
func ExampleFuture_GetIterator() {
130+
conn := example_connect()
131+
defer conn.Close()
132+
133+
const timeout = 3 * time.Second
134+
// Or any other Connection.*Async() call.
135+
fut := conn.Call17Async("push_func", []interface{}{4})
136+
137+
var it tarantool.ResponseIterator
138+
for it = fut.GetIterator().WithTimeout(timeout); it.Next(); {
139+
resp := it.Value()
140+
if resp.Code == tarantool.PushCode {
141+
// It is a push message.
142+
fmt.Printf("push message: %d\n", resp.Data[0].(uint64))
143+
} else if resp.Code == tarantool.OkCode {
144+
// It is a regular response.
145+
fmt.Printf("response: %d", resp.Data[0].(uint64))
146+
} else {
147+
fmt.Printf("an unexpected response code %d", resp.Code)
148+
}
149+
}
150+
if err := it.Err(); err != nil {
151+
fmt.Printf("error in call of push_func is %v", err)
152+
return
153+
}
154+
// Output:
155+
// push message: 1
156+
// push message: 2
157+
// push message: 3
158+
// push message: 4
159+
// response: 4
160+
}
161+
129162
func ExampleConnection_Ping() {
130163
conn := example_connect()
131164
defer conn.Close()

0 commit comments

Comments
 (0)