Skip to content

Commit

Permalink
2.x.x Job Queue changes from Hummingbird (#12)
Browse files Browse the repository at this point in the history
* Updates for changes to job queue system

* Add timeout minutes to CI jobs

* Use gracefulShutdown()

* Add testMultipleJobQueueHandlers

* Update Package.swift

* Changes from project template

* Swift format
  • Loading branch information
adam-fowler committed Feb 20, 2024
1 parent 2ce69e3 commit 5fd9f31
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 58 deletions.
21 changes: 21 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
groups:
dependencies:
patterns:
- "*"
- package-ecosystem: "swift"
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 6
allow:
- dependency-type: all
groups:
all-dependencies:
patterns:
- "*"
5 changes: 4 additions & 1 deletion .github/workflows/api-breakage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ name: API breaking changes

on:
pull_request:
branches:
- main

jobs:
linux:
runs-on: ubuntu-latest
timeout-minutes: 15
container:
image: swift:5.9
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0
# https://github.com/actions/checkout/issues/766
Expand Down
6 changes: 2 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ on:
branches:
- main
- 2.x.x
paths:
- '**.swift'
- '**.yml'
workflow_dispatch:

jobs:
linux:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
matrix:
image:
Expand All @@ -39,7 +37,7 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Test
run: |
swift test --enable-code-coverage
Expand Down
5 changes: 2 additions & 3 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
name: Swift nightly build

on:
schedule:
- cron: '0 1 * * 1'
workflow_dispatch:

jobs:
linux:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
matrix:
image: ['nightly-focal', 'nightly-jammy', 'nightly-amazonlinux2']
Expand All @@ -25,7 +24,7 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Test
run: |
swift test --enable-test-discovery
6 changes: 4 additions & 2 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ on:
pull_request:
branches:
- main
- 2.x.x

jobs:
validate:
runs-on: macOS-latest
timeout-minutes: 15
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install Dependencies
run: |
brew install mint
mint install NickLockwood/SwiftFormat@0.48.17 --no-link
mint install NickLockwood/SwiftFormat@0.51.15 --no-link
- name: run script
run: ./scripts/validate.sh
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ xcuserdata/
Package.resolved
/public
/docs
.benchmarkBaselines
6 changes: 3 additions & 3 deletions .swiftformat
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Minimum swiftformat version
--minversion 0.47.4
--minversion 0.51.0

# Swift version
--swiftversion 5.3
--swiftversion 5.9

# file options
--exclude .build

# rules
--disable redundantReturn, extensionAccessControl
--disable redundantReturn, extensionAccessControl, typeSugar

# format options
--ifdef no-indent
Expand Down
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Legal
By submitting a pull request, you represent that you have the right to license your contribution to the community, and agree by submitting the patch
that your contributions are licensed under the Apache 2.0 license (see [LICENSE](LICENSE.txt)).
that your contributions are licensed under the Apache 2.0 license (see [LICENSE](LICENSE)).

## Contributor Conduct
All contributors are expected to adhere to the project's [Code of Conduct](CODE_OF_CONDUCT.md).
Expand Down Expand Up @@ -30,4 +30,4 @@ The main development branch of the repository is `main`.

### Formatting

We use Nick Lockwood's SwiftFormat for formatting code. PRs will not be accepted if they haven't be formatted. The current version of SwiftFormat we are using is v0.48.17.
We use Nick Lockwood's SwiftFormat for formatting code. PRs will not be accepted if they haven't be formatted. The current version of SwiftFormat we are using is v0.51.15.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ================================
# Build image
# ================================
FROM swift:5.7 as build
FROM swift:5.9 as build

WORKDIR /build

Expand Down
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let package = Package(
.library(name: "HummingbirdJobsRedis", targets: ["HummingbirdJobsRedis"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.1"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"),
],
targets: [
Expand Down
72 changes: 47 additions & 25 deletions Sources/HummingbirdJobsRedis/RedisJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import Atomics
import struct Foundation.Data
import class Foundation.JSONDecoder
import struct Foundation.UUID
import Hummingbird
import HummingbirdJobs
import HummingbirdRedis
Expand All @@ -23,9 +24,38 @@ import RediStack

/// Redis implementation of job queues
public final class HBRedisJobQueue: HBJobQueue {
public struct JobID: Sendable, CustomStringConvertible {
let id: String

public init() {
self.id = UUID().uuidString
}

/// Initialize JobID from String
/// - Parameter value: string value
public init(_ value: String) {
self.id = value
}

public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.id = try container.decode(String.self)
}

public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.id)
}

var redisKey: RedisKey { .init(self.description) }

/// String description of Identifier
public var description: String { self.id }
}

public enum RedisQueueError: Error, CustomStringConvertible {
case unexpectedRedisKeyType
case jobMissing(JobIdentifier)
case jobMissing(JobID)

public var description: String {
switch self {
Expand All @@ -45,7 +75,7 @@ public final class HBRedisJobQueue: HBJobQueue {
/// - Parameters:
/// - redisConnectionPoolGroup: Redis connection pool group
/// - configuration: configuration
public init(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: Configuration) {
public init(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: Configuration = .init()) {
self.redisConnectionPool = redisConnectionPoolService
self.configuration = configuration
self.isStopped = .init(false)
Expand All @@ -66,19 +96,19 @@ public final class HBRedisJobQueue: HBJobQueue {
/// - Parameters:
/// - job: Job descriptor
/// - Returns: Queued job identifier
@discardableResult public func push(_ job: any HBJob) async throws -> JobIdentifier {
let queuedJob = HBQueuedJob(job)
_ = try await self.set(jobId: queuedJob.id, job: queuedJob.job)
_ = try await self.redisConnectionPool.lpush(queuedJob.id.redisKey, into: self.configuration.queueKey).get()
return queuedJob.id
@discardableResult public func push(_ job: any HBJob) async throws -> JobID {
let id = JobID()
try await self.set(jobId: id, job: job)
_ = try await self.redisConnectionPool.lpush(id.redisKey, into: self.configuration.queueKey).get()
return id
}

/// Flag job is done
///
/// Removes job id from processing queue
/// - Parameters:
/// - jobId: Job id
public func finished(jobId: JobIdentifier) async throws {
public func finished(jobId: JobID) async throws {
_ = try await self.redisConnectionPool.lrem(jobId.description, from: self.configuration.processingQueueKey, count: 0).get()
try await self.delete(jobId: jobId)
}
Expand All @@ -88,7 +118,7 @@ public final class HBRedisJobQueue: HBJobQueue {
/// Removes job id from processing queue, adds to failed queue
/// - Parameters:
/// - jobId: Job id
public func failed(jobId: JobIdentifier, error: Error) async throws {
public func failed(jobId: JobID, error: Error) async throws {
_ = try await self.redisConnectionPool.lrem(jobId.redisKey, from: self.configuration.processingQueueKey, count: 0).get()
_ = try await self.redisConnectionPool.lpush(jobId.redisKey, into: self.configuration.failedQueueKey).get()
}
Expand All @@ -102,7 +132,7 @@ public final class HBRedisJobQueue: HBJobQueue {
/// Pop Job off queue and add to pending queue
/// - Parameter eventLoop: eventLoop to do work on
/// - Returns: queued job
func popFirst() async throws -> HBQueuedJob? {
func popFirst() async throws -> HBQueuedJob<JobID>? {
let pool = self.redisConnectionPool.pool
let key = try await pool.rpoplpush(from: self.configuration.queueKey, to: self.configuration.processingQueueKey).get()
guard !key.isNull else {
Expand All @@ -111,7 +141,7 @@ public final class HBRedisJobQueue: HBJobQueue {
guard let key = String(fromRESP: key) else {
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobIdentifier(fromKey: key)
let identifier = JobID(key)
if let job = try await self.get(jobId: identifier) {
return .init(id: identifier, job: job)
} else {
Expand Down Expand Up @@ -151,27 +181,27 @@ public final class HBRedisJobQueue: HBJobQueue {
guard let key = String(fromRESP: key) else {
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobIdentifier(fromKey: key)
let identifier = JobID(key)
try await self.delete(jobId: identifier)
}
}

func get(jobId: JobIdentifier) async throws -> HBJobInstance? {
func get(jobId: JobID) async throws -> HBJob? {
guard let data = try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get() else {
return nil
}
do {
return try JSONDecoder().decode(HBJobInstance.self, from: data)
return try JSONDecoder().decode(HBAnyCodableJob.self, from: data).job
} catch {
throw JobQueueError.decodeJobFailed
}
}

func set(jobId: JobIdentifier, job: HBJobInstance) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, toJSON: job).get()
func set(jobId: JobID, job: HBJob) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, toJSON: HBAnyCodableJob(job)).get()
}

func delete(jobId: JobIdentifier) async throws {
func delete(jobId: JobID) async throws {
_ = try await self.redisConnectionPool.delete(jobId.redisKey).get()
}
}
Expand Down Expand Up @@ -199,11 +229,3 @@ extension HBRedisJobQueue {
return .init(queue: self)
}
}

extension JobIdentifier {
var redisKey: RedisKey { .init(self.description) }

init(fromKey key: String) {
self.init(key)
}
}
6 changes: 3 additions & 3 deletions Sources/HummingbirdRedis/Persist+Redis.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public struct HBRedisPersistDriver: HBPersistDriver {
}

/// create new key with value. If key already exist throw `HBPersistError.duplicate` error
public func create<Object: Codable>(key: String, value: Object, expires: Duration?) async throws {
public func create(key: String, value: some Codable, expires: Duration?) async throws {
let expiration: RedisSetCommandExpiration? = expires.map { .seconds(Int($0.components.seconds)) }
let result = try await self.redisConnectionPool.set(.init(key), toJSON: value, onCondition: .keyDoesNotExist, expiration: expiration).get()
switch result {
Expand All @@ -36,8 +36,8 @@ public struct HBRedisPersistDriver: HBPersistDriver {
}

/// set value for key. If value already exists overwrite it
public func set<Object: Codable>(key: String, value: Object, expires: Duration?) async throws {
if let expires = expires {
public func set(key: String, value: some Codable, expires: Duration?) async throws {
if let expires {
let expiration = Int(expires.components.seconds)
return try await self.redisConnectionPool.setex(.init(key), toJSON: value, expirationInSeconds: expiration).get()
} else {
Expand Down

0 comments on commit 5fd9f31

Please sign in to comment.