Skip to content

Commit 730aa5b

Browse files
perf: Add performance optimizations for batch operations
- Add .high priority to BatchEmbedder TaskGroup tasks for better CPU scheduling - Implement automatic batching for PineconeStore upsert operations (100 vectors/batch) - Implement automatic batching for PineconeStore delete operations (1000 IDs/batch) - Execute batches concurrently for optimal throughput - Respect Pinecone API limits and best practices These optimizations improve performance for large-scale RAG operations by: 1. Better resource allocation through task prioritization 2. Avoiding API rate limits through intelligent batching 3. Maximizing throughput via concurrent batch processing Co-authored-by: Christopher Karani <christopherkarani@users.noreply.github.com>
1 parent f209006 commit 730aa5b

File tree

2 files changed

+77
-10
lines changed

2 files changed

+77
-10
lines changed

Sources/Zoni/Embedding/BatchEmbedder.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public actor BatchEmbedder {
8989

9090
// Start initial concurrent tasks
9191
while inFlight < maxConcurrency, let (index, batch) = batchIterator.next() {
92-
group.addTask {
92+
group.addTask(priority: .high) {
9393
let embeddings = try await self.provider.embed(batch)
9494
return (index, embeddings)
9595
}
@@ -103,7 +103,7 @@ public actor BatchEmbedder {
103103

104104
// Schedule next batch if available
105105
if let (index, batch) = batchIterator.next() {
106-
group.addTask {
106+
group.addTask(priority: .high) {
107107
let embeddings = try await self.provider.embed(batch)
108108
return (index, embeddings)
109109
}
@@ -142,7 +142,7 @@ public actor BatchEmbedder {
142142
var inFlight = 0
143143

144144
while inFlight < maxConcurrency, let (index, batch) = batchIterator.next() {
145-
group.addTask {
145+
group.addTask(priority: .high) {
146146
let embeddings = try await self.provider.embed(batch)
147147
return (index, embeddings)
148148
}
@@ -156,7 +156,7 @@ public actor BatchEmbedder {
156156
progress(completedBatches, totalBatches)
157157

158158
if let (index, batch) = batchIterator.next() {
159-
group.addTask {
159+
group.addTask(priority: .high) {
160160
let embeddings = try await self.provider.embed(batch)
161161
return (index, embeddings)
162162
}
@@ -246,7 +246,7 @@ public actor BatchEmbedder {
246246
))
247247
return
248248
}
249-
group.addTask {
249+
group.addTask(priority: .high) {
250250
let embeddings = try await self.provider.embed(batch)
251251
return embeddings.enumerated().map { (offset + $0.offset, $0.element) }
252252
}
@@ -267,7 +267,7 @@ public actor BatchEmbedder {
267267
))
268268
return
269269
}
270-
group.addTask {
270+
group.addTask(priority: .high) {
271271
let embeddings = try await self.provider.embed(batch)
272272
return embeddings.enumerated().map { (offset + $0.offset, $0.element) }
273273
}

Sources/Zoni/VectorStore/Stores/PineconeStore.swift

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ public actor PineconeStore: VectorStore {
184184
///
185185
/// This method uses upsert semantics: if a vector with the same ID already
186186
/// exists, it will be replaced with the new vector and metadata.
187+
/// For large batches (>100 vectors), the operation is automatically split
188+
/// into multiple requests to respect Pinecone's API limits.
187189
///
188190
/// - Parameters:
189191
/// - chunks: The chunks to store. Each chunk must have a unique ID.
@@ -206,10 +208,6 @@ public actor PineconeStore: VectorStore {
206208
///
207209
/// try await store.add(chunks, embeddings: embeddings)
208210
/// ```
209-
///
210-
/// - Note: Pinecone has a limit on the number of vectors per upsert request
211-
/// (typically 100). For large batches, consider splitting into smaller
212-
/// requests.
213211
public func add(_ chunks: [Chunk], embeddings: [Embedding]) async throws {
214212
// Validate that counts match
215213
guard chunks.count == embeddings.count else {
@@ -221,6 +219,40 @@ public actor PineconeStore: VectorStore {
221219
// Handle empty input
222220
guard !chunks.isEmpty else { return }
223221

222+
// Pinecone recommends batches of 100 vectors per upsert
223+
let batchSize = 100
224+
225+
// If within single batch size, execute directly
226+
if chunks.count <= batchSize {
227+
try await upsertBatch(chunks: chunks, embeddings: embeddings)
228+
return
229+
}
230+
231+
// Split into batches and execute concurrently
232+
let chunkBatches = chunks.chunked(into: batchSize)
233+
let embeddingBatches = embeddings.chunked(into: batchSize)
234+
235+
try await withThrowingTaskGroup(of: Void.self) { group in
236+
for (chunkBatch, embeddingBatch) in zip(chunkBatches, embeddingBatches) {
237+
group.addTask {
238+
try await self.upsertBatch(chunks: chunkBatch, embeddings: embeddingBatch)
239+
}
240+
}
241+
242+
// Wait for all batches to complete
243+
for try await _ in group { }
244+
}
245+
}
246+
247+
/// Upserts a single batch of chunks (internal helper).
248+
///
249+
/// - Parameters:
250+
/// - chunks: The chunks to upsert (should be ≤100).
251+
/// - embeddings: Corresponding embeddings.
252+
/// - Throws: `ZoniError.insertionFailed` if the Pinecone API returns an error.
253+
private func upsertBatch(chunks: [Chunk], embeddings: [Embedding]) async throws {
254+
guard !chunks.isEmpty else { return }
255+
224256
// Build the upsert request
225257
guard let url = URL(string: "\(indexHost)/vectors/upsert") else {
226258
throw ZoniError.insertionFailed(reason: "Invalid Pinecone URL: \(indexHost)")
@@ -370,6 +402,8 @@ public actor PineconeStore: VectorStore {
370402
/// Deletes chunks with the specified IDs from the Pinecone index.
371403
///
372404
/// IDs that do not exist in the index are silently ignored.
405+
/// For large batches (>1000 IDs), the operation is automatically split
406+
/// into multiple requests to respect Pinecone's API limits.
373407
///
374408
/// - Parameter ids: The IDs of chunks to delete.
375409
///
@@ -380,11 +414,44 @@ public actor PineconeStore: VectorStore {
380414
/// ```swift
381415
/// // Delete specific chunks by ID
382416
/// try await store.delete(ids: ["chunk-1", "chunk-2", "chunk-3"])
417+
///
418+
/// // Delete large batch (automatically batched)
419+
/// try await store.delete(ids: Array(1...5000).map { "chunk-\($0)" })
383420
/// ```
384421
public func delete(ids: [String]) async throws {
385422
// Handle empty input
386423
guard !ids.isEmpty else { return }
387424

425+
// Pinecone recommends batches of 1000 IDs per request
426+
let batchSize = 1000
427+
428+
// If within single batch size, execute directly
429+
if ids.count <= batchSize {
430+
try await deleteBatch(ids: ids)
431+
return
432+
}
433+
434+
// Split into batches and execute concurrently
435+
let batches = ids.chunked(into: batchSize)
436+
try await withThrowingTaskGroup(of: Void.self) { group in
437+
for batch in batches {
438+
group.addTask {
439+
try await self.deleteBatch(ids: batch)
440+
}
441+
}
442+
443+
// Wait for all batches to complete
444+
for try await _ in group { }
445+
}
446+
}
447+
448+
/// Deletes a single batch of IDs (internal helper).
449+
///
450+
/// - Parameter ids: The IDs to delete (should be ≤1000).
451+
/// - Throws: `ZoniError.insertionFailed` if the Pinecone API returns an error.
452+
private func deleteBatch(ids: [String]) async throws {
453+
guard !ids.isEmpty else { return }
454+
388455
// Build the delete request
389456
guard let url = URL(string: "\(indexHost)/vectors/delete") else {
390457
throw ZoniError.insertionFailed(reason: "Invalid Pinecone URL: \(indexHost)")

0 commit comments

Comments
 (0)