Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-10675 add SQS Pump Backend support #740

Closed
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 8 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/aws/aws-sdk-go-v2 v1.16.14
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/fatih/structs v1.1.0
Expand All @@ -21,9 +23,11 @@ require (
github.com/influxdata/influxdb v1.8.10
github.com/influxdata/influxdb-client-go/v2 v2.6.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/kr/pretty v0.3.1
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
github.com/moesif/moesifapi-go v1.0.6
github.com/oklog/ulid/v2 v2.1.0
github.com/olivere/elastic/v7 v7.0.28
github.com/oschwald/maxminddb-golang v1.11.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -54,16 +58,15 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 // indirect
github.com/aws/smithy-go v1.13.2 // indirect
github.com/aws/smithy-go v1.16.0 // indirect
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
Expand Down
23 changes: 16 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,28 @@ github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTg
github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.16.14 h1:db6GvO4Z2UqHt5gvT0lr6J5x5P+oQ7bdRzczVaRekMU=
github.com/aws/aws-sdk-go-v2 v1.16.14/go.mod h1:s/G+UV29dECbF5rf+RNj1xhlmvoNurGSr+McVSRj59w=
github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U=
github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c=
github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s=
github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 h1:r6470olsn2qyOe2aLzK6q+wfO3dzNcMujRT3gqBgBB8=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 h1:FKaqk7geL3oIqSwGJt5SWUKj8uJ+qLZNqlBuqq6sFyA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0/go.mod h1:KqEkRkxm/+1Pd/rENRNbQpfblDBYeg5HDSqjB6ks8hA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 h1:/T5wKsw/po118HEDvnSE8YU7TESxvZbYM2rnn+Oi7Kk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 h1:VnrCAJTp1bDxU79UuW/D4z7bwZ7xOc7JjDKpqXL/m04=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs=
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 h1:7N7RsEVvUcvEg7jrWKU5AnSi4/6b6eY9+wG1g6W4ExE=
Expand All @@ -84,8 +88,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc=
github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.13.2 h1:TBLKyeJfXTrTXRHmsv4qWt9IQGYyWThLYaJWSahTOGE=
github.com/aws/smithy-go v1.13.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik=
github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/benbjohnson/tmpl v1.1.0/go.mod h1:N7W0NUGWuG26caFrID5sE4tvyLaKVp1fbV3Vr+MCul8=
Expand Down Expand Up @@ -216,7 +220,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -338,6 +341,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -396,6 +400,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254=
github.com/olivere/elastic v6.2.31+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
Expand All @@ -420,11 +426,13 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -457,6 +465,7 @@ github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
Expand Down
1 change: 1 addition & 0 deletions pumps/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ func init() {
AvailablePumps["sql-graph"] = &GraphSQLPump{}
AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{}
AvailablePumps["resurfaceio"] = &ResurfacePump{}
AvailablePumps["sqs"] = &SQSPump{}
}
177 changes: 177 additions & 0 deletions pumps/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package pumps

import (
"context"
"encoding/json"
"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/kr/pretty"
"github.com/mitchellh/mapstructure"
"github.com/oklog/ulid/v2"
"github.com/sirupsen/logrus"
"time"
)

type SQSSendMessageBatchAPI interface {
GetQueueUrl(ctx context.Context,
params *sqs.GetQueueUrlInput,
optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)

SendMessageBatch(ctx context.Context,
params *sqs.SendMessageBatchInput,
optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
}

type SQSPump struct {
SQSClient SQSSendMessageBatchAPI
SQSQueueURL *string
SQSConf *SQSConf
log *logrus.Entry
CommonPumpConfig
}

var SQSPrefix = "sqs-pump"
var SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX

// @PumpConf SQS
type SQSConf struct {
EnvPrefix string `mapstructure:"meta_env_prefix"`
QueueName string `mapstructure:"aws_queue_name"`
AWSRegion string `mapstructure:"aws_region"`
AWSSecret string `mapstructure:"aws_secret"`
AWSKey string `mapstructure:"aws_key"`
AWSEndpoint string `mapstructure:"aws_endpoint"`
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`
AWSMessageGroupID string `mapstructure:"aws_message_group_id"`
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`
AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"`
masoudhaghbin marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *SQSPump) New() Pump {
newPump := SQSPump{}
return &newPump
}

func (s *SQSPump) GetName() string {
return "SQS Pump"
}

func (s *SQSPump) GetEnvPrefix() string {
return s.SQSConf.EnvPrefix
}

func (s *SQSPump) Init(config interface{}) error {
s.SQSConf = &SQSConf{}
s.log = log.WithField("prefix", SQSPrefix)

err := mapstructure.Decode(config, &s.SQSConf)
if err != nil {
s.log.Fatal("Failed to decode configuration: ", err)
return err
}

processPumpEnvVars(s, s.log, s.SQSConf, SQSDefaultENV)

s.SQSClient, err = s.NewSQSPublisher()
if err != nil {
s.log.Fatal("Failed to create sqs client: ", err)
return err
}
// Get URL of queue
gQInput := &sqs.GetQueueUrlInput{
QueueName: aws.String(s.SQSConf.QueueName),
}

result, err := s.SQSClient.GetQueueUrl(context.TODO(), gQInput)
if err != nil {
return err
}
s.SQSQueueURL = result.QueueUrl

s.log.Info(s.GetName() + " Initialized")

return nil
}

func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {
s.log.Info("Attempting to write ", len(data), " records...")
startTime := time.Now()

messages := make([]types.SendMessageBatchRequestEntry, len(data))
for i, v := range data {
decoded := v.(analytics.AnalyticsRecord)
decodedMessageByteArray, _ := json.Marshal(decoded)
messages[i] = types.SendMessageBatchRequestEntry{
MessageBody: aws.String(string(decodedMessageByteArray)),
Id: aws.String(ulid.Make().String()),
}
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSDelaySeconds != 0 {
messages[i].DelaySeconds = s.SQSConf.AWSDelaySeconds
}

// for FIFO SQS
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSMessageIDDeduplicationEnabled {
messages[i].MessageDeduplicationId = messages[i].Id
}
}
SQSError := s.write(ctx, messages)
if SQSError != nil {
s.log.WithError(SQSError).Error("unable to write message")

return SQSError
}
s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime))
s.log.Info("Purged ", len(data), " records...")
return nil
}

func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequestEntry) error {
pretty.Print(messages)
masoudhaghbin marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < len(messages); i += s.SQSConf.AWSSQSBatchLimit {
end := i + s.SQSConf.AWSSQSBatchLimit

if end > len(messages) {
end = len(messages)
}
sMInput := &sqs.SendMessageBatchInput{
Entries: messages[i:end],
QueueUrl: s.SQSQueueURL,
}

if _, err := s.SQSClient.SendMessageBatch(c, sMInput); err != nil {
return err
}
}

return nil
}

func (s *SQSPump) NewSQSPublisher() (c *sqs.Client, err error) {
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(s.SQSConf.AWSRegion),
)
if err != nil {
return nil, err
}

client := sqs.NewFromConfig(cfg, func(options *sqs.Options) {
if s.SQSConf.AWSEndpoint != "" {
options.BaseEndpoint = aws.String(s.SQSConf.AWSEndpoint)
}
if s.SQSConf.AWSKey != "" && s.SQSConf.AWSSecret != "" {
options.Credentials = credentials.NewStaticCredentialsProvider(s.SQSConf.AWSKey, s.SQSConf.AWSSecret, "")
}
})

return client, nil
}