From 447314abb07deb6d88901f5f7fd887887dc8ec55 Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:37:13 +0530 Subject: [PATCH 01/17] feat: add pool config --- server/src/storage/db.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/src/storage/db.ts b/server/src/storage/db.ts index 72b9f22ec..6348a0436 100644 --- a/server/src/storage/db.ts +++ b/server/src/storage/db.ts @@ -17,6 +17,16 @@ const sequelize = new Sequelize(databaseUrl, host, dialect: 'postgres', logging: false, + pool: { + max: 10, // Maximum number of connections in pool (reduced from 20) + min: 0, // Minimum number of connections in pool (let pool shrink to 0) + acquire: 30000, // Maximum time (ms) to try to get connection before throwing error + idle: 10000, // Maximum time (ms) a connection can be idle before being released + evict: 1000, // Time interval (ms) for eviction runs + }, + dialectOptions: { + statement_timeout: 60000, // 60 seconds + }, } ); From 58e6da8d6a207783a85dc438931461c7d96342f1 Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:40:13 +0530 Subject: [PATCH 02/17] feat: shared pgboss singleton for job queue ops --- server/src/storage/pgboss.ts | 84 ++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 server/src/storage/pgboss.ts diff --git a/server/src/storage/pgboss.ts b/server/src/storage/pgboss.ts new file mode 100644 index 000000000..9a6eedd1c --- /dev/null +++ b/server/src/storage/pgboss.ts @@ -0,0 +1,84 @@ +/** + * Shared PgBoss singleton for job queue operations + * + * This module provides a single PgBoss instance that can be safely + * imported by both the main server process and routes without creating + * duplicate connection pools. + * + * IMPORTANT: This is separate from pgboss-worker.ts which runs in a + * forked child process and handles job processing. + */ +import PgBoss from 'pg-boss'; +import logger from '../logger'; +import dotenv from 'dotenv'; + +dotenv.config(); + +if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { + throw new Error('One or more required environment variables are missing.'); +} + +const pgBossConnectionString = `postgres://${process.env.DB_USER}:${encodeURIComponent(process.env.DB_PASSWORD)}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; + +/** + * Shared PgBoss instance for submitting jobs (NOT processing) + * This instance is only used to send jobs to queues, not to work on them + */ +export const pgBossClient = new PgBoss({ + connectionString: pgBossConnectionString, + max: 3, // Small pool since we only send jobs + ssl: { + require: true, + rejectUnauthorized: false, + }, +}); + +let isStarted = false; + +/** + * Initialize the PgBoss client for job submission + * Should be called once during server startup + */ +export async function startPgBossClient(): Promise { + if (isStarted) { + logger.log('warn', 'PgBoss client already started, skipping...'); + return; + } + + try { + await pgBossClient.start(); + isStarted = true; + logger.log('info', 'PgBoss client started successfully (job submission only)'); + } catch (error: any) { + logger.log('error', `Failed to start PgBoss client: ${error.message}`); + throw error; + } +} + +/** + * Stop the PgBoss client gracefully + */ +export async function stopPgBossClient(): Promise { + if (!isStarted) { + return; + } + + try { + await pgBossClient.stop(); + isStarted = false; + logger.log('info', 'PgBoss client stopped successfully'); + } catch (error: any) { + logger.log('error', `Failed to stop PgBoss client: ${error.message}`); + } +} + +// Handle graceful shutdown +process.on('SIGTERM', async () => { + await stopPgBossClient(); +}); + +process.on('SIGINT', async () => { + await stopPgBossClient(); +}); + +export default pgBossClient; \ No newline at end of file From 6efd92ace93e54e04273125e01a44404c3f0e979 Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:40:59 +0530 Subject: [PATCH 03/17] feat: shared schedule utils to avoid connection leaks --- server/src/storage/schedule.ts | 68 ++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 server/src/storage/schedule.ts diff --git a/server/src/storage/schedule.ts b/server/src/storage/schedule.ts new file mode 100644 index 000000000..33ddd7458 --- /dev/null +++ b/server/src/storage/schedule.ts @@ -0,0 +1,68 @@ +/** + * Shared scheduling utilities + * These functions use the shared PgBoss client to avoid connection leaks + */ +import { v4 as uuid } from 'uuid'; +import logger from '../logger'; +import { pgBossClient } from './pgboss'; + +/** + * Utility function to schedule a cron job using PgBoss + * @param id The robot ID + * @param userId The user ID + * @param cronExpression The cron expression for scheduling + * @param timezone The timezone for the cron expression + */ +export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { + try { + const runId = uuid(); + + const queueName = `scheduled-workflow-${id}`; + + logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); + + await pgBossClient.createQueue(queueName); + + await pgBossClient.schedule(queueName, cronExpression, + { id, runId, userId }, + { tz: timezone } + ); + + logger.log('info', `Scheduled workflow job for robot ${id}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to schedule workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Utility function to cancel a scheduled job + * @param robotId The robot ID + * @returns true if successful + */ +export async function cancelScheduledWorkflow(robotId: string) { + try { + const jobs = await pgBossClient.getSchedules(); + + const matchingJobs = jobs.filter((job: any) => { + try { + const data = job.data; + return data && data.id === robotId; + } catch { + return false; + } + }); + + for (const job of matchingJobs) { + logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); + await pgBossClient.unschedule(job.name); + } + + return true; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); + throw error; + } +} \ No newline at end of file From 87346cba1afc5e2c8ec501a15d6b6b2df1296d94 Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:45:30 +0530 Subject: [PATCH 04/17] feat: use pgBossClient --- server/src/routes/record.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index 8a5898113..6676fdb31 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -17,7 +17,7 @@ import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import logger from "../logger"; import { requireSignIn } from '../middlewares/auth'; -import { pgBoss } from '../pgboss-worker'; +import { pgBossClient } from '../storage/pgboss'; export const router = Router(); chromium.use(stealthPlugin()); @@ -36,7 +36,7 @@ async function waitForJobCompletion(jobId: string, queueName: string, timeout = } try { - const job = await pgBoss.getJobById(queueName, jobId); + const job = await pgBossClient.getJobById(queueName, jobId); if (!job) { return reject(new Error(`Job ${jobId} not found`)); @@ -79,9 +79,9 @@ router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Respo } try { - await pgBoss.createQueue('initialize-browser-recording'); + await pgBossClient.createQueue('initialize-browser-recording'); - const jobId = await pgBoss.send('initialize-browser-recording', { + const jobId = await pgBossClient.send('initialize-browser-recording', { userId: req.user.id, timestamp: new Date().toISOString() }); @@ -139,9 +139,9 @@ router.get('/stop/:browserId', requireSignIn, async (req: AuthenticatedRequest, } try { - await pgBoss.createQueue('destroy-browser'); + await pgBossClient.createQueue('destroy-browser'); - const jobId = await pgBoss.send('destroy-browser', { + const jobId = await pgBossClient.send('destroy-browser', { browserId: req.params.browserId, userId: req.user.id, timestamp: new Date().toISOString() @@ -233,9 +233,9 @@ router.get('/interpret', requireSignIn, async (req: AuthenticatedRequest, res) = } try { - await pgBoss.createQueue('interpret-workflow'); + await pgBossClient.createQueue('interpret-workflow'); - const jobId = await pgBoss.send('interpret-workflow', { + const jobId = await pgBossClient.send('interpret-workflow', { userId: req.user.id, timestamp: new Date().toISOString() }); @@ -270,9 +270,9 @@ router.get('/interpret/stop', requireSignIn, async (req: AuthenticatedRequest, r } try { - await pgBoss.createQueue('stop-interpretation'); + await pgBossClient.createQueue('stop-interpretation'); - const jobId = await pgBoss.send('stop-interpretation', { + const jobId = await pgBossClient.send('stop-interpretation', { userId: req.user.id, timestamp: new Date().toISOString() }); From cbe5685dd153f750cc28dc85c6af557935b441ec Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:47:50 +0530 Subject: [PATCH 05/17] chore: -rm unused import --- server/src/routes/storage.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 8451c7205..f59c250dd 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -7,7 +7,6 @@ import { browserPool } from "../server"; import { v4 as uuid } from "uuid"; import moment from 'moment-timezone'; import cron from 'node-cron'; -import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; import Robot from '../models/Robot'; import Run from '../models/Run'; From ffe248810d474517a2070e03b14588ef920ce7bf Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:51:45 +0530 Subject: [PATCH 06/17] feat: use pgBossClient --- server/src/routes/storage.ts | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index f59c250dd..235e5f744 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -15,8 +15,8 @@ import { computeNextRun } from '../utils/schedule'; import { capture } from "../utils/analytics"; import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; -import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; -import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker'; +import { cancelScheduledWorkflow, scheduleWorkflow } from '../storage/schedule'; +import { pgBossClient } from '../storage/pgboss'; chromium.use(stealthPlugin()); export const router = Router(); @@ -590,7 +590,7 @@ router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) * PUT endpoint for starting a remote browser instance and saving run metadata to the storage. * Making it ready for interpretation and returning a runId. * - * If the user has reached their browser limit, the run will be queued using PgBoss. + * If the user has reached their browser limit, the run will be queued using pgBossClient. */ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { @@ -664,10 +664,9 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => try { const userQueueName = `execute-run-user-${req.user.id}`; - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); - - const jobId = await pgBoss.send(userQueueName, { + await pgBossClient.createQueue(userQueueName); + + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: runId, browserId: browserId, @@ -782,10 +781,9 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re const userQueueName = `execute-run-user-${req.user.id}`; // Queue the execution job - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); + await pgBossClient.createQueue(userQueueName); - const jobId = await pgBoss.send(userQueueName, { + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: req.params.id, browserId: plainRun.browserId @@ -974,7 +972,7 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, return res.status(404).json({ error: 'Robot not found' }); } - // Cancel the scheduled job in PgBoss + // Cancel the scheduled job in pgBossClient try { await cancelScheduledWorkflow(id); } catch (error) { @@ -1055,10 +1053,9 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, } const userQueueName = `abort-run-user-${req.user.id}`; - await pgBoss.createQueue(userQueueName); - await registerAbortWorkerForQueue(userQueueName); - - const jobId = await pgBoss.send(userQueueName, { + await pgBossClient.createQueue(userQueueName); + + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: req.params.id }); @@ -1123,10 +1120,9 @@ async function processQueuedRuns() { }); const userQueueName = `execute-run-user-${userId}`; - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); + await pgBossClient.createQueue(userQueueName); - const jobId = await pgBoss.send(userQueueName, { + const jobId = await pgBossClient.send(userQueueName, { userId: userId, runId: queuedRun.runId, browserId: newBrowserId, From 49e1dc0056aa16c00d94088f3a96ed95d15c1d5c Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:54:55 +0530 Subject: [PATCH 07/17] feat: circuit breaker for db connection issues --- server/src/routes/storage.ts | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 235e5f744..79e94eabb 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -1076,13 +1076,22 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, } }); +// Circuit breaker for database connection issues +let consecutiveDbErrors = 0; +const MAX_CONSECUTIVE_ERRORS = 3; +const CIRCUIT_BREAKER_COOLDOWN = 30000; +let circuitBreakerOpenUntil = 0; + async function processQueuedRuns() { try { + if (Date.now() < circuitBreakerOpenUntil) { + return; + } const queuedRun = await Run.findOne({ where: { status: 'queued' }, - order: [['startedAt', 'ASC']] + order: [['startedAt', 'ASC']], }); - + consecutiveDbErrors = 0; if (!queuedRun) return; const userId = queuedRun.runByUserId; @@ -1140,7 +1149,14 @@ async function processQueuedRuns() { } } } catch (error: any) { - logger.log('error', `Error processing queued runs: ${error.message}`); + consecutiveDbErrors++; + + if (consecutiveDbErrors >= MAX_CONSECUTIVE_ERRORS) { + circuitBreakerOpenUntil = Date.now() + CIRCUIT_BREAKER_COOLDOWN; + logger.log('error', `Circuit breaker opened after ${MAX_CONSECUTIVE_ERRORS} consecutive errors. Cooling down for ${CIRCUIT_BREAKER_COOLDOWN/1000}s`); + } + + logger.log('error', `Error processing queued runs (${consecutiveDbErrors}/${MAX_CONSECUTIVE_ERRORS}): ${error.message}`); } } From 915c8f1271cbb71254b3f8f29167e528d086ef3f Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 15:58:50 +0530 Subject: [PATCH 08/17] feat: set max limit + cleanup schedule utils --- server/src/pgboss-worker.ts | 3 +- server/src/schedule-worker.ts | 70 +++-------------------------------- 2 files changed, 7 insertions(+), 66 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index f5d719b46..bb33ffe5c 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -58,7 +58,8 @@ interface AbortRunData { const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, - expireInHours: 23 + expireInHours: 23, + max: 3, }); /** diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts index c75770e41..da3f9dd45 100644 --- a/server/src/schedule-worker.ts +++ b/server/src/schedule-worker.ts @@ -14,7 +14,11 @@ if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIComponent(process.env.DB_PASSWORD)}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; -const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); +const pgBoss = new PgBoss({ + connectionString: pgBossConnectionString, + max: 5, + expireInHours: 23, + }); const registeredQueues = new Set(); @@ -23,70 +27,6 @@ interface ScheduledWorkflowData { runId: string; userId: string; } - -/** - * Utility function to schedule a cron job using PgBoss - * @param id The robot ID - * @param userId The user ID - * @param cronExpression The cron expression for scheduling - * @param timezone The timezone for the cron expression - */ -export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { - try { - const runId = uuid(); - - const queueName = `scheduled-workflow-${id}`; - - logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); - - await pgBoss.createQueue(queueName); - - await pgBoss.schedule(queueName, cronExpression, - { id, runId, userId }, - { tz: timezone } - ); - - await registerWorkerForQueue(queueName); - - logger.log('info', `Scheduled workflow job for robot ${id}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to schedule workflow: ${errorMessage}`); - throw error; - } -} - -/** - * Utility function to cancel a scheduled job - * @param robotId The robot ID - * @returns true if successful - */ -export async function cancelScheduledWorkflow(robotId: string) { - try { - const jobs = await pgBoss.getSchedules(); - - const matchingJobs = jobs.filter((job: any) => { - try { - const data = job.data; - return data && data.id === robotId; - } catch { - return false; - } - }); - - for (const job of matchingJobs) { - logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); - await pgBoss.unschedule(job.name); - } - - return true; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); - throw error; - } -} - /** * Process a scheduled workflow job */ From ac1c728e962ae75d2f93d22594595615a9a0c3bc Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 16:03:53 +0530 Subject: [PATCH 09/17] chore: modify server config --- server/src/server.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/server.ts b/server/src/server.ts index 5aa8efeda..13050e781 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -22,6 +22,7 @@ import session from 'express-session'; import Run from './models/Run'; import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage'; import { startWorkers } from './pgboss-worker'; +import { stopPgBossClient, startPgBossClient } from './storage/pgboss' const app = express(); app.use(cors({ @@ -37,8 +38,8 @@ const pool = new Pool({ database: process.env.DB_NAME, password: process.env.DB_PASSWORD, port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined, - max: 50, - min: 5, + max: 10, + min: 0, idleTimeoutMillis: 30000, connectionTimeoutMillis: 10000, maxUses: 7500, @@ -152,8 +153,13 @@ if (require.main === module) { logger.log('info', 'Cleaning up stale browser slots...'); browserPool.cleanupStaleBrowserSlots(); - + + // Recover orphaned runs from potential crashes await recoverOrphanedRuns(); + // Start pgBoss client for job submission + await startPgBossClient(); + + // Start pgBoss workers AFTER recovery is complete await startWorkers(); io = new Server(server); From 66d31d2c650c6327768167f76b50638101d3bfaf Mon Sep 17 00:00:00 2001 From: amhsirak Date: Fri, 28 Nov 2025 16:04:09 +0530 Subject: [PATCH 10/17] chore: remove unused import --- server/src/server.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/server.ts b/server/src/server.ts index 13050e781..4a33b67be 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -19,7 +19,6 @@ import swaggerSpec from './swagger/config'; import connectPgSimple from 'connect-pg-simple'; import pg from 'pg'; import session from 'express-session'; -import Run from './models/Run'; import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage'; import { startWorkers } from './pgboss-worker'; import { stopPgBossClient, startPgBossClient } from './storage/pgboss' From 0e6bc22dce0bc397dbf1c5f560d1c21ee880bb39 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 11:11:02 +0530 Subject: [PATCH 11/17] feat: add max queue integration tasks --- .../integrations/airtable.ts | 80 ++++++++++++------- .../integrations/gsheet.ts | 51 +++++++++--- 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 0437afc37..788cb60b1 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -18,8 +18,25 @@ interface SerializableOutput { const MAX_RETRIES = 3; const BASE_API_DELAY = 2000; +const MAX_QUEUE_SIZE = 1000; export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {}; +let isProcessingAirtable = false; + +export function addAirtableUpdateTask(runId: string, task: AirtableUpdateTask): boolean { + const currentSize = Object.keys(airtableUpdateTasks).length; + + if (currentSize >= MAX_QUEUE_SIZE) { + logger.log('warn', `Airtable task queue full (${currentSize}/${MAX_QUEUE_SIZE}), dropping oldest task`); + const oldestKey = Object.keys(airtableUpdateTasks)[0]; + if (oldestKey) { + delete airtableUpdateTasks[oldestKey]; + } + } + + airtableUpdateTasks[runId] = task; + return true; +} async function refreshAirtableToken(refreshToken: string) { try { @@ -44,15 +61,13 @@ async function refreshAirtableToken(refreshToken: string) { } } - function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record) { const allRecords: Record[] = []; - + const schemaData: Array<{ Group: string; Field: string; Value: any }> = []; const listData: any[] = []; - const screenshotData: Array<{key: string, url: string}> = []; - - // Collect schema data + const screenshotData: Array<{ key: string; url: string }> = []; + if (serializableOutput.scrapeSchema) { if (Array.isArray(serializableOutput.scrapeSchema)) { for (const schemaArray of serializableOutput.scrapeSchema) { @@ -82,8 +97,7 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: } } } - - // Collect list data + if (serializableOutput.scrapeList) { if (Array.isArray(serializableOutput.scrapeList)) { for (const listArray of serializableOutput.scrapeList) { @@ -107,8 +121,8 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: } } } - - // Collect screenshot data + + // Collect screenshot data (handles both string and object forms safely) // if (binaryOutput && Object.keys(binaryOutput).length > 0) { // Object.entries(binaryOutput).forEach(([key, rawValue]: [string, any]) => { // if (!key || key.trim() === "") return; @@ -136,37 +150,38 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: // } // }); // } - - // Mix all data types together to create consecutive records + + // --- Merge all types into Airtable rows --- const maxLength = Math.max(schemaData.length, listData.length, screenshotData.length); - + for (let i = 0; i < maxLength; i++) { const record: Record = {}; - + if (i < schemaData.length) { record.Group = schemaData[i].Group; record.Label = schemaData[i].Field; record.Value = schemaData[i].Value; } - + if (i < listData.length) { - Object.entries(listData[i]).forEach(([key, value]) => { - if (value !== null && value !== undefined && value !== '') { + Object.entries(listData[i] || {}).forEach(([key, value]) => { + if (value !== null && value !== undefined && value !== "") { record[key] = value; } }); } - + if (i < screenshotData.length) { record.Key = screenshotData[i].key; record.Screenshot = screenshotData[i].url; } - + if (Object.keys(record).length > 0) { allRecords.push(record); } } - + + // Push leftovers for (let i = maxLength; i < schemaData.length; i++) { allRecords.push({ Label: schemaData[i].Field, Value: schemaData[i].Value }); } @@ -179,7 +194,7 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Screenshot: screenshotData[i].url, }); } - + return allRecords; } @@ -497,10 +512,18 @@ function isValidUrl(str: string): boolean { } export const processAirtableUpdates = async () => { - const maxProcessingTime = 60000; - const startTime = Date.now(); + if (isProcessingAirtable) { + logger.log('info', 'Airtable processing already in progress, skipping'); + return; + } - while (Date.now() - startTime < maxProcessingTime) { + isProcessingAirtable = true; + + try { + const maxProcessingTime = 60000; + const startTime = Date.now(); + + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; for (const runId in airtableUpdateTasks) { @@ -535,9 +558,12 @@ export const processAirtableUpdates = async () => { break; } - console.log('Waiting for 5 seconds before checking again...'); - await new Promise(resolve => setTimeout(resolve, 5000)); - } + console.log('Waiting for 5 seconds before checking again...'); + await new Promise(resolve => setTimeout(resolve, 5000)); + } - console.log('Airtable processing completed or timed out'); + console.log('Airtable processing completed or timed out'); + } finally { + isProcessingAirtable = false; + } }; \ No newline at end of file diff --git a/server/src/workflow-management/integrations/gsheet.ts b/server/src/workflow-management/integrations/gsheet.ts index c32e4fe01..b0871b750 100644 --- a/server/src/workflow-management/integrations/gsheet.ts +++ b/server/src/workflow-management/integrations/gsheet.ts @@ -11,13 +11,31 @@ interface GoogleSheetUpdateTask { } interface SerializableOutput { - scrapeSchema?: any[]; - scrapeList?: any[]; + scrapeSchema?: Record; + scrapeList?: Record; } + const MAX_RETRIES = 5; +const MAX_QUEUE_SIZE = 1000; export let googleSheetUpdateTasks: { [runId: string]: GoogleSheetUpdateTask } = {}; +let isProcessingGoogleSheets = false; + +export function addGoogleSheetUpdateTask(runId: string, task: GoogleSheetUpdateTask): boolean { + const currentSize = Object.keys(googleSheetUpdateTasks).length; + + if (currentSize >= MAX_QUEUE_SIZE) { + logger.log('warn', `Google Sheets task queue full (${currentSize}/${MAX_QUEUE_SIZE}), dropping oldest task`); + const oldestKey = Object.keys(googleSheetUpdateTasks)[0]; + if (oldestKey) { + delete googleSheetUpdateTasks[oldestKey]; + } + } + + googleSheetUpdateTasks[runId] = task; + return true; +} export async function updateGoogleSheet(robotId: string, runId: string) { try { @@ -144,7 +162,7 @@ async function ensureSheetExists(spreadsheetId: string, sheetName: string, robot fields: 'sheets.properties.title' }); - const existingSheets = response.data.sheets?.map(sheet => sheet.properties?.title) || []; + const existingSheets = response.data.sheets?.map((sheet: any) => sheet.properties?.title) || []; if (!existingSheets.includes(sheetName)) { await sheets.spreadsheets.batchUpdate({ @@ -219,7 +237,7 @@ export async function writeDataToSheet( refresh_token: robot.google_refresh_token, }); - oauth2Client.on('tokens', async (tokens) => { + oauth2Client.once('tokens', async (tokens: any) => { if (tokens.refresh_token || tokens.access_token) { const robotModel = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); if (robotModel) { @@ -292,10 +310,18 @@ export async function writeDataToSheet( } export const processGoogleSheetUpdates = async () => { - const maxProcessingTime = 60000; - const startTime = Date.now(); + if (isProcessingGoogleSheets) { + logger.log('info', 'Google Sheets processing already in progress, skipping'); + return; + } + + isProcessingGoogleSheets = true; + + try { + const maxProcessingTime = 60000; + const startTime = Date.now(); - while (Date.now() - startTime < maxProcessingTime) { + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; for (const runId in googleSheetUpdateTasks) { @@ -328,9 +354,12 @@ export const processGoogleSheetUpdates = async () => { break; } - console.log('Waiting for 5 seconds before checking again...'); - await new Promise(resolve => setTimeout(resolve, 5000)); - } + console.log('Waiting for 5 seconds before checking again...'); + await new Promise(resolve => setTimeout(resolve, 5000)); + } - console.log('Google Sheets processing completed or timed out'); + console.log('Google Sheets processing completed or timed out'); + } finally { + isProcessingGoogleSheets = false; + } }; \ No newline at end of file From 2a1d461357965aef40c7016adc1b9d6bc8630019 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 12:43:19 +0530 Subject: [PATCH 12/17] feat: add execution timeouts api, scheduled, manual run --- server/src/api/record.ts | 171 +++++++++++++++--- server/src/pgboss-worker.ts | 163 ++++++++++------- .../workflow-management/scheduler/index.ts | 121 ++++++++++--- 3 files changed, 336 insertions(+), 119 deletions(-) diff --git a/server/src/api/record.ts b/server/src/api/record.ts index 7c665001e..1f567c3e7 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -15,8 +15,8 @@ import { AuthenticatedRequest } from "../routes/record" import {capture} from "../utils/analytics"; import { Page } from "playwright"; import { WorkflowFile } from "maxun-core"; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet"; -import { airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable"; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet"; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable"; import { sendWebhook } from "../routes/webhook"; import { convertPageToHTML, convertPageToMarkdown } from '../markdownify/scrape'; @@ -557,32 +557,44 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} + async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } } -async function readyForRunHandler(browserId: string, id: string, userId: string, requestedFormats?: string[]){ +async function readyForRunHandler(browserId: string, id: string, userId: string, socket: Socket){ try { - const result = await executeRun(id, userId, requestedFormats); + const result = await executeRun(id, userId); if (result && result.success) { logger.log('info', `Interpretation of ${id} succeeded`); @@ -599,6 +611,8 @@ async function readyForRunHandler(browserId: string, id: string, userId: string, logger.error(`Error during readyForRunHandler: ${error.message}`); await destroyRemoteBrowser(browserId, userId); return null; + } finally { + cleanupSocketConnection(socket, browserId, id); } } @@ -688,15 +702,23 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ let html = ''; const serializableOutput: any = {}; - // Markdown conversion + const SCRAPE_TIMEOUT = 120000; + if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion if (formats.includes('html')) { - html = await convertPageToHTML(url); + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -808,6 +830,22 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ ); } + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', { + robot_id: plainRun.robotMetaId, + run_id: plainRun.runId, + robot_name: recording.recording_meta.name, + status: 'failed', + finished_at: new Date().toLocaleString(), + error: { + message: error.message, + type: 'ConversionError' + } + }); + } catch (webhookError: any) { + logger.log('warn', `Failed to send webhook for failed API scrape run ${plainRun.runId}: ${webhookError.message}`); + } + capture("maxun-oss-run-created-api", { runId: plainRun.runId, user_id: userId, @@ -838,13 +876,24 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ browser.interpreter.setRunId(plainRun.runId); - const interpretationInfo = await browser.interpreter.InterpretRecording( + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + if (browser && browser.interpreter) { + await browser.interpreter.clearState(); + } await destroyRemoteBrowser(plainRun.browserId, userId); const updatedRun = await run.update({ @@ -854,6 +903,25 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ binaryOutput: uploadedBinaryOutput, }); + try { + const completionData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording.recording_meta.name, + status: 'success', + finishedAt: new Date().toLocaleString(), + runByUserId: plainRun.runByUserId, + runByScheduleId: plainRun.runByScheduleId, + runByAPI: plainRun.runByAPI || false, + browserId: plainRun.browserId + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', completionData); + logger.log('info', `API run completed notification sent for run: ${plainRun.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-completed notification for API run ${plainRun.runId}: ${socketError.message}`); + } + let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; @@ -950,6 +1018,17 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); const run = await Run.findOne({ where: { runId: id } }); if (run) { + if (browser) { + try { + if (browser.interpreter) { + await browser.interpreter.clearState(); + } + await destroyRemoteBrowser(run.browserId, userId); + } catch (cleanupError: any) { + logger.error(`Failed to cleanup browser in error handler: ${cleanupError.message}`); + } + } + await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), @@ -1020,6 +1099,8 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ } export async function handleRunRecording(id: string, userId: string, requestedFormats?: string[]) { + let socket: Socket | null = null; + try { const result = await createWorkflowAndStoreMetadata(id, userId); const { browserId, runId: newRunId } = result; @@ -1028,41 +1109,79 @@ export async function handleRunRecording(id: string, userId: string, requestedFo throw new Error('browserId or runId or userId is undefined'); } - const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { + const CONNECTION_TIMEOUT = 30000; + + socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { transports: ['websocket'], - rejectUnauthorized: false + rejectUnauthorized: false, + timeout: CONNECTION_TIMEOUT, }); - socket.on('ready-for-run', () => readyForRunHandler(browserId, newRunId, userId, requestedFormats)); + const readyHandler = () => readyForRunHandler(browserId, newRunId, userId, socket!); - logger.log('info', `Running Robot: ${id}`); + socket.on('ready-for-run', readyHandler); + + socket.on('connect_error', (error: Error) => { + logger.error(`Socket connection error for API run ${newRunId}: ${error.message}`); + cleanupSocketConnection(socket!, browserId, newRunId); + }); socket.on('disconnect', () => { - cleanupSocketListeners(socket, browserId, newRunId, userId); + cleanupSocketConnection(socket!, browserId, newRunId); }); - // Return the runId immediately, so the client knows the run is started + logger.log('info', `Running Robot: ${id}`); + return newRunId; } catch (error: any) { logger.error('Error running robot:', error); + if (socket) { + cleanupSocketConnection(socket, '', ''); + } } } -function cleanupSocketListeners(socket: Socket, browserId: string, id: string, userId: string) { - socket.off('ready-for-run', () => readyForRunHandler(browserId, id, userId)); - logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${id}`); +function cleanupSocketConnection(socket: Socket, browserId: string, id: string) { + try { + socket.removeAllListeners(); + socket.disconnect(); + + if (browserId) { + const namespace = serverIo.of(browserId); + namespace.removeAllListeners(); + namespace.disconnectSockets(true); + const nsps = (serverIo as any)._nsps; + if (nsps && nsps.has(`/${browserId}`)) { + nsps.delete(`/${browserId}`); + logger.log('debug', `Deleted namespace /${browserId} from io._nsps Map`); + } + } + + logger.log('info', `Cleaned up socket connection for browserId: ${browserId}, runId: ${id}`); + } catch (error: any) { + logger.error(`Error cleaning up socket connection: ${error.message}`); + } } async function waitForRunCompletion(runId: string, interval: number = 2000) { + const MAX_WAIT_TIME = 180 * 60 * 1000; + const startTime = Date.now(); + while (true) { - const run = await Run.findOne({ where: { runId }, raw: true }); + if (Date.now() - startTime > MAX_WAIT_TIME) { + throw new Error('Run completion timeout after 3 hours'); + } + + const run = await Run.findOne({ where: { runId } }); if (!run) throw new Error('Run not found'); if (run.status === 'success') { - return run; + return run.toJSON(); } else if (run.status === 'failed') { throw new Error('Run failed'); + } else if (run.status === 'aborted' || run.status === 'aborting') { + throw new Error('Run was aborted'); } await new Promise(resolve => setTimeout(resolve, interval)); diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index bb33ffe5c..3545cb3ae 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -15,8 +15,8 @@ import Robot from './models/Robot'; import { browserPool } from './server'; import { Page } from 'playwright'; import { capture } from './utils/analytics'; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; -import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { io as serverIo } from "./server"; import { sendWebhook } from './routes/webhook'; import { BinaryOutputService } from './storage/mino'; @@ -59,7 +59,7 @@ interface AbortRunData { const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, expireInHours: 23, - max: 3, + max: 5, }); /** @@ -86,26 +86,36 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} -// Helper function to handle integration updates async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } @@ -115,8 +125,8 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr * Modified processRunExecution function - only add browser reset */ async function processRunExecution(job: Job) { - const BROWSER_INIT_TIMEOUT = 60000; - const BROWSER_PAGE_TIMEOUT = 45000; + const BROWSER_INIT_TIMEOUT = 30000; + const BROWSER_PAGE_TIMEOUT = 15000; const data = job.data; logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); @@ -211,15 +221,23 @@ async function processRunExecution(job: Job) { let html = ''; const serializableOutput: any = {}; - // Markdown conversion + const SCRAPE_TIMEOUT = 120000; + if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion if (formats.includes('html')) { - html = await convertPageToHTML(url); + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -375,21 +393,33 @@ async function processRunExecution(job: Job) { logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`); } - // Execute the workflow - const workflow = AddGeneratedFlags(recording.recording); - browser.interpreter.setRunId(data.runId); - - const interpretationInfo = await browser.interpreter.InterpretRecording( - workflow, - currentPage, - (newPage: Page) => currentPage = newPage, - plainRun.interpreterSettings + + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( + AddGeneratedFlags(recording.recording), + currentPage, + (newPage: Page) => currentPage = newPage, + plainRun.interpreterSettings, ); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + try { + await browser.interpreter.clearState(); + logger.debug(`Cleared interpreter state for aborted run ${data.runId}`); + } catch (clearError: any) { + logger.warn(`Failed to clear interpreter state on abort: ${clearError.message}`); + } + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: true }; @@ -635,6 +665,15 @@ async function processRunExecution(job: Job) { totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0, }); + try { + if (browser && browser.interpreter) { + await browser.interpreter.clearState(); + logger.debug(`Cleared interpreter state for failed run ${data.runId}`); + } + } catch (clearError: any) { + logger.warn(`Failed to clear interpreter state on error: ${clearError.message}`); + } + await destroyRemoteBrowser(browserId, data.userId); logger.log('info', `Browser ${browserId} destroyed after failed run`); @@ -804,6 +843,8 @@ async function abortRun(runId: string, userId: string): Promise { const registeredUserQueues = new Map(); const registeredAbortQueues = new Map(); +const workerIntervals: NodeJS.Timeout[] = []; + async function registerWorkerForQueue(queueName: string) { if (!registeredUserQueues.has(queueName)) { await pgBoss.work(queueName, async (job: Job | Job[]) => { @@ -866,21 +907,7 @@ async function registerRunExecutionWorker() { const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-')); for (const queue of userQueues) { - if (!registeredUserQueues.has(queue.name)) { - await pgBoss.work(queue.name, async (job: Job | Job[]) => { - try { - const singleJob = Array.isArray(job) ? job[0] : job; - return await processRunExecution(singleJob); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`); - throw error; - } - }); - - registeredUserQueues.set(queue.name, true); - logger.log('info', `Registered worker for queue: ${queue.name}`); - } + await registerWorkerForQueue(queue.name); } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -890,10 +917,15 @@ async function registerRunExecutionWorker() { await checkForNewUserQueues(); - setInterval(async () => { - await checkForNewUserQueues(); + const userQueueInterval = setInterval(async () => { + try { + await checkForNewUserQueues(); + } catch (error: any) { + logger.log('error', `Error checking user queues: ${error.message}`); + } }, 10000); - + workerIntervals.push(userQueueInterval); + logger.log('info', 'Run execution worker registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -903,7 +935,6 @@ async function registerRunExecutionWorker() { async function registerAbortRunWorker() { try { - const registeredAbortQueues = new Map(); const checkForNewAbortQueues = async () => { try { @@ -912,25 +943,7 @@ async function registerAbortRunWorker() { const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-')); for (const queue of abortQueues) { - if (!registeredAbortQueues.has(queue.name)) { - await pgBoss.work(queue.name, async (job: Job | Job[]) => { - try { - const data = extractJobData(job); - const { userId, runId } = data; - - logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); - const success = await abortRun(runId, userId); - return { success }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`); - throw error; - } - }); - - registeredAbortQueues.set(queue.name, true); - logger.log('info', `Registered abort worker for queue: ${queue.name}`); - } + await registerAbortWorkerForQueue(queue.name); } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -940,9 +953,14 @@ async function registerAbortRunWorker() { await checkForNewAbortQueues(); - setInterval(async () => { - await checkForNewAbortQueues(); + const abortQueueInterval = setInterval(async () => { + try { + await checkForNewAbortQueues(); + } catch (error: any) { + logger.log('error', `Error checking abort queues: ${error.message}`); + } }, 10000); + workerIntervals.push(abortQueueInterval); logger.log('info', 'Abort run worker registration system initialized'); } catch (error: unknown) { @@ -1050,15 +1068,22 @@ pgBoss.on('error', (error) => { // Handle graceful shutdown process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss...'); + + logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`); + workerIntervals.forEach(clearInterval); + await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss stopped, waiting for main process cleanup...'); }); process.on('SIGINT', async () => { logger.log('info', 'SIGINT received, shutting down PgBoss...'); + + logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`); + workerIntervals.forEach(clearInterval); + await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss stopped, waiting for main process cleanup...'); }); -// For use in other files -export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers }; +export { startWorkers }; diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 470cdacb3..30ed892ba 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -5,7 +5,7 @@ import { io, Socket } from "socket.io-client"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-management/controller'; import logger from '../../logger'; import { browserPool, io as serverIo } from "../../server"; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; import Robot from "../../models/Robot"; import Run from "../../models/Run"; import { getDecryptedProxyConfig } from "../../routes/proxy"; @@ -14,7 +14,7 @@ import { capture } from "../../utils/analytics"; import { WorkflowFile } from "maxun-core"; import { Page } from "playwright"; import { sendWebhook } from "../../routes/webhook"; -import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable"; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable"; import { convertPageToMarkdown, convertPageToHTML } from "../../markdownify/scrape"; chromium.use(stealthPlugin()); @@ -104,24 +104,36 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} + async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } @@ -250,15 +262,24 @@ async function executeRun(id: string, userId: string) { let html = ''; const serializableOutput: any = {}; + const SCRAPE_TIMEOUT = 120000; + // Markdown conversion - if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + if (formats.includes("markdown")) { + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion - if (formats.includes('html')) { - html = await convertPageToHTML(url); + if (formats.includes("html")) { + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -406,10 +427,18 @@ async function executeRun(id: string, userId: string) { // Set run ID for real-time data persistence browser.interpreter.setRunId(id); - const interpretationInfo = await browser.interpreter.InterpretRecording( + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); @@ -523,9 +552,19 @@ async function executeRun(id: string, userId: string) { return true; } catch (error: any) { logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); - console.log(error.message); const run = await Run.findOne({ where: { runId: id } }); if (run) { + if (browser) { + try { + if (browser.interpreter) { + await browser.interpreter.clearState(); + } + await destroyRemoteBrowser(run.browserId, userId); + } catch (cleanupError: any) { + logger.error(`Failed to cleanup browser in error handler: ${cleanupError.message}`); + } + } + await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), @@ -586,7 +625,7 @@ async function executeRun(id: string, userId: string) { } } -async function readyForRunHandler(browserId: string, id: string, userId: string) { +async function readyForRunHandler(browserId: string, id: string, userId: string, socket: Socket) { try { const interpretation = await executeRun(id, userId); @@ -602,6 +641,8 @@ async function readyForRunHandler(browserId: string, id: string, userId: string) } catch (error: any) { logger.error(`Error during readyForRunHandler: ${error.message}`); await destroyRemoteBrowser(browserId, userId); + } finally { + cleanupSocketConnection(socket, browserId, id); } } @@ -611,6 +652,8 @@ function resetRecordingState(browserId: string, id: string) { } export async function handleRunRecording(id: string, userId: string) { + let socket: Socket | null = null; + try { const result = await createWorkflowAndStoreMetadata(id, userId); const { browserId, runId: newRunId } = result; @@ -619,27 +662,57 @@ export async function handleRunRecording(id: string, userId: string) { throw new Error('browserId or runId or userId is undefined'); } - const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { + const CONNECTION_TIMEOUT = 30000; + + socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:5000'}/${browserId}`, { transports: ['websocket'], - rejectUnauthorized: false + rejectUnauthorized: false, + timeout: CONNECTION_TIMEOUT, }); - socket.on('ready-for-run', () => readyForRunHandler(browserId, newRunId, userId)); + const readyHandler = () => readyForRunHandler(browserId, newRunId, userId, socket!); - logger.log('info', `Running robot: ${id}`); + socket.on('ready-for-run', readyHandler); + + socket.on('connect_error', (error: Error) => { + logger.error(`Socket connection error for scheduled run ${newRunId}: ${error.message}`); + cleanupSocketConnection(socket!, browserId, newRunId); + }); socket.on('disconnect', () => { - cleanupSocketListeners(socket, browserId, newRunId, userId); + cleanupSocketConnection(socket!, browserId, newRunId); }); + logger.log('info', `Running robot: ${id}`); + } catch (error: any) { logger.error('Error running recording:', error); + if (socket) { + cleanupSocketConnection(socket, '', ''); + } } } -function cleanupSocketListeners(socket: Socket, browserId: string, id: string, userId: string) { - socket.off('ready-for-run', () => readyForRunHandler(browserId, id, userId)); - logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${id}`); +function cleanupSocketConnection(socket: Socket, browserId: string, id: string) { + try { + socket.removeAllListeners(); + socket.disconnect(); + + if (browserId) { + const namespace = serverIo.of(browserId); + namespace.removeAllListeners(); + namespace.disconnectSockets(true); + const nsps = (serverIo as any)._nsps; + if (nsps && nsps.has(`/${browserId}`)) { + nsps.delete(`/${browserId}`); + logger.log('debug', `Deleted namespace /${browserId} from io._nsps Map`); + } + } + + logger.log('info', `Cleaned up socket connection for browserId: ${browserId}, runId: ${id}`); + } catch (error: any) { + logger.error(`Error cleaning up socket connection: ${error.message}`); + } } export { createWorkflowAndStoreMetadata }; \ No newline at end of file From 762654395b93897f77d2b37f3586d1dfc814caeb Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 14:40:59 +0530 Subject: [PATCH 13/17] feat: add promise timeout, socket cleanup --- .../classes/RemoteBrowser.ts | 1705 ++++------------- server/src/browser-management/controller.ts | 189 +- .../src/browser-management/inputHandlers.ts | 37 +- server/src/socket-connection/connection.ts | 8 +- .../workflow-management/classes/Generator.ts | 58 + 5 files changed, 627 insertions(+), 1370 deletions(-) diff --git a/server/src/browser-management/classes/RemoteBrowser.ts b/server/src/browser-management/classes/RemoteBrowser.ts index c88ba068b..731b583d0 100644 --- a/server/src/browser-management/classes/RemoteBrowser.ts +++ b/server/src/browser-management/classes/RemoteBrowser.ts @@ -9,13 +9,14 @@ import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import { PlaywrightBlocker } from '@cliqz/adblocker-playwright'; import fetch from 'cross-fetch'; -import sharp from 'sharp'; import logger from '../../logger'; import { InterpreterSettings } from "../../types"; import { WorkflowGenerator } from "../../workflow-management/classes/Generator"; import { WorkflowInterpreter } from "../../workflow-management/classes/Interpreter"; import { getDecryptedProxyConfig } from '../../routes/proxy'; import { getInjectableScript } from 'idcac-playwright'; +import { FingerprintInjector } from "fingerprint-injector"; +import { FingerprintGenerator } from "fingerprint-generator"; declare global { interface Window { @@ -35,52 +36,7 @@ interface RRWebSnapshot { interface ProcessedSnapshot { snapshot: RRWebSnapshot; - resources: { - stylesheets: Array<{ - href: string; - content: string; - media?: string; - }>; - images: Array<{ - src: string; - dataUrl: string; - alt?: string; - }>; - fonts: Array<{ - url: string; - dataUrl: string; - format?: string; - }>; - scripts: Array<{ - src: string; - content: string; - type?: string; - }>; - media: Array<{ - src: string; - dataUrl: string; - type: string; - }>; - }; baseUrl: string; - viewport: { width: number; height: number }; - timestamp: number; - processingStats: { - discoveredResources: { - images: number; - stylesheets: number; - scripts: number; - fonts: number; - media: number; - }; - cachedResources: { - stylesheets: number; - images: number; - fonts: number; - scripts: number; - media: number; - }; - }; } chromium.use(stealthPlugin()); @@ -91,33 +47,6 @@ const MEMORY_CONFIG = { heapUsageThreshold: 0.7 // 70% (reduced threshold to react earlier) }; -const DEFAULT_VIEWPORT = { - width: 1280, - height: 720, - deviceScaleFactor: 1, - mobile: false -}; - -const SCREENCAST_CONFIG: { - format: "jpeg" | "png"; - maxWidth: number; - maxHeight: number; - targetFPS: number; - compressionQuality: number; - maxQueueSize: number; - skipFrameThreshold: number, - enableAdaptiveQuality: boolean, -} = { - format: 'jpeg', - maxWidth: DEFAULT_VIEWPORT.width, - maxHeight: DEFAULT_VIEWPORT.height, - targetFPS: 30, - compressionQuality: 0.8, - maxQueueSize: 2, - skipFrameThreshold: 100, - enableAdaptiveQuality: true, -}; - /** * This class represents a remote browser instance. * It is used to allow a variety of interaction with the Playwright's browser instance. @@ -183,30 +112,24 @@ export class RemoteBrowser { */ public interpreter: WorkflowInterpreter; - - private screenshotQueue: Buffer[] = []; - private isProcessingScreenshot = false; - private screencastInterval: NodeJS.Timeout | null = null - private isScreencastActive: boolean = false; - - private isDOMStreamingActive: boolean = false; + public isDOMStreamingActive: boolean = false; private domUpdateInterval: NodeJS.Timeout | null = null; - private renderingMode: "screenshot" | "dom" = "screenshot"; private lastScrollPosition = { x: 0, y: 0 }; private scrollThreshold = 200; // pixels private snapshotDebounceTimeout: NodeJS.Timeout | null = null; - private isScrollTriggeredSnapshot = false; private networkRequestTimeout: NodeJS.Timeout | null = null; private pendingNetworkRequests: string[] = []; - private readonly NETWORK_QUIET_PERIOD = 8000; private readonly INITIAL_LOAD_QUIET_PERIOD = 3000; private networkWaitStartTime: number = 0; private progressInterval: NodeJS.Timeout | null = null; private hasShownInitialLoader: boolean = false; private isInitialLoadInProgress: boolean = false; + private memoryCleanupInterval: NodeJS.Timeout | null = null; + private memoryManagementInterval: NodeJS.Timeout | null = null; + /** * Initializes a new instances of the {@link Generator} and {@link WorkflowInterpreter} classes and * assigns the socket instance everywhere. @@ -220,126 +143,63 @@ export class RemoteBrowser { this.generator = new WorkflowGenerator(socket, poolId); } - private cleanupMemory(): void { - if (this.screenshotQueue.length > 10) { - this.screenshotQueue = this.screenshotQueue.slice(-3); // Keep only last 3 - } - } - - private setupMemoryCleanup(): void { - setInterval(() => { - this.cleanupMemory(); - }, 30000); // Every 30 seconds - } - private async processRRWebSnapshot( snapshot: RRWebSnapshot ): Promise { const baseUrl = this.currentPage?.url() || ""; - const resources = { - stylesheets: [] as Array<{ - href: string; - content: string; - media?: string; - }>, - images: [] as Array<{ src: string; dataUrl: string; alt?: string }>, - fonts: [] as Array<{ url: string; dataUrl: string; format?: string }>, - scripts: [] as Array<{ src: string; content: string; type?: string }>, - media: [] as Array<{ src: string; dataUrl: string; type: string }>, - }; - - const viewport = (await this.currentPage?.viewportSize()) || { - width: 1280, - height: 720, - }; - return { snapshot, - resources, - baseUrl, - viewport, - timestamp: Date.now(), - processingStats: { - discoveredResources: { - images: resources.images.length, - stylesheets: resources.stylesheets.length, - scripts: resources.scripts.length, - fonts: resources.fonts.length, - media: resources.media.length, - }, - cachedResources: { - stylesheets: resources.stylesheets.length, - images: resources.images.length, - fonts: resources.fonts.length, - scripts: resources.scripts.length, - media: resources.media.length, - }, - }, + baseUrl }; } private initializeMemoryManagement(): void { - setInterval(() => { - const memoryUsage = process.memoryUsage(); - const heapUsageRatio = memoryUsage.heapUsed / MEMORY_CONFIG.maxHeapSize; - - if (heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold * 1.2) { - logger.warn('Critical memory pressure detected, triggering emergency cleanup'); - this.performMemoryCleanup(); - } else if (heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold) { - logger.warn('High memory usage detected, triggering cleanup'); - - if (this.screenshotQueue.length > 0) { - this.screenshotQueue = []; - logger.info('Screenshot queue cleared due to memory pressure'); - } - - if (global.gc && heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold * 1.1) { - global.gc(); - } - } - - if (this.screenshotQueue.length > SCREENCAST_CONFIG.maxQueueSize) { - this.screenshotQueue = this.screenshotQueue.slice(-SCREENCAST_CONFIG.maxQueueSize); - } - }, MEMORY_CONFIG.gcInterval); + this.memoryManagementInterval = setInterval(() => { + const memoryUsage = process.memoryUsage(); + const heapUsageRatio = memoryUsage.heapUsed / MEMORY_CONFIG.maxHeapSize; + + if (heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold * 1.2) { + logger.warn( + "Critical memory pressure detected, triggering emergency cleanup" + ); + this.performMemoryCleanup(); + } else if (heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold) { + logger.warn("High memory usage detected, triggering cleanup"); + + if ( + global.gc && + heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold * 1.1 + ) { + global.gc(); + } + } + }, MEMORY_CONFIG.gcInterval); } private async performMemoryCleanup(): Promise { - this.screenshotQueue = []; - this.isProcessingScreenshot = false; - - if (global.gc) { - try { - global.gc(); - logger.info('Garbage collection requested'); - } catch (error) { - logger.error('Error during garbage collection:', error); - } + if (global.gc) { + try { + global.gc(); + logger.info("Garbage collection requested"); + } catch (error) { + logger.error("Error during garbage collection:", error); } - - if (this.client) { - try { - await this.stopScreencast(); - - await new Promise(resolve => setTimeout(resolve, 500)); - - this.client = null; - if (this.currentPage) { - this.client = await this.currentPage.context().newCDPSession(this.currentPage); - await this.startScreencast(); - logger.info('CDP session reset completed'); - } - } catch (error) { - logger.error('Error resetting CDP session:', error); - } + } + + if (this.currentPage) { + try { + await new Promise((resolve) => setTimeout(resolve, 500)); + logger.info("CDP session reset completed"); + } catch (error) { + logger.error("Error resetting CDP session:", error); } - - this.socket.emit('memory-cleanup', { - userId: this.userId, - timestamp: Date.now() - }); + } + + this.socket.emit("memory-cleanup", { + userId: this.userId, + timestamp: Date.now(), + }); } /** @@ -374,30 +234,25 @@ export class RemoteBrowser { * Setup scroll event listener to track user scrolling */ private setupScrollEventListener(): void { + try { + this.socket.removeAllListeners('dom:scroll'); + } catch (error: any) { + logger.warn(`Error removing old scroll listener: ${error.message}`); + } + this.socket.on( "dom:scroll", async (data: { deltaX: number; deltaY: number }) => { if (!this.isDOMStreamingActive || !this.currentPage) return; try { - logger.debug( - `Received scroll event: deltaX=${data.deltaX}, deltaY=${data.deltaY}` - ); - await this.currentPage.mouse.wheel(data.deltaX, data.deltaY); - await this.currentPage.waitForLoadState("networkidle", { timeout: 5000 }); const scrollInfo = await this.currentPage.evaluate(() => ({ x: window.scrollX, y: window.scrollY, - maxX: Math.max( - 0, - document.documentElement.scrollWidth - window.innerWidth - ), - maxY: Math.max( - 0, - document.documentElement.scrollHeight - window.innerHeight - ), + maxX: Math.max(0, document.documentElement.scrollWidth - window.innerWidth), + maxY: Math.max(0, document.documentElement.scrollHeight - window.innerHeight), documentHeight: document.documentElement.scrollHeight, viewportHeight: window.innerHeight, })); @@ -406,23 +261,14 @@ export class RemoteBrowser { Math.abs(scrollInfo.y - this.lastScrollPosition.y) + Math.abs(scrollInfo.x - this.lastScrollPosition.x); - logger.debug( - `Scroll delta: ${scrollDelta}, threshold: ${this.scrollThreshold}` - ); - if (scrollDelta > this.scrollThreshold) { this.lastScrollPosition = { x: scrollInfo.x, y: scrollInfo.y }; - this.isScrollTriggeredSnapshot = true; if (this.snapshotDebounceTimeout) { clearTimeout(this.snapshotDebounceTimeout); } this.snapshotDebounceTimeout = setTimeout(async () => { - logger.info( - `Triggering snapshot due to scroll. Position: ${scrollInfo.y}/${scrollInfo.maxY}` - ); - await this.makeAndEmitDOMSnapshot(); }, 300); } @@ -436,6 +282,15 @@ export class RemoteBrowser { private setupPageChangeListeners(): void { if (!this.currentPage) return; + try { + if (!this.currentPage.isClosed()) { + this.currentPage.removeAllListeners("domcontentloaded"); + this.currentPage.removeAllListeners("response"); + } + } catch (error: any) { + logger.warn(`Error removing page change listeners: ${error.message}`); + } + this.currentPage.on("domcontentloaded", async () => { if (!this.isInitialLoadInProgress) { logger.info("DOM content loaded - triggering snapshot"); @@ -463,6 +318,7 @@ export class RemoteBrowser { } this.networkWaitStartTime = Date.now(); + this.progressInterval = setInterval(() => { const elapsed = Date.now() - this.networkWaitStartTime; const navigationProgress = Math.min((elapsed / this.INITIAL_LOAD_QUIET_PERIOD) * 40, 35); @@ -506,6 +362,14 @@ export class RemoteBrowser { } private async setupPageEventListeners(page: Page) { + try { + page.removeAllListeners('framenavigated'); + page.removeAllListeners('load'); + logger.debug('Removed existing page event listeners before re-registering'); + } catch (error: any) { + logger.warn(`Error removing existing page listeners: ${error.message}`); + } + page.on('framenavigated', async (frame) => { if (frame === page.mainFrame()) { const currentUrl = page.url(); @@ -522,6 +386,11 @@ export class RemoteBrowser { try { await page.waitForLoadState('networkidle', { timeout: 5000 }); + if (page.isClosed()) { + logger.debug('Page is closed, cannot inject script'); + return false; + } + await page.evaluate(getInjectableScript()); return true; } catch (error: any) { @@ -548,6 +417,28 @@ export class RemoteBrowser { return userAgents[Math.floor(Math.random() * userAgents.length)]; } + /** + * Apply modern fingerprint-suite injection + */ + private async applyEnhancedFingerprinting(context: BrowserContext): Promise { + try { + try { + const fingerprintGenerator = new FingerprintGenerator(); + const fingerprint = fingerprintGenerator.getFingerprint(); + const fingerprintInjector = new FingerprintInjector(); + + await fingerprintInjector.attachFingerprintToPlaywright(context as any, fingerprint); + + logger.info("Enhanced fingerprinting applied successfully"); + } catch (fingerprintError: any) { + logger.warn(`Modern fingerprint injection failed: ${fingerprintError.message}. Using existing protection.`); + } + } catch (error: any) { + logger.error(`Enhanced fingerprinting failed: ${error.message}`); + // Don't throw - fallback to basic functionality + } + } + /** * An asynchronous constructor for asynchronously initialized properties. * Must be called right after creating an instance of RemoteBrowser class. @@ -556,6 +447,7 @@ export class RemoteBrowser { */ public initialize = async (userId: string): Promise => { const MAX_RETRIES = 3; + const OVERALL_INIT_TIMEOUT = 120000; let retryCount = 0; let success = false; @@ -565,715 +457,160 @@ export class RemoteBrowser { }); this.emitLoadingProgress(0, 0); - while (!success && retryCount < MAX_RETRIES) { + const initializationPromise = (async () => { + while (!success && retryCount < MAX_RETRIES) { try { - this.browser = (await chromium.launch({ - headless: true, - args: [ - "--disable-blink-features=AutomationControlled", - "--disable-web-security", - "--disable-features=IsolateOrigins,site-per-process", - "--disable-site-isolation-trials", - "--disable-extensions", - "--no-sandbox", - "--disable-dev-shm-usage", - "--disable-gpu", - "--force-color-profile=srgb", - "--force-device-scale-factor=2", - "--ignore-certificate-errors", - "--mute-audio" - ], - })); - - if (!this.browser || this.browser.isConnected() === false) { - throw new Error('Browser failed to launch or is not connected'); - } + this.browser = (await chromium.launch({ + headless: true, + args: [ + "--disable-blink-features=AutomationControlled", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + "--disable-site-isolation-trials", + "--disable-extensions", + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-gpu", + "--force-color-profile=srgb", + "--force-device-scale-factor=2", + "--ignore-certificate-errors", + "--mute-audio" + ], + })); + + if (!this.browser || this.browser.isConnected() === false) { + throw new Error('Browser failed to launch or is not connected'); + } - this.emitLoadingProgress(20, 0); - - const proxyConfig = await getDecryptedProxyConfig(userId); - let proxyOptions: { server: string, username?: string, password?: string } = { server: '' }; - - if (proxyConfig.proxy_url) { - proxyOptions = { - server: proxyConfig.proxy_url, - ...(proxyConfig.proxy_username && proxyConfig.proxy_password && { - username: proxyConfig.proxy_username, - password: proxyConfig.proxy_password, - }), - }; - } - - const contextOptions: any = { - // viewport: { height: 400, width: 900 }, - // recordVideo: { dir: 'videos/' } - // Force reduced motion to prevent animation issues - reducedMotion: 'reduce', - // Force JavaScript to be enabled - javaScriptEnabled: true, - // Set a reasonable timeout - timeout: 50000, - // Disable hardware acceleration - forcedColors: 'none', - isMobile: false, - hasTouch: false, - userAgent: this.getUserAgent(), + this.emitLoadingProgress(20, 0); + + const proxyConfig = await getDecryptedProxyConfig(userId); + let proxyOptions: { server: string, username?: string, password?: string } = { server: '' }; + + if (proxyConfig.proxy_url) { + proxyOptions = { + server: proxyConfig.proxy_url, + ...(proxyConfig.proxy_username && proxyConfig.proxy_password && { + username: proxyConfig.proxy_username, + password: proxyConfig.proxy_password, + }), }; - - if (proxyOptions.server) { - contextOptions.proxy = { - server: proxyOptions.server, - username: proxyOptions.username ? proxyOptions.username : undefined, - password: proxyOptions.password ? proxyOptions.password : undefined, - }; - } - - await new Promise(resolve => setTimeout(resolve, 500)); - - const contextPromise = this.browser.newContext(contextOptions); - this.context = await Promise.race([ - contextPromise, - new Promise((_, reject) => { - setTimeout(() => reject(new Error('Context creation timed out after 15s')), 15000); - }) - ]) as BrowserContext; - - await this.context.addInitScript( - `const defaultGetter = Object.getOwnPropertyDescriptor( - Navigator.prototype, - "webdriver" - ).get; - defaultGetter.apply(navigator); - defaultGetter.toString(); - Object.defineProperty(Navigator.prototype, "webdriver", { - set: undefined, - enumerable: true, - configurable: true, - get: new Proxy(defaultGetter, { - apply: (target, thisArg, args) => { - Reflect.apply(target, thisArg, args); - return false; - }, - }), - }); - const patchedGetter = Object.getOwnPropertyDescriptor( - Navigator.prototype, - "webdriver" - ).get; - patchedGetter.apply(navigator); - patchedGetter.toString();` - ); + } - await this.context.addInitScript({ path: './server/src/browser-management/classes/rrweb-bundle.js' }); - - this.currentPage = await this.context.newPage(); - - this.emitLoadingProgress(40, 0); - - await this.setupPageEventListeners(this.currentPage); - - const viewportSize = await this.currentPage.viewportSize(); - if (viewportSize) { - this.socket.emit('viewportInfo', { - width: viewportSize.width, - height: viewportSize.height, - userId: this.userId - }); - } - - try { - const blocker = await PlaywrightBlocker.fromLists(fetch, ['https://easylist.to/easylist/easylist.txt']); - await blocker.enableBlockingInPage(this.currentPage); - this.client = await this.currentPage.context().newCDPSession(this.currentPage); - await blocker.disableBlockingInPage(this.currentPage); - console.log('Adblocker initialized'); - } catch (error: any) { - console.warn('Failed to initialize adblocker, continuing without it:', error.message); - // Still need to set up the CDP session even if blocker fails - this.client = await this.currentPage.context().newCDPSession(this.currentPage); - } + const contextOptions: any = { + // viewport: { height: 400, width: 900 }, + // recordVideo: { dir: 'videos/' } + // Force reduced motion to prevent animation issues + reducedMotion: 'reduce', + // Force JavaScript to be enabled + javaScriptEnabled: true, + // Set a reasonable timeout + timeout: 50000, + // Disable hardware acceleration + forcedColors: 'none', + isMobile: false, + hasTouch: false, + userAgent: this.getUserAgent(), + }; - this.emitLoadingProgress(60, 0); + if (proxyOptions.server) { + contextOptions.proxy = { + server: proxyOptions.server, + username: proxyOptions.username ? proxyOptions.username : undefined, + password: proxyOptions.password ? proxyOptions.password : undefined, + }; + } - success = true; - logger.log('debug', `Browser initialized successfully for user ${userId}`); - } catch (error: any) { - retryCount++; - logger.log('error', `Browser initialization failed (attempt ${retryCount}/${MAX_RETRIES}): ${error.message}`); - - if (this.browser) { - try { - await this.browser.close(); - } catch (closeError) { - logger.log('warn', `Failed to close browser during cleanup: ${closeError}`); - } - this.browser = null; - } - - if (retryCount >= MAX_RETRIES) { - throw new Error(`Failed to initialize browser after ${MAX_RETRIES} attempts: ${error.message}`); - } - - await new Promise(resolve => setTimeout(resolve, 1000)); - } - } + await new Promise(resolve => setTimeout(resolve, 500)); - this.setupMemoryCleanup(); - // this.initializeMemoryManagement(); - }; + const contextPromise = this.browser.newContext(contextOptions); + this.context = await Promise.race([ + contextPromise, + new Promise((_, reject) => { + setTimeout(() => reject(new Error('Context creation timed out after 15s')), 15000); + }) + ]) as BrowserContext; + + await this.applyEnhancedFingerprinting(this.context); + + await this.context.addInitScript( + `const defaultGetter = Object.getOwnPropertyDescriptor( + Navigator.prototype, + "webdriver" + ).get; + defaultGetter.apply(navigator); + defaultGetter.toString(); + Object.defineProperty(Navigator.prototype, "webdriver", { + set: undefined, + enumerable: true, + configurable: true, + get: new Proxy(defaultGetter, { + apply: (target, thisArg, args) => { + Reflect.apply(target, thisArg, args); + return false; + }, + }), + }); + const patchedGetter = Object.getOwnPropertyDescriptor( + Navigator.prototype, + "webdriver" + ).get; + patchedGetter.apply(navigator); + patchedGetter.toString();` + ); + + await this.context.addInitScript({ path: './server/src/browser-management/classes/rrweb-bundle.js' }); + + this.currentPage = await this.context.newPage(); + + this.emitLoadingProgress(40, 0); + + await this.setupPageEventListeners(this.currentPage); + + try { + const blocker = await PlaywrightBlocker.fromLists(fetch, ['https://easylist.to/easylist/easylist.txt']); + await blocker.enableBlockingInPage(this.currentPage); + this.client = await this.currentPage.context().newCDPSession(this.currentPage); + await blocker.disableBlockingInPage(this.currentPage); + console.log('Adblocker initialized'); + } catch (error: any) { + console.warn('Failed to initialize adblocker, continuing without it:', error.message); + // Still need to set up the CDP session even if blocker fails + this.client = await this.currentPage.context().newCDPSession(this.currentPage); + } - public updateViewportInfo = async (): Promise => { - if (this.currentPage) { - const viewportSize = await this.currentPage.viewportSize(); - if (viewportSize) { - this.socket.emit('viewportInfo', { - width: viewportSize.width, - height: viewportSize.height, - userId: this.userId - }); - } - } - }; + this.emitLoadingProgress(60, 0); - /** - * Extract data from a list of elements on a page - * @param page - Playwright Page object - * @param listSelector - CSS selector for the list container - * @param fields - Record of field configurations - * @param limit - Maximum number of items to extract (default: 5) - * @returns Promise>> - Array of extracted data objects - */ - private async extractListData( - page: Page, - listSelector: string, - fields: Record, - limit: number = 5 - ): Promise>> { - if (page.isClosed()) { - logger.warn("Page is closed, cannot extract list data"); - return []; - } + success = true; + logger.log('debug', `Browser initialized successfully for user ${userId}`); + } catch (error: any) { + retryCount++; + logger.log('error', `Browser initialization failed (attempt ${retryCount}/${MAX_RETRIES}): ${error.message}`); - return await page.evaluate( - async ({ listSelector, fields, limit }: { - listSelector: string; - fields: Record; - limit: number; - }) => { - const convertedFields: Record = {}; - - for (const [key, field] of Object.entries(fields)) { - convertedFields[field.label] = { - selector: field.selectorObj.selector, - attribute: field.selectorObj.attribute - }; - } - - const queryElement = (rootElement: Element | Document, selector: string): Element | null => { - if (!selector.includes('>>') && !selector.includes(':>>')) { - return rootElement.querySelector(selector); - } - - const parts = selector.split(/(?:>>|:>>)/).map(part => part.trim()); - let currentElement: Element | Document | null = rootElement; - - for (let i = 0; i < parts.length; i++) { - if (!currentElement) return null; - - if ((currentElement as Element).tagName === 'IFRAME' || (currentElement as Element).tagName === 'FRAME') { - try { - const frameElement = currentElement as HTMLIFrameElement | HTMLFrameElement; - const frameDoc = frameElement.contentDocument || frameElement.contentWindow?.document; - if (!frameDoc) return null; - currentElement = frameDoc.querySelector(parts[i]); - continue; - } catch (e) { - console.warn(`Cannot access ${(currentElement as Element).tagName.toLowerCase()} content:`, e); - return null; - } - } - - let nextElement: Element | null = null; - - if ('querySelector' in currentElement) { - nextElement = currentElement.querySelector(parts[i]); - } - - if (!nextElement && 'shadowRoot' in currentElement && (currentElement as Element).shadowRoot) { - nextElement = (currentElement as Element).shadowRoot!.querySelector(parts[i]); - } - - if (!nextElement && 'children' in currentElement) { - const children: any = Array.from((currentElement as Element).children || []); - for (const child of children) { - if (child.shadowRoot) { - nextElement = child.shadowRoot.querySelector(parts[i]); - if (nextElement) break; - } - } - } - - currentElement = nextElement; - } - - return currentElement as Element | null; - }; - - const queryElementAll = (rootElement: Element | Document, selector: string): Element[] => { - if (!selector.includes('>>') && !selector.includes(':>>')) { - return Array.from(rootElement.querySelectorAll(selector)); - } - - const parts = selector.split(/(?:>>|:>>)/).map(part => part.trim()); - let currentElements: (Element | Document)[] = [rootElement]; - - for (const part of parts) { - const nextElements: Element[] = []; - - for (const element of currentElements) { - if ((element as Element).tagName === 'IFRAME' || (element as Element).tagName === 'FRAME') { - try { - const frameElement = element as HTMLIFrameElement | HTMLFrameElement; - const frameDoc = frameElement.contentDocument || frameElement.contentWindow?.document; - if (frameDoc) { - nextElements.push(...Array.from(frameDoc.querySelectorAll(part))); - } - } catch (e) { - console.warn(`Cannot access ${(element as Element).tagName.toLowerCase()} content:`, e); - continue; - } - } else { - if ('querySelectorAll' in element) { - nextElements.push(...Array.from(element.querySelectorAll(part))); - } - - if ('shadowRoot' in element && (element as Element).shadowRoot) { - nextElements.push(...Array.from((element as Element).shadowRoot!.querySelectorAll(part))); - } - - if ('children' in element) { - const children = Array.from((element as Element).children || []); - for (const child of children) { - if (child.shadowRoot) { - nextElements.push(...Array.from(child.shadowRoot.querySelectorAll(part))); - } - } - } - } - } - - currentElements = nextElements; - } - - return currentElements as Element[]; - }; - - function extractValue(element: Element, attribute: string): string | null { - if (!element) return null; - - const baseURL = element.ownerDocument?.location?.href || window.location.origin; - - if (element.shadowRoot) { - const shadowContent = element.shadowRoot.textContent; - if (shadowContent?.trim()) { - return shadowContent.trim(); - } - } - - if (attribute === 'innerText') { - return (element as HTMLElement).innerText.trim(); - } else if (attribute === 'innerHTML') { - return element.innerHTML.trim(); - } else if (attribute === 'src' || attribute === 'href') { - if (attribute === 'href' && element.tagName !== 'A') { - const parentElement = element.parentElement; - if (parentElement && parentElement.tagName === 'A') { - const parentHref = parentElement.getAttribute('href'); - if (parentHref) { - try { - return new URL(parentHref, baseURL).href; - } catch (e) { - return parentHref; - } - } - } - } - - const attrValue = element.getAttribute(attribute); - const dataAttr = attrValue || element.getAttribute('data-' + attribute); - - if (!dataAttr || dataAttr.trim() === '') { - if (attribute === 'src') { - const style = window.getComputedStyle(element); - const bgImage = style.backgroundImage; - if (bgImage && bgImage !== 'none') { - const matches = bgImage.match(/url\(['"]?([^'")]+)['"]?\)/); - return matches ? new URL(matches[1], baseURL).href : null; - } - } - return null; - } - + if (this.browser) { try { - return new URL(dataAttr, baseURL).href; - } catch (e) { - console.warn('Error creating URL from', dataAttr, e); - return dataAttr; // Return the original value if URL construction fails + await this.browser.close(); + } catch (closeError) { + logger.log('warn', `Failed to close browser during cleanup: ${closeError}`); } + this.browser = null; } - return element.getAttribute(attribute); - } - - function findTableAncestor(element: Element): { type: string; element: Element } | null { - let currentElement: Element | null = element; - const MAX_DEPTH = 5; - let depth = 0; - - while (currentElement && depth < MAX_DEPTH) { - if (currentElement.getRootNode() instanceof ShadowRoot) { - currentElement = (currentElement.getRootNode() as ShadowRoot).host; - continue; - } - - if (currentElement.tagName === 'TD') { - return { type: 'TD', element: currentElement }; - } else if (currentElement.tagName === 'TR') { - return { type: 'TR', element: currentElement }; - } - - if (currentElement.tagName === 'IFRAME' || currentElement.tagName === 'FRAME') { - try { - const frameElement = currentElement as HTMLIFrameElement | HTMLFrameElement; - currentElement = frameElement.contentDocument?.body || null; - } catch (e) { - return null; - } - } else { - currentElement = currentElement.parentElement; - } - depth++; - } - return null; - } - - function getCellIndex(td: Element): number { - if (td.getRootNode() instanceof ShadowRoot) { - const shadowRoot = td.getRootNode() as ShadowRoot; - const allCells = Array.from(shadowRoot.querySelectorAll('td')); - return allCells.indexOf(td as HTMLTableCellElement); - } - - let index = 0; - let sibling = td; - while (sibling = sibling.previousElementSibling as Element) { - index++; - } - return index; - } - - function hasThElement(row: Element, tableFields: Record): boolean { - for (const [_, { selector }] of Object.entries(tableFields)) { - const element = queryElement(row, selector); - if (element) { - let current: Element | ShadowRoot | Document | null = element; - while (current && current !== row) { - if (current.getRootNode() instanceof ShadowRoot) { - current = (current.getRootNode() as ShadowRoot).host; - continue; - } - - if ((current as Element).tagName === 'TH') return true; - - if ((current as Element).tagName === 'IFRAME' || (current as Element).tagName === 'FRAME') { - try { - const frameElement = current as HTMLIFrameElement | HTMLFrameElement; - current = frameElement.contentDocument?.body || null; - } catch (e) { - break; - } - } else { - current = (current as Element).parentElement; - } - } - } - } - return false; - } - - function filterRowsBasedOnTag(rows: Element[], tableFields: Record): Element[] { - for (const row of rows) { - if (hasThElement(row, tableFields)) { - return rows; - } - } - return rows.filter(row => { - const directTH = row.getElementsByTagName('TH').length === 0; - const shadowTH = row.shadowRoot ? - row.shadowRoot.querySelector('th') === null : true; - return directTH && shadowTH; - }); - } - - function calculateClassSimilarity(classList1: string[], classList2: string[]): number { - const set1 = new Set(classList1); - const set2 = new Set(classList2); - const intersection = new Set([...set1].filter(x => set2.has(x))); - const union = new Set([...set1, ...set2]); - return intersection.size / union.size; - } - - function findSimilarElements(baseElement: Element, similarityThreshold: number = 0.7): Element[] { - const baseClasses = Array.from(baseElement.classList); - if (baseClasses.length === 0) return []; - - const allElements: Element[] = []; - - allElements.push(...Array.from(document.getElementsByTagName(baseElement.tagName))); - - if (baseElement.getRootNode() instanceof ShadowRoot) { - const shadowHost = (baseElement.getRootNode() as ShadowRoot).host; - allElements.push(...Array.from(shadowHost.getElementsByTagName(baseElement.tagName))); - } - - const frames = [ - ...Array.from(document.getElementsByTagName('iframe')), - ...Array.from(document.getElementsByTagName('frame')) - ]; - - for (const frame of frames) { - try { - const frameElement = frame as HTMLIFrameElement | HTMLFrameElement; - const frameDoc = frameElement.contentDocument || frameElement.contentWindow?.document; - if (frameDoc) { - allElements.push(...Array.from(frameDoc.getElementsByTagName(baseElement.tagName))); - } - } catch (e) { - console.warn(`Cannot access ${frame.tagName.toLowerCase()} content:`, e); - } - } - - return allElements.filter(element => { - if (element === baseElement) return false; - const similarity = calculateClassSimilarity( - baseClasses, - Array.from(element.classList) - ); - return similarity >= similarityThreshold; - }); - } - - let containers = queryElementAll(document, listSelector); - - if (containers.length === 0) return []; - - if (limit > 1 && containers.length === 1) { - const baseContainer = containers[0]; - const similarContainers = findSimilarElements(baseContainer); - - if (similarContainers.length > 0) { - const newContainers = similarContainers.filter(container => - !container.matches(listSelector) - ); - containers = [...containers, ...newContainers]; - } - } - - const containerFields = containers.map(() => ({ - tableFields: {} as Record, - nonTableFields: {} as Record - })); - - containers.forEach((container, containerIndex) => { - for (const [label, field] of Object.entries(convertedFields)) { - const sampleElement = queryElement(container, field.selector); - - if (sampleElement) { - const ancestor = findTableAncestor(sampleElement); - if (ancestor) { - containerFields[containerIndex].tableFields[label] = { - ...field, - tableContext: ancestor.type, - cellIndex: ancestor.type === 'TD' ? getCellIndex(ancestor.element) : -1 - }; - } else { - containerFields[containerIndex].nonTableFields[label] = field; - } - } else { - containerFields[containerIndex].nonTableFields[label] = field; - } - } - }); - - const tableData: Array> = []; - const nonTableData: Array> = []; - - for (let containerIndex = 0; containerIndex < containers.length; containerIndex++) { - const container = containers[containerIndex]; - const { tableFields } = containerFields[containerIndex]; - - if (Object.keys(tableFields).length > 0) { - const firstField = Object.values(tableFields)[0]; - const firstElement = queryElement(container, firstField.selector); - let tableContext: Element | null = firstElement; - - while (tableContext && tableContext.tagName !== 'TABLE' && tableContext !== container) { - if (tableContext.getRootNode() instanceof ShadowRoot) { - tableContext = (tableContext.getRootNode() as ShadowRoot).host; - continue; - } - - if (tableContext.tagName === 'IFRAME' || tableContext.tagName === 'FRAME') { - try { - const frameElement = tableContext as HTMLIFrameElement | HTMLFrameElement; - tableContext = frameElement.contentDocument?.body || null; - } catch (e) { - break; - } - } else { - tableContext = tableContext.parentElement; - } - } - - if (tableContext) { - const rows: Element[] = []; - - rows.push(...Array.from(tableContext.getElementsByTagName('TR'))); - - if (tableContext.tagName === 'IFRAME' || tableContext.tagName === 'FRAME') { - try { - const frameElement = tableContext as HTMLIFrameElement | HTMLFrameElement; - const frameDoc = frameElement.contentDocument || frameElement.contentWindow?.document; - if (frameDoc) { - rows.push(...Array.from(frameDoc.getElementsByTagName('TR'))); - } - } catch (e) { - console.warn(`Cannot access ${tableContext.tagName.toLowerCase()} rows:`, e); - } - } - - const processedRows = filterRowsBasedOnTag(rows, tableFields); - - for (let rowIndex = 0; rowIndex < Math.min(processedRows.length, limit); rowIndex++) { - const record: Record = {}; - const currentRow = processedRows[rowIndex]; - - for (const [label, { selector, attribute, cellIndex }] of Object.entries(tableFields)) { - let element: Element | null = null; - - if (cellIndex !== undefined && cellIndex >= 0) { - let td: Element | null = currentRow.children[cellIndex] || null; - - if (!td && currentRow.shadowRoot) { - const shadowCells = currentRow.shadowRoot.children; - if (shadowCells && shadowCells.length > cellIndex) { - td = shadowCells[cellIndex]; - } - } - - if (td) { - element = queryElement(td, selector); - - if (!element && selector.split(/(?:>>|:>>)/).pop()?.includes('td:nth-child')) { - element = td; - } - - if (!element) { - const tagOnlySelector = selector.split('.')[0]; - element = queryElement(td, tagOnlySelector); - } - - if (!element) { - let currentElement: Element | null = td; - while (currentElement && currentElement.children.length > 0) { - let foundContentChild = false; - for (const child of Array.from(currentElement.children)) { - if (extractValue(child, attribute)) { - currentElement = child; - foundContentChild = true; - break; - } - } - if (!foundContentChild) break; - } - element = currentElement; - } - } - } else { - element = queryElement(currentRow, selector); - } - - if (element) { - const value = extractValue(element, attribute); - if (value !== null) { - record[label] = value; - } - } - } - - if (Object.keys(record).length > 0) { - tableData.push(record); - } - } - } + + if (retryCount >= MAX_RETRIES) { + throw new Error(`Failed to initialize browser after ${MAX_RETRIES} attempts: ${error.message}`); } + + await new Promise(resolve => setTimeout(resolve, 1000)); } - - for (let containerIndex = 0; containerIndex < containers.length; containerIndex++) { - if (nonTableData.length >= limit) break; - - const container = containers[containerIndex]; - const { nonTableFields } = containerFields[containerIndex]; - - if (Object.keys(nonTableFields).length > 0) { - const record: Record = {}; - - for (const [label, { selector, attribute }] of Object.entries(nonTableFields)) { - const relativeSelector = selector.split(/(?:>>|:>>)/).slice(-1)[0]; - const element = queryElement(container, relativeSelector); - - if (element) { - const value = extractValue(element, attribute); - if (value !== null) { - record[label] = value; - } - } - } - - if (Object.keys(record).length > 0) { - nonTableData.push(record); - } - } - } - - const scrapedData = [...tableData, ...nonTableData].slice(0, limit); - return scrapedData; - }, - { listSelector, fields, limit } - ) as Array>; - } + } + })(); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Browser initialization timed out after ${OVERALL_INIT_TIMEOUT}ms`)), OVERALL_INIT_TIMEOUT); + }); + + await Promise.race([initializationPromise, timeoutPromise]); + }; /** * Captures a screenshot directly without running the workflow interpreter @@ -1332,133 +669,51 @@ export class RemoteBrowser { } }; + /** + * Removes all socket event listeners + */ + private removeAllSocketListeners(): void { + try { + this.socket.removeAllListeners('captureDirectScreenshot'); + this.socket.removeAllListeners('rerender'); + this.socket.removeAllListeners('settings'); + this.socket.removeAllListeners('changeTab'); + this.socket.removeAllListeners('addTab'); + this.socket.removeAllListeners('closeTab'); + this.socket.removeAllListeners('dom:scroll'); + + logger.debug(`Removed all socket listeners for user ${this.userId}`); + } catch (error: any) { + logger.warn(`Error removing socket listeners: ${error.message}`); + } + } + /** * Registers all event listeners needed for the recording editor session. * Should be called only once after the full initialization of the remote browser. * @returns void */ public registerEditorEvents = (): void => { - // For each event, include userId to make sure events are handled for the correct browser logger.log("debug", `Registering editor events for user: ${this.userId}`); - this.socket.on( - `captureDirectScreenshot:${this.userId}`, - async (settings) => { - logger.debug( - `Direct screenshot capture requested for user ${this.userId}` - ); - await this.captureDirectScreenshot(settings); - } - ); + this.removeAllSocketListeners(); - // For backward compatibility this.socket.on("captureDirectScreenshot", async (settings) => { await this.captureDirectScreenshot(settings); }); - // Listen for specific events for this user - this.socket.on(`rerender:${this.userId}`, async () => { - logger.debug(`Rerender event received for user ${this.userId}`); - if (this.renderingMode === "dom") { - await this.makeAndEmitDOMSnapshot(); - } else { - await this.makeAndEmitScreenshot(); - } - }); - this.socket.on("rerender", async () => { logger.debug( `General rerender event received, checking if for user ${this.userId}` ); - if (this.renderingMode === "dom") { - await this.makeAndEmitDOMSnapshot(); - } else { - await this.makeAndEmitScreenshot(); - } - }); - - this.socket.on(`settings:${this.userId}`, (settings) => { - this.interpreterSettings = settings; - logger.debug(`Settings updated for user ${this.userId}`); - }); - - this.socket.on(`changeTab:${this.userId}`, async (tabIndex) => { - logger.debug( - `Tab change to ${tabIndex} requested for user ${this.userId}` - ); - await this.changeTab(tabIndex); - }); - - this.socket.on(`addTab:${this.userId}`, async () => { - logger.debug(`New tab requested for user ${this.userId}`); - await this.currentPage?.context().newPage(); - const lastTabIndex = this.currentPage - ? this.currentPage.context().pages().length - 1 - : 0; - await this.changeTab(lastTabIndex); - }); - - this.socket.on(`closeTab:${this.userId}`, async (tabInfo) => { - logger.debug( - `Close tab ${tabInfo.index} requested for user ${this.userId}` - ); - const page = this.currentPage?.context().pages()[tabInfo.index]; - if (page) { - if (tabInfo.isCurrent) { - if (this.currentPage?.context().pages()[tabInfo.index + 1]) { - // next tab - await this.changeTab(tabInfo.index + 1); - } else { - //previous tab - await this.changeTab(tabInfo.index - 1); - } - } - await page.close(); - logger.log( - "debug", - `Tab ${tabInfo.index} was closed for user ${ - this.userId - }, new tab count: ${this.currentPage?.context().pages().length}` - ); - } else { - logger.log( - "error", - `Tab index ${tabInfo.index} out of range for user ${this.userId}` - ); - } + await this.makeAndEmitDOMSnapshot(); }); - this.socket.on( - `setViewportSize:${this.userId}`, - async (data: { width: number; height: number }) => { - const { width, height } = data; - logger.log( - "debug", - `Viewport size change to width=${width}, height=${height} requested for user ${this.userId}` - ); - - // Update the browser context's viewport dynamically - if (this.context && this.browser) { - this.context = await this.browser.newContext({ - viewport: { width, height }, - }); - logger.log( - "debug", - `Viewport size updated to width=${width}, height=${height} for user ${this.userId}` - ); - } - } - ); - - // For backward compatibility, also register the standard events - this.socket.on( - "settings", - (settings) => (this.interpreterSettings = settings) - ); this.socket.on( "changeTab", async (tabIndex) => await this.changeTab(tabIndex) ); + this.socket.on("addTab", async () => { await this.currentPage?.context().newPage(); const lastTabIndex = this.currentPage @@ -1466,6 +721,7 @@ export class RemoteBrowser { : 0; await this.changeTab(lastTabIndex); }); + this.socket.on("closeTab", async (tabInfo) => { const page = this.currentPage?.context().pages()[tabInfo.index]; if (page) { @@ -1479,78 +735,6 @@ export class RemoteBrowser { await page.close(); } }); - this.socket.on( - "setViewportSize", - async (data: { width: number; height: number }) => { - const { width, height } = data; - if (this.context && this.browser) { - this.context = await this.browser.newContext({ - viewport: { width, height }, - }); - } - } - ); - - this.socket.on( - "extractListData", - async (data: { - listSelector: string; - fields: Record; - currentListId: number; - pagination: any; - }) => { - if (this.currentPage) { - const extractedData = await this.extractListData( - this.currentPage, - data.listSelector, - data.fields - ); - - this.socket.emit("listDataExtracted", { - currentListId: data.currentListId, - data: extractedData, - }); - } - } - ); - }; - /** - * Subscribes the remote browser for a screencast session - * on [CDP](https://chromedevtools.github.io/devtools-protocol/) level, - * where screenshot is being sent through the socket - * every time the browser's active page updates. - * @returns {Promise} - */ - public subscribeToScreencast = async (): Promise => { - logger.log('debug', `Starting screencast for user: ${this.userId}`); - await this.startScreencast(); - if (!this.client) { - logger.log('warn', 'client is not initialized'); - return; - } - // Set flag to indicate screencast is active - this.isScreencastActive = true; - - await this.updateViewportInfo(); - - this.client.on('Page.screencastFrame', ({ data: base64, sessionId }) => { - // Only process if screencast is still active for this user - if (!this.isScreencastActive) { - return; - } - this.emitScreenshot(Buffer.from(base64, 'base64')) - setTimeout(async () => { - try { - if (!this.client || !this.isScreencastActive) { - logger.log('warn', 'client is not initialized'); - return; - } - await this.client.send('Page.screencastFrameAck', { sessionId: sessionId }); - } catch (e: any) { - logger.log('error', `Screencast error: ${e}`); - } - }, 100); - }); }; /** @@ -1563,10 +747,6 @@ export class RemoteBrowser { } try { - // Enable required CDP domains - await this.client.send("DOM.enable"); - await this.client.send("CSS.enable"); - this.isDOMStreamingActive = true; logger.info("DOM streaming started successfully"); @@ -1663,7 +843,7 @@ export class RemoteBrowser { error.message.includes("Target closed")) ) { logger.debug("DOM snapshot skipped due to page navigation or closure"); - return; // Don't emit error for navigation - this is expected + return; } logger.error("Failed to create rrweb snapshot:", error); @@ -1688,7 +868,7 @@ export class RemoteBrowser { } /** - * Stop DOM streaming - following screencast pattern + * Stop DOM streaming - following dom snapshot pattern */ private async stopDOM(): Promise { this.isDOMStreamingActive = false; @@ -1705,94 +885,147 @@ export class RemoteBrowser { this.pendingNetworkRequests = []; - if (this.client) { - try { - await this.client.send("DOM.disable"); - await this.client.send("CSS.disable"); - } catch (error) { - logger.warn("Error stopping DOM stream:", error); - } - } - logger.info("DOM streaming stopped successfully"); } /**rrweb-bundle - * Terminates the screencast session and closes the remote browser. + * Terminates the dom snapshot session and closes the remote browser. * If an interpretation was running it will be stopped. * @returns {Promise} */ public async switchOff(): Promise { - try { - this.isScreencastActive = false; - this.isDOMStreamingActive = false; + this.isDOMStreamingActive = false; - await this.interpreter.stopInterpretation(); + if (this.domUpdateInterval) { + clearInterval(this.domUpdateInterval); + this.domUpdateInterval = null; + } - if (this.screencastInterval) { - clearInterval(this.screencastInterval); - } + if (this.memoryCleanupInterval) { + clearInterval(this.memoryCleanupInterval); + this.memoryCleanupInterval = null; + } - if (this.domUpdateInterval) { - clearInterval(this.domUpdateInterval); - } + if (this.memoryManagementInterval) { + clearInterval(this.memoryManagementInterval); + this.memoryManagementInterval = null; + } - if (this.client) { - await this.stopScreencast(); - await this.stopDOM(); + if (this.progressInterval) { + clearInterval(this.progressInterval); + this.progressInterval = null; + } + + if (this.snapshotDebounceTimeout) { + clearTimeout(this.snapshotDebounceTimeout); + this.snapshotDebounceTimeout = null; + } + + if (this.networkRequestTimeout) { + clearTimeout(this.networkRequestTimeout); + this.networkRequestTimeout = null; + } + + this.removeAllSocketListeners(); + + try { + if (this.currentPage) { + const isClosed = this.currentPage.isClosed(); + if (!isClosed) { + this.currentPage.removeAllListeners(); + logger.debug('Removed all page event listeners'); + } else { + logger.debug('Page already closed, skipping listener removal'); + } } + } catch (error: any) { + logger.warn(`Error removing page listeners: ${error.message}`); + } - if (this.browser) { - await this.browser.close(); + // Clean up Generator listeners to prevent memory leaks + if (this.generator) { + try { + this.generator.cleanup(); + logger.debug('Generator cleanup completed'); + } catch (error: any) { + logger.warn(`Error cleaning up generator: ${error.message}`); } + } - this.screenshotQueue = []; - //this.performanceMonitor.reset(); + // Stop interpretation with individual error handling (also calls clearState which removes pausing listeners) + try { + await this.interpreter.stopInterpretation(); + } catch (error) { + logger.error("Error stopping interpretation during shutdown:", error); + } + // Stop DOM streaming with individual error handling + try { + await this.stopDOM(); } catch (error) { - logger.error('Error during browser shutdown:', error); + logger.error("Error stopping DOM during shutdown:", error); } - } - private async optimizeScreenshot(screenshot: Buffer): Promise { - try { - return await sharp(screenshot) - .png({ - quality: Math.round(SCREENCAST_CONFIG.compressionQuality * 100), - compressionLevel: 6, - adaptiveFiltering: true, - force: true - }) - .resize({ - width: SCREENCAST_CONFIG.maxWidth, - height: SCREENCAST_CONFIG.maxHeight, - fit: 'inside', - withoutEnlargement: true, - kernel: 'lanczos3' - }) - .toBuffer(); - } catch (error) { - logger.error('Screenshot optimization failed:', error); - return screenshot; + try { + if (this.client && this.currentPage && !this.currentPage.isClosed()) { + const detachPromise = this.client.detach(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('CDP detach timeout')), 5000) + ); + await Promise.race([detachPromise, timeoutPromise]); + logger.debug('CDP session detached successfully'); } - } - + } catch (error: any) { + logger.warn(`Error detaching CDP session: ${error.message}`); + } finally { + this.client = null; + } - /** - * Makes and emits a single screenshot to the client side. - * @returns {Promise} - */ - public makeAndEmitScreenshot = async (): Promise => { - try { - const screenshot = await this.currentPage?.screenshot(); - if (screenshot) { - this.emitScreenshot(screenshot); - } - } catch (e) { - const { message } = e as Error; - logger.log('error', `Screenshot error: ${message}`); + try { + if (this.currentPage && !this.currentPage.isClosed()) { + const closePromise = this.currentPage.close(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Page close timeout')), 5000) + ); + await Promise.race([closePromise, timeoutPromise]); + logger.debug('Current page closed successfully'); } - }; + } catch (error: any) { + logger.warn(`Error closing current page: ${error.message}`); + } finally { + this.currentPage = null; + } + + try { + if (this.context) { + const contextClosePromise = this.context.close(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Context close timeout')), 5000) + ); + await Promise.race([contextClosePromise, timeoutPromise]); + logger.debug('Browser context closed successfully'); + } + } catch (error: any) { + logger.warn(`Error closing browser context: ${error.message}`); + } finally { + this.context = null; + } + + try { + if (this.browser) { + const browserClosePromise = this.browser.close(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Browser close timeout')), 5000) + ); + await Promise.race([browserClosePromise, timeoutPromise]); + logger.debug('Browser closed successfully'); + } + } catch (error: any) { + logger.error("Error during browser close:", error); + } finally { + this.browser = null; + } + } /** * Updates the active socket instance. @@ -1806,6 +1039,10 @@ export class RemoteBrowser { this.registerEditorEvents(); this.generator?.updateSocket(socket); this.interpreter?.updateSocket(socket); + + if (this.isDOMStreamingActive) { + this.setupScrollEventListener(); + } }; /** @@ -1845,15 +1082,6 @@ export class RemoteBrowser { } }; - /** - * Stops the workflow interpretation and initializes a new page. - * @returns {Promise} - */ - public stopCurrentInterpretation = async (): Promise => { - await this.interpreter.stopInterpretation(); - await this.initializeNewPage(); - }; - /** * Returns the current page instance. * @returns {Page | null | undefined} @@ -1872,7 +1100,6 @@ export class RemoteBrowser { private changeTab = async (tabIndex: number): Promise => { const page = this.currentPage?.context().pages()[tabIndex]; if (page) { - await this.stopScreencast(); await this.stopDOM(); this.currentPage = page; @@ -1888,9 +1115,6 @@ export class RemoteBrowser { if (this.isDOMStreamingActive) { await this.makeAndEmitDOMSnapshot(); await this.subscribeToDOM(); - } else { - await this.makeAndEmitScreenshot(); - await this.subscribeToScreencast(); } } else { logger.log('error', `${tabIndex} index out of range of pages`) @@ -1903,7 +1127,6 @@ export class RemoteBrowser { * @returns {Promise} */ private initializeNewPage = async (options?: Object): Promise => { - await this.stopScreencast(); const newPage = options ? await this.browser?.newPage(options) : await this.browser?.newPage(); await newPage?.setExtraHTTPHeaders({ @@ -1915,161 +1138,9 @@ export class RemoteBrowser { if (this.currentPage) { await this.setupPageEventListeners(this.currentPage); - this.client = await this.currentPage.context().newCDPSession(this.currentPage); - if (this.renderingMode === "dom") { - await this.subscribeToDOM(); - } else { - await this.subscribeToScreencast(); - } + await this.subscribeToDOM(); } else { logger.log('error', 'Could not get a new page, returned undefined'); } }; - - /** - * Initiates screencast of the remote browser through socket, - * registers listener for rerender event and emits the loaded event. - * Should be called only once after the browser is fully initialized. - * @returns {Promise} - */ - private async startScreencast(): Promise { - if (!this.client) { - logger.warn('Client is not initialized'); - return; - } - - try { - await this.client.send('Page.startScreencast', { - format: SCREENCAST_CONFIG.format, - quality: Math.round(SCREENCAST_CONFIG.compressionQuality * 100), - maxWidth: SCREENCAST_CONFIG.maxWidth, - maxHeight: SCREENCAST_CONFIG.maxHeight, - everyNthFrame: 1 - }); - - this.isScreencastActive = true; - - this.client.on('Page.screencastFrame', async ({ data, sessionId }) => { - try { - if (this.screenshotQueue.length >= SCREENCAST_CONFIG.maxQueueSize && this.isProcessingScreenshot) { - await this.client?.send('Page.screencastFrameAck', { sessionId }); - return; - } - - const buffer = Buffer.from(data, 'base64'); - this.emitScreenshot(buffer); - - setTimeout(async () => { - try { - if (this.client) { - await this.client.send('Page.screencastFrameAck', { sessionId }); - } - } catch (e) { - logger.error('Error acknowledging screencast frame:', e); - } - }, 10); - } catch (error) { - logger.error('Screencast frame processing failed:', error); - - try { - await this.client?.send('Page.screencastFrameAck', { sessionId }); - } catch (ackError) { - logger.error('Failed to acknowledge screencast frame:', ackError); - } - } - }); - logger.info('Screencast started successfully'); - } catch (error) { - logger.error('Failed to start screencast:', error); - } - } - - private async stopScreencast(): Promise { - if (!this.client) { - logger.error('Client is not initialized'); - return; - } - - try { - // Set flag to indicate screencast is active - this.isScreencastActive = false; - await this.client.send('Page.stopScreencast'); - this.screenshotQueue = []; - this.isProcessingScreenshot = false; - logger.info('Screencast stopped successfully'); - } catch (error) { - logger.error('Failed to stop screencast:', error); - } - } - - - /** - * Helper for emitting the screenshot of browser's active page through websocket. - * @param payload the screenshot binary data - * @returns void - */ - private emitScreenshot = async (payload: Buffer, viewportSize?: { width: number, height: number }): Promise => { - if (this.screenshotQueue.length > SCREENCAST_CONFIG.maxQueueSize) { - this.screenshotQueue = this.screenshotQueue.slice(-1); - } - - if (this.isProcessingScreenshot) { - if (this.screenshotQueue.length < SCREENCAST_CONFIG.maxQueueSize) { - this.screenshotQueue.push(payload); - } - return; - } - - this.isProcessingScreenshot = true; - - try { - const optimizationPromise = this.optimizeScreenshot(payload); - const timeoutPromise = new Promise((resolve) => { - setTimeout(() => resolve(payload), 100); - }); - - const optimizedScreenshot = await Promise.race([optimizationPromise, timeoutPromise]); - const base64Data = optimizedScreenshot.toString('base64'); - const dataWithMimeType = `data:image/${SCREENCAST_CONFIG.format};base64,${base64Data}`; - - payload = null as any; - - setImmediate(async () => { - this.socket.emit('screencast', { - image: dataWithMimeType, - userId: this.userId, - viewport: viewportSize || await this.currentPage?.viewportSize() || null - }); - }); - } catch (error) { - logger.error('Screenshot emission failed:', error); - try { - const base64Data = payload.toString('base64'); - const dataWithMimeType = `data:image/png;base64,${base64Data}`; - - setImmediate(async () => { - this.socket.emit('screencast', { - image: dataWithMimeType, - userId: this.userId, - viewport: viewportSize || await this.currentPage?.viewportSize() || null - }); - }); - } catch (e) { - logger.error('Fallback screenshot emission also failed:', e); - } - } finally { - this.isProcessingScreenshot = false; - - if (this.screenshotQueue.length > 0) { - const nextScreenshot = this.screenshotQueue.shift(); - if (nextScreenshot) { - const delay = this.screenshotQueue.length > 0 ? 16 : 33; - setTimeout(() => { - this.emitScreenshot(nextScreenshot); - }, delay); - } - } - } - }; - } diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index a6db615e4..4058fa56a 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -31,22 +31,50 @@ export const initializeRemoteBrowserForRecording = (userId: string, mode: string if (activeId) { const remoteBrowser = browserPool.getRemoteBrowser(activeId); remoteBrowser?.updateSocket(socket); - await remoteBrowser?.makeAndEmitScreenshot(); + + if (remoteBrowser?.isDOMStreamingActive) { + remoteBrowser?.makeAndEmitDOMSnapshot(); + } } else { const browserSession = new RemoteBrowser(socket, userId, id); browserSession.interpreter.subscribeToPausing(); - await browserSession.initialize(userId); - await browserSession.registerEditorEvents(); + + try { + await browserSession.initialize(userId); + await browserSession.registerEditorEvents(); - if (mode === "dom") { await browserSession.subscribeToDOM(); - logger.info('DOM streaming started for scraping browser in recording mode'); - } else { - await browserSession.subscribeToScreencast(); - logger.info('Screenshot streaming started for local browser in recording mode'); + logger.info('DOM streaming started for remote browser in recording mode'); + + browserPool.addRemoteBrowser(id, browserSession, userId, false, "recording"); + } catch (initError: any) { + logger.error(`Failed to initialize browser for recording: ${initError.message}`); + logger.info('Sending browser failure notification to frontend'); + + socket.emit('dom-mode-error', { + userId: userId, + error: 'Failed to start the browser, please try again in some time.' + }); + + socket.emit('error', { + userId: userId, + message: 'Failed to start the browser, please try again in some time.', + details: initError.message + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + try { + await browserSession.switchOff(); + logger.debug('Cleaned up failed browser session'); + } catch (cleanupError: any) { + logger.warn(`Failed to cleanup browser session: ${cleanupError.message}`); + } + + logger.info('Browser initialization failed, user notified'); + + return id; } - - browserPool.addRemoteBrowser(id, browserSession, userId, false, "recording"); } socket.emit('loaded'); }); @@ -69,7 +97,7 @@ export const createRemoteBrowserForRun = (userId: string): string => { const id = uuid(); - const slotReserved = browserPool.reserveBrowserSlot(id, userId, "run"); + const slotReserved = browserPool.reserveBrowserSlotAtomic(id, userId, "run"); if (!slotReserved) { logger.log('warn', `Cannot create browser for user ${userId}: no available slots`); throw new Error('User has reached maximum browser limit'); @@ -94,45 +122,78 @@ export const createRemoteBrowserForRun = (userId: string): string => { * @category BrowserManagement-Controller */ export const destroyRemoteBrowser = async (id: string, userId: string): Promise => { - try { - const browserSession = browserPool.getRemoteBrowser(id); - if (!browserSession) { - logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`); - return true; - } - - logger.log('debug', `Switching off the browser with id: ${id}`); - - try { - await browserSession.stopCurrentInterpretation(); - } catch (stopError) { - logger.log('warn', `Error stopping interpretation for browser ${id}: ${stopError}`); - } - - try { - await browserSession.switchOff(); - } catch (switchOffError) { - logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`); - } + const DESTROY_TIMEOUT = 30000; + const destroyPromise = (async () => { try { - const namespace = io.of(id); - namespace.removeAllListeners(); - namespace.disconnectSockets(true); - logger.log('debug', `Cleaned up socket namespace for browser ${id}`); - } catch (namespaceCleanupError: any) { - logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`); + const browserSession = browserPool.getRemoteBrowser(id); + if (!browserSession) { + logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`); + return true; + } + + logger.log('debug', `Switching off the browser with id: ${id}`); + + try { + await browserSession.switchOff(); + } catch (switchOffError) { + logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`); + } + + try { + const namespace = io.of(id); + + const sockets = await namespace.fetchSockets(); + for (const socket of sockets) { + socket.disconnect(true); + } + + namespace.removeAllListeners(); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const nsps = (io as any)._nsps; + if (nsps && nsps.has(`/${id}`)) { + const ns = nsps.get(`/${id}`); + if (ns && ns.sockets && ns.sockets.size === 0) { + nsps.delete(`/${id}`); + logger.log('debug', `Deleted empty namespace /${id} from io._nsps Map`); + } else { + logger.log('warn', `Namespace /${id} still has ${ns?.sockets?.size || 0} sockets, skipping manual deletion`); + } + } + + logger.log('debug', `Cleaned up socket namespace for browser ${id}`); + } catch (namespaceCleanupError: any) { + logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`); + } + + return browserPool.deleteRemoteBrowser(id); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`); + + try { + return browserPool.deleteRemoteBrowser(id); + } catch (deleteError) { + logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`); + return false; + } } + })(); - return browserPool.deleteRemoteBrowser(id); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`); - + try { + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error(`Browser destruction timed out after ${DESTROY_TIMEOUT}ms`)), DESTROY_TIMEOUT) + ); + + return await Promise.race([destroyPromise, timeoutPromise]); + } catch (timeoutError: any) { + logger.log('error', `Browser ${id} destruction timeout: ${timeoutError.message} - force removing from pool`); try { return browserPool.deleteRemoteBrowser(id); } catch (deleteError) { - logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`); + logger.log('error', `Failed to force delete browser ${id} after timeout: ${deleteError}`); return false; } } @@ -229,8 +290,8 @@ export const interpretWholeWorkflow = async (userId: string) => { export const stopRunningInterpretation = async (userId: string) => { const id = getActiveBrowserIdByState(userId, "recording"); if (id) { - const browser = browserPool.getRemoteBrowser(id); - await browser?.stopCurrentInterpretation(); + const browserSession = browserPool.getRemoteBrowser(id); + await browserSession?.switchOff(); } else { logger.log('error', 'Cannot stop interpretation: No active browser or generator.'); } @@ -264,7 +325,31 @@ const initializeBrowserAsync = async (id: string, userId: string) => { browserPool.failBrowserSlot(id); }); - const socket = await waitForConnection; + const connectWithRetry = async (maxRetries: number = 3): Promise => { + let retryCount = 0; + + while (retryCount < maxRetries) { + try { + const socket = await waitForConnection; + if (socket || retryCount === maxRetries - 1) { + return socket; + } + } catch (error: any) { + logger.log('warn', `Connection attempt ${retryCount + 1} failed for browser ${id}: ${error.message}`); + } + + retryCount++; + if (retryCount < maxRetries) { + const delay = Math.pow(2, retryCount) * 1000; + logger.log('info', `Retrying connection for browser ${id} in ${delay}ms (attempt ${retryCount + 1}/${maxRetries})`); + await new Promise(resolve => setTimeout(resolve, delay)); + } + } + + return null; + }; + + const socket = await connectWithRetry(3); try { let browserSession: RemoteBrowser; @@ -288,9 +373,17 @@ const initializeBrowserAsync = async (id: string, userId: string) => { logger.log('debug', `Starting browser initialization for ${id}`); try { - await browserSession.initialize(userId); - logger.log('debug', `Browser initialization completed for ${id}`); + const BROWSER_INIT_TIMEOUT = 45000; + logger.log('info', `Browser initialization starting with ${BROWSER_INIT_TIMEOUT/1000}s timeout`); + + const initPromise = browserSession.initialize(userId); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error('Browser initialization timeout')), BROWSER_INIT_TIMEOUT); + }); + + await Promise.race([initPromise, timeoutPromise]); } catch (initError: any) { + logger.log('error', `Browser initialization failed for ${id}: ${initError.message}`); try { await browserSession.switchOff(); logger.log('info', `Cleaned up failed browser initialization for ${id}`); diff --git a/server/src/browser-management/inputHandlers.ts b/server/src/browser-management/inputHandlers.ts index 3e58664e1..ee31ba063 100644 --- a/server/src/browser-management/inputHandlers.ts +++ b/server/src/browser-management/inputHandlers.ts @@ -7,9 +7,7 @@ import { Socket } from 'socket.io'; import logger from "../logger"; import { Coordinates, ScrollDeltas, KeyboardInput, DatePickerEventData } from '../types'; import { browserPool } from "../server"; -import { WorkflowGenerator } from "../workflow-management/classes/Generator"; import { Page } from "playwright"; -import { throttle } from "../../../src/helpers/inputHelpers"; import { CustomActions } from "../../../src/shared/types"; import { WhereWhatPair } from "maxun-core"; import { RemoteBrowser } from './classes/RemoteBrowser'; @@ -899,4 +897,37 @@ const registerInputHandlers = (socket: Socket, userId: string) => { socket.on("dom:addpair", (data) => onDOMWorkflowPair(data, userId)); }; -export default registerInputHandlers; +/** + * Removes all input handler socket listeners to prevent memory leaks + * Must be called when socket disconnects or browser session ends + * @param socket websocket with established connection + * @returns void + * @category BrowserManagement + */ +const removeInputHandlers = (socket: Socket) => { + try { + socket.removeAllListeners("input:mousedown"); + socket.removeAllListeners("input:wheel"); + socket.removeAllListeners("input:mousemove"); + socket.removeAllListeners("input:keydown"); + socket.removeAllListeners("input:keyup"); + socket.removeAllListeners("input:url"); + socket.removeAllListeners("input:refresh"); + socket.removeAllListeners("input:back"); + socket.removeAllListeners("input:forward"); + socket.removeAllListeners("input:date"); + socket.removeAllListeners("input:dropdown"); + socket.removeAllListeners("input:time"); + socket.removeAllListeners("input:datetime-local"); + socket.removeAllListeners("action"); + socket.removeAllListeners("dom:input"); + socket.removeAllListeners("dom:click"); + socket.removeAllListeners("dom:keypress"); + socket.removeAllListeners("dom:addpair"); + socket.removeAllListeners("removeAction"); + } catch (error: any) { + console.warn(`Error removing input handlers: ${error.message}`); + } +}; + +export { registerInputHandlers, removeInputHandlers }; diff --git a/server/src/socket-connection/connection.ts b/server/src/socket-connection/connection.ts index 109e50cb6..88ad2c138 100644 --- a/server/src/socket-connection/connection.ts +++ b/server/src/socket-connection/connection.ts @@ -1,6 +1,6 @@ import { Namespace, Socket } from 'socket.io'; import logger from "../logger"; -import registerInputHandlers from '../browser-management/inputHandlers'; +import { registerInputHandlers, removeInputHandlers } from '../browser-management/inputHandlers'; /** * Opens a websocket canal for duplex data transfer and registers all handlers for this data for the recording session. @@ -17,7 +17,11 @@ export const createSocketConnection = ( const onConnection = async (socket: Socket) => { logger.log('info', "Client connected " + socket.id); registerInputHandlers(socket, userId); - socket.on('disconnect', () => logger.log('info', "Client disconnected " + socket.id)); + socket.on('disconnect', () => { + logger.log('info', "Client disconnected " + socket.id); + removeInputHandlers(socket); + logger.log('debug', "Input handlers cleaned up for socket " + socket.id); + }); callback(socket); } diff --git a/server/src/workflow-management/classes/Generator.ts b/server/src/workflow-management/classes/Generator.ts index 57a30863b..bb19b4659 100644 --- a/server/src/workflow-management/classes/Generator.ts +++ b/server/src/workflow-management/classes/Generator.ts @@ -71,6 +71,8 @@ export class WorkflowGenerator { private poolId: string | null = null; + private pageCloseListeners: Map void> = new Map(); + /** * The public constructor of the WorkflowGenerator. * Takes socket for communication as a parameter and registers some important events on it. @@ -884,6 +886,29 @@ export class WorkflowGenerator { } }; + /** + * Removes all socket listeners to prevent memory leaks + * Must be called before re-registering listeners or during cleanup + * @private + */ + private removeSocketListeners(): void { + try { + this.socket.removeAllListeners('setGetList'); + this.socket.removeAllListeners('listSelector'); + this.socket.removeAllListeners('setPaginationMode'); + this.socket.removeAllListeners('dom-mode-enabled'); + this.socket.removeAllListeners('screenshot-mode-enabled'); + this.socket.removeAllListeners('save'); + this.socket.removeAllListeners('new-recording'); + this.socket.removeAllListeners('activeIndex'); + this.socket.removeAllListeners('decision'); + this.socket.removeAllListeners('updatePair'); + logger.log('debug', 'Removed all Generator socket listeners'); + } catch (error: any) { + logger.warn(`Error removing Generator socket listeners: ${error.message}`); + } + } + /** * Removes an action with the given actionId from the workflow. * Only removes the specific action from the what array, not the entire pair. @@ -938,6 +963,39 @@ export class WorkflowGenerator { this.initializeDOMListeners(); }; + /** + * Cleanup method to release resources and prevent memory leaks + * Must be called when the generator is no longer needed + */ + public cleanup(): void { + try { + this.removeSocketListeners(); + + for (const [page, listener] of this.pageCloseListeners.entries()) { + try { + if (!page.isClosed()) { + page.removeListener('close', listener); + } + } catch (error: any) { + logger.warn(`Error removing page close listener: ${error.message}`); + } + } + this.pageCloseListeners.clear(); + + this.workflowRecord = { workflow: [] }; + this.generatedData = { + lastUsedSelector: '', + lastIndex: null, + lastAction: '', + lastUsedSelectorTagName: '', + lastUsedSelectorInnerText: '', + }; + logger.log('debug', 'Generator cleanup completed'); + } catch (error: any) { + logger.error(`Error during Generator cleanup: ${error.message}`); + } + } + /** * Returns the currently generated workflow without all the generated flag actions. * @param workflow The workflow for removing the generated flag actions from. From c01e2a882515809fcf2bb0525a6cd52063bf528d Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 15:06:03 +0530 Subject: [PATCH 14/17] fix: add persist timer, rm ssl --- .../browser-management/classes/BrowserPool.ts | 8 ---- server/src/schedule-worker.ts | 9 ++-- server/src/storage/pgboss.ts | 6 +-- .../classes/Interpreter.ts | 45 +++++++++++++++++-- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/server/src/browser-management/classes/BrowserPool.ts b/server/src/browser-management/classes/BrowserPool.ts index 286ef6812..2ddd01ede 100644 --- a/server/src/browser-management/classes/BrowserPool.ts +++ b/server/src/browser-management/classes/BrowserPool.ts @@ -645,14 +645,6 @@ export class BrowserPool { } }; - /** - * Legacy method - kept for backwards compatibility but now uses atomic version - * @deprecated Use reserveBrowserSlotAtomic instead - */ - public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => { - return this.reserveBrowserSlotAtomic(id, userId, state); - }; - /** * Upgrades a reserved slot to an actual browser instance. * diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts index da3f9dd45..63c7bad46 100644 --- a/server/src/schedule-worker.ts +++ b/server/src/schedule-worker.ts @@ -6,7 +6,6 @@ import logger from './logger'; import Robot from './models/Robot'; import { handleRunRecording } from './workflow-management/scheduler'; import { computeNextRun } from './utils/schedule'; -import { v4 as uuid } from "uuid"; if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { throw new Error('One or more required environment variables are missing.'); @@ -16,7 +15,7 @@ const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIC const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, - max: 5, + max: 3, expireInHours: 23, }); @@ -36,7 +35,7 @@ async function processScheduledWorkflow(job: Job) { try { // Execute the workflow using the existing handleRunRecording function - const result = await handleRunRecording(id, userId); + await handleRunRecording(id, userId); // Update the robot's schedule with last run and next run times const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); @@ -143,11 +142,11 @@ pgBoss.on('error', (error) => { process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...'); await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss scheduler stopped, ready for termination'); }); process.on('SIGINT', async () => { logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...'); await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss scheduler stopped, waiting for main process cleanup...'); }); diff --git a/server/src/storage/pgboss.ts b/server/src/storage/pgboss.ts index 9a6eedd1c..7e657ed13 100644 --- a/server/src/storage/pgboss.ts +++ b/server/src/storage/pgboss.ts @@ -26,11 +26,7 @@ const pgBossConnectionString = `postgres://${process.env.DB_USER}:${encodeURICom */ export const pgBossClient = new PgBoss({ connectionString: pgBossConnectionString, - max: 3, // Small pool since we only send jobs - ssl: { - require: true, - rejectUnauthorized: false, - }, + max: 3, }); let isStarted = false; diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 8a73a25fb..07f99dfee 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -143,8 +143,10 @@ export class WorkflowInterpreter { }> = []; private persistenceTimer: NodeJS.Timeout | null = null; + private persistenceRetryTimer: NodeJS.Timeout | null = null; private readonly BATCH_SIZE = 5; private readonly BATCH_TIMEOUT = 3000; + private readonly MAX_PERSISTENCE_RETRIES = 3; private persistenceInProgress = false; private persistenceRetryCount = 0; @@ -172,6 +174,23 @@ export class WorkflowInterpreter { this.currentRunId = runId || null; } + /** + * Removes pausing-related socket listeners to prevent memory leaks + * Must be called before re-registering listeners or during cleanup + * @private + */ + private removePausingListeners(): void { + try { + this.socket.removeAllListeners('pause'); + this.socket.removeAllListeners('resume'); + this.socket.removeAllListeners('step'); + this.socket.removeAllListeners('breakpoints'); + logger.log('debug', 'Removed pausing socket listeners'); + } catch (error: any) { + logger.warn(`Error removing pausing listeners: ${error.message}`); + } + } + /** * Subscribes to the events that are used to control the interpretation. * The events are pause, resume, step and breakpoints. @@ -179,6 +198,8 @@ export class WorkflowInterpreter { * @returns void */ public subscribeToPausing = () => { + this.removePausingListeners(); + this.socket.on('pause', () => { this.interpretationIsPaused = true; }); @@ -363,6 +384,11 @@ export class WorkflowInterpreter { this.persistenceTimer = null; } + if (this.persistenceRetryTimer) { + clearTimeout(this.persistenceRetryTimer); + this.persistenceRetryTimer = null; + } + if (this.interpreter) { try { if (!this.interpreter.getIsAborted()) { @@ -370,11 +396,18 @@ export class WorkflowInterpreter { } await this.interpreter.stop(); logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup'); + + if (typeof this.interpreter.cleanup === 'function') { + await this.interpreter.cleanup(); + logger.log('debug', 'mx-cloud interpreter cleanup completed'); + } } catch (error: any) { logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`); } } + this.removePausingListeners(); + this.debugMessages = []; this.interpretationIsPaused = false; this.activeId = null; @@ -815,16 +848,22 @@ export class WorkflowInterpreter { this.persistenceRetryCount = 0; } - if (this.persistenceRetryCount < 3) { + if (this.persistenceRetryCount < this.MAX_PERSISTENCE_RETRIES) { this.persistenceBuffer.unshift(...batchToProcess); this.persistenceRetryCount++; const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000); - setTimeout(async () => { + + if (this.persistenceRetryTimer) { + clearTimeout(this.persistenceRetryTimer); + } + + this.persistenceRetryTimer = setTimeout(async () => { + this.persistenceRetryTimer = null; await this.flushPersistenceBuffer(); }, backoffDelay); - logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`); + logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/${this.MAX_PERSISTENCE_RETRIES} in ${backoffDelay}ms`); } else { logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`); this.persistenceRetryCount = 0; From 55a987285ff518c0a7fcfd3bf92012d87babde97 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 15:08:47 +0530 Subject: [PATCH 15/17] fix: add partial data recovery --- server/src/server.ts | 243 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 200 insertions(+), 43 deletions(-) diff --git a/server/src/server.ts b/server/src/server.ts index 4a33b67be..88dc74c55 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -8,7 +8,7 @@ dotenv.config(); import { record, workflow, storage, auth, integration, proxy, webhook } from './routes'; import { BrowserPool } from "./browser-management/classes/BrowserPool"; import logger from './logger'; -import { connectDB, syncDB } from './storage/db' +import sequelize, { connectDB, syncDB } from './storage/db' import cookieParser from 'cookie-parser'; import { SERVER_PORT } from "./constants/config"; import { readdirSync } from "fs" @@ -22,6 +22,7 @@ import session from 'express-session'; import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage'; import { startWorkers } from './pgboss-worker'; import { stopPgBossClient, startPgBossClient } from './storage/pgboss' +import Run from './models/Run'; const app = express(); app.use(cors({ @@ -38,7 +39,7 @@ const pool = new Pool({ password: process.env.DB_PASSWORD, port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined, max: 10, - min: 0, + min: 0, idleTimeoutMillis: 30000, connectionTimeoutMillis: 10000, maxUses: 7500, @@ -83,13 +84,22 @@ const server = http.createServer(app); /** * Globally exported singleton instance of socket.io for socket communication with the client. */ -export let io: Server; +export let io = new Server(server, { + cleanupEmptyChildNamespaces: true, + pingTimeout: 60000, + pingInterval: 25000, + maxHttpBufferSize: 1e8, + transports: ['websocket', 'polling'], + allowEIO3: true +}); /** * {@link BrowserPool} globally exported singleton instance for managing browsers. */ export const browserPool = new BrowserPool(); +export const recentRecoveries = new Map(); + app.use(cookieParser()) app.use('/webhook', webhook); @@ -139,30 +149,38 @@ app.use((req, res, next) => { }); if (require.main === module) { - setInterval(() => { - processQueuedRuns(); + const serverIntervals: NodeJS.Timeout[] = []; + + const processQueuedRunsInterval = setInterval(async () => { + try { + await processQueuedRuns(); + } catch (error: any) { + logger.log('error', `Error in processQueuedRuns interval: ${error.message}`); + } }, 5000); -} + serverIntervals.push(processQueuedRunsInterval); + + const browserPoolCleanupInterval = setInterval(() => { + browserPool.cleanupStaleBrowserSlots(); + }, 60000); + serverIntervals.push(browserPoolCleanupInterval); -if (require.main === module) { server.listen(SERVER_PORT, '0.0.0.0', async () => { try { await connectDB(); await syncDB(); - + logger.log('info', 'Cleaning up stale browser slots...'); browserPool.cleanupStaleBrowserSlots(); - // Recover orphaned runs from potential crashes await recoverOrphanedRuns(); - // Start pgBoss client for job submission + await startPgBossClient(); - // Start pgBoss workers AFTER recovery is complete await startWorkers(); - + io = new Server(server); - + io.of('/queued-run').on('connection', (socket) => { const userId = socket.handshake.query.userId as string; @@ -170,6 +188,15 @@ if (require.main === module) { socket.join(`user-${userId}`); logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`); + if (recentRecoveries.has(userId)) { + const recoveries = recentRecoveries.get(userId)!; + recoveries.forEach(recoveryData => { + socket.emit('run-recovered', recoveryData); + logger.log('info', `Sent stored recovery notification for run: ${recoveryData.runId} to user: ${userId}`); + }); + recentRecoveries.delete(userId); + } + socket.on('disconnect', () => { logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`); }); @@ -178,8 +205,9 @@ if (require.main === module) { socket.disconnect(); } }); - + if (!isProduction) { + // Development mode if (process.platform === 'win32') { workerProcess = fork(workerPath, [], { execArgv: ['--inspect=5859'], @@ -207,7 +235,7 @@ if (require.main === module) { console.log(`Recording worker exited with code: ${code}`); }); } else { - // Run in same process for non-Windows + // Run in same process for non-Windows development try { await import('./schedule-worker'); await import('./pgboss-worker'); @@ -216,48 +244,177 @@ if (require.main === module) { console.error('Failed to start workers in main process:', error); } } + } else { + // Production mode - run workers in same process for memory sharing + try { + await import('./schedule-worker'); + await import('./pgboss-worker'); + logger.log('info', 'Workers started in main process'); + } catch (error: any) { + logger.log('error', `Failed to start workers: ${error.message}`); + process.exit(1); + } } - - logger.log('info', `Server listening on port ${SERVER_PORT}`); + + logger.log('info', `Server listening on port ${SERVER_PORT}`); } catch (error: any) { logger.log('error', `Failed to connect to the database: ${error.message}`); process.exit(1); } }); -} -process.on('unhandledRejection', (reason, promise) => { - logger.log('error', `Unhandled promise rejection at: ${promise}, reason: ${reason}`); - console.error('Unhandled promise rejection:', reason); -}); + process.on('SIGINT', async () => { + console.log('Main app shutting down...'); + let shutdownSuccessful = true; -process.on('uncaughtException', (error) => { - logger.log('error', `Uncaught exception: ${error.message}`, { stack: error.stack }); - console.error('Uncaught exception:', error); + await new Promise(resolve => setTimeout(resolve, 2000)); - if (process.env.NODE_ENV === 'production') { - setTimeout(() => { - process.exit(1); - }, 5000); - } -}); + try { + const runningBrowsers = browserPool.getAllBrowsers(); -if (require.main === module) { - process.on('SIGINT', async () => { - console.log('Main app shutting down...'); + for (const [browserId, browser] of runningBrowsers) { + try { + if (browser && browser.interpreter) { + const hasData = (browser.interpreter.serializableDataByType?.scrapeSchema?.length > 0) || + (browser.interpreter.serializableDataByType?.scrapeList?.length > 0) || + (browser.interpreter.binaryData?.length > 0); + + if (hasData) { + const run = await Run.findOne({ where: { browserId, status: 'running' } }); + if (run) { + const limitedData = { + scrapeSchemaOutput: browser.interpreter.serializableDataByType?.scrapeSchema + ? { "schema-tabular": browser.interpreter.serializableDataByType.scrapeSchema } + : {}, + scrapeListOutput: browser.interpreter.serializableDataByType?.scrapeList || {}, + binaryOutput: browser.interpreter.binaryData || [] + }; + + const binaryOutputRecord = limitedData.binaryOutput.reduce((acc: Record, item: any, index: number) => { + acc[`item-${index}`] = item; + return acc; + }, {}); + + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Process interrupted during execution - partial data preserved', + serializableOutput: { + scrapeSchema: Object.values(limitedData.scrapeSchemaOutput), + scrapeList: Object.values(limitedData.scrapeListOutput), + }, + binaryOutput: binaryOutputRecord + }); + } + } + } + } catch (browserError: any) { + shutdownSuccessful = false; + } + } + } catch (error: any) { + shutdownSuccessful = false; + } + + serverIntervals.forEach(clearInterval); + + try { + const allBrowsers = browserPool.getAllBrowsers(); + for (const [browserId, browser] of allBrowsers) { + try { + if (browser) { + await browser.switchOff(); + } + } catch (browserCleanupError: any) { + console.error(`Error shutting down browser ${browserId}:`, browserCleanupError.message); + } + } + } catch (error: any) { + console.error('Error during browser cleanup:', error.message); + } + + if (!isProduction) { + try { + if (workerProcess) { + workerProcess.kill('SIGTERM'); + } + if (recordingWorkerProcess) { + recordingWorkerProcess.kill('SIGTERM'); + } + } catch (workerError: any) { + console.error('Error terminating worker processes:', workerError.message); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + try { + await new Promise((resolve) => { + io.close(() => { + resolve(); + }); + }); + } catch (ioError: any) { + shutdownSuccessful = false; + } + + try { + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } catch (serverError: any) { + console.error('Error closing HTTP server:', serverError.message); + shutdownSuccessful = false; + } try { - console.log('Closing PostgreSQL connection pool...'); await pool.end(); - console.log('PostgreSQL connection pool closed'); - } catch (error) { - console.error('Error closing PostgreSQL connection pool:', error); + } catch (poolError: any) { + console.error('Error closing PostgreSQL connection pool:', poolError.message); + shutdownSuccessful = false; + } + + try { + await stopPgBossClient(); + } catch (pgBossError: any) { + console.error('Error closing PgBoss client connection:', pgBossError.message); + shutdownSuccessful = false; + } + + try { + await sequelize.close(); + } catch (sequelizeError: any) { + console.error('Error closing Sequelize connection:', sequelizeError.message); + shutdownSuccessful = false; } - if (!isProduction && process.platform === 'win32') { - if (workerProcess) workerProcess.kill(); - if (recordingWorkerProcess) recordingWorkerProcess.kill(); + console.log(`Shutdown ${shutdownSuccessful ? 'completed successfully' : 'completed with errors'}`); + process.exit(shutdownSuccessful ? 0 : 1); + }); + + process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled promise rejection:', reason); + + if (process.env.NODE_ENV === 'production') { + setTimeout(() => { + process.exit(1); + }, 1000); } - process.exit(); }); -} \ No newline at end of file + + process.on('uncaughtException', (error) => { + console.error('Uncaught exception:', error); + + if (process.env.NODE_ENV === 'production') { + setTimeout(() => { + process.exit(1); + }, 5000); + } + }); +} From 54ed1101f77210e60e33ca00a73b01047a55d80b Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 15:09:33 +0530 Subject: [PATCH 16/17] chore: add fingerprint-suite packages --- package.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/package.json b/package.json index 65a2f87ae..a7982fcda 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,8 @@ "dotenv": "^16.0.0", "express": "^4.17.2", "express-session": "^1.18.1", + "fingerprint-generator": "^2.1.77", + "fingerprint-injector": "^2.1.77", "fortawesome": "^0.0.1-security", "google-auth-library": "^9.14.1", "googleapis": "^144.0.0", From 926db0a66bde3b1e10c7c4237757ea552f2d8f1d Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 16:16:10 +0530 Subject: [PATCH 17/17] feat: add test pagination socket event --- .../src/browser-management/inputHandlers.ts | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/server/src/browser-management/inputHandlers.ts b/server/src/browser-management/inputHandlers.ts index ee31ba063..69b2697d7 100644 --- a/server/src/browser-management/inputHandlers.ts +++ b/server/src/browser-management/inputHandlers.ts @@ -862,6 +862,157 @@ const onRemoveAction = async ( await handleWrapper(handleRemoveAction, userId, data); }; +/** + * Tests pagination by scrolling down and checking if new content loads + * @param data Object containing listSelector + * @param userId The user ID + * @param socket The socket connection to emit results + */ +const onTestPaginationScroll = async ( + data: { listSelector: string }, + userId: string, + socket: Socket +) => { + logger.log("debug", "Testing pagination scroll emitted from client"); + + const id = browserPool.getActiveBrowserId(userId, "recording"); + if (!id) { + logger.log("warn", `No active browser for id ${id}`); + socket.emit("paginationScrollTestResult", { + success: false, + error: "No active browser" + }); + return; + } + + const activeBrowser = browserPool.getRemoteBrowser(id); + const currentPage = activeBrowser?.getCurrentPage(); + + if (!currentPage || !activeBrowser) { + logger.log("warn", `No active page for browser ${id}`); + socket.emit("paginationScrollTestResult", { + success: false, + error: "No active page" + }); + return; + } + + try { + const { listSelector } = data; + + logger.log("info", `Starting pagination scroll test for selector: ${listSelector}`); + + const initialCount = await currentPage.evaluate((selector) => { + function evaluateSelector(sel: string): Element[] { + try { + const isXPath = sel.startsWith('//') || sel.startsWith('(//'); + if (isXPath) { + const result = document.evaluate( + sel, + document, + null, + XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, + null + ); + const elements: Element[] = []; + for (let i = 0; i < result.snapshotLength; i++) { + const node = result.snapshotItem(i); + if (node && node.nodeType === Node.ELEMENT_NODE) { + elements.push(node as Element); + } + } + return elements; + } else { + return Array.from(document.querySelectorAll(sel)); + } + } catch (err) { + console.error('Selector evaluation failed:', sel, err); + return []; + } + } + + return evaluateSelector(selector).length; + }, listSelector); + + logger.log("info", `Initial list count: ${initialCount}`); + + const scrollInfo = await currentPage.evaluate(() => { + return { + scrollY: window.scrollY, + scrollHeight: document.documentElement.scrollHeight, + viewportHeight: window.innerHeight + }; + }); + + logger.log("info", `Scroll info:`, scrollInfo); + + await currentPage.evaluate(() => { + window.scrollTo(0, document.body.scrollHeight); + }); + + logger.log("info", "Scrolled to bottom, waiting for potential content load..."); + + await currentPage.waitForTimeout(2000); + + const newCount = await currentPage.evaluate((selector) => { + function evaluateSelector(sel: string): Element[] { + try { + const isXPath = sel.startsWith('//') || sel.startsWith('(//'); + if (isXPath) { + const result = document.evaluate( + sel, + document, + null, + XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, + null + ); + const elements: Element[] = []; + for (let i = 0; i < result.snapshotLength; i++) { + const node = result.snapshotItem(i); + if (node && node.nodeType === Node.ELEMENT_NODE) { + elements.push(node as Element); + } + } + return elements; + } else { + return Array.from(document.querySelectorAll(sel)); + } + } catch (err) { + return []; + } + } + + return evaluateSelector(selector).length; + }, listSelector); + + logger.log("info", `New list count after scroll: ${newCount}`); + + await currentPage.evaluate((originalY) => { + window.scrollTo(0, originalY); + }, scrollInfo.scrollY); + + const contentLoaded = newCount > initialCount; + + logger.log("info", `Scroll test result: ${contentLoaded ? 'Content loaded' : 'No new content'}`); + + socket.emit("paginationScrollTestResult", { + success: true, + contentLoaded: contentLoaded, + initialCount: initialCount, + newCount: newCount, + itemsAdded: newCount - initialCount + }); + + } catch (error) { + const { message } = error as Error; + logger.log("error", `Error during pagination scroll test: ${message}`); + socket.emit("paginationScrollTestResult", { + success: false, + error: message + }); + } +}; + /** * Helper function for registering the handlers onto established websocket connection. * Registers various input handlers. @@ -895,6 +1046,7 @@ const registerInputHandlers = (socket: Socket, userId: string) => { socket.on("dom:click", (data) => onDOMClickAction(data, userId)); socket.on("dom:keypress", (data) => onDOMKeyboardAction(data, userId)); socket.on("dom:addpair", (data) => onDOMWorkflowPair(data, userId)); + socket.on("testPaginationScroll", (data) => onTestPaginationScroll(data, userId, socket)); }; /** @@ -925,6 +1077,7 @@ const removeInputHandlers = (socket: Socket) => { socket.removeAllListeners("dom:keypress"); socket.removeAllListeners("dom:addpair"); socket.removeAllListeners("removeAction"); + socket.removeAllListeners("testPaginationScroll"); } catch (error: any) { console.warn(`Error removing input handlers: ${error.message}`); }