
Splitting to Consumer and Producer Client #4027

Merged

udsamani merged 6 commits into main on May 27, 2024
Conversation

udsamani (Collaborator):

Currently our NATS streaming client does multiple things: on the requester node it is used as a consumer of log messages arriving via NATS, and on the compute node it acts as a writer, reading logs from the appropriate execution engines and publishing them as messages on a NATS subject.

With this change we give each client a single responsibility by splitting the client into a ConsumerClient and a ProducerClient.

We also add logic for the producer client to send heartbeats to the consumer client on a heartbeat request subject once a connection is made.

@udsamani udsamani self-assigned this May 22, 2024
@udsamani udsamani requested review from wdbaruni and frrist May 22, 2024 22:17
coderabbitai bot commented May 22, 2024

Review skipped: auto reviews are disabled on this repository. To trigger a single review, invoke the @coderabbitai review command.



@udsamani udsamani changed the title Splitting to Consumer and Producer Client Closes #4025 : Splitting to Consumer and Producer Client May 22, 2024
@udsamani udsamani changed the title Closes #4025 : Splitting to Consumer and Producer Client Splitting to Consumer and Producer Client May 22, 2024
ctxWithCancel, cancel := context.WithCancel(ctx)
pc.heartBeatCancelFunc = cancel

ticker := time.NewTicker(10 * time.Second)
Collaborator Author:

Should make it customizable.

Member:

yeah it should be customizable as well as the heartbeat request timeout window

Collaborator Author:

@wdbaruni I have made it customizable. But our configs are really all over the place. The best place I see to integrate these is NATSConfig, and from there you pass it on creation of compute_handler when registering the endpoint. Do you have any other place where this would be better suited?

Member:

I agree our configs are all over the place for now. They are going through a major refactoring and I wouldn't recommend touching them for now. You can put these as constants in the proxy package, or as default values in the stream package.

Collaborator Author:

Makes sense. Will do that.

@udsamani udsamani force-pushed the 4025-producer-consumer-client branch from 0d87c1c to 138ffb4 Compare May 22, 2024 22:25
pkg/nats/stream/types.go (outdated, resolved)
pkg/nats/stream/producer_client.go (outdated, resolved)
pkg/nats/proxy/compute_handler.go (outdated, resolved)
sc, err := stream.NewClient(stream.ClientParams{Conn: params.Conn})
sc, err := stream.NewConsumerClient(stream.ConsumerClientParams{
Conn: params.Conn,
HeartBeatRequestSub: requesterEndpointPublishSubject(params.Conn.Opts.Name, StreamHeartBeat),
Member:

Should the heartbeat subject be fixed and deterministic across node restarts, or should there be a new, unique subject for each client instance? I don't know the right answer, and there seems to be value in both, but since the consumer is creating a new SINBOX subject per client, it might make more sense to do the same for the heartbeat subject. What do you think?

Collaborator Author:

Let's break down the options.

  • Say we have a constant subject node.request.node-0.StreamBeat/v1. If the requester node goes down and comes back up (before the request times out), the loop still completes: the requester node receives the message from the compute node and responds with 0 active streams.

  • Say we have a subject per client creation, node.request.node-0.abx839.StreamBeat/v1. If the requester node goes down and comes back up (before the request times out), the loop does not complete cleanly; instead the compute node drops the stream IDs because the request times out.

Choosing between a deterministic subject and a unique one per client boils down to whether we want to differentiate between a request timeout and the compute node getting a response from the requester node. There is a very thin line between how we handle the two.

But I do agree we can utilize the SINBOX subject, which is created per client, rather than introducing a new subject, as there is no prominent advantage to deterministic subjects across node restarts.

Member (@wdbaruni, May 24, 2024):

I see your point:

  • Having a deterministic subject will allow the compute node to fail faster when the requester restarts (assuming the requester restarts within ~5 seconds)
  • Whereas a random subject means the compute heartbeat will either time out, or the NATS server might return a "no active subscriber" error. If there was a timeout, we shouldn't have the compute node drop the streamIDs immediately, as it might be a transient error. If the NATS server returns "no active subscriber", then we can drop the streams immediately, but I doubt the server will figure that out fast enough

Both are valid options and feel free to use whatever you think is right. My concern would be about using Conn.Opts.Name as the subject identifier. I understand we are today using the nodeID as the connection name, but it is an optional field in nats and we might change things in the future if we were to allow multiple nats connections per node for different purposes. So if you were to use a deterministic heartbeat subject, pass the nodeID to the compute proxy to allow it to construct the streaming producer client

CallbackSubjectPrefix = "node.orchestrator"
ManagementSubjectPrefix = "node.management"
ComputeEndpointSubjectPrefix = "node.compute"
RequesterEndpointSubjectPrefix = "node.requester"
Member:

Actually, we are moving away from the term requester to orchestrator. They mean the same thing, but new code should use orchestrator as much as possible.

Collaborator Author:

Makes sense. As per the above comment we will just use the SINBOX subject, so I'm getting rid of it completely.

Member:

Oh, are you planning to use the SINBOX subject for both heartbeats and logs? My comment was to use something similar to the SINBOX subject creation but dedicated to heartbeats, though using the same SINBOX prefix can be interesting as well :)

Collaborator Author:

Sorry, I actually meant this.

Currently we use this for response subject of logs.

_SINBOX.GwLlKanYanG8WcN0tZOOhh.KK7F9jas

I plan to use something like this for orchestrator.

OrchestratorHeartBeatRequestSub.GwLlKanYanG8WcN0tZOOhh. This should be unique per client.

pkg/nats/proxy/constants.go (outdated, resolved)
pkg/nats/stream/constants.go (outdated, resolved)
Comment on lines +75 to +98
func (pc *ProducerClient) heartBeat(ctx context.Context) {
ctxWithCancel, cancel := context.WithCancel(ctx)
pc.heartBeatCancelFunc = cancel
Member:

Here you are starting the heartbeat with the AddConnDetails context, which is tied to the incoming request and is expected to be short lived. That means when the request is done, we will cancel and thus stop the heartbeat for all other streams.

My recommendation is to start the heartbeat goroutine when the producer is created, using a ctx passed to the constructor. This should simplify things, and shouldn't degrade performance as long as we are not doing heavy work when there are no active streams.

One thing to keep an eye on is if the producer is stopped, such as by cancelling the provided context. Then we need to start rejecting all incoming connections and to close existing ones. Closing a producer should mainly happen during node shutdown.

Feel free to handle producer shutdown in a follow up PR

Collaborator Author:

#4037

Created an issue for this.

However, do you feel it is okay to do the following until I fix it as part of the other PR?

func NewProducerClient(params ProducerClientParams) (*ProducerClient, error) {
	nc := &ProducerClient{
		Conn:                               params.Conn,
		ID:                                 params.ID,
		activeStreamInfo:                   make(map[string]map[string]StreamInfo),
		activeConnHeartBeatRequestSubjects: make(map[string]string),
		config:                             params.Config,
	}

	go nc.heartBeat(context.Background())

	return nc, nil
}

Member:

yes, except we pass the context instead of using a background context

Member:

As part of #4037 please address the comments above regarding a separate Start method.

Comment on lines 85 to 107
case <-ctxWithCancel.Done():
return
Member:

a debug log message can be helpful here


Comment on lines 107 to 112
pc.mu.Lock()
for c, ids := range results {
log.Info().Msgf("Ids = %s", ids)
pc.activeStreamIds[c] = ids
}
pc.mu.Unlock()
Member:

Few comments here:

  1. The heartbeat request should include the streamIds this producer is managing for the specific consumer. This will allow the consumer to take action and fail any non-recent stream that wasn't reported by the producer
  2. The consumer response should include a list of streamIds that are not active in the consumer to allow the producer to close them. The producer should only close non-recent streams in case of race conditions.
  3. This means both consumer and producer should keep track of the create time of each stream, and give some buffer before closing recent streams. For example 10 seconds should be more than enough
  4. The heartbeat shouldn't create new streams such as what is happening here. We shouldn't expose a backdoor as these streams don't have active writers and are useless
  5. When we close a stream during heartbeat consolidation, we don't close the writer right away and only close it when it tries to write something. Maybe we can be more proactive if possible
  6. I am guessing you forgot to remove the info logging statement here :)

pkg/nats/proxy/compute_handler.go Show resolved Hide resolved
@udsamani udsamani force-pushed the 4025-producer-consumer-client branch from 8afa6ea to 77ab87a Compare May 24, 2024 19:07
@udsamani udsamani force-pushed the 4025-producer-consumer-client branch from 77ab87a to 07fdaf0 Compare May 24, 2024 23:49
@udsamani udsamani marked this pull request as ready for review May 24, 2024 23:50
@udsamani udsamani requested a review from wdbaruni May 24, 2024 23:50
@@ -71,7 +71,7 @@ func (suite *ClientTestSuite) TestRequestWithContextCancellation() {
cancel()

// Attempt to make the request
_, err := suite.streamingClient.OpenStream(ctx, subj, payload)
_, err := suite.streamingClient.OpenStream(ctx, subj, "", payload)
Member:

Allowing blank producer identifiers scares me. We should add validation to prevent that, as it can be a cause of bugs.

Also, can we simplify things and just use the subj as the producer identifier? I understand splitting them offers a greater level of flexibility, but I'm not sure it is worth the added complexity. This means that producers, in a single heartbeat request, will group heartbeats based on source subject. This will handle scenarios where the producer is listening to a wildcard, and where we have multiple producer clients for different stream types on the same producer node.

Collaborator Author (@udsamani, May 26, 2024):

I completely understand your concern about blank producer identifiers. Ideally it should be fixed.

However, I don't think the subject would make things any better.

Let's take an example. Say we have Consumer 1 and Producer 1. Tomorrow, Consumer 1 and Producer 1 may be communicating on multiple request subjects, like node-Producer1.ExecutionLogRequest/V1 and node-Producer1.ContainerNetworkLogs.

If Producer 1 starts collecting stream IDs and grouping them by request subject, the consumer gets no value from it (from the heartbeat request). With heartbeating, the idea is to determine whether the connection between the producer and consumer client is alive. To determine liveness, we should only be concerned with the connection details.

Let me know if you think otherwise?

Comment on lines 271 to 279
var nonRecentStreamIds []string
for k, v := range nc.respMap {
if v.producerConnID == request.ProducerConnID &&
time.Since(v.createdAt) > nc.config.StreamCancellationBufferDuration {
nonRecentStreamIds = append(nonRecentStreamIds, k)
}
}

data, err := json.Marshal(ConsumerHeartBeatResponse{NonActiveStreamIds: Difference(nonRecentStreamIds, request.ActiveStreamIds)})
Member:

You might want to revisit your choice of data structures to avoid having to loop over all respMap entries to handle each heartbeat

Comment on lines 343 to 344
func Difference(a, b []string) []string {
i, j := 0, 0
Member:

lo.Difference might be helpful here

Comment on lines 42 to 44
go nc.heartBeat(context.Background())

return nc, nil
Member:

Better to change the NewProducerClient constructor to accept a context and pass it here, e.g. NewProducerClient(ctx context.Context, params ProducerClientParams).

Comment on lines 90 to 93
if len(pc.activeStreamInfo) == 0 && pc.heartBeatCancelFunc != nil {
pc.heartBeatCancelFunc()
pc.heartBeatCancelFunc = nil
}
Member:

As discussed in the previous review, you shouldn't stop heartbeating even if we have no more streams open.

pkg/nats/proxy/compute_handler.go Show resolved Hide resolved
Comment on lines 24 to 25

StreamHeartBeat = "StreamHeartBeat/v1"
Member:

is this used?

nc.respSubLen = len(nc.respSubPrefix)
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.heartBeatRequestSub = fmt.Sprintf("%s.%s", "OrchestratorHeartBeatRequestSub", newInbox)
Member:

Treat the stream pkg as a library that is agnostic to how it is being used. Right now the heartbeat subject looks like OrchestratorHeartBeatRequestSub._SINBOX.ieeC7GRB7FMRMayXVtN1YW. Something like _HEARTBEAT._SINBOX.ieeC7GRB7FMRMayXVtN1YW would be good enough.

Also, prefer to define such strings as constants whenever possible :)

Comment on lines 156 to 160
for _, v := range streamIds {
if v.ID == conn.StreamID {
active = true
}
}
Member:

please revisit the choice of data structure to avoid looping over all streamIds for each write

Comment on lines 179 to 195
updatedStreams := make(map[string]StreamInfo)
for streamID, streamInfo := range pc.activeStreamInfo[connID] {
if _, found := nonActiveMap[streamID]; !found ||
time.Since(streamInfo.CreatedAt) <= pc.config.StreamCancellationBufferDuration {
updatedStreams[streamID] = streamInfo
}
}

if len(updatedStreams) > 0 {
updates[connID] = updatedStreams
}
}

pc.mu.Lock()
for c, s := range updates {
pc.activeStreamInfo[c] = s
}
Member:

Please add comments explaining what is going on, what edge cases you are handling, and why we are adding a buffer. The whole PR can benefit from more comments, as these are not simple changes :).

Also, instead of updating the map to only keep the active streams, it might be more readable to update the map by deleting the inactive streams. This can enable more logic in the future for cleaning up deleted streams, and we should add a debug log line for the streams that are being deleted.

@udsamani udsamani requested a review from wdbaruni May 27, 2024 11:10
Member (@wdbaruni) left a comment:

LGTM. Minor comments and you can merge after addressing

pkg/nats/stream/consumer_client.go (outdated, resolved)
pkg/nats/stream/types.go (outdated, resolved)
pkg/nats/stream/producer_client.go (outdated, resolved)
@udsamani udsamani merged commit e3e0145 into main May 27, 2024
12 checks passed
@udsamani udsamani deleted the 4025-producer-consumer-client branch May 27, 2024 13:01
err := json.Unmarshal(msg.Data, streamRequest)
if err != nil {
_ = writer.CloseWithCode(stream.CloseBadRequest,
fmt.Sprintf("error decoding %s: %s", reflect.TypeOf(streamRequest).Name(), err))
Collaborator:

Can we do a better error message here? This will be hard to find if triggered.

Collaborator:

Also, we shouldn't be printing errors, right? They should be going to log?

Collaborator Author (@udsamani, May 27, 2024):

I think you misread it :) If you take a closer look, fmt.Sprintf("error decoding %s: %s", reflect.TypeOf(streamRequest)...) is within the call to writer.CloseWithCode. It closes the stream by passing the correct error on the NATS subject, which the client then consumes and logs as an error.

base = 62
nuidSize = 22
inboxPrefix = "_SINBOX."
heartBeatPrefix = "_HEARTBEAT"
Collaborator:

Are these prefixes or suffixes? If the latter, why do they have a leading underscore?

Collaborator Author:

These are prefixes. Ideally this is done to differentiate between a normal NATS subject and an internal one (very similar to a reserved subject).

It is following the same principles which NATS uses in its Request-Reply mechanism.

request := new(HeartBeatRequest)
err := json.Unmarshal(msg.Data, request)
if err != nil {
log.Err(err)
Collaborator:

Having a more indicative error message would help here

Collaborator Author:

Acknowledged. Will add more indicative error.

}

// Delete all inactive streams with minimal lock contention.
pc.mu.Lock()
Collaborator:

Shouldn't we be doing ".lock()" "defer .unlock()"? Or finally?

Collaborator Author:

Since we are doing the locking at the very tail end of the function, we may be fine here, mainly because defer places the function call on a stack to be executed when the surrounding function finishes.

But I agree we can run into a situation where, if the delete panics, we won't unlock.

Collaborator Author:

Will fix it.

Member (@frrist) left a comment:

Apologies for the late review. Please address comments in a follow-on PR, or respond in comments if the direction is unclear or incorrect.

request := new(HeartBeatRequest)
err := json.Unmarshal(msg.Data, request)
if err != nil {
log.Err(err)
Member:

This method call alone will not write a log event (ref). This needs to be one of the below:

  1. log.Err(err).Send()
  2. log.Err(err).Msg("failed to unmarshal heartbeat request")

Preference for 2. with suggestions from David incorporated.

Member:

I didn't know log.Err(err) didn't do anything. Thanks for pointing that out

Comment on lines +277 to +283
log.Err(err)
return
}

err = nc.Conn.Publish(msg.Reply, data)
if err != nil {
log.Err(err)
Member:

same comment as above regarding these logs never producing a message.

respScanf string // The scanf template to extract mux token
respMux *nats.Subscription // A single response subscription
respMap map[string]*streamingBucket // Request map for the response msg channels
reqSubMap map[string][]*streamingBucket // Request Subject map which hold a request subject where request was sent for streams
Member:

Does reqSubMap have locking similar to respMap? Are they sharing a lock?


// Loop through all active stream ids at producer
for subject, producerStreamIds := range activeStreamIDsAtProducer {
consumerBuckets, consumerHasSubject := nc.reqSubMap[subject]
Member:

Does this need locking similar to createNewRequestAndSend?

return time.Since(bucket.createdAt) < nc.config.StreamCancellationBufferDuration
})

//If no non recent buckets, means all are active streams
Member:

nit: make sure all code comments are formatted correctly, this needs to be prefixed with a space (surprised the linter didn't complain)

var heartBeatResponse ConsumerHeartBeatResponse
err = json.Unmarshal(msg.Data, &heartBeatResponse)
if err != nil {
log.Ctx(ctx).Err(err).Msg("error while parsing heart beat response from NATS streaming consumer client")
Member:

Suggested change:
log.Ctx(ctx).Err(err).Msg("error while parsing heart beat response from NATS streaming consumer client")

nonActiveMap[id] = true
}

if streamInfo, ok := pc.activeStreamInfo[connID]; ok {
Member:

I believe we need to take a lock here again.

Comment on lines +30 to +85
type Request struct {
// ConsumerID is the connection id of the consumer streaming client originating the request.
ConsumerID string `json:"consumerId"`
// StreamId is the id of the stream being created.
StreamID string `json:"streamId"`
// HeartBeatSub is the heart beat subject where the producer client will send its heart beat.
HeartBeatRequestSub string `json:"heartBeatRequestSub"`
// Data represents request of different stream type. For example currently we support Log request
// in that case it would be ExecutionLogRequest
Data []byte `json:"body"`
}

// StreamInfo represents information about the stream.
type StreamInfo struct {
// ID is the identifier of the stream.
ID string
// RequestSub is the subject on which the request for this stream was sent.
RequestSub string
// CreatedAt represents the time the stream was created.
CreatedAt time.Time
}

// StreamProducerClientConfig represents the configuration of NATS based streaming
// client acting as a producer.
type StreamProducerClientConfig struct {
// HeartBeatIntervalDuration represents the duration between two heart beats from the producer client
// to consumer client.
HeartBeatIntervalDuration time.Duration
// HeartBeatRequestTimeout represents the time within which the producer client should receive the
// response from the consumer client.
HeartBeatRequestTimeout time.Duration
// StreamCancellationBufferDuration represents the time interval for which consumer or producer client
// should wait before killing the stream in case of race conditions on heart beats and request origination.
StreamCancellationBufferDuration time.Duration
}

// StreamConsumerClientConfig represents the configuration of NATS based streaming
// client acting as a consumer.
type StreamConsumerClientConfig struct {
StreamCancellationBufferDuration time.Duration
}

// HeartBeatRequest sent by producer client to the consumer client.
type HeartBeatRequest struct {
// ActiveStreamIds is a map of active stream ids on producer client, where key is the RequestSubject, where
// the original request to initiate a streaming connection was sent.
ActiveStreamIds map[string][]string
}

// ConsumerHeartBeatResponse represents a heart beat response from the consumer client.
type ConsumerHeartBeatResponse struct {
// NonActiveStreamIds represents a map, where key is the request subject where consumer sent
// request for opening a stream, and value is the list of streamIDs which should no longer be
// active.
NonActiveStreamIds map[string][]string
}
Member:

Heartbeat is one word: remove the camel case (here and everywhere else) so that names read Heartbeat, matching other Heartbeat "things" in the codebase.

streamRequest := new(stream.Request)
err := json.Unmarshal(msg.Data, streamRequest)
if err != nil {
_ = writer.CloseWithCode(stream.CloseBadRequest,
Member:

CloseWithCode returns an error in the event json marshaling fails, or if publishing the data to the subject fails. At a minimum we need to log the error from this method if one is returned. Please address this change here and elsewhere.


type ProducerClient struct {
Conn *nats.Conn
mu sync.RWMutex // Protects access to activeStreamInfo and activeConnHeartBeatRequestSubjects
Member:

Based on the usage of this lock it appears a regular mutex (sync.Mutex) would be sufficient. Is there something I am missing that requires this to be a RWMutex?

Collaborator Author (@udsamani):

Sure thing! Will create a follow up PR for this for addressing the comments.

Successfully merging this pull request may close these issues.

Separate out streaming client into consumer and producer client and add heart beat logic
4 participants