Skip to content

Commit

Permalink
Add simple cache warmer, disabled by default
Browse files Browse the repository at this point in the history
  • Loading branch information
deluan committed Oct 26, 2020
1 parent f3bb51f commit 1e56f4d
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 28 deletions.
5 changes: 4 additions & 1 deletion cmd/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion conf/configuration.go
Expand Up @@ -48,6 +48,7 @@ type configOptions struct {
// DevFlags. These are used to enable/disable debugging and incomplete features
DevLogSourceLine bool
DevAutoCreateAdminPassword string
DevPreCacheAlbumArtwork bool
}

type scannerOptions struct {
Expand Down Expand Up @@ -132,7 +133,7 @@ func init() {
// DevFlags. These are used to enable/disable debugging and incomplete features
viper.SetDefault("devlogsourceline", false)
viper.SetDefault("devautocreateadminpassword", "")
viper.SetDefault("devoldscanner", false)
viper.SetDefault("devprecachealbumartwork", false)
}

func InitConfig(cfgFile string) {
Expand Down
72 changes: 72 additions & 0 deletions core/cache_warmer.go
@@ -0,0 +1,72 @@
package core

import (
"context"
"io/ioutil"

"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/core/pool"
"github.com/deluan/navidrome/log"
)

type CacheWarmer interface {
AddAlbum(ctx context.Context, albumID string)
Flush(ctx context.Context)
}

func NewCacheWarmer(cache ArtworkCache, artwork Artwork) CacheWarmer {
w := &warmer{
artwork: artwork,
cache: cache,
albums: map[string]struct{}{},
}
p, err := pool.NewPool("artwork", 3, &artworkItem{}, w.execute)
if err != nil {
log.Error(context.Background(), "Error creating pool for Album Artwork Cache Warmer", err)
} else {
w.pool = p
}

return w
}

type warmer struct {
pool *pool.Pool
artwork Artwork
cache ArtworkCache
albums map[string]struct{}
}

func (w *warmer) AddAlbum(ctx context.Context, albumID string) {
if albumID == "" {
return
}
w.albums[albumID] = struct{}{}
}

func (w *warmer) Flush(ctx context.Context) {
if conf.Server.DevPreCacheAlbumArtwork {
if w.pool == nil || len(w.albums) == 0 {
return
}
log.Info(ctx, "Pre-caching album artworks", "numAlbums", len(w.albums))
for id := range w.albums {
w.pool.Submit(artworkItem{albumID: id})
}
}
w.albums = map[string]struct{}{}
}

func (w *warmer) execute(workload interface{}) {
ctx := context.Background()
item := workload.(artworkItem)
log.Trace(ctx, "Pre-caching album artwork", "albumID", item.albumID)
err := w.artwork.Get(ctx, item.albumID, 0, ioutil.Discard)
if err != nil {
log.Warn("Error pre-caching artwork from album", "id", item.albumID, err)
}
}

type artworkItem struct {
albumID string
}
99 changes: 99 additions & 0 deletions core/pool/pool.go
@@ -0,0 +1,99 @@
package pool

type Executor func(workload interface{})

type Pool struct {
name string
item interface{}
workers []worker
exec Executor
//queue *dque.DQue
queue chan work // receives jobs to send to workers
end chan bool // when receives bool stops workers
}

func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) {
p := &Pool{
name: name,
item: item,
exec: exec,
queue: make(chan work),
end: make(chan bool),
}

//q, err := dque.NewOrOpen(name, filepath.Join(conf.Server.DataFolder, "queues", name), 50, p.itemBuilder)
//if err != nil {
// return nil, err
//}
//p.queue = q
for i := 0; i < workerCount; i++ {
worker := worker{
p: p,
id: i,
channel: make(chan work),
workerChannel: workerChannel,
end: make(chan bool)}
worker.Start()
p.workers = append(p.workers, worker)
}

// start pool
go func() {
for {
select {
case <-p.end:
for _, w := range p.workers {
w.Stop() // stop worker
}
return
case work := <-p.queue:
worker := <-workerChannel // wait for available channel
worker <- work // dispatch work to worker
}
}
}()
return p, nil
}

func (p *Pool) Submit(workload interface{}) {
p.queue <- work{workload}
}

//func (p *Pool) itemBuilder() interface{} {
// t := reflect.TypeOf(p.item)
// return reflect.New(t).Interface()
//}
//
var workerChannel = make(chan chan work)

type work struct {
workload interface{}
}

type worker struct {
id int
p *Pool
workerChannel chan chan work // used to communicate between dispatcher and workers
channel chan work
end chan bool
}

// start worker
func (w *worker) Start() {
go func() {
for {
w.workerChannel <- w.channel // when the worker is available place channel in queue
select {
case job := <-w.channel: // worker has received job
w.p.exec(job.workload) // do work
case <-w.end:
return
}
}
}()
}

// end worker
func (w *worker) Stop() {
w.end <- true
}
21 changes: 21 additions & 0 deletions core/pool/pool_test.go
@@ -0,0 +1,21 @@
package pool

import (
"testing"

"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/tests"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestCore(t *testing.T) {
tests.Init(t, false)
log.SetLevel(log.LevelCritical)
RegisterFailHandler(Fail)
RunSpecs(t, "Core Suite")
}

var _ = Describe("Pool", func() {

})
1 change: 1 addition & 0 deletions core/wire_providers.go
Expand Up @@ -17,6 +17,7 @@ var Set = wire.NewSet(
NewImageCache,
NewArchiver,
NewExternalInfo,
NewCacheWarmer,
LastFMNewClient,
SpotifyNewClient,
transcoder.New,
Expand Down
25 changes: 25 additions & 0 deletions db/migration/20201025222059_purge_cache.go
@@ -0,0 +1,25 @@
package migration

import (
"database/sql"
"os"
"path/filepath"

"github.com/deluan/navidrome/conf"
"github.com/pressly/goose"
)

func init() {
goose.AddMigration(Up20201025222059, Down20201025222059)
}

func Up20201025222059(tx *sql.Tx) error {
cacheFolder := filepath.Join(conf.Server.DataFolder, "cache")
notice(tx, "Purging all cache entries, as the format of the cache changed.")
return os.RemoveAll(cacheFolder)
}

func Down20201025222059(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
8 changes: 6 additions & 2 deletions scanner/refresh_buffer.go
Expand Up @@ -25,8 +25,12 @@ func newRefreshBuffer(ctx context.Context, ds model.DataStore) *refreshBuffer {
}

func (f *refreshBuffer) accumulate(mf model.MediaFile) {
f.album[mf.AlbumID] = struct{}{}
f.artist[mf.AlbumArtistID] = struct{}{}
if mf.AlbumID != "" {
f.album[mf.AlbumID] = struct{}{}
}
if mf.AlbumArtistID != "" {
f.artist[mf.AlbumArtistID] = struct{}{}
}
}

type refreshCallbackFunc = func(ids ...string) error
Expand Down
31 changes: 17 additions & 14 deletions scanner/scanner.go
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
)
Expand All @@ -32,12 +33,13 @@ type FolderScanner interface {
}

type scanner struct {
folders map[string]FolderScanner
status map[string]*scanStatus
lock *sync.RWMutex
ds model.DataStore
done chan bool
scan chan bool
folders map[string]FolderScanner
status map[string]*scanStatus
lock *sync.RWMutex
ds model.DataStore
cacheWarmer core.CacheWarmer
done chan bool
scan chan bool
}

type scanStatus struct {
Expand All @@ -46,14 +48,15 @@ type scanStatus struct {
lastUpdate time.Time
}

func New(ds model.DataStore) Scanner {
func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner {
s := &scanner{
ds: ds,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
done: make(chan bool),
scan: make(chan bool),
ds: ds,
cacheWarmer: cacheWarmer,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
done: make(chan bool),
scan: make(chan bool),
}
s.loadFolders()
return s
Expand Down Expand Up @@ -213,5 +216,5 @@ func (s *scanner) loadFolders() {
}

func (s *scanner) newScanner(f model.MediaFolder) FolderScanner {
return NewTagScanner(f.Path, s.ds)
return NewTagScanner(f.Path, s.ds, s.cacheWarmer)
}
30 changes: 20 additions & 10 deletions scanner/tag_scanner.go
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/model/request"
Expand All @@ -17,19 +18,21 @@ import (
)

type TagScanner struct {
rootFolder string
ds model.DataStore
mapper *mediaFileMapper
plsSync *playlistSync
cnt *counters
rootFolder string
ds model.DataStore
mapper *mediaFileMapper
plsSync *playlistSync
cnt *counters
cacheWarmer core.CacheWarmer
}

func NewTagScanner(rootFolder string, ds model.DataStore) *TagScanner {
func NewTagScanner(rootFolder string, ds model.DataStore, cacheWarmer core.CacheWarmer) *TagScanner {
return &TagScanner{
rootFolder: rootFolder,
mapper: newMediaFileMapper(rootFolder),
plsSync: newPlaylistSync(ds),
ds: ds,
rootFolder: rootFolder,
mapper: newMediaFileMapper(rootFolder),
plsSync: newPlaylistSync(ds),
ds: ds,
cacheWarmer: cacheWarmer,
}
}

Expand Down Expand Up @@ -62,6 +65,7 @@ const (
// Delete all empty albums, delete all empty artists, clean-up playlists
func (s *TagScanner) Scan(ctx context.Context, lastModifiedSince time.Time) error {
ctx = s.withAdminUser(ctx)
defer s.cacheWarmer.Flush(ctx)

start := time.Now()
allFSDirs, err := s.getDirTree(ctx)
Expand Down Expand Up @@ -209,6 +213,7 @@ func (s *TagScanner) processDeletedDir(ctx context.Context, dir string) error {

for _, t := range mfs {
buffer.accumulate(t)
s.cacheWarmer.AddAlbum(ctx, t.AlbumID)
}

err = buffer.flush()
Expand Down Expand Up @@ -285,6 +290,11 @@ func (s *TagScanner) processChangedDir(ctx context.Context, dir string) error {
}
}

// Pre cache all changed album artwork
for albumID := range buffer.album {
s.cacheWarmer.AddAlbum(ctx, albumID)
}

err = buffer.flush()
log.Info(ctx, "Finished processing changed folder", "dir", dir, "updated", numUpdatedTracks,
"purged", numPurgedTracks, "elapsed", time.Since(start))
Expand Down

0 comments on commit 1e56f4d

Please sign in to comment.