Skip to content

Commit

Permalink
Fixed problem where 'Files' column not updated properly after split
Browse files Browse the repository at this point in the history
- Fixed problem where Gc.Interval was being treated as seconds
- Fixed core dump on exit with hypertable interpreter
  • Loading branch information
Doug Judd committed Feb 2, 2009
1 parent 6d9ccb9 commit d5b01c8
Show file tree
Hide file tree
Showing 20 changed files with 282 additions and 193 deletions.
189 changes: 97 additions & 92 deletions src/cc/Benchmark/random/random_write_test.cc
Expand Up @@ -32,6 +32,7 @@
#include <boost/thread/xtime.hpp>

#include "Common/Checksum.h"
#include "Common/Config.h"
#include "Common/Error.h"
#include "Common/FileUtils.h"
#include "Common/Random.h"
Expand All @@ -44,26 +45,39 @@
#include "Hypertable/Lib/KeySpec.h"

using namespace Hypertable;
using namespace Hypertable::Config;
using namespace std;


namespace {
const char *usage[] = {
"usage: random_write_test [options] <total-bytes>",
"",
" options:",
" --blocksize=<n> Size of value to write",
" --checksum-file=<file> Write keys + value checksums to <file>",
" --config=<file> Read Hypertable config properties from <file>",
" --seed=<n> Random number generator seed",
"",
" This program ...",
"",
(const char *)0
};

const char *usage =
"Usage: random_write_test [options] <total-bytes>\n\n"
"Description:\n"
" This program will generate random inserts into a table called\n"
" 'RandomTest' with a single column called 'Field'. The keys are\n"
" a randomly generated 12 digit number. The values contain randomly\n"
" generated ASCII and are of size 'blocksize'\n\n"
"Options";

struct AppPolicy : Config::Policy {
static void init_options() {
cmdline_desc(usage).add_options()
("blocksize", i32()->default_value(1000), "Size of value to write");
cmdline_desc(usage).add_options()
("checksum-file", str(), "File to contain, for each insert, key '\t' <value-checksum> pairs");
cmdline_desc(usage).add_options()
("seed", i32()->default_value(1234), "Random number generator seed");
cmdline_hidden_desc().add_options()("total-bytes", i64(), "");
cmdline_positional_desc().add("total-bytes", -1);
}
//static void init() {
//}

};
}

typedef Meta::list<AppPolicy, DefaultPolicy> Policies;

int main(int argc, char **argv) {
ClientPtr hypertable_client_ptr;
Expand All @@ -74,117 +88,108 @@ int main(int argc, char **argv) {
char *value_ptr;
uint64_t total = 0;
size_t blocksize = 0;
unsigned long seed = 1234;
unsigned long seed;
String config_file;
bool write_checksums = false;
uint32_t checksum;
ofstream checksum_out;
size_t R;
Stopwatch stopwatch;

for (size_t i=1; i<(size_t)argc; i++) {
if (argv[i][0] == '-') {
if (!strncmp(argv[i], "--blocksize=", 12)) {
blocksize = atoi(&argv[i][12]);
}
else if (!strncmp(argv[i], "--seed=", 7)) {
seed = atoi(&argv[i][7]);
}
else if (!strncmp(argv[i], "--checksum-file=", 16)) {
checksum_out.open(&argv[i][16]);
write_checksums = true;
}
else if (!strncmp(argv[i], "--config=", 9)) {
config_file = &argv[i][9];
}
else
Usage::dump_and_exit(usage);
}
else {
if (total != 0)
Usage::dump_and_exit(usage);
total = strtoll(argv[i], 0, 0);
}
}
try {
init_with_policies<Policies>(argc, argv);

if (total == 0)
Usage::dump_and_exit(usage);
ReactorFactory::initialize(1);

System::initialize();
blocksize = get_i32("blocksize");

Random::seed(seed);
seed = get_i32("seed");

if (blocksize == 0)
blocksize = 1000;
if (has("checksum-file")) {
String checksum_filename = get("checksum-file", String());
checksum_out.open( checksum_filename.c_str() );
write_checksums = true;
}

size_t R = total / blocksize;
total = get_i64("total-bytes");

random_chars.reset( new char [ R + blocksize ] );
System::initialize();

Random::fill_buffer_with_random_ascii(random_chars.get(), R + blocksize);
Random::seed(seed);

Random::seed(seed);
R = total / blocksize;

try {
if (config_file != "")
hypertable_client_ptr = new Hypertable::Client(
System::locate_install_dir(argv[0]), config_file);
else
hypertable_client_ptr = new Hypertable::Client(
System::locate_install_dir(argv[0]));
random_chars.reset( new char [ R + blocksize ] );

table_ptr = hypertable_client_ptr->open_table("RandomTest");
Random::fill_buffer_with_random_ascii(random_chars.get(), R + blocksize);

mutator_ptr = table_ptr->create_mutator();
}
catch (Hypertable::Exception &e) {
cerr << "error: " << Error::get_text(e.code()) << " - " << e.what() << endl;
return 1;
}
Random::seed(seed);

key.column_family = "Field";
try {
if (config_file != "")
hypertable_client_ptr = new Hypertable::Client(
System::locate_install_dir(argv[0]), config_file);
else
hypertable_client_ptr = new Hypertable::Client(
System::locate_install_dir(argv[0]));

char key_data[32];
table_ptr = hypertable_client_ptr->open_table("RandomTest");

key.row_len = 12;
key.row = key_data; // Row key: a random 12-digit number.
key_data[key.row_len] = '\0';
mutator_ptr = table_ptr->create_mutator();
}
catch (Hypertable::Exception &e) {
cerr << "error: " << Error::get_text(e.code()) << " - " << e.what() << endl;
return 1;
}

Stopwatch stopwatch;
key.column_family = "Field";

{
boost::progress_display progress_meter(R);
char key_data[32];

try {
key.row_len = 12;
key.row = key_data; // Row key: a random 12-digit number.
key_data[key.row_len] = '\0';

value_ptr = random_chars.get();
{
boost::progress_display progress_meter(R);

for (size_t i = 0; i < R; ++i) {
try {

Random::fill_buffer_with_random_ascii(key_data, 12);
value_ptr = random_chars.get();

if (write_checksums) {
checksum = fletcher32(value_ptr, blocksize);
checksum_out << key_data << "\t" << checksum << "\n";
}
mutator_ptr->set(key, value_ptr, blocksize);
for (size_t i = 0; i < R; ++i) {

value_ptr++;
Random::fill_buffer_with_random_ascii(key_data, 12);

progress_meter += 1;
if (write_checksums) {
checksum = fletcher32(value_ptr, blocksize);
checksum_out << key_data << "\t" << checksum << "\n";
}
mutator_ptr->set(key, value_ptr, blocksize);

}
value_ptr++;

mutator_ptr->flush();
}
catch (Hypertable::Exception &e) {
HT_ERROR_OUT << e << HT_END;
exit(1);
progress_meter += 1;

}

mutator_ptr->flush();
}
catch (Hypertable::Exception &e) {
HT_ERROR_OUT << e << HT_END;
exit(1);
}
}
}

stopwatch.stop();
stopwatch.stop();

if (write_checksums)
checksum_out.close();
if (write_checksums)
checksum_out.close();
}
catch (Exception &e) {
HT_ERROR_OUT << e << HT_END;
_exit(1);
}

double total_written = (double)total + (double)(R*12);
printf(" Elapsed time: %.2f s\n", stopwatch.elapsed());
Expand All @@ -193,5 +198,5 @@ int main(int argc, char **argv) {
total_written / stopwatch.elapsed());
printf(" Throughput: %.2f inserts/s\n",
(double)R / stopwatch.elapsed());
exit(0); // don't bother with static objects
_exit(0); // don't bother with static objects
}
8 changes: 8 additions & 0 deletions src/cc/Common/Config.cc
Expand Up @@ -206,6 +206,14 @@ void init_default_options() {
"Hyperspace Grace period (see Chubby paper)")
("Hypertable.Client.Timeout", i32()->default_value(120000), "Timeout in "
"(millisec) for Hypertable client API")
("Hypertable.Lib.Mutator.FlushDelay", i32(), "Number of "
"milliseconds to wait prior to flushing scatter buffers (for testing)")
("Hypertable.Lib.Mutator.ScatterBuffer.FlushLimit.PerServer",
i32()->default_value(1*M), "Amount of updates (bytes) accumulated for a "
"single server to trigger a scatter buffer flush")
("Hypertable.Lib.Mutator.ScatterBuffer.FlushLimit.Aggregate",
i64()->default_value(40*M), "Amount of updates (bytes) accumulated for "
"all servers to trigger a scatter buffer flush")
("Hypertable.LocationCache.MaxEntries", i64()->default_value(1*M),
"Size of range location cache in number of entries")
("Hypertable.Master.Timeout", i32()->default_value(30000), "Timeout "
Expand Down
19 changes: 13 additions & 6 deletions src/cc/Hypertable/Lib/TableMutator.cc
Expand Up @@ -28,17 +28,15 @@ extern "C" {

#include <boost/algorithm/string.hpp>

#include "Common/Config.h"
#include "Common/StringExt.h"

#include "Defaults.h"
#include "Key.h"
#include "TableMutator.h"

using namespace Hypertable;

namespace {
const uint64_t DEFAULT_MAX_MEMORY = 20000000LL;
}
using namespace Hypertable::Config;

void TableMutator::handle_exceptions() {
try {
Expand Down Expand Up @@ -70,11 +68,17 @@ void TableMutator::handle_exceptions() {
TableMutator::TableMutator(Comm *comm, Table *table, SchemaPtr &schema,
RangeLocatorPtr &range_locator, uint32_t timeout_ms)
: m_comm(comm), m_schema(schema), m_range_locator(range_locator),
m_table(table), m_memory_used(0), m_max_memory(DEFAULT_MAX_MEMORY),
m_resends(0), m_timeout_ms(timeout_ms), m_last_error(Error::OK),
m_table(table), m_memory_used(0), m_resends(0),
m_timeout_ms(timeout_ms), m_flush_delay(0), m_last_error(Error::OK),
m_last_op(0) {

HT_ASSERT(timeout_ms);

if (has("Hypertable.Lib.Mutator.FlushDelay"))
m_flush_delay = properties->get_i32("Hypertable.Lib.Mutator.FlushDelay");

m_max_memory = properties->get_i64(
"Hypertable.Lib.Mutator.ScatterBuffer.FlushLimit.Aggregate");

m_buffer = new TableMutatorScatterBuffer(m_comm, &m_table->identifier(),
m_schema, m_range_locator, timeout_ms);
Expand Down Expand Up @@ -200,6 +204,9 @@ void TableMutator::auto_flush(Timer &timer) {
if (m_prev_buffer)
wait_for_previous_buffer(timer);

if (m_flush_delay)
poll(0, 0, m_flush_delay);

m_buffer->send();
m_prev_buffer = m_buffer;
m_buffer = new TableMutatorScatterBuffer(m_comm,
Expand Down
1 change: 1 addition & 0 deletions src/cc/Hypertable/Lib/TableMutator.h
Expand Up @@ -206,6 +206,7 @@ namespace Hypertable {
TableMutatorScatterBufferPtr m_prev_buffer;
uint64_t m_resends;
uint32_t m_timeout_ms;
uint32_t m_flush_delay;

int32_t m_last_error;
int m_last_op;
Expand Down
17 changes: 9 additions & 8 deletions src/cc/Hypertable/Lib/TableMutatorScatterBuffer.cc
Expand Up @@ -20,6 +20,7 @@
*/

#include "Common/Compat.h"
#include "Common/Config.h"
#include "Common/Timer.h"

#include "Defaults.h"
Expand All @@ -29,11 +30,7 @@
#include "TableMutatorScatterBuffer.h"

using namespace Hypertable;

namespace {
const uint32_t MAX_SEND_BUFFER_SIZE = 1000000;
}

using namespace Hypertable::Config;

/**
*
Expand All @@ -47,6 +44,10 @@ TableMutatorScatterBuffer::TableMutatorScatterBuffer(Comm *comm,
m_timeout_ms(timeout_ms) {

m_range_locator_ptr->get_location_cache(m_cache_ptr);

m_server_flush_limit = properties->get_i32(
"Hypertable.Lib.Mutator.ScatterBuffer.FlushLimit.PerServer");

}


Expand Down Expand Up @@ -83,7 +84,7 @@ TableMutatorScatterBuffer::set(const Key &key, const void *value,
key.column_family_code, key.column_qualifier, key.timestamp);
append_as_byte_string((*iter).second->accum, value, value_len);

if ((*iter).second->accum.fill() > MAX_SEND_BUFFER_SIZE)
if ((*iter).second->accum.fill() > m_server_flush_limit)
m_full = true;
}

Expand Down Expand Up @@ -126,7 +127,7 @@ void TableMutatorScatterBuffer::set_delete(const Key &key, Timer &timer) {
key.column_family_code, key.column_qualifier, key.timestamp);
append_as_byte_string((*iter).second->accum, 0, 0);

if ((*iter).second->accum.fill() > MAX_SEND_BUFFER_SIZE)
if ((*iter).second->accum.fill() > m_server_flush_limit)
m_full = true;
}

Expand Down Expand Up @@ -166,7 +167,7 @@ TableMutatorScatterBuffer::set(SerializedKey key, ByteString value,
(*iter).second->accum.add(key.ptr, (ptr-key.ptr)+len);
(*iter).second->accum.add(value.ptr, value.length());

if ((*iter).second->accum.fill() > MAX_SEND_BUFFER_SIZE)
if ((*iter).second->accum.fill() > m_server_flush_limit)
m_full = true;
}

Expand Down
1 change: 1 addition & 0 deletions src/cc/Hypertable/Lib/TableMutatorScatterBuffer.h
Expand Up @@ -90,6 +90,7 @@ namespace Hypertable {
FailedMutations m_failed_mutations;
FlyweightString m_constant_strings;
uint32_t m_timeout_ms;
uint32_t m_server_flush_limit;
};

typedef intrusive_ptr<TableMutatorScatterBuffer> TableMutatorScatterBufferPtr;
Expand Down

0 comments on commit d5b01c8

Please sign in to comment.