This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

implement splitting/extracting block log for eosio_blocklog #10037

Merged
merged 4 commits on Feb 16, 2021
159 changes: 119 additions & 40 deletions libraries/chain/block_log.cpp
@@ -398,20 +398,24 @@ namespace eosio { namespace chain {
block_log_data log_data;
block_log_index log_index;

block_log_bundle(fc::path block_dir) {
block_file_name = block_dir / "blocks.log";
index_file_name = block_dir / "blocks.index";
block_log_bundle(fc::path block_file, fc::path index_file)
: block_file_name(block_file)
, index_file_name(index_file) {

log_data.open(block_file_name);
log_index.open(index_file_name);

uint32_t log_num_blocks = log_data.num_blocks();
uint32_t index_num_blocks = log_index.num_blocks();

EOS_ASSERT(
log_num_blocks == index_num_blocks, block_log_exception,
"${block_file_name} says it has ${log_num_blocks} blocks which disagrees with ${index_num_blocks} indicated by ${index_file_name}",
("block_file_name", block_file_name)("log_num_blocks", log_num_blocks)("index_num_blocks", index_num_blocks)("index_file_name", index_file_name));
EOS_ASSERT(log_num_blocks == index_num_blocks, block_log_exception,
"${block_file_name} says it has ${log_num_blocks} blocks which disagrees with ${index_num_blocks} "
"indicated by ${index_file_name}",
("block_file_name", block_file_name)("log_num_blocks", log_num_blocks)(
"index_num_blocks", index_num_blocks)("index_file_name", index_file_name));
}

block_log_bundle(fc::path block_dir) : block_log_bundle(block_dir / "blocks.log", block_dir / "blocks.index") {
}
};

@@ -1085,6 +1089,59 @@ namespace eosio { namespace chain {
return std::clamp(version, min_supported_version, max_supported_version) == version;
}

void extract_blocklog_i(block_log_bundle& log_bundle, fc::path new_block_filename, fc::path new_index_filename,
uint32_t first_block_num, uint32_t num_blocks) {

auto position_for_block = [&log_bundle](uint64_t block_num) {
uint64_t block_order = block_num - log_bundle.log_data.first_block_num();
if (block_order < static_cast<uint64_t>(log_bundle.log_index.num_blocks()))
return log_bundle.log_index.nth_block_position(block_order);
return log_bundle.log_data.size();
};

const auto num_blocks_to_skip = first_block_num - log_bundle.log_data.first_block_num();
const uint64_t first_kept_block_pos = position_for_block(first_block_num);
const uint64_t nbytes_to_trim =
num_blocks_to_skip == 0 ? 0 : first_kept_block_pos - block_log_preamble::nbytes_with_chain_id;
const uint64_t last_block_num = first_block_num + num_blocks;
const uint64_t last_block_pos = position_for_block(last_block_num);
const auto new_block_file_size = last_block_pos - nbytes_to_trim;

boost::iostreams::mapped_file_sink new_block_file;
create_mapped_file(new_block_file, new_block_filename.generic_string(), new_block_file_size);

if (num_blocks_to_skip == 0) {
memcpy(new_block_file.data(), log_bundle.log_data.data(), new_block_file_size);

boost::iostreams::mapped_file_sink new_index_file;
const uint64_t index_file_size = num_blocks * sizeof(uint64_t);
create_mapped_file(new_index_file, new_index_filename.generic_string(), index_file_size);
memcpy(new_index_file.data(), log_bundle.log_index.data(), index_file_size);
return;
}

fc::datastream<char*> ds(new_block_file.data(), new_block_file.size());
block_log_preamble preamble;
// versions 4 and above have a different log entry format; therefore versions 1 to 3 can only be upgraded up to
// the version 3 format.
preamble.version = log_bundle.log_data.version() < pruned_transaction_version ? genesis_state_or_chain_id_version
: block_log::max_supported_version;
preamble.first_block_num = first_block_num;
preamble.chain_context = log_bundle.log_data.chain_id();
preamble.write_to(ds);
ds.write(log_bundle.log_data.data() + first_kept_block_pos, last_block_pos - first_kept_block_pos);

index_writer index(new_index_filename, num_blocks);

// walk along the block position of each block entry and decrement its value by nbytes_to_trim
for (auto itr = make_reverse_block_position_iterator(new_block_file, block_log_preamble::nbytes_with_chain_id);
itr.get_value() != block_log::npos; ++itr) {
auto new_pos = itr.get_value() - nbytes_to_trim;
index.write(new_pos);
itr.set_value(new_pos);
}
}
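A quick worked example of the trimming arithmetic in extract_blocklog_i, using made-up numbers (illustration only; P stands for block_log_preamble::nbytes_with_chain_id, and none of the values below come from the PR):

// Illustration only, hypothetical values.
// Source log: log_data.first_block_num() == 100, block 150 starts at byte offset 9000.
// Extracting blocks 150..199:
//   num_blocks_to_skip   = 150 - 100 = 50
//   first_kept_block_pos = 9000
//   nbytes_to_trim       = 9000 - P
// The new file holds a fresh preamble followed by the source bytes [9000, last_block_pos),
// and the position stored in each copied block entry is rewritten as
//   new_pos = old_pos - (9000 - P)
// so the first kept block lands at offset P, immediately after the new preamble.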

bool block_log::trim_blocklog_front(const fc::path& block_dir, const fc::path& temp_dir, uint32_t truncate_at_block) {
EOS_ASSERT( block_dir != temp_dir, block_log_exception, "block_dir and temp_dir need to be different directories" );

@@ -1105,42 +1162,13 @@ namespace eosio { namespace chain {
// ****** create the new block log file and write out the header for the file
fc::create_directories(temp_dir);
fc::path new_block_filename = temp_dir / "blocks.log";
fc::path new_index_filename = temp_dir / "blocks.index";

static_assert( block_log::max_supported_version == pruned_transaction_version,
"Code was written to support format of version 4 or lower, need to update this code for latest format." );

const auto preamble_size = block_log_preamble::nbytes_with_chain_id;
const auto num_blocks_to_truncate = truncate_at_block - log_bundle.log_data.first_block_num();
const uint64_t first_kept_block_pos = log_bundle.log_index.nth_block_position(num_blocks_to_truncate);
const uint64_t nbytes_to_trim = first_kept_block_pos - preamble_size;
const auto new_block_file_size = log_bundle.log_data.size() - nbytes_to_trim;

boost::iostreams::mapped_file_sink new_block_file;
create_mapped_file(new_block_file, new_block_filename.generic_string(), new_block_file_size);
fc::datastream<char*> ds(new_block_file.data(), new_block_file.size());

block_log_preamble preamble;
// version 4 or above have different log entry format; therefore version 1 to 3 can only be upgrade up to version 3 format.
preamble.version = log_bundle.log_data.version() < pruned_transaction_version ? genesis_state_or_chain_id_version : block_log::max_supported_version;
preamble.first_block_num = truncate_at_block;
preamble.chain_context = log_bundle.log_data.chain_id();
preamble.write_to(ds);

memcpy(new_block_file.data() + preamble_size, log_bundle.log_data.data() + first_kept_block_pos, new_block_file_size - preamble_size);

fc::path new_index_filename = temp_dir / "blocks.index";
index_writer index(new_index_filename, log_bundle.log_index.num_blocks() - num_blocks_to_truncate);

// walk along the block position of each block entry and decrement its value by nbytes_to_trim
for (auto itr = make_reverse_block_position_iterator(new_block_file, preamble_size);
itr.get_value() != block_log::npos; ++itr) {
auto new_pos = itr.get_value() - nbytes_to_trim;
index.write(new_pos);
itr.set_value(new_pos);
}

index.close();
new_block_file.close();
extract_blocklog_i(log_bundle, new_block_filename, new_index_filename, truncate_at_block,
log_bundle.log_data.last_block_num() - truncate_at_block + 1);

fc::path old_log = temp_dir / "old.log";
rename(log_bundle.block_file_name, old_log);
@@ -1198,4 +1226,55 @@ namespace eosio { namespace chain {
bool block_log::exists(const fc::path& data_dir) {
return fc::exists(data_dir / "blocks.log") && fc::exists(data_dir / "blocks.index");
}
}} /// eosio::chain

std::pair<fc::path, fc::path> blocklog_files(const fc::path& dir, uint32_t start_block_num, uint32_t num_blocks) {
const int bufsize = 64;
char buf[bufsize];
snprintf(buf, bufsize, "blocks-%d-%d.log", start_block_num, start_block_num + num_blocks - 1);
fc::path new_block_filename = dir / buf;
fc::path new_index_filename(new_block_filename);
new_index_filename.replace_extension(".index");
return std::make_pair(new_block_filename, new_index_filename);
}

void block_log::extract_blocklog(const fc::path& log_filename, const fc::path& index_filename,
const fc::path& dest_dir, uint32_t start_block_num, uint32_t num_blocks) {

block_log_bundle log_bundle(log_filename, index_filename);

EOS_ASSERT(start_block_num >= log_bundle.log_data.first_block_num(), block_log_exception,
"The first available block is block ${first_block}.",
("first_block", log_bundle.log_data.first_block_num()));

EOS_ASSERT(start_block_num + num_blocks - 1 <= log_bundle.log_data.last_block_num(), block_log_exception,
"The last available block is block ${last_block}.",
("last_block", log_bundle.log_data.last_block_num()));

if (!fc::exists(dest_dir))
fc::create_directories(dest_dir);

auto [new_block_filename, new_index_filename] = blocklog_files(dest_dir, start_block_num, num_blocks);

extract_blocklog_i(log_bundle, new_block_filename, new_index_filename, start_block_num, num_blocks);
}

void block_log::split_blocklog(const fc::path& block_dir, const fc::path& dest_dir, uint32_t stride) {

block_log_bundle log_bundle(block_dir);
const uint32_t first_block_num = log_bundle.log_data.first_block_num();
const uint32_t last_block_num = log_bundle.log_data.last_block_num();

if (!fc::exists(dest_dir))
fc::create_directories(dest_dir);

for (uint32_t i = (first_block_num - 1) / stride; i < (last_block_num + stride - 1) / stride; ++i) {
uint32_t start_block_num = std::max(i * stride + 1, first_block_num);
uint32_t num_blocks = std::min((i + 1) * stride, last_block_num) - start_block_num + 1;

auto [new_block_filename, new_index_filename] = blocklog_files(dest_dir, start_block_num, num_blocks);

extract_blocklog_i(log_bundle, new_block_filename, new_index_filename, start_block_num, num_blocks);
}
}
} // namespace chain
} // namespace eosio
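The loop bounds in split_blocklog partition the chain into stride-aligned buckets, clamped to the blocks actually present in the source log. A small sketch with assumed numbers (not taken from the PR):

// Illustration only: first_block_num = 1, last_block_num = 250, stride = 100.
// i runs over [(1 - 1) / 100, (250 + 100 - 1) / 100) == [0, 3), giving:
//   i == 0 -> blocks   1..100 -> blocks-1-100.log   / blocks-1-100.index
//   i == 1 -> blocks 101..200 -> blocks-101-200.log / blocks-101-200.index
//   i == 2 -> blocks 201..250 -> blocks-201-250.log / blocks-201-250.index
// If the log starts mid-bucket instead (say at block 150), std::max(i * stride + 1, first_block_num)
// clamps the first file to blocks-150-200.log.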
6 changes: 5 additions & 1 deletion libraries/chain/include/eosio/chain/block_log.hpp
@@ -103,7 +103,11 @@ namespace eosio { namespace chain {
*/
static void smoke_test(fc::path block_dir, uint32_t n);

private:
static void extract_blocklog(const fc::path& log_filename, const fc::path& index_filename,
const fc::path& dest_dir, uint32_t start_block, uint32_t num_blocks);
static void split_blocklog(const fc::path& block_dir, const fc::path& dest_dir, uint32_t stride);

private:
std::unique_ptr<detail::block_log_impl> my;
};
} }
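Because the two new functions sit above the private: section, other tooling can call them directly. A minimal, hypothetical call site (the paths and stride are placeholders, not taken from the PR):

#include <eosio/chain/block_log.hpp>

using namespace eosio::chain;

// Hypothetical helper, illustration only.
void split_and_extract_example() {
   fc::path blocks_dir = "/data/nodeos/blocks";   // assumed location of blocks.log / blocks.index
   fc::path dest_dir   = "/data/split";           // assumed output directory

   // carve the whole log into files of one million blocks each
   block_log::split_blocklog(blocks_dir, dest_dir, 1000000);

   // or copy out one specific range, leaving the original log untouched
   block_log::extract_blocklog(blocks_dir / "blocks.log", blocks_dir / "blocks.index",
                               dest_dir, 200, 50); // blocks 200..249
}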
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/log_index.hpp
@@ -32,6 +32,8 @@ class log_index {
uint64_t back() const { return *(this->end() - 1); }
int num_blocks() const { return file.size() / sizeof(uint64_t); }
uint64_t nth_block_position(uint32_t n) const { return *(begin() + n); }

const char* data() const { return file.data(); }
};

} // namespace chain
Expand Down
71 changes: 64 additions & 7 deletions programs/eosio-blocklog/main.cpp
Expand Up @@ -52,6 +52,8 @@ struct blocklog {
bool smoke_test = false;
bool prune_transactions = false;
bool help = false;
bool extract_blocklog = false;
uint32_t blocklog_split_stride = 0;
};

struct report_time {
@@ -176,6 +178,9 @@ void blocklog::set_program_options(options_description& cli)
cli.add_options()
("blocks-dir", bpo::value<bfs::path>()->default_value("blocks"),
"the location of the blocks directory (absolute path or relative to the current directory)")
("blocks-filebase", bpo::value<bfs::path>()->default_value("blocks"),
"the name of the blocks log/index files without file extension (absolute path or relative to the current directory)."
" This can only used for extract-blocklog.")
("state-history-dir", bpo::value<bfs::path>()->default_value("state-history"),
"the location of the state-history directory (absolute path or relative to the current dir)")
("output-file,o", bpo::value<bfs::path>(),
@@ -201,6 +206,13 @@ void blocklog::set_program_options(options_description& cli)
("transaction,t", bpo::value<std::vector<std::string> >()->multitoken(), "The transaction id to be pruned")
("prune-transactions", bpo::bool_switch(&prune_transactions)->default_value(false),
"Prune the context free data and signatures from specified transactions of specified block-num.")
("output-dir", bpo::value<bfs::path>()->default_value("."),
"the output location for 'split-blocklog' or 'extract-blocklog'.")
("split-blocklog", bpo::value<uint32_t>(&blocklog_split_stride)->default_value(0),
"split the block log file based on the stride and store the result in the specified 'output-dir'.")
("extract-blocklog", bpo::bool_switch(&extract_blocklog)->default_value(false),
"Extract blocks from blocks.log and blocks.index and keep the original."
" Must give 'blocks-dir' or 'blocks-filebase','output-dir', 'first' and 'last'.")
("help,h", bpo::bool_switch(&help)->default_value(false), "Print this help message and exit.")
;
}
@@ -287,6 +299,9 @@ int prune_transactions(bfs::path block_dir, bfs::path state_history_dir, uint32_
prune_transactions<state_history_traces_log>("state history traces log", state_history_dir, block_num, ids);
}

inline bfs::path operator+(bfs::path left, bfs::path right) { return bfs::path(left) += right; }


int main(int argc, char** argv) {
std::ios::sync_with_stdio(false); // for potential performance boost for large block log files
options_description cli ("eosio-blocklog command line options");
@@ -296,16 +311,25 @@ int main(int argc, char** argv) {
variables_map vmap;
bpo::store(bpo::parse_command_line(argc, argv, cli), vmap);
bpo::notify(vmap);

if (blog.help) {
cli.print(std::cerr);
return 0;
}

const auto blocks_dir = vmap["blocks-dir"].as<bfs::path>();

if (!blog.extract_blocklog && !block_log::exists(blocks_dir)) {
std::cerr << "The specified blocks-dir must contain blocks.log and blocks.index files";
return -1;
}

if (blog.smoke_test) {
smoke_test(vmap.at("blocks-dir").as<bfs::path>());
smoke_test(blocks_dir);
return 0;
}
if (blog.fix_irreversible_blocks) {
fix_irreversible_blocks(vmap.at("blocks-dir").as<bfs::path>());
fix_irreversible_blocks(blocks_dir);
return 0;
}
if (blog.trim_log) {
@@ -314,17 +338,16 @@ int main(int argc, char** argv) {
return -1;
}
if (blog.last_block != std::numeric_limits<uint32_t>::max()) {
if (trim_blocklog_end(vmap.at("blocks-dir").as<bfs::path>(), blog.last_block) != 0)
if (trim_blocklog_end(blocks_dir, blog.last_block) != 0)
return -1;
}
if (blog.first_block != 0) {
if (!trim_blocklog_front(vmap.at("blocks-dir").as<bfs::path>(), blog.first_block))
if (!trim_blocklog_front(blocks_dir, blog.first_block))
return -1;
}
return 0;
}
if (blog.make_index) {
const bfs::path blocks_dir = vmap.at("blocks-dir").as<bfs::path>();
bfs::path out_file = blocks_dir / "blocks.index";
const bfs::path block_file = blocks_dir / "blocks.log";

@@ -339,8 +362,9 @@ int main(int argc, char** argv) {
rt.report();
return 0;
}


if (blog.prune_transactions) {
const auto blocks_dir = vmap["blocks-dir"].as<bfs::path>();
const auto state_history_dir = vmap["state-history-dir"].as<bfs::path>();
const auto block_num = vmap["block-num"].as<uint32_t>();
const auto ids = vmap.count("transaction") ? vmap["transaction"].as<std::vector<string>>() : std::vector<string>{};
@@ -350,7 +374,40 @@ int main(int argc, char** argv) {
rt.report();
return ret;
}
//else print blocks.log as JSON

const auto output_dir = vmap["output-dir"].as<bfs::path>();

if (blog.blocklog_split_stride != 0) {

block_log::split_blocklog(blocks_dir, output_dir, blog.blocklog_split_stride);
return 0;
}

if (blog.extract_blocklog) {

if (blog.first_block == 0 && blog.last_block == std::numeric_limits<uint32_t>::max()) {
std::cerr << "extract_blocklog does nothing unless specify first and/or last block.";
}

bfs::path blocks_filebase = vmap["blocks-filebase"].as<bfs::path>();
if (blocks_filebase.empty() && !blocks_dir.empty()) {
blocks_filebase = blocks_dir / "blocks";
}

bfs::path log_filename = blocks_filebase + ".log";
bfs::path index_filename = blocks_filebase + ".index";

if (!bfs::exists(log_filename) || !bfs::exists(index_filename)) {
std::cerr << "Both " << log_filename << " and " << index_filename << " must exist";
return -1;
}

block_log::extract_blocklog(log_filename, index_filename, output_dir, blog.first_block,
blog.last_block - blog.first_block + 1);
return 0;
}

// else print blocks.log as JSON
blog.initialize(vmap);
blog.read_log();
} catch( const fc::exception& e ) {
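Taken together, the new options enable invocations like the following (illustrative only; the paths are placeholders, and the tool's pre-existing first/last options are assumed to keep their current spelling):

eosio-blocklog --split-blocklog 1000000 --blocks-dir ./blocks --output-dir ./split
eosio-blocklog --extract-blocklog --blocks-filebase ./blocks/blocks --output-dir ./extracted --first 200 --last 249

The first command writes one log/index pair per stride into ./split (blocks-1-1000000.log/.index, then blocks-1000001-2000000.log/.index, and so on, for as many blocks as the source log holds); the second copies only blocks 200 through 249 and leaves the original log and index in place.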