Skip to content
Permalink
Browse files
[fix](s3) Fix S3 temp-file writes failing when the disk has no free space (#9421)
  • Loading branch information
yangzhg committed May 9, 2022
1 parent 5df5d39 commit 6834fb23cadb7c065ac16aa0e27b73060a3951ed
Showing 8 changed files with 159 additions and 151 deletions.
@@ -737,7 +737,6 @@ CONF_Validator(string_type_length_soft_limit_bytes,
// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");

} // namespace config

} // namespace doris
@@ -23,6 +23,8 @@
#include <aws/s3/model/PutObjectRequest.h>

#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/tmp_file_mgr.h"
#include "service/backend_options.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -41,10 +43,15 @@ S3Writer::S3Writer(const std::map<std::string, std::string>& properties, const s
: _properties(properties),
_path(path),
_uri(path),
_client(ClientFactory::instance().create(_properties)),
_temp_file(std::make_shared<Aws::Utils::TempFile>(
std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
std::ios_base::out)) {
_client(ClientFactory::instance().create(_properties)) {
std::string tmp_path = ExecEnv::GetInstance()->tmp_file_mgr()->get_tmp_dir_path();
LOG(INFO) << "init aws s3 client with tmp path " << tmp_path;
if (tmp_path.at(tmp_path.size() - 1) != '/') {
tmp_path.append("/");
}
_temp_file = std::make_shared<Aws::Utils::TempFile>(
tmp_path.c_str(), ".doris_tmp",
std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | std::ios_base::out);
DCHECK(_client) << "init aws s3 client error.";
}

@@ -78,13 +85,19 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len)
return Status::OK();
}
if (!_temp_file) {
return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
BackendOptions::get_localhost());
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::BufferAllocFailed(
fmt::format("The internal temporary file is not writable for {}. at {}",
strerror(errno), BackendOptions::get_localhost())),
"write temp file error");
}
_temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
if (!_temp_file->good()) {
return Status::BufferAllocFailed("Could not append to the internal temporary file. at " +
BackendOptions::get_localhost());
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::BufferAllocFailed(
fmt::format("Could not append to the internal temporary file for {}. at {}",
strerror(errno), BackendOptions::get_localhost())),
"write temp file error");
}
*written_len = buf_len;
return Status::OK();
@@ -100,8 +113,11 @@ Status S3Writer::close() {

Status S3Writer::_sync() {
if (!_temp_file) {
return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
BackendOptions::get_localhost());
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::BufferAllocFailed(
fmt::format("The internal temporary file is not writable for {}. at {}",
strerror(errno), BackendOptions::get_localhost())),
"write temp file error");
}
CHECK_S3_CLIENT(_client);
Aws::S3::Model::PutObjectRequest request;
@@ -157,7 +157,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
int offset = convert_to<int>(string_val->ptr);
int64_t offset = convert_to<int64_t>(string_val->ptr);
string_val->ptr = tuple_data + offset;

// Why we do this mask? Field len of StringValue is changed from int to size_t in
@@ -225,18 +225,18 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
// is_compressed
output_batch->set_is_compressed(false);
// tuple data
size_t size = total_byte_size();
size_t tuple_byte_size = total_byte_size();
std::string* mutable_tuple_data = nullptr;
if (allocated_buf != nullptr) {
allocated_buf->resize(size);
allocated_buf->resize(tuple_byte_size);
// all tuple data will be written in the allocated_buf
// instead of tuple_data in PRowBatch
mutable_tuple_data = allocated_buf;
// tuple_data is a required field
output_batch->set_tuple_data("");
} else {
mutable_tuple_data = output_batch->mutable_tuple_data();
mutable_tuple_data->resize(size);
mutable_tuple_data->resize(tuple_byte_size);
}

// Copy tuple data, including strings, into output_batch (converting string
@@ -261,37 +261,51 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
mutable_tuple_offsets->Add((int32_t)offset);
mutable_new_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
CHECK_LE(offset, size);
CHECK_LE(offset, tuple_byte_size);
}
}
CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;

if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
// smaller
uint32_t max_compressed_size = snappy::MaxCompressedLength(size);

if (_compression_scratch.size() < max_compressed_size) {
CHECK_EQ(offset, tuple_byte_size)
<< "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size;

size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
if (can_compress) {
try {
// Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
_compression_scratch.resize(max_compressed_size);
} catch (const std::bad_alloc& e) {
can_compress = false;
LOG(WARNING) << "Try to alloc " << max_compressed_size
<< " bytes for compression scratch failed. " << e.what();
} catch (...) {
can_compress = false;
std::exception_ptr p = std::current_exception();
LOG(WARNING) << "Try to alloc " << max_compressed_size
<< " bytes for compression scratch failed. "
<< (p ? p.__cxa_exception_type()->name() : "null");
}

}
if (can_compress) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
// smaller
size_t compressed_size = 0;
char* compressed_output = _compression_scratch.data();
snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);

if (LIKELY(compressed_size < size)) {
snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output,
&compressed_size);
if (LIKELY(compressed_size < tuple_byte_size)) {
_compression_scratch.resize(compressed_size);
mutable_tuple_data->swap(_compression_scratch);
output_batch->set_is_compressed(true);
}

VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
<< ", compressed size: " << compressed_size;
}

// return compressed and uncompressed size
size_t pb_size = get_batch_size(*output_batch);
if (allocated_buf == nullptr) {
*uncompressed_size = pb_size - mutable_tuple_data->size() + size;
*uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
*compressed_size = pb_size;
if (pb_size > std::numeric_limits<int32_t>::max()) {
// the protobuf has a hard limit of 2GB for serialized data.
@@ -302,7 +316,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
pb_size));
}
} else {
*uncompressed_size = pb_size + size;
*uncompressed_size = pb_size + tuple_byte_size;
*compressed_size = pb_size + mutable_tuple_data->size();
}
return Status::OK();
@@ -21,6 +21,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <filesystem>
#include <random>

#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
@@ -160,6 +161,14 @@ string TmpFileMgr::get_tmp_dir_path(DeviceId device_id) const {
return _tmp_dirs[device_id].path();
}

std::string TmpFileMgr::get_tmp_dir_path() {
std::vector<DeviceId> devices = active_tmp_devices();
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(devices.begin(), devices.end(), g);
return get_tmp_dir_path(devices.front());
}

void TmpFileMgr::blacklist_device(DeviceId device_id) {
DCHECK(_initialized);
DCHECK(device_id >= 0 && device_id < _tmp_dirs.size());
@@ -126,6 +126,9 @@ class TmpFileMgr {
// Return the scratch directory path for the device.
std::string get_tmp_dir_path(DeviceId device_id) const;

// Return a random scratch directory path from the devices.
std::string get_tmp_dir_path();

// Total number of devices with tmp directories that are active. There is one tmp
// directory per device.
int num_active_tmp_devices();
@@ -42,8 +42,7 @@ BRpcService::~BRpcService() {}

Status BRpcService::start(int port) {
// Add service
_server->AddService(new PInternalServiceImpl<PBackendService>(_exec_env),
brpc::SERVER_OWNS_SERVICE);
_server->AddService(new PInternalServiceImpl(_exec_env), brpc::SERVER_OWNS_SERVICE);
// start service
brpc::ServerOptions options;
if (config::brpc_num_threads != -1) {

0 comments on commit 6834fb2

Please sign in to comment.