Merge pull request ipfs-cluster#597 from ipfs/race-fixes-picks
Race fixes picks
hsanjuan committed Oct 30, 2018
2 parents e787773 + 1bc7f5a commit a0dc644
Showing 5 changed files with 49 additions and 23 deletions.
9 changes: 9 additions & 0 deletions api/types.go
@@ -829,6 +829,15 @@ func (pins PinSerial) ToPin() Pin {
}
}

// Clone returns a deep copy of the PinSerial.
func (pins PinSerial) Clone() PinSerial {
new := pins // this copies all the simple fields.
// Slices share their backing arrays, so we need to copy them explicitly.
new.Allocations = make([]string, len(pins.Allocations))
copy(new.Allocations, pins.Allocations)
return new
}
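
Clone is needed because a plain struct assignment in Go copies the slice header but not the backing array, so two PinSerial values would keep sharing (and racing on) the same Allocations storage. A minimal standalone sketch of that aliasing, using a stripped-down stand-in type rather than the real api.PinSerial:

package main

import "fmt"

// pinSerial is a stripped-down stand-in for api.PinSerial, used only to
// show why Clone must copy the Allocations slice explicitly.
type pinSerial struct {
	Cid         string
	Allocations []string
}

func main() {
	orig := pinSerial{Cid: "Qm...", Allocations: []string{"peerA", "peerB"}}

	shallow := orig // copies Cid, but Allocations still points at the same array
	shallow.Allocations[0] = "peerX"
	fmt.Println(orig.Allocations[0]) // "peerX": the original was mutated too

	deep := orig // copy the simple fields...
	deep.Allocations = make([]string, len(orig.Allocations))
	copy(deep.Allocations, orig.Allocations) // ...then give the copy its own array
	deep.Allocations[0] = "peerY"
	fmt.Println(orig.Allocations[0]) // still "peerX": the clone no longer aliases
}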

// NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node
type NodeWithMeta struct {
36 changes: 22 additions & 14 deletions cluster.go
@@ -56,15 +56,18 @@ type Cluster struct {
allocator PinAllocator
informer Informer

doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup

// peerAdd
paMux sync.Mutex

// shutdown function and related variables
shutdownLock sync.Mutex
shutdownB bool
removed bool
doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup

paMux sync.Mutex
}

// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
@@ -313,6 +316,8 @@ func (c *Cluster) watchPeers() {
}

if !hasMe {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty())
c.removed = true
go c.Shutdown()
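
The hunk above puts shutdownB and removed behind shutdownLock, and watchPeers now takes that lock before flipping removed. A minimal standalone sketch of the pattern, with stand-in names that only mirror the diff (not the real Cluster type); Shutdown is launched in its own goroutine because it takes the same lock:

package main

import (
	"fmt"
	"sync"
)

// node stands in for Cluster: shutdownLock guards the shutdown-related flags.
type node struct {
	shutdownLock sync.Mutex
	shutdownB    bool
	removed      bool
}

func (n *node) Shutdown() {
	n.shutdownLock.Lock()
	defer n.shutdownLock.Unlock()
	if n.shutdownB {
		return // already shut down
	}
	n.shutdownB = true
	fmt.Println("shutting down; removed =", n.removed)
}

// watchPeers mirrors the hunk above: set removed under the lock, then
// trigger Shutdown asynchronously. Shutdown also takes shutdownLock, so
// calling it inline while the lock is held would deadlock.
func (n *node) watchPeers(hasMe bool) {
	if !hasMe {
		n.shutdownLock.Lock()
		defer n.shutdownLock.Unlock()
		n.removed = true
		go n.Shutdown()
	}
}

func main() {
	n := &node{}
	n.watchPeers(false)
	n.Shutdown() // idempotent: whichever call runs second sees shutdownB and returns early
}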
@@ -1093,11 +1098,12 @@ func (c *Cluster) Peers() []api.ID {
logger.Error("an empty list of peers will be returned")
return []api.ID{}
}
lenMembers := len(members)

peersSerial := make([]api.IDSerial, len(members), len(members))
peers := make([]api.ID, len(members), len(members))
peersSerial := make([]api.IDSerial, lenMembers, lenMembers)
peers := make([]api.ID, lenMembers, lenMembers)

ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)

errs := c.rpcClient.MultiCall(
@@ -1133,13 +1139,14 @@ func (c *Cluster) globalPinInfoCid(method string, h cid.Cid) (api.GlobalPinInfo,
logger.Error(err)
return api.GlobalPinInfo{}, err
}
lenMembers := len(members)

replies := make([]api.PinInfoSerial, len(members), len(members))
replies := make([]api.PinInfoSerial, lenMembers, lenMembers)
arg := api.Pin{
Cid: h,
}

ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)

errs := c.rpcClient.MultiCall(
@@ -1190,18 +1197,19 @@ func (c *Cluster) globalPinInfoCid(method string, h cid.Cid) (api.GlobalPinInfo,
}

func (c *Cluster) globalPinInfoSlice(method string) ([]api.GlobalPinInfo, error) {
var infos []api.GlobalPinInfo
infos := make([]api.GlobalPinInfo, 0)
fullMap := make(map[string]api.GlobalPinInfo)

members, err := c.consensus.Peers()
if err != nil {
logger.Error(err)
return []api.GlobalPinInfo{}, err
}
lenMembers := len(members)

replies := make([][]api.PinInfoSerial, len(members), len(members))
replies := make([][]api.PinInfoSerial, lenMembers, lenMembers)

ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)

errs := c.rpcClient.MultiCall(
17 changes: 13 additions & 4 deletions consensus/raft/log_op.go
@@ -36,29 +36,38 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
panic("received unexpected state type")
}

// Copy the Cid. We are about to pass it to go-routines
// that will do things with it (read its fields). However,
// as soon as ApplyTo is done, the next operation will be deserialized
// on top of "op". This can cause data races with the slices in
// api.PinSerial, which don't get copied when passed.
pinS := op.Cid.Clone()

pin := pinS.ToPin()

switch op.Type {
case LogOpPin:
err = state.Add(op.Cid.ToPin())
err = state.Add(pin)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Track",
op.Cid,
pinS,
&struct{}{},
nil)
case LogOpUnpin:
err = state.Rm(op.Cid.ToPin().Cid)
err = state.Rm(pin.Cid)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Untrack",
op.Cid,
pinS,
&struct{}{},
nil)
default:
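
The pattern in ApplyTo is clone-before-async-handoff: op is reused for the next deserialized log entry, so anything handed to rpcClient.Go must own its own slices. A small standalone sketch of the hazard the comment describes and of the fix, again with a stand-in type rather than the real api.PinSerial:

package main

import (
	"fmt"
	"sync"
)

// pin mimics api.PinSerial: a value type carrying a slice field.
type pin struct {
	Allocations []string
}

// clone mirrors PinSerial.Clone: copy the struct, then give the copy its
// own backing array for the slice field.
func clone(p pin) pin {
	out := p
	out.Allocations = make([]string, len(p.Allocations))
	copy(out.Allocations, p.Allocations)
	return out
}

func main() {
	op := pin{Allocations: []string{"peerA"}}
	var wg sync.WaitGroup

	// Hand the async consumer a deep copy. Handing it a plain struct copy
	// would still share the backing array that the write below mutates,
	// which the Go race detector reports as a data race.
	safe := clone(op)
	wg.Add(1)
	go func(p pin) {
		defer wg.Done()
		fmt.Println("tracking", p.Allocations)
	}(safe)

	// Simulate the next log entry being deserialized on top of op,
	// overwriting the slice contents in place.
	op.Allocations[0] = "peerB"

	wg.Wait()
}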
6 changes: 3 additions & 3 deletions ipfsconn/ipfshttp/ipfshttp.go
@@ -328,7 +328,7 @@ func (ipfs *Connector) pinLsHandler(w http.ResponseWriter, r *http.Request) {
Type: "recursive",
}
} else {
var pins []api.PinSerial
pins := make([]api.PinSerial, 0)
err := ipfs.rpcClient.Call(
"",
"Cluster",
@@ -429,7 +429,7 @@ func (ipfs *Connector) addHandler(w http.ResponseWriter, r *http.Request) {
}

func (ipfs *Connector) repoStatHandler(w http.ResponseWriter, r *http.Request) {
var peers []peer.ID
peers := make([]peer.ID, 0)
err := ipfs.rpcClient.Call(
"",
"Cluster",
@@ -770,7 +770,7 @@ func (ipfs *Connector) apiURL() string {
func (ipfs *Connector) ConnectSwarms() error {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
var idsSerial []api.IDSerial
idsSerial := make([]api.IDSerial, 0)
err := ipfs.rpcClient.Call(
"",
"Cluster",
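
Several hunks in ipfsconn/ipfshttp/ipfshttp.go swap var x []T reply targets for make([]T, 0). A nil slice and an empty slice behave the same for len and append, but callers and encoders can tell them apart; a standalone sketch of that difference (standard library only, not part of this commit):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var nilSlice []string           // declared but never allocated: nil
	emptySlice := make([]string, 0) // allocated, length 0: non-nil

	fmt.Println(nilSlice == nil, emptySlice == nil) // true false
	fmt.Println(len(nilSlice), len(emptySlice))     // 0 0

	a, _ := json.Marshal(nilSlice)
	b, _ := json.Marshal(emptySlice)
	fmt.Println(string(a)) // null
	fmt.Println(string(b)) // []
}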
4 changes: 2 additions & 2 deletions package.json
@@ -45,9 +45,9 @@
},
{
"author": "hsanjuan",
"hash": "Qmas5ivzqmatd1tJWppjcupYY8YS2DJHJZxSMsMoLbCQ2R",
"hash": "QmPYiV9nwnXPxdn9zDgY4d9yaHwTS414sUb1K6nvQVHqqo",
"name": "go-libp2p-gorpc",
"version": "1.0.23"
"version": "1.0.24"
},
{
"author": "libp2p",
