expose FSCache, CacheReader and add CacheReader.Size
djherbis committed Feb 22, 2020
1 parent 287ea9b commit d81755d
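
For callers, the visible effect of this commit is that the constructors now return the concrete *FSCache instead of the Cache interface, and the readers the cache hands out are *CacheReader values. A minimal sketch of code against the new API (the directory, permissions, and key below are illustrative, not from this commit):

package main

import (
	"io"
	"os"
	"time"

	"github.com/djherbis/fscache"
)

func main() {
	// New now returns *FSCache, the newly exported concrete type.
	c, err := fscache.New("./cache", 0755, time.Hour)
	if err != nil {
		panic(err)
	}

	// Get returns a reader for the key, plus a writer if this caller is
	// the first to create the key (w is nil for already-existing keys).
	r, w, err := c.Get("hello")
	if err != nil {
		panic(err)
	}
	if w != nil {
		io.WriteString(w, "hello world")
		w.Close()
	}
	defer r.Close()
	io.Copy(os.Stdout, r) // streams the cached bytes
}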
Showing 2 changed files with 153 additions and 58 deletions.
108 changes: 65 additions & 43 deletions fscache.go
@@ -1,6 +1,7 @@
 package fscache
 
 import (
+	"fmt"
 	"io"
 	"os"
 	"sync"
@@ -33,7 +34,7 @@ type Cache interface {
 	Clean() error
 }
 
-type cache struct {
+type FSCache struct {
 	mu      sync.RWMutex
 	files   map[string]fileStream
 	fs      FileSystem
@@ -48,7 +49,7 @@ type ReadAtCloser interface {
 }
 
 type fileStream interface {
-	next() (ReadAtCloser, error)
+	next() (*CacheReader, error)
 	InUse() bool
 	io.WriteCloser
 	remove() error
@@ -58,7 +59,7 @@ type fileStream interface {
 // New creates a new Cache using NewFs(dir, perms).
 // expiry is the duration after which an un-accessed key will be removed from
 // the cache, a zero value expiry means never expire.
-func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) {
+func New(dir string, perms os.FileMode, expiry time.Duration) (*FSCache, error) {
 	fs, err := NewFs(dir, perms)
 	if err != nil {
 		return nil, err
@@ -76,7 +77,7 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) {
 // NewCache creates a new Cache based on FileSystem fs.
 // fs.Files() are loaded using the name they were created with as a key.
 // Reaper is used to determine when files expire, nil means never expire.
-func NewCache(fs FileSystem, grim Reaper) (Cache, error) {
+func NewCache(fs FileSystem, grim Reaper) (*FSCache, error) {
 	if grim != nil {
 		return NewCacheWithHaunter(fs, NewReaperHaunterStrategy(grim))
 	}
@@ -87,8 +88,8 @@ func NewCache(fs FileSystem, grim Reaper) (Cache, error) {
 // NewCacheWithHaunter creates a new Cache based on FileSystem fs.
 // fs.Files() are loaded using the name they were created with as a key.
 // Haunter is used to determine when files expire, nil means never expire.
-func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) {
-	c := &cache{
+func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (*FSCache, error) {
+	c := &FSCache{
 		files:   make(map[string]fileStream),
 		haunter: haunter,
 		fs:      fs,
@@ -104,34 +105,34 @@ func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) {
 	return c, nil
 }
 
-func (c *cache) scheduleHaunt() {
+func (c *FSCache) scheduleHaunt() {
 	c.haunt()
 	time.AfterFunc(c.haunter.Next(), c.scheduleHaunt)
 }
 
-func (c *cache) haunt() {
+func (c *FSCache) haunt() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.haunter.Haunt(c)
+	c.haunter.Haunt(&accessor{c: c})
 }
 
-func (c *cache) load() error {
+func (c *FSCache) load() error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	return c.fs.Reload(func(key, name string) {
 		c.files[key] = c.oldFile(name)
 	})
 }
 
-func (c *cache) Exists(key string) bool {
+func (c *FSCache) Exists(key string) bool {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	_, ok := c.files[key]
 	return ok
 }
 
-func (c *cache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
+func (c *FSCache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
 	c.mu.RLock()
 	f, ok := c.files[key]
 	if ok {
@@ -167,7 +168,7 @@ func (c *cache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
 	return r, f, err
 }
 
-func (c *cache) Remove(key string) error {
+func (c *FSCache) Remove(key string) error {
 	c.mu.Lock()
 	f, ok := c.files[key]
 	delete(c.files, key)
@@ -179,30 +180,34 @@ func (c *cache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
 	return nil
 }
 
-func (c *cache) Clean() error {
+func (c *FSCache) Clean() error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.files = make(map[string]fileStream)
 	return c.fs.RemoveAll()
 }
 
-func (c *cache) Stat(name string) (FileInfo, error) {
-	return c.fs.Stat(name)
+type accessor struct {
+	c *FSCache
 }
 
-func (c *cache) EnumerateEntries(enumerator func(key string, e Entry) bool) {
-	for k, f := range c.files {
+func (a *accessor) Stat(name string) (FileInfo, error) {
+	return a.c.fs.Stat(name)
+}
+
+func (a *accessor) EnumerateEntries(enumerator func(key string, e Entry) bool) {
+	for k, f := range a.c.files {
 		if !enumerator(k, Entry{name: f.Name(), inUse: f.InUse()}) {
 			break
 		}
 	}
 }
 
-func (c *cache) RemoveFile(key string) {
-	f, ok := c.files[key]
-	delete(c.files, key)
+func (a *accessor) RemoveFile(key string) {
+	f, ok := a.c.files[key]
+	delete(a.c.files, key)
 	if ok {
-		c.fs.Remove(f.Name())
+		a.c.fs.Remove(f.Name())
 	}
 }
 
@@ -211,7 +216,7 @@ type cachedFile struct {
 	handleCounter
 }
 
-func (c *cache) newFile(name string) (fileStream, error) {
+func (c *FSCache) newFile(name string) (fileStream, error) {
 	s, err := stream.NewStream(name, c.fs)
 	if err != nil {
 		return nil, err
@@ -223,7 +228,7 @@ func (c *cache) newFile(name string) (fileStream, error) {
 	return cf, nil
 }
 
-func (c *cache) oldFile(name string) fileStream {
+func (c *FSCache) oldFile(name string) fileStream {
 	return &reloadedFile{
 		fs:   c.fs,
 		name: name,
@@ -244,27 +249,30 @@ func (f *reloadedFile) remove() error {
 	return f.fs.Remove(f.name)
 }
 
-func (f *reloadedFile) next() (r ReadAtCloser, err error) {
-	r, err = f.fs.Open(f.name)
+func (f *reloadedFile) next() (*CacheReader, error) {
+	r, err := f.fs.Open(f.name)
 	if err == nil {
 		f.inc()
 	}
-	return &cacheReader{r: r, cnt: &f.handleCounter}, err
+	return &CacheReader{
+		ReadAtCloser: r,
+		cnt:          &f.handleCounter,
+	}, err
 }
 
 func (f *cachedFile) Name() string { return f.stream.Name() }
 
 func (f *cachedFile) remove() error { return f.stream.Remove() }
 
-func (f *cachedFile) next() (r ReadAtCloser, err error) {
+func (f *cachedFile) next() (*CacheReader, error) {
 	reader, err := f.stream.NextReader()
 	if err != nil {
 		return nil, err
 	}
 	f.inc()
-	return &cacheReader{
-		r:   reader,
-		cnt: &f.handleCounter,
+	return &CacheReader{
+		ReadAtCloser: reader,
+		cnt:          &f.handleCounter,
 	}, nil
 }
 
@@ -277,22 +285,36 @@ func (f *cachedFile) Close() error {
 	return f.stream.Close()
 }
 
-type cacheReader struct {
-	r   ReadAtCloser
+type CacheReader struct {
+	ReadAtCloser
 	cnt *handleCounter
 }
 
-func (r *cacheReader) ReadAt(p []byte, off int64) (n int, err error) {
-	return r.r.ReadAt(p, off)
-}
-
-func (r *cacheReader) Read(p []byte) (n int, err error) {
-	return r.r.Read(p)
-}
-
-func (r *cacheReader) Close() error {
+func (r *CacheReader) Close() error {
 	defer r.cnt.dec()
-	return r.r.Close()
+	return r.ReadAtCloser.Close()
 }
 
+// Size returns the current size of the stream being read. The boolean it
+// returns is true iff the stream is done being written (otherwise Size may change).
+// An error is returned if the Size fails to be computed or is not supported
+// by the underlying filesystem.
+func (r *CacheReader) Size() (int64, bool, error) {
+	switch v := r.ReadAtCloser.(type) {
+	case *stream.Reader:
+		size, done := v.Size()
+		return size, done, nil
+
+	case interface{ Stat() (os.FileInfo, error) }:
+		fi, err := v.Stat()
+		if err != nil {
+			return 0, false, err
+		}
+		return fi.Size(), true, nil
+
+	default:
+		return 0, false, fmt.Errorf("reader does not support stat.")
+	}
+}
+
 type handleCounter struct {
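Because next() now returns *CacheReader, the ReadAtCloser that Get hands back can be type-asserted to reach the new Size method. The Get path is only partially visible in this diff, so the following fragment is a sketch rather than a guarantee; it assumes the imports from the sketch above plus "fmt", and that another writer is filling the illustrative "video" key:

// The writer is elided here; assume the key is being written elsewhere.
r, _, err := c.Get("video")
if err != nil {
	panic(err)
}
defer r.Close()

if cr, ok := r.(*fscache.CacheReader); ok {
	size, done, err := cr.Size()
	if err != nil {
		// e.g. the underlying reader supports neither stream sizing nor Stat
		panic(err)
	}
	// done == false means the stream is still being written,
	// so size is only a lower bound on the final length.
	fmt.Printf("cached %d bytes (complete: %v)\n", size, done)
}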
