diff --git a/models/codes_indexer.go b/models/codes_indexer.go new file mode 100644 index 000000000000..03d8a4e0f3a8 --- /dev/null +++ b/models/codes_indexer.go @@ -0,0 +1,45 @@ +// Copyright 2016 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package models + +// RepoIndexerStatus status of a repo's entry in the repo indexer +// For now, implicitly refers to default branch +type RepoIndexerStatus struct { + ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"INDEX"` + CommitSha string `xorm:"VARCHAR(40)"` +} + +// GetIndexerStatus loads repo codes indxer status +func (repo *Repository) GetIndexerStatus() error { + if repo.IndexerStatus != nil { + return nil + } + status := &RepoIndexerStatus{RepoID: repo.ID} + has, err := x.Get(status) + if err != nil { + return err + } else if !has { + status.CommitSha = "" + } + repo.IndexerStatus = status + return nil +} + +// UpdateIndexerStatus updates indexer status +func (repo *Repository) UpdateIndexerStatus(sha string) error { + if err := repo.GetIndexerStatus(); err != nil { + return err + } + if len(repo.IndexerStatus.CommitSha) == 0 { + repo.IndexerStatus.CommitSha = sha + _, err := x.Insert(repo.IndexerStatus) + return err + } + repo.IndexerStatus.CommitSha = sha + _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). + Update(repo.IndexerStatus) + return err +} diff --git a/models/models.go b/models/models.go index c7e58737ede8..90c2bfb5d12b 100644 --- a/models/models.go +++ b/models/models.go @@ -360,3 +360,28 @@ func DumpDatabase(filePath string, dbType string) error { } return x.DumpTablesToFile(tbs, filePath) } + +// IsTableNotEmpty returns true if table has at least one record +func IsTableNotEmpty(tableName string) (bool, error) { + return x.Table(tableName).Exist() +} + +// DeleteAllRecords will delete all the records of this table +func DeleteAllRecords(tableName string) error { + _, err := x.Exec(fmt.Sprintf("DELETE FROM %s", tableName)) + return err +} + +// GetMaxID will return max id of the table +func GetMaxID(tableName string) (maxID int64, err error) { + _, err = x.Select("MAX(id)").Table(tableName).Get(&maxID) + return +} + +// FindByMaxID filled results as the condition from database +func FindByMaxID(maxID int64, limit int, results interface{}) error { + return x.Where("id <= ?", maxID). + OrderBy("id DESC"). + Limit(limit). 
+ Find(results) +} diff --git a/models/repo.go b/models/repo.go index 2f87e2f514e2..a2a25cd2ccb2 100644 --- a/models/repo.go +++ b/models/repo.go @@ -977,10 +977,6 @@ func MigrateRepository(doer, u *User, opts MigrateRepoOptions) (*Repository, err repo, err = CleanUpMigrateInfo(repo) } - if err != nil && !repo.IsEmpty { - UpdateRepoIndexer(repo) - } - return repo, err } @@ -1917,7 +1913,6 @@ func DeleteRepository(doer *User, uid, repoID int64) error { go HookQueue.Add(repo.ID) } - DeleteRepoFromIndexer(repo) return nil } diff --git a/models/update.go b/models/update.go index 1492d6c0d351..0883cb0e0116 100644 --- a/models/update.go +++ b/models/update.go @@ -263,10 +263,6 @@ func pushUpdate(opts PushUpdateOptions) (repo *Repository, err error) { commits = ListToPushCommits(l) } - if opts.RefFullName == git.BranchPrefix+repo.DefaultBranch { - UpdateRepoIndexer(repo) - } - if err := CommitRepoAction(CommitRepoActionOptions{ PusherName: opts.PusherName, RepoOwnerID: owner.ID, diff --git a/modules/indexer/codes/bleve.go b/modules/indexer/codes/bleve.go new file mode 100644 index 000000000000..7e7a960819c7 --- /dev/null +++ b/modules/indexer/codes/bleve.go @@ -0,0 +1,325 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package codes + +import ( + "errors" + "fmt" + "os" + "strconv" + "strings" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/camelcase" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/token/unique" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" +) + +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) +} + +// idOfIndexerID the integer id associated with an indexer id +func idOfIndexerID(indexerID string) (int64, error) { + id, err := strconv.ParseInt(indexerID, 36, 64) + if err != nil { + return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err) + } + return id, nil +} + +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery { + q := bleve.NewMatchPhraseQuery(matchPhrase) + q.FieldVal = field + q.Analyzer = analyzer + return q +} + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +// createIndexer create a repo indexer if one does not already exist +func createIndexer(path string, latestVersion int) (bleve.Index, error) { + var err error + docMapping := bleve.NewDocumentMapping() + numericFieldMapping := bleve.NewNumericFieldMapping() + 
numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + + mapping := bleve.NewIndexMapping() + if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name}, + }); err != nil { + return nil, err + } + mapping.DefaultAnalyzer = repoIndexerAnalyzer + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + repoIndexer, err := bleve.New(path, mapping) + if err != nil { + return nil, err + } + return repoIndexer, rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }) +} + +func filenameIndexerID(repoID int64, filename string) string { + return indexerID(repoID) + "_" + filename +} + +func filenameOfIndexerID(indexerID string) string { + index := strings.IndexByte(indexerID, '_') + if index == -1 { + log.Error("Unexpected ID in repo indexer: %s", indexerID) + } + return indexerID[index+1:] +} + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(setting.Indexer.IssuePath) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err + } + return index, nil +} + +const ( + maxBatchSize = 16 + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + repoIndexerLatestVersion = 1 +) + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer represents a bleve indexer implementation +type BleveIndexer struct { + indexDir string + indexer bleve.Index // indexer (thread-safe) index for repository contents +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) *BleveIndexer { + return &BleveIndexer{ + indexDir: indexDir, + } +} + +// Init init the indexer +func (b *BleveIndexer) Init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion) + if err != nil { + return false, err + } + if b.indexer != nil { + return true, nil + } + + b.indexer, err = createIndexer(b.indexDir, repoIndexerLatestVersion) + return false, err +} + +// Index indexes the data +func (b *BleveIndexer) Index(datas []*IndexerData) error { + for _, data := range datas { + repo, err := models.GetRepositoryByID(data.RepoID) + if err != nil { + return err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + 
changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, update := range changes.Updates { + if err := addUpdate(update, repo, batch); err != nil { + return err + } + } + for _, filename := range changes.RemovedFilenames { + if err := batch.Delete(filenameIndexerID(repo.ID, filename)); err != nil { + return err + } + } + if err = batch.Flush(); err != nil { + return err + } + + if err := repo.UpdateIndexerStatus(sha); err != nil { + return err + } + } + return nil +} + +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(repoIDs ...int64) error { + if len(repoIDs) <= 0 { + return errors.New("no repo id given") + } + + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) + } + + query := bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + ) + + searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) + result, err := b.indexer.Search(searchRequest) + if err != nil { + return err + } + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, hit := range result.Hits { + if err = batch.Delete(hit.ID); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for files in the specified repo. +// Returns the matching file-paths +func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (*SearchResult, error) { + phraseQuery := bleve.NewMatchPhraseQuery(keyword) + phraseQuery.FieldVal = "Content" + phraseQuery.Analyzer = repoIndexerAnalyzer + + var indexerQuery query.Query + if len(repoIDs) > 0 { + var repoQueries = make([]query.Query, 0, len(repoIDs)) + for _, repoID := range repoIDs { + repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) + } + + indexerQuery = bleve.NewConjunctionQuery( + bleve.NewDisjunctionQuery(repoQueries...), + phraseQuery, + ) + } else { + indexerQuery = phraseQuery + } + + from := (page - 1) * pageSize + searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) + searchRequest.Fields = []string{"Content", "RepoID"} + searchRequest.IncludeLocations = true + + result, err := b.indexer.Search(searchRequest) + if err != nil { + return nil, err + } + + matches := make([]Match, len(result.Hits)) + for i, hit := range result.Hits { + var startIndex, endIndex int = -1, -1 + for _, locations := range hit.Locations["Content"] { + location := locations[0] + locationStart := int(location.Start) + locationEnd := int(location.End) + if startIndex < 0 || locationStart < startIndex { + startIndex = locationStart + } + if endIndex < 0 || locationEnd > endIndex { + endIndex = locationEnd + } + } + matches[i] = Match{ + RepoID: int64(hit.Fields["RepoID"].(float64)), + StartIndex: startIndex, + EndIndex: endIndex, + Filename: filenameOfIndexerID(hit.ID), + Content: hit.Fields["Content"].(string), + } + } + return &SearchResult{ + Total: result.Total, + Hits: matches, + }, nil +} + +// RepoIndexerData data stored in the repo indexer +type RepoIndexerData IndexerData + +// Type returns the document type, for bleve's mapping.Classifier interface. 
+func (d *RepoIndexerData) Type() string { + return repoIndexerDocType +} diff --git a/modules/indexer/codes/bleve_test.go b/modules/indexer/codes/bleve_test.go new file mode 100644 index 000000000000..1b7cf6c09722 --- /dev/null +++ b/modules/indexer/codes/bleve_test.go @@ -0,0 +1,76 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package codes + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexAndSearch(t *testing.T) { + dir := "./bleve.index" + indexer := NewBleveIndexer(dir) + defer os.RemoveAll(dir) + + _, err := indexer.Init() + assert.NoError(t, err) + + err = indexer.Index([]*IndexerData{ + { + RepoID: 2, + Content: "As title", + }, + { + RepoID: 2, + Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default", + }, + }) + assert.NoError(t, err) + + var ( + keywords = []struct { + Keyword string + IDs []int64 + }{ + { + Keyword: "search", + IDs: []int64{1}, + }, + { + Keyword: "test1", + IDs: []int64{1}, + }, + { + Keyword: "test2", + IDs: []int64{1}, + }, + { + Keyword: "support", + IDs: []int64{1, 2}, + }, + { + Keyword: "chinese", + IDs: []int64{1, 2}, + }, + { + Keyword: "help", + IDs: []int64{}, + }, + } + ) + + for _, kw := range keywords { + res, err := indexer.Search(kw.IDs, kw.Keyword, 1, 10) + assert.NoError(t, err) + + var ids = make([]int64, 0, len(res.Hits)) + for _, hit := range res.Hits { + ids = append(ids, hit.RepoID) + } + assert.EqualValues(t, kw.IDs, ids) + } +} diff --git a/models/repo_indexer.go b/modules/indexer/codes/indexer.go similarity index 50% rename from models/repo_indexer.go rename to modules/indexer/codes/indexer.go index 9a7daa0ff88a..72181aaccd15 100644 --- a/models/repo_indexer.go +++ b/modules/indexer/codes/indexer.go @@ -1,161 +1,168 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. +// Copyright 2019 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package models +package codes import ( "fmt" "strconv" "strings" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "github.com/ethantkoenig/rupture" ) -// RepoIndexerStatus status of a repo's entry in the repo indexer -// For now, implicitly refers to default branch -type RepoIndexerStatus struct { - ID int64 `xorm:"pk autoincr"` - RepoID int64 `xorm:"INDEX"` - CommitSha string `xorm:"VARCHAR(40)"` +// IndexerData data stored in the issue indexer +type IndexerData struct { + RepoID int64 + Filepath string + Content string + IsDelete bool + RepoIDs []int64 } -func (repo *Repository) getIndexerStatus() error { - if repo.IndexerStatus != nil { - return nil - } - status := &RepoIndexerStatus{RepoID: repo.ID} - has, err := x.Get(status) - if err != nil { - return err - } else if !has { - status.CommitSha = "" - } - repo.IndexerStatus = status - return nil +// Match represents on search result +type Match struct { + RepoID int64 + StartIndex int + EndIndex int + Filename string + Content string } -func (repo *Repository) updateIndexerStatus(sha string) error { - if err := repo.getIndexerStatus(); err != nil { - return err - } - if len(repo.IndexerStatus.CommitSha) == 0 { - repo.IndexerStatus.CommitSha = sha - _, err := x.Insert(repo.IndexerStatus) - return err - } - repo.IndexerStatus.CommitSha = sha - _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha"). - Update(repo.IndexerStatus) - return err +// SearchResult represents search results +type SearchResult struct { + Total uint64 + Hits []Match } -type repoIndexerOperation struct { - repo *Repository - deleted bool +// Indexer defines an inteface to indexer issues contents +type Indexer interface { + Init() (bool, error) + Index(datas []*IndexerData) error + Delete(repoIDs ...int64) error + Search(repoIDs []int64, keyword string, page, pageSize int) (*SearchResult, error) } -var repoIndexerOperationQueue chan repoIndexerOperation +var ( + // codesIndexerQueue queue of issue ids to be updated + codesIndexerQueue Queue + codesIndexer Indexer +) -// InitRepoIndexer initialize the repo indexer -func InitRepoIndexer() { +// InitIndexer initialize codes indexer, syncReindex is true then reindex until +// all codes index done. 
+func InitIndexer(syncReindex bool) error { if !setting.Indexer.RepoIndexerEnabled { - return + return nil + } + + var populate bool + switch setting.Indexer.RepoType { + case "bleve": + codesIndexer = NewBleveIndexer(setting.Indexer.RepoPath) + exist, err := codesIndexer.Init() + if err != nil { + return err + } + populate = !exist + default: + return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) + } + + var err error + switch setting.Indexer.CodesQueueType { + case setting.LevelQueueType: + codesIndexerQueue, err = NewLevelQueue( + codesIndexer, + setting.Indexer.CodesQueueDir, + setting.Indexer.CodesQueueBatchNumber) + if err != nil { + return err + } + case setting.ChannelQueueType: + codesIndexerQueue = NewChannelQueue(codesIndexer, setting.Indexer.CodesQueueBatchNumber) + case setting.RedisQueueType: + addrs, pass, idx, err := parseConnStr(setting.Indexer.CodesQueueConnStr) + if err != nil { + return err + } + codesIndexerQueue, err = NewRedisQueue(addrs, pass, idx, codesIndexer, setting.Indexer.CodesQueueBatchNumber) + if err != nil { + return err + } + default: + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) + } + + go codesIndexerQueue.Run() + + if populate { + if syncReindex { + return populateIndexer() + } + + go func() { + if err := populateIndexer(); err != nil { + log.Error("populateIndexer: %v", err) + } + }() } - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() + + return nil } -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { - exist, err := x.Table("repository").Exist() +// populatendexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateIndexer() error { + notEmpty, err := models.IsTableNotEmpty("repository") if err != nil { return err - } else if !exist { + } else if !notEmpty { return nil } // if there is any existing repo indexer metadata in the DB, delete it // since we are starting afresh. Also, xorm requires deletes to have a // condition, and we want to delete everything, thus 1=1. - if _, err := x.Where("1=1").Delete(new(RepoIndexerStatus)); err != nil { + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { return err } - var maxRepoID int64 - if _, err = x.Select("MAX(id)").Table("repository").Get(&maxRepoID); err != nil { + maxRepoID, err := models.GetMaxID("repository") + if err != nil { return err } - go populateRepoIndexer(maxRepoID) - return nil -} -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -func populateRepoIndexer(maxRepoID int64) { log.Info("Populating the repo indexer with existing repositories") // start with the maximum existing repo ID and work backwards, so that we // don't include repos that are created after gitea starts; such repos will // already be added to the indexer, and we don't need to add them again. for maxRepoID > 0 { - repos := make([]*Repository, 0, RepositoryListDefaultPageSize) - err := x.Where("id <= ?", maxRepoID). - OrderBy("id DESC"). - Limit(RepositoryListDefaultPageSize). 
- Find(&repos) + repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) + err = models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) if err != nil { - log.Error("populateRepoIndexer: %v", err) - return + return err } else if len(repos) == 0 { break } for _, repo := range repos { - repoIndexerOperationQueue <- repoIndexerOperation{ - repo: repo, - deleted: false, - } + codesIndexerQueue.Push(&IndexerData{ + RepoID: repo.ID, + Filepath: repo.RepoPath(), + IsDelete: false, + }) maxRepoID = repo.ID - 1 } } log.Info("Done populating the repo indexer with existing repositories") -} - -func updateRepoIndexer(repo *Repository) error { - sha, err := getDefaultBranchSha(repo) - if err != nil { - return err - } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return err - } else if changes == nil { - return nil - } - - batch := indexer.RepoIndexerBatch() - for _, update := range changes.Updates { - if err := addUpdate(update, repo, batch); err != nil { - return err - } - } - for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { - return err - } - } - if err = batch.Flush(); err != nil { - return err - } - return repo.updateIndexerStatus(sha) + return nil } // repoChanges changes (file additions/updates/removals) to a repo @@ -169,7 +176,7 @@ type fileUpdate struct { BlobSha string } -func getDefaultBranchSha(repo *Repository) (string, error) { +func getDefaultBranchSha(repo *models.Repository) (string, error) { stdout, err := git.NewCommand("show-ref", "-s", repo.DefaultBranch).RunInDir(repo.RepoPath()) if err != nil { return "", err @@ -178,8 +185,8 @@ func getDefaultBranchSha(repo *Repository) (string, error) { } // getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) { - if err := repo.getIndexerStatus(); err != nil { +func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { + if err := repo.GetIndexerStatus(); err != nil { return nil, err } @@ -189,7 +196,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) { return nonGenesisChanges(repo, revision) } -func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error { +func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). 
RunInDir(repo.RepoPath()) if err != nil { @@ -208,26 +215,14 @@ func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) } else if !base.IsTextFile(fileContents) { return nil } - indexerUpdate := indexer.RepoIndexerUpdate{ - Filepath: update.Filename, - Op: indexer.RepoIndexerOpUpdate, - Data: &indexer.RepoIndexerData{ - RepoID: repo.ID, - Content: string(fileContents), - }, - } - return indexerUpdate.AddToFlushingBatch(batch) -} - -func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error { - indexerUpdate := indexer.RepoIndexerUpdate{ - Filepath: filename, - Op: indexer.RepoIndexerOpDelete, - Data: &indexer.RepoIndexerData{ - RepoID: repo.ID, + return batch.Index( + filenameIndexerID(repo.ID, update.Filename), + &IndexerData{ + RepoID: repo.ID, + Filepath: update.Filename, + Content: string(fileContents), }, - } - return indexerUpdate.AddToFlushingBatch(batch) + ) } // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command @@ -247,7 +242,7 @@ func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { } // genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(repo *Repository, revision string) (*repoChanges, error) { +func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { var changes repoChanges stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). RunInDirBytes(repo.RepoPath()) @@ -259,7 +254,7 @@ func genesisChanges(repo *Repository, revision string) (*repoChanges, error) { } // nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error) { +func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { diffCmd := git.NewCommand("diff", "--name-status", repo.IndexerStatus.CommitSha, revision) stdout, err := diffCmd.RunInDir(repo.RepoPath()) @@ -267,9 +262,7 @@ func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error) // previous commit sha may have been removed by a force push, so // try rebuilding from scratch log.Warn("git diff: %v", err) - if err = indexer.DeleteRepoFromIndexer(repo.ID); err != nil { - return nil, err - } + DeleteRepoFromIndexer(repo) return genesisChanges(repo, revision) } var changes repoChanges @@ -309,41 +302,18 @@ func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error) return &changes, err } -func processRepoIndexerOperationQueue() { - for { - op := <-repoIndexerOperationQueue - if op.deleted { - if err := indexer.DeleteRepoFromIndexer(op.repo.ID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) - } - } else { - if err := updateRepoIndexer(op.repo); err != nil { - log.Error("updateRepoIndexer: %v", err) - } - } - } -} - // DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *Repository) { - addOperationToQueue(repoIndexerOperation{repo: repo, deleted: true}) +func DeleteRepoFromIndexer(repo *models.Repository) { + codesIndexerQueue.Push(&IndexerData{ + RepoID: repo.ID, + IsDelete: true, + }) } // UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *Repository) { - addOperationToQueue(repoIndexerOperation{repo: repo, deleted: false}) -} - -func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return - } - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - 
repoIndexerOperationQueue <- op - }() - } +func UpdateRepoIndexer(repo *models.Repository) { + codesIndexerQueue.Push(&IndexerData{ + RepoID: repo.ID, + IsDelete: false, + }) } diff --git a/modules/indexer/codes/queue.go b/modules/indexer/codes/queue.go new file mode 100644 index 000000000000..17de472ed3b8 --- /dev/null +++ b/modules/indexer/codes/queue.go @@ -0,0 +1,11 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package codes + +// Queue defines an interface to save an issue indexer queue +type Queue interface { + Run() error + Push(*IndexerData) error +} diff --git a/modules/indexer/codes/queue_channel.go b/modules/indexer/codes/queue_channel.go new file mode 100644 index 000000000000..83b3840f7152 --- /dev/null +++ b/modules/indexer/codes/queue_channel.go @@ -0,0 +1,71 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package codes + +import ( + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +// ChannelQueue implements +type ChannelQueue struct { + queue chan *IndexerData + indexer Indexer + batchNumber int +} + +// NewChannelQueue create a memory channel queue +func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue { + return &ChannelQueue{ + queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength), + indexer: indexer, + batchNumber: batchNumber, + } +} + +// Run starts to run the queue +func (c *ChannelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, c.batchNumber) + for { + select { + case data := <-c.queue: + if len(datas) >= c.batchNumber { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + + if data.IsDelete { + if data.RepoID > 0 { + if err := c.indexer.Delete(data.RepoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else if len(data.RepoIDs) > 0 { + if err := c.indexer.Delete(data.RepoIDs...); err != nil { + log.Error("indexer.Delete: %v", err) + } + } + continue + } + datas = append(datas, data) + case <-time.After(time.Millisecond * 100): + i++ + if i >= 3 && len(datas) > 0 { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + } + } +} + +// Push will push the indexer data to queue +func (c *ChannelQueue) Push(data *IndexerData) error { + c.queue <- data + return nil +} diff --git a/modules/indexer/codes/queue_disk.go b/modules/indexer/codes/queue_disk.go new file mode 100644 index 000000000000..97096b966142 --- /dev/null +++ b/modules/indexer/codes/queue_disk.go @@ -0,0 +1,101 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package codes + +import ( + "encoding/json" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/lunny/levelqueue" +) + +var ( + _ Queue = &LevelQueue{} +) + +// LevelQueue implements a disk library queue +type LevelQueue struct { + indexer Indexer + queue *levelqueue.Queue + batchNumber int +} + +// NewLevelQueue creates a ledis local queue +func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) { + queue, err := levelqueue.Open(dataDir) + if err != nil { + return nil, err + } + + return &LevelQueue{ + indexer: indexer, + queue: queue, + batchNumber: batchNumber, + }, nil +} + +// Run starts to run the queue +func (l *LevelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, l.batchNumber) + for { + bs, err := l.queue.RPop() + if err != nil { + log.Error("RPop: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { + l.indexer.Index(datas) + datas = make([]*IndexerData, 0, l.batchNumber) + i = 0 + } + + if len(bs) <= 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data IndexerData + err = json.Unmarshal(bs, &data) + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("LedisLocalQueue: task found: %#v", data) + + if data.IsDelete { + if data.RepoID > 0 { + if err = l.indexer.Delete(data.RepoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else if len(data.RepoIDs) > 0 { + if err = l.indexer.Delete(data.RepoIDs...); err != nil { + log.Error("indexer.Delete: %v", err) + } + } + time.Sleep(time.Millisecond * 10) + continue + } + + datas = append(datas, &data) + time.Sleep(time.Millisecond * 10) + } +} + +// Push will push the indexer data to queue +func (l *LevelQueue) Push(data *IndexerData) error { + bs, err := json.Marshal(data) + if err != nil { + return err + } + return l.queue.LPush(bs) +} diff --git a/modules/indexer/codes/queue_redis.go b/modules/indexer/codes/queue_redis.go new file mode 100644 index 000000000000..f0855815334b --- /dev/null +++ b/modules/indexer/codes/queue_redis.go @@ -0,0 +1,146 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package codes + +import ( + "encoding/json" + "errors" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/modules/log" + + "github.com/go-redis/redis" +) + +var ( + _ Queue = &RedisQueue{} +) + +type redisClient interface { + RPush(key string, args ...interface{}) *redis.IntCmd + LPop(key string) *redis.StringCmd + Ping() *redis.StatusCmd +} + +// RedisQueue redis queue +type RedisQueue struct { + client redisClient + queueName string + indexer Indexer + batchNumber int +} + +func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { + fields := strings.Fields(connStr) + for _, f := range fields { + items := strings.SplitN(f, "=", 2) + if len(items) < 2 { + continue + } + switch strings.ToLower(items[0]) { + case "addrs": + addrs = items[1] + case "password": + password = items[1] + case "db": + dbIdx, err = strconv.Atoi(items[1]) + if err != nil { + return + } + } + } + return +} + +// NewRedisQueue creates single redis or cluster redis queue +func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) { + dbs := strings.Split(addrs, ",") + var queue = RedisQueue{ + queueName: "issue_indexer_queue", + indexer: indexer, + batchNumber: batchNumber, + } + if len(dbs) == 0 { + return nil, errors.New("no redis host found") + } else if len(dbs) == 1 { + queue.client = redis.NewClient(&redis.Options{ + Addr: strings.TrimSpace(dbs[0]), // use default Addr + Password: password, // no password set + DB: dbIdx, // use default DB + }) + } else { + queue.client = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: dbs, + }) + } + if err := queue.client.Ping().Err(); err != nil { + return nil, err + } + return &queue, nil +} + +// Run runs the redis queue +func (r *RedisQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, r.batchNumber) + for { + bs, err := r.client.LPop(r.queueName).Bytes() + if err != nil && err != redis.Nil { + log.Error("LPop faile: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) { + r.indexer.Index(datas) + datas = make([]*IndexerData, 0, r.batchNumber) + i = 0 + } + + if len(bs) <= 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data IndexerData + err = json.Unmarshal(bs, &data) + if err != nil { + log.Error("Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("RedisQueue: task found: %#v", data) + + if data.IsDelete { + if data.RepoID > 0 { + if err = r.indexer.Delete(data.RepoID); err != nil { + log.Error("indexer.Delete: %v", err) + } + } else if len(data.RepoIDs) > 0 { + if err = r.indexer.Delete(data.RepoIDs...); err != nil { + log.Error("indexer.Delete: %v", err) + } + } + time.Sleep(time.Millisecond * 100) + continue + } + + datas = append(datas, &data) + time.Sleep(time.Millisecond * 100) + } +} + +// Push implements Queue +func (r *RedisQueue) Push(data *IndexerData) error { + bs, err := json.Marshal(data) + if err != nil { + return err + } + return r.client.RPush(r.queueName, bs).Err() +} diff --git a/modules/search/search.go b/modules/indexer/codes/search.go similarity index 84% rename from modules/search/search.go rename to modules/indexer/codes/search.go index 9b93fe58fb04..1ce4b82c1b12 100644 --- a/modules/search/search.go +++ b/modules/indexer/codes/search.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package search +package codes import ( "bytes" @@ -11,7 +11,6 @@ import ( "strings" "code.gitea.io/gitea/modules/highlight" - "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/util" ) @@ -60,7 +59,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error { return nil } -func searchResult(result *indexer.RepoSearchResult, startIndex, endIndex int) (*Result, error) { +func searchResult(result *Match, startIndex, endIndex int) (*Result, error) { startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") var formattedLinesBuffer bytes.Buffer @@ -113,19 +112,19 @@ func PerformSearch(repoIDs []int64, keyword string, page, pageSize int) (int, [] return 0, nil, nil } - total, results, err := indexer.SearchRepoByKeyword(repoIDs, keyword, page, pageSize) + result, err := codesIndexer.Search(repoIDs, keyword, page, pageSize) if err != nil { return 0, nil, err } - displayResults := make([]*Result, len(results)) + displayResults := make([]*Result, len(result.Hits)) - for i, result := range results { - startIndex, endIndex := indices(result.Content, result.StartIndex, result.EndIndex) - displayResults[i], err = searchResult(result, startIndex, endIndex) + for i, hit := range result.Hits { + startIndex, endIndex := indices(hit.Content, hit.StartIndex, hit.EndIndex) + displayResults[i], err = searchResult(&hit, startIndex, endIndex) if err != nil { return 0, nil, err } } - return int(total), displayResults, nil + return int(result.Total), displayResults, nil } diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go deleted file mode 100644 index 9e12a7f5013b..000000000000 --- a/modules/indexer/indexer.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2016 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package indexer - -import ( - "fmt" - "os" - "strconv" - - "code.gitea.io/gitea/modules/setting" - - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/token/unicodenorm" - "github.com/blevesearch/bleve/index/upsidedown" - "github.com/blevesearch/bleve/mapping" - "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" -) - -// indexerID a bleve-compatible unique identifier for an integer id -func indexerID(id int64) string { - return strconv.FormatInt(id, 36) -} - -// idOfIndexerID the integer id associated with an indexer id -func idOfIndexerID(indexerID string) (int64, error) { - id, err := strconv.ParseInt(indexerID, 36, 64) - if err != nil { - return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err) - } - return id, nil -} - -// numericEqualityQuery a numeric equality query for the given value and field -func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { - f := float64(value) - tru := true - q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) - q.SetField(field) - return q -} - -func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery { - q := bleve.NewMatchPhraseQuery(matchPhrase) - q.FieldVal = field - q.Analyzer = analyzer - return q -} - -const unicodeNormalizeName = "unicodeNormalize" - -func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { - return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ - "type": unicodenorm.Name, - "form": unicodenorm.NFC, - }) -} - -const maxBatchSize = 16 - -// openIndexer open the index at the specified path, checking for metadata -// updates and bleve version updates. If index needs to be created (or -// re-created), returns (nil, nil) -func openIndexer(path string, latestVersion int) (bleve.Index, error) { - _, err := os.Stat(setting.Indexer.IssuePath) - if err != nil && os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - - metadata, err := rupture.ReadIndexMetadata(path) - if err != nil { - return nil, err - } - if metadata.Version < latestVersion { - // the indexer is using a previous version, so we should delete it and - // re-populate - return nil, os.RemoveAll(path) - } - - index, err := bleve.Open(path) - if err != nil && err == upsidedown.IncompatibleVersion { - // the indexer was built with a previous version of bleve, so we should - // delete it and re-populate - return nil, os.RemoveAll(path) - } else if err != nil { - return nil, err - } - return index, nil -} diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go deleted file mode 100644 index 287d23854b2d..000000000000 --- a/modules/indexer/repo.go +++ /dev/null @@ -1,229 +0,0 @@ -// Copyright 2017 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package indexer - -import ( - "strings" - - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - - "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/analysis/analyzer/custom" - "github.com/blevesearch/bleve/analysis/token/camelcase" - "github.com/blevesearch/bleve/analysis/token/lowercase" - "github.com/blevesearch/bleve/analysis/token/unique" - "github.com/blevesearch/bleve/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/search/query" - "github.com/ethantkoenig/rupture" -) - -const ( - repoIndexerAnalyzer = "repoIndexerAnalyzer" - repoIndexerDocType = "repoIndexerDocType" - - repoIndexerLatestVersion = 1 -) - -// repoIndexer (thread-safe) index for repository contents -var repoIndexer bleve.Index - -// RepoIndexerOp type of operation to perform on repo indexer -type RepoIndexerOp int - -const ( - // RepoIndexerOpUpdate add/update a file's contents - RepoIndexerOpUpdate = iota - - // RepoIndexerOpDelete delete a file - RepoIndexerOpDelete -) - -// RepoIndexerData data stored in the repo indexer -type RepoIndexerData struct { - RepoID int64 - Content string -} - -// Type returns the document type, for bleve's mapping.Classifier interface. -func (d *RepoIndexerData) Type() string { - return repoIndexerDocType -} - -// RepoIndexerUpdate an update to the repo indexer -type RepoIndexerUpdate struct { - Filepath string - Op RepoIndexerOp - Data *RepoIndexerData -} - -// AddToFlushingBatch adds the update to the given flushing batch. -func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { - id := filenameIndexerID(update.Data.RepoID, update.Filepath) - switch update.Op { - case RepoIndexerOpUpdate: - return batch.Index(id, update.Data) - case RepoIndexerOpDelete: - return batch.Delete(id) - default: - log.Error("Unrecognized repo indexer op: %d", update.Op) - } - return nil -} - -// InitRepoIndexer initialize repo indexer -func InitRepoIndexer(populateIndexer func() error) { - var err error - repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) - if err != nil { - log.Fatal("InitRepoIndexer: %v", err) - } - if repoIndexer != nil { - return - } - - if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil { - log.Fatal("CreateRepoIndexer: %v", err) - } - if err = populateIndexer(); err != nil { - log.Fatal("PopulateRepoIndex: %v", err) - } -} - -// createRepoIndexer create a repo indexer if one does not already exist -func createRepoIndexer(path string, latestVersion int) error { - var err error - docMapping := bleve.NewDocumentMapping() - numericFieldMapping := bleve.NewNumericFieldMapping() - numericFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) - - textFieldMapping := bleve.NewTextFieldMapping() - textFieldMapping.IncludeInAll = false - docMapping.AddFieldMappingsAt("Content", textFieldMapping) - - mapping := bleve.NewIndexMapping() - if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { - return err - } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ - "type": custom.Name, - "char_filters": []string{}, - "tokenizer": unicode.Name, - "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name}, - }); err != nil { - return err - } - mapping.DefaultAnalyzer = repoIndexerAnalyzer - mapping.AddDocumentMapping(repoIndexerDocType, docMapping) - mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) - - repoIndexer, 
err = bleve.New(path, mapping) - if err != nil { - return err - } - return rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ - Version: latestVersion, - }) -} - -func filenameIndexerID(repoID int64, filename string) string { - return indexerID(repoID) + "_" + filename -} - -func filenameOfIndexerID(indexerID string) string { - index := strings.IndexByte(indexerID, '_') - if index == -1 { - log.Error("Unexpected ID in repo indexer: %s", indexerID) - } - return indexerID[index+1:] -} - -// RepoIndexerBatch batch to add updates to -func RepoIndexerBatch() rupture.FlushingBatch { - return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) -} - -// DeleteRepoFromIndexer delete all of a repo's files from indexer -func DeleteRepoFromIndexer(repoID int64) error { - query := numericEqualityQuery(repoID, "RepoID") - searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false) - result, err := repoIndexer.Search(searchRequest) - if err != nil { - return err - } - batch := RepoIndexerBatch() - for _, hit := range result.Hits { - if err = batch.Delete(hit.ID); err != nil { - return err - } - } - return batch.Flush() -} - -// RepoSearchResult result of performing a search in a repo -type RepoSearchResult struct { - RepoID int64 - StartIndex int - EndIndex int - Filename string - Content string -} - -// SearchRepoByKeyword searches for files in the specified repo. -// Returns the matching file-paths -func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (int64, []*RepoSearchResult, error) { - phraseQuery := bleve.NewMatchPhraseQuery(keyword) - phraseQuery.FieldVal = "Content" - phraseQuery.Analyzer = repoIndexerAnalyzer - - var indexerQuery query.Query - if len(repoIDs) > 0 { - var repoQueries = make([]query.Query, 0, len(repoIDs)) - for _, repoID := range repoIDs { - repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID")) - } - - indexerQuery = bleve.NewConjunctionQuery( - bleve.NewDisjunctionQuery(repoQueries...), - phraseQuery, - ) - } else { - indexerQuery = phraseQuery - } - - from := (page - 1) * pageSize - searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false) - searchRequest.Fields = []string{"Content", "RepoID"} - searchRequest.IncludeLocations = true - - result, err := repoIndexer.Search(searchRequest) - if err != nil { - return 0, nil, err - } - - searchResults := make([]*RepoSearchResult, len(result.Hits)) - for i, hit := range result.Hits { - var startIndex, endIndex int = -1, -1 - for _, locations := range hit.Locations["Content"] { - location := locations[0] - locationStart := int(location.Start) - locationEnd := int(location.End) - if startIndex < 0 || locationStart < startIndex { - startIndex = locationStart - } - if endIndex < 0 || locationEnd > endIndex { - endIndex = locationEnd - } - } - searchResults[i] = &RepoSearchResult{ - RepoID: int64(hit.Fields["RepoID"].(float64)), - StartIndex: startIndex, - EndIndex: endIndex, - Filename: filenameOfIndexerID(hit.ID), - Content: hit.Fields["Content"].(string), - } - } - return int64(result.Total), searchResults, nil -} diff --git a/modules/notification/base/notifier.go b/modules/notification/base/notifier.go index e44f3cc63216..64fd40a5866b 100644 --- a/modules/notification/base/notifier.go +++ b/modules/notification/base/notifier.go @@ -40,4 +40,6 @@ type Notifier interface { NotifyNewRelease(rel *models.Release) NotifyUpdateRelease(doer *models.User, rel *models.Release) NotifyDeleteRelease(doer *models.User, rel *models.Release) + + 
NotifyPushCommits(repo *models.Repository, branch string, opt models.PushUpdateOptions) } diff --git a/modules/notification/base/null.go b/modules/notification/base/null.go index 12be1999f901..17c8243f4af2 100644 --- a/modules/notification/base/null.go +++ b/modules/notification/base/null.go @@ -106,3 +106,7 @@ func (*NullNotifier) NotifyCreateRepository(doer *models.User, u *models.User, r // NotifyMigrateRepository places a place holder function func (*NullNotifier) NotifyMigrateRepository(doer *models.User, u *models.User, repo *models.Repository) { } + +// NotifyPushCommits places a place holder function +func (*NullNotifier) NotifyPushCommits(repo *models.Repository, branch string, opt models.PushUpdateOptions) { +} diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 66614b2c20c7..6e7452cf14ed 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,9 +6,12 @@ package indexer import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/git" + codes_indexer "code.gitea.io/gitea/modules/indexer/codes" issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/setting" ) type indexerNotifier struct { @@ -98,6 +101,9 @@ func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { issue_indexer.DeleteRepoIssueIndexer(repo) + if setting.Indexer.RepoIndexerEnabled { + codes_indexer.DeleteRepoFromIndexer(repo) + } } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { @@ -107,3 +113,15 @@ func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *mod func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) { issue_indexer.UpdateIssueIndexer(issue) } + +func (r *indexerNotifier) NotifyMigrateRepository(doer *models.User, u *models.User, repo *models.Repository) { + if setting.Indexer.RepoIndexerEnabled && !repo.IsEmpty { + codes_indexer.UpdateRepoIndexer(repo) + } +} + +func (r *indexerNotifier) NotifyPushCommits(repo *models.Repository, branch string, opts models.PushUpdateOptions) { + if setting.Indexer.RepoIndexerEnabled && opts.RefFullName == git.BranchPrefix+repo.DefaultBranch { + codes_indexer.UpdateRepoIndexer(repo) + } +} diff --git a/modules/notification/notification.go b/modules/notification/notification.go index e0de88346d75..91ad9c1dd7fb 100644 --- a/modules/notification/notification.go +++ b/modules/notification/notification.go @@ -177,3 +177,10 @@ func NotifyMigrateRepository(doer *models.User, u *models.User, repo *models.Rep notifier.NotifyMigrateRepository(doer, u, repo) } } + +// NotifyPushCommits notefies when pushed commits to repository +func NotifyPushCommits(repo *models.Repository, branch string, opts models.PushUpdateOptions) { + for _, notifier := range notifiers { + notifier.NotifyPushCommits(repo, branch, opts) + } +} diff --git a/modules/repofiles/delete.go b/modules/repofiles/delete.go index ccf90f43b307..09a4dbb44c9b 100644 --- a/modules/repofiles/delete.go +++ b/modules/repofiles/delete.go @@ -183,7 +183,8 @@ func DeleteRepoFile(repo *models.Repository, doer *models.User, opts *DeleteRepo if err = repo.GetOwner(); err != nil { return nil, fmt.Errorf("GetOwner: %v", err) } - err = models.PushUpdate( + err = 
PushUpdate( + repo, opts.NewBranch, models.PushUpdateOptions{ PusherID: doer.ID, @@ -199,8 +200,6 @@ func DeleteRepoFile(repo *models.Repository, doer *models.User, opts *DeleteRepo return nil, fmt.Errorf("PushUpdate: %v", err) } - // FIXME: Should we UpdateRepoIndexer(repo) here? - file, err := GetFileResponseFromCommit(repo, commit, opts.NewBranch, treePath) if err != nil { return nil, err diff --git a/modules/repofiles/update.go b/modules/repofiles/update.go index 66e3f2babced..a9dadf816504 100644 --- a/modules/repofiles/update.go +++ b/modules/repofiles/update.go @@ -18,6 +18,7 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/lfs" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/structs" ) @@ -394,7 +395,8 @@ func CreateOrUpdateRepoFile(repo *models.Repository, doer *models.User, opts *Up if err = repo.GetOwner(); err != nil { return nil, fmt.Errorf("GetOwner: %v", err) } - err = models.PushUpdate( + err = PushUpdate( + repo, opts.NewBranch, models.PushUpdateOptions{ PusherID: doer.ID, @@ -409,7 +411,6 @@ func CreateOrUpdateRepoFile(repo *models.Repository, doer *models.User, opts *Up if err != nil { return nil, fmt.Errorf("PushUpdate: %v", err) } - models.UpdateRepoIndexer(repo) commit, err = t.GetCommit(commitHash) if err != nil { @@ -422,3 +423,12 @@ func CreateOrUpdateRepoFile(repo *models.Repository, doer *models.User, opts *Up } return file, nil } + +// PushUpdate push updates +func PushUpdate(repo *models.Repository, branch string, opt models.PushUpdateOptions) error { + if err := models.PushUpdate(branch, opt); err != nil { + return err + } + notification.NotifyPushCommits(repo, branch, opt) + return nil +} diff --git a/modules/repofiles/upload.go b/modules/repofiles/upload.go index ed6a9438c70f..5f428c3139a4 100644 --- a/modules/repofiles/upload.go +++ b/modules/repofiles/upload.go @@ -188,7 +188,8 @@ func UploadRepoFiles(repo *models.Repository, doer *models.User, opts *UploadRep if err = repo.GetOwner(); err != nil { return fmt.Errorf("GetOwner: %v", err) } - err = models.PushUpdate( + err = PushUpdate( + repo, opts.NewBranch, models.PushUpdateOptions{ PusherID: doer.ID, @@ -203,7 +204,6 @@ func UploadRepoFiles(repo *models.Repository, doer *models.User, opts *UploadRep if err != nil { return fmt.Errorf("PushUpdate: %v", err) } - // FIXME: Should we models.UpdateRepoIndexer(repo) here? return models.DeleteUploads(uploads...) 
} diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go index 36fd4a020b2c..a70e48de7221 100644 --- a/modules/setting/indexer.go +++ b/modules/setting/indexer.go @@ -21,14 +21,20 @@ var ( Indexer = struct { IssueType string IssuePath string - RepoIndexerEnabled bool - RepoPath string - UpdateQueueLength int - MaxIndexerFileSize int64 IssueQueueType string IssueQueueDir string IssueQueueConnStr string IssueQueueBatchNumber int + + RepoIndexerEnabled bool + RepoType string + RepoPath string + CodesQueueType string + CodesQueueDir string + CodesQueueConnStr string + CodesQueueBatchNumber int + UpdateQueueLength int + MaxIndexerFileSize int64 }{ IssueType: "bleve", IssuePath: "indexers/issues.bleve", @@ -36,6 +42,14 @@ var ( IssueQueueDir: "indexers/issues.queue", IssueQueueConnStr: "", IssueQueueBatchNumber: 20, + + RepoIndexerEnabled: false, + RepoType: "bleve", + RepoPath: "indexers/codes.bleve", + CodesQueueType: LevelQueueType, + CodesQueueDir: "indexers/codes.queue", + CodesQueueConnStr: "", + CodesQueueBatchNumber: 20, } ) @@ -46,6 +60,11 @@ func newIndexerService() { if !filepath.IsAbs(Indexer.IssuePath) { Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath) } + Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType) + Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) + Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, "")) + Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) + Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false) Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve")) if !filepath.IsAbs(Indexer.RepoPath) { @@ -53,8 +72,4 @@ func newIndexerService() { } Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20) Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024) - Indexer.IssueQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType) - Indexer.IssueQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) - Indexer.IssueQueueConnStr = sec.Key("ISSUE_INDEXER_QUEUE_CONN_STR").MustString(path.Join(AppDataPath, "")) - Indexer.IssueQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) } diff --git a/routers/home.go b/routers/home.go index dbe27bd425c3..c5e87d1faaa4 100644 --- a/routers/home.go +++ b/routers/home.go @@ -12,8 +12,8 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/indexer/codes" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/search" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/routers/user" @@ -309,7 +309,7 @@ func ExploreCode(ctx *context.Context) { var ( total int - searchResults []*search.Result + searchResults []*codes.Result ) // if non-admin login user, we need check UnitTypeCode at first @@ -331,14 +331,14 @@ func ExploreCode(ctx *context.Context) { ctx.Data["RepoMaps"] = rightRepoMap - total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) + total, searchResults, err = codes.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err) return } // if non-login user or isAdmin, no 
need to check UnitTypeCode } else if (ctx.User == nil && len(repoIDs) > 0) || isAdmin { - total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) + total, searchResults, err = codes.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err) return diff --git a/routers/init.go b/routers/init.go index 47f837c523aa..bec64578310b 100644 --- a/routers/init.go +++ b/routers/init.go @@ -15,6 +15,7 @@ import ( "code.gitea.io/gitea/modules/cron" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/highlight" + "code.gitea.io/gitea/modules/indexer/codes" issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/mailer" @@ -94,7 +95,9 @@ func GlobalInit() { if err := issue_indexer.InitIssueIndexer(false); err != nil { log.Fatal("Failed to initialize issue indexer: %v", err) } - models.InitRepoIndexer() + if err := codes.InitIndexer(false); err != nil { + log.Fatal("Failed to initialize codes indexer: %v", err) + } models.InitSyncMirrors() models.InitDeliverHooks() models.InitTestPullRequests() diff --git a/routers/private/push_update.go b/routers/private/push_update.go index 5c42f066ee7d..d1247f099033 100644 --- a/routers/private/push_update.go +++ b/routers/private/push_update.go @@ -11,6 +11,7 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/repofiles" macaron "gopkg.in/macaron.v1" ) @@ -25,6 +26,14 @@ func PushUpdate(ctx *macaron.Context) { return } + repo, err := models.GetRepositoryByOwnerAndName(opt.RepoUserName, opt.RepoName) + if err != nil { + ctx.JSON(500, map[string]interface{}{ + "err": err.Error(), + }) + return + } + branch := strings.TrimPrefix(opt.RefFullName, git.BranchPrefix) if len(branch) == 0 || opt.PusherID <= 0 { ctx.Error(404) @@ -32,7 +41,7 @@ func PushUpdate(ctx *macaron.Context) { return } - err := models.PushUpdate(branch, opt) + err = repofiles.PushUpdate(repo, branch, opt) if err != nil { if models.IsErrUserNotExist(err) { ctx.Error(404) diff --git a/routers/repo/branch.go b/routers/repo/branch.go index ae87aa5b3a09..05d64fb4c8d0 100644 --- a/routers/repo/branch.go +++ b/routers/repo/branch.go @@ -131,15 +131,18 @@ func deleteBranch(ctx *context.Context, branchName string) error { } // Don't return error below this - if err := models.PushUpdate(branchName, models.PushUpdateOptions{ - RefFullName: git.BranchPrefix + branchName, - OldCommitID: commit.ID.String(), - NewCommitID: git.EmptySHA, - PusherID: ctx.User.ID, - PusherName: ctx.User.Name, - RepoUserName: ctx.Repo.Owner.Name, - RepoName: ctx.Repo.Repository.Name, - }); err != nil { + if err := repofiles.PushUpdate( + ctx.Repo.Repository, + branchName, + models.PushUpdateOptions{ + RefFullName: git.BranchPrefix + branchName, + OldCommitID: commit.ID.String(), + NewCommitID: git.EmptySHA, + PusherID: ctx.User.ID, + PusherName: ctx.User.Name, + RepoUserName: ctx.Repo.Owner.Name, + RepoName: ctx.Repo.Repository.Name, + }); err != nil { log.Error("Update: %v", err) } diff --git a/routers/repo/search.go b/routers/repo/search.go index de16eda83d18..a72b57b80566 100644 --- a/routers/repo/search.go +++ b/routers/repo/search.go @@ -10,7 +10,7 @@ import ( "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" - "code.gitea.io/gitea/modules/search" + "code.gitea.io/gitea/modules/indexer/codes" "code.gitea.io/gitea/modules/setting" ) @@ 
-27,7 +27,7 @@ func Search(ctx *context.Context) { if page <= 0 { page = 1 } - total, searchResults, err := search.PerformSearch([]int64{ctx.Repo.Repository.ID}, + total, searchResults, err := codes.PerformSearch([]int64{ctx.Repo.Repository.ID}, keyword, page, setting.UI.RepoSearchPagingNum) if err != nil { ctx.ServerError("SearchResults", err)
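
A minimal usage sketch, not part of the patch, of how the pieces introduced above are meant to fit together after the move to modules/indexer/codes. It relies only on signatures visible in this diff (codes.InitIndexer, codes.UpdateRepoIndexer, codes.PerformSearch, setting.Indexer.RepoIndexerEnabled); the wrapper function and its package are hypothetical.

package example

import (
	"fmt"
	"log"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/indexer/codes"
	"code.gitea.io/gitea/modules/setting"
)

// reindexAndSearch is a hypothetical wrapper showing the call order after this
// change: initialise the queue-backed code indexer once at startup, enqueue a
// repository for (re)indexing, then query it through PerformSearch.
func reindexAndSearch(repo *models.Repository, keyword string) error {
	if !setting.Indexer.RepoIndexerEnabled {
		return nil // the code indexer stays opt-in
	}

	// starts the configured queue consumer and, on a first run, populates
	// the index from existing repositories in the background
	if err := codes.InitIndexer(false); err != nil {
		return fmt.Errorf("init code indexer: %v", err)
	}

	// pushes an IndexerData task; the consumer walks the default branch
	// and flushes updates to the bleve index in batches
	codes.UpdateRepoIndexer(repo)

	// indexing is asynchronous, so a search this early only sees whatever
	// has already been flushed
	total, results, err := codes.PerformSearch([]int64{repo.ID}, keyword, 1, 10)
	if err != nil {
		return fmt.Errorf("code search: %v", err)
	}
	log.Printf("%d matches in repo %d; %d results on the first page", total, repo.ID, len(results))
	return nil
}

Compared with the old models.InitRepoIndexer path, callers no longer feed a package-level channel; work is routed through whichever Queue implementation Indexer.CodesQueueType selects (channel, levelqueue or redis).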
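
The bleve documents above are keyed by a compact scheme: the repository id rendered in base 36, then "_", then the file path. A sketch of a test pinning that behaviour down; the expected literals follow directly from indexerID/filenameIndexerID in bleve.go, but the test itself is not in the patch.

package codes

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestIndexerIDScheme(t *testing.T) {
	// file documents are keyed "<repoID in base 36>_<path>"
	id := filenameIndexerID(42, "docs/README.md")
	assert.Equal(t, "16_docs/README.md", id)
	assert.Equal(t, "docs/README.md", filenameOfIndexerID(id))

	// the repo id round-trips through the base-36 encoding
	repoID, err := idOfIndexerID(indexerID(42))
	assert.NoError(t, err)
	assert.EqualValues(t, 42, repoID)
}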
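
parseConnStr in queue_redis.go accepts a space-separated "key=value" string with addrs, password and db keys. A sketch of a test for it, again not part of the patch, in the same testify style as bleve_test.go.

package codes

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestParseConnStr(t *testing.T) {
	// redis queue connection strings are space-separated key=value fields
	addrs, password, db, err := parseConnStr("addrs=127.0.0.1:6379 password=secret db=2")
	assert.NoError(t, err)
	assert.Equal(t, "127.0.0.1:6379", addrs)
	assert.Equal(t, "secret", password)
	assert.Equal(t, 2, db)
}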