Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 45 additions & 77 deletions server/mongo/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import { createHash } from 'crypto'
import {
type AbstractCursor,
type AnyBulkWriteOperation,
type BulkWriteResult,
type Collection,
type Db,
type Document,
Expand Down Expand Up @@ -1107,57 +1106,6 @@ class MongoAdapter extends MongoAdapterBase {
}
}

bulkOps = new Map<Domain, AnyBulkWriteOperation<Doc>[]>()

async _pushBulk (ctx: MeasureContext): Promise<void> {
const bulk = Array.from(this.bulkOps.entries())
this.bulkOps.clear()
if (bulk.length === 0) {
return
}
const promises: Promise<BulkWriteResult>[] = []
for (const [domain, ops] of bulk) {
if (ops === undefined || ops.length === 0) {
continue
}
const coll = this.db.collection<Doc>(domain)

promises.push(
addOperation(
ctx,
'bulk-write',
{ domain, operations: ops.length },
async (ctx) =>
await ctx.with(
'bulk-write',
{ domain },
() =>
coll.bulkWrite(ops, {
ordered: false
}),
{
domain,
operations: ops.length
}
)
)
)
}
await Promise.all(promises)
}

async pushBulk (ctx: MeasureContext, domain: Domain, ops: AnyBulkWriteOperation<Doc>[]): Promise<void> {
const existing = this.bulkOps.get(domain)
if (existing !== undefined) {
existing.push(...ops)
} else {
this.bulkOps.set(domain, ops)
}
// We need to wait next cycle to send request
await new Promise<void>((resolve) => setImmediate(resolve))
await this._pushBulk(ctx)
}

async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
const result: TxResult[] = []

Expand All @@ -1171,6 +1119,7 @@ class MongoAdapter extends MongoAdapterBase {

const stTime = Date.now()
const st = Date.now()
let promises: Promise<any>[] = []
for (const [domain, txs] of byDomain) {
if (domain === undefined) {
continue
Expand Down Expand Up @@ -1227,9 +1176,37 @@ class MongoAdapter extends MongoAdapterBase {
}

if (ops.length > 0) {
await this.pushBulk(ctx, domain, ops)
if (ops === undefined || ops.length === 0) {
continue
}
const coll = this.db.collection<Doc>(domain)

promises.push(
addOperation(
ctx,
'bulk-write',
{ domain, operations: ops.length },
async (ctx) =>
await ctx.with(
'bulk-write',
{ domain },
() =>
coll.bulkWrite(ops, {
ordered: false
}),
{
domain,
operations: ops.length
}
)
)
)
}
if (domainBulk.findUpdate.size > 0) {
if (promises.length > 0) {
await Promise.all(promises)
promises = []
}
const coll = this.db.collection<Doc>(domain)

await ctx.with(
Expand All @@ -1255,6 +1232,10 @@ class MongoAdapter extends MongoAdapterBase {
}

if (domainBulk.raw.length > 0) {
if (promises.length > 0) {
await Promise.all(promises)
promises = []
}
await ctx.with(
'raw',
{},
Expand All @@ -1270,6 +1251,9 @@ class MongoAdapter extends MongoAdapterBase {
)
}
}
if (promises.length > 0) {
await Promise.all(promises)
}
return result
}

Expand Down Expand Up @@ -1511,17 +1495,12 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
await this._db.init(DOMAIN_TX)
}

txBulk: Tx[] = []

async _bulkTx (ctx: MeasureContext): Promise<void> {
const txes = this.txBulk
this.txBulk = []

if (txes.length === 0) {
return
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}

const opName = txes.length === 1 ? 'tx-one' : 'tx'
const opName = tx.length === 1 ? 'tx-one' : 'tx'
await addOperation(
ctx,
opName,
Expand All @@ -1532,31 +1511,20 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
{ domain: 'tx' },
() =>
this.txCollection().insertMany(
txes.map((it) => translateDoc(it)),
tx.map((it) => translateDoc(it)),
{
ordered: false
}
),
{
count: txes.length
count: tx.length
}
),
{ domain: 'tx', count: txes.length }
{ domain: 'tx', count: tx.length }
)
ctx.withSync('handleEvent', {}, () => {
this.handleEvent(DOMAIN_TX, 'add', txes.length)
this.handleEvent(DOMAIN_TX, 'add', tx.length)
})
}

override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}
this.txBulk.push(...tx)

// We need to wait next cycle to send request
await new Promise<void>((resolve) => setImmediate(resolve))
await this._bulkTx(ctx)
return []
}

Expand Down