Skip to content

Commit

Permalink
Merge pull request #50063 from KevinyhZou/minor_improve_hdfs_read_buffer
Browse files (browse the repository at this point in the history)
Minor improve HDFS ReadBuffer for read end of file
  • Loading branch information
kssenii committed Aug 3, 2023
2 parents 6b330e3 + 73ac1bf commit 68b48a0
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions src/Storages/HDFS/ReadBufferFromHDFS.cpp
Expand Up @@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S

off_t file_offset = 0;
off_t read_until_position = 0;

std::optional<size_t> file_size;
off_t file_size;

explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
Expand All @@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
Expand All @@ -68,23 +66,32 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));

if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}

/// Closes the HDFS file handle `fin` that the constructor obtained via hdfsOpenFile.
/// The return value of hdfsCloseFile is not inspected.
~ReadBufferFromHDFSImpl() override
{
    auto filesystem = fs.get();
    hdfsCloseFile(filesystem, fin);
}

size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;

auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}

bool nextImpl() override
Expand All @@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}

ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
Expand Down

0 comments on commit 68b48a0

Please sign in to comment.