Skip to content

Commit

Permalink
Adding SetKeyMapper and Encode/DecodeKey
Browse files Browse the repository at this point in the history
  • Loading branch information
djherbis committed Oct 24, 2020
1 parent 486cc2c commit a0daa9e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 31 deletions.
103 changes: 76 additions & 27 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package fscache
import (
"bytes"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
Expand All @@ -17,6 +16,7 @@ import (
"gopkg.in/djherbis/stream.v1"
)

// FileSystemStater implementers can provide FileInfo data about a named resource.
type FileSystemStater interface {
// Stat takes a File.Name() and returns FileInfo interface
Stat(name string) (FileInfo, error)
Expand All @@ -37,21 +37,48 @@ type FileSystem interface {
RemoveAll() error
}

type stdFs struct {
// StandardFS is an implementation of FileSystem which writes to the os Filesystem.
type StandardFS struct {
// root is the directory holding the cached files: create joins it with the
// file name, Reload lists it, and RemoveAll deletes it.
root string
// init is run by NewFs and again by RemoveAll after root has been deleted;
// NewFs sets it to create root with os.MkdirAll.
init func() error

// EncodeKey takes a 'name' given to Create and converts it into
// the Filename that should be used. It should return 'true' if
// DecodeKey can convert the returned string back to the original 'name'
// and false otherwise.
// This must be set before the first call to Create.
EncodeKey func(string) (string, bool)

// DecodeKey should convert a given Filename into the original 'name' given to
// EncodeKey, and return true if this conversion was possible. Returning false
// will cause it to try and look up a stored 'encodedName.key' file which holds
// the original name.
DecodeKey func(string) (string, bool)
}

// IdentityCodeKey can serve as both an EncodeKey and a DecodeKey func: it hands
// back its argument unchanged together with true. It is meant for an FSCache
// that uses SetKeyMapper so that its internal km(key) value is already a valid
// filename path.
func IdentityCodeKey(key string) (string, bool) {
	const reversible = true
	return key, reversible
}

// NewFs returns a FileSystem rooted at directory dir.
// Dir is created with perms if it doesn't exist.
func NewFs(dir string, mode os.FileMode) (FileSystem, error) {
fs := &stdFs{root: dir, init: func() error {
return os.MkdirAll(dir, mode)
}}
// This also uses the default EncodeKey/DecodeKey functions B64OrMD5HashEncodeKey/B64DecodeKey.
func NewFs(dir string, mode os.FileMode) (*StandardFS, error) {
fs := &StandardFS{
root: dir,
init: func() error {
return os.MkdirAll(dir, mode)
},
EncodeKey: B64OrMD5HashEncodeKey,
DecodeKey: B64DecodeKey,
}
return fs, fs.init()
}

func (fs *stdFs) Reload(add func(key, name string)) error {
// Reload looks through the dir given to NewFs and returns every key, name pair (Create(key) => name = File.Name())
// that is managed by this FileSystem.
func (fs *StandardFS) Reload(add func(key, name string)) error {
files, err := ioutil.ReadDir(fs.root)
if err != nil {
return err
Expand Down Expand Up @@ -103,43 +130,52 @@ func (fs *stdFs) Reload(add func(key, name string)) error {
return nil
}

func (fs *stdFs) Create(name string) (stream.File, error) {
// Create makes a File for the given 'name'. The name used on the os filesystem
// may differ from 'name': it is whatever makeName derives from it through
// EncodeKey. An error from makeName is returned without creating the file.
func (fs *StandardFS) Create(name string) (stream.File, error) {
	encoded, err := fs.makeName(name)
	if err == nil {
		return fs.create(encoded)
	}
	return nil, err
}

func (fs *stdFs) create(name string) (stream.File, error) {
// create opens the file called name inside fs.root for reading and writing,
// creating it with mode 0600 if it does not exist and truncating it if it does.
// On failure it returns a nil stream.File together with the error.
func (fs *StandardFS) create(name string) (stream.File, error) {
	f, err := os.OpenFile(filepath.Join(fs.root, name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		// Return a literal nil: forwarding the nil *os.File would wrap it in a
		// non-nil stream.File interface value.
		return nil, err
	}
	return f, nil
}

func (fs *stdFs) Open(name string) (stream.File, error) {
// Open opens a stream.File for the given File.Name() returned by Create().
// The name is passed to os.Open as-is (it is not joined with the root
// directory). On failure it returns a nil stream.File together with the error.
func (fs *StandardFS) Open(name string) (stream.File, error) {
	f, err := os.Open(name)
	if err != nil {
		// Return a literal nil: forwarding the nil *os.File would wrap it in a
		// non-nil stream.File interface value.
		return nil, err
	}
	return f, nil
}

func (fs *stdFs) Remove(name string) error {
// Remove deletes the file with the given File.Name() returned by Create(),
// along with its companion "<name>.key" file.
func (fs *StandardFS) Remove(name string) error {
	// The key file goes first and its error is discarded: makeName only writes
	// a key file for names that are not decodable, so one may not exist.
	keyFile := fmt.Sprintf("%s.key", name)
	_ = os.Remove(keyFile)

	return os.Remove(name)
}

func (fs *stdFs) RemoveAll() error {
// RemoveAll deletes the whole directory managed by this StandardFS and then
// runs init again. Warning: files placed in this directory that were not
// created by StandardFS are deleted as well.
func (fs *StandardFS) RemoveAll() error {
	err := os.RemoveAll(fs.root)
	if err == nil {
		// Only re-run init once the old directory is gone.
		err = fs.init()
	}
	return err
}

func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) {
// AccessTimes stats the given File.Name() returned by Create() and reports its
// atime (rt) and mtime (wt). If the stat fails, both times are the zero
// time.Time and the stat error is returned.
func (fs *StandardFS) AccessTimes(name string) (rt, wt time.Time, err error) {
	info, statErr := os.Stat(name)
	if statErr != nil {
		return time.Time{}, time.Time{}, statErr
	}

	rt = atime.Get(info)
	wt = info.ModTime()
	return rt, wt, nil
}

func (fs *stdFs) Stat(name string) (FileInfo, error) {
// Stat returns FileInfo for the given File.Name() returned by Create().
func (fs *StandardFS) Stat(name string) (FileInfo, error) {
stat, err := os.Stat(name)
if err != nil {
return FileInfo{}, err
Expand All @@ -150,18 +186,12 @@ func (fs *stdFs) Stat(name string) (FileInfo, error) {

const (
// saltSize is the number of characters B64DecodeKey skips after the prefix
// before base64-decoding the rest of a short name.
saltSize = 8
salt = "xxxxxxxx" // this is only important for sizing now.
// maxShort: B64OrMD5HashEncodeKey keeps the base64 form of a key in the
// file name only when that form is shorter than this many characters.
maxShort = 20
// shortPrefix starts names that embed the base64 of the key; longPrefix
// starts names built from the key's MD5 hash instead.
shortPrefix = "s"
longPrefix = "l"
)

// salt returns URL-safe base64 text produced from saltSize bytes read from
// crypto/rand. The result of io.CopyN (including any error) is ignored.
// The encoder is never closed, so a trailing partial 3-byte group is not
// flushed into the returned string.
func salt() string {
buf := bytes.NewBufferString("")
enc := base64.NewEncoder(base64.URLEncoding, buf)
io.CopyN(enc, rand.Reader, saltSize)
return buf.String()
}

func tob64(s string) string {
buf := bytes.NewBufferString("")
enc := base64.NewEncoder(base64.URLEncoding, buf)
Expand All @@ -178,16 +208,27 @@ func fromb64(s string) string {
return out.String()
}

func (fs *stdFs) makeName(key string) (string, error) {
// B64OrMD5HashEncodeKey converts a given key into a filesystem name-safe string
// and returns true iff it can be reversed with B64DecodeKey.
func B64OrMD5HashEncodeKey(key string) (string, bool) {
b64key := tob64(key)
// short name
if len(b64key) < maxShort {
return fmt.Sprintf("%s%s%s", shortPrefix, salt(), b64key), nil
return fmt.Sprintf("%s%s%s", shortPrefix, salt, b64key), true
}

// long name
hash := md5.Sum([]byte(key))
name := fmt.Sprintf("%s%s%x", longPrefix, salt(), hash[:])
return fmt.Sprintf("%s%s%x", longPrefix, salt, hash[:]), false
}

func (fs *StandardFS) makeName(key string) (string, error) {
name, decodable := fs.EncodeKey(key)
if decodable {
return name, nil
}

// Name is not decodeable, store it.
f, err := fs.create(fmt.Sprintf("%s.key", name))
if err != nil {
return "", err
Expand All @@ -197,10 +238,18 @@ func (fs *stdFs) makeName(key string) (string, error) {
return name, err
}

func (fs *stdFs) getKey(name string) (string, error) {
// short name
// B64DecodeKey converts a string y into x such that y, ok = B64OrMD5HashEncodeKey(x) and ok = true.
// In other words, it reverses B64OrMD5HashEncodeKey whenever B64OrMD5HashEncodeKey returned true.
func B64DecodeKey(name string) (string, bool) {
if strings.HasPrefix(name, shortPrefix) {
return fromb64(strings.TrimPrefix(name, shortPrefix)[saltSize:]), nil
return fromb64(strings.TrimPrefix(name, shortPrefix)[saltSize:]), true
}
return "", false
}

func (fs *StandardFS) getKey(name string) (string, error) {
if key, ok := fs.DecodeKey(name); ok {
return key, nil
}

// long name
Expand Down
40 changes: 36 additions & 4 deletions fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,35 @@ type Cache interface {
Clean() error
}

// FSCache is a Cache which uses a Filesystem to read/write cached data.
type FSCache struct {
// mu guards files and km: Exists reads files under RLock, and SetKeyMapper
// writes km under Lock.
mu sync.RWMutex
// files holds the cache's entries (fileStream values) by key.
files map[string]fileStream
// km is the optional key mapper installed by SetKeyMapper; when it is nil,
// mapKey returns keys unchanged.
km func(string) string
fs FileSystem
haunter Haunter
}

// SetKeyMapper installs km as the function applied to every Cache key, so that
// the cache works with km(key) in place of key. Internally the cache then only
// tracks km(key) and forgets the original key. As a consequence, Enumerate
// returns km(key) instead of key, and the Filesystem is given km(key) by Create
// and is expected to return km(key) from Reload().
// The purpose is to let the internally managed key be converted into a string
// that is allowed as a filesystem path.
// It returns c so the call can be chained.
func (c *FSCache) SetKeyMapper(km func(string) string) *FSCache {
	c.mu.Lock()
	c.km = km
	c.mu.Unlock()

	return c
}

// mapKey returns km(key) when a key mapper has been set, and key unchanged
// otherwise. It does not take c.mu itself.
func (c *FSCache) mapKey(key string) string {
	if mapper := c.km; mapper != nil {
		return mapper(key)
	}
	return key
}

// ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range
// Requests are possible.
type ReadAtCloser interface {
Expand Down Expand Up @@ -125,15 +147,19 @@ func (c *FSCache) load() error {
})
}

// Exists returns true iff this key is in the Cache (may not be finished streaming).
func (c *FSCache) Exists(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
_, ok := c.files[key]
_, ok := c.files[c.mapKey(key)]
return ok
}

// Get obtains a ReadAtCloser for the given key, and may return a WriteCloser to write the original cache data
// if this is a cache-miss.
func (c *FSCache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error) {
c.mu.RLock()
key = c.mapKey(key)
f, ok := c.files[key]
if ok {
r, err = f.next()
Expand Down Expand Up @@ -168,8 +194,10 @@ func (c *FSCache) Get(key string) (r ReadAtCloser, w io.WriteCloser, err error)
return r, f, err
}

// Remove removes the specified key from the cache.
func (c *FSCache) Remove(key string) error {
c.mu.Lock()
key = c.mapKey(key)
f, ok := c.files[key]
delete(c.files, key)
c.mu.Unlock()
Expand All @@ -180,6 +208,7 @@ func (c *FSCache) Remove(key string) error {
return nil
}

// Clean resets the cache removing all keys and data.
func (c *FSCache) Clean() error {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -204,6 +233,7 @@ func (a *accessor) EnumerateEntries(enumerator func(key string, e Entry) bool) {
}

func (a *accessor) RemoveFile(key string) {
key = a.c.mapKey(key)
f, ok := a.c.files[key]
delete(a.c.files, key)
if ok {
Expand Down Expand Up @@ -237,8 +267,8 @@ func (c *FSCache) oldFile(name string) fileStream {

type reloadedFile struct {
handleCounter
fs FileSystem
name string
fs FileSystem
name string
io.WriteCloser // nop Write & Close methods. will never be called.
}

Expand Down Expand Up @@ -285,11 +315,13 @@ func (f *cachedFile) Close() error {
return f.stream.Close()
}

// CacheReader is a ReadAtCloser for a Cache key that also tracks open readers.
type CacheReader struct {
// ReadAtCloser is the wrapped reader; Close closes it.
ReadAtCloser
// cnt is the open-reader counter that Close decrements.
cnt *handleCounter
}

// Close frees the underlying ReadAtCloser and updates the open reader counter.
func (r *CacheReader) Close() error {
defer r.cnt.dec()
return r.ReadAtCloser.Close()
Expand All @@ -313,7 +345,7 @@ func (r *CacheReader) Size() (int64, bool, error) {
return fi.Size(), true, nil

default:
return 0, false, fmt.Errorf("reader does not support stat.")
return 0, false, fmt.Errorf("reader does not support stat")
}
}

Expand Down
10 changes: 10 additions & 0 deletions fscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func testCaches(t *testing.T, run func(c Cache)) {

rc := NewRemote("localhost:10000")
run(rc)

fs, _ := NewFs("./cachex", 0700)
fs.EncodeKey = IdentityCodeKey
fs.DecodeKey = IdentityCodeKey
ck, _ := NewCache(fs, NewReaper(time.Hour, time.Hour))
ck.SetKeyMapper(func(key string) string {
name, _ := B64OrMD5HashEncodeKey(key)
return name
})
run(ck)
}

func TestHandler(t *testing.T) {
Expand Down

0 comments on commit a0daa9e

Please sign in to comment.