Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes required by Jobs refactor #13

Merged
merged 7 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let package = Package(
.library(name: "HummingbirdJobsRedis", targets: ["HummingbirdJobsRedis"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x"),
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"),
],
targets: [
Expand All @@ -28,13 +28,13 @@ let package = Package(
.testTarget(name: "HummingbirdRedisTests", dependencies: [
.byName(name: "HummingbirdRedis"),
.product(name: "Hummingbird", package: "hummingbird"),
.product(name: "HummingbirdXCT", package: "hummingbird"),
.product(name: "HummingbirdTesting", package: "hummingbird"),
]),
.testTarget(name: "HummingbirdJobsRedisTests", dependencies: [
.byName(name: "HummingbirdJobsRedis"),
.product(name: "Hummingbird", package: "hummingbird"),
.product(name: "HummingbirdJobs", package: "hummingbird"),
.product(name: "HummingbirdXCT", package: "hummingbird"),
.product(name: "HummingbirdTesting", package: "hummingbird"),
]),
]
)
2 changes: 1 addition & 1 deletion Sources/HummingbirdJobsRedis/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import NIOCore
import RediStack

extension HBRedisJobQueue {
extension HBRedisQueue {
/// what to do with failed/processing jobs from last time queue was handled
public enum JobInitialization: Sendable {
case doNothing
Expand Down
68 changes: 42 additions & 26 deletions Sources/HummingbirdJobsRedis/RedisJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import NIOCore
import RediStack

/// Redis implementation of job queues
public final class HBRedisJobQueue: HBJobQueue {
/// Redis implementation of job queue driver
public final class HBRedisQueue: HBJobQueueDriver {
public struct JobID: Sendable, CustomStringConvertible {
let id: String

Expand Down Expand Up @@ -73,7 +73,7 @@

/// Initialize redis job queue
/// - Parameters:
/// - redisConnectionPoolGroup: Redis connection pool group
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public init(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: Configuration = .init()) {
self.redisConnectionPool = redisConnectionPoolService
Expand All @@ -92,15 +92,15 @@
try await self.initQueue(queueKey: self.configuration.failedQueueKey, onInit: self.configuration.failedJobsInitialization)
}

/// Push Job onto queue
/// Push job data onto queue
/// - Parameters:
/// - job: Job descriptor
/// - Returns: Queued job identifier
@discardableResult public func push(_ job: any HBJob) async throws -> JobID {
let id = JobID()
try await self.set(jobId: id, job: job)
_ = try await self.redisConnectionPool.lpush(id.redisKey, into: self.configuration.queueKey).get()
return id
/// - data: Job data
/// - Returns: Queued job
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
let jobInstanceID = JobID()
try await self.set(jobId: jobInstanceID, buffer: buffer)
_ = try await self.redisConnectionPool.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get()
return jobInstanceID
}

/// Flag job is done
Expand Down Expand Up @@ -142,8 +142,8 @@
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobID(key)
if let job = try await self.get(jobId: identifier) {
return .init(id: identifier, job: job)
if let buffer = try await self.get(jobId: identifier) {
return .init(id: identifier, jobBuffer: buffer)
} else {
throw RedisQueueError.jobMissing(identifier)
}
Expand Down Expand Up @@ -186,19 +186,12 @@
}
}

func get(jobId: JobID) async throws -> HBJob? {
guard let data = try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get() else {
return nil
}
do {
return try JSONDecoder().decode(HBAnyCodableJob.self, from: data).job
} catch {
throw JobQueueError.decodeJobFailed
}
func get(jobId: JobID) async throws -> ByteBuffer? {
return try await self.redisConnectionPool.get(jobId.redisKey).get().byteBuffer
}

func set(jobId: JobID, job: HBJob) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, toJSON: HBAnyCodableJob(job)).get()
func set(jobId: JobID, buffer: ByteBuffer) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, to: buffer).get()
}

func delete(jobId: JobID) async throws {
Expand All @@ -207,9 +200,9 @@
}

/// extend HBRedisJobQueue to conform to AsyncSequence
extension HBRedisJobQueue {
extension HBRedisQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: HBRedisJobQueue
let queue: HBRedisQueue

public func next() async throws -> Element? {
while true {
Expand All @@ -229,3 +222,26 @@
return .init(queue: self)
}
}

extension HBJobQueueDriver where Self == HBRedisQueue {
/// Return Redis driver for Job Queue
/// - Parameters:
/// - redisConnectionPoolService: Redis connection pool
/// - configuration: configuration
public static func redis(_ redisConnectionPoolService: HBRedisConnectionPoolService, configuration: HBRedisQueue.Configuration = .init()) -> Self {
.init(redisConnectionPoolService, configuration: configuration)
}
}

// Extend ByteBuffer so that is conforms to `RESPValueConvertible`. Really not sure why
// this isnt available already
extension ByteBuffer: RESPValueConvertible {
public init?(fromRESP value: RESPValue) {
guard let buffer = value.byteBuffer else { return nil }
self = buffer
}

Check warning on line 242 in Sources/HummingbirdJobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdJobsRedis/RedisJobQueue.swift#L239-L242

Added lines #L239 - L242 were not covered by tests

public func convertedToRESPValue() -> RESPValue {
return .bulkString(self)
}
}
Loading
Loading