Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: index creation timeouts #199

Merged
merged 1 commit into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/store/postgres/document_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestDocumentSearchWithIndexEndToEnd(t *testing.T) {
err = vci.CreateIndex(context.Background(), true)
assert.NoError(t, err)

pollIndexCreation(documentStore, collectionName, ctx, t)

// Set Collection's IsIndexed flag to true
col, err := documentStore.GetCollection(ctx, vci.Collection.Name)
assert.NoError(t, err)
Expand Down
88 changes: 51 additions & 37 deletions pkg/store/postgres/vector_col_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/getzep/zep/pkg/models"

"github.com/uptrace/bun"
)

// reference: https://github.com/pgvector/pgvector#indexing

const IndexTimeout = 1 * time.Hour
const EmbeddingColName = "embedding"

// MinRowsForIndex is the minimum number of rows required to create an index. The pgvector docs
Expand Down Expand Up @@ -90,7 +90,6 @@ func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error {
}
// Lock the mutex for this collection.
IndexMutexMap[vci.Collection.Name].Lock()
defer IndexMutexMap[vci.Collection.Name].Unlock()

if vci.Collection.DistanceFunction != "cosine" {
return fmt.Errorf("only cosine distance function is currently supported")
Expand All @@ -108,40 +107,55 @@ func (vci *VectorColIndex) CreateIndex(ctx context.Context, force bool) error {

indexName := fmt.Sprintf("%s_%s_idx", vci.Collection.TableName, vci.ColName)

// Drop index if it exists
// We're using CONCURRENTLY for both drop and index operations. This means we can't run them in a transaction.
_, err := db.ExecContext(
ctx,
"DROP INDEX CONCURRENTLY IF EXISTS ?",
bun.Ident(indexName),
)
if err != nil {
return fmt.Errorf("error dropping index: %w", err)
}

// currently only supports cosine distance ops
_, err = db.ExecContext(
ctx,
"CREATE INDEX CONCURRENTLY ON ? USING ivfflat (embedding vector_cosine_ops) WITH (lists = ?)",
bun.Ident(vci.Collection.TableName),
vci.ListCount,
)
if err != nil {
return fmt.Errorf("error creating index: %w", err)
}

// Set Collection's IsIndexed flag to true
collection, err := vci.appState.DocumentStore.GetCollection(ctx, vci.Collection.Name)
if err != nil {
return fmt.Errorf("error getting collection: %w", err)
}
collection.IsIndexed = true
collection.ProbeCount = vci.ProbeCount
collection.ListCount = vci.ListCount
err = vci.appState.DocumentStore.UpdateCollection(ctx, collection)
if err != nil {
return fmt.Errorf("error updating collection: %w", err)
}
// run index creation in a goroutine with IndexTimeout
go func() {
defer IndexMutexMap[vci.Collection.Name].Unlock()
// Create a new context with a timeout
ctx, cancel := context.WithTimeout(ctx, IndexTimeout)
defer cancel()

// Drop index if it exists
// We're using CONCURRENTLY for both drop and index operations. This means we can't run them in a transaction.
_, err := db.ExecContext(
ctx,
"DROP INDEX CONCURRENTLY IF EXISTS ?",
bun.Ident(indexName),
)
if err != nil {
log.Error("error dropping index: ", err)
return
}

// currently only supports cosine distance ops
log.Infof("Starting index creation on %s", vci.Collection.Name)
_, err = db.ExecContext(
ctx,
"CREATE INDEX CONCURRENTLY ON ? USING ivfflat (embedding vector_cosine_ops) WITH (lists = ?)",
bun.Ident(vci.Collection.TableName),
vci.ListCount,
)
if err != nil {
log.Error("error creating index: ", err)
return
}

// Set Collection's IsIndexed flag to true
collection, err := vci.appState.DocumentStore.GetCollection(ctx, vci.Collection.Name)
if err != nil {
log.Error("error getting collection: ", err)
return
}
collection.IsIndexed = true
collection.ProbeCount = vci.ProbeCount
collection.ListCount = vci.ListCount
err = vci.appState.DocumentStore.UpdateCollection(ctx, collection)
if err != nil {
log.Error("error updating collection: ", err)
return
}

log.Infof("Index creation on %s completed successfully", collection.Name)
}()

return nil
}
Expand Down
30 changes: 29 additions & 1 deletion pkg/store/postgres/vector_col_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"testing"
"time"

"github.com/brianvoe/gofakeit/v6"

Expand Down Expand Up @@ -77,10 +78,12 @@ func TestCreateIndex(t *testing.T) {
)
assert.NoError(t, err)

// CreateIndex will add a timeout to the ctx
err = vci.CreateIndex(context.Background(), true)
assert.NoError(t, err)

// Set Collection's IsIndexed flag to true
pollIndexCreation(documentStore, collectionName, ctx, t)

col, err := documentStore.GetCollection(ctx, vci.Collection.Name)
assert.NoError(t, err)
assert.Equal(t, true, col.IsIndexed)
Expand Down Expand Up @@ -158,3 +161,28 @@ func generateRandomEmbeddings(embeddingCount int, embeddingWidth int) [][]float3

return embeddings
}

func pollIndexCreation(
documentStore *DocumentStore,
collectionName string,
ctx context.Context,
t *testing.T,
) {
timeout := time.After(10 * time.Minute)
tick := time.Tick(500 * time.Millisecond)
Loop:
for {
select {
case <-timeout:
t.Fatal("timed out waiting for index to be created")
case <-tick:
col, err := documentStore.GetCollection(ctx, collectionName)
if err != nil {
t.Fatal("error getting collection: ", err)
}
if col.IsIndexed {
break Loop
}
}
}
}
Loading