Move Bleve and Elastic code indexers to use a common cat-file --batch #14781

Merged (9 commits) on Mar 4, 2021
@@ -2,17 +2,48 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

// +build !gogit

package git

import (
"bufio"
"bytes"
"io"
"math"
"strconv"
"strings"
)

// CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and a cancel function
func CatFileBatch(repoPath string) (*io.PipeWriter, *bufio.Reader, func()) {
// Callers feed object IDs into cat-file --batch and read the results back in order,
// so create a pair of pipes to serve as its stdin and stdout
batchStdinReader, batchStdinWriter := io.Pipe()
batchStdoutReader, batchStdoutWriter := io.Pipe()
cancel := func() {
_ = batchStdinReader.Close()
_ = batchStdinWriter.Close()
_ = batchStdoutReader.Close()
_ = batchStdoutWriter.Close()
}

go func() {
stderr := strings.Builder{}
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repoPath, batchStdoutWriter, &stderr, batchStdinReader)
if err != nil {
_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String()))
} else {
_ = batchStdoutWriter.Close()
_ = batchStdinReader.Close()
}
}()

// For simplicity's sake we'll use a buffered reader to read from the cat-file --batch output
batchReader := bufio.NewReader(batchStdoutReader)

return batchStdinWriter, batchReader, cancel
}
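
For orientation, a minimal usage sketch (not part of this diff) showing how a caller is expected to drive the batch; it mirrors the indexer call sites changed further down. readBlob, repoPath and blobID are placeholder names, and it assumes imports of io, io/ioutil and code.gitea.io/gitea/modules/git.

	// readBlob is a hypothetical helper, shown only to illustrate the intended
	// CatFileBatch / ReadBatchLine usage; it is not part of this change.
	func readBlob(repoPath, blobID string) ([]byte, error) {
		batchStdinWriter, batchReader, cancel := git.CatFileBatch(repoPath)
		defer cancel()

		// Request one object by writing its ID followed by LF.
		if _, err := batchStdinWriter.Write([]byte(blobID + "\n")); err != nil {
			return nil, err
		}

		// Read the "<sha> SP <type> SP <size> LF" header line...
		_, _, size, err := git.ReadBatchLine(batchReader)
		if err != nil {
			return nil, err
		}

		// ...then exactly size bytes of object content.
		content, err := ioutil.ReadAll(io.LimitReader(batchReader, size))
		if err != nil {
			return nil, err
		}

		// cat-file --batch terminates each object with a LF; discard it so the
		// reader is positioned correctly for the next request.
		if _, err := batchReader.Discard(1); err != nil {
			return nil, err
		}
		return content, nil
	}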

// ReadBatchLine reads the header line from cat-file --batch
// We expect:
// <sha> SP <type> SP <size> LF
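The ReadBatchLine body is collapsed in this view. Purely to illustrate the header format described above, a parse of that line could look like the sketch below; readBatchHeader is a hypothetical name, it assumes the usual bufio/fmt/strconv/strings imports, and the real implementation may differ in details such as how missing objects are reported.

	// Hypothetical sketch, not the code under review: parse one
	// "<sha> SP <type> SP <size> LF" header produced by cat-file --batch.
	func readBatchHeader(rd *bufio.Reader) (sha, typ string, size int64, err error) {
		line, err := rd.ReadString('\n')
		if err != nil {
			return "", "", 0, err
		}
		fields := strings.Fields(line)
		if len(fields) != 3 {
			return "", "", 0, fmt.Errorf("unexpected cat-file --batch header: %q", line)
		}
		size, err = strconv.ParseInt(fields[2], 10, 64)
		return fields[0], fields[1], size, err
	}
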
25 changes: 2 additions & 23 deletions modules/git/commit_info_nogogit.go
@@ -141,29 +141,8 @@ func GetLastCommitForPaths(commit *Commit, treePath string, paths []string) ([]*
}
}()

// We feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary.
// so let's create a batch stdin and stdout
batchStdinReader, batchStdinWriter := io.Pipe()
batchStdoutReader, batchStdoutWriter := io.Pipe()
defer func() {
_ = batchStdinReader.Close()
_ = batchStdinWriter.Close()
_ = batchStdoutReader.Close()
_ = batchStdoutWriter.Close()
}()

go func() {
stderr := strings.Builder{}
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(commit.repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
if err != nil {
_ = revListWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
} else {
_ = revListWriter.Close()
}
}()

// For simplicities sake we'll us a buffered reader
batchReader := bufio.NewReader(batchStdoutReader)
batchStdinWriter, batchReader, cancel := CatFileBatch(commit.repo.Path)
defer cancel()

mapsize := 4096
if len(paths) > mapsize {
23 changes: 2 additions & 21 deletions modules/git/pipeline/lfs_nogogit.go
@@ -64,27 +64,8 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) {

// Next feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary.
// so let's create a batch stdin and stdout
batchStdinReader, batchStdinWriter := io.Pipe()
batchStdoutReader, batchStdoutWriter := io.Pipe()
defer func() {
_ = batchStdinReader.Close()
_ = batchStdinWriter.Close()
_ = batchStdoutReader.Close()
_ = batchStdoutWriter.Close()
}()

go func() {
stderr := strings.Builder{}
err := git.NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
if err != nil {
_ = revListWriter.CloseWithError(git.ConcatenateError(err, (&stderr).String()))
} else {
_ = revListWriter.Close()
}
}()

// For simplicities sake we'll us a buffered reader to read from the cat-file --batch
batchReader := bufio.NewReader(batchStdoutReader)
batchStdinWriter, batchReader, cancel := git.CatFileBatch(repo.Path)
defer cancel()

// We'll use a scanner for the revList because it's simpler than a bufio.Reader
scan := bufio.NewScanner(revListReader)
27 changes: 2 additions & 25 deletions modules/git/repo_language_stats_nogogit.go
@@ -11,7 +11,6 @@ import (
"bytes"
"io"
"math"
"strings"

"code.gitea.io/gitea/modules/analyze"

@@ -22,30 +21,8 @@ import (
func (repo *Repository) GetLanguageStats(commitID string) (map[string]int64, error) {
// We will feed the commit IDs in order into cat-file --batch, followed by blobs as necessary.
// so let's create a batch stdin and stdout

batchStdinReader, batchStdinWriter := io.Pipe()
batchStdoutReader, batchStdoutWriter := io.Pipe()
defer func() {
_ = batchStdinReader.Close()
_ = batchStdinWriter.Close()
_ = batchStdoutReader.Close()
_ = batchStdoutWriter.Close()
}()

go func() {
stderr := strings.Builder{}
err := NewCommand("cat-file", "--batch").RunInDirFullPipeline(repo.Path, batchStdoutWriter, &stderr, batchStdinReader)
if err != nil {
_ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String()))
_ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String()))
} else {
_ = batchStdoutWriter.Close()
_ = batchStdinReader.Close()
}
}()

// For simplicities sake we'll us a buffered reader
batchReader := bufio.NewReader(batchStdoutReader)
batchStdinWriter, batchReader, cancel := CatFileBatch(repo.Path)
defer cancel()

writeID := func(id string) error {
_, err := batchStdinWriter.Write([]byte(id))
30 changes: 24 additions & 6 deletions modules/indexer/code/bleve.go
@@ -5,7 +5,10 @@
package code

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
@@ -173,7 +176,7 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
return indexer, created, err
}

func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
func (b *BleveIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
// Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
return nil
Expand All @@ -196,8 +199,16 @@ func (b *BleveIndexer) addUpdate(commitSha string, update fileUpdate, repo *mode
return b.addDelete(update.Filename, repo, batch)
}

fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
RunInDirBytes(repo.RepoPath())
if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
return err
}

_, _, size, err := git.ReadBatchLine(batchReader)
if err != nil {
return err
}

fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size))
if err != nil {
return err
} else if !base.IsTextFile(fileContents) {
Expand Down Expand Up @@ -254,10 +265,17 @@ func (b *BleveIndexer) Close() {
// Index indexes the data
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, update := range changes.Updates {
if err := b.addUpdate(sha, update, repo, batch); err != nil {
return err
if len(changes.Updates) > 0 {

batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
defer cancel()

for _, update := range changes.Updates {
if err := b.addUpdate(batchWriter, batchReader, sha, update, repo, batch); err != nil {
return err
}
}
cancel()
}
for _, filename := range changes.RemovedFilenames {
if err := b.addDelete(filename, repo, batch); err != nil {
38 changes: 28 additions & 10 deletions modules/indexer/code/elastic_search.go
@@ -5,9 +5,12 @@
package code

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"
"time"
@@ -172,7 +175,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
return exists, nil
}

func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
func (b *ElasticSearchIndexer) addUpdate(batchWriter *io.PipeWriter, batchReader *bufio.Reader, sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
// Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
return nil, nil
Expand All @@ -195,8 +198,16 @@ func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *mo
return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil
}

fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
RunInDirBytes(repo.RepoPath())
if _, err := batchWriter.Write([]byte(update.BlobSha + "\n")); err != nil {
return nil, err
}

_, _, size, err := git.ReadBatchLine(batchReader)
if err != nil {
return nil, err
}

fileContents, err := ioutil.ReadAll(io.LimitReader(batchReader, size))
if err != nil {
return nil, err
} else if !base.IsTextFile(fileContents) {
Expand Down Expand Up @@ -230,14 +241,21 @@ func (b *ElasticSearchIndexer) addDelete(filename string, repo *models.Repositor
// Index will save the index data
func (b *ElasticSearchIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
reqs := make([]elastic.BulkableRequest, 0)
for _, update := range changes.Updates {
updateReqs, err := b.addUpdate(sha, update, repo)
if err != nil {
return err
}
if len(updateReqs) > 0 {
reqs = append(reqs, updateReqs...)
if len(changes.Updates) > 0 {

batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
defer cancel()

for _, update := range changes.Updates {
updateReqs, err := b.addUpdate(batchWriter, batchReader, sha, update, repo)
if err != nil {
return err
}
if len(updateReqs) > 0 {
reqs = append(reqs, updateReqs...)
}
}
cancel()
}

for _, filename := range changes.RemovedFilenames {