Skip to content

Commit

Permalink
Merge pull request #105 from buntec/fix/pub-sub
Browse files Browse the repository at this point in the history
Fix/pub sub
  • Loading branch information
armanbilge committed Nov 22, 2023
2 parents 0d0c2d6 + 1a4916f commit 24314e1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,10 @@ object RedisConnection{
private def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] =
Stream(Stream.emits(nel.toList).evalMap(identity)).covary[F].parJoinUnbounded.take(1).compile.lastOrError

private class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] {
private[rediculous] case class TimeoutConnection[F[_]: Temporal](rC: RedisConnection[F], duration: Duration) extends RedisConnection[F] {

def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] =
rC.runRequest(inputs, key).timeout(duration)

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ object RedisPubSub {
* connections to all nodes.
**/
def fromConnection[F[_]: Concurrent](connection: RedisConnection[F], maxBytes: Int = 8096, clusterBroadcast: Boolean = false): Resource[F, RedisPubSub[F]] = connection match {
case RedisConnection.TimeoutConnection(conn, _) => fromConnection(conn, maxBytes, clusterBroadcast)
case RedisConnection.Queued(_, sockets) =>
sockets.flatMap{managed =>
val messagesR = Concurrent[F].ref(Map[String, RedisPubSub.PubSubMessage => F[Unit]]())
Expand Down Expand Up @@ -284,4 +285,4 @@ object RedisPubSub {
Array(Some(List(BulkString(Some(message)), BulkString(Some(foo)), BulkString(Some(hi there!)))))
Array(Some(List(BulkString(Some(punsubscribe)), BulkString(Some(foos)), Integer(1))))
*/
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.chrisdavenport.rediculous

import cats._
import cats.implicits._
import cats.syntax.all._
import scodec.bits.ByteVector

trait RedisResult[+A]{
Expand Down Expand Up @@ -125,15 +125,20 @@ object RedisResult extends RedisResultLowPriority{
implicit def kv[K: RedisResult, V: RedisResult]: RedisResult[List[(K, V)]] =
new RedisResult[List[(K, V)]] {
def decode(resp: Resp): Either[Resp,List[(K, V)]] = {
def pairs(l: List[Resp]): Either[Resp, List[(K, V)]] = l match {
case Nil => Nil.asRight
case _ :: Nil => Left(resp)
case x1 :: x2 :: xs => for {
k <- RedisResult[K].decode(x1)
v <- RedisResult[V].decode(x2)
kvs <- pairs(xs)
} yield (k, v) :: kvs
}

def pairs(l: List[Resp]): Either[Resp,List[(K, V)]] =
Monad[Either[Resp, *]].tailRecM[(List[Resp], List[(K, V)]), List[(K, V)]]((l, Nil)){
case (l, acc) =>
l match {
case Nil => Right(Right(acc))
case _ :: Nil => Left(resp)
case x1 :: x2 :: xs => for {
k <- RedisResult[K].decode(x1)
v <- RedisResult[V].decode(x2)
} yield Left((xs, (k, v) :: acc))
}
}.map(_.reverse)

resp match {
case Resp.Array(Some(rs)) => pairs(rs)
case otherwise => Left(otherwise)
Expand Down

0 comments on commit 24314e1

Please sign in to comment.