Skip to content

Commit

Permalink
Fix rados volume and tests to work with a real ceph cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
jrandall committed Jul 6, 2018
1 parent 879220b commit a42ccc4
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 231 deletions.
154 changes: 92 additions & 62 deletions services/keepstore/rados_volume.go
Expand Up @@ -66,19 +66,22 @@ var (
)

const (
RFC3339NanoMaxLen = 36
RadosLockNotFound = -2
RadosLockBusy = -16
RadosLockExist = -17
RadosLockLocked = 0
RadosLockUnlocked = 0
RadosLockData = "keep_lock_data"
RadosLockTouch = "keep_lock_touch"
RadosXattrMtime = "keep_mtime"
RadosXattrTrash = "keep_trash"
RadosKeepNamespace = "keep"
DefaultRadosIndexWorkers = 64
DefaultRadosEmptyTrashWorkers = 1
RFC3339NanoMaxLen = 36
RadosLockNotFound = -2
RadosLockBusy = -16
RadosLockExist = -17
RadosLockLocked = 0
RadosLockUnlocked = 0
RadosLockData = "keep_lock_data"
RadosLockTouch = "keep_lock_touch"
RadosXattrMtime = "keep_mtime"
RadosXattrTrash = "keep_trash"
RadosKeepNamespace = "keep"
DefaultRadosIndexWorkers = 64
DefaultRadosEmptyTrashWorkers = 1
DefaultRadosReadTimeoutSeconds = 3600
DefaultRadosWriteTimeoutSeconds = 3600
DefaultRadosMetadataTimeoutSeconds = 60
)

type radosVolumeAdder struct {
Expand Down Expand Up @@ -159,17 +162,17 @@ func init() {
flag.DurationVar(
&radosReadTimeout,
"rados-read-timeout",
60*time.Minute,
DefaultRadosReadTimeoutSeconds*time.Second,
"Timeout for read operations.")
flag.DurationVar(
&radosWriteTimeout,
"rados-write-timeout",
60*time.Minute,
DefaultRadosWriteTimeoutSeconds*time.Second,
"Timeout for write operations.")
flag.DurationVar(
&radosMetadataTimeout,
"rados-metadata-timeout",
1*time.Minute,
DefaultRadosMetadataTimeoutSeconds*time.Second,
"Timeout for metadata operations.")

RadosMinReadTimeout = arvados.Duration(1 * time.Second)
Expand All @@ -190,6 +193,7 @@ type radosConn interface {
GetClusterStats() (stat rados.ClusterStat, err error)
ListPools() (names []string, err error)
OpenIOContext(pool string) (radosIOContext, error)
Shutdown()
}

type radosIOContext interface {
Expand Down Expand Up @@ -340,6 +344,9 @@ func (v *RadosVolume) Start() error {
if err != nil {
return fmt.Errorf("rados: error getting rados cluster stats: %v", err)
}
if cs.Kb_avail == 0 {
log.Warnf("rados: cluster has no space available")
}
log.Infof("rados: cluster %s has %.1f GiB with %.1f GiB used in %d objects and %.1f GiB available", v.Cluster, float64(cs.Kb)/1024/1024, float64(cs.Kb_used)/1024/1024, cs.Num_objects, float64(cs.Kb_avail)/1024/1024)

pools, err := v.conn.ListPools()
Expand All @@ -360,18 +367,20 @@ func (v *RadosVolume) Start() error {
ioctx.SetNamespace(RadosKeepNamespace)

v.ioctx = ioctx

ps, err := v.ioctx.GetPoolStats()
if err != nil {
return fmt.Errorf("rados: error getting pool stats: %v", err)
}
log.Infof("rados: pool %s has %.1f GiB used in %d objects", v.Pool, float64(ps.Num_kb)/1024/1024, ps.Num_objects)

if v.RadosReplication == 0 {
// RadosReplication was not set or was explicitly set to 0 - determine it from the PoolStats if we can
v.RadosReplication = 1
ps, err := v.ioctx.GetPoolStats()
if err != nil {
log.Warnf("rados: failed to get pool stats, set RadosReplication to 1: %v", err)
} else {
if ps.Num_objects > 0 {
actualReplication := float64(ps.Num_object_clones) / float64(ps.Num_objects)
v.RadosReplication = int(math.Ceil(actualReplication))
log.Infof("rados: pool has %d objects and %d object clones for an actual replication of %.2f, set RadosReplication to %d", ps.Num_objects, ps.Num_object_clones, actualReplication, v.RadosReplication)
}
if ps.Num_objects > 0 {
actualReplication := float64(ps.Num_object_clones) / float64(ps.Num_objects)
v.RadosReplication = int(math.Ceil(actualReplication))
log.Infof("rados: pool has %d objects and %d object clones for an actual replication of %.2f, set RadosReplication to %d", ps.Num_objects, ps.Num_object_clones, actualReplication, v.RadosReplication)
}
}

Expand Down Expand Up @@ -460,14 +469,18 @@ func (v *RadosVolume) Get(ctx context.Context, loc string, buf []byte) (n int, e
GetReadUntil:
for {
read_bytes := 0
readComplete := make(chan struct{})
go func() {
read_bytes, err = v.ioctx.Read(loc, buf[n:], offset)
err = v.translateError(err)
close(readComplete)
}()
select {
case <-ctx.Done():
err = ctx.Err()
radosTracef("rados: Get loc=%s len(buf)=%d size=%d failed to read before context expired, returning n=%d err=%v", loc, len(buf), size, n, err)
return
default:
read_bytes, err = v.ioctx.Read(loc, buf[n:], offset)
err = v.translateError(err)
case <-readComplete:
v.stats.Tick(&v.stats.Ops, &v.stats.GetOps)
v.stats.TickErr(err)
if err != nil {
Expand Down Expand Up @@ -500,27 +513,14 @@ GetReadUntil:
func (v *RadosVolume) Compare(ctx context.Context, loc string, expect []byte) (err error) {
radosTracef("rados: Compare loc=%s len(expect)=%d", loc, len(expect))

// get size of stored block
size, err := v.size(loc)
if err != nil {
radosTracef("rados: Compare loc=%s failed to get size of object, returning err=%v", loc, err)
return
}

// compare size of stored block to expected data length
if size != uint64(len(expect)) {
err = DiskHashError
radosTracef("rados: Compare loc=%s size %d is not equal to length of expected data %d, returning err=%v", loc, size, len(expect), err)
return
}

// get stored block
buf := make([]byte, size)
buf := make([]byte, BlockSize)
n, err := v.Get(ctx, loc, buf)
if err != nil {
radosTracef("rados: Compare loc=%s failed to get object, returning err=%v", loc, err)
return
}
buf = buf[:n]

// compare size of returned data to expected data length
if n != len(expect) {
Expand Down Expand Up @@ -580,12 +580,15 @@ func (v *RadosVolume) Put(ctx context.Context, loc string, block []byte) (err er
return MethodDisabledError
}

// get a lock with create = true so that we get the lock even if the
// Obtain a lock with create = true so that we get the lock even if the
// object does not yet exist (N.B. in this case an empty object will be created
// to facilitate the lock, but that is ok as we are about to write to it)
lockCookie, err := v.lockExclusive(ctx, loc, RadosLockData, "Put", v.WriteTimeout, true)
if err != nil {
radosTracef("rados: Put loc=%s len(block)=%d failed to obtain lock, returning err=%v", loc, len(block), err)
if err == rados.RadosErrorPermissionDenied {
log.Errorf("rados: got permission denied attempting to obtain a lock. please ensure ceph client '%s' has 'rwx' permission on ceph pool '%s' ('rw' is not sufficient)", v.User, v.Pool)
}
return
}
defer func() {
Expand All @@ -609,15 +612,26 @@ func (v *RadosVolume) Put(ctx context.Context, loc string, block []byte) (err er
// Only write non-empty blocks (as some versions of go-ceph have a problem writing empty buffers)
// Note that the lock will have already created an empty object so the write is already done.
if len(block) > 0 {
err = v.ioctx.WriteFull(loc, block)
err = v.translateError(err)
v.stats.Tick(&v.stats.Ops, &v.stats.PutOps)
v.stats.TickErr(err)
if err != nil {
radosTracef("rados: Put loc=%s len(block)=%d failed to write block, returning err=%v", loc, len(block), err)
writeComplete := make(chan struct{})
go func() {
err = v.ioctx.WriteFull(loc, block)
err = v.translateError(err)
close(writeComplete)
}()
select {
case <-ctx.Done():
err = ctx.Err()
radosTracef("rados: Put loc=%s len(block)=%d failed to write before context expired, returning err=%v", loc, len(block), err)
return
case <-writeComplete:
v.stats.Tick(&v.stats.Ops, &v.stats.PutOps)
v.stats.TickErr(err)
if err != nil {
radosTracef("rados: Put loc=%s len(block)=%d failed to write block, returning err=%v", loc, len(block), err)
return
}
v.stats.TickOutBytes(uint64(len(block)))
}
v.stats.TickOutBytes(uint64(len(block)))
}

// Since we have just put this object, it is no longer trash.
Expand Down Expand Up @@ -987,10 +1001,18 @@ func (v *RadosVolume) Status() (vs *VolumeStatus) {
ps, err := v.ioctx.GetPoolStats()
if err != nil {
log.Printf("rados: %s: failed to get pool stats, Status will not report BytesUsed correctly: %v", v, err)
} else {
radosTracef("rados: Status() has pool stats %+v", ps)
return
}
radosTracef("rados: Status() has pool stats %+v", ps)
if ps.Num_bytes > 0 {
// the generic tests do not ever want BytesUsed to be 0
// so we must not set it to 0 even if we get that from
// the backend. Note that the pool stats can lag behind
// reality somewhat, so Num_bytes may still be zero if
// an object has just been added to an empty pool.
vs.BytesUsed = ps.Num_bytes
}

radosTracef("rados: Status() complete, returning vs=%+v", vs)
return
}
Expand Down Expand Up @@ -1019,9 +1041,11 @@ func (v *RadosVolume) Writable() bool {
// Replication returns the storage redundancy of the
// underlying device. It will be passed on to clients in
// responses to PUT requests.
func (v *RadosVolume) Replication() int {
radosTracef("rados: Replication")
return v.RadosReplication
func (v *RadosVolume) Replication() (replication int) {
radosTracef("rados: Replication()")
replication = v.RadosReplication
radosTracef("rados: Replication() complete, returning replication=%d", replication)
return
}

// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
Expand Down Expand Up @@ -1070,7 +1094,7 @@ func (v *RadosVolume) EmptyTrash() {
// actually delete the object
err = v.delete(loc)
if err != nil {
log.Warn("rados: %s: EmptyTrash failed to delete %s: %v", v, loc, err)
log.Warnf("rados: %s: EmptyTrash failed to delete %s: %v", v, loc, err)
return
}
atomic.AddInt64(&bytesDeleted, int64(size))
Expand All @@ -1095,7 +1119,7 @@ func (v *RadosVolume) EmptyTrash() {
workers := theConfig.EmptyTrashWorkers
if workers <= 0 {
workers = DefaultRadosEmptyTrashWorkers
log.Warn(fmt.Sprintf("rados: cannot EmptyTrash with %d EmptyTrashWorkers, using %d instead", theConfig.EmptyTrashWorkers, workers))
log.Warnf("rados: cannot EmptyTrash with %d EmptyTrashWorkers, using %d instead", theConfig.EmptyTrashWorkers, workers)
}
err := v.listObjects(filterFunc, mapFunc, reduceFunc, workers)
if err != nil {
Expand Down Expand Up @@ -1204,7 +1228,7 @@ func (v *RadosVolume) lockShared(ctx context.Context, loc string, name string, d
// will fail and return an error unless the create argument
// is set to true.
func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc string, timeout arvados.Duration, create bool, exclusive bool) (lockCookie string, err error) {
radosTracef("rados: lock")
radosTracef("rados: lock loc=%s name=%s desc=%s timeout=%v create=%v exclusive=%v", loc, name, desc, timeout, create, exclusive)
locking_finished := make(chan bool)
lockCookie = uuid.Must(uuid.NewV4()).String()

Expand All @@ -1215,7 +1239,6 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st
for !locked {
select {
case <-ctx.Done():
close(locking_finished)
return
default:
res := 0
Expand All @@ -1226,9 +1249,11 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st
res, err = v.ioctx.LockShared(loc, name, lockCookie, "", desc, time.Duration(timeout), nil)
radosTracef("rados: lock call to rados LockShared for %s lock on loc=%s with lockCookie=%s returned res=%v err=%v", name, loc, lockCookie, res, err)
}
err = v.translateError(err)
v.stats.Tick(&v.stats.Ops, &v.stats.LockOps)
v.stats.TickErr(err)
if err != nil {
close(locking_finished)
return
}
switch res {
Expand Down Expand Up @@ -1266,39 +1291,44 @@ func (v *RadosVolume) lock(ctx context.Context, loc string, name string, desc st
locked = true
default:
err = fmt.Errorf("rados: attempting to get %s lock for %s on object %s: unexpected non-error return value %d from underlying lock function", name, desc, loc, res)
close(locking_finished)
return
}
}
}
close(locking_finished)
return
}()

// block on either locking_finished or ctx.Done()
select {
case <-locking_finished:
case <-ctx.Done():
close(locking_finished)
log.Warnf("rados: abandoning attempt to obtain exclusive %s lock for %s on object %s: %s", name, desc, loc, ctx.Err())
err = ctx.Err()
}

radosTracef("rados: lock %s on loc=%s returning with lockCookie=%s err=%v", name, loc, lockCookie, err)
radosTracef("rados: lock loc=%s name=%s desc=%s timeout=%v create=%v exclusive=%v complete, returning lockCookie=%s err=%v", loc, name, desc, timeout, create, exclusive, lockCookie, err)
return
}

// unlock previously obtained data lock
func (v *RadosVolume) unlock(loc string, name string, lockCookie string) (err error) {
radosTracef("rados: unlock")
radosTracef("rados: unlock loc=%s name=%s lockCookie=%s", loc, name, lockCookie)
res := 0
res, err = v.ioctx.Unlock(loc, name, lockCookie)
err = v.translateError(err)
v.stats.Tick(&v.stats.Ops, &v.stats.UnlockOps)
v.stats.TickErr(err)
if err != nil {
radosTracef("rados: unlock loc=%s name=%s lockCookie=%s got error unlocking, returning err=%v", loc, name, lockCookie, err)
return
}
if res == RadosLockNotFound {
err = fmt.Errorf("rados: attempting to unlock %s lock on object %s, lock was not held for cookie '%s'", name, loc, lockCookie)
}
radosTracef("rados: unlock loc=%s name=%s lockCookie=%s complete, returning err=%v", loc, name, lockCookie, err)
return
}

Expand Down
14 changes: 14 additions & 0 deletions services/keepstore/rados_volume_mock_test.go
Expand Up @@ -151,6 +151,10 @@ func (conn *radosMockConn) OpenIOContext(pool string) (ioctx radosIOContext, err
return
}

func (conn *radosMockConn) Shutdown() {
radosTracef("radosmock: conn.Shutdown()")
}

type radosMockIoctx struct {
*radosMockConn
pool string
Expand Down Expand Up @@ -314,6 +318,11 @@ func (ioctx *radosMockIoctx) Read(oid string, data []byte, offset uint64) (n int
return
}
n = copy(data, obj.data[offset:])

// pause here to facilitate race tests
radosTracef("radosmock: ioctx.Read oid=%s len(data)=%d offset=%d calling unlockAndRace()", oid, len(data), offset)
ioctx.b.unlockAndRace()

radosTracef("radosmock: ioctx.Read oid=%s len(data)=%d offset=%d complete, returning n=%d err=%v", oid, len(data), offset, n, err)
return
}
Expand Down Expand Up @@ -457,6 +466,11 @@ func (ioctx *radosMockIoctx) WriteFull(oid string, data []byte) (err error) {
if !ok {
ioctx.objects[oid] = newRadosStubObj([]byte{})
}

// pause here to facilitate race tests
radosTracef("radosmock: ioctx.WriteFull oid=%s len(data)=%d calling unlockAndRace()", oid, len(data))
ioctx.b.unlockAndRace()

obj.data = make([]byte, len(data))
n := copy(obj.data, data)
if n != len(data) {
Expand Down

0 comments on commit a42ccc4

Please sign in to comment.