Async Vectorizer Modules #4587

Open · 1 of 4 tasks
trengrj opened this issue Apr 3, 2024 · 3 comments
Comments

trengrj (Member) commented Apr 3, 2024

Describe your feature request

With async indexing now released, the bottleneck for importing data has moved to the vectorizer modules, which often require third-party API calls to convert text/image chunks into vectors. There has been recent work (#4546, #4578) to switch vectorizer modules to use batching wherever possible. We can further reduce import times by enabling async indexing at the module level as well.

At a high level:

  • Switch to async indexing straight to disk (async: indexing from disk #3974).
  • If a module is enabled, async workers should read a batch of objects from disk (not vectors), use the module's BatchVectorizer to generate the vectors, and then write to the vector index as usual (a rough sketch of such a worker loop is shown below).
  • There needs to be a solution for persistent failures, including surfacing errors to users.
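To make the second point more concrete, here is a rough sketch of what such an async vectorizer worker loop could look like. All names here (ObjectStore, BatchVectorizer, VectorIndex) are illustrative placeholders, not Weaviate's actual types:

package worker

import "context"

type Object struct {
	ID   string
	Text string
}

// ObjectStore reads persisted objects that do not yet have vectors.
type ObjectStore interface {
	NextBatch(n int) ([]Object, error)
}

// BatchVectorizer wraps a module's batched call to the third-party API.
type BatchVectorizer interface {
	VectorizeBatch(ctx context.Context, objs []Object) ([][]float32, error)
}

// VectorIndex is the target index the worker writes into.
type VectorIndex interface {
	Add(id string, vector []float32) error
}

// run reads un-vectorized objects from disk in batches, vectorizes them via
// the module, and writes the results to the vector index.
func run(ctx context.Context, store ObjectStore, vec BatchVectorizer, idx VectorIndex) error {
	for {
		objs, err := store.NextBatch(100)
		if err != nil {
			return err
		}
		if len(objs) == 0 {
			return nil // nothing left to vectorize
		}
		vectors, err := vec.VectorizeBatch(ctx, objs)
		if err != nil {
			// Persistent failures need to be surfaced to users,
			// e.g. via retries and a dead-letter queue (see below).
			return err
		}
		for i, obj := range objs {
			if err := idx.Add(obj.ID, vectors[i]); err != nil {
				return err
			}
		}
	}
}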


trengrj changed the title from Async Module Vectorization to Async Vectorizer Modules on Apr 3, 2024
bobvanluijt (Member)

I think this feature is important in general, but we need to fix error handling. The challenge here is not the DX when all goes well (i.e., it's a no-brainer to have this), but when an async vectorization fails (e.g., something as simple as an incorrect API key or rate limiting on the model provider's side).
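To illustrate the distinction (purely a sketch; these error values are hypothetical, not from any existing module), a failure like an invalid API key should be surfaced immediately, while rate limiting can be retried:

package vectorizer

import "errors"

// Hypothetical sentinel errors for the two failure modes mentioned above.
var (
	ErrRateLimited   = errors.New("rate limited by model provider")
	ErrInvalidAPIKey = errors.New("invalid or missing API key")
)

// isRetryable reports whether retrying the batch can plausibly succeed.
func isRetryable(err error) bool {
	switch {
	case errors.Is(err, ErrRateLimited):
		return true // back off and retry later
	case errors.Is(err, ErrInvalidAPIKey):
		return false // retrying cannot help; surface to the user immediately
	default:
		return false // unknown errors are treated as fatal by default
	}
}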

MarcusSorealheis (Contributor)

This is an interesting feature, but I agree that errors with async present a more obscure problem space for DX. In the synchronous world, the feedback loop is tight.

It would be nice if every known error provided users with detailed information on how to resolve it. In some places this is done with links, but those links need to land in CI for continuous validation. In other cases, improved error handling is done with code snippets that lead users to resolve the problem if it is within their control. You will know the best way to handle the known errors so that they lead to a speedy resolution and the feedback loop feels almost as tight as a synchronous thread of execution.

Of course, there will still be errors that are unknown or non-fatal (in the sense that they do not kill a query's execution). In those cases, users may favor a lexical fallback and a warning log. If you do consider a lexical fallback, it's very important to be squeaky about it (log) so that users can look into it if they choose; a sketch of such a fallback follows.
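A minimal sketch of what a "squeaky" lexical fallback could look like (vectorSearch and bm25Search are stand-ins for the real search paths, not actual Weaviate functions):

package query

import (
	"context"
	"log"
)

type Result struct {
	ID    string
	Score float32
}

// Placeholders for the real vector and lexical (BM25) search paths.
func vectorSearch(ctx context.Context, q string) ([]Result, error) { return nil, nil }
func bm25Search(ctx context.Context, q string) ([]Result, error)   { return nil, nil }

// search prefers the vector path, but keeps the query alive by falling back
// to lexical search on a non-fatal vectorizer error, logging loudly about it.
func search(ctx context.Context, q string) ([]Result, error) {
	results, err := vectorSearch(ctx, q)
	if err == nil {
		return results, nil
	}
	log.Printf("WARN: vector search failed (%v); falling back to lexical search", err)
	return bm25Search(ctx, q)
}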

Uptime reigns supreme.

cdpierse (Contributor) commented Apr 3, 2024

One way we could make errors more observable might be to add a DLQ (dead-letter queue) and retries to batches. If a batch hits a specified number of retries, it would land on the DLQ with a single error associated. In pseudo-code, something like:

package main

import (
	"fmt"
	"time"
)

type Batch struct {
	ID          string
	Jobs        []Job
	ProcessTime time.Time
	Error       error
	RetryCount  int
}

type Job struct {
	ID   string
	Data string
}

type FailedBatch struct {
	Batch Batch
	Err   error
}

type DeadLetterQueue []FailedBatch

func ProcessBatch(batch Batch, maxRetries int, dlq *DeadLetterQueue) {
	// For simplicity, simulate a failure for a specific batch ID
	if batch.ID == "errorBatch" {
		batch.Error = fmt.Errorf("processing error")
	}

	if batch.Error != nil {
		if batch.RetryCount >= maxRetries {
			// Retries exhausted: park the batch on the DLQ with its error
			*dlq = append(*dlq, FailedBatch{Batch: batch, Err: batch.Error})
			return
		}

		batch.RetryCount++

		// Exponential backoff: 2, 4, 8, ... seconds
		backoff := time.Duration(1<<batch.RetryCount) * time.Second
		time.Sleep(backoff)

		ProcessBatch(batch, maxRetries, dlq)
		return
	}
}

func main() {
	var dlq DeadLetterQueue

	batches := []Batch{
		{ID: "batch1", Jobs: []Job{{ID: "job1", Data: "data1"}}, ProcessTime: time.Now()},
		{ID: "errorBatch", Jobs: []Job{{ID: "job2", Data: "data2"}}, ProcessTime: time.Now()},
	}

	maxRetries := 3

	for _, batch := range batches {
		ProcessBatch(batch, maxRetries, &dlq)
	}
}

In practice, the queues themselves would be on disk rather than in memory like this, but the idea could still be the same. I think having a DLQ with retries would help with rate-limit-related errors, and for errors that are due to other reasons we would have a top-level error message that could be accessed for each failed batch to see what happened.
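For the on-disk variant, one possibility (purely illustrative, assuming a simple append-only JSON-lines file) would be to flatten each failed batch into a record and append it to the queue file:

package dlq

import (
	"encoding/json"
	"os"
)

// FailedBatchRecord mirrors the in-memory FailedBatch above, with the error
// flattened to a string so it can be serialized.
type FailedBatchRecord struct {
	BatchID    string `json:"batch_id"`
	Error      string `json:"error"`
	RetryCount int    `json:"retry_count"`
}

// Append writes one JSON record per line to the on-disk dead-letter queue.
func Append(path string, rec FailedBatchRecord) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	return json.NewEncoder(f).Encode(rec)
}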
