Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file.go: Added SaveToFileConcurrentWithTimeout #39

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions file.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package fastcache

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"runtime"
"time"

"github.com/golang/snappy"
)
Expand Down Expand Up @@ -60,7 +63,7 @@ func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error {
if concurrency <= 0 || concurrency > gomaxprocs {
concurrency = gomaxprocs
}
if err := c.save(tmpDir, concurrency); err != nil {
if err := c.save(tmpDir, concurrency, context.Background()); err != nil {
return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err)
}

Expand All @@ -76,6 +79,67 @@ func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error {
return nil
}

// SaveToFileConcurrentWithTimeout saves cache data to the given filePath
// in the same way as SaveToFileConcurrent, but stops saving once the given
// timeout elapses. This limits the time spent on saving very large caches.
//
// concurrency is the number of workers used for saving. It is replaced with
// GOMAXPROCS if it is non-positive or exceeds GOMAXPROCS.
//
// If the timeout elapses, the data written so far is still moved to filePath,
// replacing its previous contents, and ErrTimeout is returned so the caller
// knows that the save has been cut short. Any other failure is returned
// as a descriptive error.
//
// The saved data may be loaded with LoadFromFile*.
// NOTE(review): confirm that a snapshot cut short by the timeout is complete
// enough to be loaded; this cannot be verified from this function alone.
//
// See also SaveToFileConcurrent.
func (c *Cache) SaveToFileConcurrentWithTimeout(filePath string, concurrency int, timeout time.Duration) error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that passing io.Writer instead of filepath (and opening file inside) will be much more flexible and easier to test. WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cristaloleg Thanks for the comment but I'm sorry that I don't quite understand your point "passing io.Writer and opening the file inside" 😭 Could you please elaborate? In this change, I wanted to make the function signature look like existing SaveToFileConcurrent.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about slightly another API:

func (c *Cache) SaveConcurrentWithTimeout(w io.Writer, concurrency int, timeout time.Duration) error {

Anyway, I'm the author of fastcache, someone else might have another view on this.

// Bound the whole save with the timeout. cancelFn is deferred so that
// the context is always released when this function returns.
ctx, cancelFn := context.WithTimeout(context.Background(), timeout)
defer cancelFn()

// Create dir if it doesn't exist.
dir := filepath.Dir(filePath)
if _, err := os.Stat(dir); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("cannot stat %q: %s", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("cannot create dir %q: %s", dir, err)
}
}

// Save cache data into a temporary directory.
tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.")
if err != nil {
return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err)
}
// Remove the temporary dir on every return path, unless it has already
// been renamed to filePath (tmpDir is reset to "" in that case).
defer func() {
if tmpDir != "" {
_ = os.RemoveAll(tmpDir)
}
}()
// GOMAXPROCS(-1) only queries the current setting; use it as the worker
// count when concurrency is non-positive or exceeds it.
gomaxprocs := runtime.GOMAXPROCS(-1)
if concurrency <= 0 || concurrency > gomaxprocs {
concurrency = gomaxprocs
}
// ErrTimeout is not treated as a failure here: it is remembered so that
// the data written so far is still moved to filePath below.
// The comparison uses ==, so only the bare ErrTimeout value is recognized.
timedOut := false
if err := c.save(tmpDir, concurrency, ctx); err != nil && err != ErrTimeout {
return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err)
} else if err == ErrTimeout {
timedOut = true
}

// Remove old filePath contents, since os.Rename may return
// error if filePath dir exists.
// NOTE(review): this also runs when the save timed out, so the previous
// contents at filePath are replaced by the partially saved data.
if err := os.RemoveAll(filePath); err != nil {
return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err)
}
if err := os.Rename(tmpDir, filePath); err != nil {
return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err)
}
// The temporary dir now lives at filePath, so disarm the deferred cleanup.
tmpDir = ""

// Notify the caller that saving has been stopped due to the time limit.
if timedOut {
return ErrTimeout
}
return nil
}

// LoadFromFile loads cache data from the given filePath.
//
// See SaveToFile* for saving cache data to file.
Expand All @@ -95,7 +159,7 @@ func LoadFromFileOrNew(filePath string, maxBytes int) *Cache {
return New(maxBytes)
}

func (c *Cache) save(dir string, workersCount int) error {
func (c *Cache) save(dir string, workersCount int, ctx context.Context) error {
if err := saveMetadata(c, dir); err != nil {
return err
}
Expand All @@ -105,7 +169,7 @@ func (c *Cache) save(dir string, workersCount int) error {
results := make(chan error)
for i := 0; i < workersCount; i++ {
go func(workerNum int) {
results <- saveBuckets(c.buckets[:], workCh, dir, workerNum)
results <- saveBuckets(c.buckets[:], workCh, dir, workerNum, ctx)
}(i)
}
// Feed workers with work
Expand Down Expand Up @@ -223,7 +287,9 @@ func loadMetadata(dir string) (uint64, error) {

// dataFileRegexp matches file names of the form data.<digits>.bin,
// such as the data.%d.bin files created by saveBuckets.
var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`)

func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error {
// ErrTimeout is returned when saving is stopped because the context passed
// to the save routines is done, e.g. when the timeout given to
// SaveToFileConcurrentWithTimeout has elapsed.
var ErrTimeout = errors.New("spent allowed time")

func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int, ctx context.Context) error {
dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum)
dataFile, err := os.Create(dataPath)
if err != nil {
Expand All @@ -234,6 +300,11 @@ func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int)
}()
zw := snappy.NewBufferedWriter(dataFile)
for bucketNum := range workCh {
select {
case <-ctx.Done():
return ErrTimeout
default:
}
if err := writeUint64(zw, uint64(bucketNum)); err != nil {
return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err)
}
Expand Down