Skip to content

Commit

Permalink
Fix performance issues on write and read
Browse files Browse the repository at this point in the history
- don't use the write pool to write synchronously
- add a WaitGroup on the read goroutines
- don't use the write pool (asynchronous write) by default
  • Loading branch information
JCapul committed Apr 24, 2019
1 parent ba89fe7 commit 8a54135
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewRedisConf() *Redis {
Cluster: false,
ClusterAddrs: []string{":7001", ":7002", ":7003", ":7004", ":7005", ":7006"},
UseUnlink: true,
UseWritePool: true,
UseWritePool: false,
WritePoolBufferSize: defaultWritePoolBufferSize,
WritePoolWorkers: defaultWritePoolWorkers,
}
Expand Down
29 changes: 22 additions & 7 deletions src/go/redisfs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func byte2StringNoCopy(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// Asynchronous writer pool

// Job ...
type Job struct {
async bool
Expand Down Expand Up @@ -210,9 +212,9 @@ func NewWritePool(conf *config.Redis) *WritePool {
}

// SetRange ...
func (p *WritePool) SetRange(async bool, key string, off int64, value []byte) {
func (p *WritePool) SetRange(key string, off int64, value []byte) {
ack := make(chan bool) // could come from a pool instead
p.jobRequest <- &Job{async, key, off, value, ack}
p.jobRequest <- &Job{true, key, off, value, ack}
<-ack
}

Expand Down Expand Up @@ -241,11 +243,21 @@ func NewFileContentClient(conf *config.Redis, stripeSize int64) *FileContentClie
}

func (c FileContentClient) writeStripe(key string, value []byte) {
c.writePool.SetRange(c.conf.UseWritePool, key, -1, value)
if c.conf.UseWritePool {
c.writePool.SetRange(key, -1, value)
} else {
Try(c.redisClient.Set(key, byte2StringNoCopy(value), 0).Err())
}

}

func (c FileContentClient) writeStripeAt(key string, off int64, value []byte) {
c.writePool.SetRange(c.conf.UseWritePool, key, off, value)
if c.conf.UseWritePool {
c.writePool.SetRange(key, off, value)
} else {
Try(c.redisClient.SetRange(key, off, byte2StringNoCopy(value)).Err())
}

}

type stripeInfo struct {
Expand Down Expand Up @@ -385,10 +397,12 @@ func (c FileContentClient) ReadAt(name string, off, size int64) (string, bool) {
ok := false

stripes := stripeLayout(c.stripeSize, off, size)

wg := sync.WaitGroup{}
wg.Add(len(stripes))
retChan := make(chan *readAtReturn, len(stripes))
for i, stripe := range stripes {
go func(key string, off, size int64, ret chan *readAtReturn, i int) {
go func(key string, off, size int64, ret chan *readAtReturn, i int, wg *sync.WaitGroup) {
defer wg.Done()
var s string
var ok bool
if off == 0 && size == c.stripeSize {
Expand All @@ -397,8 +411,9 @@ func (c FileContentClient) ReadAt(name string, off, size int64) (string, bool) {
s, ok = c.readStripeRange(key, off, off+size-1)
}
ret <- &readAtReturn{i, s, ok}
}(fmt.Sprintf("%s:%d", name, stripe.id), stripe.off, stripe.len, retChan, i)
}(fmt.Sprintf("%s:%d", name, stripe.id), stripe.off, stripe.len, retChan, i, &wg)
}
wg.Wait()
var i int
for {
// retrieve result and put them in order
Expand Down

0 comments on commit 8a54135

Please sign in to comment.