From 1b407accb61ba57b28411f6a8cf2a037567b27bc Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Thu, 11 Feb 2021 09:56:47 +0200 Subject: [PATCH] Migrate Add sns topics commit from main repository https://github.com/cloudquery/cloudquery/commit/7215796aa858520d583ad887563c92689402f9f0 --- config.go | 3 +- provider.go | 3 +- sns/client.go | 4 +- sns/topics.go | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 sns/topics.go diff --git a/config.go b/config.go index 4ee3ad138..ad2d0667e 100644 --- a/config.go +++ b/config.go @@ -49,4 +49,5 @@ const configYaml = ` - name: redshift.clusters - name: redshift.cluster_subnet_groups - name: s3.buckets - - name: sns.subscriptions` + - name: sns.subscriptions + - name: sns.topics` diff --git a/provider.go b/provider.go index db54a65b1..3e1f2e9c7 100644 --- a/provider.go +++ b/provider.go @@ -140,6 +140,7 @@ var tablesArr = [][]interface{}{ redshift.ClusterSubnetGroupTables, s3.BucketTables, sns.SubscriptionTables, + sns.TopicTables, } func (p *Provider) Init(driver string, dsn string, verbose bool) error { @@ -336,4 +337,4 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, func main() { sdk.ServePlugin(&Provider{}) -} \ No newline at end of file +} diff --git a/sns/client.go b/sns/client.go index c65941657..5d380780d 100644 --- a/sns/client.go +++ b/sns/client.go @@ -35,7 +35,9 @@ func (c *Client) CollectResource(resource string, config interface{}) error { switch resource { case "subscriptions": return c.subscriptions(config) + case "topics": + return c.topics(config) default: - return fmt.Errorf("unsupported resource ec2.%s", resource) + return fmt.Errorf("unsupported resource sns.%s", resource) } } diff --git a/sns/topics.go b/sns/topics.go new file mode 100644 index 000000000..bc196c760 --- /dev/null +++ b/sns/topics.go @@ -0,0 +1,142 @@ +package sns + +import ( + "strconv" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sns" + "github.com/mitchellh/mapstructure" + "go.uber.org/zap" +) + +type Topic struct { + ID uint `gorm:"primarykey"` + AccountID string + Region string + Owner *string + TopicArn *string + Policy *string + DeliveryPolicy *string + DisplayName *string + SubscriptionsConfirmed *int + SubscriptionsDeleted *int + SubscriptionsPending *int + EffectiveDeliveryPolicy *string + FifoTopic *bool + ContentBasedDeduplication *bool +} + +func (Topic) TableName() string { + return "aws_sns_topics" +} + +func GetTopicAttributes(c *Client, TopicArn *string) (map[string]*string, error) { + params := &sns.GetTopicAttributesInput{ + TopicArn: TopicArn, + } + output, err := c.svc.GetTopicAttributes(params) + if err != nil { + return nil, err + } + return output.Attributes, nil +} + +func getOrZero(attrs map[string]*string, keyName string) string { + if v, found := attrs[keyName]; found { + return *v + } + return "0" +} + +func (c *Client) transformTopics(values []*sns.Topic) ([]*Topic, error) { + var tValues []*Topic + for _, value := range values { + + // All topic attributes are returned as a string; we have to handle type conversion + output, err := GetTopicAttributes(c, value.TopicArn) + if err != nil { + return nil, err + } + + subsConfirmed, err := strconv.Atoi(getOrZero(output, "SubscriptionsConfirmed")) + if err != nil { + return nil, err + } + + subsDeleted, err := strconv.Atoi(getOrZero(output, "SubscriptionsDeleted")) + if err != nil { + return nil, err + } + + subsPending, err := strconv.Atoi(getOrZero(output, "SubscriptionsPending")) + if err != nil { + return nil, err + } + + isFifo, err := strconv.ParseBool(getOrZero(output, "FifoTopic")) + if err != nil { + return nil, err + } + + isContentBasedDeduped, err := strconv.ParseBool(getOrZero(output, "ContentBasedDeduplication")) + if err != nil { + return nil, err + } + + tValue := Topic{ + AccountID: c.accountID, + Region: c.region, + TopicArn: value.TopicArn, + Policy: output["Policy"], + DeliveryPolicy: output["DeliveryPolicy"], + DisplayName: output["DisplayName"], + Owner: output["Owner"], + EffectiveDeliveryPolicy: output["EffectiveDeliveryPolicy"], + SubscriptionsConfirmed: &subsConfirmed, + SubscriptionsDeleted: &subsDeleted, + SubscriptionsPending: &subsPending, + FifoTopic: &isFifo, + ContentBasedDeduplication: &isContentBasedDeduped, + } + tValues = append(tValues, &tValue) + } + return tValues, nil +} + +type TopicConfig struct { + Filter string +} + +var TopicTables = []interface{}{ + &Topic{}, +} + +func (c *Client) topics(gConfig interface{}) error { + var config sns.ListTopicsInput + err := mapstructure.Decode(gConfig, &config) + if err != nil { + return err + } + c.db.Where("region", c.region).Where("account_id", c.accountID).Delete(TopicTables...) + + for { + output, listErr := c.svc.ListTopics(&config) + if listErr != nil { + return listErr + } + + topics, transformErr := c.transformTopics(output.Topics) + if transformErr != nil { + return transformErr + } + + c.db.ChunkedCreate(topics) + c.log.Info("Fetched resources", zap.String("resource", "sns.topics"), zap.Int("count", len(output.Topics))) + if aws.StringValue(output.NextToken) == "" { + break + } + config.NextToken = output.NextToken + } + + return nil +}