Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 35 additions & 40 deletions lib/workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { mkdirp } from 'mkdirp'
import { Worker } from 'worker_threads'
import { EventEmitter } from 'events'
import ms from 'ms'
import merge from 'lodash.merge'

const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
Expand Down Expand Up @@ -66,21 +67,21 @@ const createWorker = (workerObject, isPoolMode = false) => {
stdout: true,
stderr: true,
})

// Pipe worker stdout/stderr to main process
if (worker.stdout) {
worker.stdout.setEncoding('utf8')
worker.stdout.on('data', (data) => {
worker.stdout.on('data', data => {
process.stdout.write(data)
})
}
if (worker.stderr) {
worker.stderr.setEncoding('utf8')
worker.stderr.on('data', (data) => {
worker.stderr.on('data', data => {
process.stderr.write(data)
})
}

worker.on('error', err => {
console.error(`[Main] Worker Error:`, err)
output.error(`Worker Error: ${err.stack}`)
Expand Down Expand Up @@ -221,13 +222,13 @@ class WorkerObject {

addConfig(config) {
const oldConfig = JSON.parse(this.options.override || '{}')

// Remove customLocatorStrategies from both old and new config before JSON serialization
// since functions cannot be serialized and will be lost, causing workers to have empty strategies
const configWithoutFunctions = { ...config }

// Clean both old and new config
const cleanConfig = (cfg) => {
const cleanConfig = cfg => {
if (cfg.helpers) {
cfg.helpers = { ...cfg.helpers }
Object.keys(cfg.helpers).forEach(helperName => {
Expand All @@ -239,14 +240,12 @@ class WorkerObject {
}
return cfg
}

const cleanedOldConfig = cleanConfig(oldConfig)
const cleanedNewConfig = cleanConfig(configWithoutFunctions)

const newConfig = {
...cleanedOldConfig,
...cleanedNewConfig,
}

// Deep merge configurations to preserve all helpers from base config
const newConfig = merge({}, cleanedOldConfig, cleanedNewConfig)
this.options.override = JSON.stringify(newConfig)
}

Expand Down Expand Up @@ -280,8 +279,8 @@ class Workers extends EventEmitter {
this.setMaxListeners(50)
this.codeceptPromise = initializeCodecept(config.testConfig, config.options)
this.codecept = null
this.config = config // Save config
this.numberOfWorkersRequested = numberOfWorkers // Save requested worker count
this.config = config // Save config
this.numberOfWorkersRequested = numberOfWorkers // Save requested worker count
this.options = config.options || {}
this.errors = []
this.numberOfWorkers = 0
Expand All @@ -304,11 +303,8 @@ class Workers extends EventEmitter {
// Initialize workers in these cases:
// 1. Positive number requested AND no manual workers pre-spawned
// 2. Function-based grouping (indicated by negative number) AND no manual workers pre-spawned
const shouldAutoInit = this.workers.length === 0 && (
(Number.isInteger(this.numberOfWorkersRequested) && this.numberOfWorkersRequested > 0) ||
(this.numberOfWorkersRequested < 0 && isFunction(this.config.by))
)

const shouldAutoInit = this.workers.length === 0 && ((Number.isInteger(this.numberOfWorkersRequested) && this.numberOfWorkersRequested > 0) || (this.numberOfWorkersRequested < 0 && isFunction(this.config.by)))

if (shouldAutoInit) {
this._initWorkers(this.numberOfWorkersRequested, this.config)
}
Expand Down Expand Up @@ -371,9 +367,9 @@ class Workers extends EventEmitter {
* @param {Number} numberOfWorkers
*/
createGroupsOfTests(numberOfWorkers) {
// If Codecept isn't initialized yet, return empty groups as a safe fallback
if (!this.codecept) return populateGroups(numberOfWorkers)
const files = this.codecept.testFiles
// If Codecept isn't initialized yet, return empty groups as a safe fallback
if (!this.codecept) return populateGroups(numberOfWorkers)
const files = this.codecept.testFiles
const mocha = Container.mocha()
mocha.files = files
mocha.loadFiles()
Expand Down Expand Up @@ -430,7 +426,7 @@ class Workers extends EventEmitter {
for (const file of files) {
this.testPool.push(file)
}

this.testPoolInitialized = true
}

Expand All @@ -443,17 +439,17 @@ class Workers extends EventEmitter {
if (!this.testPoolInitialized) {
this._initializeTestPool()
}

return this.testPool.shift()
}

/**
* @param {Number} numberOfWorkers
*/
createGroupsOfSuites(numberOfWorkers) {
// If Codecept isn't initialized yet, return empty groups as a safe fallback
if (!this.codecept) return populateGroups(numberOfWorkers)
const files = this.codecept.testFiles
// If Codecept isn't initialized yet, return empty groups as a safe fallback
if (!this.codecept) return populateGroups(numberOfWorkers)
const files = this.codecept.testFiles
const groups = populateGroups(numberOfWorkers)

const mocha = Container.mocha()
Expand Down Expand Up @@ -494,7 +490,7 @@ class Workers extends EventEmitter {
recorder.startUnlessRunning()
event.dispatcher.emit(event.workers.before)
process.env.RUNS_WITH_WORKERS = 'true'

// Create workers and set up message handlers immediately (not in recorder queue)
// This prevents a race condition where workers start sending messages before handlers are attached
const workerThreads = []
Expand All @@ -503,11 +499,11 @@ class Workers extends EventEmitter {
this._listenWorkerEvents(workerThread)
workerThreads.push(workerThread)
}

recorder.add('workers started', () => {
// Workers are already running, this is just a placeholder step
})

return new Promise(resolve => {
this.on('end', resolve)
})
Expand Down Expand Up @@ -591,7 +587,7 @@ class Workers extends EventEmitter {
// Otherwise skip - we'll emit based on finished state
break
case event.test.passed:
// Skip individual passed events - we'll emit based on finished state
// Skip individual passed events - we'll emit based on finished state
break
case event.test.skipped:
this.emit(event.test.skipped, deserializeTest(message.data))
Expand All @@ -602,15 +598,15 @@ class Workers extends EventEmitter {
const data = message.data
const uid = data?.uid
const isFailed = !!data?.err || data?.state === 'failed'

if (uid) {
// Track states for each test UID
if (!this._testStates) this._testStates = new Map()

if (!this._testStates.has(uid)) {
this._testStates.set(uid, { states: [], lastData: data })
}

const testState = this._testStates.get(uid)
testState.states.push({ isFailed, data })
testState.lastData = data
Expand All @@ -622,7 +618,7 @@ class Workers extends EventEmitter {
this.emit(event.test.passed, deserializeTest(data))
}
}

this.emit(event.test.finished, deserializeTest(data))
}
break
Expand Down Expand Up @@ -682,11 +678,10 @@ class Workers extends EventEmitter {
// For tests with retries configured, emit all failures + final success
// For tests without retries, emit only final state
const lastState = states[states.length - 1]

// Check if this test had retries by looking for failure followed by success
const hasRetryPattern = states.length > 1 &&
states.some((s, i) => s.isFailed && i < states.length - 1 && !states[i + 1].isFailed)

const hasRetryPattern = states.length > 1 && states.some((s, i) => s.isFailed && i < states.length - 1 && !states[i + 1].isFailed)

if (hasRetryPattern) {
// Emit all intermediate failures and final success for retries
for (const state of states) {
Expand Down
Loading