-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
operations.go
159 lines (135 loc) · 4.8 KB
/
operations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pubsublite
import (
"context"
"time"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)
// SeekTarget is the target location to seek a subscription to. Implemented by
// BacklogLocation, PublishTime, EventTime.
type SeekTarget interface {
setRequest(req *pb.SeekSubscriptionRequest)
}
// BacklogLocation refers to a location with respect to the message backlog.
// It implements the SeekTarget interface.
type BacklogLocation int
const (
// End refers to the location past all currently published messages. End
// skips the entire message backlog.
End BacklogLocation = iota + 1
// Beginning refers to the location of the oldest retained message.
Beginning
)
func (b BacklogLocation) setRequest(req *pb.SeekSubscriptionRequest) {
target := pb.SeekSubscriptionRequest_TAIL
if b == End {
target = pb.SeekSubscriptionRequest_HEAD
}
req.Target = &pb.SeekSubscriptionRequest_NamedTarget_{
NamedTarget: target,
}
}
// PublishTime is a message publish timestamp. It implements the SeekTarget
// interface.
type PublishTime time.Time
func (p PublishTime) setRequest(req *pb.SeekSubscriptionRequest) {
req.Target = &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_PublishTime{tspb.New(time.Time(p))},
},
}
}
// EventTime is a message event timestamp. It implements the SeekTarget
// interface.
type EventTime time.Time
func (e EventTime) setRequest(req *pb.SeekSubscriptionRequest) {
req.Target = &pb.SeekSubscriptionRequest_TimeTarget{
TimeTarget: &pb.TimeTarget{
Time: &pb.TimeTarget_EventTime{tspb.New(time.Time(e))},
},
}
}
// SeekSubscriptionOption is reserved for future options.
type SeekSubscriptionOption interface{}
// SeekSubscriptionResult is the result of a seek subscription operation.
// Currently empty.
type SeekSubscriptionResult struct{}
// OperationMetadata stores metadata for long-running operations.
type OperationMetadata struct {
// The target of the operation. For example, targets of seeks are
// subscriptions, structured like:
// "projects/PROJECT_ID/locations/LOCATION/subscriptions/SUBSCRIPTION_ID"
Target string
// The verb describing the kind of operation.
Verb string
// The time the operation was created.
CreateTime time.Time
// The time the operation finished running. Is zero if the operation has not
// completed.
EndTime time.Time
}
func protoToOperationMetadata(o *pb.OperationMetadata) (*OperationMetadata, error) {
if err := o.GetCreateTime().CheckValid(); err != nil {
return nil, err
}
metadata := &OperationMetadata{
Target: o.Target,
Verb: o.Verb,
CreateTime: o.GetCreateTime().AsTime(),
}
if o.GetEndTime() != nil {
if err := o.GetEndTime().CheckValid(); err != nil {
return nil, err
}
metadata.EndTime = o.GetEndTime().AsTime()
}
return metadata, nil
}
// SeekSubscriptionOperation manages a long-running seek operation from
// AdminClient.SeekSubscription.
type SeekSubscriptionOperation struct {
op *vkit.SeekSubscriptionOperation
}
// Name returns the path of the seek operation, in the format:
// "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID".
func (s *SeekSubscriptionOperation) Name() string {
return s.op.Name()
}
// Done returns whether the seek operation has completed.
func (s *SeekSubscriptionOperation) Done() bool {
return s.op.Done()
}
// Metadata returns metadata associated with the seek operation. To get the
// latest metadata, call this method after a successful call to Wait.
func (s *SeekSubscriptionOperation) Metadata() (*OperationMetadata, error) {
m, err := s.op.Metadata()
if err != nil {
return nil, err
}
return protoToOperationMetadata(m)
}
// Wait polls until the seek operation is complete and returns one of the
// following:
// - A SeekSubscriptionResult and nil error if the operation is complete and
// succeeded.
// - Error containing failure reason if the operation is complete and failed.
// - Error if polling the operation status failed due to a non-retryable error.
func (s *SeekSubscriptionOperation) Wait(ctx context.Context) (*SeekSubscriptionResult, error) {
if _, err := s.op.Wait(ctx); err != nil {
return nil, err
}
return &SeekSubscriptionResult{}, nil
}