/
items.go
66 lines (54 loc) · 1.65 KB
/
items.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package hackernews
import (
"context"
"io"
pb "github.com/alexdunne/gs-onboarding/internal/api/protobufs"
"github.com/alexdunne/gs-onboarding/internal/models"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/emptypb"
)
// FetchAll fetches and returns all items from the gRPC server
func (c *client) FetchAll(ctx context.Context) ([]models.Item, error) {
clientStream, err := c.client.ListAll(ctx, &emptypb.Empty{})
if err != nil {
return nil, errors.Wrap(err, "streaming all items")
}
return collectStreamItems(ctx, clientStream)
}
// FetchAll fetches and returns all story items from the gRPC server
func (c *client) FetchStories(ctx context.Context) ([]models.Item, error) {
clientStream, err := c.client.ListStories(ctx, &emptypb.Empty{})
if err != nil {
return nil, errors.Wrap(err, "streaming story items")
}
return collectStreamItems(ctx, clientStream)
}
// FetchAll fetches and returns all jobs items from the gRPC server
func (c *client) FetchJobs(ctx context.Context) ([]models.Item, error) {
clientStream, err := c.client.ListJobs(ctx, &emptypb.Empty{})
if err != nil {
return nil, errors.Wrap(err, "streaming job items")
}
return collectStreamItems(ctx, clientStream)
}
type itemStream interface {
Recv() (*pb.Item, error)
}
func collectStreamItems(ctx context.Context, s itemStream) ([]models.Item, error) {
items := []models.Item{}
for {
select {
case <-ctx.Done():
return items, nil
default:
item, err := s.Recv()
if err != nil {
if err == io.EOF {
return items, nil
}
return nil, errors.Wrap(err, "receiving items from server")
}
items = append(items, models.Ptoi(item))
}
}
}