Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion meerkat-dbm/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@devrev/meerkat-dbm",
"version": "0.0.15",
"version": "0.0.151",
"dependencies": {
"tslib": "^2.3.0",
"@duckdb/duckdb-wasm": "^1.28.0",
Expand Down
67 changes: 65 additions & 2 deletions meerkat-dbm/src/dbm/dbm.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ export class MockFileManager implements FileManagerType {
return data;
}

async getTableData(tableName: string): Promise<Table | undefined> {
return this.tables[tableName];
async getTableData(table: TableConfig): Promise<Table | undefined> {
return this.tables[table.name];
}

async setTableMetadata(table: string, metadata: object): Promise<void> {
Expand Down Expand Up @@ -490,4 +490,67 @@ describe('DBM', () => {
expect(promises[1].status).toBe('rejected');
});
});

describe('table locks', () => {
it('should lock the table and release it', async () => {
const tableName = 'exampleTable';

// Request the lock for the table and then release it
await dbm.lockTables([tableName]);

expect(dbm.isTableLocked(tableName)).toBe(true);

await dbm.unlockTables([tableName]);

expect(dbm.isTableLocked(tableName)).toBe(false);

// Again request the lock for the table
await dbm.lockTables([tableName]);

await dbm.unlockTables([tableName]);
});

it('two consumers requesting lock for the same table', async () => {
const tableName = 'exampleTable';

// Set up promises for the two consumers
const consumer1Promise = dbm.lockTables([tableName]);
const consumer2Promise = dbm.lockTables([tableName]);

// Wait for the first consumer to get the lock
await expect(consumer1Promise).resolves.toBeUndefined();

expect(dbm.isTableLocked(tableName)).toBe(true);

const timeout1 = new Promise((resolve) => {
setTimeout(resolve, 1000, 'TIMEOUT');
});

// Promise.race will wait for either the promises be resolved
// consumer2 will not be able to get the lock as it is already locked by consumer1
await expect(Promise.race([consumer2Promise, timeout1])).resolves.toBe(
'TIMEOUT'
);

// Release the lock for the first consumer
await dbm.unlockTables([tableName]);

// Check if the table is still locked as the consumer2 will get the lock
expect(dbm.isTableLocked(tableName)).toBe(true);

const timeout2 = new Promise((resolve) => {
setTimeout(resolve, 1000, 'TIMEOUT');
});

// This time the consumer2 will get the lock
await expect(
Promise.race([consumer2Promise, timeout2])
).resolves.toBeUndefined();

// Release the lock
await dbm.unlockTables([tableName]);

expect(dbm.isTableLocked(tableName)).toBe(false);
});
});
});
74 changes: 74 additions & 0 deletions meerkat-dbm/src/dbm/dbm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import {
QueryOptions,
QueryQueueItem,
TableConfig,
TableLock,
} from './types';

export class DBM {
private fileManager: FileManagerType;
private instanceManager: InstanceManagerType;
private connection: AsyncDuckDBConnection | null = null;
private queriesQueue: QueryQueueItem[] = [];
private tableLockRegistry: Record<string, TableLock> = {};

private logger: DBMLogger;
private onEvent?: (event: DBMEvent) => void;
Expand Down Expand Up @@ -99,6 +101,66 @@ export class DBM {
return this.connection;
}

async lockTables(tableNames: string[]): Promise<void> {
const promises = [];

for (const tableName of tableNames) {
const tableLock = this.tableLockRegistry[tableName];

// If the table lock doesn't exist, create a new lock
if (!tableLock) {
this.tableLockRegistry[tableName] = {
isLocked: true,
promiseQueue: [],
};
continue;
}

// If the table is already locked, add the promise to the queue
if (tableLock.isLocked) {
const promise = new Promise<void>((resolve, reject) => {
tableLock.promiseQueue.push({ reject, resolve });
});
promises.push(promise);
}

// Set the table as locked
tableLock.isLocked = true;
}

// Wait for all promises to resolve (locks to be acquired)
await Promise.all(promises);
}

async unlockTables(tableNames: string[]): Promise<void> {
for (const tableName of tableNames) {
const tableLock = this.tableLockRegistry[tableName];

// If the table lock doesn't exist, create a new lock
if (!tableLock) {
this.tableLockRegistry[tableName] = {
isLocked: false,
promiseQueue: [],
};
}

const nextPromiseInQueue = tableLock?.promiseQueue?.shift();

// If there is a promise in the queue, resolve it and keep the table as locked
if (nextPromiseInQueue) {
tableLock.isLocked = true;
nextPromiseInQueue.resolve();
} else {
// If there are no promises in the queue, set the table as unlocked
tableLock.isLocked = false;
}
}
}

isTableLocked(tableName: string): boolean {
return this.tableLockRegistry[tableName]?.isLocked ?? false;
}

private async _queryWithTables(
query: string,
tables: TableConfig[],
Expand Down Expand Up @@ -199,6 +261,11 @@ export class DBM {
}

try {
/**
* Lock the tables
*/
this.lockTables(this.currentQueryItem.tables.map((table) => table.name));

const startTime = Date.now();
this.logger.debug(
'Time since query was added to the queue:',
Expand Down Expand Up @@ -242,6 +309,13 @@ export class DBM {
* Reject the promise, so the caller can catch the error
*/
this.currentQueryItem?.promise.reject(error);
} finally {
/**
* Unlock the tables
*/
this.unlockTables(
this.currentQueryItem.tables.map((table) => table.name)
);
}

/**
Expand Down
8 changes: 8 additions & 0 deletions meerkat-dbm/src/dbm/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,11 @@ export interface QueryQueueItem {
connectionId: string;
options?: QueryOptions;
}

export interface TableLock {
promiseQueue: {
resolve: () => void;
reject: () => void;
}[];
isLocked: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const mockDB = {
};
}
};

describe('IndexedDBFileManager', () => {
let fileManager: IndexedDBFileManager;
let db: AsyncDuckDB;
Expand All @@ -29,22 +30,23 @@ describe('IndexedDBFileManager', () => {
tableName: 'taxi1',
fileName: 'taxi1.parquet',
buffer: new Uint8Array([1, 2, 3]),
fileType: FILE_TYPES.PARQUET
fileType: FILE_TYPES.PARQUET,
partitions: ['customer_oid=1'],
};

const fileBuffers = [
{
tableName: 'taxi1',
fileName: 'taxi2.parquet',
buffer: new Uint8Array([1, 2, 3, 4]),
fileType: FILE_TYPES.PARQUET
fileType: FILE_TYPES.PARQUET,
},
{
tableName: 'taxi2',
fileName: 'taxi3.parquet',
buffer: new Uint8Array([1, 2, 3, 4, 5]),
fileType: FILE_TYPES.PARQUET
}
fileType: FILE_TYPES.PARQUET,
},
];

beforeAll(() => {
Expand All @@ -57,7 +59,7 @@ describe('IndexedDBFileManager', () => {
},
terminateDB: async () => {
return;
}
},
};
});

Expand All @@ -71,7 +73,7 @@ describe('IndexedDBFileManager', () => {
logger: log,
onEvent: (event) => {
console.log(event);
}
},
});

await fileManager.initializeDB();
Expand All @@ -91,7 +93,13 @@ describe('IndexedDBFileManager', () => {
expect(tableData1.length).toBe(1);
expect(tableData1[0]).toEqual({
tableName: 'taxi1',
files: [{ fileName: fileBuffer.fileName, fileType: FILE_TYPES.PARQUET }]
files: [
{
fileName: fileBuffer.fileName,
fileType: FILE_TYPES.PARQUET,
partitions: fileBuffer.partitions,
},
],
});

/**
Expand All @@ -112,7 +120,7 @@ describe('IndexedDBFileManager', () => {
expect(tableData2.length).toBe(2);
expect(tableData2[0].files.map((file) => file.fileName)).toEqual([
'taxi1.parquet',
'taxi2.parquet'
'taxi2.parquet',
]);

/**
Expand All @@ -121,7 +129,7 @@ describe('IndexedDBFileManager', () => {
expect(fileBufferData2.map((file) => file.fileName)).toEqual([
'taxi1.parquet',
'taxi2.parquet',
'taxi3.parquet'
'taxi3.parquet',
]);
});

Expand All @@ -144,7 +152,7 @@ describe('IndexedDBFileManager', () => {
// Register the same file with a different buffer
await fileManager.registerFileBuffer({
...fileBuffer,
buffer: new Uint8Array([1])
buffer: new Uint8Array([1]),
});

const fileBufferData2 = await indexedDB.files.toArray();
Expand All @@ -156,14 +164,20 @@ describe('IndexedDBFileManager', () => {
});

it('should return the table data', async () => {
const fileData = await fileManager.getTableData('taxi1');
const fileData = await fileManager.getTableData({
name: 'taxi1',
partitions: fileBuffer.partitions,
});

expect(fileData).toEqual({
files: [
{ fileName: 'taxi1.parquet', fileType: 'parquet' },
{ fileName: 'taxi2.parquet', fileType: 'parquet' }
{
fileName: 'taxi1.parquet',
fileType: 'parquet',
partitions: fileBuffer.partitions,
},
],
tableName: 'taxi1'
tableName: 'taxi1',
});
});

Expand Down
2 changes: 1 addition & 1 deletion meerkat-dbm/src/file-manager/file-manager-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface FileManagerType {
* @param tableName - The name of the table.
* @returns Table object if found.
*/
getTableData: (tableName: string) => Promise<Table | undefined>;
getTableData: (table: TableConfig) => Promise<Table | undefined>;

/**
* @description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,15 @@ export class IndexedDBFileManager implements FileManagerType {
await Promise.all(promises);
}

async getTableData(tableName: string): Promise<Table | undefined> {
const tableData = await this.indexedDB.tablesKey.get(tableName);
async getTableData(table: TableConfig): Promise<Table | undefined> {
const tableData = await this.indexedDB.tablesKey.get(table.name);

if (!tableData) return undefined;

return tableData;
return {
...tableData,
files: getFilesByPartition(tableData?.files ?? [], table.partitions),
};
}

async setTableMetadata(tableName: string, metadata: object): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class MemoryDBFileManager implements FileManagerType {
return [];
}

async getTableData(tableName: string): Promise<Table | undefined> {
async getTableData(table: TableConfig): Promise<Table | undefined> {
// not needed for memory file manager
return;
}
Expand Down