Skip to content

Commit

Permalink
Add multithreaded support
Browse files Browse the repository at this point in the history
  • Loading branch information
Ekaterina Korsun committed Dec 12, 2020
1 parent 1c6a4e9 commit c6cddae
Show file tree
Hide file tree
Showing 36 changed files with 538 additions and 92 deletions.
5 changes: 4 additions & 1 deletion cloudqueryclient/client.go
Expand Up @@ -39,13 +39,16 @@ type Client struct {

func NewLogger(verbose bool, options ...zap.Option) (*zap.Logger, error) {
level := zap.NewAtomicLevelAt(zap.InfoLevel)
disableCaller := true
if verbose {
level = zap.NewAtomicLevelAt(zap.DebugLevel)
disableCaller = false
}
return zap.Config{
Sampling: nil,
Level: level,
Development: true,
DisableCaller: true,
DisableCaller: disableCaller,
Encoding: "console",
EncoderConfig: zap.NewDevelopmentEncoderConfig(),
OutputPaths: []string{"stderr"},
Expand Down
10 changes: 9 additions & 1 deletion providers/aws/autoscaling/launch_configurations.go
Expand Up @@ -89,6 +89,10 @@ type LaunchConfiguration struct {
UserData *string
}

func (LaunchConfiguration)TableName() string {
return "aws_autoscaling_launch_configurations"
}

type LaunchConfigurationBlockDeviceMapping struct {
ID uint `gorm:"primarykey"`
LaunchConfigurationID uint
Expand All @@ -109,6 +113,10 @@ type LaunchConfigurationBlockDeviceMapping struct {
VirtualName *string
}

func (LaunchConfigurationBlockDeviceMapping)TableName() string {
return "aws_autoscaling_launch_configuration_block_device_mapping"
}

func (c *Client) transformLaunchConfigurationBlockDeviceMapping(value *autoscaling.BlockDeviceMapping) *LaunchConfigurationBlockDeviceMapping {
return &LaunchConfigurationBlockDeviceMapping{
DeviceName: value.DeviceName,
Expand Down Expand Up @@ -184,7 +192,7 @@ func (c *Client) launchConfigurations(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&LaunchConfiguration{})
common.ChunkedCreate(c.db, c.transformLaunchConfigurations(output.LaunchConfigurations))
c.log.Info("Fetched resources", zap.Int("count", len(output.LaunchConfigurations)))
c.log.Info("Fetched resources", zap.String("resource", "auto_scaling.launch_configurations"), zap.Int("count", len(output.LaunchConfigurations)))
if aws.StringValue(output.NextToken) == "" {
break
}
Expand Down
6 changes: 5 additions & 1 deletion providers/aws/directconnect/gateways.go
Expand Up @@ -20,6 +20,10 @@ type Gateway struct {
StateChangeError *string
}

func (Gateway)TableName() string {
return "aws_directconnect_gateways"
}

func (c *Client) transformGateway(value *directconnect.Gateway) *Gateway {
return &Gateway{
Region: c.region,
Expand Down Expand Up @@ -63,7 +67,7 @@ func (c *Client) gateways(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&Gateway{})
common.ChunkedCreate(c.db, c.transformGateways(output.DirectConnectGateways))
c.log.Info("Fetched resources", zap.Int("count", len(output.DirectConnectGateways)))
c.log.Info("Fetched resources", zap.String("resource", "directconnect.gateways"), zap.Int("count", len(output.DirectConnectGateways)))
if aws.StringValue(output.NextToken) == "" {
break
}
Expand Down
4 changes: 4 additions & 0 deletions providers/aws/ec2/byoip_cidr.go
Expand Up @@ -18,6 +18,10 @@ type ByoipCidr struct {
StatusMessage *string
}

func (ByoipCidr)TableName() string {
return "aws_ec2_byoip_cidrs"
}

func (c *Client) transformByoipCidr(value *ec2.ByoipCidr) *ByoipCidr {
return &ByoipCidr{
Region: c.region,
Expand Down
10 changes: 9 additions & 1 deletion providers/aws/ec2/customer_gateways.go
Expand Up @@ -21,13 +21,21 @@ type CustomerGateway struct {
Type *string
}

func (CustomerGateway)TableName() string {
return "aws_ec2_customer_gateways"
}

type CustomerGatewayTag struct {
ID uint `gorm:"primarykey"`
CustomerGatewayID uint
Key *string
Value *string
}

func (CustomerGatewayTag)TableName() string {
return "aws_ec2_customer_gateway_tags"
}

func (c *Client) transformCustomerGatewayTag(value *ec2.Tag) *CustomerGatewayTag {
return &CustomerGatewayTag{
Key: value.Key,
Expand Down Expand Up @@ -89,6 +97,6 @@ func (c *Client) customerGateways(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&CustomerGateway{})
common.ChunkedCreate(c.db, c.transformCustomerGateways(output.CustomerGateways))
c.log.Info("Fetched resources", zap.Int("count", len(output.CustomerGateways)))
c.log.Info("Fetched resources", zap.String("resource", "ec2.customer_gateways"), zap.Int("count", len(output.CustomerGateways)))
return nil
}
10 changes: 9 additions & 1 deletion providers/aws/ec2/flow_logs.go
Expand Up @@ -29,13 +29,21 @@ type FlowLog struct {
TrafficType *string
}

func (FlowLog)TableName() string {
return "aws_ec2_flow_logs"
}

type FlowLogTag struct {
ID uint `gorm:"primarykey"`
FlowLogID uint
Key *string
Value *string
}

func (FlowLogTag)TableName() string {
return "aws_ec2_flow_log_tags"
}

func (c *Client) transformFlowLogTag(value *ec2.Tag) *FlowLogTag {
return &FlowLogTag{
Key: value.Key,
Expand Down Expand Up @@ -103,7 +111,7 @@ func (c *Client) FlowLogs(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&FlowLog{})
common.ChunkedCreate(c.db, c.transformFlowLogs(output.FlowLogs))
c.log.Info("Fetched resources", zap.Int("count", len(output.FlowLogs)))
c.log.Info("Fetched resources", zap.String("resource", "ec2.flow_logs"), zap.Int("count", len(output.FlowLogs)))
if aws.StringValue(output.NextToken) == "" {
break
}
Expand Down
18 changes: 17 additions & 1 deletion providers/aws/ec2/images.go
Expand Up @@ -39,6 +39,10 @@ type Image struct {
VirtualizationType *string
}

func (Image)TableName() string {
return "aws_ec2_images"
}

type ImageBlockDeviceMapping struct {
ID uint `gorm:"primarykey"`
ImageID uint
Expand All @@ -48,20 +52,32 @@ type ImageBlockDeviceMapping struct {
VirtualName *string
}

func (ImageBlockDeviceMapping)TableName() string {
return "aws_ec2_image_block_device_mappings"
}

type ImageProductCode struct {
ID uint `gorm:"primarykey"`
ImageID uint
ProductCodeId *string
ProductCodeType *string
}

func (ImageProductCode)TableName() string {
return "aws_ec2_image_product_codes"
}

type ImageTag struct {
ID uint `gorm:"primarykey"`
ImageID uint
Key *string
Value *string
}

func (ImageTag)TableName() string {
return "aws_ec2_image_tags"
}

func (c *Client) transformImageBlockDeviceMapping(value *ec2.BlockDeviceMapping) *ImageBlockDeviceMapping {
return &ImageBlockDeviceMapping{
DeviceName: value.DeviceName,
Expand Down Expand Up @@ -175,6 +191,6 @@ func (c *Client) images(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&Image{})
common.ChunkedCreate(c.db, c.transformImages(output.Images))
c.log.Info("Fetched resources", zap.Int("count", len(output.Images)))
c.log.Info("Fetched resources", zap.String("resource", "ec2.images"), zap.Int("count", len(output.Images)))
return nil
}
50 changes: 49 additions & 1 deletion providers/aws/ec2/instances.go
Expand Up @@ -62,20 +62,32 @@ type Instance struct {
VpcId *string
}

func (Instance)TableName() string {
return "aws_ec2_instances"
}

type InstanceBlockDeviceMapping struct {
ID uint `gorm:"primarykey"`
InstanceID uint
DeviceName *string
Ebs *ec2.EbsInstanceBlockDevice `gorm:"embedded;embeddedPrefix:ebs_"`
}

func (InstanceBlockDeviceMapping)TableName() string {
return "aws_ec2_instance_block_device_mappings"
}

type InstanceCapacityReservationSpecificationResponse struct {
ID uint `gorm:"primarykey"`
InstanceID uint
CapacityReservationPreference *string
CapacityReservationTarget *ec2.CapacityReservationTargetResponse `gorm:"embedded"`
}

func (InstanceCapacityReservationSpecificationResponse)TableName() string {
return "aws_ec2_instance_capacity_reservation_specification_responses"
}

type InstanceElasticGpuAssociation struct {
ID uint `gorm:"primarykey"`
InstanceID uint
Expand All @@ -85,6 +97,10 @@ type InstanceElasticGpuAssociation struct {
ElasticGpuId *string
}

func (InstanceElasticGpuAssociation)TableName() string {
return "aws_ec2_instance_elastic_gpu_associations"
}

type InstanceElasticInferenceAcceleratorAssociation struct {
ID uint `gorm:"primarykey"`
InstanceID uint
Expand All @@ -94,12 +110,20 @@ type InstanceElasticInferenceAcceleratorAssociation struct {
ElasticInferenceAcceleratorAssociationTime *time.Time
}

func (InstanceElasticInferenceAcceleratorAssociation)TableName() string {
return "aws_ec2_instance_elastic_inference_accelerator_associations"
}

type InstanceLicenseConfiguration struct {
ID uint `gorm:"primarykey"`
InstanceID uint
LicenseConfigurationArn *string
}

func (InstanceLicenseConfiguration)TableName() string {
return "aws_ec2_instance_license_configurations"
}

type InstanceNetworkInterface struct {
ID uint `gorm:"primarykey"`
InstanceID uint
Expand All @@ -120,19 +144,31 @@ type InstanceNetworkInterface struct {
VpcId *string
}

func (InstanceNetworkInterface)TableName() string {
return "aws_ec2_instance_network_interfaces"
}

type InstanceGroupIdentifier struct {
ID uint `gorm:"primarykey"`
InstanceID uint
GroupId *string
GroupName *string
}

func (InstanceGroupIdentifier)TableName() string {
return "aws_ec2_instance_group_identifiers"
}

type InstanceIpv6Address struct {
ID uint `gorm:"primarykey"`
InstanceNetworkInterfaceID uint
Ipv6Address *string
}

func (InstanceIpv6Address)TableName() string {
return "aws_ec2_instance_ipv6_addresses"
}

type InstancePrivateIpAddress struct {
ID uint `gorm:"primarykey"`
InstanceNetworkInterfaceID uint
Expand All @@ -142,20 +178,32 @@ type InstancePrivateIpAddress struct {
PrivateIpAddress *string
}

func (InstancePrivateIpAddress)TableName() string {
return "aws_ec2_instance_private_ip_addresses"
}

type InstanceProductCode struct {
ID uint `gorm:"primarykey"`
InstanceID uint
ProductCodeId *string
ProductCodeType *string
}

func (InstanceProductCode)TableName() string {
return "aws_ec2_instance_product_codes"
}

type InstanceTag struct {
ID uint `gorm:"primarykey"`
InstanceID uint
Key *string
Value *string
}

func (InstanceTag)TableName() string {
return "aws_ec2_instance_tags"
}

func (c *Client) transformInstanceBlockDeviceMapping(value *ec2.InstanceBlockDeviceMapping) *InstanceBlockDeviceMapping {
return &InstanceBlockDeviceMapping{
DeviceName: value.DeviceName,
Expand Down Expand Up @@ -425,7 +473,7 @@ func (c *Client) instances(gConfig interface{}) error {
}
c.db.Where("region = ?", c.region).Where("account_id = ?", c.accountID).Delete(&Instance{})
for _, reservation := range output.Reservations {
c.log.Info("Fetched resources", zap.Int("count", len(reservation.Instances)))
c.log.Info("Fetched resources", zap.String("resource", "ec2.instances"), zap.Int("count", len(reservation.Instances)))
common.ChunkedCreate(c.db, c.transformInstances(reservation.Instances))
}
if aws.StringValue(output.NextToken) == "" {
Expand Down
12 changes: 12 additions & 0 deletions providers/aws/ec2/internet_gateways.go
Expand Up @@ -18,20 +18,32 @@ type InternetGateway struct {
Tags []*InternetGatewayTag `gorm:"constraint:OnDelete:CASCADE;"`
}

func (InternetGateway)TableName() string {
return "aws_ec2_internet_gateways"
}

type InternetGatewayAttachment struct {
ID uint `gorm:"primarykey"`
InternetGatewayID uint
State *string
VpcId *string
}

func (InternetGatewayAttachment)TableName() string {
return "aws_ec2_internet_gateway_attachments"
}

type InternetGatewayTag struct {
ID uint `gorm:"primarykey"`
InternetGatewayID uint
Key *string
Value *string
}

func (InternetGatewayTag)TableName() string {
return "aws_ec2_internet_gateway_tags"
}

func (c *Client) transformInternetGatewayAttachment(value *ec2.InternetGatewayAttachment) *InternetGatewayAttachment {
return &InternetGatewayAttachment{
State: value.State,
Expand Down

0 comments on commit c6cddae

Please sign in to comment.