Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions packages/agents/src/db/postgresVectorStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { VectorStoreConfig, DocumentSource } from '../types';
import pg, { Pool, PoolClient } from 'pg';
import { DatabaseError as PgError } from 'pg';


/**
* Custom error class for handling Postgres-specific errors
*/
Expand Down Expand Up @@ -106,7 +105,7 @@ export class VectorStore {
): Promise<VectorStore> {
if (!VectorStore.instance) {
const pool = new Pool({
user: config.POSTGRES_USER,
user: config.POSTGRES_USER,
host: config.POSTGRES_HOST,
database: config.POSTGRES_DB,
password: config.POSTGRES_PASSWORD,
Expand All @@ -122,7 +121,7 @@ export class VectorStore {
logger.info('Connected to PostgreSQL');

const tableName = 'documents';

// Create instance first, then initialize DB
VectorStore.instance = new VectorStore(pool, embeddings, tableName);
await VectorStore.instance.initializeDb();
Expand Down Expand Up @@ -152,7 +151,7 @@ export class VectorStore {
UNIQUE(uniqueId)
);
`);

// Create index on source for filtering
await client.query(`
CREATE INDEX IF NOT EXISTS idx_${this.tableName}_source ON ${this.tableName} (source);
Expand Down Expand Up @@ -191,8 +190,8 @@ export class VectorStore {

// Build SQL query
let sql = `
SELECT
content,
SELECT
content,
metadata,
1 - (embedding <=> $1) as similarity
FROM ${this.tableName}
Expand Down Expand Up @@ -223,7 +222,7 @@ export class VectorStore {
const client = await this.pool.connect();
try {
const result = await client.query(sql, values);

// Convert to DocumentInterface format
return result.rows.map((row) => ({
pageContent: row.content,
Expand All @@ -244,15 +243,32 @@ export class VectorStore {
* @param uniqueIds - Optional array of unique IDs for the documents
* @returns Promise<void>
*/
async addDocuments(documents: DocumentInterface[], options?: { ids?: string[] }): Promise<void> {
async addDocuments(
documents: DocumentInterface[],
options?: { ids?: string[] },
): Promise<void> {
logger.info(`Adding ${documents.length} documents to the vector store`);

if (documents.length === 0) return;

try {
// Generate embeddings for all documents
const texts = documents.map((doc) => doc.pageContent);
const embeddings = await this.embeddings.embedDocuments(texts);
// Generate embeddings in batches by content length
const documentBatches = documents.reduce((batches: string[][], doc) => {
const batch = batches[batches.length - 1] || [];
const totalLength = batch.reduce((sum, text) => sum + text.length, 0);
totalLength + doc.pageContent.length > 100000 && batch.length > 0
? batches.push([doc.pageContent])
: batch.push(doc.pageContent);
return batches.length === 0 ? [[doc.pageContent]] : batches;
}, []);

// Process all batches
const batchEmbeddings = await Promise.all(
documentBatches.map((batch) => this.embeddings.embedDocuments(batch)),
);

// Merge all embeddings
const embeddings = batchEmbeddings.flat();

const client = await this.pool.connect();
try {
Expand All @@ -267,7 +283,7 @@ export class VectorStore {
const query = `
INSERT INTO ${this.tableName} (content, metadata, embedding, uniqueId, contentHash, source)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (uniqueId)
ON CONFLICT (uniqueId)
DO UPDATE SET
content = EXCLUDED.content,
metadata = EXCLUDED.metadata,
Expand All @@ -281,13 +297,13 @@ export class VectorStore {
JSON.stringify(embeddings[i]),
uniqueId,
contentHash,
source
source,
]);
});

await Promise.all(insertPromises);
await client.query('COMMIT');

logger.info(`Successfully added ${documents.length} documents`);
} catch (error) {
await client.query('ROLLBACK');
Expand All @@ -312,16 +328,16 @@ export class VectorStore {
try {
const result = await client.query(
`SELECT content, metadata, contentHash FROM ${this.tableName} WHERE uniqueId = $1`,
[name]
[name],
);

if (result.rows.length > 0) {
const row = result.rows[0];
return {
metadata: {
_id: name,
metadata: {
_id: name,
contentHash: row.contentHash,
...JSON.parse(row.metadata)
...JSON.parse(row.metadata),
},
pageContent: row.content,
};
Expand Down Expand Up @@ -356,7 +372,7 @@ export class VectorStore {
WHERE uniqueId = ANY($1)
AND source = $2
`;

await client.query(query, [uniqueIds, source]);
logger.info(`Removed ${uniqueIds.length} pages from source ${source}`);
} finally {
Expand All @@ -381,9 +397,9 @@ export class VectorStore {
try {
const result = await client.query(
`SELECT uniqueId, contentHash FROM ${this.tableName} WHERE source = $1`,
[source]
[source],
);

return result.rows.map((row) => ({
uniqueId: row.uniqueid,
contentHash: row.contenthash,
Expand Down Expand Up @@ -434,14 +450,14 @@ export class VectorStore {
private async transaction<T = any>(queries: Query[]): Promise<T[]> {
const client = await this.pool.connect();
let result;

try {
await client.query('BEGIN');

for (const q of queries) {
result = await client.query(q.query, q.values);
}

await client.query('COMMIT');
return result ? result.rows : [];
} catch (error) {
Expand All @@ -451,4 +467,4 @@ export class VectorStore {
client.release();
}
}
}
}
2 changes: 1 addition & 1 deletion packages/ingester/asciidoc/oz-playbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ content:
sources:
- url: https://github.com/OpenZeppelin/cairo-contracts
branches:
- docs-v*
- docs-v2.0.0
start_path: docs
ui:
bundle:
Expand Down