Skip to content
This repository has been archived by the owner on Oct 3, 2022. It is now read-only.

Commit

Permalink
server: cached thread index endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bakape committed Feb 15, 2020
1 parent 6afe4e5 commit 3dc62fe
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 203 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ debug.test
/config.json
db_*.db
/target
vendor
210 changes: 136 additions & 74 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,30 @@
package cache

import (
"encoding/gob"
"net/http"
"time"

"github.com/bakape/meguca/config"
"github.com/bakape/recache"
"github.com/bakape/meguca/db"
"github.com/bakape/recache/v5"
)

const evictionTimer = time.Second * 10

var (
cache *recache.Cache

// Cache frontend for retreiving thread page JSON
threadFrontend *recache.Frontend

// Cache frontend for retrieving the thread index.
// Contrains only one record.
indexFrontend *recache.Frontend

// Stores the threads IDs pf all threads.
// Contrains only one record.
threadIDFrontend *recache.Frontend
)

// Key for identifying thread pages
Expand All @@ -24,85 +37,134 @@ type threadKey struct {

// Init cache with specified max memory usage
func Init() (err error) {
cache = recache.NewCache(recache.Options{
cache = recache.NewCache(recache.CacheOptions{
MemoryLimit: uint(config.Server.CacheSize * (1 << 20)),
LRULimit: time.Hour,
})

// TODO: Global post index frontend

// threadFrontend = cache.NewFrontend(
// func(k recache.Key, rw *recache.RecordWriter) (err error) {
// key := k.(threadKey)
// buf, err := db.GetThread(key.id, key.page)
// if err != nil {
// return
// }
// rw.Write(buf)
// return
// },
// )

// listen := func(ch string, handler func(string) error) error {
// return db.Listen(pg_util.ListenOpts{
// DebounceInterval: time.Second,
// Channel: "thread.updated",
// OnMsg: handler,
// OnConnectionLoss: cache.EvictAll,
// })
// }

// err = listen("thread.updated", func(msg string) (err error) {
// ints, err := db.SplitUint64s(msg, 2)
// if err != nil {
// return
// }
// thread := ints[0]
// page := int(ints[1])

// threadFrontend.EvictByFunc(func(k recache.Key) (bool, error) {
// key := k.(threadKey)
// if key.id == thread {
// switch page {
// case -2, key.page:
// return true, nil
// }
// }
// return false, nil
// })

// return
// })
// if err != nil {
// return
// }
// return listen("thread.deleted", func(msg string) (err error) {
// thread, err := strconv.ParseUint(msg, 10, 64)
// if err != nil {
// return
// }

// threadFrontend.EvictByFunc(func(k recache.Key) (bool, error) {
// return k.(threadKey).id == thread, nil
// })

// return
// })
threadFrontend = cache.NewFrontend(func(
k recache.Key,
rw *recache.RecordWriter,
) (err error) {
key := k.(threadKey)
buf, err := db.GetThread(key.id, key.page)
if err != nil {
return
}
rw.Write(buf)
return
})

threadIDFrontend = cache.NewFrontend(func(
_ recache.Key,
rw *recache.RecordWriter,
) (err error) {
ids, err := db.GetThreadIDs()
if err != nil {
return
}
return gob.NewEncoder(rw).Encode(ids)
})

indexFrontend = cache.NewFrontend(func(
_ recache.Key,
rw *recache.RecordWriter,
) (err error) {
var ids []uint64
s, err := rw.Bind(threadIDFrontend, struct{}{})
if err != nil {
return
}
err = gob.NewDecoder(s.Decompress()).Decode(&ids)
if err != nil {
return
}

_, err = rw.Write([]byte{'['})
if err != nil {
return
}
for i, id := range ids {
if i != 0 {
_, err = rw.Write([]byte{','})
if err != nil {
return
}
}
err = rw.Include(threadFrontend, threadKey{
id: id,
page: -5,
})
if err != nil {
return
}
}
_, err = rw.Write([]byte{']'})
return
})

return
}

// Clear entire cache
func Clear() {
cache.EvictAll()
// Evict entire cache
func EvictAll() {
cache.EvictAll(evictionTimer)
}

// // Write thread page JSON to w
// // page: page of the thread to fetch. -1 to fetch the last page.
// func Thread(
// w http.ResponseWriter, r *http.Request,
// id uint64,
// page int,
// ) (err error) {
// _, err = threadFrontend.WriteHTTP(threadKey{id, page}, w, r)
// return
// }
// Evict all stored data for a thread
func EvictThread(id uint64) {
threadFrontend.EvictByFunc(
evictionTimer,
func(k recache.Key) (bool, error) {
return k.(threadKey).id == id, nil
},
)
}

// Evict a single page of a thread
func EvictThreadPage(id uint64, page uint) {
threadFrontend.Evict(evictionTimer, threadKey{
id: id,
page: int(page),
})

// Always evict last 5 posts as the change is most likely to happen in those
// anyway. We can omit cheking this page is actually includes them.
threadFrontend.Evict(evictionTimer, threadKey{
id: id,
page: -5,
})
}

// Call this to evict caches on new thread creation or old thread deletion
func EvictThreadList() {
threadIDFrontend.EvictAll(0)
}

// Write thread page JSON to w
//
// page: page of the thread to write;
// -1 to get the last page;
// -5 to get last 5 post variant for the thread index;
func WriteThread(
w http.ResponseWriter, r *http.Request,
id uint64,
page int,
) (err error) {
// Normalize -1 to not produce duplicate cache entries
if page == -1 {
page, err = db.GetLastPage(id)
if err != nil {
return
}
}

_, err = threadFrontend.WriteHTTP(threadKey{id, page}, w, r)
return
}

// Write thread index JSON to w
func WriteIndex(w http.ResponseWriter, r *http.Request) (err error) {
_, err = indexFrontend.WriteHTTP(struct{}{}, w, r)
return
}
34 changes: 34 additions & 0 deletions db/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,37 @@ func GetThread(id uint64, page int) (thread []byte, err error) {
castNoRows(&thread, &err)
return
}

// Get all existing thread IDs
func GetThreadIDs() (ids []uint64, err error) {
r, err := db.Query(context.Background(), "select id from threads")
if err != nil {
return
}
defer r.Close()

for r.Next() {
var id uint64
err = r.Scan(&id)
if err != nil {
return
}
ids = append(ids, id)
}
err = r.Err()
return
}

// Get the number of the last page of a thread
func GetLastPage(id uint64) (n int, err error) {
err = db.
QueryRow(
context.Background(),
`select coalesce(max(page), 0)
from posts
where thread = $1`,
id,
).
Scan(&n)
return
}
56 changes: 56 additions & 0 deletions db/threads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -153,6 +155,8 @@ func TestGetFeedData(t *testing.T) {
}

func TestGetThread(t *testing.T) {
clearTables(t, "threads")

img, _, closeFiles := prepareSampleImage(t)
closeFiles()
thread, user := insertSampleThread(t)
Expand Down Expand Up @@ -376,4 +380,56 @@ func TestGetThread(t *testing.T) {
}
})
}

t.Run("get thread IDs", func(t *testing.T) {
t.Parallel()

ids, err := GetThreadIDs()
if err != nil {
t.Fatal(err)
}
sort.Sort(idSorter(ids))
fmt.Println(ids, []uint64{thread, thread2})
test.AssertEquals(t, ids, []uint64{thread, thread2})
})

t.Run("get page counts", func(t *testing.T) {
t.Parallel()

cases := [...]struct {
name string
thread uint64
last int
}{
{
name: "small",
thread: thread2,
last: 0,
},
{
name: "bigger",
thread: thread,
last: 1,
},
{
name: "no thread",
thread: thread2 + 20,
last: 0,
},
}

for i := range cases {
c := cases[i]
t.Run(c.name, func(t *testing.T) {
t.Parallel()

last, err := GetLastPage(c.thread)
if err != nil {
t.Fatal(err)
}
test.AssertEquals(t, c.last, last)
})
}

})
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ require (
github.com/Masterminds/squirrel v1.1.0 // indirect
github.com/bakape/captchouli v1.2.0
github.com/bakape/pg_util v0.11.0
github.com/bakape/recache v0.0.0-20190524102257-67e46453405b
github.com/bakape/recache v1.0.1 // indirect
github.com/bakape/recache/v4 v4.0.2
github.com/bakape/recache/v5 v5.0.2
github.com/bakape/thumbnailer/v2 v2.5.6
github.com/chai2010/webp v1.1.0
github.com/dimfeld/httptreemux v5.0.1+incompatible
Expand All @@ -26,10 +28,8 @@ require (
github.com/go-playground/log v6.3.0+incompatible
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/google/go-cmp v0.3.1
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgconn v1.2.0
github.com/jackc/pgtype v1.1.0
github.com/jackc/pgx v3.6.0+incompatible
github.com/jackc/pgx/v4 v4.2.0
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
Expand Down
Loading

0 comments on commit 3dc62fe

Please sign in to comment.