New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google cloud pubsub #10

Merged
merged 45 commits into from Dec 14, 2018

Conversation

Projects
None yet
3 participants
@maclav3
Copy link
Collaborator

maclav3 commented Nov 20, 2018

Work in progress

@maclav3 maclav3 changed the title Google cloud pubsub WIP: Google cloud pubsub Nov 20, 2018

@maclav3 maclav3 changed the title WIP: Google cloud pubsub 🚧 Google cloud pubsub Nov 20, 2018

@maclav3 maclav3 changed the title 🚧 Google cloud pubsub WIP Google cloud pubsub Nov 20, 2018

@maclav3 maclav3 referenced this pull request Nov 20, 2018

Closed

Google Cloud PubSub #4

ProjectID string
CreateSubscriptionIfMissing bool

ConsumersCount int

This comment has been minimized.

@roblaszczak

roblaszczak Nov 25, 2018

Member

ConsumersCount is not used anywhere (and is probably not needed?)

This comment has been minimized.

@maclav3

maclav3 Nov 29, 2018

Author Collaborator

True, it's a leftover from the early prototype. The google client library takes care of spawning goroutines itself.

This comment has been minimized.

@maclav3

maclav3 Nov 29, 2018

Author Collaborator
return
}

output <- msg

This comment has been minimized.

@roblaszczak

roblaszczak Nov 25, 2018

Member

it should be

select {
    case output <- msg:
        // ok
     case <-s.closing:
         return errors.New("some error")
}

because when client is not consuming and we will call Close it will block forever

This comment has been minimized.

@roblaszczak

roblaszczak Nov 25, 2018

Member

you also should check that when returning error it will cause nack

This comment has been minimized.

@maclav3

maclav3 Nov 29, 2018

Author Collaborator

The callback to subscription.Receive doesn't return any error.

Anyway, when s.closing receives, ctx is canceled before output is closed. So with luck, we should never get a write on a closed output channel, because Receive is canceled with ctx.

If a race happens and output is closed before Receive is effectively canceled, writing to a closed channel would cause a panic, which may actually make sense here, as long as it is recovered in a sane way. Because it kind of is an abnormal situation.

So if there's no error to return here, maybe I should just recover from panic with an error which causes Nack? Seems a bit artificial though.

This comment has been minimized.

@maclav3

maclav3 Nov 29, 2018

Author Collaborator

OK, now I understand what you meant - that there would be no reader for output on the other end and it would hang.

2df32f9 should fix this, and pass Ack/Nack back to the client library, as specified in https://godoc.org/cloud.google.com/go/pubsub#Message.Ack

I'm not entirely sure if I'm using Nack correctly. https://godoc.org/cloud.google.com/go/pubsub suggests that sending a Nack tells the server "could not process this message, please redeliver ASAP". But if the message is, for example, malformed, it would make no sense to redeliver, rather Ack and put it on a poison queue, I guess?

This comment has been minimized.

@roblaszczak

roblaszczak Nov 29, 2018

Member

"could not process this message, please redeliver ASAP"

this is good, when message is malformed poision queue of retry should be used, but it is not responsibility of the PubSub

s.activeSubscriptionsLock.RLock()
var subscriptionID string
for i := 0; ; i++ {
subscriptionID = fmt.Sprintf(subscriptionIDTemplate, topic, i)

This comment has been minimized.

@roblaszczak

roblaszczak Nov 25, 2018

Member

won't it be easier to use map[string][int]?

This comment has been minimized.

@roblaszczak

roblaszczak Nov 25, 2018

Member

because this for is hmm, strange 🍗

This comment has been minimized.

@maclav3

maclav3 Nov 29, 2018

Author Collaborator

It actually was pretty weird 😆
532ef91

maclav3 added some commits Nov 29, 2018

Handle closing subscriber when waiting for message to be consumed.
Pass Ack and Nack back to the client library, according to spec
Unify Marshaler/Unmarshaler into one (for tests) and fix the implemen…
…tation somewhat

Change the tests to general pubsub tests
Modify default settings (by default, create sub/topic if missing).
Fix subscriber logic so that it doesn't lock
Use subscription name generator function instead of just string
Also, include the original ctx in the struct
Resolve goroutine sync properly in `Subscribe()`
Reflect GC pubsub's needs in tests (create subscriptions before publishing)

@maclav3 maclav3 changed the title WIP Google cloud pubsub Google cloud pubsub Dec 13, 2018

@roblaszczak roblaszczak merged commit 79db23c into master Dec 14, 2018

4 checks passed

License Compliance All checks passed.
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
continuous-integration/travis-ci/push The Travis CI build passed
Details
deploy/netlify Deploy preview ready!
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment