Skip to content

Commit

Permalink
Read S3 files (including page entities) directly, without a temp file
Browse files Browse the repository at this point in the history
  • Loading branch information
brawer committed May 8, 2024
1 parent eca8d47 commit 5df84de
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 52 deletions.
17 changes: 17 additions & 0 deletions cmd/qrank-builder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ func computeQRank(dumpsPath string, testRun bool, storage *minio.Client) error {
return err
}

// TOOD: The following is just a benchmark to see how long it takes
// to read all page_entities within the Wikimedia datacenter.
// Once we know it's reasonable to do this, we can remove this code.
logger.Printf("start reading page_entities")
numEntities := 0
scanner := NewPageEntitiesScanner(sites, s3)
for scanner.Scan() {
numEntities += 1
if numEntities < 5 || numEntities%10000 == 0 {
logger.Printf(" %d: %s\n", numEntities, scanner.Text())
}
}
if err := scanner.Err(); err != nil {
return err
}
logger.Printf("finished reading %d page_entities", numEntities)

// TODO: Old code starts here, remove after new implementation is done.
return nil

Expand Down
91 changes: 39 additions & 52 deletions cmd/qrank-builder/pageentities.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -207,14 +208,15 @@ func storedPageEntities(ctx context.Context, s3 S3) (map[string][]string, error)
}

type pageEntitiesScanner struct {
err error
paths []string
domains []string
curDomain int
scanner *bufio.Scanner
storage S3
tempFile *os.File
reader *zstd.Decoder
err error
paths []string
domains []string
curDomain int
storage S3
reader io.ReadCloser
decompressor *zstd.Decoder
scanner *bufio.Scanner
curLine bytes.Buffer
}

// NewPageEntitiesScanner returns an object similar to bufio.Scanner
Expand All @@ -240,32 +242,28 @@ func NewPageEntitiesScanner(sites *map[string]WikiSite, s3 S3) *pageEntitiesScan
}

return &pageEntitiesScanner{
err: nil,
paths: paths,
domains: domains,
curDomain: -1,
scanner: nil,
storage: s3,
tempFile: nil,
reader: nil,
err: nil,
paths: paths,
domains: domains,
curDomain: -1,
storage: s3,
reader: nil,
decompressor: nil,
scanner: nil,
}
}

func (s *pageEntitiesScanner) Scan() bool {
s.curLine.Truncate(0)
if s.err != nil {
return false
}
if s.tempFile == nil {
tempFile, err := os.CreateTemp("", "page_entities-*")
if err != nil {
s.err = err
return false
}
s.tempFile = tempFile
}
for s.curDomain < len(s.domains) {
if s.scanner != nil {
if s.scanner.Scan() {
s.curLine.WriteString(s.domains[s.curDomain])
s.curLine.WriteByte(',')
s.curLine.Write(s.scanner.Bytes())
return true
}
s.err = s.scanner.Err()
Expand All @@ -277,56 +275,45 @@ func (s *pageEntitiesScanner) Scan() bool {
if s.curDomain == len(s.domains) {
break
}
s.err = s.tempFile.Close()
if s.err != nil {
break
}
opts := minio.GetObjectOptions{}
s.err = s.storage.FGetObject(context.Background(), "qrank", s.paths[s.curDomain], s.tempFile.Name(), opts)
if s.err != nil {
break
}
s.tempFile, s.err = os.Open(s.tempFile.Name())

s.reader, s.err = NewS3Reader(context.Background(), "qrank", s.paths[s.curDomain], s.storage)
if s.err != nil {
break
}
if s.reader == nil {
s.reader, s.err = zstd.NewReader(nil)

if s.decompressor == nil {
s.decompressor, s.err = zstd.NewReader(nil)
if s.err != nil {
break
}
}
s.err = s.reader.Reset(s.tempFile)
s.err = s.decompressor.Reset(s.reader)
if s.err != nil {
break
}
s.scanner = bufio.NewScanner(s.reader)
s.scanner = bufio.NewScanner(s.decompressor)
}

if s.decompressor != nil {
s.decompressor.Close()
s.decompressor = nil
}

if s.reader != nil {
s.reader.Close()
s.reader = nil
}

if s.tempFile != nil {
s.tempFile.Close()
os.Remove(s.tempFile.Name())
s.tempFile = nil
}

s.scanner = nil
return false
}

func (s *pageEntitiesScanner) Bytes() []byte {
return s.curLine.Bytes()
}

func (s *pageEntitiesScanner) Text() string {
if s.scanner == nil || s.err != nil {
return ""
}
var buf strings.Builder
buf.WriteString(s.domains[s.curDomain])
buf.WriteByte(',')
buf.WriteString(s.scanner.Text())
return buf.String()
return s.curLine.String()
}

func (s *pageEntitiesScanner) Err() error {
Expand Down
69 changes: 69 additions & 0 deletions cmd/qrank-builder/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package main

import (
"context"
"fmt"
"io"
"os"

"github.com/minio/minio-go/v7"
//"github.com/minio/minio-go/v7/pkg/credentials"
Expand All @@ -21,6 +24,72 @@ type S3 interface {
FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts minio.PutObjectOptions) (minio.UploadInfo, error)
}

type tempFileReader struct {
file *os.File
}

func (r *tempFileReader) Read(buf []byte) (int, error) {
if r.file != nil {
return r.file.Read(buf)
} else {
return 0, fmt.Errorf("already closed")
}
}

func (r *tempFileReader) Close() error {
if r.file == nil {
return nil
}
err1 := r.file.Close()
err2 := os.Remove(r.file.Name())
r.file = nil
if err1 != nil {
return err1
} else {
return err2
}
}

// NewS3Reader is a heper to work around minio.Object not being constructable
// outside package minio, which makes it difficult to mock out. Unfortunately,
// minio.Client.GetObject() returns *minio.Object instead of io.ReadCloser.
func NewS3Reader(ctx context.Context, bucket string, path string, s3 S3) (io.ReadCloser, error) {
opts := minio.GetObjectOptions{}
if client, ok := s3.(*minio.Client); ok {
logger.Println("Storage is an instance of minio.Client")
obj, err := client.GetObject(ctx, bucket, path, opts)
if err != nil {
return nil, err
}
return obj, nil
}

logger.Println("Storage is _not_ an instance of minio.Client")

// The non-minio implementation is very ugly and quite inefficient,
// but only used in our unit tests. We fetch the content to a temp file,
// and then we return a custom io.ReadCloser that deletes that file file
// when Close() is called.
temp, err := os.CreateTemp("", "s3*")
if err != nil {
return nil, err
}
if err := temp.Close(); err != nil {
return nil, err
}
if err := s3.FGetObject(ctx, bucket, path, temp.Name(), opts); err != nil {
return nil, err
}
tempPath := temp.Name()
temp, err = os.Open(tempPath)
if err != nil {
os.Remove(tempPath)
return nil, err
}

return &tempFileReader{temp}, nil
}

// PutInStorage stores a file in S3 storage.
func PutInStorage(ctx context.Context, file string, s3 S3, bucket string, dest string, contentType string) error {
options := minio.PutObjectOptions{ContentType: contentType}
Expand Down

0 comments on commit 5df84de

Please sign in to comment.