Skip to content

Commit

Permalink
Merge pull request #19 from LuckySherry/master
Browse files Browse the repository at this point in the history
v2.0.2 update
  • Loading branch information
LuckySherry authored Aug 8, 2019
2 parents c23c4da + c711abc commit eb4b6e1
Show file tree
Hide file tree
Showing 19 changed files with 885 additions and 58 deletions.
9 changes: 8 additions & 1 deletion Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log
## [Unreleased] - 2019-07-24
## [2.0.2] - 2019-08-08
### Added
* 新增us.conf 默认参数配置
* 新增 dynamic_host_default 模式request/response policy, 支持运行时定义下游服务 ip:port
### Changed
### Fixed
* 修复 flow 中多个 rank 不生效的问题 [issue#14](https://github.com/baidu/unit-uskit/issues/14)
## [2.0.1] - 2019-07-24
### Added
* 新增异步并发模式
* 新增异步全局退出模式
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ mkdir -p _build/conf/us/demo

--url_path:指定uskit服务的url路径,默认为`/us`

-http_verbose: 在stderr输出http网络请求和返回的数据

-http_verbose_max_body_length: 指定http_verbose输出数据的最大长度

-redis_verbose:在stderr输出redis请求和返回的数据

如果配置有错,会导致服务启动失败;

成功启动uskit服务后,可以通过`<HOST>:8888/us`发起HTTP POST请求,请求体使用json格式,请求参数如下:
Expand Down
7 changes: 7 additions & 0 deletions proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ message RequestConfig {
repeated KVE dynamic_args = 11;
optional string dynamic_args_node = 12 [default=""];
optional string dynamic_args_path = 13 [default="/"];
optional string host_ip_port = 14;
}

message ResponseConfig {
Expand Down Expand Up @@ -139,4 +140,10 @@ message FlowEngineConfig {
message UnifiedSchedulerConfig {
optional string root_dir = 1 [default="./conf/us"];
repeated string load = 2;
message RequiredParam {
required string param_name = 3;
optional string param_path = 4;
optional string default_value = 5;
}
repeated RequiredParam required_params = 6;
}
12 changes: 8 additions & 4 deletions src/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ int Backend::init(const BackendConfig& config) {
for (int i = 0; i < config.request_template_size(); ++i) {
const RequestConfig& request_template = config.request_template(i);
std::unique_ptr<BackendRequestConfig> request_config;
if (protocol == "http" && !_is_dynamic) {
request_config.reset(new HttpRequestConfig);
} else if (protocol == "http" && _is_dynamic) {
request_config.reset(new DynamicHttpRequestConfig);
if (protocol == "http") {
if (!_is_dynamic) {
request_config.reset(new HttpRequestConfig);
} else if (request_template.has_host_ip_port()) {
request_config.reset(new HostDynHttpRequestConfig);
} else {
request_config.reset(new DynamicHttpRequestConfig);
}
} else if (protocol == "redis") {
request_config.reset(new RedisRequestConfig);
} else {
Expand Down
139 changes: 139 additions & 0 deletions src/dynamic_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,142 @@ int DynamicHttpRequestConfig::run(expression::ExpressionContext& context) const
return 0;
}

int HostDynHttpRequestConfig::init(
const RequestConfig& config,
const BackendRequestConfig* template_config) {
if (BackendRequestConfig::init(config, template_config) != 0) {
return -1;
}

if (_http_header.init(config.http_header()) != 0) {
return -1;
}
if (_http_query.init(config.http_query()) != 0) {
return -1;
}
if (_http_body.init(config.http_body()) != 0) {
return -1;
}

if (config.has_host_ip_port()) {
expression::Driver driver;
if (driver.parse("", config.host_ip_port()) != 0) {
LOG(ERROR) << "Failed to parse expression";
return -1;
}
_host_ip_port = driver.get_expression();
}
if (config.has_http_method()) {
_http_method = Expr(new expression::String(config.http_method()));
}
if (config.has_http_uri()) {
_http_uri = Expr(new expression::String(config.http_uri()));
}

return 0;
}

int HostDynHttpRequestConfig::run(expression::ExpressionContext& context) const {
if (BackendRequestConfig::run(context) != 0) {
return -1;
}
rapidjson::Document::AllocatorType& allocator = context.allocator();

if (_http_method) {
rapidjson::Value value;
if (_http_method->run(context, value) != 0) {
return -1;
}
context.set_variable("http_method", value);
}
if (_http_uri) {
rapidjson::Value value;
if (_http_uri->run(context, value) != 0) {
return -1;
}
context.set_variable("http_uri", value);
}
if (_host_ip_port) {
rapidjson::Value value;
if (_host_ip_port->run(context, value) != 0) {
return -1;
}
context.set_variable("host_ip_port", value);
}

rapidjson::Document http_header_doc(&allocator);
if (_http_header.run(context, http_header_doc) != 0) {
US_LOG(ERROR) << "Failed to evaluate http header";
return -1;
}
if (!http_header_doc.IsNull()) {
context.merge_variable("http_header", http_header_doc);
}

rapidjson::Document http_query_doc(&allocator);
if (_http_query.run(context, http_query_doc) != 0) {
US_LOG(ERROR) << "Failed to evaluate http query";
return -1;
}
if (!http_query_doc.IsNull()) {
context.merge_variable("http_query", http_query_doc);
}

rapidjson::Document http_body_doc(&allocator);
if (_http_body.run(context, http_body_doc) != 0) {
US_LOG(ERROR) << "Failed to evaluate http body";
return -1;
}
if (!http_body_doc.IsNull()) {
context.merge_variable("http_body", http_body_doc);
}

return 0;
}

int NsheadRequestConfig::init(
const RequestConfig& config,
const BackendRequestConfig* template_config) {
if (BackendRequestConfig::init(config, template_config) != 0) {
return -1;
}
if (_http_body.init(config.http_body()) != 0) {
return -1;
}
if (config.has_data_encode()) {
_data_encode = Expr(new expression::String(config.data_encode()));
}

return 0;
}

int NsheadRequestConfig::run(expression::ExpressionContext& context) const {
if (BackendRequestConfig::run(context) != 0) {
return -1;
}

rapidjson::Document::AllocatorType& allocator = context.allocator();

if (_data_encode) {
rapidjson::Value value;
if (_data_encode->run(context, value) != 0) {
return -1;
}
context.set_variable("data_encode", value);
}

rapidjson::Document http_body_doc(&allocator);
if (_http_body.run(context, http_body_doc) != 0) {
US_LOG(ERROR) << "Failed to evaluate http body";
return -1;
}
if (!http_body_doc.IsNull()) {
context.merge_variable("http_body", http_body_doc);
}

return 0;
}

int RedisCommand::init(const RequestConfig::RedisCommandConfig& config) {
_op = config.op();
if (_arg.init(config.arg()) != 0) {
Expand Down Expand Up @@ -942,6 +1078,9 @@ int FlowRankConfig::run(const RankEngine* rank_engine, expression::ExpressionCon
}
rank_result = std::move(top_k_rank_result);
}
if (context.has_variable("rank")) {
context.erase_variable("rank");
}
context.set_variable("rank", rank_result);

return 0;
Expand Down
44 changes: 44 additions & 0 deletions src/dynamic_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,50 @@ class DynamicHttpRequestConfig : public BackendRequestConfig {
KEMap _dynamic_args;
};

// Dynamic configuration of HTTP request with host not pre-defined.
class HostDynHttpRequestConfig : public BackendRequestConfig {
public:
HostDynHttpRequestConfig() {}
~HostDynHttpRequestConfig() {}
HostDynHttpRequestConfig(HostDynHttpRequestConfig&&) = default;
// Initialize from configuration and template(optional).
// Returns 0 on success, -1 otherwise.
int init(const RequestConfig& config, const BackendRequestConfig* template_config);
// Evaluate all expressions within given context and generate HTTP
// request configuration.
// Returns 0 on success, -1 otherwise.
int run(expression::ExpressionContext& context) const;

private:
Expr _http_method;
Expr _http_uri;
Expr _host_ip_port;

KEMap _http_header;
KEMap _http_query;
KEMap _http_body;
};

// Dynamic configuration of NsHead request.
class NsheadRequestConfig : public BackendRequestConfig {
public:
NsheadRequestConfig() {}
~NsheadRequestConfig() {}
NsheadRequestConfig(NsheadRequestConfig&&) = default;
// Initialize from configuration and template(optional).
// Returns 0 on success, -1 otherwise.
int init(const RequestConfig& config, const BackendRequestConfig* template_config);
// Evaluate all expressions within given context and generate HTTP
// request configuration.
// Returns 0 on success, -1 otherwise.
int run(expression::ExpressionContext& context) const;

private:
Expr _data_encode;

KEMap _http_body;
};

// Dynamic configuration of Redis command.
class RedisCommand {
public:
Expand Down
19 changes: 16 additions & 3 deletions src/function/builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,7 @@ int for_each_get_by_path(rapidjson::Value& args, rapidjson::Document& return_val
if (value != nullptr) {
rapidjson::Value ele_value(*value, return_value.GetAllocator());
return_value.PushBack(ele_value, return_value.GetAllocator());
}
else {
} else {
rapidjson::Value ele_value;
if (args.Size() == 2) {
ele_value.SetNull();
Expand Down Expand Up @@ -577,7 +576,7 @@ int json_decode(rapidjson::Value& args, rapidjson::Document& return_value) {
return 0;
}

int replace_all(rapidjson::Value& args, rapidjson::Document& return_value){
int replace_all(rapidjson::Value& args, rapidjson::Document& return_value) {
if (args.Size() != 3) {
US_LOG(ERROR) << "Function expects 3 arguments, " << args.Size() << " were given";
return -1;
Expand Down Expand Up @@ -907,6 +906,20 @@ int sha1_hash(rapidjson::Value& args, rapidjson::Document& return_value) {

int time(rapidjson::Value& args, rapidjson::Document& return_value) {
std::time_t timestamp = std::time(nullptr);
if (args.Size() == 1) {
if (!args[0].IsString()) {
US_LOG(ERROR) << "Function expects seconde argement to be string, "
<< get_value_type(args[0]) << "were given";
return -1;
}
std::tm now_tm ;
localtime_r(&timestamp, &now_tm);
char buffer[128];
std::strftime(buffer, sizeof(buffer), args[0].GetString(), &now_tm);
std::string ret_str(buffer);
return_value.SetString(ret_str.c_str(), ret_str.length(), return_value.GetAllocator());
return 0;
}
return_value.SetInt(timestamp);
return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions src/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "policy/backend/http_policy.h"
#include "policy/backend/redis_policy.h"
#include "policy/backend/dynamic_policy.h"
#include "policy/backend/host_dyn_http_policy.h"
#include "policy/flow/default_policy.h"
#include "policy/flow/recurrent_policy.h"
#include "policy/flow/global_policy.h"
Expand Down Expand Up @@ -59,13 +60,16 @@ void register_policy() {
REGISTER_REQUEST_POLICY("redis_default", policy::backend::RedisRequestPolicy);

REGISTER_REQUEST_POLICY("dynamic_default", policy::backend::DynamicHttpRequestPolicy);
REGISTER_REQUEST_POLICY("dynamic_host_default", policy::backend::HostDynHttpRequestPolicy);

// Response policy
REGISTER_RESPONSE_POLICY("http_default", policy::backend::HttpResponsePolicy);
REGISTER_RESPONSE_POLICY("redis_default", policy::backend::RedisResponsePolicy);

REGISTER_RESPONSE_POLICY("dynamic_default", policy::backend::DynamicHttpResponsePolicy);

REGISTER_RESPONSE_POLICY("dynamic_host_default", policy::backend::HostDynHttpResponsePolicy);

// Flow policy
REGISTER_FLOW_POLICY("default", policy::flow::DefaultPolicy);
REGISTER_FLOW_POLICY("recurrent", policy::flow::RecurrentPolicy);
Expand Down
Loading

0 comments on commit eb4b6e1

Please sign in to comment.