From 2a28feba4a9e3f445df0ab33515adb7da23b79ce Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 27 Feb 2024 13:26:56 +0000 Subject: [PATCH] Changes for jobs refactor in Hummingbird --- Package.swift | 2 +- .../PostgresJobsQueue.swift | 49 +- .../HummingbirdPostgresTests/JobsTests.swift | 441 ++++++++---------- 3 files changed, 228 insertions(+), 264 deletions(-) diff --git a/Package.swift b/Package.swift index b551780..21c9a7f 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( .library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]), ], dependencies: [ - .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"), + .package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x-jobs-refactor"), .package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"), ], targets: [ diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift index d06c603..4c5f4fe 100644 --- a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -6,7 +6,7 @@ import NIOConcurrencyHelpers @_spi(ConnectionPool) import PostgresNIO @_spi(ConnectionPool) -public final class HBPostgresJobQueue: HBJobQueue { +public final class HBPostgresQueue: HBJobQueueDriver { public typealias JobID = UUID /// what to do with failed/processing jobs from last time queue was handled @@ -47,9 +47,9 @@ public final class HBPostgresJobQueue: HBJobQueue { public init( jobTable: String = "_hb_jobs", jobQueueTable: String = "_hb_job_queue", - pendingJobsInitialization: HBPostgresJobQueue.JobInitialization = .doNothing, - failedJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun, - processingJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun, + pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing, + failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, + processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, pollTime: Duration = .milliseconds(100) ) { self.jobTable = jobTable @@ -61,12 +61,19 @@ public final class HBPostgresJobQueue: HBJobQueue { } } - let client: PostgresClient - let configuration: Configuration - let logger: Logger + /// Postgres client used by Job queue + public let client: PostgresClient + /// Job queue configuration + public let configuration: Configuration + /// Logger used by queue + public let logger: Logger let isStopped: NIOLockedValueBox /// Initialize a HBPostgresJobQueue + /// - Parameters: + /// - client: Postgres client + /// - configuration: Queue configuration + /// - logger: Logger used by queue public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) { self.client = client self.configuration = configuration @@ -82,7 +89,7 @@ public final class HBPostgresJobQueue: HBJobQueue { """ CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) ( id uuid PRIMARY KEY, - job json, + job bytea, status smallint ) """, @@ -116,9 +123,9 @@ public final class HBPostgresJobQueue: HBJobQueue { /// Push Job onto queue /// - Returns: Identifier of queued job - @discardableResult public func push(_ job: HBJob) async throws -> JobID { + @discardableResult public func push(data: Data) async throws -> JobID { try await self.client.withConnection { connection in - let queuedJob = HBQueuedJob(id: .init(), job: job) + let queuedJob = HBQueuedJob(id: .init(), jobData: data) try await add(queuedJob, connection: connection) try await addToQueue(jobId: queuedJob.id, connection: connection) return queuedJob.id @@ -179,10 +186,10 @@ public final class HBPostgresJobQueue: HBJobQueue { do { try await self.setStatus(jobId: jobId, status: .processing, connection: connection) // if failed to find a job in the job table try getting another index - guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else { + guard let data = try await stream2.decode(Data.self, context: .default).first(where: { _ in true }) else { continue } - return HBQueuedJob(id: jobId, job: job.job) + return HBQueuedJob(id: jobId, jobData: data) } catch { try await self.setStatus(jobId: jobId, status: .failed, connection: connection) throw JobQueueError.decodeJobFailed @@ -199,7 +206,7 @@ public final class HBPostgresJobQueue: HBJobQueue { try await connection.query( """ INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status) - VALUES (\(job.id), \(job.anyCodableJob), \(Status.pending)) + VALUES (\(job.id), \(job.jobData), \(Status.pending)) """, logger: self.logger ) @@ -262,9 +269,9 @@ public final class HBPostgresJobQueue: HBJobQueue { } /// extend HBPostgresJobQueue to conform to AsyncSequence -extension HBPostgresJobQueue { +extension HBPostgresQueue { public struct AsyncIterator: AsyncIteratorProtocol { - let queue: HBPostgresJobQueue + let queue: HBPostgresQueue public func next() async throws -> Element? { while true { @@ -285,4 +292,14 @@ extension HBPostgresJobQueue { } } -extension HBAnyCodableJob: PostgresCodable {} +@_spi(ConnectionPool) +extension HBJobQueueDriver where Self == HBPostgresQueue { + /// Return Postgres driver for Job Queue + /// - Parameters: + /// - client: Postgres client + /// - configuration: Queue configuration + /// - logger: Logger used by queue + public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self { + .init(client: client, configuration: configuration, logger: logger) + } +} diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift index eddc5c5..2a66e81 100644 --- a/Tests/HummingbirdPostgresTests/JobsTests.swift +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -17,6 +17,7 @@ import Hummingbird import HummingbirdJobs @testable @_spi(ConnectionPool) import HummingbirdJobsPostgres import HummingbirdXCT +import NIOConcurrencyHelpers @_spi(ConnectionPool) import PostgresNIO import ServiceLifecycle import XCTest @@ -39,15 +40,7 @@ final class JobsTests: XCTestCase { static let env = HBEnvironment() - /// Helper function for test a server - /// - /// Creates test client, runs test function abd ensures everything is - /// shutdown correctly - @discardableResult public func testJobQueue( - numWorkers: Int, - configuration: HBPostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - test: (HBPostgresJobQueue) async throws -> T - ) async throws -> T { + func createJobQueue(numWorkers: Int, configuration: HBPostgresQueue.Configuration) async throws -> HBJobQueue { let logger = { var logger = Logger(label: "JobsTests") logger.logLevel = .debug @@ -57,24 +50,32 @@ final class JobsTests: XCTestCase { configuration: getPostgresConfiguration(), backgroundLogger: logger ) - let postgresJobQueue = HBPostgresJobQueue( - client: postgresClient, - configuration: configuration, - logger: logger - ) - let jobQueueHandler = HBJobQueueHandler( - queue: postgresJobQueue, + return HBJobQueue( + HBPostgresQueue( + client: postgresClient, + configuration: configuration, + logger: logger + ), numWorkers: numWorkers, logger: logger ) + } + /// Helper function for test a server + /// + /// Creates test client, runs test function abd ensures everything is + /// shutdown correctly + @discardableResult public func testJobQueue( + jobQueue: HBJobQueue, + test: (HBJobQueue) async throws -> T + ) async throws -> T { do { return try await withThrowingTaskGroup(of: Void.self) { group in let serviceGroup = ServiceGroup( configuration: .init( - services: [PostgresClientService(client: postgresClient), jobQueueHandler], + services: [PostgresClientService(client: jobQueue.queue.client), jobQueue], gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger + logger: jobQueue.queue.logger ) ) group.addTask { @@ -82,7 +83,7 @@ final class JobsTests: XCTestCase { } try await Task.sleep(for: .seconds(1)) do { - let value = try await test(postgresJobQueue) + let value = try await test(jobQueue) await serviceGroup.triggerGracefulShutdown() return value } catch let error as PSQLError { @@ -100,311 +101,257 @@ final class JobsTests: XCTestCase { } } - func testBasic() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + /// Helper function for test a server + /// + /// Creates test client, runs test function abd ensures everything is + /// shutdown correctly + @discardableResult public func testJobQueue( + numWorkers: Int, + configuration: HBPostgresQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + test: (HBJobQueue) async throws -> T + ) async throws -> T { + let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration) + return try await self.testJobQueue(jobQueue: jobQueue, test: test) + } - let value: Int - func execute(logger: Logger) async throws { - print(self.value) + func testBasic() async throws { + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + let jobIdentifer = HBJobIdentifier(#function) + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - Self.expectation.fulfill() + expectation.fulfill() } - } - TestJob.register() - try await self.testJobQueue(numWorkers: 1) { jobQueue in - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) - - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) } } func testMultipleWorkers() async throws { - struct TestJob: HBJob { - static let name = "testMultipleWorkers" - static let runningJobCounter = ManagedAtomic(0) - static let maxRunningJobCounter = ManagedAtomic(0) - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + let jobIdentifer = HBJobIdentifier(#function) + let runningJobCounter = ManagedAtomic(0) + let maxRunningJobCounter = ManagedAtomic(0) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let value: Int - func execute(logger: Logger) async throws { - let runningJobs = Self.runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) - if runningJobs > Self.maxRunningJobCounter.load(ordering: .relaxed) { - Self.maxRunningJobCounter.store(runningJobs, ordering: .relaxed) + try await self.testJobQueue(numWorkers: 4) { jobQueue in + jobQueue.registerJob(jobIdentifer) { parameters, context in + let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { + maxRunningJobCounter.store(runningJobs, ordering: .relaxed) } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<100))) - print(self.value) - Self.expectation.fulfill() - Self.runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + context.logger.info("Parameters=\(parameters)") + expectation.fulfill() + runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) } - } - TestJob.register() - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) - XCTAssertGreaterThan(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 1) - XCTAssertLessThanOrEqual(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 4) - - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) + + XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) } } func testErrorRetryCount() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) struct FailedError: Error {} - - struct TestJob: HBJob { - static let name = "testErrorRetryCount" - static let maxRetryCount = 3 - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() throw FailedError() } - } - TestJob.register() - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - try await jobQueue.push(TestJob()) + try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [TestJob.expectation], timeout: 5) + await self.wait(for: [expectation], timeout: 5) try await Task.sleep(for: .milliseconds(200)) - let failedJobs = try await jobQueue.getJobs(withStatus: .failed) + let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed) XCTAssertEqual(failedJobs.count, 1) - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(pendingJobs.count, 0) } } - /// Test job is cancelled on shutdown - func testShutdownJob() async throws { - struct TestJob: HBJob { - static let name = "testShutdownJob" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - func execute(logger: Logger) async throws { - Self.expectation.fulfill() - try await Task.sleep(for: .seconds(10)) + func testJobSerialization() async throws { + struct TestJobParameters: Codable { + let id: Int + let message: String + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + let jobIdentifer = HBJobIdentifier(#function) + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() } + try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) + + await self.wait(for: [expectation], timeout: 5) } - TestJob.register() + } + + /// Test job is cancelled on shutdown + func testShutdownJob() async throws { + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .trace try await self.testJobQueue(numWorkers: 4) { jobQueue in - try await jobQueue.push(TestJob()) - await self.wait(for: [TestJob.expectation], timeout: 5) - } + jobQueue.registerJob(jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) + } + try await jobQueue.push(id: jobIdentifer, parameters: 0) + await self.wait(for: [expectation], timeout: 5) - try await self.testJobQueue( - numWorkers: 4, - configuration: .init(failedJobsInitialization: .doNothing, processingJobsInitialization: .doNothing) - ) { jobQueue in - let failedJobs = try await jobQueue.getJobs(withStatus: .processing) + let failedJobs = try await jobQueue.queue.getJobs(withStatus: .processing) XCTAssertEqual(failedJobs.count, 1) - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) XCTAssertEqual(pendingJobs.count, 0) + return jobQueue } } /// test job fails to decode but queue continues to process func testFailToDecode() async throws { - struct TestJob1: HBJob { - static let name = "testFailToDecode" - func execute(logger: Logger) async throws {} - } - struct TestJob2: HBJob { - static let name = "testFailToDecode2" - static var value: String? - static let expectation = XCTestExpectation(description: "TestJob2.execute was called") - let value: String - func execute(logger: Logger) async throws { - Self.value = self.value - Self.expectation.fulfill() - } - } - TestJob2.register() + let string: NIOLockedValueBox = .init("") + let jobIdentifer1 = HBJobIdentifier(#function) + let jobIdentifer2 = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) try await self.testJobQueue(numWorkers: 4) { jobQueue in - try await jobQueue.push(TestJob1()) - try await jobQueue.push(TestJob2(value: "test")) - // stall to give job chance to start running - await self.wait(for: [TestJob2.expectation], timeout: 5) - - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) + jobQueue.registerJob(jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() + } + try await jobQueue.push(id: jobIdentifer1, parameters: 2) + try await jobQueue.push(id: jobIdentifer2, parameters: "test") + await self.wait(for: [expectation], timeout: 5) + } + string.withLockedValue { + XCTAssertEqual($0, "test") } - - XCTAssertEqual(TestJob2.value, "test") } /// creates job that errors on first attempt, and is left on processing queue and /// is then rerun on startup of new server func testRerunAtStartup() async throws { struct RetryError: Error {} - struct TestJob: HBJob { - static let name = "testRerunAtStartup" - static let maxRetryCount: Int = 0 - static var firstTime = ManagedAtomic(true) - static var finished = ManagedAtomic(false) - static let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) - static let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) - func execute(logger: Logger) async throws { - if Self.firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { - Self.failedExpectation.fulfill() - throw RetryError() - } - Self.succeededExpectation.fulfill() - Self.finished.store(true, ordering: .relaxed) + let jobIdentifer = HBJobIdentifier(#function) + let firstTime = ManagedAtomic(true) + let finished = ManagedAtomic(false) + let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) + let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) + let job = HBJobDefinition(id: jobIdentifer) { _, _ in + if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { + failedExpectation.fulfill() + throw RetryError() } + succeededExpectation.fulfill() + finished.store(true, ordering: .relaxed) } - TestJob.register() + let jobQueue = try await createJobQueue(numWorkers: 1, configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)) + jobQueue.registerJob(job) + try await self.testJobQueue(jobQueue: jobQueue) { jobQueue in - try await self.testJobQueue(numWorkers: 4) { jobQueue in - try await jobQueue.push(TestJob()) + try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [TestJob.failedExpectation], timeout: 10) + await self.wait(for: [failedExpectation], timeout: 10) // stall to give job chance to start running try await Task.sleep(for: .milliseconds(50)) - XCTAssertFalse(TestJob.firstTime.load(ordering: .relaxed)) - XCTAssertFalse(TestJob.finished.load(ordering: .relaxed)) + XCTAssertFalse(firstTime.load(ordering: .relaxed)) + XCTAssertFalse(finished.load(ordering: .relaxed)) } - try await self.testJobQueue(numWorkers: 4, configuration: .init(failedJobsInitialization: .rerun)) { _ in - await self.wait(for: [TestJob.succeededExpectation], timeout: 10) - XCTAssertTrue(TestJob.finished.load(ordering: .relaxed)) - } - } - - func testCustomTableNames() async throws { - struct TestJob: HBJob { - static let name = "testBasic" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - let value: Int - func execute(logger: Logger) async throws { - print(self.value) - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - Self.expectation.fulfill() - } - } - TestJob.register() - try await self.testJobQueue( - numWorkers: 4, - configuration: .init(jobTable: "_test_job_table", jobQueueTable: "_test_job_queue_table") - ) { jobQueue in - try await jobQueue.push(TestJob(value: 1)) - try await jobQueue.push(TestJob(value: 2)) - try await jobQueue.push(TestJob(value: 3)) - try await jobQueue.push(TestJob(value: 4)) - try await jobQueue.push(TestJob(value: 5)) - try await jobQueue.push(TestJob(value: 6)) - try await jobQueue.push(TestJob(value: 7)) - try await jobQueue.push(TestJob(value: 8)) - try await jobQueue.push(TestJob(value: 9)) - try await jobQueue.push(TestJob(value: 10)) - - await self.wait(for: [TestJob.expectation], timeout: 5) - - let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) + let jobQueue2 = try await createJobQueue(numWorkers: 1, configuration: .init(failedJobsInitialization: .rerun)) + jobQueue2.registerJob(job) + try await self.testJobQueue(jobQueue: jobQueue2) { _ in + await self.wait(for: [succeededExpectation], timeout: 10) + XCTAssertTrue(finished.load(ordering: .relaxed)) } } func testMultipleJobQueueHandlers() async throws { - struct TestJob: HBJob { - static let name = "testMultipleJobQueues" - static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) - - let value: Int - func execute(logger: Logger) async throws { - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - Self.expectation.fulfill() - } - } - TestJob.register() + let jobIdentifer = HBJobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) let logger = { var logger = Logger(label: "HummingbirdJobsTests") logger.logLevel = .debug return logger }() + let job = HBJobDefinition(id: jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } let postgresClient = try await PostgresClient( configuration: getPostgresConfiguration(), backgroundLogger: logger ) - let postgresJobQueue = HBPostgresJobQueue( - client: postgresClient, - configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - logger: logger - ) - let jobQueueHandler = HBJobQueueHandler( - queue: postgresJobQueue, + let jobQueue = HBJobQueue( + .postgres(client: postgresClient, logger: logger), numWorkers: 2, logger: logger ) - let postgresJobQueue2 = HBPostgresJobQueue( - client: postgresClient, - configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - logger: logger - ) - let jobQueueHandler2 = HBJobQueueHandler( - queue: postgresJobQueue2, - numWorkers: 3, + let jobQueue2 = HBJobQueue( + HBPostgresQueue( + client: postgresClient, + logger: logger + ), + numWorkers: 2, logger: logger ) - - do { - try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [PostgresClientService(client: postgresClient), jobQueueHandler, jobQueueHandler2], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) + jobQueue.registerJob(job) + jobQueue2.registerJob(job) + + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [PostgresClientService(client: postgresClient), jobQueue, jobQueue2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger ) - group.addTask { - try await serviceGroup.run() - } - try await Task.sleep(for: .seconds(1)) - do { - for i in 0..<200 { - try await postgresJobQueue.push(TestJob(value: i)) - } - await self.wait(for: [TestJob.expectation], timeout: 5) - await serviceGroup.triggerGracefulShutdown() - } catch { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error + ) + group.addTask { + try await serviceGroup.run() + } + do { + for i in 0..<200 { + try await jobQueue.push(id: jobIdentifer, parameters: i) } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error } - } catch let error as PSQLError { - XCTFail("\(String(reflecting: error))") - throw error } } }