Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamARN parameter to support CAA #552

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions aws/kinesis/core/kinesis_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,12 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) {
cfg);
}

Pipeline* KinesisProducer::create_pipeline(const std::string& stream) {
Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn) {
LOG(info) << "Created pipeline for stream \"" << stream << "\"";
return new Pipeline(
region_,
stream,
stream_arn,
config_,
executor_,
kinesis_client_,
Expand Down Expand Up @@ -291,7 +292,11 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) {
std::chrono::milliseconds(config_->record_max_buffered_time()));
ur->set_expiration_from_now(
std::chrono::milliseconds(config_->record_ttl()));
pipelines_[ur->stream()].put(ur);
if (ur->stream_arn()) {
pipelines_[ur->stream_arn().get()].put(ur);
} else {
pipelines_[ur->stream()].put(ur);
}
}

void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) {
Expand Down
12 changes: 9 additions & 3 deletions aws/kinesis/core/kinesis_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ class KinesisProducer : boost::noncopyable {
cw_creds_provider_(std::move(cw_creds_provider)),
executor_(std::move(executor)),
ipc_manager_(std::move(ipc_manager)),
pipelines_([this](auto& stream) {
return this->create_pipeline(stream);
pipelines_([this](auto& stream_or_arn) {
std::regex kinesisStreamArnRegex("^arn:aws.*:kinesis:.*:\\d{12}:stream/\\S+$");
std::smatch match;
if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) {
return this->create_pipeline(match[1].str(), stream_or_arn);
} else {
return this->create_pipeline(stream_or_arn, boost::none);
}
}),
shutdown_(false) {
create_kinesis_client(ca_path);
Expand Down Expand Up @@ -80,7 +86,7 @@ class KinesisProducer : boost::noncopyable {

void create_sts_client(const std::string& ca_path);

Pipeline* create_pipeline(const std::string& stream);
Pipeline* create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn);

void drain_messages();

Expand Down
9 changes: 7 additions & 2 deletions aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Pipeline : boost::noncopyable {
Pipeline(
std::string region,
std::string stream,
boost::optional<std::string> stream_arn,
std::shared_ptr<Configuration> config,
std::shared_ptr<aws::utils::Executor> executor,
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
Expand All @@ -58,7 +59,7 @@ class Pipeline : boost::noncopyable {
Retrier::UserRecordCallback finish_user_record_cb)
: stream_(std::move(stream)),
region_(std::move(region)),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
Expand Down Expand Up @@ -205,7 +206,11 @@ class Pipeline : boost::noncopyable {
// Retrieve the account ID and partition from the STS service.
static std::string init_stream_arn(const std::shared_ptr<Aws::STS::STSClient>& sts_client,
const std::string &region,
const std::string &stream_name) {
const std::string &stream_name,
const boost::optional<std::string> &stream_arn_) {
if (!stream_arn_) {
return stream_arn_.get();
}
Aws::STS::Model::GetCallerIdentityRequest request;
auto outcome = sts_client->GetCallerIdentity(request);
if (outcome.IsSuccess()) {
Expand Down
4 changes: 4 additions & 0 deletions aws/kinesis/core/user_record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m)
source_id_ = m.id();
auto put_record = m.put_record();
stream_ = std::move(put_record.stream_name());
has_stream_arn_ = put_record.has_stream_arn();
if (has_stream_arn_) {
stream_arn_ = std::move(put_record.stream_arn());
}
partition_key_ = std::move(put_record.partition_key());
data_ = std::move(put_record.data());
has_explicit_hash_key_ = put_record.has_explicit_hash_key();
Expand Down
10 changes: 10 additions & 0 deletions aws/kinesis/core/user_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class UserRecord : public aws::utils::TimeSensitive {
return ss.str();
}

boost::optional<std::string> stream_arn() const noexcept {
if (has_stream_arn_) {
return stream_arn_;
} else {
return boost::none;
}
}

boost::optional<std::string> explicit_hash_key() const noexcept {
if (has_explicit_hash_key_) {
return hash_key_decimal_str();
Expand All @@ -99,12 +107,14 @@ class UserRecord : public aws::utils::TimeSensitive {
private:
uint64_t source_id_;
std::string stream_;
std::string stream_arn_;
std::string partition_key_;
uint128_t hash_key_;
std::string data_;
std::vector<Attempt> attempts_;
boost::optional<uint64_t> predicted_shard_;
bool has_explicit_hash_key_;
bool has_stream_arn_;
bool finished_;
};

Expand Down
122 changes: 85 additions & 37 deletions aws/kinesis/protobuf/messages.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading