Skip to content

Commit

Permalink
Keep only the last three versions of page_entities
Browse files Browse the repository at this point in the history
  • Loading branch information
brawer committed May 8, 2024
1 parent afe986f commit a31b203
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 4 deletions.
19 changes: 18 additions & 1 deletion cmd/qrank-builder/pageentities.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func buildPageEntities(ctx context.Context, dumps string, sites *map[string]Wiki
if err != nil {
return err
}
tasks := make(chan WikiSite, 1000)
tasks := make(chan WikiSite, len(*sites))
group, groupCtx := errgroup.WithContext(ctx)
for i := 0; i < runtime.NumCPU(); i++ {
group.Go(func() error {
Expand All @@ -54,17 +54,34 @@ func buildPageEntities(ctx context.Context, dumps string, sites *map[string]Wiki
})
}

built := make(map[string]string, len(*sites))
for _, site := range *sites {
ymd := site.LastDumped.Format("20060102")
if arr, ok := stored[site.Key]; !ok || !slices.Contains(arr, ymd) {
tasks <- site
built[site.Key] = ymd
}
}
close(tasks)

if err := group.Wait(); err != nil {
return err
}

// Clean up old files. We only touch those wikis for which we built a new file.
for site, ymd := range built {
versions := append(stored[site], ymd)
sort.Strings(versions)
pos := slices.Index(versions, ymd)
for i := 0; i < pos-2; i += 1 {
path := fmt.Sprintf("page_entities/%s-%s-page_entities.zst", site, versions[i])
opts := minio.RemoveObjectOptions{}
if err := s3.RemoveObject(ctx, "qrank", path, opts); err != nil {
return err
}
}
}

return nil
}

Expand Down
21 changes: 19 additions & 2 deletions cmd/qrank-builder/pageentities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"reflect"
"slices"
"strings"
"testing"
"time"

Expand All @@ -24,7 +25,11 @@ func TestBuildPageEntities(t *testing.T) {
dumps := filepath.Join("testdata", "dumps")
s3 := NewFakeS3()
s3.data["page_entities/loginwiki-20240501-page_entities.zst"] = []byte("old-loginwiki")
s3.data["page_entities/rmwiki-20010203-page_entities.zst"] = []byte("old-rmwiki")
s3.data["page_entities/rmwiki-20010203-page_entities.zst"] = []byte("old-2001")
s3.data["page_entities/rmwiki-20020203-page_entities.zst"] = []byte("old-2002")
s3.data["page_entities/rmwiki-20030203-page_entities.zst"] = []byte("old-2003")
s3.data["page_entities/rmwiki-20040203-page_entities.zst"] = []byte("old-2004")
s3.data["page_entities/rmwiki-20050203-page_entities.zst"] = []byte("old-2005")
sites, err := ReadWikiSites(dumps)
if err != nil {
t.Fatal(err)
Expand All @@ -41,13 +46,14 @@ func TestBuildPageEntities(t *testing.T) {
t.Errorf("should not re-compute previously stored page_entities")
}

// For rmwiki-2024, new data should have been computed and put in storage.
// Make sure that data looks as expected.
path := "page_entities/rmwiki-20240301-page_entities.zst"
reader, err := zstd.NewReader(bytes.NewReader(s3.data[path]))
if err != nil {
t.Error(err)
}
defer reader.Close()

var buf bytes.Buffer
if _, err = io.Copy(&buf, reader); err != nil {
t.Error(err)
Expand All @@ -57,6 +63,17 @@ func TestBuildPageEntities(t *testing.T) {
if got != want {
t.Errorf("got %v, want %v", got, want)
}

// Verify that obsolete files have been cleaned up.
stored, err := storedPageEntities(context.Background(), s3)
if err != nil {
t.Error(err)
}
got = strings.Join(stored["rmwiki"], " ")
want = "20040203 20050203 20240301"
if got != want {
t.Errorf(`should clean up old page_entities; got "%s", want "%s"`, got, want)
}
}

func TestStoredPageEntitites(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions cmd/qrank-builder/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// A fake implementation for tests is in FakeS3, implemented in s3_test.go.
type S3 interface {
ListObjects(ctx context.Context, bucketName string, opts minio.ListObjectsOptions) <-chan minio.ObjectInfo
RemoveObject(ctx context.Context, bucketName string, objectName string, opts minio.RemoveObjectOptions) error
FGetObject(ctx context.Context, bucketName, objectName, filePath string, opts minio.GetObjectOptions) error
FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts minio.PutObjectOptions) (minio.UploadInfo, error)
}
Expand Down
27 changes: 26 additions & 1 deletion cmd/qrank-builder/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"fmt"
"os"
"strings"
"sync"

"github.com/minio/minio-go/v7"
)

type FakeS3 struct {
data map[string][]byte
data map[string][]byte
mutex sync.RWMutex
}

func NewFakeS3() *FakeS3 {
Expand All @@ -24,6 +26,9 @@ func NewFakeS3() *FakeS3 {
}

func (s3 *FakeS3) ListObjects(ctx context.Context, bucketName string, opts minio.ListObjectsOptions) <-chan minio.ObjectInfo {
s3.mutex.RLock()
defer s3.mutex.RUnlock()

ch := make(chan minio.ObjectInfo, 2)
go func() {
defer close(ch)
Expand All @@ -39,7 +44,24 @@ func (s3 *FakeS3) ListObjects(ctx context.Context, bucketName string, opts minio
return ch
}

func (s3 *FakeS3) RemoveObject(ctx context.Context, bucketName string, objectName string, opts minio.RemoveObjectOptions) error {
s3.mutex.Lock()
defer s3.mutex.Unlock()

if bucketName != "qrank" {
return fmt.Errorf(`unexpected bucket "%s"`, bucketName)
}
if _, ok := s3.data[objectName]; !ok {
return fmt.Errorf(`file not found: %s`, objectName)
}
delete(s3.data, objectName)
return nil
}

func (s3 *FakeS3) FGetObject(ctx context.Context, bucketName, objectName, filePath string, opts minio.GetObjectOptions) error {
s3.mutex.RLock()
defer s3.mutex.RUnlock()

if bucketName != "qrank" {
return fmt.Errorf(`unexpected bucket "%s"`, bucketName)
}
Expand Down Expand Up @@ -67,6 +89,9 @@ func (s3 *FakeS3) FGetObject(ctx context.Context, bucketName, objectName, filePa
}

func (s3 *FakeS3) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts minio.PutObjectOptions) (minio.UploadInfo, error) {
s3.mutex.Lock()
defer s3.mutex.Unlock()

info := minio.UploadInfo{}
if bucketName != "qrank" {
return info, fmt.Errorf("unexpected bucket %v", bucketName)
Expand Down

0 comments on commit a31b203

Please sign in to comment.