Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

Commit

Permalink
Migrate Add sns topics commit from main repository
Browse files Browse the repository at this point in the history
  • Loading branch information
yevgenypats committed Feb 11, 2021
1 parent 7aeee6a commit 1b407ac
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 3 deletions.
3 changes: 2 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ const configYaml = `
- name: redshift.clusters
- name: redshift.cluster_subnet_groups
- name: s3.buckets
- name: sns.subscriptions`
- name: sns.subscriptions
- name: sns.topics`
3 changes: 2 additions & 1 deletion provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ var tablesArr = [][]interface{}{
redshift.ClusterSubnetGroupTables,
s3.BucketTables,
sns.SubscriptionTables,
sns.TopicTables,
}

func (p *Provider) Init(driver string, dsn string, verbose bool) error {
Expand Down Expand Up @@ -336,4 +337,4 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string,

func main() {
sdk.ServePlugin(&Provider{})
}
}
4 changes: 3 additions & 1 deletion sns/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func (c *Client) CollectResource(resource string, config interface{}) error {
switch resource {
case "subscriptions":
return c.subscriptions(config)
case "topics":
return c.topics(config)
default:
return fmt.Errorf("unsupported resource ec2.%s", resource)
return fmt.Errorf("unsupported resource sns.%s", resource)
}
}
142 changes: 142 additions & 0 deletions sns/topics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package sns

import (
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
)

type Topic struct {
ID uint `gorm:"primarykey"`
AccountID string
Region string
Owner *string
TopicArn *string
Policy *string
DeliveryPolicy *string
DisplayName *string
SubscriptionsConfirmed *int
SubscriptionsDeleted *int
SubscriptionsPending *int
EffectiveDeliveryPolicy *string
FifoTopic *bool
ContentBasedDeduplication *bool
}

func (Topic) TableName() string {
return "aws_sns_topics"
}

func GetTopicAttributes(c *Client, TopicArn *string) (map[string]*string, error) {
params := &sns.GetTopicAttributesInput{
TopicArn: TopicArn,
}
output, err := c.svc.GetTopicAttributes(params)
if err != nil {
return nil, err
}
return output.Attributes, nil
}

func getOrZero(attrs map[string]*string, keyName string) string {
if v, found := attrs[keyName]; found {
return *v
}
return "0"
}

func (c *Client) transformTopics(values []*sns.Topic) ([]*Topic, error) {
var tValues []*Topic
for _, value := range values {

// All topic attributes are returned as a string; we have to handle type conversion
output, err := GetTopicAttributes(c, value.TopicArn)
if err != nil {
return nil, err
}

subsConfirmed, err := strconv.Atoi(getOrZero(output, "SubscriptionsConfirmed"))
if err != nil {
return nil, err
}

subsDeleted, err := strconv.Atoi(getOrZero(output, "SubscriptionsDeleted"))
if err != nil {
return nil, err
}

subsPending, err := strconv.Atoi(getOrZero(output, "SubscriptionsPending"))
if err != nil {
return nil, err
}

isFifo, err := strconv.ParseBool(getOrZero(output, "FifoTopic"))
if err != nil {
return nil, err
}

isContentBasedDeduped, err := strconv.ParseBool(getOrZero(output, "ContentBasedDeduplication"))
if err != nil {
return nil, err
}

tValue := Topic{
AccountID: c.accountID,
Region: c.region,
TopicArn: value.TopicArn,
Policy: output["Policy"],
DeliveryPolicy: output["DeliveryPolicy"],
DisplayName: output["DisplayName"],
Owner: output["Owner"],
EffectiveDeliveryPolicy: output["EffectiveDeliveryPolicy"],
SubscriptionsConfirmed: &subsConfirmed,
SubscriptionsDeleted: &subsDeleted,
SubscriptionsPending: &subsPending,
FifoTopic: &isFifo,
ContentBasedDeduplication: &isContentBasedDeduped,
}
tValues = append(tValues, &tValue)
}
return tValues, nil
}

type TopicConfig struct {
Filter string
}

var TopicTables = []interface{}{
&Topic{},
}

func (c *Client) topics(gConfig interface{}) error {
var config sns.ListTopicsInput
err := mapstructure.Decode(gConfig, &config)
if err != nil {
return err
}
c.db.Where("region", c.region).Where("account_id", c.accountID).Delete(TopicTables...)

for {
output, listErr := c.svc.ListTopics(&config)
if listErr != nil {
return listErr
}

topics, transformErr := c.transformTopics(output.Topics)
if transformErr != nil {
return transformErr
}

c.db.ChunkedCreate(topics)
c.log.Info("Fetched resources", zap.String("resource", "sns.topics"), zap.Int("count", len(output.Topics)))
if aws.StringValue(output.NextToken) == "" {
break
}
config.NextToken = output.NextToken
}

return nil
}

0 comments on commit 1b407ac

Please sign in to comment.