/
record_list.go
122 lines (113 loc) · 5.12 KB
/
record_list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package KINESIS
import (
"fmt"
"github.com/Appkube-awsx/awsx-common/authenticate"
"github.com/Appkube-awsx/awsx-common/awsclient"
"github.com/Appkube-awsx/awsx-common/model"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/spf13/cobra"
"log"
)
var AwsxKinesisRecordListCmd = &cobra.Command{
Use: "getKinesisRecordList",
Short: "getKinesisRecordList command gets list of kinesis instances of an aws account",
Long: `getKinesisRecordList command gets list of kinesis instances of an aws account`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("executing getKinesisRecordList command")
var authFlag, clientAuth, err = authenticate.AuthenticateCommand(cmd)
if err != nil {
log.Printf("error during authentication: %v\n", err)
err := cmd.Help()
if err != nil {
return
}
return
}
if authFlag {
resp, err := ListKinesisRecordInstances(clientAuth, nil)
if err != nil {
log.Println("error getting getKinesisRecordList: ", err)
return
}
fmt.Println(resp)
}
},
}
func ListKinesisRecordInstances(clientAuth *model.Auth, client *kinesis.Kinesis) ([]*kinesis.Record, error) {
if client == nil {
client = awsclient.GetClient(*clientAuth, awsclient.KINESIS_CLIENT).(*kinesis.Kinesis)
}
streamList, err := GetKinesisStreamList(clientAuth, client)
if err != nil {
log.Println("error in getting kinesis streams", err)
return nil, err
}
allRecords := []*kinesis.Record{}
shardIteratorType := "LATEST"
for _, name := range streamList.StreamNames {
kinesisDetail, err := GetKinesisInstanceByStreamName(*name, clientAuth, client)
if err != nil {
log.Println("error in getting kinesis detail", err)
continue
}
shards := kinesisDetail.StreamDescription.Shards
for _, shard := range shards {
shardIteratorInput := &kinesis.GetShardIteratorInput{
ShardId: shard.ShardId,
ShardIteratorType: aws.String(shardIteratorType),
StreamName: aws.String(*name),
}
shardIteratorOutput, err := client.GetShardIterator(shardIteratorInput)
if err != nil {
fmt.Println("error in getting kinesis shard iterator:", err)
continue
}
shardIterator := shardIteratorOutput.ShardIterator
recordsInput := &kinesis.GetRecordsInput{
ShardIterator: shardIterator,
}
recordsOutput, err := client.GetRecords(recordsInput)
if err != nil {
fmt.Println("error getting kinesis shard records', err")
return nil, err
}
for _, record := range recordsOutput.Records {
allRecords = append(allRecords, record)
}
}
}
return allRecords, nil
}
func init() {
AwsxKinesisRecordListCmd.PersistentFlags().String("rootVolumeId", "", "root volume id")
AwsxKinesisRecordListCmd.PersistentFlags().String("ebsVolume1Id", "", "ebs volume 1 id")
AwsxKinesisRecordListCmd.PersistentFlags().String("ebsVolume2Id", "", "ebs volume 2 id")
AwsxKinesisRecordListCmd.PersistentFlags().String("elementId", "", "element id")
AwsxKinesisRecordListCmd.PersistentFlags().String("cmdbApiUrl", "", "cmdb api")
AwsxKinesisRecordListCmd.PersistentFlags().String("vaultUrl", "", "vault end point")
AwsxKinesisRecordListCmd.PersistentFlags().String("vaultToken", "", "vault token")
AwsxKinesisRecordListCmd.PersistentFlags().String("landingZoneId", "", "aws landingZoneId")
AwsxKinesisRecordListCmd.PersistentFlags().String("zone", "", "aws region")
AwsxKinesisRecordListCmd.PersistentFlags().String("accessKey", "", "aws access key")
AwsxKinesisRecordListCmd.PersistentFlags().String("secretKey", "", "aws secret key")
AwsxKinesisRecordListCmd.PersistentFlags().String("crossAccountRoleArn", "", "aws cross account role arn")
AwsxKinesisRecordListCmd.PersistentFlags().String("externalId", "", "aws external id")
AwsxKinesisRecordListCmd.PersistentFlags().String("cloudWatchQueries", "", "aws cloudwatch metric queries")
AwsxKinesisRecordListCmd.PersistentFlags().String("serviceName", "", "service name")
AwsxKinesisRecordListCmd.PersistentFlags().String("elementType", "", "element type")
AwsxKinesisRecordListCmd.PersistentFlags().String("instanceId", "", "instance id")
AwsxKinesisRecordListCmd.PersistentFlags().String("tagName", "", "tag name")
AwsxKinesisRecordListCmd.PersistentFlags().String("apiKey", "", "api gateway key/id")
AwsxKinesisRecordListCmd.PersistentFlags().String("clusterName", "", "cluster name")
AwsxKinesisRecordListCmd.PersistentFlags().String("tableName", "", "dynamo-db table name")
AwsxKinesisRecordListCmd.PersistentFlags().String("streamName", "", "kinesis stream name")
AwsxKinesisRecordListCmd.PersistentFlags().String("keyId", "", "kms key id")
AwsxKinesisRecordListCmd.PersistentFlags().String("functionName", "", "lambda function name")
AwsxKinesisRecordListCmd.PersistentFlags().String("bucketName", "", "s3 bucket name")
AwsxKinesisRecordListCmd.PersistentFlags().String("arn", "", "arn")
AwsxKinesisRecordListCmd.PersistentFlags().String("query", "", "query")
AwsxKinesisRecordListCmd.PersistentFlags().String("startTime", "", "start time")
AwsxKinesisRecordListCmd.PersistentFlags().String("endTime", "", "end time")
AwsxKinesisRecordListCmd.PersistentFlags().String("responseType", "", "response type. json/frame")
}