Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shell: copy_data command supports filter #271

Merged
merged 7 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rocksdb
Submodule rocksdb updated 1 files
+5 −5 docs/Gemfile.lock
285 changes: 185 additions & 100 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>

#include <rrdb/rrdb.code.definition.h>
#include <pegasus/version.h>
Expand Down Expand Up @@ -101,6 +102,11 @@ struct scan_data_context
int split_id;
int max_batch_count;
int timeout_ms;
bool no_overwrite; // if set true, then use check_and_set() instead of set()
// when inserting data to destination table for copy_data,
// to not overwrite old data if it already exists.
pegasus::pegasus_client::filter_type value_filter_type;
std::string value_filter_pattern;
pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
pegasus::pegasus_client *client;
pegasus::geo::geo_client *geoclient;
Expand Down Expand Up @@ -133,6 +139,8 @@ struct scan_data_context
split_id(split_id_),
max_batch_count(max_batch_count_),
timeout_ms(timeout_ms_),
no_overwrite(false),
value_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
scanner(scanner_),
client(client_),
geoclient(geoclient_),
Expand All @@ -150,6 +158,12 @@ struct scan_data_context
// when split_request_count = 1
dassert(max_batch_count > 1, "");
}
// Record the value filter for this scan context: the filter type and the
// pattern are stored as-is and consulted later when scanned values are checked.
void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
{
    this->value_filter_type = type;
    this->value_filter_pattern.assign(pattern);
}
void set_no_overwrite() { no_overwrite = true; }
};
inline void update_atomic_max(std::atomic_long &max, long value)
{
Expand All @@ -160,6 +174,36 @@ inline void update_atomic_max(std::atomic_long &max, long value)
}
}
}
// Check whether `value` passes the given filter.
//
// filter_type    - kind of match to perform (no filter / anywhere / prefix / postfix).
// filter_pattern - pattern to look for; an empty pattern matches every value.
// value          - the data to test.
//
// Returns true if the value is accepted by the filter, false otherwise.
// An unknown filter type triggers dassert and then yields false.
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
                            const std::string &filter_pattern,
                            const std::string &value)
{
    typedef pegasus::pegasus_client client_t;

    // No filtering requested: everything is valid.
    if (filter_type == client_t::FT_NO_FILTER) {
        return true;
    }

    const bool match_anywhere = (filter_type == client_t::FT_MATCH_ANYWHERE);
    const bool match_prefix = (filter_type == client_t::FT_MATCH_PREFIX);
    const bool match_postfix = (filter_type == client_t::FT_MATCH_POSTFIX);
    if (!match_anywhere && !match_prefix && !match_postfix) {
        dassert(false, "unsupported filter type: %d", filter_type);
        return false;
    }

    const std::string::size_type pattern_len = filter_pattern.length();
    const std::string::size_type value_len = value.length();

    // An empty pattern matches anything.
    if (pattern_len == 0) {
        return true;
    }
    // A value shorter than the pattern can never contain it.
    if (value_len < pattern_len) {
        return false;
    }

    if (match_anywhere) {
        return value.find(filter_pattern) != std::string::npos;
    }

    // Prefix compares the leading bytes, postfix the trailing bytes.
    const std::string::size_type start = match_prefix ? 0 : value_len - pattern_len;
    return value.compare(start, pattern_len, filter_pattern) == 0;
}
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
Expand All @@ -171,112 +215,153 @@ inline void scan_data_next(scan_data_context *context)
std::string &&value,
pegasus::pegasus_client::internal_info &&info) {
if (ret == pegasus::PERR_OK) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
context->client->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
if (context->value_filter_type == pegasus::pegasus_client::FT_NO_FILTER ||
validate_filter(
context->value_filter_type, context->value_filter_pattern, value)) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
if (context->no_overwrite) {
auto callback = [context](
int err,
pegasus::pegasus_client::check_and_set_results &&results,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async check and set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
if (results.set_succeed) {
context->split_rows++;
}
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
pegasus::pegasus_client::check_and_set_options options;
context->client->async_check_and_set(
hash_key,
sort_key,
pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
"",
sort_key,
value,
options,
std::move(callback),
context->timeout_ms);
} else {
auto callback =
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms);
}
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);

long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);

long value_size = value.size();
context->value_size_histogram.Add(value_size);

long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);

if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);

long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);

long value_size = value.size();
context->value_size_histogram.Add(value_size);

long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);

if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
}
}
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
context->split_completed.store(true);
Expand Down
Loading