Skip to content

Commit

Permalink
Merge pull request #6 from neverchanje/master
Browse files Browse the repository at this point in the history
bugfix: revert pr#3 and fix connections being closed during TableConn…
  • Loading branch information
Wu Tao committed Apr 8, 2018
2 parents 4981c03 + 328633c commit ef019bb
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 78 deletions.
25 changes: 3 additions & 22 deletions pegasus/table_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (p *pegasusTableConnector) handleQueryConfigResp(resp *replication.QueryCfg
}

p.mu.Lock()
defer p.mu.Unlock()

p.appId = resp.AppID

if len(resp.Partitions) > len(p.parts) {
Expand All @@ -123,21 +125,9 @@ func (p *pegasusTableConnector) handleQueryConfigResp(resp *replication.QueryCfg
}
p.parts[pconf.Pid.PartitionIndex] = r
}
p.mu.Unlock()

p.removeUnusedReplicaConnections(resp.Partitions)
return nil
}

// Remove unused connection to replica server according to routing table.
func (p *pegasusTableConnector) removeUnusedReplicaConnections(pconfs []*replication.PartitionConfiguration) {
primaries := make(map[string]bool)
for _, pconf := range pconfs {
primaries[pconf.Primary.GetAddress()] = true
}
p.replica.RemoveUnused(primaries)
}

func validateHashKey(hashKey []byte) error {
if len(hashKey) == 0 || hashKey == nil {
return fmt.Errorf("InvalidParameter: hash key must not be empty")
Expand Down Expand Up @@ -332,16 +322,7 @@ func (p *pegasusTableConnector) getPartition(hashKey []byte) (*base.Gpid, *sessi

func (p *pegasusTableConnector) Close() error {
p.tom.Kill(errors.New("table closed"))
err := func() error {
err := p.meta.Close()
if err == nil {
for _, r := range p.parts {
<-r.session.Close()
}
}
return err
}()
return wrapError(err, OpClose)
return nil
}

func (p *pegasusTableConnector) handleError(err error, gpid *base.Gpid, replica *session.ReplicaSession) error {
Expand Down
27 changes: 26 additions & 1 deletion pegasus/table_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"errors"
"math"
"testing"
"time"

"github.com/XiaoMi/pegasus-go-client/idl/base"
"github.com/XiaoMi/pegasus-go-client/idl/replication"
"github.com/XiaoMi/pegasus-go-client/rpc"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/XiaoMi/pegasus-go-client/idl/replication"
)

// This is the integration test of the client. Please start the pegasus onebox
Expand Down Expand Up @@ -168,3 +170,26 @@ func TestPegasusTableConnector_HandleInvalidQueryConfigResp(t *testing.T) {
assert.Equal(t, len(p.parts), 4)
}
}

func TestPegasusTableConnector_Close(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()

// Ensure: Closing table doesn't close the connections.

cfg := Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
}

client := NewClient(cfg)
defer client.Close()

tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
ptb, _ := tb.(*pegasusTableConnector)
ptb.replica.GetReplica("0.0.0.0:34801")
time.Sleep(time.Second) // wait 1sec for connection ready.

ptb.Close()
assert.Equal(t, ptb.replica.GetReplica("0.0.0.0:34801").ConnState(), rpc.ConnStateReady)
}
2 changes: 1 addition & 1 deletion session/meta_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// MetaManager manages the list of metas, but only the leader will it requests to.
// If the one is not the actual leader, it will retry with another one.
// If the one is not the actual leader, it will retry with another.
type MetaManager struct {
logger pegalog.Logger

Expand Down
19 changes: 0 additions & 19 deletions session/replica_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,3 @@ func (rm *ReplicaManager) ReplicaCount() int {

return len(rm.replicas)
}

func (rm *ReplicaManager) RemoveUnused(inused map[string]bool) {
rm.Lock()
defer rm.Unlock()

unused := make([]string, 0)

for addr, r := range rm.replicas {
if _, ok := inused[addr]; !ok {
// close if it's not inused.
r.Close()
unused = append(unused, addr)
}
}

for _, addr := range unused {
delete(rm.replicas, addr)
}
}
32 changes: 1 addition & 31 deletions session/replica_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package session

import (
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
Expand All @@ -25,39 +24,10 @@ func TestReplicaManager_Close(t *testing.T) {
defer leaktest.Check(t)()

rm := NewReplicaManager()
defer rm.Close()

rm.GetReplica("127.0.0.1:34801")
rm.GetReplica("127.0.0.1:34802")
rm.GetReplica("127.0.0.1:34803")

time.Sleep(time.Second)
}

func TestReplicaManager_RemoveUnused(t *testing.T) {
defer leaktest.Check(t)()

rm := NewReplicaManager()
defer rm.Close()

{
r1 := rm.GetReplica("127.0.0.1:34802")
assert.Equal(t, len(rm.replicas), 1)

rm.RemoveUnused(map[string]bool{})
assert.Equal(t, len(rm.replicas), 0)

r2 := rm.GetReplica("127.0.0.1:34802")
assert.NotEqual(t, r1, r2)
}

{
rm.GetReplica("127.0.0.1:34801")
rm.GetReplica("127.0.0.1:34802")
rm.GetReplica("127.0.0.1:34803")
assert.Equal(t, len(rm.replicas), 3)

rm.RemoveUnused(map[string]bool{"127.0.0.1:34802": true})
assert.Equal(t, len(rm.replicas), 1)
}
rm.Close()
}
4 changes: 0 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ func (n *nodeSession) loopForResponse() error {
// Invoke a rpc call.
// The call will be cancelled if any io error encountered.
func (n *nodeSession) callWithGpid(ctx context.Context, gpid *base.Gpid, args rpcRequestArgs, name string) (result rpcResponseResult, err error) {
if cstate := n.ConnState(); cstate != rpc.ConnStateReady {
return nil, fmt.Errorf("failed to send request to this node [%s, %s, state: %s]", n.ntype, n.addr, cstate)
}

rcall := &rpcCall{}
rcall.args = args
rcall.name = name
Expand Down

0 comments on commit ef019bb

Please sign in to comment.