Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/duckdb_py/pyfilesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ int64_t PythonFilesystem::Write(FileHandle &handle, void *buffer, int64_t nr_byt
return py::int_(write(data));
}
void PythonFilesystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
	// Positional write: must be atomic with respect to other threads using the
	// same Python file handle. Hold the GIL across BOTH seek and write so no
	// other thread can reposition the handle between the two calls.
	PythonGILWrapper gil;
	auto &py_handle = PythonFileHandle::GetHandle(handle);
	py_handle.attr("seek")(location);
	// Copy the caller's buffer into a Python bytes object for the fsspec write().
	auto data = py::bytes(std::string(const_char_ptr_cast(buffer), nr_bytes));
	py_handle.attr("write")(data);
}

int64_t PythonFilesystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
Expand All @@ -116,9 +118,11 @@ int64_t PythonFilesystem::Read(FileHandle &handle, void *buffer, int64_t nr_byte
}

void PythonFilesystem::Read(duckdb::FileHandle &handle, void *buffer, int64_t nr_bytes, uint64_t location) {
	// Positional read: seek + read must happen under ONE GIL acquisition.
	// Acquiring the GIL separately for each call lets another thread seek the
	// shared handle in between, returning the wrong bytes (data corruption).
	PythonGILWrapper gil;
	auto &py_handle = PythonFileHandle::GetHandle(handle);
	py_handle.attr("seek")(location);
	string data = py::bytes(py_handle.attr("read")(nr_bytes));
	// read() may return fewer than nr_bytes at EOF; copy only what we received.
	memcpy(buffer, data.c_str(), data.size());
}
bool PythonFilesystem::FileExists(const string &filename, optional_ptr<FileOpener> opener) {
return Exists(filename, "isfile");
Expand Down
50 changes: 50 additions & 0 deletions tests/fast/api/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,53 @@ def __init__(self) -> None:

result = duckdb_cursor.read_parquet(file_globs=["deadlock://a", "deadlock://b"], union_by_name=True)
assert len(result.fetchall()) == 100_000

def test_fsspec_seek_read_atomicity(self, duckdb_cursor, tmp_path):
    """Regression test for atomic positional reads on fsspec handles.

    The native side must perform seek+read under a single GIL hold; if the two
    operations acquire the GIL separately, another thread can reposition the
    shared handle in between and the bytes read are corrupted. Reading several
    files with distinct contents in parallel (union_by_name) surfaces any such
    cross-contamination.
    """
    # Materialize four parquet files, each tagged with a distinct file_id, and
    # capture their raw bytes so the in-memory filesystem below can serve them.
    file_bytes = {}
    for idx, label in enumerate(["a", "b", "c", "d"]):
        target = tmp_path / f"{label}.parquet"
        duckdb_cursor.sql(f"COPY (SELECT {idx} AS file_id FROM range(10000)) TO '{target!s}' (FORMAT parquet)")
        file_bytes[label] = target.read_bytes()

    class AtomicityTestFS(fsspec.AbstractFileSystem):
        protocol = "atomtest"

        @property
        def fsid(self):
            return "atomtest"

        def ls(self, path, detail=True, **kwargs):
            matches = [key for key in self._data if key.startswith(path)]
            if not detail:
                return matches
            return [
                {"name": p, "size": len(self._data[p]), "type": "file", "created": 0, "islink": False}
                for p in matches
            ]

        def modified(self, path):
            return datetime.datetime.now()

        def _open(self, path, **kwargs):
            # Fresh BytesIO per open: each handle has independent position state.
            return io.BytesIO(self._data[path])

        def __init__(self) -> None:
            super().__init__()
            self._data = file_bytes

    fsspec.register_implementation("atomtest", AtomicityTestFS, clobber=True)
    duckdb_cursor.register_filesystem(fsspec.filesystem("atomtest"))

    paths = ["atomtest://a", "atomtest://b", "atomtest://c", "atomtest://d"]
    expected = [(0, 10000), (1, 10000), (2, 10000), (3, 10000)]
    query = (
        f"SELECT file_id, count(*) AS cnt FROM read_parquet({paths}, union_by_name=true) "
        "GROUP BY ALL ORDER BY file_id"
    )
    # Repeat the parallel scan to give racy thread interleavings a chance to show up.
    for _ in range(10):
        assert duckdb_cursor.sql(query).fetchall() == expected
Loading