Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## UNRELEASED

- Add batch upserts & batch recompute for collection & on auto-embedding [#253](https://github.com/hypermodeAI/runtime/pull/253)
- Add auto-embedding for collection based on text checkpoint [#250](https://github.com/hypermodeAI/runtime/pull/250)
- Remove extraneous types in graphql schemas [#251](https://github.com/hypermodeAI/runtime/pull/251)
- Allow arrays as inputs to host functions [#252](https://github.com/hypermodeAI/runtime/pull/252)
Expand Down
23 changes: 8 additions & 15 deletions collections/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,11 @@ func LoadTextsIntoCollection(ctx context.Context, collection interfaces.Collecti
}

// Insert all texts into collection
for i := range textIds {
err = collection.InsertTextToMemory(ctx, textIds[i], keys[i], texts[i])
if err != nil {
return err
}
err = collection.InsertTextsToMemory(ctx, textIds, keys, texts)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -241,16 +240,10 @@ func syncTextsWithVectorIndex(ctx context.Context, collection interfaces.Collect
return errors.New("mismatch in keys and texts")
}
// Get last indexed text id
for i := range textIds {
if textIds[i] > lastIndexedTextId {
text := texts[i]
key := keys[i]
// process text
err = ProcessText(ctx, collection, vectorIndex, key, text)
if err != nil {
return err
}
}
err = ProcessTexts(ctx, collection, vectorIndex, keys, texts)
if err != nil {
return err
}

return nil
}
24 changes: 24 additions & 0 deletions collections/in_mem/sequential/vector_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sequential
import (
"container/heap"
"context"
"fmt"
"hmruntime/collections/index"
"hmruntime/collections/utils"
"hmruntime/db"
Expand Down Expand Up @@ -97,6 +98,18 @@ func (ims *SequentialVectorIndex) SearchWithKey(ctx context.Context, queryKey st
return ims.Search(ctx, query, maxResults, filter)
}

// InsertVectors stores a batch of vectors for the given text ids.
//
// textIds and vecs are parallel slices; an error is returned when their
// lengths differ. The batch is first written through
// db.WriteCollectionVectors under this index's search method name, and the
// vector ids and keys it returns are then passed, together with the inputs,
// to InsertVectorsToMemory. A database error is returned unchanged and the
// in-memory step is skipped.
func (ims *SequentialVectorIndex) InsertVectors(ctx context.Context, textIds []int64, vecs [][]float32) error {
	if numIds, numVecs := len(textIds), len(vecs); numIds != numVecs {
		return fmt.Errorf("textIds and vecs must have the same length")
	}

	storedIds, storedKeys, writeErr := db.WriteCollectionVectors(ctx, ims.searchMethodName, textIds, vecs)
	if writeErr == nil {
		// Only mirror into memory once the database write has succeeded.
		return ims.InsertVectorsToMemory(ctx, textIds, storedIds, storedKeys, vecs)
	}
	return writeErr
}

func (ims *SequentialVectorIndex) InsertVector(ctx context.Context, textId int64, vec []float32) error {

// Write vector to database, this textId is now the last inserted textId
Expand All @@ -109,6 +122,17 @@ func (ims *SequentialVectorIndex) InsertVector(ctx context.Context, textId int64

}

// InsertVectorsToMemory records a batch of vectors in the in-memory index.
//
// textIds, vectorIds, keys and vecs are parallel slices: element i of each
// describes the same vector. For every i, VectorMap[keys[i]] is set to
// vecs[i]. After a non-empty batch, lastInsertedID and lastIndexedTextID hold
// the final entries of vectorIds and textIds respectively; an empty batch
// leaves them as they were.
//
// An error is returned, and nothing is modified, when the four slices do not
// all have the same length. ctx is not used by this method.
func (ims *SequentialVectorIndex) InsertVectorsToMemory(ctx context.Context, textIds []int64, vectorIds []int64, keys []string, vecs [][]float32) error {
	// Validate up front: indexing the other slices by a position in keys
	// would otherwise panic (or stop part-way through the batch) whenever the
	// slices are not the same length.
	n := len(keys)
	if len(textIds) != n || len(vectorIds) != n || len(vecs) != n {
		return fmt.Errorf("textIds, vectorIds, keys and vecs must have the same length")
	}

	ims.mu.Lock()
	defer ims.mu.Unlock()

	for i, key := range keys {
		ims.VectorMap[key] = vecs[i]
	}
	// Only the last element of the batch determines the "last" markers.
	if n > 0 {
		ims.lastInsertedID = vectorIds[n-1]
		ims.lastIndexedTextID = textIds[n-1]
	}
	return nil
}

func (ims *SequentialVectorIndex) InsertVectorToMemory(ctx context.Context, textId, vectorId int64, key string, vec []float32) error {
ims.mu.Lock()
defer ims.mu.Unlock()
Expand Down
29 changes: 29 additions & 0 deletions collections/in_mem/text_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package in_mem

import (
"context"
"fmt"

"hmruntime/collections/index"
"hmruntime/collections/index/interfaces"
Expand Down Expand Up @@ -67,6 +68,18 @@ func (ti *InMemCollection) DeleteVectorIndex(ctx context.Context, searchMethod s
return nil
}

// InsertTexts stores a batch of key/text pairs in the collection.
//
// keys and texts are parallel slices; an error is returned when their lengths
// differ. The batch is first written through db.WriteCollectionTexts under
// this collection's name, and the ids it returns are then passed, together
// with the inputs, to InsertTextsToMemory. A database error is returned
// unchanged and the in-memory step is skipped.
func (ti *InMemCollection) InsertTexts(ctx context.Context, keys []string, texts []string) error {
	if numKeys, numTexts := len(keys), len(texts); numKeys != numTexts {
		return fmt.Errorf("keys and texts must have the same length")
	}

	textIds, writeErr := db.WriteCollectionTexts(ctx, ti.collectionName, keys, texts)
	if writeErr == nil {
		// Only mirror into memory once the database write has succeeded.
		return ti.InsertTextsToMemory(ctx, textIds, keys, texts)
	}
	return writeErr
}

func (ti *InMemCollection) InsertText(ctx context.Context, key string, text string) error {
id, err := db.WriteCollectionText(ctx, ti.collectionName, key, text)
if err != nil {
Expand All @@ -76,6 +89,22 @@ func (ti *InMemCollection) InsertText(ctx context.Context, key string, text stri
return ti.InsertTextToMemory(ctx, id, key, text)
}

// InsertTextsToMemory records a batch of texts in the in-memory maps.
//
// ids, keys and texts are parallel slices; an error is returned, before any
// state is touched, when their lengths are not all equal. For every position
// the text and id are stored under the corresponding key in TextMap and
// IdMap, and lastInsertedID is updated to that id, so after a non-empty batch
// it holds the final element of ids. ctx is not used by this method.
func (ti *InMemCollection) InsertTextsToMemory(ctx context.Context, ids []int64, keys []string, texts []string) error {
	count := len(ids)
	if count != len(keys) || count != len(texts) {
		return fmt.Errorf("ids, keys and texts must have the same length")
	}

	ti.mu.Lock()
	defer ti.mu.Unlock()

	for pos := 0; pos < count; pos++ {
		key, id := keys[pos], ids[pos]
		ti.TextMap[key] = texts[pos]
		ti.IdMap[key] = id
		ti.lastInsertedID = id
	}
	return nil
}

func (ti *InMemCollection) InsertTextToMemory(ctx context.Context, id int64, key string, text string) error {
ti.mu.Lock()
defer ti.mu.Unlock()
Expand Down
8 changes: 8 additions & 0 deletions collections/index/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,14 @@ type Collection interface {
// DeleteVectorIndex deletes the VectorIndex for a given searchMethod
DeleteVectorIndex(ctx context.Context, searchMethod string) error

// InsertTexts will add texts and keys into the existing VectorIndex
InsertTexts(ctx context.Context, keys []string, texts []string) error

// InsertText will add a text and key into the existing VectorIndex
InsertText(ctx context.Context, key string, text string) error

InsertTextsToMemory(ctx context.Context, ids []int64, keys []string, texts []string) error

InsertTextToMemory(ctx context.Context, id int64, key string, text string) error

// DeleteText will remove a text and key from the existing VectorIndex
Expand Down Expand Up @@ -128,6 +133,9 @@ type VectorIndex interface {
maxResults int,
filter index.SearchFilter) (utils.MinTupleHeap, error)

// InsertVectors will add a batch of vectors, one per text id, into the existing VectorIndex
InsertVectors(ctx context.Context, textIds []int64, vecs [][]float32) error

// Insert will add a vector and key into the existing VectorIndex. If
// key already exists, it should throw an error to not insert duplicate keys
InsertVector(ctx context.Context, textId int64, vec []float32) error
Expand Down
30 changes: 20 additions & 10 deletions collections/utils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,30 @@ func ConcatStrings(strs ...string) string {
return total
}

func ConvertToFloat32Array(result any) ([]float32, error) {
func ConvertToFloat32_2DArray(result any) ([][]float32, error) {
resultArr, ok := result.([]interface{})
if !ok {
return nil, fmt.Errorf("error converting type to float32: %v", result)
}
textVec := make([]float32, len(resultArr))
for i, val := range resultArr {
if v, ok := val.(float64); ok {
textVec[i] = float32(v)
} else if v, ok := val.(float32); ok {
textVec[i] = v
} else {
return nil, fmt.Errorf("error converting type to float32: %v", val)

textVecs := make([][]float32, len(resultArr))
for i, res := range resultArr {

subArr, ok := res.([]interface{})
if !ok {
return nil, fmt.Errorf("error converting type to float32: %v", res)
}

textVecs[i] = make([]float32, len(subArr))
for j, val := range subArr {
if v, ok := val.(float64); ok {
textVecs[i][j] = float32(v)
} else if v, ok := val.(float32); ok {
textVecs[i][j] = v
} else {
return nil, fmt.Errorf("error converting type to float32: %v", val)
}
}
}
return textVec, nil
return textVecs, nil
}
95 changes: 69 additions & 26 deletions collections/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collections

import (
"context"
"fmt"

"hmruntime/collections/in_mem"
"hmruntime/collections/in_mem/sequential"
Expand All @@ -16,24 +17,79 @@ import (
wasm "github.com/tetratelabs/wazero/api"
)

const (
// batchSize is the maximum number of texts ProcessTexts passes to the
// embedder function in a single call.
batchSize = 10
)

// ProcessTexts embeds texts in batches and stores the resulting vectors in
// vectorIndex.
//
// keys and texts are parallel slices; an error is returned when their lengths
// differ. They are walked in chunks of at most batchSize. For each chunk the
// function named by vectorIndex.GetEmbedderName is invoked once with the
// chunk's texts, its result is converted to a [][]float32, every key in the
// chunk is resolved to a text id with collection.GetExternalId, and the ids
// and vectors are handed to vectorIndex.InsertVectors.
//
// The embedder must return exactly one vector per text in the chunk;
// anything else is reported as an error before any id lookup or insert for
// that chunk. Processing stops at the first error, which is returned as is.
// This function does not undo chunks that were inserted before the failure.
func ProcessTexts(ctx context.Context, collection interfaces.Collection, vectorIndex interfaces.VectorIndex, keys []string, texts []string) error {
	if len(keys) != len(texts) {
		return fmt.Errorf("mismatch in keys and texts")
	}

	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		keysBatch := keys[start:end]
		textsBatch := texts[start:end]

		executionInfo, err := wasmhost.CallFunction(ctx, vectorIndex.GetEmbedderName(), textsBatch)
		if err != nil {
			return err
		}

		textVecs, err := utils.ConvertToFloat32_2DArray(executionInfo.Result)
		if err != nil {
			return err
		}

		if len(textVecs) == 0 {
			return fmt.Errorf("no vectors returned for texts: %v", textsBatch)
		}
		// Vectors are matched to text ids purely by position, so a short or
		// long result would attach vectors to the wrong texts (or be rejected
		// further down with a less specific message). Catch it here.
		if len(textVecs) != len(textsBatch) {
			return fmt.Errorf("embedder returned %d vectors for %d texts", len(textVecs), len(textsBatch))
		}

		textIds := make([]int64, len(keysBatch))
		for j, key := range keysBatch {
			textId, err := collection.GetExternalId(ctx, key)
			if err != nil {
				return err
			}
			textIds[j] = textId
		}

		if err := vectorIndex.InsertVectors(ctx, textIds, textVecs); err != nil {
			return err
		}
	}
	return nil
}

func ProcessText(ctx context.Context, collection interfaces.Collection, vectorIndex interfaces.VectorIndex, key, text string) error {
executionInfo, err := wasmhost.CallFunction(ctx, vectorIndex.GetEmbedderName(), text)
texts := []string{text}
executionInfo, err := wasmhost.CallFunction(ctx, vectorIndex.GetEmbedderName(), texts)
if err != nil {
return err
}

result := executionInfo.Result

textVec, err := utils.ConvertToFloat32Array(result)
textVecs, err := utils.ConvertToFloat32_2DArray(result)
if err != nil {
return err
}

id, err := collection.GetExternalId(ctx, key)
if len(textVecs) == 0 {
return fmt.Errorf("no vectors returned for text: %s", text)
}

textId, err := collection.GetExternalId(ctx, key)
if err != nil {
return err
}
err = vectorIndex.InsertVector(ctx, id, textVec)
err = vectorIndex.InsertVector(ctx, textId, textVecs[0])
if err != nil {
return err
}
Expand All @@ -45,32 +101,19 @@ func ProcessTextMapWithModule(ctx context.Context, mod wasm.Module, collection i
textMap, err := collection.GetTextMap(ctx)
if err != nil {
logger.Err(ctx, err).
Str("colletion_name", collection.GetCollectionName()).
Str("collection_name", collection.GetCollectionName()).
Msg("Failed to get text map.")
return err
}
for key, text := range textMap {
executionInfo, err := wasmhost.CallFunction(ctx, embedder, text)
if err != nil {
return err
}

result := executionInfo.Result

textVec, err := utils.ConvertToFloat32Array(result)
if err != nil {
return err
}

id, err := collection.GetExternalId(ctx, key)
if err != nil {
return err
}
err = vectorIndex.InsertVector(ctx, id, textVec)
if err != nil {
return err
}
keys := make([]string, 0, len(textMap))
texts := make([]string, 0, len(textMap))
for key, text := range textMap {
keys = append(keys, key)
texts = append(texts, text)
}
return nil

return ProcessTexts(ctx, collection, vectorIndex, keys, texts)
}

func CleanAndProcessManifest(ctx context.Context) error {
Expand Down
Loading