Skip to content

Commit

Permalink
Replace Data with ByteBuffer in JobsQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Mar 4, 2024
1 parent 44f99d2 commit a6dc03f
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions Sources/HummingbirdJobsRedis/RedisJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ public final class HBRedisQueue: HBJobQueueDriver {
/// - Parameters:
/// - data: Job data
/// - Returns: Queued job
@discardableResult public func push(data: Data) async throws -> JobID {
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
let jobInstanceID = JobID()
try await self.set(jobId: jobInstanceID, data: data)
try await self.set(jobId: jobInstanceID, buffer: buffer)
_ = try await self.redisConnectionPool.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get()
return jobInstanceID
}
Expand Down Expand Up @@ -142,8 +142,8 @@ public final class HBRedisQueue: HBJobQueueDriver {
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobID(key)
if let data = try await self.get(jobId: identifier) {
return .init(id: identifier, jobData: data)
if let buffer = try await self.get(jobId: identifier) {
return .init(id: identifier, jobBuffer: buffer)
} else {
throw RedisQueueError.jobMissing(identifier)
}
Expand Down Expand Up @@ -186,12 +186,12 @@ public final class HBRedisQueue: HBJobQueueDriver {
}
}

func get(jobId: JobID) async throws -> Data? {
return try await self.redisConnectionPool.get(jobId.redisKey, as: Data.self).get()
func get(jobId: JobID) async throws -> ByteBuffer? {
return try await self.redisConnectionPool.get(jobId.redisKey).get().byteBuffer
}

func set(jobId: JobID, data: Data) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, to: data).get()
func set(jobId: JobID, buffer: ByteBuffer) async throws {
return try await self.redisConnectionPool.set(jobId.redisKey, to: buffer).get()
}

func delete(jobId: JobID) async throws {
Expand Down Expand Up @@ -232,3 +232,16 @@ extension HBJobQueueDriver where Self == HBRedisQueue {
.init(redisConnectionPoolService, configuration: configuration)
}
}

// Extend ByteBuffer so that is conforms to `RESPValueConvertible`. Really not sure why
// this isnt available already
extension ByteBuffer: RESPValueConvertible {
public init?(fromRESP value: RESPValue) {
guard let buffer = value.byteBuffer else { return nil }
self = buffer
}

Check warning on line 242 in Sources/HummingbirdJobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdJobsRedis/RedisJobQueue.swift#L239-L242

Added lines #L239 - L242 were not covered by tests

public func convertedToRESPValue() -> RESPValue {
return .bulkString(self)
}
}

0 comments on commit a6dc03f

Please sign in to comment.