DRV-348: Streaming api 2 #127

Merged
merged 22 commits into from
Mar 3, 2021

@vadimLF vadimLF commented Jan 28, 2021

Streaming api

import (
	"fmt"

	f "github.com/fauna/faunadb-go/v3/faunadb"
)

dbClient := f.NewFaunaClient("secret")

// Create a document and keep its ref so we can subscribe to it.
value, err := dbClient.Query(
	f.Create(f.Collection("collectionName"), f.Obj{"data": f.Obj{"name": "Magic"}}))
if err != nil {
	panic(err)
}
var ref f.RefV
if err := value.At(f.ObjKey("ref")).Get(&ref); err != nil {
	panic(err)
}

// Subscribe to the document and consume events until the channel is closed.
subscription := dbClient.Stream(ref)
subscription.Start()
for event := range subscription.Messages() {
	switch event.Type() {
	case f.StartEventT:
		fmt.Println("Start stream")
	case f.VersionEventT:
		fmt.Println("Version event")
	case f.ErrorEventT:
		fmt.Println("Error event")
		subscription.Close()
	}
}
  • Each started subscription runs an event loop in a goroutine; received stream events are sent to a channel.
  • Uses context.Context to impose deadlines on requests instead of relying on http.Client's Timeout.
    This change introduces the faunaResponse type, which stores the HTTP response along with the context and its cancel function (context.CancelFunc). This allows different timeouts per request type; for queries it is set at 60 seconds.
    http.NewRequestWithContext was introduced in Go 1.13, forcing us to drop support for versions < 1.13: https://golang.org/pkg/net/http/#NewRequestWithContext
  • An ErrorEvent is triggered on io.EOF and related server/client errors. The connection is eventually closed and the subscription status is updated.

@vadimLF vadimLF changed the title Streaming api 2 DRV-348: Streaming api 2 Jan 28, 2021
}
}

if parsedResponse, err = parseJSON(response.Body); err == nil {
Contributor

Q: Does parseJSON support new-line separated JSONs?

https://github.com/fauna/faunadb-js/blob/master/src/_json.js#L19-L56

Contributor Author

The JSON decoder (encoding/json) from the Go standard library handles this.

Contributor

Great. Could you please add a test case to prevent regressions?

Contributor Author

@erickpintor I've added the TestListenToLargeEvents() test for this purpose. Could you please check?

Contributor Author

So basically, you are right on this issue. I created a separate ticket for it: DRV-513.

Contributor

I'm okay with handling this particular problem in a separate ticket.

// Close eventually closes the stream
func (sub *StreamSubscription) Close() {
if !isClosed(sub.messages) {
Contributor

What's the relationship between the messages and the closed status of a subscription? Can you rely on the status variable instead?

Contributor Author

We need to close the event channel when we close the subscription.

Contributor

My point is: I don't think this isClosed function respects Golang channel semantics: https://play.golang.org/p/gXiR4_5fW8D

As I was referring to in the client receive loop, I think we need a close signal (channel) for this purpose.
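
A minimal sketch of the close-signal idea, under assumptions: the subscription type, its fields, and the publish helper are hypothetical and not the driver's code. Go has no side-effect-free way to ask whether a channel is closed, so the sketch tracks shutdown with a dedicated done channel that is closed exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical, stripped-down stand-in for StreamSubscription.
type subscription struct {
	events    chan string
	done      chan struct{} // closed exactly once to signal shutdown
	closeOnce sync.Once
}

func newSubscription() *subscription {
	return &subscription{events: make(chan string), done: make(chan struct{})}
}

// Close signals shutdown; calling it more than once is safe.
func (s *subscription) Close() {
	s.closeOnce.Do(func() { close(s.done) })
}

// publish hands an event to a consumer unless the subscription has been closed.
// It reports whether the event was delivered.
func (s *subscription) publish(evt string) bool {
	select {
	case s.events <- evt:
		return true
	case <-s.done:
		return false
	}
}

func main() {
	sub := newSubscription()
	received := make(chan string)
	go func() { received <- <-sub.events }()

	fmt.Println(sub.publish("start")) // delivered to the waiting consumer
	fmt.Println(<-received)

	sub.Close()
	sub.Close()                         // second call is a no-op
	fmt.Println(sub.publish("version")) // nobody is receiving and done is closed
}
```

The producer never has to inspect the events channel itself; it only selects on done, so no pending event is consumed by a status check.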

Contributor Author

@erickpintor
my idea with sending close signal was the following:

1:for {
2:	select {
3:	case <- subscription.closeSignal:
4:		subscription.status.Set(StreamConnClosed)
5:		close(subscription.messages)
6:	default:
7:		if val, err := client.parseResponse()...
8:}

Here is the issue. When data arrives, the following happens:
- we parse it (line 7);
- an event is generated (this is the event in response to which the close signal will be sent);
- the loop goes back to parsing the next portion of data (line 7);
- the logic inside the error event handler runs and sends the close signal.
But this signal is never consumed at line 3, since we are at line 7 waiting for a new chunk of data.

Any idea how to handle this? Otherwise we can just rewrite isClosed, since it works.

Contributor Author

Please ignore my previous comment and check the new implementation.

Comment on lines 54 to 66
for evt := range sub.Messages() {
	switch evt.Type() {
	case f.StartEventT:
		s.NotZero(evt.Txn())
		s.client.Query(f.Update(ref, f.Obj{"data": f.Obj{"x": time.Now().String()}}))
	case f.VersionEventT:
		s.Equal(evt.Type(), f.VersionEventT)
		sub.Close()
		wg.Done()
	case f.ErrorEventT:
		s.defaultStreamError(evt)
	}
}
Contributor

This looks really good! 🤘

defer httpResponse.Body.Close()
defer response.cncl()

for {
Contributor

Q: What instruction in this for loop blocks until there's another message to be read?

Contributor Author

There is the client.callObserver function in parseResponse.

Contributor

@erickpintor erickpintor Feb 10, 2021

That's a driver-specific callback, isn't it? I don't think that's what blocks this loop until the next event arrives. There are two concerns here: (a) the spinning loop consuming CPU; (b) your general inability to stop this loop on command.

I don't think (a) is happening, but I want us to have clarity on which part of it blocks and waits for the next event to arrive. On (b), I think we need a close signal that we can monitor so that this goroutine can die when the subscription is closed.

vadimLF commented Feb 22, 2021

@erickpintor I added the suggested signal-based solution to this PR. I also created a separate ticket for the JSON streaming issue.

@parkhomenko

Hi @erickpintor,
this PR has been open for quite some time. I see there is an issue where the JSON that comes from the server is sometimes corrupted, in the sense that we need to accumulate it somehow on the client side. Can we address it as a separate task: https://faunadb.atlassian.net/browse/DRV-513 ?

@vadimLF vadimLF marked this pull request as ready for review February 23, 2021 10:19

Comment on lines 67 to 69
go func() {
	sub.getNext <- true
}()
Contributor

Why do you need a goroutine here?

Contributor Author

@vadimLF vadimLF Feb 25, 2021

Since we are using channels for tracking the closing event, I added a select statement to startStream() that waits for one of two events: closing, or obtaining the next chunk of data (code).

Contributor

I believe you need a goroutine because these are synchronous (unbuffered) channels. If you let it buffer one item (make(chan..., 1)), you can remove the goroutine.
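
The difference can be seen in a small standalone sketch. The request helper is hypothetical and only mirrors the shape of the Request method in the diff; the getNext name is borrowed from it.

```go
package main

import "fmt"

// request mirrors the shape of the Request method from the diff, but relies on
// the channel's one-slot buffer instead of a helper goroutine.
// It would still block if a previous request had not been consumed yet.
func request(getNext chan bool) {
	getNext <- true
}

func main() {
	// Unbuffered: a send blocks until a receiver is ready, so a goroutine that
	// sends and later receives on its own needs a helper goroutine for the send.
	unbuffered := make(chan bool)
	go func() { unbuffered <- true }()
	fmt.Println(<-unbuffered)

	// Capacity 1: a single send completes even though nobody is receiving yet.
	buffered := make(chan bool, 1)
	request(buffered)
	fmt.Println(<-buffered)
}
```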

}()
}

func (sub *StreamSubscription) EventsMessages() chan StreamEvent {
Contributor

Suggested change
- func (sub *StreamSubscription) EventsMessages() chan StreamEvent {
+ func (sub *StreamSubscription) EventsMessages() <-chan StreamEvent {
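
A short sketch of what the receive-only return type buys. The streamEvent and subscription types here are simplified, hypothetical versions of the driver's types.

```go
package main

import "fmt"

// streamEvent and subscription are simplified, hypothetical types.
type streamEvent string

type subscription struct {
	events chan streamEvent
}

// Events exposes the channel as receive-only: callers can range over it,
// but code that sends on it or closes it does not compile.
func (s *subscription) Events() <-chan streamEvent {
	return s.events
}

func main() {
	sub := &subscription{events: make(chan streamEvent, 2)}
	sub.events <- "start"
	sub.events <- "version"
	close(sub.events) // only the owner of the bidirectional channel can close it

	for evt := range sub.Events() {
		fmt.Println(evt)
	}

	// Both of the following are rejected by the compiler:
	//   sub.Events() <- "x"
	//   close(sub.Events())
}
```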

Comment on lines 85 to 89
func (sub *StreamSubscription) Request() {
	go func() {
		sub.getNext <- true
	}()
}
Contributor

Why do you need to actively request the next message? I believe you can rely on channels for reactiveness.

Contributor Author

"I believe you can rely on channels for reactiveness." I'm not sure I understand how to implement this.
Since we are using channels for tracking the closing event, we also need a way to perform a request when there is no closing event in the channel. The JVM driver behaves the same way.

Contributor

It sounds to me like there is something wrong with our model here. Let me restate my understanding:

We have a goroutine in a closed loop reading messages from the network and pushing them to a Go channel. Then we have an external command to close the stream, which should also kill the goroutine. We've introduced a separate channel to signal that the subscription is closing, which forced us to select between channels in the closed loop. Since reading from the network is a blocking operation, we were then forced to introduce a separate channel, managed by the user, to signal that their code is ready to process the next event.

If that's correct, I believe the usage of select is the issue here. We can move closing resources to a separate goroutine that gets fired on sub.Close(). We'll have to refactor some of the state management for that. Please take a look at the following patch and see if it makes sense.

patch.txt

Contributor Author

The patch is applied and works as expected. @erickpintor could you please review one more time?


subscription = s.client.Stream(ref)
subscription.Start()
for evt := range subscription.EventsMessages() {
Contributor

Suggested change
- for evt := range subscription.EventsMessages() {
+ for evt := range subscription.Events() {

I think it looks better if it's just called Events?

Contributor Author

We already have Events in a couple of other places. What do you think about StreamEvents?

for evt := range subscription.StreamEvents() {
switch evt.Type() {
case f.StartEventT:
//s.NotZero(evt.Txn())
Contributor

Suggested change
- //s.NotZero(evt.Txn())

break
}
subscription.eventsMessages <- ErrorEvent{
txn: startTime.Unix(),
Contributor

Fauna errors report a transaction timestamp; we should use that.

Contributor

@erickpintor erickpintor left a comment

One last change and I think this is good to go.

break
}
subscription.events <- ErrorEvent{
txn: startTime.Unix(),
Contributor

We can't introduce synthetic transaction times. That can lead to consistency issues if users rely on this marker for their domain logic. In these cases, it's best not to report back a transaction time (remove this line and let Go assign the variable's default value, 0).

This change has to be replicated in all ErrorEvent initializations. As a guideline, pick the transaction time from the server. If that is not possible, remove the txn and let it be zero.

Contributor Author

Let's remove the timestamp from manually generated events.
Done.

Contributor

@erickpintor erickpintor left a comment

🐑 it! Great job!

@vadimLF vadimLF merged commit feb1acc into fauna:master Mar 3, 2021
@vadimLF vadimLF deleted the streaming-api-2 branch March 3, 2021 08:07