Skip to content

Commit

Permalink
closes #14 Multi instance apps should share consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
skarlsson committed Jan 5, 2018
1 parent 9ff3b17 commit 761b2ed
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 31 deletions.
33 changes: 17 additions & 16 deletions include/kspp/app_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,44 @@
namespace kspp {
struct app_info {
/**
* multi instance apps - state stores will be prefixed with instance_id
* metrics will have instance_id and app_instance_name tags
* multi tenant apps - state stores will be prefixed with tenant_id
* metrics will have tenant_id and app_id tags
*/
app_info(std::string _app_namespace,
std::string _app_id,
std::string _app_instance_id)
: app_namespace(_app_namespace)
, app_id(_app_id)
, app_instance_id(_app_instance_id){
app_info(std::string my_namespace,
std::string my_app_id,
std::string tenant_id)
: app_namespace(my_namespace)
, app_id(my_app_id)
, app_tenant_id(tenant_id){
}

/**
* single instance apps - state stores will not be prefixed with instance_id
* metrics will not have instance_id or app_instance_name tag ??
* maybe they should be marked as single instance?
*/
app_info(std::string _app_namespace,
std::string _app_id)
: app_namespace(_app_namespace), app_id(_app_id) {}
app_info(std::string my_namespace,
std::string my_app_id)
: app_namespace(my_namespace), app_id(my_app_id) {
}

std::string identity() const {
if (app_instance_id.size() == 0)
if (app_tenant_id.size() == 0)
return app_namespace + "::" + app_id;
else
return app_namespace + "::" + app_id + "#" + app_instance_id;
return app_namespace + "::" + app_id + "#" + app_tenant_id;
}

std::string consumer_group() const {
if (app_instance_id.size() == 0)
if (app_tenant_id.size() == 0)
return app_namespace + "_" + app_id;
else
return app_namespace + "_" + app_id + "_" + app_instance_id;
return app_namespace + "_" + app_id + "_" + app_tenant_id;
}

const std::string app_namespace;
const std::string app_id;
const std::string app_instance_id;
const std::string app_tenant_id;
};

inline std::string to_string(const app_info &obj) {
Expand Down
8 changes: 8 additions & 0 deletions include/kspp/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ namespace kspp {
void set_producer_message_timeout(std::chrono::milliseconds timeout);
std::chrono::milliseconds get_producer_message_timeout() const;

void set_min_topology_buffering(std::chrono::milliseconds timeout);
std::chrono::milliseconds get_min_topology_buffering() const;

void set_max_pending_sink_messages(size_t sz);
size_t get_max_pending_sink_messages() const;

void set_ca_cert_path(std::string path);
std::string get_ca_cert_path() const;

Expand Down Expand Up @@ -64,11 +70,13 @@ namespace kspp {
std::string client_cert_path_;
std::string private_key_path_;
std::string private_key_passphrase_;
std::chrono::milliseconds min_topology_buffering_;
std::chrono::milliseconds producer_buffering_;
std::chrono::milliseconds producer_message_timeout_;
std::chrono::milliseconds consumer_buffering_;
std::chrono::milliseconds schema_registry_timeout_;
std::chrono::seconds cluster_state_timeout_;
size_t max_pending_sink_messages_;
std::string root_path_;
std::string schema_registry_uri_;
bool fail_fast_;
Expand Down
6 changes: 5 additions & 1 deletion include/kspp/sinks/array_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
#include <kspp/kspp.h>
#pragma once

/**
* array_topic_sink - useful for testing
* stores the records in an std::vector given in constructor
*/

namespace kspp {
template<class K, class V>
class array_topic_sink : public topic_sink<K, V> {
Expand Down Expand Up @@ -45,7 +50,6 @@ namespace kspp {
auto r = this->_queue.pop_and_get();
this->_lag.add_event_time(tick, r->event_time());
++(this->_processed_count);
std::cerr << r->event_time() << std::endl;
_array->push_back((r->record()));
++processed;
}
Expand Down
5 changes: 4 additions & 1 deletion include/kspp/topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace kspp {

bool eof();

std::size_t process_1s();

std::size_t process(int64_t ts); // =milliseconds_since_epoch()

void close();
Expand Down Expand Up @@ -160,7 +162,8 @@ namespace kspp {
std::vector<std::shared_ptr<processor>> _sinks;
std::vector<std::shared_ptr<partition_processor>> _top_partition_processors;
int64_t _next_gc_ts;

int64_t _min_buffering_ms;
size_t _max_pending_sink_messages;
std::set<std::string> _precondition_topics;
std::string _precondition_consumer_group;
};
Expand Down
20 changes: 19 additions & 1 deletion src/cluster_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ using namespace std::chrono_literals;

namespace kspp {
cluster_config::cluster_config()
: producer_buffering_(std::chrono::milliseconds(1000))
: min_topology_buffering_(std::chrono::milliseconds(1000))
, producer_buffering_(std::chrono::milliseconds(1000))
, producer_message_timeout_(std::chrono::milliseconds(0))
, consumer_buffering_(std::chrono::milliseconds(1000))
, schema_registry_timeout_(std::chrono::milliseconds(10000))
, cluster_state_timeout_(std::chrono::seconds(60))
, max_pending_sink_messages_(50000)
, fail_fast_(true) {
}

Expand Down Expand Up @@ -147,6 +149,22 @@ namespace kspp {
return producer_message_timeout_;
}

void cluster_config::set_min_topology_buffering(std::chrono::milliseconds timeout) {
min_topology_buffering_ = timeout;
}

std::chrono::milliseconds cluster_config::get_min_topology_buffering() const{
return min_topology_buffering_;
}

void cluster_config::set_max_pending_sink_messages(size_t sz){
max_pending_sink_messages_ = sz;
}

size_t cluster_config::get_max_pending_sink_messages() const {
return max_pending_sink_messages_;
}

void cluster_config::set_fail_fast(bool state) {
fail_fast_ = state;
}
Expand Down
48 changes: 45 additions & 3 deletions src/topology.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <kspp/topology.h>
#include <kspp/kspp.h>
#include <kspp/utils/kafka_utils.h>
#include <algorithm>

using namespace std::chrono_literals;

Expand All @@ -12,8 +13,9 @@ namespace kspp {
, _cluster_config(cc)
, _is_started(false)
, _topology_id(topology_id)
, _next_gc_ts(0) {

, _next_gc_ts(0)
, _min_buffering_ms(cc->get_min_topology_buffering().count())
, _max_pending_sink_messages(cc->get_max_pending_sink_messages()) {
LOG(INFO) << "topology created, name:" << name();
}

Expand Down Expand Up @@ -171,7 +173,7 @@ namespace kspp {
size_t sink_queue_len = 0;
for (auto &&i : _sinks)
sink_queue_len += i->outbound_queue_len();
if (sink_queue_len > 50000)
if (sink_queue_len > _max_pending_sink_messages)
return 0;

//int64_t tick = milliseconds_since_epoch();
Expand All @@ -195,6 +197,46 @@ namespace kspp {
return res;
}

/**
 * Processes at most a 1000 ms window of event time, starting at the smallest
 * next_event_time() reported by the partition processors.
 *
 * The window is additionally capped at (now - _min_buffering_ms), so ticks
 * newer than that are left for a later call.
 *
 * @return the summed result of partition_processor::process() over the window,
 *         or 0 when the sinks' combined outbound queue length exceeds
 *         _max_pending_sink_messages and nothing is processed.
 *
 * NOTE(review): assumes process(ts) returns the number of events handled, as
 * the `return res` in topology::process suggests - confirm.
 */
std::size_t topology::process_1s() {
  for (auto &&i : _sinks)
    i->poll(0);
  for (auto &&i : _partition_processors)
    i->poll(0);

  // back-pressure: do not feed more data while the sinks are still draining
  size_t sink_queue_len = 0;
  for (auto &&i : _sinks)
    sink_queue_len += i->outbound_queue_len();
  if (sink_queue_len > _max_pending_sink_messages)
    return 0;

  // oldest pending event time; stays INT64_MAX when there are no partition processors
  int64_t min_ts = INT64_MAX;
  for (auto &&i : _partition_processors)
    min_ts = std::min(min_ts, i->next_event_time());

  // saturate instead of overflowing when min_ts is at (or near) INT64_MAX
  const int64_t window_end = (min_ts > INT64_MAX - 1000) ? INT64_MAX : min_ts + 1000;
  const int64_t max_ts = std::min(window_end, kspp::milliseconds_since_epoch() - _min_buffering_ms);

  for (auto &&i : _sinks)
    i->process(max_ts);

  size_t ev_count = 0;

  // '<' rather than '!=': when max_ts <= min_ts there is nothing to process yet,
  // and the loop must not run away towards INT64_MAX
  for (int64_t ts = min_ts; ts < max_ts; ++ts)
    for (auto &&i : _partition_processors)
      ev_count += i->process(ts);

  // second sink pass, after the partition processors have run
  for (auto &&i : _sinks)
    i->process(max_ts);

  if (max_ts > _next_gc_ts) {
    for (auto &&i : _partition_processors)
      i->garbage_collect(max_ts);
    for (auto &&i : _sinks)
      i->garbage_collect(max_ts);
    _next_gc_ts = max_ts + 10000; // 10 sec
  }

  return ev_count;
}

void topology::close() {
for (auto &&i : _partition_processors) {
i->close();
Expand Down
14 changes: 5 additions & 9 deletions tests/test8_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <kspp/state_stores/mem_store.h>
#include <kspp/processors/ktable.h>
#include <kspp/processors/join.h>
//#include <kspp/sinks/generic_sink.h>
#include <kspp/sinks/array_sink.h>

/*
Expand Down Expand Up @@ -112,8 +111,7 @@ int main(int argc, char **argv) {

topology->start(kspp::OFFSET_BEGINNING);

for (int64_t ts=0; ts!=20; ++ts)
topology->process(ts);
topology->process_1s();

assert(expected.size() == actual.size());
for (int i = 0; i != expected.size(); ++i)
Expand Down Expand Up @@ -161,8 +159,7 @@ int main(int argc, char **argv) {

topology->start(kspp::OFFSET_BEGINNING);

for (int64_t ts=0; ts!=20; ++ts)
topology->process(ts);
topology->process_1s();

assert(expected.size() == actual.size());
for (int i = 0; i != expected.size(); ++i)
Expand Down Expand Up @@ -210,8 +207,7 @@ int main(int argc, char **argv) {

topology->start(kspp::OFFSET_BEGINNING);

for (int64_t ts=0; ts!=20; ++ts)
topology->process(ts);
topology->process_1s();

assert(expected.size() == actual.size());
for (int i = 0; i != expected.size(); ++i)
Expand Down Expand Up @@ -259,8 +255,8 @@ int main(int argc, char **argv) {

topology->start(kspp::OFFSET_BEGINNING);

for (int64_t ts=0; ts!=20; ++ts)
topology->process(ts);
topology->process_1s();


assert(expected.size() == actual.size());
for (int i = 0; i != expected.size(); ++i)
Expand Down

0 comments on commit 761b2ed

Please sign in to comment.