-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.go
99 lines (84 loc) · 2.23 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// SPDX-License-Identifier: BSD-3-Clause
// Copyright (c) 2022, The Zip API Object Framework Authors and Unikraft GmbH.
// Licensed under the BSD-3-Clause License (the "License").
// You may not use this file except in compliance with the License.
package zip
import "context"
// StreamClient provides the interface for connecting to a remote service which
// returns a stream.
type StreamClient[In ReferenceObject, Out any] struct {
fn Stream[In, Out]
config *ClientConfig
}
// NewStreamClient instantiates a new stream-enable client for the given stream
// provider.
func NewStreamClient[In ReferenceObject, Out any](
ctx context.Context,
fn Stream[In, Out],
opts ...ClientOption,
) (
StreamStrategy[In, Out],
error,
) {
client := StreamClient[In, Out]{
fn: fn,
config: &ClientConfig{},
}
for _, opt := range opts {
if err := opt(client.config); err != nil {
return nil, err
}
}
return &client, nil
}
// Channel implements StreamStrategy
func (client *StreamClient[In, Out]) Channel(
ctx context.Context,
req In,
) (
chan Out,
chan error,
error,
) {
// Handle OnBefore requests
for _, before := range client.config.before {
ret, err := before(ctx, req)
if err != nil {
return nil, nil, err
}
req = ret.(In)
}
// Invoke the method which returns the channel
events, errs, err := client.fn(ctx, req)
if err != nil {
return nil, nil, err
}
// Return early if the returning object cannot be handled by OnAfter
// interceptors (due to type constraint) or if there are no OnAfter handlers.
if _, ok := any(*new(Out)).(ReferenceObject); !ok || len(client.config.after) == 0 {
return events, errs, nil
}
// Handle OnAfter requests by intercepting the channel
go func(events *chan Out, errs *chan error) {
loop:
for {
select {
case event := <-*events:
in := any(event).(ReferenceObject)
for _, after := range client.config.after {
out, err := after(ctx, req, in)
if err != nil {
*errs <- err
}
// Re-assign the result from the OnAfter callback such that its value
// is passed to the next OnAfter invocation.
in = any(out).(ReferenceObject)
}
*events <- in.(Out)
case <-ctx.Done():
break loop
}
}
}(&events, &errs)
return events, errs, nil
}