Skip to content

Commit

Permalink
Refactor RedisConnection and RedisPipeline into protocols.
Browse files Browse the repository at this point in the history
Motivation:

The goal of this commit is to make it easier for library users to implement their own types for creating connections and pipelines without losing all of the convenience command extensions.

This also splits executing commands from the concept of a "connection" to make it more swifty in `RedisPipeline`.
  • Loading branch information
Mordil committed Mar 19, 2019
1 parent 6825451 commit 6b30127
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 104 deletions.
19 changes: 9 additions & 10 deletions Sources/NIORedis/Commands/BasicCommands.swift
@@ -1,21 +1,21 @@
import Foundation
import NIO

extension RedisConnection {
extension RedisCommandExecutor {
/// Select the Redis logical database having the specified zero-based numeric index.
/// New connections always use the database 0.
/// New connections always use the database `0`.
///
/// [https://redis.io/commands/select](https://redis.io/commands/select)
public func select(_ id: Int) -> EventLoopFuture<Void> {
return command("SELECT", arguments: [RESPValue(bulk: id.description)])
public func select(database id: Int) -> EventLoopFuture<Void> {
return send(command: "SELECT", with: [id.description])
.map { _ in return () }
}

/// Request for authentication in a password-protected Redis server.
///
/// [https://redis.io/commands/auth](https://redis.io/commands/auth)
public func authorize(with password: String) -> EventLoopFuture<Void> {
return command("AUTH", arguments: [RESPValue(bulk: password)])
return send(command: "AUTH", with: [password])
.map { _ in return () }
}

Expand All @@ -24,8 +24,7 @@ extension RedisConnection {
/// [https://redis.io/commands/del](https://redis.io/commands/del)
/// - Returns: A future number of keys that were removed.
public func delete(_ keys: String...) -> EventLoopFuture<Int> {
let keyArgs = keys.map { RESPValue(bulk: $0) }
return command("DEL", arguments: keyArgs)
return send(command: "DEL", with: keys)
.flatMapThrowing { res in
guard let count = res.int else {
throw RedisError(identifier: "delete", reason: "Unexpected response: \(res)")
Expand All @@ -42,7 +41,7 @@ extension RedisConnection {
/// - after: The lifetime (in seconds) the key will expirate at.
/// - Returns: A future bool indicating if the expiration was set or not.
public func expire(_ key: String, after deadline: Int) -> EventLoopFuture<Bool> {
return command("EXPIRE", arguments: [RESPValue(bulk: key), RESPValue(bulk: deadline.description)])
return send(command: "EXPIRE", with: [key, deadline.description])
.flatMapThrowing { res in
guard let value = res.int else {
throw RedisError(identifier: "expire", reason: "Unexpected response: \(res)")
Expand All @@ -57,7 +56,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/get](https://redis.io/commands/get)
public func get(_ key: String) -> EventLoopFuture<String?> {
return command("GET", arguments: [RESPValue(bulk: key)])
return send(command: "GET", with: [key])
.map { return $0.string }
}

Expand All @@ -67,7 +66,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/set](https://redis.io/commands/set)
public func set(_ key: String, to value: String) -> EventLoopFuture<Void> {
return command("SET", arguments: [RESPValue(bulk: key), RESPValue(bulk: value)])
return send(command: "SET", with: [key, value])
.map { _ in return () }
}

Expand Down
22 changes: 11 additions & 11 deletions Sources/NIORedis/Commands/SetCommands.swift
@@ -1,7 +1,7 @@
import Foundation
import NIO

extension RedisConnection {
extension RedisCommandExecutor {
/// Returns the all of the elements of the set stored at key.
///
/// Ordering of results are stable between multiple calls of this method to the same set.
Expand All @@ -10,7 +10,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/smembers](https://redis.io/commands/smembers)
public func smembers(_ key: String) -> EventLoopFuture<RESPValue> {
return command("SMEMBERS", arguments: [RESPValue(bulk: key)])
return send(command: "SMEMBERS", with: [key])
}

/// Checks if the provided item is included in the set stored at key.
Expand All @@ -29,7 +29,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/scard](https://redis.io/commands/scard)
public func scard(_ key: String) -> EventLoopFuture<Int> {
return command("SCARD", arguments: [RESPValue(bulk: key)])
return send(command: "SCARD", with: [key])
.flatMapThrowing {
guard let count = $0.int else { throw RedisError.respConversion(to: Int.self) }
return count
Expand Down Expand Up @@ -68,7 +68,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/spop](https://redis.io/commands/spop)
public func spop(_ key: String) -> EventLoopFuture<RESPValue> {
return command("SPOP", arguments: [RESPValue(bulk: key)])
return send(command: "SPOP", with: [key])
}

/// Randomly selects elements from the set stored at string, up to the `count` provided.
Expand All @@ -82,14 +82,14 @@ extension RedisConnection {
public func srandmember(_ key: String, max count: Int = 1) -> EventLoopFuture<RESPValue> {
assert(count != 0, "A count of zero is a noop for selecting a random element.")

return command("SRANDMEMBER", arguments: [RESPValue(bulk: key), RESPValue(bulk: count.description)])
return send(command: "SRANDMEMBER", with: [key, count.description])
}

/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
///
/// [https://redis.io/commands/sdiff](https://redis.io/commands/sdiff)
public func sdiff(_ keys: String...) -> EventLoopFuture<[RESPValue]> {
return command("SDIFF", arguments: keys.map(RESPValue.init(bulk:)))
return send(command: "SDIFF", with: keys)
.flatMapThrowing {
guard let elements = $0.array else { throw RedisError.respConversion(to: Array<RESPValue>.self) }
return elements
Expand All @@ -102,7 +102,7 @@ extension RedisConnection {
/// [https://redis.io/commands/sdiffstore](https://redis.io/commands/sdiffstore)
/// - Important: If the `destination` key already exists, it is overwritten.
public func sdiffstore(destination dest: String, _ keys: String...) -> EventLoopFuture<Int> {
return command("SDIFFSTORE", arguments: [RESPValue(bulk: dest)] + keys.map(RESPValue.init(bulk:)))
return send(command: "SDIFFSTORE", with: [dest] + keys)
.flatMapThrowing {
guard let count = $0.int else { throw RedisError.respConversion(to: Int.self) }
return count
Expand All @@ -113,7 +113,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/sinter](https://redis.io/commands/sinter)
public func sinter(_ keys: String...) -> EventLoopFuture<[RESPValue]> {
return command("SINTER", arguments: keys.map(RESPValue.init(bulk:)))
return send(command: "SINTER", with: keys)
.flatMapThrowing {
guard let elements = $0.array else { throw RedisError.respConversion(to: Array<RESPValue>.self) }
return elements
Expand All @@ -126,7 +126,7 @@ extension RedisConnection {
/// [https://redis.io/commands/sinterstore](https://redis.io/commands/sinterstore)
/// - Important: If the `destination` key already exists, it is overwritten.
public func sinterstore(destination dest: String, _ keys: String...) -> EventLoopFuture<Int> {
return command("SINTERSTORE", arguments: [RESPValue(bulk: dest)] + keys.map(RESPValue.init(bulk:)))
return send(command: "SINTERSTORE", with: [dest] + keys)
.flatMapThrowing {
guard let count = $0.int else { throw RedisError.respConversion(to: Int.self) }
return count
Expand All @@ -149,7 +149,7 @@ extension RedisConnection {
///
/// [https://redis.io/commands/sunion](https://redis.io/commands/sunion)
public func sunion(_ keys: String...) -> EventLoopFuture<[RESPValue]> {
return command("SUNION", arguments: keys.map(RESPValue.init(bulk:)))
return send(command: "SUNION", with: keys)
.flatMapThrowing {
guard let elements = $0.array else { throw RedisError.respConversion(to: Array<RESPValue>.self) }
return elements
Expand All @@ -162,7 +162,7 @@ extension RedisConnection {
/// [https://redis.io/commands/sunionstore](https://redis.io/commands/sunionstore)
/// - Important: If the `destination` key already exists, it is overwritten.
public func sunionstore(destination dest: String, _ keys: String...) -> EventLoopFuture<Int> {
return command("SUNIONSTORE", arguments: [RESPValue(bulk: dest)] + keys.map(RESPValue.init(bulk:)))
return send(command: "SUNIONSTORE", with: [dest] + keys)
.flatMapThrowing {
guard let count = $0.int else { throw RedisError.respConversion(to: Int.self) }
return count
Expand Down
89 changes: 59 additions & 30 deletions Sources/NIORedis/RedisConnection.swift
@@ -1,17 +1,62 @@
import NIO
import NIOConcurrencyHelpers

/// A connection to a Redis database instance, with the ability to send and receive commands.
/// An object capable of sending commands and receiving responses.
///
/// let result = connection.send(command: "GET", arguments: ["my_key"]
/// let executor = ...
/// let result = executor.send(command: "GET", arguments: ["my_key"]
/// // result == EventLoopFuture<RESPValue>
///
/// See [https://redis.io/commands](https://redis.io/commands)
public final class RedisConnection {
public protocol RedisCommandExecutor {
/// The `EventLoop` that this executor operates on.
var eventLoop: EventLoop { get }

/// Sends the desired command with the specified arguments.
/// - Parameters:
/// - command: The command to execute.
/// - arguments: The arguments, if any, to be sent with the command.
/// - Returns: An `EventLoopFuture` that will resolve with the Redis command response.
func send(command: String, with arguments: [RESPValueConvertible]) -> EventLoopFuture<RESPValue>
}

extension RedisCommandExecutor {
/// Sends the desired command without arguments.
/// - Parameter command: The command keyword to execute.
/// - Returns: An `EventLoopFuture` that will resolve with the Redis command response.
func send(command: String) -> EventLoopFuture<RESPValue> {
return self.send(command: command, with: [])
}
}

/// An individual connection to a Redis database instance for executing commands or building `RedisPipeline`s.
///
/// See `RedisCommandExecutor`.
public protocol RedisConnection: AnyObject, RedisCommandExecutor {
/// The `Channel` this connection is associated with.
var channel: Channel { get }
/// Has the connection been closed?
var isClosed: Bool { get }

/// Creates a `RedisPipeline` for executing a batch of commands.
func makePipeline() -> RedisPipeline

/// Closes the connection to Redis.
/// - Returns: An `EventLoopFuture` that resolves when the connection has been closed.
@discardableResult
func close() -> EventLoopFuture<Void>
}

extension RedisConnection {
public var eventLoop: EventLoop { return self.channel.eventLoop }
}

/// A basic `RedisConnection`.
public final class NIORedisConnection: RedisConnection {
/// See `RedisConnection.channel`.
public let channel: Channel

/// Has the connection been closed?
/// See `RedisConnection.isClosed`.
public var isClosed: Bool { return _isClosed.load() }
private var _isClosed = Atomic<Bool>(value: false)

Expand All @@ -24,8 +69,7 @@ public final class RedisConnection {
self.channel = channel
}

/// Closes the connection to Redis.
/// - Returns: An `EventLoopFuture` that resolves when the connection has been closed.
/// See `RedisConnection.close()`.
@discardableResult
public func close() -> EventLoopFuture<Void> {
guard !_isClosed.exchange(with: true) else { return channel.eventLoop.makeSucceededFuture(()) }
Expand All @@ -38,40 +82,25 @@ public final class RedisConnection {
}
}

/// Sends the desired command with the specified arguments.
/// - Parameters:
/// - command: The command to execute.
/// - arguments: The arguments to be sent with the command.
/// - Returns: An `EventLoopFuture` that will resolve with the Redis command response.
public func send(command: String, with arguments: [RESPValueConvertible] = []) -> EventLoopFuture<RESPValue> {
let args = arguments.map { $0.convertedToRESPValue() }
return self.command(command, arguments: args)
/// See `RedisConnection.makePipeline()`.
public func makePipeline() -> RedisPipeline {
return NIORedisPipeline(channel: channel)
}

/// Invokes a command against Redis with the provided arguments.
/// - Important: Arguments should be stored as `.bulkString`.
/// - Parameters:
/// - command: The command to execute.
/// - arguments: The arguments to be sent with the command.
/// - Returns: An `EventLoopFuture` that will resolve with the Redis command response.
public func command(_ command: String, arguments: [RESPValue] = []) -> EventLoopFuture<RESPValue> {
guard !_isClosed.load() else {
return channel.eventLoop.makeFailedFuture(RedisError.connectionClosed)
}
/// See `RedisCommandExecutor.send(command:with:)`.
public func send(command: String, with arguments: [RESPValueConvertible] = []) -> EventLoopFuture<RESPValue> {
guard !_isClosed.load() else { return channel.eventLoop.makeFailedFuture(RedisError.connectionClosed) }

let args = arguments.map { $0.convertedToRESPValue() }

let promise = channel.eventLoop.makePromise(of: RESPValue.self)
let context = RedisCommandContext(
command: .array([RESPValue(bulk: command)] + arguments),
command: .array([RESPValue(bulk: command)] + args),
promise: promise
)

_ = channel.writeAndFlush(context)

return promise.futureResult
}

/// Creates a `RedisPipeline` for executing a batch of commands.
public func makePipeline() -> RedisPipeline {
return .init(channel: channel)
}
}
4 changes: 2 additions & 2 deletions Sources/NIORedis/RedisDriver.swift
Expand Up @@ -51,11 +51,11 @@ public final class RedisDriver {
hostname: String = "localhost",
port: Int = 6379,
password: String? = nil
) -> EventLoopFuture<RedisConnection> {
) -> EventLoopFuture<NIORedisConnection> {
let bootstrap = ClientBootstrap.makeForRedis(using: eventLoopGroup)

return bootstrap.connect(host: hostname, port: port)
.map { return RedisConnection(channel: $0) }
.map { return NIORedisConnection(channel: $0) }
.flatMap { connection in
guard let pw = password else {
return self.eventLoopGroup.next().makeSucceededFuture(connection)
Expand Down

0 comments on commit 6b30127

Please sign in to comment.