From a5150df5b5053dc1017e6f584b1d7bbe3a7e1c1c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 16 Sep 2024 12:21:41 +0100 Subject: [PATCH 1/4] Delete JobsPostgres as it has moved to its own repo --- Snippets/SoakTestQueue.swift | 72 +++ .../Migrations/CreateJobDelay.swift | 38 -- .../Migrations/CreateJobQueue.swift | 48 -- .../Migrations/CreateJobQueueMetadata.swift | 41 -- .../JobsPostgres/Migrations/CreateJobs.swift | 55 -- .../PostgresClient+Transaction.swift | 31 -- Sources/JobsPostgres/PostgresJobsQueue.swift | 364 ------------- Tests/JobsPostgresTests/JobsTests.swift | 487 ------------------ 8 files changed, 72 insertions(+), 1064 deletions(-) create mode 100644 Snippets/SoakTestQueue.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobDelay.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobQueue.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift delete mode 100644 Sources/JobsPostgres/Migrations/CreateJobs.swift delete mode 100644 Sources/JobsPostgres/PostgresClient+Transaction.swift delete mode 100644 Sources/JobsPostgres/PostgresJobsQueue.swift delete mode 100644 Tests/JobsPostgresTests/JobsTests.swift diff --git a/Snippets/SoakTestQueue.swift b/Snippets/SoakTestQueue.swift new file mode 100644 index 0000000..00502ec --- /dev/null +++ b/Snippets/SoakTestQueue.swift @@ -0,0 +1,72 @@ +import HummingbirdPostgres +import Jobs +import JobsPostgres +import Logging +import NIOCore +import NIOPosix +import PostgresNIO +import ServiceLifecycle + +var logger = Logger(label: "Soak") +logger.logLevel = .debug +let postgresClient = PostgresClient( + configuration: .init(host: "localhost", port: 5432, username: "test_user", password: "test_password", database: "test_db", tls: .disable), + backgroundLogger: logger +) +let postgresMigrations = PostgresMigrations() +let jobQueue = await JobQueue( + .postgres( + client: postgresClient, + migrations: postgresMigrations, + configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .remove, processingJobsInitialization: .remove, pollTime: .milliseconds(1)), + logger: logger + ), + numWorkers: 4, + logger: logger +) + +struct MyJob: JobParameters { + static var jobName = "Test" + + let sleep: Int +} + +struct MyError: Error {} +jobQueue.registerJob(parameters: MyJob.self, maxRetryCount: 4) { parameters, _ in + try await Task.sleep(for: .milliseconds(parameters.sleep)) + if Int.random(in: 0..<100) < 3 { + throw MyError() + } +} + +try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [postgresClient, jobQueue], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + group.addTask { + try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) + } + try await group.next() + group.addTask { + for _ in 0..<100_000 { + try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) + try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) + } + } + group.addTask { + for _ in 0..<100_000 { + try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) + try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) + } + } + try await group.next() + try await group.next() + await serviceGroup.triggerGracefulShutdown() +} diff --git a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift b/Sources/JobsPostgres/Migrations/CreateJobDelay.swift deleted file mode 100644 index aff928c..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobDelay.swift +++ /dev/null @@ -1,38 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobDelay: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - ALTER TABLE _hb_pg_job_queue ADD COLUMN IF NOT EXISTS delayed_until TIMESTAMP WITH TIME ZONE - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "ALTER TABLE _hb_pg_job_queue DROP COLUMN delayed_until", - logger: logger - ) - } - - var name: String { "_Create_JobQueueDelay_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift b/Sources/JobsPostgres/Migrations/CreateJobQueue.swift deleted file mode 100644 index c64f958..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueue.swift +++ /dev/null @@ -1,48 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueue: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue ( - job_id uuid PRIMARY KEY, - createdAt timestamp with time zone - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_queueidx - ON _hb_pg_job_queue(createdAt ASC) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift b/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift deleted file mode 100644 index 7c7c2fe..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobQueueMetadata.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobQueueMetadata: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_job_queue_metadata ( - key text PRIMARY KEY, - value bytea - ) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_job_queue_metadata", - logger: logger - ) - } - - var name: String { "_Create_JobQueue_Metadata_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} diff --git a/Sources/JobsPostgres/Migrations/CreateJobs.swift b/Sources/JobsPostgres/Migrations/CreateJobs.swift deleted file mode 100644 index a900263..0000000 --- a/Sources/JobsPostgres/Migrations/CreateJobs.swift +++ /dev/null @@ -1,55 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import PostgresMigrations -import PostgresNIO - -struct CreateJobs: DatabaseMigration { - func apply(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - """ - CREATE TABLE IF NOT EXISTS _hb_pg_jobs ( - id uuid PRIMARY KEY, - job bytea, - status smallint, - lastModified TIMESTAMPTZ DEFAULT NOW() - ) - """, - logger: logger - ) - try await connection.query( - """ - CREATE INDEX IF NOT EXISTS _hb_job_status - ON _hb_pg_jobs(status) - """, - logger: logger - ) - } - - func revert(connection: PostgresConnection, logger: Logger) async throws { - try await connection.query( - "DROP TABLE _hb_pg_jobs", - logger: logger - ) - } - - var name: String { "_Create_Jobs_Table_" } - var group: DatabaseMigrationGroup { .jobQueue } -} - -extension DatabaseMigrationGroup { - /// JobQueue migration group - public static var jobQueue: Self { .init("_hb_jobqueue") } -} diff --git a/Sources/JobsPostgres/PostgresClient+Transaction.swift b/Sources/JobsPostgres/PostgresClient+Transaction.swift deleted file mode 100644 index 7771b2e..0000000 --- a/Sources/JobsPostgres/PostgresClient+Transaction.swift +++ /dev/null @@ -1,31 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import PostgresNIO - -extension PostgresClient { - func withTransaction(logger: Logger, _ process: (PostgresConnection) async throws -> Value) async throws -> Value { - try await withConnection { connection in - do { - try await connection.query("BEGIN;", logger: logger) - let value = try await process(connection) - try await connection.query("COMMIT;", logger: logger) - return value - } catch { - try await connection.query("ROLLBACK;", logger: logger) - throw error - } - } - } -} diff --git a/Sources/JobsPostgres/PostgresJobsQueue.swift b/Sources/JobsPostgres/PostgresJobsQueue.swift deleted file mode 100644 index f537d5b..0000000 --- a/Sources/JobsPostgres/PostgresJobsQueue.swift +++ /dev/null @@ -1,364 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2024 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Foundation -import Jobs -import Logging -import NIOConcurrencyHelpers -import NIOCore -import PostgresMigrations -import PostgresNIO - -/// Postgres Job queue implementation -/// -/// The Postgres driver uses the database migration service ``/HummingbirdPostgres/PostgresMigrations`` -/// to create its database tables. Before the server is running you should run the migrations -/// to build your table. -/// ``` -/// let migrations = PostgresMigrations() -/// let jobqueue = await JobQueue( -/// PostgresQueue( -/// client: postgresClient, -/// migrations: postgresMigrations, -/// configuration: configuration, -/// logger: logger -/// ), -/// numWorkers: numWorkers, -/// logger: logger -/// ) -/// var app = Application(...) -/// app.beforeServerStarts { -/// try await migrations.apply(client: postgresClient, logger: logger, dryRun: applyMigrations) -/// } -/// ``` -public final class PostgresJobQueue: JobQueueDriver { - public typealias JobID = UUID - - /// what to do with failed/processing jobs from last time queue was handled - public enum JobInitialization: Sendable { - case doNothing - case rerun - case remove - } - - /// Errors thrown by PostgresJobQueue - public enum PostgresQueueError: Error, CustomStringConvertible { - case failedToAdd - - public var description: String { - switch self { - case .failedToAdd: - return "Failed to add job to queue" - } - } - } - - /// Job Status - enum Status: Int16, PostgresCodable { - case pending = 0 - case processing = 1 - case failed = 2 - } - - /// Queue configuration - public struct Configuration: Sendable { - let pendingJobsInitialization: JobInitialization - let failedJobsInitialization: JobInitialization - let processingJobsInitialization: JobInitialization - let pollTime: Duration - - public init( - pendingJobsInitialization: JobInitialization = .doNothing, - failedJobsInitialization: JobInitialization = .rerun, - processingJobsInitialization: JobInitialization = .rerun, - pollTime: Duration = .milliseconds(100) - ) { - self.pendingJobsInitialization = pendingJobsInitialization - self.failedJobsInitialization = failedJobsInitialization - self.processingJobsInitialization = processingJobsInitialization - self.pollTime = pollTime - } - } - - /// Postgres client used by Job queue - public let client: PostgresClient - /// Job queue configuration - public let configuration: Configuration - /// Logger used by queue - public let logger: Logger - - let migrations: DatabaseMigrations - let isStopped: NIOLockedValueBox - - /// Initialize a PostgresJobQueue - /// - Parameters: - /// - client: Postgres client - /// - migrations: Database migrations to update - /// - configuration: Queue configuration - /// - logger: Logger used by queue - public init(client: PostgresClient, migrations: DatabaseMigrations, configuration: Configuration = .init(), logger: Logger) async { - self.client = client - self.configuration = configuration - self.logger = logger - self.isStopped = .init(false) - self.migrations = migrations - await migrations.add(CreateJobs()) - await migrations.add(CreateJobQueue()) - await migrations.add(CreateJobQueueMetadata()) - await migrations.add(CreateJobDelay()) - } - - /// Run on initialization of the job queue - public func onInit() async throws { - do { - self.logger.info("Waiting for JobQueue migrations") - try await self.migrations.waitUntilCompleted() - _ = try await self.client.withConnection { connection in - self.logger.info("Update Jobs at initialization") - try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) - try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) - try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) - } - } catch let error as PSQLError { - print("\(String(reflecting: error))") - throw error - } - } - - /// Push Job onto queue - /// - Returns: Identifier of queued job - @discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID { - try await self.client.withTransaction(logger: self.logger) { connection in - let queuedJob = QueuedJob(id: .init(), jobBuffer: buffer) - try await self.add(queuedJob, connection: connection) - try await self.addToQueue(jobId: queuedJob.id, connection: connection, delayUntil: options.delayUntil) - return queuedJob.id - } - } - - /// This is called to say job has finished processing and it can be deleted - public func finished(jobId: JobID) async throws { - try await self.delete(jobId: jobId) - } - - /// This is called to say job has failed to run and should be put aside - public func failed(jobId: JobID, error: Error) async throws { - try await self.setStatus(jobId: jobId, status: .failed) - } - - /// stop serving jobs - public func stop() async { - self.isStopped.withLockedValue { $0 = true } - } - - /// shutdown queue once all active jobs have been processed - public func shutdownGracefully() async {} - - public func getMetadata(_ key: String) async throws -> ByteBuffer? { - let stream = try await self.client.query( - "SELECT value FROM _hb_pg_job_queue_metadata WHERE key = \(key)", - logger: self.logger - ) - for try await value in stream.decode(ByteBuffer.self) { - return value - } - return nil - } - - public func setMetadata(key: String, value: ByteBuffer) async throws { - try await self.client.query( - """ - INSERT INTO _hb_pg_job_queue_metadata (key, value) VALUES (\(key), \(value)) - ON CONFLICT (key) - DO UPDATE SET value = \(value) - """, - logger: self.logger - ) - } - - func popFirst() async throws -> QueuedJob? { - do { - let result = try await self.client.withTransaction(logger: self.logger) { connection -> Result?, Error> in - while true { - try Task.checkCancellation() - - let stream = try await connection.query( - """ - DELETE FROM - _hb_pg_job_queue - USING ( - SELECT job_id FROM _hb_pg_job_queue - WHERE (delayed_until IS NULL OR delayed_until <= NOW()) - ORDER BY createdAt, delayed_until ASC - LIMIT 1 - FOR UPDATE SKIP LOCKED - ) queued - WHERE queued.job_id = _hb_pg_job_queue.job_id - RETURNING _hb_pg_job_queue.job_id - """, - logger: self.logger - ) - // return nil if nothing in queue - guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { - return Result.success(nil) - } - // select job from job table - let stream2 = try await connection.query( - "SELECT job FROM _hb_pg_jobs WHERE id = \(jobId) FOR UPDATE SKIP LOCKED", - logger: self.logger - ) - - do { - try await self.setStatus(jobId: jobId, status: .processing, connection: connection) - // if failed to find a job in the job table try getting another index - guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else { - continue - } - return Result.success(QueuedJob(id: jobId, jobBuffer: buffer)) - } catch { - try await self.setStatus(jobId: jobId, status: .failed, connection: connection) - return Result.failure(JobQueueError.decodeJobFailed) - } - } - } - return try result.get() - } catch let error as PSQLError { - logger.error("Failed to get job from queue", metadata: [ - "error": "\(String(reflecting: error))", - ]) - throw error - } catch let error as JobQueueError { - logger.error("Job failed", metadata: [ - "error": "\(String(reflecting: error))", - ]) - throw error - } - } - - func add(_ job: QueuedJob, connection: PostgresConnection) async throws { - try await connection.query( - """ - INSERT INTO _hb_pg_jobs (id, job, status) - VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) - """, - logger: self.logger - ) - } - - func delete(jobId: JobID) async throws { - try await self.client.query( - "DELETE FROM _hb_pg_jobs WHERE id = \(jobId)", - logger: self.logger - ) - } - - func addToQueue(jobId: JobID, connection: PostgresConnection, delayUntil: Date?) async throws { - try await connection.query( - """ - INSERT INTO _hb_pg_job_queue (job_id, createdAt, delayed_until) - VALUES (\(jobId), \(Date.now), \(delayUntil)) - ON CONFLICT (job_id) - DO UPDATE SET delayed_until = \(delayUntil) - """, - logger: self.logger - ) - } - - func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { - try await connection.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", - logger: self.logger - ) - } - - func setStatus(jobId: JobID, status: Status) async throws { - try await self.client.query( - "UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", - logger: self.logger - ) - } - - func getJobs(withStatus status: Status) async throws -> [JobID] { - let stream = try await self.client.query( - "SELECT id FROM _hb_pg_jobs WHERE status = \(status) FOR UPDATE SKIP LOCKED", - logger: self.logger - ) - var jobs: [JobID] = [] - for try await id in stream.decode(JobID.self, context: .default) { - jobs.append(id) - } - return jobs - } - - func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws { - switch onInit { - case .remove: - try await connection.query( - "DELETE FROM _hb_pg_jobs WHERE status = \(status) ", - logger: self.logger - ) - - case .rerun: - guard status != .pending else { return } - - let jobs = try await getJobs(withStatus: status) - self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") - for jobId in jobs { - try await self.addToQueue(jobId: jobId, connection: connection, delayUntil: nil) - } - - case .doNothing: - break - } - } -} - -/// extend PostgresJobQueue to conform to AsyncSequence -extension PostgresJobQueue { - public struct AsyncIterator: AsyncIteratorProtocol { - public typealias Element = QueuedJob - - let queue: PostgresJobQueue - - public func next() async throws -> Element? { - while true { - if self.queue.isStopped.withLockedValue({ $0 }) { - return nil - } - - if let job = try await queue.popFirst() { - return job - } - // we only sleep if we didn't receive a job - try await Task.sleep(for: self.queue.configuration.pollTime) - } - } - } - - public func makeAsyncIterator() -> AsyncIterator { - return .init(queue: self) - } -} - -extension JobQueueDriver where Self == PostgresJobQueue { - /// Return Postgres driver for Job Queue - /// - Parameters: - /// - client: Postgres client - /// - migrations: Database migrations to update - /// - configuration: Queue configuration - /// - logger: Logger used by queue - public static func postgres(client: PostgresClient, migrations: DatabaseMigrations, configuration: PostgresJobQueue.Configuration = .init(), logger: Logger) async -> Self { - await Self(client: client, migrations: migrations, configuration: configuration, logger: logger) - } -} diff --git a/Tests/JobsPostgresTests/JobsTests.swift b/Tests/JobsPostgresTests/JobsTests.swift deleted file mode 100644 index f3f9770..0000000 --- a/Tests/JobsPostgresTests/JobsTests.swift +++ /dev/null @@ -1,487 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Jobs -@testable import JobsPostgres -import NIOConcurrencyHelpers -import PostgresMigrations -import PostgresNIO -import ServiceLifecycle -import XCTest - -func getPostgresConfiguration() async throws -> PostgresClient.Configuration { - return .init( - host: ProcessInfo.processInfo.environment["POSTGRES_HOSTNAME"] ?? "localhost", - port: 5432, - username: ProcessInfo.processInfo.environment["POSTGRES_USER"] ?? "test_user", - password: ProcessInfo.processInfo.environment["POSTGRES_PASSWORD"] ?? "test_password", - database: ProcessInfo.processInfo.environment["POSTGRES_DB"] ?? "test_db", - tls: .disable - ) -} - -extension XCTestExpectation { - convenience init(description: String, expectedFulfillmentCount: Int) { - self.init(description: description) - self.expectedFulfillmentCount = expectedFulfillmentCount - } -} - -final class JobsTests: XCTestCase { - func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { - #if (os(Linux) && swift(<5.9)) || swift(<5.8) - super.wait(for: expectations, timeout: timeout) - #else - await fulfillment(of: expectations, timeout: timeout) - #endif - } - - func createJobQueue(numWorkers: Int, configuration: PostgresJobQueue.Configuration, function: String = #function) async throws -> JobQueue { - let logger = { - var logger = Logger(label: function) - logger.logLevel = .debug - return logger - }() - let postgresClient = try await PostgresClient( - configuration: getPostgresConfiguration(), - backgroundLogger: logger - ) - let postgresMigrations = DatabaseMigrations() - return await JobQueue( - .postgres( - client: postgresClient, - migrations: postgresMigrations, - configuration: configuration, - logger: logger - ), - numWorkers: numWorkers, - logger: logger, - options: .init( - maximumBackoff: 0.01, - maxJitter: 0.01, - minJitter: 0.0 - ) - ) - } - - /// Helper function for test a server - /// - /// Creates test client, runs test function abd ensures everything is - /// shutdown correctly - @discardableResult public func testJobQueue( - jobQueue: JobQueue, - revertMigrations: Bool = false, - test: (JobQueue) async throws -> T - ) async throws -> T { - do { - return try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [jobQueue.queue.client, jobQueue], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: jobQueue.queue.logger - ) - ) - group.addTask { - try await serviceGroup.run() - } - do { - let migrations = jobQueue.queue.migrations - let client = jobQueue.queue.client - let logger = jobQueue.queue.logger - if revertMigrations { - try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false) - } - try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false) - let value = try await test(jobQueue) - await serviceGroup.triggerGracefulShutdown() - return value - } catch let error as PSQLError { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error - } catch { - await serviceGroup.triggerGracefulShutdown() - throw error - } - } - } catch let error as PSQLError { - XCTFail("\(String(reflecting: error))") - throw error - } - } - - /// Helper function for test a server - /// - /// Creates test client, runs test function abd ensures everything is - /// shutdown correctly - @discardableResult public func testJobQueue( - numWorkers: Int, - configuration: PostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - revertMigrations: Bool = true, - function: String = #function, - test: (JobQueue) async throws -> T - ) async throws -> T { - let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration, function: function) - return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: revertMigrations, test: test) - } - - func testBasic() async throws { - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobIdentifer = JobIdentifier(#function) - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) - - await self.wait(for: [expectation], timeout: 5) - } - } - - func testDelayedJobs() async throws { - let jobIdentifer = JobIdentifier(#function) - let jobIdentifer2 = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2) - let jobExecutionSequence: NIOLockedValueBox<[Int]> = .init([]) - - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - jobExecutionSequence.withLockedValue { - $0.append(parameters) - } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } - try await jobQueue.push( - id: jobIdentifer, - parameters: 1, - options: .init( - delayUntil: Date.now.addingTimeInterval(1)) - ) - try await jobQueue.push(id: jobIdentifer2, parameters: 5) - - let processingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) - XCTAssertEqual(processingJobs.count, 2) - - await self.wait(for: [expectation], timeout: 10) - - let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) - } - XCTAssertEqual(jobExecutionSequence.withLockedValue { $0 }, [5, 1]) - } - - func testMultipleWorkers() async throws { - let jobIdentifer = JobIdentifier(#function) - let runningJobCounter = ManagedAtomic(0) - let maxRunningJobCounter = ManagedAtomic(0) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) - if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { - maxRunningJobCounter.store(runningJobs, ordering: .relaxed) - } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - context.logger.info("Parameters=\(parameters)") - expectation.fulfill() - runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) - } - - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) - - await self.wait(for: [expectation], timeout: 5) - - XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) - XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) - } - } - - func testErrorRetryCount() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) - struct FailedError: Error {} - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in - expectation.fulfill() - throw FailedError() - } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - - await self.wait(for: [expectation], timeout: 5) - try await Task.sleep(for: .milliseconds(200)) - - let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed) - XCTAssertEqual(failedJobs.count, 1) - let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) - } - } - - func testErrorRetryAndThenSucceed() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2) - let currentJobTryCount: NIOLockedValueBox = .init(0) - struct FailedError: Error {} - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in - defer { - currentJobTryCount.withLockedValue { - $0 += 1 - } - } - expectation.fulfill() - if (currentJobTryCount.withLockedValue { $0 }) == 0 { - throw FailedError() - } - } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - - await self.wait(for: [expectation], timeout: 5) - try await Task.sleep(for: .milliseconds(200)) - - let failedJobs = try await jobQueue.queue.getJobs(withStatus: .failed) - XCTAssertEqual(failedJobs.count, 0) - let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) - } - XCTAssertEqual(currentJobTryCount.withLockedValue { $0 }, 2) - } - - func testJobSerialization() async throws { - struct TestJobParameters: Codable { - let id: Int - let message: String - } - let expectation = XCTestExpectation(description: "TestJob.execute was called") - let jobIdentifer = JobIdentifier(#function) - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, _ in - XCTAssertEqual(parameters.id, 23) - XCTAssertEqual(parameters.message, "Hello!") - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) - - await self.wait(for: [expectation], timeout: 5) - } - } - - /// Test job is cancelled on shutdown - func testShutdownJob() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { _, _ in - expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [expectation], timeout: 5) - - let processingJobs = try await jobQueue.queue.getJobs(withStatus: .processing) - XCTAssertEqual(processingJobs.count, 1) - let pendingJobs = try await jobQueue.queue.getJobs(withStatus: .pending) - XCTAssertEqual(pendingJobs.count, 0) - return jobQueue - } - } - - /// test job fails to decode but queue continues to process - func testFailToDecode() async throws { - let string: NIOLockedValueBox = .init("") - let jobIdentifer1 = JobIdentifier(#function) - let jobIdentifer2 = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in - string.withLockedValue { $0 = parameters } - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer1, parameters: 2) - try await jobQueue.push(id: jobIdentifer2, parameters: "test") - await self.wait(for: [expectation], timeout: 5) - } - string.withLockedValue { - XCTAssertEqual($0, "test") - } - } - - /// creates job that errors on first attempt, and is left on processing queue and - /// is then rerun on startup of new server - func testRerunAtStartup() async throws { - struct RetryError: Error {} - let jobIdentifer = JobIdentifier(#function) - let firstTime = ManagedAtomic(true) - let finished = ManagedAtomic(false) - let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) - let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) - let job = JobDefinition(id: jobIdentifer) { _, _ in - if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { - failedExpectation.fulfill() - throw RetryError() - } - succeededExpectation.fulfill() - finished.store(true, ordering: .relaxed) - } - let jobQueue = try await createJobQueue(numWorkers: 1, configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .rerun)) - jobQueue.registerJob(job) - try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true) { jobQueue in - // stall to give onInit a chance to run, so it can remove any pendng jobs - try await Task.sleep(for: .milliseconds(100)) - - try await jobQueue.push(id: jobIdentifer, parameters: 0) - - await self.wait(for: [failedExpectation], timeout: 10) - - XCTAssertFalse(firstTime.load(ordering: .relaxed)) - XCTAssertFalse(finished.load(ordering: .relaxed)) - } - - let jobQueue2 = try await createJobQueue(numWorkers: 1, configuration: .init(failedJobsInitialization: .rerun)) - jobQueue2.registerJob(job) - try await self.testJobQueue(jobQueue: jobQueue2) { _ in - await self.wait(for: [succeededExpectation], timeout: 10) - XCTAssertTrue(finished.load(ordering: .relaxed)) - } - } - - func testMultipleJobQueueHandlers() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) - let logger = { - var logger = Logger(label: "testMultipleJobQueueHandlers") - logger.logLevel = .debug - return logger - }() - let job = JobDefinition(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } - let postgresClient = try await PostgresClient( - configuration: getPostgresConfiguration(), - backgroundLogger: logger - ) - let postgresMigrations = DatabaseMigrations() - let jobQueue = await JobQueue( - .postgres( - client: postgresClient, - migrations: postgresMigrations, - configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - logger: logger - ), - numWorkers: 2, - logger: logger - ) - let postgresMigrations2 = DatabaseMigrations() - let jobQueue2 = await JobQueue( - .postgres( - client: postgresClient, - migrations: postgresMigrations2, - configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - logger: logger - ), - numWorkers: 2, - logger: logger - ) - jobQueue.registerJob(job) - jobQueue2.registerJob(job) - - try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [postgresClient, jobQueue, jobQueue2], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - ) - group.addTask { - try await serviceGroup.run() - } - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - do { - for i in 0..<200 { - try await jobQueue.push(id: jobIdentifer, parameters: i) - } - await self.wait(for: [expectation], timeout: 5) - await serviceGroup.triggerGracefulShutdown() - } catch { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error - } - } - } - - func testMetadata() async throws { - let logger = Logger(label: "testMetadata") - try await withThrowingTaskGroup(of: Void.self) { group in - let postgresClient = try await PostgresClient( - configuration: getPostgresConfiguration(), - backgroundLogger: logger - ) - group.addTask { - await postgresClient.run() - } - let postgresMigrations = DatabaseMigrations() - let jobQueue = await PostgresJobQueue( - client: postgresClient, - migrations: postgresMigrations, - configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), - logger: logger - ) - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - - let value = ByteBuffer(string: "Testing metadata") - try await jobQueue.setMetadata(key: "test", value: value) - let metadata = try await jobQueue.getMetadata("test") - XCTAssertEqual(metadata, value) - let value2 = ByteBuffer(string: "Testing metadata again") - try await jobQueue.setMetadata(key: "test", value: value2) - let metadata2 = try await jobQueue.getMetadata("test") - XCTAssertEqual(metadata2, value2) - - // cancel postgres client task - group.cancelAll() - } - } -} From 1330db619b7b7979c2d1f3b41695580d73ef198c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 16 Sep 2024 12:26:28 +0100 Subject: [PATCH 2/4] Delete from Package.swift as well --- Package.swift | 16 ------- Package@swift-6.0.swift | 16 ------- Snippets/PSQLSoakTestQueue.swift | 72 -------------------------------- Snippets/SoakTestQueue.swift | 72 -------------------------------- 4 files changed, 176 deletions(-) delete mode 100644 Snippets/PSQLSoakTestQueue.swift delete mode 100644 Snippets/SoakTestQueue.swift diff --git a/Package.swift b/Package.swift index e33add4..ead3624 100644 --- a/Package.swift +++ b/Package.swift @@ -11,7 +11,6 @@ let package = Package( products: [ .library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]), .library(name: "PostgresMigrations", targets: ["PostgresMigrations"]), - .library(name: "JobsPostgres", targets: ["JobsPostgres"]), ], dependencies: [ .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0"), @@ -35,15 +34,6 @@ let package = Package( ], swiftSettings: swiftSettings ), - .target( - name: "JobsPostgres", - dependencies: [ - "PostgresMigrations", - .product(name: "Jobs", package: "swift-jobs"), - .product(name: "PostgresNIO", package: "postgres-nio"), - ], - swiftSettings: swiftSettings - ), .testTarget( name: "HummingbirdPostgresTests", dependencies: [ @@ -57,11 +47,5 @@ let package = Package( "PostgresMigrations", ] ), - .testTarget( - name: "JobsPostgresTests", - dependencies: [ - "JobsPostgres", - ] - ), ] ) diff --git a/Package@swift-6.0.swift b/Package@swift-6.0.swift index f903bbb..30148dd 100644 --- a/Package@swift-6.0.swift +++ b/Package@swift-6.0.swift @@ -9,11 +9,9 @@ let package = Package( products: [ .library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]), .library(name: "PostgresMigrations", targets: ["PostgresMigrations"]), - .library(name: "JobsPostgres", targets: ["JobsPostgres"]), ], dependencies: [ .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0"), - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/vapor/postgres-nio", from: "1.21.0"), ], targets: [ @@ -31,14 +29,6 @@ let package = Package( .product(name: "PostgresNIO", package: "postgres-nio"), ] ), - .target( - name: "JobsPostgres", - dependencies: [ - "PostgresMigrations", - .product(name: "Jobs", package: "swift-jobs"), - .product(name: "PostgresNIO", package: "postgres-nio"), - ] - ), .testTarget( name: "HummingbirdPostgresTests", dependencies: [ @@ -52,11 +42,5 @@ let package = Package( "PostgresMigrations", ] ), - .testTarget( - name: "JobsPostgresTests", - dependencies: [ - "JobsPostgres", - ] - ), ] ) diff --git a/Snippets/PSQLSoakTestQueue.swift b/Snippets/PSQLSoakTestQueue.swift deleted file mode 100644 index fee119a..0000000 --- a/Snippets/PSQLSoakTestQueue.swift +++ /dev/null @@ -1,72 +0,0 @@ -import Jobs -import JobsPostgres -import Logging -import NIOCore -import NIOPosix -import PostgresMigrations -import PostgresNIO -import ServiceLifecycle - -var logger = Logger(label: "Soak") -logger.logLevel = .debug -let postgresClient = PostgresClient( - configuration: .init(host: "localhost", port: 5432, username: "test_user", password: "test_password", database: "test_db", tls: .disable), - backgroundLogger: logger -) -let postgresMigrations = DatabaseMigrations() -let jobQueue = await JobQueue( - .postgres( - client: postgresClient, - migrations: postgresMigrations, - configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .remove, processingJobsInitialization: .remove, pollTime: .milliseconds(1)), - logger: logger - ), - numWorkers: 4, - logger: logger -) - -struct MyJob: JobParameters { - static let jobName = "Test" - - let sleep: Int -} - -struct MyError: Error {} -jobQueue.registerJob(parameters: MyJob.self, maxRetryCount: 4) { parameters, _ in - try await Task.sleep(for: .milliseconds(parameters.sleep)) - if Int.random(in: 0..<100) < 3 { - throw MyError() - } -} - -try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [postgresClient, jobQueue], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - ) - group.addTask { - try await serviceGroup.run() - } - group.addTask { - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - } - try await group.next() - group.addTask { - for _ in 0..<100_000 { - try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) - try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) - } - } - group.addTask { - for _ in 0..<100_000 { - try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) - try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) - } - } - try await group.next() - try await group.next() - await serviceGroup.triggerGracefulShutdown() -} diff --git a/Snippets/SoakTestQueue.swift b/Snippets/SoakTestQueue.swift deleted file mode 100644 index 00502ec..0000000 --- a/Snippets/SoakTestQueue.swift +++ /dev/null @@ -1,72 +0,0 @@ -import HummingbirdPostgres -import Jobs -import JobsPostgres -import Logging -import NIOCore -import NIOPosix -import PostgresNIO -import ServiceLifecycle - -var logger = Logger(label: "Soak") -logger.logLevel = .debug -let postgresClient = PostgresClient( - configuration: .init(host: "localhost", port: 5432, username: "test_user", password: "test_password", database: "test_db", tls: .disable), - backgroundLogger: logger -) -let postgresMigrations = PostgresMigrations() -let jobQueue = await JobQueue( - .postgres( - client: postgresClient, - migrations: postgresMigrations, - configuration: .init(pendingJobsInitialization: .remove, failedJobsInitialization: .remove, processingJobsInitialization: .remove, pollTime: .milliseconds(1)), - logger: logger - ), - numWorkers: 4, - logger: logger -) - -struct MyJob: JobParameters { - static var jobName = "Test" - - let sleep: Int -} - -struct MyError: Error {} -jobQueue.registerJob(parameters: MyJob.self, maxRetryCount: 4) { parameters, _ in - try await Task.sleep(for: .milliseconds(parameters.sleep)) - if Int.random(in: 0..<100) < 3 { - throw MyError() - } -} - -try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [postgresClient, jobQueue], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - ) - group.addTask { - try await serviceGroup.run() - } - group.addTask { - try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false) - } - try await group.next() - group.addTask { - for _ in 0..<100_000 { - try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) - try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) - } - } - group.addTask { - for _ in 0..<100_000 { - try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20))) - try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10))) - } - } - try await group.next() - try await group.next() - await serviceGroup.triggerGracefulShutdown() -} From ce1589bc0a0abdc0d319334340be92dcfd47e8b8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 16 Sep 2024 12:58:59 +0100 Subject: [PATCH 3/4] doc fixes --- Sources/HummingbirdPostgres/PostgresPersistDriver.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift index ed70fb2..7f613b4 100644 --- a/Sources/HummingbirdPostgres/PostgresPersistDriver.swift +++ b/Sources/HummingbirdPostgres/PostgresPersistDriver.swift @@ -30,11 +30,11 @@ extension PSQLError { /// Postgres driver for persist system for storing persistent cross request key/value pairs /// -/// The Postgres driver uses the database migration service ``PostgresMigrations`` to +/// The Postgres driver uses the database migration service ``/PostgresMigrations/DatabaseMigrations`` to /// create its database table. Before the server is running you should run the migrations /// to build your table. /// ``` -/// let migrations = PostgresMigrations() +/// let migrations = DatabaseMigrations() /// let persist = PostgresPersistDriver(client: postgresClient, migrations: migrations) /// var app = Application(...) /// app.runBeforeServerStart { @@ -68,7 +68,7 @@ public final class PostgresPersistDriver: PersistDriver { /// Initialize PostgresPersistDriver /// - Parameters: /// - client: Postgres client - /// - migrations: DatabaseMigrations array to add persist migrations + /// - migrations: ``/PostgresMigrations/DatabaseMigrations`` array to add persist migrations /// - tidyUpFrequency: How frequently cleanup expired database entries should occur /// - logger: Logger used by persist public init(client: PostgresClient, migrations: DatabaseMigrations, tidyUpFrequency: Duration = .seconds(600), logger: Logger) async { From 36c8e7a12a5427252c3758beada1504f74db0083 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 16 Sep 2024 12:59:20 +0100 Subject: [PATCH 4/4] PostgresMigrationError -> DatabaseMigrationError --- Sources/PostgresMigrations/MigrationError.swift | 4 ++-- Sources/PostgresMigrations/Migrations.swift | 4 ++-- Tests/PostgresMigrationsTests/MigrationTests.swift | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/PostgresMigrations/MigrationError.swift b/Sources/PostgresMigrations/MigrationError.swift index 5c82fd3..813cc73 100644 --- a/Sources/PostgresMigrations/MigrationError.swift +++ b/Sources/PostgresMigrations/MigrationError.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// /// Error thrown by migration code -public struct PostgresMigrationError: Error, Equatable { +public struct DatabaseMigrationError: Error, Equatable { enum _Internal { case requiresChanges case cannotRevertMigration @@ -32,7 +32,7 @@ public struct PostgresMigrationError: Error, Equatable { public static var cannotRevertMigration: Self { .init(.cannotRevertMigration) } } -extension PostgresMigrationError: CustomStringConvertible { +extension DatabaseMigrationError: CustomStringConvertible { public var description: String { switch self.value { case .requiresChanges: "Database requires changes. Run `migrate` with `dryRun` set to false." diff --git a/Sources/PostgresMigrations/Migrations.swift b/Sources/PostgresMigrations/Migrations.swift index bd3b592..c33912f 100644 --- a/Sources/PostgresMigrations/Migrations.swift +++ b/Sources/PostgresMigrations/Migrations.swift @@ -154,7 +154,7 @@ public actor DatabaseMigrations { guard let migration = registeredMigrations[migrationName] else { logger.error("Failed to find migration \(migrationName)") - throw PostgresMigrationError.cannotRevertMigration + throw DatabaseMigrationError.cannotRevertMigration } logger.info("Reverting \(migrationName) from group \(group.name) \(dryRun ? " (dry run)" : "")") if !dryRun { @@ -185,7 +185,7 @@ public actor DatabaseMigrations { } // if changes are required guard requiresChanges == false else { - throw PostgresMigrationError.requiresChanges + throw DatabaseMigrationError.requiresChanges } } } catch { diff --git a/Tests/PostgresMigrationsTests/MigrationTests.swift b/Tests/PostgresMigrationsTests/MigrationTests.swift index b30c7a7..81501bc 100644 --- a/Tests/PostgresMigrationsTests/MigrationTests.swift +++ b/Tests/PostgresMigrationsTests/MigrationTests.swift @@ -231,7 +231,7 @@ final class MigrationTests: XCTestCase { try await migrations.apply(client: client, groups: [.default], logger: Self.logger, dryRun: true) } XCTFail("Shouldn't get here") - } catch let error as PostgresMigrationError where error == .requiresChanges {} + } catch let error as DatabaseMigrationError where error == .requiresChanges {} try await self.testMigrations(groups: [.default, .test]) { migrations in await migrations.add(TestMigration(name: "test1")) await migrations.add(TestMigration(name: "test2"))