
[C++] Consuming or closing a RecordBatchReader created from a Dataset Scanner does not close underlying files #41771

Closed
adamreeve opened this issue May 22, 2024 · 1 comment

@adamreeve
Contributor

Describe the bug, including details regarding any error messages, version, and platform.

Code to reproduce, as a unit test I added to cpp/src/arrow/dataset/dataset_test.cc. It logs the files open in the dataset directory (Linux only, since it reads /proc/self/fd) and needs some extra headers:

#include <unistd.h>  // readlink
#include <climits>   // PATH_MAX
#include <filesystem>
#include <iostream>
#include "arrow/dataset/file_ipc.h"
#include "arrow/ipc/api.h"

Test methods:

void ListOpenFilesInDir(const std::string& directory, const std::string& context) {
  std::cout << "Open files in directory " << directory << " " << context << ":" << std::endl;
  auto open_files = std::filesystem::directory_iterator("/proc/self/fd");
  for (const auto& entry : open_files)
  {
    char target_path[PATH_MAX];
    ssize_t len = ::readlink(entry.path().c_str(), target_path, PATH_MAX - 1);
    if (len != -1) {
      target_path[len] = '\0';
      std::string open_file_path(target_path);
      if (open_file_path.find(directory) == 0)
      {
        std::cout << open_file_path << std::endl;
      }
    }
  }
}

TEST(TestDatasetScan, ScanToRecordBatchReader) {
  ASSERT_OK_AND_ASSIGN(auto tempdir, arrow::internal::TemporaryDir::Make("dataset-scan-test-"));
  std::string tempdir_path = tempdir->path().ToString();

  auto schema = arrow::schema({field("x", int64()), field("y", int64())});
  auto table = TableFromJSON(schema, {R"([
      [1, 2],
      [3, 4]
    ])"});

  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
  auto file_system = std::make_shared<fs::LocalFileSystem>();
  ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join("data.arrow"));
  std::string file_path_str = file_path.ToString();

  {
    ASSERT_OK_AND_ASSIGN(auto out_stream, file_system->OpenOutputStream(file_path_str));
    ASSERT_OK_AND_ASSIGN(
        auto file_writer,
        arrow::ipc::MakeFileWriter(out_stream, schema, arrow::ipc::IpcWriteOptions::Defaults()));
    ASSERT_OK(file_writer->WriteTable(*table));
    ASSERT_OK(file_writer->Close());
  }

  std::vector<std::string> paths {file_path_str};
  FileSystemFactoryOptions options;
  ASSERT_OK_AND_ASSIGN(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(file_system, paths, format, options));
  ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish());

  {
    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
    {
      ASSERT_OK_AND_ASSIGN(auto record_batch_reader, scanner->ToRecordBatchReader());
      ASSERT_OK_AND_ASSIGN(auto read_table, record_batch_reader->ToTable());
      ListOpenFilesInDir(tempdir_path, "after read");
      ASSERT_OK(record_batch_reader->Close());
      ListOpenFilesInDir(tempdir_path, "after close");
    }
    ListOpenFilesInDir(tempdir_path, "after reader destruct");
  }
  ListOpenFilesInDir(tempdir_path, "after scanner destruct");
}
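A variant of `ListOpenFilesInDir` that returns the matching paths instead of printing them would let the test assert on the result directly, for example `EXPECT_TRUE(OpenFilesInDir(tempdir_path).empty())` right after `Close()`. This is a Linux-only sketch under that assumption: `OpenFilesInDir` is a hypothetical name, and `rfind(directory, 0) == 0` is simply a prefix check equivalent to the `find(directory) == 0` used in the original helper.

```cpp
#include <unistd.h>  // readlink, ssize_t
#include <climits>   // PATH_MAX
#include <filesystem>
#include <string>
#include <vector>

// Hypothetical helper: returns the targets of this process's open file
// descriptors whose path starts with `directory`. Linux-only, because it
// reads the symlinks under /proc/self/fd.
std::vector<std::string> OpenFilesInDir(const std::string& directory) {
  std::vector<std::string> result;
  for (const auto& entry : std::filesystem::directory_iterator("/proc/self/fd")) {
    char target[PATH_MAX];
    ssize_t len = ::readlink(entry.path().c_str(), target, sizeof(target) - 1);
    if (len == -1) continue;  // the fd may already be gone (e.g. the iterator's own)
    target[len] = '\0';
    std::string path(target);
    if (path.rfind(directory, 0) == 0) result.push_back(path);  // prefix match
  }
  return result;
}
```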

When I run this (on Fedora 39, using GCC 13) I get output like:

Open files in directory /tmp/dataset-scan-test-268jyz3s/ after read:
/tmp/dataset-scan-test-268jyz3s/data.arrow
Open files in directory /tmp/dataset-scan-test-268jyz3s/ after close:
/tmp/dataset-scan-test-268jyz3s/data.arrow
Open files in directory /tmp/dataset-scan-test-268jyz3s/ after reader destruct:
Open files in directory /tmp/dataset-scan-test-268jyz3s/ after scanner destruct:

This shows that neither consuming the RecordBatchReader (by reading it into a table) nor calling its Close method closes the IPC file; the file is only closed once the reader is destroyed. The Close implementation does nothing beyond consuming the remaining data:

Status Close() override {
  std::shared_ptr<RecordBatch> batch;
  RETURN_NOT_OK(ReadNext(&batch));
  while (batch != nullptr) {
    RETURN_NOT_OK(ReadNext(&batch));
  }
  return Status::OK();
}
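To make the ownership problem concrete, here is a self-contained C++17 model. It is not Arrow code: `FakeFileHandle`, `BatchSource` and `Reader` are hypothetical stand-ins for the IPC file, the scan's batch source and the RecordBatchReader. `DrainOnlyClose()` mirrors the quoted `Close()` (it drains the stream but keeps the source, and therefore the handle, alive), while `ReleasingClose()` additionally resets the source so the handle is released immediately.

```cpp
#include <cstddef>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for an open file; counts live instances so leaks are visible.
struct FakeFileHandle {
  static inline int open_count = 0;
  FakeFileHandle() { ++open_count; }
  ~FakeFileHandle() { --open_count; }
  FakeFileHandle(const FakeFileHandle&) = delete;
  FakeFileHandle& operator=(const FakeFileHandle&) = delete;
};

// Hypothetical source of "batches" (plain ints) that owns a file handle.
class BatchSource {
 public:
  explicit BatchSource(std::vector<int> batches)
      : handle_(std::make_unique<FakeFileHandle>()), batches_(std::move(batches)) {}

  std::optional<int> Next() {
    if (pos_ == batches_.size()) return std::nullopt;  // end of stream
    return batches_[pos_++];
  }

 private:
  std::unique_ptr<FakeFileHandle> handle_;
  std::vector<int> batches_;
  std::size_t pos_ = 0;
};

// Hypothetical reader that owns its source until told otherwise.
class Reader {
 public:
  explicit Reader(std::unique_ptr<BatchSource> source) : source_(std::move(source)) {}

  std::optional<int> ReadNext() {
    if (!source_) return std::nullopt;  // already released
    return source_->Next();
  }

  // Mirrors the quoted Close(): drains the stream but keeps source_ (and its handle) alive.
  void DrainOnlyClose() {
    while (ReadNext()) {
    }
  }

  // Variant that also drops the source, so its handle is released right away.
  void ReleasingClose() {
    DrainOnlyClose();
    source_.reset();
  }

 private:
  std::unique_ptr<BatchSource> source_;
};
```

This only illustrates the ownership pattern; the merged fix referenced later in this thread takes a different route and releases the iterator's resource once it has been exhausted.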

For context, this causes errors when trying to remove the dataset directory on Windows with the GLib bindings via Ruby, where there is no way to force destruction of the reader and we have to rely on GC (#41750).

Component(s)

C++

kou added a commit to kou/arrow that referenced this issue May 25, 2024
…l values

Iterator keeps its resource (ptr_) until it's deleted, but we can
release its resource immediately when it reads all values. If Iterator
keeps its resource until it's deleted, it may block closing a file. See
apacheGH-41771 for this case.
bkietz pushed a commit that referenced this issue May 28, 2024
…ads all values (#41824)

### Rationale for this change

`Iterator` keeps its resource (`ptr_`) until it's deleted, but we can release its resource immediately when it reads all values. If `Iterator` keeps its resource until it's deleted, it may block closing a file. See GH-41771 for this case.

### What changes are included in this PR?

Releases `ptr_` when `Next()` returns the end.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* GitHub Issue: #41771

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
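The idea behind #41824 (release `ptr_` as soon as `Next()` reports the end) can be sketched with a simplified, self-contained iterator. This is a model under stated assumptions, not Arrow's `arrow::Iterator<T>`, which is type-erased and whose `Next()` returns a `Result<T>`: `ReleasingIterator` and `CountingSource` are hypothetical names, and `std::optional` stands in for Arrow's end-of-iteration marker. `Next()` forwards to the wrapped object and resets `ptr_` the first time it sees the end, so whatever the wrapped object owns (such as an open file) is freed without waiting for the iterator itself to be destroyed.

```cpp
#include <memory>
#include <optional>
#include <utility>

// Hypothetical iterator wrapper: frees the wrapped object once it is exhausted.
template <typename T, typename Wrapped>
class ReleasingIterator {
 public:
  explicit ReleasingIterator(std::unique_ptr<Wrapped> ptr) : ptr_(std::move(ptr)) {}

  std::optional<T> Next() {
    if (!ptr_) return std::nullopt;  // already exhausted and released
    std::optional<T> value = ptr_->Next();
    if (!value) ptr_.reset();  // end reached: drop the wrapped object and what it owns
    return value;
  }

  bool released() const { return ptr_ == nullptr; }

 private:
  std::unique_ptr<Wrapped> ptr_;
};

// Hypothetical wrapped source that yields n, n-1, ..., 1 and tracks whether it is alive.
struct CountingSource {
  static inline int live = 0;
  int remaining;
  explicit CountingSource(int n) : remaining(n) { ++live; }
  ~CountingSource() { --live; }
  std::optional<int> Next() {
    if (remaining == 0) return std::nullopt;
    return remaining--;
  }
};
```

Once released, further `Next()` calls keep reporting the end, so a caller that reads past the end still behaves correctly.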
@bkietz
Member

bkietz commented May 28, 2024

Issue resolved by pull request #41824

bkietz added this to the 17.0.0 milestone May 28, 2024
bkietz closed this as completed May 28, 2024