/
streams_fetch.go
65 lines (60 loc) · 1.82 KB
/
streams_fetch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package kinesis
import (
"context"
"github.com/OpsHelmInc/cloudquery/client"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/cloudquery/plugin-sdk/schema"
)
func fetchKinesisStreams(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- any) error {
c := meta.(*client.Client)
svc := c.Services().Kinesis
input := kinesis.ListStreamsInput{}
for {
response, err := svc.ListStreams(ctx, &input)
if err != nil {
return err
}
res <- response.StreamNames
if !aws.ToBool(response.HasMoreStreams) {
break
}
input.ExclusiveStartStreamName = aws.String(response.StreamNames[len(response.StreamNames)-1])
}
return nil
}
func getStream(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource) error {
c := meta.(*client.Client)
streamName := resource.Item.(string)
svc := c.Services().Kinesis
streamSummary, err := svc.DescribeStreamSummary(ctx, &kinesis.DescribeStreamSummaryInput{
StreamName: aws.String(streamName),
})
if err != nil {
return err
}
resource.Item = streamSummary.StreamDescriptionSummary
return nil
}
func resolveKinesisStreamTags(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
cl := meta.(*client.Client)
svc := cl.Services().Kinesis
summary := resource.Item.(*types.StreamDescriptionSummary)
input := kinesis.ListTagsForStreamInput{
StreamName: summary.StreamName,
}
var tags []types.Tag
for {
output, err := svc.ListTagsForStream(ctx, &input)
if err != nil {
return err
}
tags = append(tags, output.Tags...)
if !aws.ToBool(output.HasMoreTags) {
break
}
input.ExclusiveStartTagKey = aws.String(*output.Tags[len(output.Tags)-1].Key)
}
return resource.Set(c.Name, client.TagsToMap(tags))
}