Skip to content

Commit

Permalink
Add plumbing for building item signals
Browse files Browse the repository at this point in the history
Currently, the computed output file is empty apart from CSV column headers;
it will get populated in later changes. The empty file is not served to the
public yet.

#23
  • Loading branch information
brawer committed May 13, 2024
1 parent 1650b72 commit 1f9b0fc
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 2 deletions.
138 changes: 138 additions & 0 deletions cmd/qrank-builder/itemsignals.go
@@ -0,0 +1,138 @@
// SPDX-FileCopyrightText: 2024 Sascha Brawer <sascha@brawer.ch>
// SPDX-License-Identifier: MIT

package main

import (
"bytes"
"context"
"fmt"
"os"
"regexp"
"time"

"github.com/klauspost/compress/zstd"
"github.com/minio/minio-go/v7"
)

// BuildItemSignals builds per-item signals and puts them in storage.
// If the signals file is already in storage, it does not get re-built.
func buildItemSignals(ctx context.Context, pageviews []string, sites *map[string]WikiSite, s3 S3) (time.Time, error) {
stored, err := StoredItemSignalsVersion(ctx, s3)
if err != nil {
return time.Time{}, err
}

newest := ItemSignalsVersion(pageviews, sites)
if !newest.After(stored) {
s := stored.Format(time.DateOnly)
n := newest.Format(time.DateOnly)
logger.Printf("signals in storage are still fresh: stored=%s, newest=%s", s, n)
return stored, nil
}

newestYMD := newest.Format("20060102")
destPath := fmt.Sprintf("public/signals-%s.csv.zst", newestYMD)
logger.Printf("building %s", destPath)
outFile, err := os.CreateTemp("", "*-item_signals.csv.zst")
if err != nil {
return time.Time{}, err
}
zstdLevel := zstd.WithEncoderLevel(zstd.SpeedBestCompression)
writer, err := zstd.NewWriter(outFile, zstdLevel)
if err != nil {
return time.Time{}, err
}

// Write column titles.
columns := []string{
"item",
"pageviews",
"wikitext_bytes",
"claims",
"identifiers",
"sitelinks",
}
var buf bytes.Buffer
for i, col := range columns {
if i != 0 {
if _, err := buf.WriteString(","); err != nil {
return time.Time{}, err
}
}
if _, err := buf.WriteString(col); err != nil {
return time.Time{}, err
}
}
if _, err := buf.WriteString("\n"); err != nil {
return time.Time{}, err
}
if _, err := writer.Write(buf.Bytes()); err != nil {
return time.Time{}, err
}

// TODO: Actually build and write the signals. Not yet implemented.

if err := writer.Close(); err != nil {
return time.Time{}, err
}
if err := outFile.Close(); err != nil {
return time.Time{}, err
}

if err := PutInStorage(ctx, outFile.Name(), s3, "qrank", destPath, "application/zstd"); err != nil {
return time.Time{}, err
}

if err := os.Remove(outFile.Name()); err != nil {
return time.Time{}, err
}

return newest, nil
}

func ItemSignalsVersion(pageviews []string, sites *map[string]WikiSite) time.Time {
var date time.Time
re := regexp.MustCompile(`^pageviews/pageviews-(\d{4}-W\d{2}).zst$`)
for _, pv := range pageviews {
if match := re.FindStringSubmatch(pv); match != nil {
if year, week, err := ParseISOWeek(match[1]); err == nil {
weekStart := ISOWeekStart(year, week)
weekEnd := weekStart.AddDate(0, 0, 6) // weekStart + 6 days
if weekEnd.After(date) {
date = weekEnd
}
}
}
}

for _, site := range *sites {
if site.LastDumped.After(date) {
date = site.LastDumped
}
}

return date
}

// StoredItemSignalsVersion returns the version of the signals file in storage.
// If there is no such file, the result is the zero time.Time without error.
func StoredItemSignalsVersion(ctx context.Context, s3 S3) (time.Time, error) {
re := regexp.MustCompile(`^public/signals-(\d{8})-page_signals.zst$`)
var result time.Time
opts := minio.ListObjectsOptions{Prefix: "public/"}
for obj := range s3.ListObjects(ctx, "qrank", opts) {
if obj.Err != nil {
return time.Time{}, obj.Err
}
if match := re.FindStringSubmatch(obj.Key); match != nil {
if t, err := time.Parse(match[1], "20060201"); err == nil {
if t.After(result) {
result = t
}
}
}
}

return result, nil
}
129 changes: 129 additions & 0 deletions cmd/qrank-builder/itemsignals_test.go
@@ -0,0 +1,129 @@
// SPDX-FileCopyrightText: 2024 Sascha Brawer <sascha@brawer.ch>
// SPDX-License-Identifier: MIT

package main

import (
"bytes"
"context"
"log"
"slices"
"testing"
"time"
)

func TestBuildItemSignals(t *testing.T) {
logger = log.New(&bytes.Buffer{}, "", log.Lshortfile)
ctx := context.Background()
s3 := NewFakeS3()
pageviewsW07 := []string{
"rm.wikipedia,3824,3",
"rm.wikipedia,799,1111",
}
pageviewsW08 := []string{
"rm.wikipedia,3824,2",
"rm.wikipedia,799,4444",
"rm.wikipedia,9999,9999",
}
pageviews := []string{
"pageviews/pageviews-2011-W07.zst",
"pageviews/pageviews-2011-W08.zst",
}
s3.WriteLines(pageviewsW07, pageviews[0])
s3.WriteLines(pageviewsW08, pageviews[1])

rmDumped, _ := time.Parse(time.DateOnly, "2011-12-09")
sites := &map[string]WikiSite{
"rm.wikipedia.org": WikiSite{"rmwiki", "rm.wikipedia.org", rmDumped},
}

date, err := buildItemSignals(ctx, pageviews, sites, s3)
if err != nil {
t.Error(err)
}
gotDate := date.Format(time.DateOnly)
wantDate := "2011-12-09"
if gotDate != wantDate {
t.Errorf("got %s, want %s", gotDate, wantDate)
}

got, err := s3.ReadLines("public/signals-20111209.csv.zst")
if err != nil {
t.Error(err)
}
want := []string{
"item,pageviews,wikitext_bytes,claims,identifiers,sitelinks",
}
if !slices.Equal(got, want) {
t.Errorf("got %v, want %v", got, want)
}
}

// If the most recent pageview file is newer than the last dump
// of any Wikimedia site, ItemSignalsVersion() should return
// the last day of the week of the most recent pageviews file.
func TestItemSignalsVersion_FreshPageviews(t *testing.T) {
pageviews := []string{
"pageviews/pageviews-2023-W17.zst",
"pageviews/pageviews-2023-W18.zst",
"pageviews/pageviews-2023-W19.zst",
}

enDumped, _ := time.Parse(time.DateOnly, "2011-12-31")
rmDumped, _ := time.Parse(time.DateOnly, "2011-11-11")
sites := map[string]WikiSite{
"en.wikipedia.org": WikiSite{"enwiki", "en.wikipedia.org", enDumped},
"rm.wikipedia.org": WikiSite{"rmwiki", "rm.wikipedia.org", rmDumped},
}

got := ItemSignalsVersion(pageviews, &sites).Format(time.DateOnly)
want := "2023-05-14"
if got != want {
t.Errorf("got %s, want %s", got, want)
}
}

// If the most recent pageview file is older than the last dump
// of any Wikimedia site, ItemSignalsVersion() should return
// the date of the most recent Wikimedia site dump.
func TestItemSignalsVersion_OldPageviews(t *testing.T) {
pageviews := []string{
"pageviews/pageviews-2003-W17.zst",
"pageviews/pageviews-2003-W18.zst",
"pageviews/pageviews-2003-W19.zst",
}

enDumped, _ := time.Parse(time.DateOnly, "2011-12-31")
rmDumped, _ := time.Parse(time.DateOnly, "2011-11-11")
sites := map[string]WikiSite{
"en.wikipedia.org": WikiSite{"enwiki", "en.wikipedia.org", enDumped},
"rm.wikipedia.org": WikiSite{"rmwiki", "rm.wikipedia.org", rmDumped},
}

got := ItemSignalsVersion(pageviews, &sites).Format(time.DateOnly)
want := "2011-12-31"
if got != want {
t.Errorf("got %s, want %s", got, want)
}
}

func TestStoredItemSignalsVersion(t *testing.T) {
s3 := NewFakeS3()
got, err := StoredItemSignalsVersion(context.Background(), s3)
if err != nil {
t.Error(err)
}
if !got.IsZero() {
t.Errorf("got %s, want zero", got.Format(time.DateOnly))
}
s3.data["public/20230815-signals.zst"] = []byte("foo")
s3.data["public/20240131-signals.zst"] = []byte("bar")
got, err = StoredItemSignalsVersion(context.Background(), s3)
if err != nil {
t.Error(err)
}
want, _ := time.Parse("2024-01-31", time.DateOnly)
if got != want {
t.Errorf("got %s, want 2024-01-31", got.Format(time.DateOnly))
}
}
10 changes: 8 additions & 2 deletions cmd/qrank-builder/main.go
Expand Up @@ -107,7 +107,8 @@ func computeQRank(dumpsPath string, testRun bool, storage *minio.Client) error {
// fully implemented yet, so we do not yet actually use the output.
// The older approach is done by the call to processPageviews below.
// https://github.com/brawer/wikidata-qrank/issues/23
_, err := buildPageviews(ctx, dumpsPath, 52, s3) // for past 52 weeks
numWeeks := 52
pageviews, err := buildPageviews(ctx, dumpsPath, numWeeks, s3)
if err != nil {
return err
}
Expand All @@ -121,6 +122,11 @@ func computeQRank(dumpsPath string, testRun bool, storage *minio.Client) error {
return err
}

_, err = buildItemSignals(ctx, pageviews, sites, s3)
if err != nil {
return err
}

// TOOD: The following is just a benchmark to see how long it takes
// to read all page_entities within the Wikimedia datacenter.
// Once we know it's reasonable to do this, we can remove this code.
Expand Down Expand Up @@ -156,7 +162,7 @@ func computeQRank(dumpsPath string, testRun bool, storage *minio.Client) error {
return err
}

pageviews, err := processPageviews(testRun, dumpsPath, edate, outDir, ctx)
pageviews, err = processPageviews(testRun, dumpsPath, edate, outDir, ctx)
if err != nil {
return err
}
Expand Down

0 comments on commit 1f9b0fc

Please sign in to comment.