diff --git a/services/keepstore/rados_volume.go b/services/keepstore/rados_volume.go index 2ce9e1a0d6..d705b4157a 100644 --- a/services/keepstore/rados_volume.go +++ b/services/keepstore/rados_volume.go @@ -66,19 +66,22 @@ var ( ) const ( - RFC3339NanoMaxLen = 36 - RadosLockNotFound = -2 - RadosLockBusy = -16 - RadosLockExist = -17 - RadosLockLocked = 0 - RadosLockUnlocked = 0 - RadosLockData = "keep_lock_data" - RadosLockTouch = "keep_lock_touch" - RadosXattrMtime = "keep_mtime" - RadosXattrTrash = "keep_trash" - RadosKeepNamespace = "keep" - DefaultRadosIndexWorkers = 64 - DefaultRadosEmptyTrashWorkers = 1 + RFC3339NanoMaxLen = 36 + RadosLockNotFound = -2 + RadosLockBusy = -16 + RadosLockExist = -17 + RadosLockLocked = 0 + RadosLockUnlocked = 0 + RadosLockData = "keep_lock_data" + RadosLockTouch = "keep_lock_touch" + RadosXattrMtime = "keep_mtime" + RadosXattrTrash = "keep_trash" + RadosKeepNamespace = "keep" + DefaultRadosIndexWorkers = 64 + DefaultRadosEmptyTrashWorkers = 1 + DefaultRadosReadTimeoutSeconds = 3600 + DefaultRadosWriteTimeoutSeconds = 3600 + DefaultRadosMetadataTimeoutSeconds = 60 ) type radosVolumeAdder struct { @@ -159,17 +162,17 @@ func init() { flag.DurationVar( &radosReadTimeout, "rados-read-timeout", - 60*time.Minute, + DefaultRadosReadTimeoutSeconds*time.Second, "Timeout for read operations.") flag.DurationVar( &radosWriteTimeout, "rados-write-timeout", - 60*time.Minute, + DefaultRadosWriteTimeoutSeconds*time.Second, "Timeout for write operations.") flag.DurationVar( &radosMetadataTimeout, "rados-metadata-timeout", - 1*time.Minute, + DefaultRadosMetadataTimeoutSeconds*time.Second, "Timeout for metadata operations.") RadosMinReadTimeout = arvados.Duration(1 * time.Second) @@ -190,6 +193,7 @@ type radosConn interface { GetClusterStats() (stat rados.ClusterStat, err error) ListPools() (names []string, err error) OpenIOContext(pool string) (radosIOContext, error) + Shutdown() } type radosIOContext interface { @@ -340,6 +344,9 @@ func (v *RadosVolume) Start() error { if err != nil { return fmt.Errorf("rados: error getting rados cluster stats: %v", err) } + if cs.Kb_avail == 0 { + log.Warnf("rados: cluster has no space available") + } log.Infof("rados: cluster %s has %.1f GiB with %.1f GiB used in %d objects and %.1f GiB available", v.Cluster, float64(cs.Kb)/1024/1024, float64(cs.Kb_used)/1024/1024, cs.Num_objects, float64(cs.Kb_avail)/1024/1024) pools, err := v.conn.ListPools() @@ -360,18 +367,20 @@ func (v *RadosVolume) Start() error { ioctx.SetNamespace(RadosKeepNamespace) v.ioctx = ioctx + + ps, err := v.ioctx.GetPoolStats() + if err != nil { + return fmt.Errorf("rados: error getting pool stats: %v", err) + } + log.Infof("rados: pool %s has %.1f GiB used in %d objects", v.Pool, float64(ps.Num_kb)/1024/1024, ps.Num_objects) + if v.RadosReplication == 0 { // RadosReplication was not set or was explicitly set to 0 - determine it from the PoolStats if we can v.RadosReplication = 1 - ps, err := v.ioctx.GetPoolStats() - if err != nil { - log.Warnf("rados: failed to get pool stats, set RadosReplication to 1: %v", err) - } else { - if ps.Num_objects > 0 { - actualReplication := float64(ps.Num_object_clones) / float64(ps.Num_objects) - v.RadosReplication = int(math.Ceil(actualReplication)) - log.Infof("rados: pool has %d objects and %d object clones for an actual replication of %.2f, set RadosReplication to %d", ps.Num_objects, ps.Num_object_clones, actualReplication, v.RadosReplication) - } + if ps.Num_objects > 0 { + actualReplication := float64(ps.Num_object_clones) / float64(ps.Num_objects) + v.RadosReplication = int(math.Ceil(actualReplication)) + log.Infof("rados: pool has %d objects and %d object clones for an actual replication of %.2f, set RadosReplication to %d", ps.Num_objects, ps.Num_object_clones, actualReplication, v.RadosReplication) } } @@ -460,14 +469,18 @@ func (v *RadosVolume) Get(ctx context.Context, loc string, buf []byte) (n int, e GetReadUntil: for { read_bytes := 0 + readComplete := make(chan struct{}) + go func() { + read_bytes, err = v.ioctx.Read(loc, buf[n:], offset) + err = v.translateError(err) + close(readComplete) + }() select { case <-ctx.Done(): err = ctx.Err() radosTracef("rados: Get loc=%s len(buf)=%d size=%d failed to read before context expired, returning n=%d err=%v", loc, len(buf), size, n, err) return - default: - read_bytes, err = v.ioctx.Read(loc, buf[n:], offset) - err = v.translateError(err) + case <-readComplete: v.stats.Tick(&v.stats.Ops, &v.stats.GetOps) v.stats.TickErr(err) if err != nil { @@ -500,27 +513,14 @@ GetReadUntil: func (v *RadosVolume) Compare(ctx context.Context, loc string, expect []byte) (err error) { radosTracef("rados: Compare loc=%s len(expect)=%d", loc, len(expect)) - // get size of stored block - size, err := v.size(loc) - if err != nil { - radosTracef("rados: Compare loc=%s failed to get size of object, returning err=%v", loc, err) - return - } - - // compare size of stored block to expected data length - if size != uint64(len(expect)) { - err = DiskHashError - radosTracef("rados: Compare loc=%s size %d is not equal to length of expected data %d, returning err=%v", loc, size, len(expect), err) - return - } - // get stored block - buf := make([]byte, size) + buf := make([]byte, BlockSize) n, err := v.Get(ctx, loc, buf) if err != nil { radosTracef("rados: Compare loc=%s failed to get object, returning err=%v", loc, err) return } + buf = buf[:n] // compare size of returned data to expected data length if n != len(expect) { @@ -580,12 +580,15 @@ func (v *RadosVolume) Put(ctx context.Context, loc string, block []byte) (err er return MethodDisabledError } - // get a lock with create = true so that we get the lock even if the + // Obtain a lock with create = true so that we get the lock even if the // object does not yet exist (N.B. in this case an empty object will be created // to facilitate the lock, but that is ok as we are about to write to it) lockCookie, err := v.lockExclusive(ctx, loc, RadosLockData, "Put", v.WriteTimeout, true) if err != nil { radosTracef("rados: Put loc=%s len(block)=%d failed to obtain lock, returning err=%v", loc, len(block), err) + if err == rados.RadosErrorPermissionDenied { + log.Errorf("rados: got permission denied attempting to obtain a lock. please ensure ceph client '%s' has 'rwx' permission on ceph pool '%s' ('rw' is not sufficient)", v.User, v.Pool) + } return } defer func() { @@ -609,15 +612,26 @@ func (v *RadosVolume) Put(ctx context.Context, loc string, block []byte) (err er // Only write non-empty blocks (as some versions of go-ceph have a problem writing empty buffers) // Note that the lock will have already created an empty object so the write is already done. if len(block) > 0 { - err = v.ioctx.WriteFull(loc, block) - err = v.translateError(err) - v.stats.Tick(&v.stats.Ops, &v.stats.PutOps) - v.stats.TickErr(err) - if err != nil { - radosTracef("rados: Put loc=%s len(block)=%d failed to write block, returning err=%v", loc, len(block), err) + writeComplete := make(chan struct{}) + go func() { + err = v.ioctx.WriteFull(loc, block) + err = v.translateError(err) + close(writeComplete) + }() + select { + case <-ctx.Done(): + err = ctx.Err() + radosTracef("rados: Put loc=%s len(block)=%d failed to write before context expired, returning err=%v", loc, len(block), err) return + case <-writeComplete: + v.stats.Tick(&v.stats.Ops, &v.stats.PutOps) + v.stats.TickErr(err) + if err != nil { + radosTracef("rados: Put loc=%s len(block)=%d failed to write block, returning err=%v", loc, len(block), err) + return + } + v.stats.TickOutBytes(uint64(len(block))) } - v.stats.TickOutBytes(uint64(len(block))) } // Since we have just put this object, it is no longer trash. @@ -987,10 +1001,18 @@ func (v *RadosVolume) Status() (vs *VolumeStatus) { ps, err := v.ioctx.GetPoolStats() if err != nil { log.Printf("rados: %s: failed to get pool stats, Status will not report BytesUsed correctly: %v", v, err) - } else { - radosTracef("rados: Status() has pool stats %+v", ps) + return + } + radosTracef("rados: Status() has pool stats %+v", ps) + if ps.Num_bytes > 0 { + // the generic tests do not ever want BytesUsed to be 0 + // so we must not set it to 0 even if we get that from + // the backend. Note that the pool stats can lag behind + // reality somewhat, so Num_bytes may still be zero if + // an object has just been added to an empty pool. vs.BytesUsed = ps.Num_bytes } + radosTracef("rados: Status() complete, returning vs=%+v", vs) return } @@ -1019,9 +1041,11 @@ func (v *RadosVolume) Writable() bool { // Replication returns the storage redundancy of the // underlying device. It will be passed on to clients in // responses to PUT requests. -func (v *RadosVolume) Replication() int { - radosTracef("rados: Replication") - return v.RadosReplication +func (v *RadosVolume) Replication() (replication int) { + radosTracef("rados: Replication()") + replication = v.RadosReplication + radosTracef("rados: Replication() complete, returning replication=%d", replication) + return } // EmptyTrash looks for trashed blocks that exceeded TrashLifetime @@ -1070,7 +1094,7 @@ func (v *RadosVolume) EmptyTrash() { // actually delete the object err = v.delete(loc) if err != nil { - log.Warn("rados: %s: EmptyTrash failed to delete %s: %v", v, loc, err) + log.Warnf("rados: %s: EmptyTrash failed to delete %s: %v", v, loc, err) return } atomic.AddInt64(&bytesDeleted, int64(size)) @@ -1095,7 +1119,7 @@ func (v *RadosVolume) EmptyTrash() { workers := theConfig.EmptyTrashWorkers if workers <= 0 { workers = DefaultRadosEmptyTrashWorkers - log.Warn(fmt.Sprintf("rados: cannot EmptyTrash with %d EmptyTrashWorkers, using %d instead", theConfig.EmptyTrashWorkers, workers)) + log.Warnf("rados: cannot EmptyTrash with %d EmptyTrashWorkers, using %d instead", theConfig.EmptyTrashWorkers, workers) } err := v.listObjects(filterFunc, mapFunc, reduceFunc, workers) if err != nil { @@ -1204,7 +1228,7 @@ func (v *RadosVolume) lockShared(ctx context.Context, loc string, name string, d // will fail and return an error unless the create argument // is set to true. func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc string, timeout arvados.Duration, create bool, exclusive bool) (lockCookie string, err error) { - radosTracef("rados: lock") + radosTracef("rados: lock loc=%s name=%s desc=%s timeout=%v create=%v exclusive=%v", loc, name, desc, timeout, create, exclusive) locking_finished := make(chan bool) lockCookie = uuid.Must(uuid.NewV4()).String() @@ -1215,7 +1239,6 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st for !locked { select { case <-ctx.Done(): - close(locking_finished) return default: res := 0 @@ -1226,9 +1249,11 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st res, err = v.ioctx.LockShared(loc, name, lockCookie, "", desc, time.Duration(timeout), nil) radosTracef("rados: lock call to rados LockShared for %s lock on loc=%s with lockCookie=%s returned res=%v err=%v", name, loc, lockCookie, res, err) } + err = v.translateError(err) v.stats.Tick(&v.stats.Ops, &v.stats.LockOps) v.stats.TickErr(err) if err != nil { + close(locking_finished) return } switch res { @@ -1266,39 +1291,44 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st locked = true default: err = fmt.Errorf("rados: attempting to get %s lock for %s on object %s: unexpected non-error return value %d from underlying lock function", name, desc, loc, res) + close(locking_finished) return } } } close(locking_finished) + return }() // block on either locking_finished or ctx.Done() select { case <-locking_finished: case <-ctx.Done(): + close(locking_finished) log.Warnf("rados: abandoning attempt to obtain exclusive %s lock for %s on object %s: %s", name, desc, loc, ctx.Err()) err = ctx.Err() } - radosTracef("rados: lock %s on loc=%s returning with lockCookie=%s err=%v", name, loc, lockCookie, err) + radosTracef("rados: lock loc=%s name=%s desc=%s timeout=%v create=%v exclusive=%v complete, returning lockCookie=%s err=%v", loc, name, desc, timeout, create, exclusive, lockCookie, err) return } // unlock previously obtained data lock func (v *RadosVolume) unlock(loc string, name string, lockCookie string) (err error) { - radosTracef("rados: unlock") + radosTracef("rados: unlock loc=%s name=%s lockCookie=%s", loc, name, lockCookie) res := 0 res, err = v.ioctx.Unlock(loc, name, lockCookie) err = v.translateError(err) v.stats.Tick(&v.stats.Ops, &v.stats.UnlockOps) v.stats.TickErr(err) if err != nil { + radosTracef("rados: unlock loc=%s name=%s lockCookie=%s got error unlocking, returning err=%v", loc, name, lockCookie, err) return } if res == RadosLockNotFound { err = fmt.Errorf("rados: attempting to unlock %s lock on object %s, lock was not held for cookie '%s'", name, loc, lockCookie) } + radosTracef("rados: unlock loc=%s name=%s lockCookie=%s complete, returning err=%v", loc, name, lockCookie, err) return } diff --git a/services/keepstore/rados_volume_mock_test.go b/services/keepstore/rados_volume_mock_test.go index 26b572b3df..2b92208771 100644 --- a/services/keepstore/rados_volume_mock_test.go +++ b/services/keepstore/rados_volume_mock_test.go @@ -151,6 +151,10 @@ func (conn *radosMockConn) OpenIOContext(pool string) (ioctx radosIOContext, err return } +func (conn *radosMockConn) Shutdown() { + radosTracef("radosmock: conn.Shutdown()") +} + type radosMockIoctx struct { *radosMockConn pool string @@ -314,6 +318,11 @@ func (ioctx *radosMockIoctx) Read(oid string, data []byte, offset uint64) (n int return } n = copy(data, obj.data[offset:]) + + // pause here to facilitate race tests + radosTracef("radosmock: ioctx.Read oid=%s len(data)=%d offset=%d calling unlockAndRace()", oid, len(data), offset) + ioctx.b.unlockAndRace() + radosTracef("radosmock: ioctx.Read oid=%s len(data)=%d offset=%d complete, returning n=%d err=%v", oid, len(data), offset, n, err) return } @@ -457,6 +466,11 @@ func (ioctx *radosMockIoctx) WriteFull(oid string, data []byte) (err error) { if !ok { ioctx.objects[oid] = newRadosStubObj([]byte{}) } + + // pause here to facilitate race tests + radosTracef("radosmock: ioctx.WriteFull oid=%s len(data)=%d calling unlockAndRace()", oid, len(data)) + ioctx.b.unlockAndRace() + obj.data = make([]byte, len(data)) n := copy(obj.data, data) if n != len(data) { diff --git a/services/keepstore/rados_volume_test.go b/services/keepstore/rados_volume_test.go index a87704abff..2b538c7799 100644 --- a/services/keepstore/rados_volume_test.go +++ b/services/keepstore/rados_volume_test.go @@ -28,10 +28,12 @@ import ( "context" "encoding/json" "flag" + "fmt" "sync" "testing" "time" + "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/ghodss/yaml" check "gopkg.in/check.v1" ) @@ -113,68 +115,99 @@ func newRadosStubBackend(numReplicas uint64) *radosStubBackend { } func (h *radosStubBackend) unlockAndRace() { + radosTracef("rados stub: unlockAndRace()") if h.race == nil { + radosTracef("rados stub: unlockAndRace() race is nil, returning") return } + + radosTracef("rados stub: unlockAndRace() unlocking backend") h.Unlock() + // Signal caller that race is starting by reading from // h.race. If we get a channel, block until that channel is // ready to receive. If we get nil (or h.race is closed) just // proceed. - if c := <-h.race; c != nil { + radosTracef("rados stub: unlockAndRace() reading from h.race") + c := <-h.race + radosTracef("rados stub: unlockAndRace() read from h.race") + if c != nil { + radosTracef("rados stub: unlockAndRace() blocking while waiting to write to channel received on h.race") c <- struct{}{} } + + radosTracef("rados stub: unlockAndRace() locking backend") h.Lock() + + radosTracef("rados stub: unlockAndRace() completed, returning") } type TestableRadosVolume struct { *RadosVolume radosStubBackend *radosStubBackend t TB + useMock bool } func NewTestableRadosVolume(t TB, readonly bool, replication int) *TestableRadosVolume { - var v *RadosVolume - radosTracef("rados test: NewTestableRadosVolume readonly=%v replication=%d", readonly, replication) + var tv *TestableRadosVolume + radosTracef("radostest: NewTestableRadosVolume readonly=%v replication=%d", readonly, replication) radosStubBackend := newRadosStubBackend(uint64(replication)) pool := radosTestPool - if pool == "" { + useMock := pool == "" + + if useMock { // Connect using mock radosImplementation instead of real Ceph - log.Infof("rados test: using mock radosImplementation") + log.Infof("radostest: using mock radosImplementation") radosMock := &radosMockImpl{ b: radosStubBackend, } - v = &RadosVolume{ - Pool: RadosMockPool, - MonHost: RadosMockMonHost, - ReadOnly: readonly, - RadosReplication: replication, - rados: radosMock, + v := &RadosVolume{ + Pool: RadosMockPool, + MonHost: RadosMockMonHost, + ReadOnly: readonly, + RadosReplication: replication, + RadosIndexWorkers: 4, + ReadTimeout: arvados.Duration(10 * time.Second), + WriteTimeout: arvados.Duration(10 * time.Second), + MetadataTimeout: arvados.Duration(10 * time.Second), + rados: radosMock, + } + tv = &TestableRadosVolume{ + RadosVolume: v, + radosStubBackend: radosStubBackend, + t: t, + useMock: useMock, } } else { // Connect to real Ceph using the real radosImplementation - log.Infof("rados test: using real radosImplementation") - v = &RadosVolume{ - Pool: pool, - KeyringFile: radosKeyringFile, - MonHost: radosMonHost, - Cluster: radosCluster, - User: radosUser, - ReadOnly: readonly, - RadosReplication: replication, + log.Infof("radostest: using real radosImplementation") + v := &RadosVolume{ + Pool: pool, + KeyringFile: radosKeyringFile, + MonHost: radosMonHost, + Cluster: radosCluster, + User: radosUser, + ReadOnly: readonly, + RadosReplication: replication, + RadosIndexWorkers: 4, + ReadTimeout: arvados.Duration(DefaultRadosReadTimeoutSeconds * time.Second), + WriteTimeout: arvados.Duration(DefaultRadosWriteTimeoutSeconds * time.Second), + MetadataTimeout: arvados.Duration(DefaultRadosMetadataTimeoutSeconds * time.Second), + } + tv = &TestableRadosVolume{ + RadosVolume: v, + t: t, + useMock: useMock, } } - tv := &TestableRadosVolume{ - RadosVolume: v, - radosStubBackend: radosStubBackend, - t: t, - } - + // Start err := tv.Start() if err != nil { t.Error(err) } + return tv } @@ -215,139 +248,76 @@ func TestRadosVolumeReplication(t *testing.T) { } } -// func TestRadosVolumeCreateBlobRace(t *testing.T) { -// v := NewTestableRadosVolume(t, false, 3) -// defer v.Teardown() - -// var wg sync.WaitGroup - -// v.radosStubBackend.race = make(chan chan struct{}) - -// wg.Add(1) -// go func() { -// defer wg.Done() -// err := v.Put(context.Background(), TestHash, TestBlock) -// if err != nil { -// t.Error(err) -// } -// }() -// continuePut := make(chan struct{}) -// // Wait for the stub's Put to create the empty blob -// v.radosStubBackend.race <- continuePut -// wg.Add(1) -// go func() { -// defer wg.Done() -// buf := make([]byte, len(TestBlock)) -// _, err := v.Get(context.Background(), TestHash, buf) -// if err != nil { -// t.Error(err) -// } -// }() -// // Wait for the stub's Get to get the empty blob -// close(v.radosStubBackend.race) -// // Allow stub's Put to continue, so the real data is ready -// // when the volume's Get retries -// <-continuePut -// // Wait for Get() and Put() to finish -// wg.Wait() -// } - -// func TestRadosVolumeCreateBlobRaceDeadline(t *testing.T) { -// v := NewTestableRadosVolume(t, false, 3) -// defer v.Teardown() - -// v.PutRaw(TestHash, nil) - -// buf := new(bytes.Buffer) -// v.IndexTo("", buf) -// if buf.Len() != 0 { -// t.Errorf("Index %+q should be empty", buf.Bytes()) -// } - -// v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond)) - -// allDone := make(chan struct{}) -// go func() { -// defer close(allDone) -// buf := make([]byte, BlockSize) -// n, err := v.Get(context.Background(), TestHash, buf) -// if err != nil { -// t.Error(err) -// return -// } -// if n != 0 { -// t.Errorf("Got %+q, expected empty buf", buf[:n]) -// } -// }() -// select { -// case <-allDone: -// case <-time.After(time.Second): -// t.Error("Get should have stopped waiting for race when block was 2s old") -// } - -// buf.Reset() -// v.IndexTo("", buf) -// if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) { -// t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0") -// } -// } - -// func TestRadosVolumeContextCancelGet(t *testing.T) { -// testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { -// v.PutRaw(TestHash, TestBlock) -// _, err := v.Get(ctx, TestHash, make([]byte, BlockSize)) -// return err -// }) -// } - -// func TestRadosVolumeContextCancelPut(t *testing.T) { -// testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { -// return v.Put(ctx, TestHash, make([]byte, BlockSize)) -// }) -// } - -// func TestRadosVolumeContextCancelCompare(t *testing.T) { -// testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { -// v.PutRaw(TestHash, TestBlock) -// return v.Compare(ctx, TestHash, TestBlock2) -// }) -// } - -// func testRadosVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableRadosVolume) error) { -// v := NewTestableRadosVolume(t, false, 3) -// defer v.Teardown() -// v.radosStubBackend.race = make(chan chan struct{}) - -// ctx, cancel := context.WithCancel(context.Background()) -// allDone := make(chan struct{}) -// go func() { -// defer close(allDone) -// err := testFunc(ctx, v) -// if err != context.Canceled { -// t.Errorf("got %T %q, expected %q", err, err, context.Canceled) -// } -// }() -// releaseHandler := make(chan struct{}) -// select { -// case <-allDone: -// t.Error("testFunc finished without waiting for v.radosStubBackend.race") -// case <-time.After(10 * time.Second): -// t.Error("timed out waiting to enter handler") -// case v.radosStubBackend.race <- releaseHandler: -// } - -// cancel() - -// select { -// case <-time.After(10 * time.Second): -// t.Error("timed out waiting to cancel") -// case <-allDone: -// } - -// go func() { -// <-releaseHandler -// }() -// } +func TestRadosVolumeContextCancelGet(t *testing.T) { + testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { + v.PutRaw(TestHash, TestBlock) + _, err := v.Get(ctx, TestHash, make([]byte, BlockSize)) + return err + }) +} + +func TestRadosVolumeContextCancelPut(t *testing.T) { + testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { + return v.Put(ctx, TestHash, make([]byte, BlockSize)) + }) +} + +func TestRadosVolumeContextCancelCompare(t *testing.T) { + testRadosVolumeContextCancel(t, func(ctx context.Context, v *TestableRadosVolume) error { + v.PutRaw(TestHash, TestBlock) + return v.Compare(ctx, TestHash, TestBlock2) + }) +} + +func testRadosVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableRadosVolume) error) { + v := NewTestableRadosVolume(t, false, 3) + defer v.Teardown() + + if v.radosStubBackend == nil { + t.Skip("radostest: testRadosVolumeContextCancel can only be run with radosStubBackend") + } + v.radosStubBackend.race = make(chan chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + allDone := make(chan struct{}) + testFuncErr := make(chan error, 1) + go func() { + defer close(allDone) + defer close(testFuncErr) + err := testFunc(ctx, v) + if err != context.Canceled { + err = fmt.Errorf("radostest: testRadosVolumeContextCancel testFunc returned %T %q, expected %q", err, err, context.Canceled) + testFuncErr <- err + } + }() + releaseHandler := make(chan struct{}) + select { + case <-allDone: + t.Error("radostest: testRadosVolumeContextCancel testFunc finished without waiting for v.radosStubBackend.race") + case <-time.After(10 * time.Second): + t.Error("radostest: testRadosVolumeContextCancel timed out waiting to enter handler") + case v.radosStubBackend.race <- releaseHandler: + } + + radosTracef("radostest: testRadosVolumeContextCancel cancelling context") + cancel() + + select { + case <-time.After(10 * time.Second): + t.Error("radostest: testRadosVolumeContextCancel timed out waiting to cancel") + case <-allDone: + } + + err := <-testFuncErr + if err != nil { + t.Errorf("radostest: testRadosVolumeContextCancel error from testFunc: %v", err) + } + + go func() { + radosTracef("radostest: testRadosVolumeContextCancel receiving from releaseHandler to release the backend from the race") + <-releaseHandler + }() +} func (s *StubbedRadosSuite) TestStats(c *check.C) { stats := func() string { @@ -391,19 +361,32 @@ Volumes: c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) } -func (v *TestableRadosVolume) PutRaw(locator string, data []byte) { - radosTracef("radostest: PutRaw putting locator=%s len(data)=%d data='%s'", locator, len(data), data) +func (v *TestableRadosVolume) PutRaw(loc string, data []byte) { + radosTracef("radostest: PutRaw loc=%s len(data)=%d data='%s'", loc, len(data), data) - // need to temporarily disable ReadOnly status and restore it after the call to Put - defer func(ro bool) { - v.ReadOnly = ro - }(v.ReadOnly) + if v.ReadOnly { + // need to temporarily disable ReadOnly status and restore it after the call to Put + defer func(ro bool) { + v.ReadOnly = ro + }(v.ReadOnly) + v.ReadOnly = false + } + + if v.radosStubBackend != nil && v.radosStubBackend.race != nil { + // also need to temporarily disable backend race + defer func(race chan chan struct{}) { + v.radosStubBackend.race = race + }(v.radosStubBackend.race) + v.radosStubBackend.race = nil + } - v.ReadOnly = false - err := v.Put(context.Background(), locator, data) + err := v.Put(context.Background(), loc, data) if err != nil { - v.t.Fatalf("PutRaw failed to put locator %s: %s", locator, err) + v.t.Fatalf("radostest: PutRaw failed to put loc %s: %s", loc, err) } + + radosTracef("radostest: PutRaw loc=%s len(data)=%d data='%s' complete, returning", loc, len(data), data) + return } func (v *TestableRadosVolume) TouchWithDate(loc string, mtime time.Time) { @@ -417,4 +400,70 @@ func (v *TestableRadosVolume) TouchWithDate(loc string, mtime time.Time) { return } -func (v *TestableRadosVolume) Teardown() {} +func (v *TestableRadosVolume) Teardown() { + if !v.useMock { + // When using a real Ceph pool we need to clean out all data + // after each test. + err := v.deleteAllObjects() + if err != nil { + v.t.Error(err) + } + } + // we also must call conn.Shutdown or else librados will leak threads like crazy every time we abandon a RadosVolume and create a new one + v.conn.Shutdown() +} + +type errListEntry struct { + err error +} + +func (ile *errListEntry) String() string { + return fmt.Sprintf("%s", ile.err) +} + +func (ile *errListEntry) Err() error { + return ile.err +} + +func (v *TestableRadosVolume) deleteAllObjects() (err error) { + radosTracef("radostest: deleteAllObjects()") + + // filter to include all objects + filterFunc := func(loc string) (bool, error) { + return true, nil + } + + // delete each loc and return empty listEntry + mapFunc := func(loc string) listEntry { + delErr := v.delete(loc) + if delErr != nil { + log.Warnf("radostest: deleteAllObjects() failed to delete %s: %v", loc, delErr) + return &errListEntry{ + err: delErr, + } + } + return &errListEntry{} + } + + // count number of objects deleted and errors + deleted := 0 + errors := 0 + reduceFunc := func(le listEntry) { + if le.Err() != nil { + errors++ + } else { + deleted++ + } + } + + workers := 1 + err = v.listObjects(filterFunc, mapFunc, reduceFunc, workers) + if err != nil { + log.Printf("radostest: deleteAllObjects() failed to listObjects: %s", err) + return + } + log.Infof("radostest: deleteAllObjects() deleted %d objects and had %d errors", deleted, errors) + + radosTracef("radostest: deleteAllObjects() finished, returning") + return +}