Creating a shared job infrastructure for pubsub management. #24
Creating a shared job infrastructure for pubsub management. #24
Conversation
The channel controller only reconciles the topic. There is large improvements to the PullSubscription controller. The invoker is not implemented. Please take a look. |
In the morning I will remove all the channel parts of this pr so just look at the pubsub rec and jobs work |
return OpsOngoingState, nil | ||
} | ||
|
||
func (r *PubSubBase) getJob(ctx context.Context, owner metav1.Object, ls labels.Selector) (*v1.Job, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we can't use a well defined name and not have to list them using label selectors. I think there might be a race here. We had this same discussion with Channels and decided to use predictable naming and that also has been what serving has been doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am using generate name, there should be no race because I am watching the job resource for jobs owned by the resource under reconcile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(without looking at any code) There's still a race if work is queued and is processed before the informer cache is updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vagababov just created github.com/knative/serving/pkg/resources.ChildName(parent, suffix string)
for generating suffixed child resource names in a way that automagically shortens the parent name to make the total fit in 63-characters. I'd be 👍 on moving to knative/pkg
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
followup: #27
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still slightly concerned with the "Job for Subscription operations" model, but I'm not going to block this PR on it.
A few other concerns -- some may be resolvable with a TODO rather than a code change in this PR.
|
||
// ChannelConditionInvokerDeployed has status True when the Channel has had | ||
// its invoker deployment created. | ||
ChannelConditionInvokerDeployed apis.ConditionType = "InvokerDeployed" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition isn't in channelCondSet
. Oversight?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not finished with this api yet.
} | ||
|
||
func (a *Invoker) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { | ||
//logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file is WIP
|
||
// inbound is the cloudevents client to use to receive events. | ||
inbound cloudevents.Client | ||
inboundTransport *cepubsub.Transport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is inboundTransport set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is removed in a followup. It was being used to delete subscriptions but I decided it was a bad idea.
event.SetID(tx.ID) | ||
event.SetTime(tx.PublishTime) | ||
event.SetSource(v1alpha1.PubSubEventSource(tx.Project, tx.Topic)) | ||
event.SetDataContentType(*cloudevents.StringOfApplicationJSON()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this always JSON?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, WIP file, sorry. No converter on the invoker. will delete.
event.SetType(v1alpha1.PubSubEventType) | ||
event.SetID(uuid.New().String()) | ||
event.Data = msg.Data | ||
// TODO: this will drop the other metadata related to the the topic and subscription names. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to add a TODO to handle attributes as extensions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It already does in the cloudevents sdk.
limitations under the License. | ||
*/ | ||
|
||
package operations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need a short README.md in pkg/pubsub/operations/
explaining the model and usage of these jobs. (Something similar to the description I overheard in person would be helpful.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
|
||
type OpsState string | ||
|
||
const ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm slightly concerned about usin a state machine (vs goal-based reconciliation).
On the other hand, these don't actually look to be used as a state machine, just a regular enum. Maybe OpStatus
and remove State
from the names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I will do that in a later PR.
pkg/reconciler/pubsub/reconciler.go
Outdated
OpsGetFailedState OpsState = "JOB_GET_FAILED" | ||
OpsCreatedState OpsState = "JOB_CREATED" | ||
OpsCreateFailedState OpsState = "JOB_CREATE_FAILED" | ||
OpsCompeteSuccessfulState OpsState = "JOB_SUCCESSFUL" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compete?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trying to save on Ls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
source.Status.SubscriptionID = "" | ||
removeFinalizer(source) | ||
|
||
case pubsub.OpsCreateFailedState, pubsub.OpsCompeteFailedState: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This leaves the finalizer hanging around -- should we give up on the finalizer at some point (for example, if we don't have permissions to delete the subscription, or if it is already deleted)?
I seem to recall that the GitHub source had issues where it would stick around forever if (for example) you created a source but it hadn't had a chance to register the webhook yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can edit the resource and remove the finalizer.
addFinalizer(source) | ||
source.Status.MarkSubscriptionOperation("Creating", | ||
"Created Job to create Subscription %q.", | ||
source.Status.SubscriptionID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a possible race here where the SubscriptionID gets stored in status
, the subscribe job gets created, and then the resource is deleted, and a delete job is created while the create job is still running?
At which point, does the job lookup in getJob
work correctly, or does it return a random job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh interesting. and uug... I will make a bug for this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
Thanks for including a "why" on the use of Jobs in the README.md
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: evankanderson, n3wscott The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
The following is the coverage report on pkg/.
|
@@ -172,19 +206,40 @@ func (c *Reconciler) reconcile(ctx context.Context, source *v1alpha1.PullSubscri | |||
source.Status.MarkSink(sinkURI) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MarkTransformer? Same on line#203?
No description provided.