Skip to content

Commit

Permalink
WIP: Implement MMAP backed file cache for chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbuenemann committed Feb 18, 2020
1 parent 90f7cd9 commit 54f74cd
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 15 deletions.
18 changes: 17 additions & 1 deletion chunk/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"fmt"
"os"

. "github.com/claudetech/loggo/default"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Response struct {

// NewManager creates a new chunk manager
func NewManager(
chunkFile string,
chunkSize int64,
loadAhead,
checkThreads int,
Expand All @@ -56,6 +58,15 @@ func NewManager(
if chunkSize%1024 != 0 {
return nil, fmt.Errorf("Chunk size must be divideable by 1024")
}
if chunkFile != "" {
pageSize := int64(os.Getpagesize())
if chunkSize < pageSize {
return nil, fmt.Errorf("Chunk size must not be < %v", pageSize)
}
if chunkSize%pageSize != 0 {
return nil, fmt.Errorf("Chunk size must be divideable by %v", pageSize)
}
}
if maxChunks < 2 || maxChunks < loadAhead {
return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value")
}
Expand All @@ -65,11 +76,16 @@ func NewManager(
return nil, err
}

storage, err := NewStorage(chunkSize, maxChunks, chunkFile)
if nil != err {
return nil, err
}

manager := Manager{
ChunkSize: chunkSize,
LoadAhead: loadAhead,
downloader: downloader,
storage: NewStorage(chunkSize, maxChunks),
storage: storage,
queue: make(chan *QueueEntry, 100),
}

Expand Down
7 changes: 7 additions & 0 deletions chunk/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ func NewStack(maxChunks int) *Stack {
}
}

// Len reports how many items the stack currently holds.
// The count is read while holding the stack lock.
func (s *Stack) Len() int {
	s.lock.Lock()
	count := s.len
	s.lock.Unlock()

	return count
}

// Pop pops the first item from the stack
func (s *Stack) Pop() string {
s.lock.Lock()
Expand Down
22 changes: 22 additions & 0 deletions chunk/stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,25 @@ func TestAddToStack(t *testing.T) {
t.Fatalf("Expected nil got %v", v)
}
}

// TestLen checks that Len follows the stack through a push and a pop.
func TestLen(t *testing.T) {
	stack := NewStack(1)

	// A freshly created stack reports no items.
	if got := stack.Len(); got != 0 {
		t.Fatalf("Expected 0 got %v", got)
	}

	// One push, one item.
	stack.Push("1")
	if got := stack.Len(); got != 1 {
		t.Fatalf("Expected 1 got %v", got)
	}

	// Popping the only item brings the count back to zero.
	_ = stack.Pop()
	if got := stack.Len(); got != 0 {
		t.Fatalf("Expected 0 got %v", got)
	}
}
74 changes: 61 additions & 13 deletions chunk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package chunk

import (
"errors"
"fmt"
"os"
"sync"
"syscall"

. "github.com/claudetech/loggo/default"
)
Expand All @@ -12,29 +15,56 @@ var ErrTimeout = errors.New("timeout")

// Storage is a chunk storage. It keeps chunks in a map keyed by chunk id and
// tracks the ids on a Stack; chunks are either plain byte slices or, when a
// chunk file is configured, memory mappings of that file.
type Storage struct {
// ChunkFile is the opened chunk cache file. NewStorage sets it only when a
// chunk file path is given; nil means chunks are allocated with make.
ChunkFile *os.File
// ChunkSize is the size in bytes of every chunk buffer.
ChunkSize int64
// MaxChunks is the capacity passed to NewStack and the number of
// ChunkSize-sized slots the chunk file is resized to.
MaxChunks int
// chunks maps a chunk id to its buffer.
chunks map[string][]byte
// stack holds the ids of the stored chunks.
stack *Stack
// lock is taken by Store while it modifies chunks and stack.
lock sync.Mutex
}

// Item represents a chunk in RAM
// NOTE(review): nothing in the visible code references this type — confirm it
// is still needed.
type Item struct {
// id is presumably the chunk id used as key elsewhere — TODO confirm.
id string
// bytes is presumably the chunk content — TODO confirm.
bytes []byte
}

// NewStorage creates a chunk storage for up to maxChunks chunks of chunkSize
// bytes each.
//
// When chunkFilePath is non-empty, the file at that path is opened read/write
// (created with mode 0600 if missing) and resized to chunkSize*maxChunks bytes;
// the handle is kept in ChunkFile. With an empty path ChunkFile stays nil.
// An error is returned when the file cannot be opened or resized.
//
// NOTE(review): this listing appears to show removed and added diff lines side
// by side — the single-result signature and the bare `return &storage` look
// like the pre-change version; confirm against the actual file.
func NewStorage(chunkSize int64, maxChunks int) *Storage {
func NewStorage(chunkSize int64, maxChunks int, chunkFilePath string) (*Storage, error) {
storage := Storage{
ChunkSize: chunkSize,
MaxChunks: maxChunks,
chunks: make(map[string][]byte),
stack: NewStack(maxChunks),
}

return &storage
// Non-empty string in chunkFilePath enables MMAP disk storage for chunks
if chunkFilePath != "" {
chunkFile, err := os.OpenFile(chunkFilePath, os.O_RDWR|os.O_CREATE, 0600)
if nil != err {
// The underlying error is only logged at debug level; the caller gets a
// fixed message without the cause.
Log.Debugf("%v", err)
return nil, fmt.Errorf("Could not open chunk cache file")
}
// Size the file so that it has one chunkSize-sized slot per chunk.
err = chunkFile.Truncate(chunkSize * int64(maxChunks))
if nil != err {
// NOTE(review): chunkFile is not closed on this error path, so the
// descriptor stays open after the failure.
Log.Debugf("%v", err)
return nil, fmt.Errorf("Could not resize chunk cache file")
}
Log.Infof("Created chunk cache file %v", chunkFile.Name())
storage.ChunkFile = chunkFile
}

return &storage, nil
}

// newChunk allocates the buffer for one chunk of ChunkSize bytes.
//
// Without a chunk file the buffer is an ordinary in-memory byte slice. With a
// chunk file it is a shared read/write memory mapping of the file region at
// offset index*ChunkSize, where index is the current number of entries on the
// stack.
//
// It returns an error when the computed slot does not fit into the MaxChunks
// slots of the chunk file or when the mmap call fails.
func (s *Storage) newChunk() ([]byte, error) {
	// No chunk file configured: keep the chunk in RAM.
	if s.ChunkFile == nil {
		return make([]byte, s.ChunkSize), nil
	}

	index := int64(s.stack.Len())
	// NewStorage resizes the file to ChunkSize*MaxChunks bytes. Refuse a slot
	// past that size here: mapping beyond end-of-file can succeed and then
	// typically faults (SIGBUS) on first access instead of failing cleanly.
	if index >= int64(s.MaxChunks) {
		return nil, fmt.Errorf("chunk index %v exceeds max-chunks %v", index, s.MaxChunks)
	}

	Log.Debugf("Mmap chunk %v / %v", index+1, s.MaxChunks)
	chunk, err := syscall.Mmap(
		int(s.ChunkFile.Fd()),
		index*s.ChunkSize,
		int(s.ChunkSize),
		syscall.PROT_READ|syscall.PROT_WRITE,
		syscall.MAP_SHARED,
	)
	if err != nil {
		return nil, err
	}

	return chunk, nil
}

// Clear removes all old chunks on disk (will be called on each program start)
Expand All @@ -57,17 +87,35 @@ func (s *Storage) Load(id string) []byte {
// Store records the chunk data bytes under id.
//
// If id is already present the entry is only touched on the stack. Otherwise,
// when Pop yields an id, that chunk's buffer is taken out of the map and reused
// for the new data; when Pop yields "", a buffer is obtained from newChunk.
// The data is copied into the buffer, the buffer is stored under id and id is
// pushed onto the stack. The only error returned comes from newChunk.
//
// NOTE(review): this listing appears to interleave removed and added diff lines
// (the explicit s.lock.Unlock() next to the deferred one, the direct
// `s.chunks[id] = bytes` assignment, and the extra closing brace after the
// "Deleted chunk" log) — confirm against the actual file.
func (s *Storage) Store(id string, bytes []byte) error {
s.lock.Lock()
defer s.lock.Unlock()

// Avoid storing same chunk multiple times
if _, exists := s.chunks[id]; exists {
Log.Debugf("Create chunk %v (exists)", id)
s.stack.Touch(id)
return nil
}

deleteID := s.stack.Pop()
if "" != deleteID {
// Keep the popped chunk's buffer so it can be reused for the new id.
chunk := s.chunks[deleteID]
delete(s.chunks, deleteID)

Log.Debugf("Deleted chunk %v", deleteID)
}

s.chunks[id] = bytes
s.stack.Push(id)
s.lock.Unlock()
Log.Debugf("Create chunk %v (reused)", id)
// NOTE(review): copy transfers min(len(chunk), len(bytes)) bytes; if bytes is
// shorter than the reused buffer, the tail keeps the previous chunk's data —
// confirm readers never look past the new data.
copy(chunk, bytes)
s.chunks[id] = chunk
s.stack.Push(id)
} else {
Log.Debugf("Create chunk %v (stored)", id)
chunk, err := s.newChunk()
if err != nil {
return err
}
copy(chunk, bytes)
s.chunks[id] = chunk
s.stack.Push(id)
}

return nil
}
16 changes: 15 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func main() {
argRootNodeID := flag.String("root-node-id", "root", "The ID of the root node to mount (use this for only mount a sub directory)")
argDriveID := flag.String("drive-id", "", "The ID of the shared drive to mount (including team drives)")
argConfigPath := flag.StringP("config", "c", filepath.Join(home, ".plexdrive"), "The path to the configuration directory")
argCacheFile := flag.String("cache-file", filepath.Join(home, ".plexdrive", "cache.bolt"), "Path the the cache file")
argCacheFile := flag.String("cache-file", filepath.Join(home, ".plexdrive", "cache.bolt"), "Path of the cache file")
argChunkFile := flag.String("chunk-file", filepath.Join(home, ".plexdrive", "chunks.dat"), "Path of the chunk cache file")
argChunkMmap := flag.Bool("chunk-mmap", false, "Enable disk based chunk cache")
argChunkSize := flag.String("chunk-size", "10M", "The size of each chunk that is downloaded (units: B, K, M, G)")
argChunkLoadThreads := flag.Int("chunk-load-threads", max(runtime.NumCPU()/2, 1), "The number of threads to use for downloading chunks")
argChunkCheckThreads := flag.Int("chunk-check-threads", max(runtime.NumCPU()/2, 1), "The number of threads to use for checking chunk existence")
Expand Down Expand Up @@ -123,6 +125,8 @@ func main() {
Log.Debugf("drive-id : %v", *argDriveID)
Log.Debugf("config : %v", *argConfigPath)
Log.Debugf("cache-file : %v", *argCacheFile)
Log.Debugf("chunk-file : %v", *argChunkFile)
Log.Debugf("chunk-mmap : %v", *argChunkMmap)
Log.Debugf("chunk-size : %v", *argChunkSize)
Log.Debugf("chunk-load-threads : %v", *argChunkLoadThreads)
Log.Debugf("chunk-check-threads : %v", *argChunkCheckThreads)
Expand All @@ -147,6 +151,15 @@ func main() {
Log.Debugf("%v", err)
os.Exit(1)
}
if *argChunkMmap {
if err := os.MkdirAll(filepath.Dir(*argChunkFile), 0766); nil != err {
Log.Errorf("Could not create chunk cache file directory")
Log.Debugf("%v", err)
os.Exit(1)
}
} else {
*argChunkFile = ""
}

// set the global buffer configuration
chunkSize, err := parseSizeArg(*argChunkSize)
Expand Down Expand Up @@ -181,6 +194,7 @@ func main() {
}

chunkManager, err := chunk.NewManager(
*argChunkFile,
chunkSize,
*argChunkLoadAhead,
*argChunkCheckThreads,
Expand Down

0 comments on commit 54f74cd

Please sign in to comment.