Skip to content

Commit

Permalink
Merge pull request #9030 from OSGeo/backport-9026-to-release/3.8
Browse files Browse the repository at this point in the history
[Backport release/3.8] GPKG: fix multi-threaded ArrowArray interface and re-enable it by default
  • Loading branch information
rouault committed Jan 4, 2024
2 parents 48a0bae + 25ee106 commit 674a97d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 21 deletions.
53 changes: 39 additions & 14 deletions autotest/ogr/ogr_gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8084,7 +8084,24 @@ def test_ogr_gpkg_arrow_stream_numpy(tmp_vsimem):
# Test GetArrowStreamAsNumPy() and multi-threading


def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem):
@pytest.mark.parametrize(
"num_features,batch_size,num_threads",
[
(201, 100, 1),
(200, 100, 2),
(201, 100, 2),
(201, 100, 3),
(299, 100, 3),
(300, 100, 3),
(301, 100, 3),
(400, 100, 3),
(901, 100, 3),
(1001, 100, 3),
],
)
def test_ogr_gpkg_arrow_stream_numpy_multi_threading(
tmp_vsimem, num_features, batch_size, num_threads
):
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

Expand All @@ -8093,7 +8110,7 @@ def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem):
ds = gdal.GetDriverByName("GPKG").Create(filename, 0, 0, 0, gdal.GDT_Unknown)
lyr = ds.CreateLayer("test", geom_type=ogr.wkbPoint)

for i in range(1000):
for i in range(num_features):
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(f"POINT({i} {i})"))
lyr.CreateFeature(f)
Expand All @@ -8102,22 +8119,30 @@ def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem):

ds = ogr.Open(filename)
lyr = ds.GetLayer(0)
num_threads = max(2, min(gdal.GetNumCPUs(), 4))
with gdaltest.config_option("OGR_GPKG_NUM_THREADS", str(num_threads)):
stream = lyr.GetArrowStreamAsNumPy(
options=["USE_MASKED_ARRAYS=NO", "MAX_FEATURES_IN_BATCH=10"]
options=["USE_MASKED_ARRAYS=NO", f"MAX_FEATURES_IN_BATCH={batch_size}"]
)

got_msg = []

def my_handler(errorClass, errno, msg):
if errorClass != gdal.CE_Debug:
got_msg.append(msg)
return

with gdaltest.error_handler(my_handler):
batches = [batch for batch in stream]
assert len(batches) == 100
i = 0
for batch in batches:
for wkb in batch["geom"]:
assert (
ogr.CreateGeometryFromWkb(wkb).ExportToIsoWkt()
== f"POINT ({i} {i})"
)
i += 1
assert i == 1000

assert len(got_msg) == 0

assert len(batches) == (num_features + batch_size - 1) // batch_size
i = 0
for batch in batches:
for wkb in batch["geom"]:
assert ogr.CreateGeometryFromWkb(wkb).ExportToIsoWkt() == f"POINT ({i} {i})"
i += 1
assert i == num_features


###############################################################################
Expand Down
3 changes: 1 addition & 2 deletions doc/source/drivers/vector/gpkg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,9 @@ available:
This is the number of threads used when reading tables through the
ArrowArray interface, when no filter is applied and when features have
consecutive feature ID numbering.
The default is the minimum of 4 and the number of CPUs.
Note that setting this value too high is not recommended: a value of 4 is
close to the optimal.
Although note that at time of writing, this option might not be fully
reliable (cf https://lists.osgeo.org/pipermail/gdal-dev/2024-January/058177.html)


Metadata
Expand Down
13 changes: 8 additions & 5 deletions ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8375,9 +8375,10 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
{
// Should not normally happen, unless the user messes with
// GetNextFeature()
CPLError(
CE_Failure, CPLE_AppDefined,
"Worker thread task has not expected m_iStartShapeId value");
CPLError(CE_Failure, CPLE_AppDefined,
"Worker thread task has not expected m_iStartShapeId "
"value. Got " CPL_FRMT_GIB ", expected " CPL_FRMT_GIB,
task->m_iStartShapeId, m_iNextShapeId);
if (task->m_psArrowArray->release)
task->m_psArrowArray->release(task->m_psArrowArray.get());

Expand Down Expand Up @@ -8440,7 +8441,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
const char *pszMaxThreads =
CPLGetConfigOption("OGR_GPKG_NUM_THREADS", nullptr);
if (pszMaxThreads == nullptr)
return 0;
return std::min(4, CPLGetNumCPUs());
else if (EQUAL(pszMaxThreads, "ALL_CPUS"))
return CPLGetNumCPUs();
else
Expand All @@ -8456,8 +8457,10 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
CPLGetUsablePhysicalRAM() > 1024 * 1024 * 1024)
{
const int nMaxTasks = static_cast<int>(std::min<GIntBig>(
DIV_ROUND_UP(m_nTotalFeatureCount - m_iNextShapeId, nMaxBatchSize),
DIV_ROUND_UP(m_nTotalFeatureCount - nMaxBatchSize - m_iNextShapeId,
nMaxBatchSize),
GetThreadsAvailable()));
CPLDebug("GPKG", "Using %d threads", nMaxTasks);
GDALOpenInfo oOpenInfo(m_poDS->GetDescription(), GA_ReadOnly);
oOpenInfo.papszOpenOptions = m_poDS->GetOpenOptions();
oOpenInfo.nOpenFlags = GDAL_OF_VECTOR;
Expand Down

0 comments on commit 674a97d

Please sign in to comment.