Skip to content

Commit

Permalink
Refactor Producer Results + Better FS2 Error Handling
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman committed Jun 5, 2023
1 parent 01f2353 commit 5ac970e
Show file tree
Hide file tree
Showing 15 changed files with 402 additions and 410 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object LocalstackFS2KinesisProducer {
KinesisClient[F],
StreamNameOrArn
) => F[Either[ShardMapCache.Error, ShardMap]],
callback: Producer.Res[PutRecordsResponse] => F[Unit]
callback: Producer.Result[PutRecordsResponse] => F[Unit]
)(implicit F: Async[F]) {

def withLocalstackConfig(localstackConfig: LocalstackConfig): Builder[F] =
Expand Down Expand Up @@ -139,7 +139,7 @@ object LocalstackFS2KinesisProducer {
Nil,
(client: KinesisClient[F], snoa: StreamNameOrArn) =>
KinesisProducer.getShardMap(client, snoa),
(_: Producer.Res[PutRecordsResponse]) => F.unit
(_: Producer.Result[PutRecordsResponse]) => F.unit
)

@annotation.unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ final class FS2KinesisProducer[F[_]] private[kinesis4cats] (
override protected val channel: Channel[F, Record],
override protected val underlying: KinesisProducer[F]
)(
override protected val callback: Producer.Res[PutRecordsResponse] => F[Unit]
override protected val callback: Producer.Result[PutRecordsResponse] => F[
Unit
]
)(implicit
F: Async[F]
) extends FS2Producer[F, PutRecordsRequest, PutRecordsResponse]
Expand All @@ -63,7 +65,7 @@ object FS2KinesisProducer {
clientResource: Resource[F, KinesisClient[F]],
encoders: KinesisProducer.LogEncoders,
logger: StructuredLogger[F],
callback: Producer.Res[PutRecordsResponse] => F[Unit]
callback: Producer.Result[PutRecordsResponse] => F[Unit]
)(implicit F: Async[F]) {
def withConfig(config: FS2Producer.Config[F]): Builder[F] = copy(
config = config
Expand Down Expand Up @@ -108,7 +110,7 @@ object FS2KinesisProducer {
KinesisClient.Builder.default.build,
KinesisProducer.LogEncoders.show,
Slf4jLogger.getLogger,
(_: Producer.Res[PutRecordsResponse]) => F.unit
(_: Producer.Result[PutRecordsResponse]) => F.unit
)

@annotation.unused
Expand Down
Loading

0 comments on commit 5ac970e

Please sign in to comment.