diff --git a/autotest/ogr/ogr_gpkg.py b/autotest/ogr/ogr_gpkg.py index 553c6420f2d8..59046d8fa1bc 100755 --- a/autotest/ogr/ogr_gpkg.py +++ b/autotest/ogr/ogr_gpkg.py @@ -8084,7 +8084,24 @@ def test_ogr_gpkg_arrow_stream_numpy(tmp_vsimem): # Test GetArrowStreamAsNumPy() and multi-threading -def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem): +@pytest.mark.parametrize( + "num_features,batch_size,num_threads", + [ + (201, 100, 1), + (200, 100, 2), + (201, 100, 2), + (201, 100, 3), + (299, 100, 3), + (300, 100, 3), + (301, 100, 3), + (400, 100, 3), + (901, 100, 3), + (1001, 100, 3), + ], +) +def test_ogr_gpkg_arrow_stream_numpy_multi_threading( + tmp_vsimem, num_features, batch_size, num_threads +): pytest.importorskip("osgeo.gdal_array") pytest.importorskip("numpy") @@ -8093,7 +8110,7 @@ def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem): ds = gdal.GetDriverByName("GPKG").Create(filename, 0, 0, 0, gdal.GDT_Unknown) lyr = ds.CreateLayer("test", geom_type=ogr.wkbPoint) - for i in range(1000): + for i in range(num_features): f = ogr.Feature(lyr.GetLayerDefn()) f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(f"POINT({i} {i})")) lyr.CreateFeature(f) @@ -8102,22 +8119,30 @@ def test_ogr_gpkg_arrow_stream_numpy_multi_threading(tmp_vsimem): ds = ogr.Open(filename) lyr = ds.GetLayer(0) - num_threads = max(2, min(gdal.GetNumCPUs(), 4)) with gdaltest.config_option("OGR_GPKG_NUM_THREADS", str(num_threads)): stream = lyr.GetArrowStreamAsNumPy( - options=["USE_MASKED_ARRAYS=NO", "MAX_FEATURES_IN_BATCH=10"] + options=["USE_MASKED_ARRAYS=NO", f"MAX_FEATURES_IN_BATCH={batch_size}"] ) + + got_msg = [] + + def my_handler(errorClass, errno, msg): + if errorClass != gdal.CE_Debug: + got_msg.append(msg) + return + + with gdaltest.error_handler(my_handler): batches = [batch for batch in stream] - assert len(batches) == 100 - i = 0 - for batch in batches: - for wkb in batch["geom"]: - assert ( - ogr.CreateGeometryFromWkb(wkb).ExportToIsoWkt() - == f"POINT ({i} {i})" - ) - i += 1 - assert i == 1000 + + assert len(got_msg) == 0 + + assert len(batches) == (num_features + batch_size - 1) // batch_size + i = 0 + for batch in batches: + for wkb in batch["geom"]: + assert ogr.CreateGeometryFromWkb(wkb).ExportToIsoWkt() == f"POINT ({i} {i})" + i += 1 + assert i == num_features ############################################################################### diff --git a/doc/source/drivers/vector/gpkg.rst b/doc/source/drivers/vector/gpkg.rst index b3e33a14f82d..f570e52abd3d 100644 --- a/doc/source/drivers/vector/gpkg.rst +++ b/doc/source/drivers/vector/gpkg.rst @@ -463,10 +463,9 @@ available: This is the number of threads used when reading tables through the ArrowArray interface, when no filter is applied and when features have consecutive feature ID numbering. + The default is the minimum of 4 and the number of CPUs. Note that setting this value too high is not recommended: a value of 4 is close to the optimal. - Although note that at time of writing, this option might not be fully - reliable (cf https://lists.osgeo.org/pipermail/gdal-dev/2024-January/058177.html) Metadata diff --git a/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp b/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp index 765c2c3d1ffd..3666a2ee88c1 100644 --- a/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp +++ b/ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp @@ -8375,9 +8375,10 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, { // Should not normally happen, unless the user messes with // GetNextFeature() - CPLError( - CE_Failure, CPLE_AppDefined, - "Worker thread task has not expected m_iStartShapeId value"); + CPLError(CE_Failure, CPLE_AppDefined, + "Worker thread task has not expected m_iStartShapeId " + "value. Got " CPL_FRMT_GIB ", expected " CPL_FRMT_GIB, + task->m_iStartShapeId, m_iNextShapeId); if (task->m_psArrowArray->release) task->m_psArrowArray->release(task->m_psArrowArray.get()); @@ -8440,7 +8441,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, const char *pszMaxThreads = CPLGetConfigOption("OGR_GPKG_NUM_THREADS", nullptr); if (pszMaxThreads == nullptr) - return 0; + return std::min(4, CPLGetNumCPUs()); else if (EQUAL(pszMaxThreads, "ALL_CPUS")) return CPLGetNumCPUs(); else @@ -8456,8 +8457,10 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream, CPLGetUsablePhysicalRAM() > 1024 * 1024 * 1024) { const int nMaxTasks = static_cast(std::min( - DIV_ROUND_UP(m_nTotalFeatureCount - m_iNextShapeId, nMaxBatchSize), + DIV_ROUND_UP(m_nTotalFeatureCount - nMaxBatchSize - m_iNextShapeId, + nMaxBatchSize), GetThreadsAvailable())); + CPLDebug("GPKG", "Using %d threads", nMaxTasks); GDALOpenInfo oOpenInfo(m_poDS->GetDescription(), GA_ReadOnly); oOpenInfo.papszOpenOptions = m_poDS->GetOpenOptions(); oOpenInfo.nOpenFlags = GDAL_OF_VECTOR;