Skip to content

Commit

Permalink
Merge pull request #8 from hummingbird-project/jobqueue-persist-migra…
Browse files Browse the repository at this point in the history
…tions

Use migration system for JobQueue and Persist frameworks
  • Loading branch information
adam-fowler committed Mar 7, 2024
2 parents 4fbb393 + 7b45b60 commit 8d97094
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 93 deletions.
48 changes: 48 additions & 0 deletions Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreateJobQueue: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_pg_job_queue (
job_id uuid PRIMARY KEY,
createdAt timestamp with time zone
)
""",
logger: logger
)
try await connection.query(
"""
CREATE INDEX IF NOT EXISTS _hb_job_queueidx
ON _hb_pg_job_queue (createdAt ASC)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_pg_job_queue",
logger: logger
)
}

var name: String { "_Create_JobQueue_Table_" }
var group: HBMigrationGroup { .jobQueue }
}
48 changes: 48 additions & 0 deletions Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreateJobs: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_pg_jobs (
id uuid PRIMARY KEY,
job bytea,
status smallint,
lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_pg_jobs",
logger: logger
)
}

var name: String { "_Create_Jobs_Table_" }
var group: HBMigrationGroup { .jobQueue }
}

extension HBMigrationGroup {
/// JobQueue migration group
public static var jobQueue: Self { .init("_hb_jobqueue") }
}
89 changes: 38 additions & 51 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
@@ -1,3 +1,17 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
Expand Down Expand Up @@ -38,23 +52,17 @@ public final class HBPostgresQueue: HBJobQueueDriver {

/// Queue configuration
public struct Configuration: Sendable {
let jobTable: String
let jobQueueTable: String
let pendingJobsInitialization: JobInitialization
let failedJobsInitialization: JobInitialization
let processingJobsInitialization: JobInitialization
let pollTime: Duration

public init(
jobTable: String = "_hb_jobs",
jobQueueTable: String = "_hb_job_queue",
pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
pendingJobsInitialization: JobInitialization = .doNothing,
failedJobsInitialization: JobInitialization = .rerun,
processingJobsInitialization: JobInitialization = .rerun,
pollTime: Duration = .milliseconds(100)
) {
self.jobTable = jobTable
self.jobQueueTable = jobQueueTable
self.pendingJobsInitialization = pendingJobsInitialization
self.failedJobsInitialization = failedJobsInitialization
self.processingJobsInitialization = processingJobsInitialization
Expand All @@ -68,50 +76,28 @@ public final class HBPostgresQueue: HBJobQueueDriver {
public let configuration: Configuration
/// Logger used by queue
public let logger: Logger

let migrations: HBPostgresMigrations
let isStopped: NIOLockedValueBox<Bool>

/// Initialize a HBPostgresJobQueue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) {
public init(client: PostgresClient, migrations: HBPostgresMigrations, configuration: Configuration = .init(), logger: Logger) async {
self.client = client
self.configuration = configuration
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateJobs())
await migrations.add(CreateJobQueue())
}

/// Run on initialization of the job queue
public func onInit() async throws {
do {
self.logger.info("Waiting for JobQueue migrations")
try await self.migrations.waitUntilCompleted()
_ = try await self.client.withConnection { connection in
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) (
id uuid PRIMARY KEY,
job bytea,
status smallint
)
""",
logger: self.logger
)
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) (
job_id uuid PRIMARY KEY,
createdAt timestamp with time zone
)
""",
logger: self.logger
)
try await connection.query(
"""
CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx
ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC)
""",
logger: self.logger
)
self.logger.info("Update Jobs at initialization")
try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection)
try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection)
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
Expand Down Expand Up @@ -163,10 +149,10 @@ public final class HBPostgresQueue: HBJobQueueDriver {
let stream = try await connection.query(
"""
DELETE
FROM \(unescaped: self.configuration.jobQueueTable) pse
FROM _hb_pg_job_queue pse
WHERE pse.job_id =
(SELECT pse_inner.job_id
FROM \(unescaped: self.configuration.jobQueueTable) pse_inner
FROM _hb_pg_job_queue pse_inner
ORDER BY pse_inner.createdAt ASC
FOR UPDATE SKIP LOCKED
LIMIT 1)
Expand All @@ -180,7 +166,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
}
// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
"SELECT job FROM _hb_pg_jobs WHERE id = \(jobId)",
logger: self.logger
)

Expand All @@ -206,7 +192,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
func add(_ job: HBQueuedJob<JobID>, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status)
INSERT INTO _hb_pg_jobs (id, job, status)
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
""",
logger: self.logger
Expand All @@ -215,31 +201,31 @@ public final class HBPostgresQueue: HBJobQueueDriver {

func delete(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
logger: self.logger
)
}

func addToQueue(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now))
INSERT INTO _hb_pg_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now))
""",
logger: self.logger
)
}

func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
try await connection.query(
"UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)",
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
return try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)",
"SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
Expand All @@ -254,12 +240,13 @@ public final class HBPostgresQueue: HBJobQueueDriver {
switch onInit {
case .remove:
try await connection.query(
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)",
"DELETE FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
case .rerun:
guard status != .pending else { return }
let jobs = try await getJobs(withStatus: status)
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
for jobId in jobs {
try await self.addToQueue(jobId: jobId, connection: connection)
}
Expand Down Expand Up @@ -300,7 +287,7 @@ extension HBJobQueueDriver where Self == HBPostgresQueue {
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self {
.init(client: client, configuration: configuration, logger: logger)
public static func postgres(client: PostgresClient, migrations: HBPostgresMigrations, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) async -> Self {
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger)
}
}
46 changes: 46 additions & 0 deletions Sources/HummingbirdPostgres/CreatePersistTable.swift
@@ -0,0 +1,46 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreatePersistTable: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_pg_persist (
"id" text PRIMARY KEY,
"data" json NOT NULL,
"expires" timestamp with time zone NOT NULL
)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_pg_persist",
logger: logger
)
}

var name: String { "_Create_Persist_Table_" }
var group: HBMigrationGroup { .persist }
}

extension HBMigrationGroup {
/// Persist driver migration group
public static var persist: Self { .init("_hb_pg_persist") }
}
9 changes: 6 additions & 3 deletions Sources/HummingbirdPostgres/Migrations.swift
Expand Up @@ -65,7 +65,7 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted
@_spi(ConnectionPool)
public func apply(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun)
}

/// Revery database migrations
Expand All @@ -75,14 +75,15 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted
@_spi(ConnectionPool)
public func revert(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun)
}

private func migrate(
client: PostgresClient,
migrations: [HBPostgresMigration],
groups: [HBMigrationGroup],
logger: Logger,
completeMigrations: Bool,
dryRun: Bool
) async throws {
switch self.state {
Expand Down Expand Up @@ -151,7 +152,9 @@ public actor HBPostgresMigrations {
self.setFailed(error)
throw error
}
self.setCompleted()
if completeMigrations {
self.setCompleted()
}
}

/// Report if the migration process has completed
Expand Down

0 comments on commit 8d97094

Please sign in to comment.