Skip to content

Adding Redis to Cap#817

Merged
xiangxiren merged 6 commits intodotnetcore:masterfrom
MahmoudSamir101:master
Apr 29, 2021
Merged

Adding Redis to Cap#817
xiangxiren merged 6 commits intodotnetcore:masterfrom
MahmoudSamir101:master

Conversation

@MahmoudSamir101
Copy link
Member

Hello their,

this is my PR for adding redis to Cap project, the PR provides pub/sub using redis channels, and later on, we can add redis streams support.

the PR has a sample for testing, you may need docker to run redis image, as there is a post build command that will run redis image container

let me know if it helps, your observations most welcome.

Thanks.

@yang-xiaodong
Copy link
Member

Hello , Thanks for your PR.

Do as you said, I noticed that Redis 5.0 streams support acknowledgment of tasks as they are completed by a group. It seems to be the feature we need, so we need to discuss whether to complete this work using streams now.

Useful link: https://redis.io/topics/streams-intro

What do you think? @xiangxiren

@xiangxiren
Copy link
Contributor

xiangxiren commented Apr 7, 2021

Hi,
As far as I know, Stream looks great and we should use it.

@yang-xiaodong
Copy link
Member

Hello, @MahmoudSamir101

What do you think about using Redis Stream to implement publish and subscribe?

@MahmoudSamir101
Copy link
Member Author

MahmoudSamir101 commented Apr 7, 2021

@yang-xiaodong, yes redis streams are a good feature to be added to CAP, we can plan to implement this feature with this PR or another one.

As I see, streams and channels can be used interchangeably based on the use case, many cases can benefit from the pub/sub nature of channels without the need to append these messages indefinitely to the storage or even go forward /backward with messages like streams.

Publishing messages to redis as a distributed cache through CAP is a use case that can benefit from channels, these messages are used to sync the state of cached data with clients who are concern about updating their cache in a pub/sub manner without the need to persist the messages on the storage.

Another use case can be, using redis as a backplane for signalr notifications in a web farm deployment, typically a web application that propagates real-time notifications through signalr connections, these notifications should be sent after business transactions committed.

the notifications will be published through CAP to redis channels, CAP consumers (multiple deployed instances of the web application) consume these messages and broadcasting them through signalr HubLifetimeManager to connected clients.

Anyway, if you have concerns about the nature of channels' work and they will not fit with the CAP project, let's go for streams implementation with CAP.

@yang-xiaodong
Copy link
Member

Hi @MahmoudSamir101 ,

Let me explain the goal of CAP

CAP is a solution to deal with transaction consistency in distributed scenarios. We need to try our best to ensure the reliability of messages in most unreliable scenarios, so we adopt the outbox pattern and use local transactions on the producer side to achieve strong consistency. This allows us to first ensure that the message on the producer side will not be lost due to failure to reach the MQ or exceptions in the delivery process.

On the consumer side, we need to use the Ack feature provided by the message queue to ensure that the message is stored in the inbox, so as to achieve "at least once". At the same time, we provide features similar to Kafka consumer groups to implement load balancing or broadcasting of messages on the consumer side.

Prior to this, we did not implement Redis Transport because Redis did not have the features we needed and could not achieve our goals. Now when I learned about the Streams feature, I found that it provides the functionality we need and is able to do this and I investigated other EDA architectures, for example, Dapr also use Redis Streams to implement publish and subscribe.

Therefore, I think Streams is more suitable for CAP.

At the same time I want to invite you to join us, do you have interest to join us as a maintainer?

@MahmoudSamir101
Copy link
Member Author

@yang-xiaodong, I got your point, and I'm interested to have a look at implementing Redis streams with CAP.

I would like to join the CAP community for contribution and maintenance for sure.

@yang-xiaodong
Copy link
Member

Hello @MahmoudSamir101, Any updates about this PR?

@MahmoudSamir101
Copy link
Member Author

MahmoudSamir101 commented Apr 19, 2021

@yang-xiaodong , yes I'm planning to push my updates soon, may be next Thursday or before that.

@MahmoudSamir101
Copy link
Member Author

@yang-xiaodong could you please check the updates.

Some points to be considered:

  • StackExchange.Redis doesn't support blocking commands which are used to enable redis to push new deliverable messages to consumer groups, we do polling as a fallback strategy.

  • Pending messages are messages delivered to a consumer group without ack back from the consumer, we pull pending messages only when the CAP server starts, when new messages delivered to a consumer and it fails to ack them, redis marked them as a pending message and we pull them again with the next start of CAP server, it was designed like that to limit the polling of messages while the CAP server is running for new messages only.

Let me know your thoughts about that.

Copy link
Contributor

@xiangxiren xiangxiren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there kafka here?
image
image

using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using StackExchange.Redis;
using System;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to our specifications, the System namespace needs to be placed at the first

public static CapOptions UseRedis(this CapOptions options, Action<CapRedisOptions> configure)
{
if (configure is null)
throw new ArgumentNullException(nameof(configure));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add directly after the if line, or use {} to include on a new line

@@ -0,0 +1,71 @@
// Copyright (c) .NET Core Community. All rights reserved.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the file to be deleted?

	<ItemGroup>
	  <Compile Remove="IProcessingServer.PollPendingTopic.cs" />
	</ItemGroup>

using System.Threading.Tasks;
using System.Reflection;

namespace DotNetCore.CAP.Redis
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be namespace Microsoft.Extensions.DependencyInjection

Comment on lines +5 to +10
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Place System namespace at first

Comment on lines +14 to +16
<ItemGroup>
<Compile Remove="IProcessingServer.PollPendingTopic.cs" />
</ItemGroup>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unexpected file?

Comment on lines +9 to +13
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Place System namespace at first

using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace DotNetCore.CAP

Comment on lines +103 to +107
catch (Exception ex)
{
logger.LogError($"Redis error when trying read consumer group {consumerGroup}", ex);
return (false, Array.Empty<RedisStream>());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to my test, we will encounter two exception here.

  1. When the Redis server restart, the first exception will be encountered

SocketFailure (ReadSocketError/ConnectionReset, last-recv: 5) on 192.168.3.57:6379/Interactive, Idle/Faulted, last:
XREADGROUP

  1. After the redis server restarts, the consumer group will be lost, The exception information is as follows

NOGROUP No such key 'test-message' or consumer group 'Samples.Redis.SqlServer.v1' in XREADGROUP with GROUP option

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: If transport throw BrokerConnectionException exception, the TransportCheckProcessor will restart consumer every 30 seconds

@yang-xiaodong
Copy link
Member

yang-xiaodong commented Apr 26, 2021

Hello @MahmoudSamir101 , Thanks for your PR.

During my review process, I think we need to consider several points

  1. DotNetCore.CAP.Redis may not be able to accurately distinguish between Transport and Storage, do we consider changing the name to DotNetCore.CAP.RedisStreams or other names that are easy to distinguish.

  2. When the Redis server restarts, we need to consider how to re-register the consumer group, otherwise we will encounter NOGROUP exceptions. This may be because I did not enable AOF, but we need to consider it

  3. When the storage fails, the consumer uses Reject to make the Transport resend the message. How do we do this now?

@MahmoudSamir101
Copy link
Member Author

@yang-xiaodong @xiangxiren thanks for your review, kindly find my response inline

DotNetCore.CAP.Redis may not be able to accurately distinguish between Transport and Storage, do we consider changing the name to DotNetCore.CAP.RedisStreams or other names that are easy to distinguish.

renamed to RedisStreams

When the Redis server restarts, we need to consider how to re-register the consumer group, otherwise we will encounter NOGROUP exceptions. This may be because I did not enable AOF, but we need to consider it

OK, good point, now while reading a consumer group, if it does not exist or the stream was not created, we are creating both again.

When the storage fails, the consumer uses Reject to make the Transport resend the message. How do we do this now?

Let me explain why we might not need that. Messages are not marked as ack until the CAP server invokes the client commit method while reading from a consumer group; therefore, if the storage fails, there will be no invoke for client commit to ack the message; instead, the message is marked as pending on the consumer group, and pending messages are consumed again with the next START of Cap server.

I believe we should double-check this assumption. The CAP server consumed pending messages again with the next start.

@xiangxiren xiangxiren merged commit ef1d1fa into dotnetcore:master Apr 29, 2021
@MahmoudSamir101
Copy link
Member Author

@yang-xiaodong Will you want me to change the docs ?

@yang-xiaodong
Copy link
Member

@MahmoudSamir101 Yes, that will be nice

@MahmoudSamir101
Copy link
Member Author

@yang-xiaodong, I created a PR #862 for documentation updates

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants