Skip to content

Commit

Permalink
fix: ensure events are inserted into the raw event request table (#1925)
Browse files Browse the repository at this point in the history
* feat: ensure events are inserted into the raw event request table, otherwise raise an explicit error

* chore: fix a few missing JSON.parse calls

* fix: do not string escape the json when inserting into event_observer_requests

* fix: event-server request ending incorrectly

* fix: revert back to passing json object to avoid escaping issues with sql.unsafe()

* fix: fix drop_mempool_tx handler

---------

Co-authored-by: semantic-release-bot <semantic-release-bot@martynus.net>
Co-authored-by: Chris Guimaraes <cguimaraes@hiro.so>
  • Loading branch information
3 people committed Jun 3, 2024
1 parent 2c572f2 commit 34a8454
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 49 deletions.
40 changes: 19 additions & 21 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,27 +176,25 @@ export class PgWriteStore extends PgStore {
return store;
}

async storeRawEventRequest(eventPath: string, payload: PgJsonb): Promise<void> {
// To avoid depending on the DB more than once and to allow the query transaction to settle,
// we'll take the complete insert result and move that to the output TSV file instead of taking
// only the `id` and performing a `COPY` of that row later.
const insertResult = await this.sql<
{
id: string;
receive_timestamp: string;
event_path: string;
payload: string;
}[]
>`INSERT INTO event_observer_requests(
event_path, payload
) values(${eventPath}, ${payload})
RETURNING id, receive_timestamp::text, event_path, payload::text
`;
if (insertResult.length !== 1) {
throw new Error(
`Unexpected row count ${insertResult.length} when storing event_observer_requests entry`
);
}
async storeRawEventRequest(eventPath: string, payload: any): Promise<void> {
await this.sqlWriteTransaction(async sql => {
const insertResult = await sql<
{
id: string;
receive_timestamp: string;
event_path: string;
}[]
>`INSERT INTO event_observer_requests(
event_path, payload
) values(${eventPath}, ${payload})
RETURNING id, receive_timestamp::text, event_path
`;
if (insertResult.length !== 1) {
throw new Error(
`Unexpected row count ${insertResult.length} when storing event_observer_requests entry`
);
}
});
}

async update(data: DataStoreBlockUpdateData): Promise<void> {
Expand Down
49 changes: 21 additions & 28 deletions src/event-stream/event-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ export async function startEventServer(opts: {

const app = express();

const handleRawEventRequest = asyncHandler(async req => {
const handleRawEventRequest = async (req: express.Request) => {
await messageHandler.handleRawEventRequest(req.path, req.body, db);

if (logger.level === 'debug') {
Expand All @@ -938,10 +938,9 @@ export async function startEventServer(opts: {
}
logger.debug(`${eventPath} ${payload}`, { component: 'stacks-node-event' });
}
});
};

app.use(loggerMiddleware);

app.use(bodyParser.json({ type: 'application/json', limit: '500MB' }));

const ibdHeight = getIbdBlockHeight();
Expand All @@ -952,7 +951,7 @@ export async function startEventServer(opts: {
if (chainTip.block_height > ibdHeight) {
next();
} else {
handleRawEventRequest(req, res, next);
await handleRawEventRequest(req);
res.status(200).send(`IBD`);
}
} catch (error) {
Expand All @@ -971,101 +970,95 @@ export async function startEventServer(opts: {

app.post(
'/new_block',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const blockMessage: CoreNodeBlockMessage = req.body;
await messageHandler.handleBlockMessage(opts.chainId, blockMessage, db);
if (blockMessage.block_height === 1) {
await handleBnsImport(db);
}
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_block');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_burn_block',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeBurnBlockMessage = req.body;
await messageHandler.handleBurnBlock(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_burn_block');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_mempool_tx',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const rawTxs: string[] = req.body;
await messageHandler.handleMempoolTxs(rawTxs, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_mempool_tx');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/drop_mempool_tx',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeDropMempoolTxMessage = req.body;
await messageHandler.handleDroppedMempoolTxs(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /drop_mempool_tx');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/attachments/new',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeAttachmentMessage[] = req.body;
await messageHandler.handleNewAttachment(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /attachments/new');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_microblocks',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeMicroblockMessage = req.body;
await messageHandler.handleMicroblockMessage(opts.chainId, msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_microblocks');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post('*', (req, res, next) => {
Expand Down
65 changes: 65 additions & 0 deletions src/tests-event-replay/raw-event-request-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,69 @@ describe('Events table', () => {
}
);
});

test('Large event requests are stored correctly', async () => {
const getRawEventCount = async () => {
const [row] = await client<{ count: string }[]>`SELECT count(*) from event_observer_requests`;
return Number(row.count);
};

await useWithCleanup(
async () => {
const eventServer = await startEventServer({
datastore: db,
chainId: ChainID.Mainnet,
serverHost: '127.0.0.1',
serverPort: 0,
});
return [eventServer, eventServer.closeAsync] as const;
},
async eventServer => {
// split the tsv file into lines, split each line by tab, find the first line that has a cell value of `/new_block`
const sampleTsv = fs
.readFileSync('src/tests-event-replay/tsv/mainnet-block0.tsv', 'utf8')
.split('\n')
.map(line => line.split('\t'))
.find(line => line[2] === '/new_block');
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const sampleNewBlock = JSON.parse(sampleTsv![3]);
console.log(sampleTsv);
// Create a huge JSON object, 10000 nodes, 20 layers deep, some nodes containing 4 megabytes of data
function generateNestedObject(depth: number, nodesPerLevel: number, currentDepth = 1): any {
if (currentDepth > depth) {
// Return a leaf object instead of trying to link back to the top-level node
return { info: `Leaf at depth ${currentDepth}` };
}
// Create a new object for each call to ensure uniqueness
const currentNode: any = {};
for (let i = 0; i < nodesPerLevel; i++) {
currentNode[`node_${currentDepth}_${i}`] =
currentDepth === depth
? { info: `Simulated large node leaf at ${currentDepth}_${i}` }
: generateNestedObject(depth, nodesPerLevel, currentDepth + 1);
}
return currentNode;
}
let hugeJsonObject = generateNestedObject(10, 3);
hugeJsonObject = Object.assign(hugeJsonObject, sampleNewBlock);
hugeJsonObject['very_large_value'] = 'x'.repeat(100 * 1024 * 1024); // 100 megabytes
const rawEvent = {
event_path: '/new_block',
payload: JSON.stringify(hugeJsonObject),
};
const rawEventRequestCountBefore = await getRawEventCount();
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
expect(response.statusCode).toBe(200);
const rawEventRequestCountAfter = await getRawEventCount();
expect(rawEventRequestCountAfter).toEqual(rawEventRequestCountBefore + 1);
}
);
});
});

1 comment on commit 34a8454

@Zionsammy
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/tests-event-replay/raw-event-request-tests.ts

Please sign in to comment.