diff --git a/autotest/cpp/test_ogr.cpp b/autotest/cpp/test_ogr.cpp index 518b11e43547..a1d6e77b4f44 100644 --- a/autotest/cpp/test_ogr.cpp +++ b/autotest/cpp/test_ogr.cpp @@ -3961,4 +3961,90 @@ TEST_F(test_ogr, OGRFeature_SerializeToBinary) } } +// Test OGRGeometry::IsRectangle() +TEST_F(test_ogr, OGRGeometry_IsRectangle) +{ + // Not a polygon + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POINT EMPTY", nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon empty + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POLYGON EMPTY", nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon with inner ring + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt( + "POLYGON ((0 0,0 1,1 1,1 0,0 0),(0.2 0.2,0.2 0.8,0.8 0.8,0.8 " + "0.2,0.2 0.2))", + nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon with 3 points + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POLYGON ((0 0,0 1,1 1))", nullptr, + &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon with 6 points + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt( + "POLYGON ((0 0,0.1 0,0.2 0,0.3 0,1 1,0 0))", nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon with 5 points, but last one not matching first (invalid) + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt( + "POLYGON ((0 0,0 1,1 1,1 0,-999 -999))", nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Polygon with 5 points, but not rectangle + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POLYGON ((0 0,0 1.1,1 1,1 0,0 0))", + nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_FALSE(poGeom->IsRectangle()); + delete poGeom; + } + // Rectangle (type 1) + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POLYGON ((0 0,0 1,1 1,1 0,0 0))", + nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_TRUE(poGeom->IsRectangle()); + delete poGeom; + } + // Rectangle2(type 1) + { + OGRGeometry *poGeom = nullptr; + OGRGeometryFactory::createFromWkt("POLYGON ((0 0,1 0,1 1,0 1,0 0))", + nullptr, &poGeom); + ASSERT_NE(poGeom, nullptr); + EXPECT_TRUE(poGeom->IsRectangle()); + delete poGeom; + } +} + } // namespace diff --git a/autotest/ogr/data/parquet/test_with_fid_and_geometry_bbox.parquet b/autotest/ogr/data/parquet/test_with_fid_and_geometry_bbox.parquet new file mode 100644 index 000000000000..79f577585eae Binary files /dev/null and b/autotest/ogr/data/parquet/test_with_fid_and_geometry_bbox.parquet differ diff --git a/autotest/ogr/ogr_parquet.py b/autotest/ogr/ogr_parquet.py index a7679ad7d3a5..82ed7c7afb71 100755 --- a/autotest/ogr/ogr_parquet.py +++ b/autotest/ogr/ogr_parquet.py @@ -86,6 +86,26 @@ def _validate(filename, check_data=False): assert not ret +############################################################################### + + +def _has_arrow_dataset(): + drv = gdal.GetDriverByName("Parquet") + return drv is not None and drv.GetMetadataItem("ARROW_DATASET") is not None + + +############################################################################### + + +@pytest.fixture(scope="module", params=[True, False], ids=["arrow-dataset", "regular"]) +def with_arrow_dataset_or_not(request): + + if request.param and not _has_arrow_dataset(): + pytest.skip("Test requires build with ArrowDataset") + + yield request.param + + ############################################################################### # Read invalid file @@ -102,9 +122,12 @@ def test_ogr_parquet_invalid(): def _check_test_parquet( filename, + expect_layer_geom_type=True, expect_fast_feature_count=True, expect_fast_get_extent=True, expect_ignore_fields=True, + expect_domain=True, + fid_reliable_after_spatial_filtering=True, ): with gdaltest.config_option("OGR_PARQUET_BATCH_SIZE", "2"): ds = gdal.OpenEx(filename) @@ -121,7 +144,8 @@ def _check_test_parquet( srs = lyr_defn.GetGeomFieldDefn(0).GetSpatialRef() assert srs is not None assert srs.GetAuthorityCode(None) == "4326" - assert lyr_defn.GetGeomFieldDefn(0).GetType() == ogr.wkbPoint + if expect_layer_geom_type: + assert lyr_defn.GetGeomFieldDefn(0).GetType() == ogr.wkbPoint # import pprint # pprint.pprint(got_field_defns) expected_field_defns = [ @@ -245,17 +269,18 @@ def _check_test_parquet( with pytest.raises(Exception): lyr.GetExtent(geom_field=1) - assert ds.GetFieldDomainNames() == ["dictDomain"] - assert ds.GetFieldDomain("not_existing") is None - for _ in range(2): - domain = ds.GetFieldDomain("dictDomain") - assert domain is not None - assert domain.GetName() == "dictDomain" - assert domain.GetDescription() == "" - assert domain.GetDomainType() == ogr.OFDT_CODED - assert domain.GetFieldType() == ogr.OFTInteger - assert domain.GetFieldSubType() == ogr.OFSTNone - assert domain.GetEnumeration() == {"0": "foo", "1": "bar", "2": "baz"} + if expect_domain: + assert ds.GetFieldDomainNames() == ["dictDomain"] + assert ds.GetFieldDomain("not_existing") is None + for _ in range(2): + domain = ds.GetFieldDomain("dictDomain") + assert domain is not None + assert domain.GetName() == "dictDomain" + assert domain.GetDescription() == "" + assert domain.GetDomainType() == ogr.OFDT_CODED + assert domain.GetFieldType() == ogr.OFTInteger + assert domain.GetFieldSubType() == ogr.OFSTNone + assert domain.GetEnumeration() == {"0": "foo", "1": "bar", "2": "baz"} f = lyr.GetNextFeature() assert f.GetFID() == 0 @@ -398,7 +423,9 @@ def _check_test_parquet( lyr.SetSpatialFilterRect(4, 2, 4, 2) lyr.ResetReading() f = lyr.GetNextFeature() - assert f.GetFID() == 4 + if fid_reliable_after_spatial_filtering: + assert f.GetFID() == 4 + assert f.GetGeometryRef().ExportToWkt() == "POINT (4 2)" lyr.SetSpatialFilterRect(-100, -100, -100, -100) lyr.ResetReading() @@ -491,6 +518,33 @@ def test_ogr_parquet_1(use_vsi): gdal.Unlink(vsifilename) +############################################################################### + + +@pytest.mark.skipif(not _has_arrow_dataset(), reason="GDAL not built with ArrowDataset") +@pytest.mark.parametrize("use_vsi", [False, True]) +def test_ogr_parquet_check_dataset(use_vsi): + + filename = "data/parquet/test.parquet" + if use_vsi: + vsifilename = "/vsimem/test.parquet" + gdal.FileFromMemBuffer(vsifilename, open(filename, "rb").read()) + filename = vsifilename + + try: + _check_test_parquet( + "PARQUET:" + filename, + expect_layer_geom_type=False, + expect_fast_feature_count=False, + expect_fast_get_extent=False, + expect_domain=False, + fid_reliable_after_spatial_filtering=False, + ) + finally: + if use_vsi: + gdal.Unlink(vsifilename) + + ############################################################################### # Run test_ogrsf @@ -542,6 +596,25 @@ def test_ogr_parquet_test_ogrsf_all_geoms(): assert "ERROR" not in ret +############################################################################### +# Run test_ogrsf + + +@pytest.mark.skipif(not _has_arrow_dataset(), reason="GDAL not built with ArrowDataset") +def test_ogr_parquet_test_ogrsf_all_geoms_with_arrow_dataset(): + + if test_cli_utilities.get_test_ogrsf_path() is None: + pytest.skip() + + ret = gdaltest.runexternal( + test_cli_utilities.get_test_ogrsf_path() + + " -ro PARQUET:data/parquet/all_geoms.parquet" + ) + + assert "INFO" in ret + assert "ERROR" not in ret + + ############################################################################### # Test write support @@ -1388,16 +1461,40 @@ def test_ogr_parquet_multiple_geom_columns(): "string >= 'l'", "decimal128 = -1234.567", "decimal256 = -1234.567", - # not optimized + "uint8 = 5 AND int8 = 2", + "uint8 = -5 AND int8 = 2", + "int8 = 2 AND uint8 = -5", + "uint8 = -5 AND int8 = 200", + "NOT uint8 = 5 AND uint8 IS NOT NULL", + "NOT uint8 = 50 AND uint8 IS NOT NULL", + # not optimized for non-dataset layer + "FID = 0", "boolean = 0 OR boolean = 1", + "uint8 = 1 OR uint8 = -1", + "uint8 = -1 OR uint8 = 1", "1 = 1", "boolean = boolean", "FID = 1", '"struct_field.a" = 1', '"struct_field.a" = 0', + "string LIKE 'd'", + "string LIKE 'D'", + "string ILIKE 'D'", + "string LIKE 'f'", + "timestamp_ms_gmt = '2019/01/01 14:00:00.500Z'", + "timestamp_ms_gmt < '2019/01/01 14:00:00.500Z'", + "timestamp_s_no_tz = '2019/01/01 14:00:00'", + "timestamp_s_no_tz < '2019/01/01 14:00:00'", + # partially optimized + "boolean = 0 AND OGR_GEOMETRY IS NOT NULL", + "OGR_GEOMETRY IS NOT NULL AND boolean = 0", + # not optimized + "OGR_GEOMETRY IS NOT NULL AND OGR_GEOMETRY IS NOT NULL", + "boolean = 0 OR OGR_GEOMETRY IS NOT NULL", + "OGR_GEOMETRY IS NOT NULL OR boolean = 0", ], ) -def test_ogr_parquet_attribute_filter(filter): +def test_ogr_parquet_attribute_filter(filter, with_arrow_dataset_or_not): with gdaltest.config_option("OGR_PARQUET_OPTIMIZED_ATTRIBUTE_FILTER", "NO"): ds = ogr.Open("data/parquet/test.parquet") @@ -1406,12 +1503,26 @@ def test_ogr_parquet_attribute_filter(filter): ref_fc = lyr.GetFeatureCount() ds = None - ds = ogr.Open("data/parquet/test.parquet") + prefix = "PARQUET:" if with_arrow_dataset_or_not else "" + ds = ogr.Open(prefix + "data/parquet/test.parquet") lyr = ds.GetLayer(0) assert lyr.SetAttributeFilter(filter) == ogr.OGRERR_NONE assert lyr.GetFeatureCount() == ref_fc +def test_ogr_parquet_attribute_filter_on_fid_column(with_arrow_dataset_or_not): + + filter = "fid = 1" + + prefix = "PARQUET:" if with_arrow_dataset_or_not else "" + ds = ogr.Open(prefix + "data/parquet/test_with_fid_and_geometry_bbox.parquet") + lyr = ds.GetLayer(0) + assert lyr.SetAttributeFilter(filter) == ogr.OGRERR_NONE + f = lyr.GetNextFeature() + assert f.GetFID() == 1 + assert lyr.GetNextFeature() is None + + def test_ogr_parquet_attribute_filter_and_then_ignored_fields(): ds = ogr.Open("data/parquet/test.parquet") @@ -1455,7 +1566,7 @@ def test_ogr_parquet_ignored_fields_and_then_attribute_filter(): assert lyr.GetFeatureCount() == 1 -def test_ogr_parquet_attribute_filter_and_spatial_filter(): +def test_ogr_parquet_attribute_filter_and_spatial_filter(with_arrow_dataset_or_not): filter = "int8 != 0" @@ -1468,13 +1579,40 @@ def test_ogr_parquet_attribute_filter_and_spatial_filter(): assert ref_fc > 0 ds = None - ds = ogr.Open("data/parquet/test.parquet") + prefix = "PARQUET:" if with_arrow_dataset_or_not else "" + ds = ogr.Open(prefix + "data/parquet/test.parquet") lyr = ds.GetLayer(0) lyr.SetSpatialFilterRect(4, 2, 4, 2) assert lyr.SetAttributeFilter(filter) == ogr.OGRERR_NONE assert lyr.GetFeatureCount() == ref_fc +def test_ogr_parquet_attribute_filter_and_spatial_filter_with_spatial_index( + tmp_path, with_arrow_dataset_or_not +): + + filename = str(tmp_path / "test.parquet") + gdal.VectorTranslate(filename, "data/parquet/test.parquet") + + filter = "uint8 != 1" + + with gdaltest.config_option("OGR_PARQUET_OPTIMIZED_ATTRIBUTE_FILTER", "NO"): + ds = ogr.Open("data/parquet/test.parquet") + lyr = ds.GetLayer(0) + lyr.SetSpatialFilterRect(1, 2, 3, 2) + assert lyr.SetAttributeFilter(filter) == ogr.OGRERR_NONE + ref_fc = lyr.GetFeatureCount() + assert ref_fc > 0 + ds = None + + prefix = "PARQUET:" if with_arrow_dataset_or_not else "" + ds = ogr.Open(prefix + filename) + lyr = ds.GetLayer(0) + lyr.SetSpatialFilterRect(1, 2, 3, 2) + assert lyr.SetAttributeFilter(filter) == ogr.OGRERR_NONE + assert lyr.GetFeatureCount() == ref_fc + + ############################################################################### # Test IS NULL / IS NOT NULL @@ -1509,14 +1647,6 @@ def test_ogr_parquet_is_null(): gdal.Unlink(outfilename) -############################################################################### - - -def _has_arrow_dataset(): - drv = gdal.GetDriverByName("Parquet") - return drv is not None and drv.GetMetadataItem("ARROW_DATASET") is not None - - ############################################################################### # Test reading a flat partitioned dataset @@ -1643,10 +1773,34 @@ def test_ogr_parquet_read_partitioned_geo(): assert lyr.GetExtent() == (1, 3, 2, 4) assert lyr.GetExtent() == (1, 3, 2, 4) + assert lyr.GetLayerDefn().GetFieldCount() == 0 + + assert lyr.TestCapability(ogr.OLCFastSpatialFilter) == 1 + lyr.SetSpatialFilterRect(0, 0, 10, 10) lyr.ResetReading() assert lyr.GetFeatureCount() == 2 + lyr.SetSpatialFilterRect(0.9, 1.9, 1.1, 2.1) + lyr.ResetReading() + assert lyr.GetFeatureCount() == 1 + + lyr.SetSpatialFilterRect(0.9, 1.9, 0.95, 2.1) + lyr.ResetReading() + assert lyr.GetFeatureCount() == 0 + + lyr.SetSpatialFilterRect(1.05, 1.9, 1.1, 2.1) + lyr.ResetReading() + assert lyr.GetFeatureCount() == 0 + + lyr.SetSpatialFilterRect(0.9, 1.9, 1.1, 1.95) + lyr.ResetReading() + assert lyr.GetFeatureCount() == 0 + + lyr.SetSpatialFilterRect(0.9, 2.05, 1.1, 2.1) + lyr.ResetReading() + assert lyr.GetFeatureCount() == 0 + lyr.SetSpatialFilterRect(-100, -100, -100, -100) lyr.ResetReading() assert lyr.GetNextFeature() is None @@ -3597,7 +3751,12 @@ def check_file(filename): @pytest.mark.parametrize("covering_bbox", [True, False]) @gdaltest.enable_exceptions() def test_ogr_parquet_geoarrow( - tmp_vsimem, tmp_path, wkt, check_with_pyarrow, covering_bbox + tmp_vsimem, + tmp_path, + wkt, + check_with_pyarrow, + covering_bbox, + with_arrow_dataset_or_not, ): geom = ogr.CreateGeometryFromWkt(wkt) @@ -3680,17 +3839,28 @@ def check(lyr): f = lyr.GetNextFeature() ogrtest.check_feature_geometry(f, geom2) - ds = ogr.Open(filename) + filename_to_open = ("PARQUET:" if with_arrow_dataset_or_not else "") + filename + + ds = ogr.Open(filename_to_open) lyr = ds.GetLayer(0) check(lyr) + if ( + covering_bbox + or not with_arrow_dataset_or_not + or lyr.GetGeomType() in (ogr.wkbPoint, ogr.wkbPoint25D) + ): + assert lyr.TestCapability(ogr.OLCFastSpatialFilter) + else: + assert not lyr.TestCapability(ogr.OLCFastSpatialFilter) + # Check that ignoring attribute fields doesn't impact geometry reading - ds = ogr.Open(filename) + ds = ogr.Open(filename_to_open) lyr = ds.GetLayer(0) lyr.SetIgnoredFields(["foo"]) check(lyr) - ds = ogr.Open(filename) + ds = ogr.Open(filename_to_open) lyr = ds.GetLayer(0) minx, maxx, miny, maxy = geom.GetEnvelope() @@ -3872,3 +4042,34 @@ def test_ogr_parquet_read_arrow_json_extension(): assert lyr.GetLayerDefn().GetFieldDefn(0).GetSubType() == ogr.OFSTJSON f = lyr.GetNextFeature() assert f["extension_json"] == '{"foo":"bar"}' + + +############################################################################### +# Test ignored fields with arrow::dataset and bounding box column + + +@pytest.mark.skipif(not _has_arrow_dataset(), reason="GDAL not built with ArrowDataset") +def test_ogr_parquet_ignored_fields_bounding_box_column_arrow_dataset(tmp_path): + + filename = str(tmp_path / "test.parquet") + ds = ogr.GetDriverByName("Parquet").CreateDataSource(filename) + lyr = ds.CreateLayer("test", geom_type=ogr.wkbPoint, options=["FID=fid"]) + f = ogr.Feature(lyr.GetLayerDefn()) + f.SetFID(1) + f.SetGeometryDirectly(ogr.CreateGeometryFromWkt("POINT (1 2)")) + lyr.CreateFeature(f) + f = None + ds.Close() + + ds = ogr.Open("PARQUET:" + filename) + lyr = ds.GetLayer(0) + lyr.SetIgnoredFields([lyr.GetGeometryColumn()]) + lyr.SetSpatialFilterRect(0, 0, 10, 10) + lyr.ResetReading() + f = lyr.GetNextFeature() + assert f.GetFID() == 1 + assert f.GetGeometryRef() is None + + lyr.SetSpatialFilterRect(0, 0, 0, 0) + lyr.ResetReading() + assert lyr.GetNextFeature() is None diff --git a/doc/source/drivers/vector/parquet.rst b/doc/source/drivers/vector/parquet.rst index 634949bad436..fd8c9d84e1d2 100644 --- a/doc/source/drivers/vector/parquet.rst +++ b/doc/source/drivers/vector/parquet.rst @@ -179,7 +179,11 @@ Starting with GDAL 3.6.0, the driver can read directories that contain several Parquet files, and expose them as a single layer. This support is only enabled if the driver is built against the ``arrowdataset`` C++ library. -Note that no optimization is currently done regarding filtering. +It is also possible to force opening single Parquet file in that mode by prefixing +their filename with ``PARQUET:``. + +Optimized spatial and attribute filtering for Arrow datasets is available since +GDAL 3.10. Metadata -------- diff --git a/ogr/ogr_geometry.h b/ogr/ogr_geometry.h index 7bfbdf5994bb..73bc05f71678 100644 --- a/ogr/ogr_geometry.h +++ b/ogr/ogr_geometry.h @@ -631,6 +631,8 @@ class CPL_DLL OGRGeometry virtual void swapXY(); + bool IsRectangle() const; + //! @cond Doxygen_Suppress static OGRGeometry *CastToIdentity(OGRGeometry *poGeom) { diff --git a/ogr/ogrgeometry.cpp b/ogr/ogrgeometry.cpp index 9d7c7454aa3f..52d91d4d21b2 100644 --- a/ogr/ogrgeometry.cpp +++ b/ogr/ogrgeometry.cpp @@ -8493,3 +8493,52 @@ void OGRwkbExportOptionsSetPrecision( if (hPrecisionOptions) psOptions->sPrecision.SetFrom(*hPrecisionOptions); } + +/************************************************************************/ +/* IsRectangle() */ +/************************************************************************/ + +/** + * \brief Returns whether the geometry is a polygon with 4 corners forming + * a rectangle. + * + * @since GDAL 3.10 + */ +bool OGRGeometry::IsRectangle() const +{ + if (wkbFlatten(getGeometryType()) != wkbPolygon) + return false; + + const OGRPolygon *poPoly = toPolygon(); + + if (poPoly->getNumInteriorRings() != 0) + return false; + + const OGRLinearRing *poRing = poPoly->getExteriorRing(); + if (!poRing) + return false; + + if (poRing->getNumPoints() > 5 || poRing->getNumPoints() < 4) + return false; + + // If the ring has 5 points, the last should be the first. + if (poRing->getNumPoints() == 5 && (poRing->getX(0) != poRing->getX(4) || + poRing->getY(0) != poRing->getY(4))) + return false; + + // Polygon with first segment in "y" direction. + if (poRing->getX(0) == poRing->getX(1) && + poRing->getY(1) == poRing->getY(2) && + poRing->getX(2) == poRing->getX(3) && + poRing->getY(3) == poRing->getY(0)) + return true; + + // Polygon with first segment in "x" direction. + if (poRing->getY(0) == poRing->getY(1) && + poRing->getX(1) == poRing->getX(2) && + poRing->getY(2) == poRing->getY(3) && + poRing->getX(3) == poRing->getX(0)) + return true; + + return false; +} diff --git a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h index b47cb907d16f..6179fd009b07 100644 --- a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h +++ b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h @@ -61,6 +61,28 @@ enum class OGRArrowGeomEncoding GEOARROW_STRUCT_MULTIPOLYGON, }; +/************************************************************************/ +/* OGRArrowIsGeoArrowStruct() */ +/************************************************************************/ + +inline bool OGRArrowIsGeoArrowStruct(OGRArrowGeomEncoding eEncoding) +{ + switch (eEncoding) + { + case OGRArrowGeomEncoding::GEOARROW_STRUCT_GENERIC: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_LINESTRING: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_POLYGON: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTIPOINT: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTILINESTRING: + case OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTIPOLYGON: + return true; + + default: + return false; + } +} + /************************************************************************/ /* OGRArrowLayer */ /************************************************************************/ @@ -121,6 +143,16 @@ class OGRArrowLayer CPL_NON_FINAL std::vector m_anMapGeomFieldIndexToArrowColumn{}; std::vector m_aeGeomEncoding{}; + //! Whether bounding box based spatial filter should be skipped. + // This is set to true by OGRParquetDatasetLayer when there is a bounding + // box field, as an optimization. + bool m_bBaseArrowIgnoreSpatialFilterRect = false; + + //! Whether spatial filter should be skipped (by GetNextArrowArray()) + // This is set to true by OGRParquetDatasetLayer when filtering points in + // a rectangle. + bool m_bBaseArrowIgnoreSpatialFilter = false; + //! Describe the bbox column of a geometry column struct GeomColBBOX { @@ -159,6 +191,10 @@ class OGRArrowLayer CPL_NON_FINAL // m_bIgnoredFields is set int m_nRequestedFIDColumn = -1; // only valid when m_bIgnoredFields is set + int m_nExpectedBatchColumns = + -1; // Should be equal to m_poBatch->num_columns() (when + // m_bIgnoredFields is set) + bool m_bEOF = false; int64_t m_nFeatureIdx = 0; int64_t m_nIdxInBatch = 0; @@ -174,6 +210,11 @@ class OGRArrowLayer CPL_NON_FINAL std::vector m_asAttributeFilterConstraints{}; + //! Whether attribute filter should be skipped. + // This is set to true by OGRParquetDatasetLayer when it can fully translate + // a filter, as an optimization. + bool m_bBaseArrowIgnoreAttributeFilter = false; + std::map> LoadGDALSchema(const arrow::KeyValueMetadata *kv_metadata); @@ -237,6 +278,10 @@ class OGRArrowLayer CPL_NON_FINAL // Refreshes Constraint.iArrayIdx from iField. To be called by SetIgnoredFields() void ComputeConstraintsArrayIdx(); + static const swq_expr_node *GetColumnSubNode(const swq_expr_node *poNode); + static const swq_expr_node *GetConstantSubNode(const swq_expr_node *poNode); + static bool IsComparisonOp(int op); + virtual bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const; bool FastGetExtent3D(int iGeomField, OGREnvelope3D *psExtent) const; static OGRErr GetExtentFromMetadata(const CPLJSONObject &oJSONDef, @@ -252,6 +297,8 @@ class OGRArrowLayer CPL_NON_FINAL ++m_nFeatureIdx; } + void SanityCheckOfSetBatch() const; + public: virtual ~OGRArrowLayer() override; diff --git a/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp b/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp index d1ee6ec52263..31ab3688429f 100644 --- a/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp +++ b/ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp @@ -3130,7 +3130,9 @@ inline void OGRArrowLayer::ResetReading() /* GetColumnSubNode() */ /***********************************************************************/ -static const swq_expr_node *GetColumnSubNode(const swq_expr_node *poNode) +/* static*/ +inline const swq_expr_node * +OGRArrowLayer::GetColumnSubNode(const swq_expr_node *poNode) { if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2) { @@ -3146,7 +3148,9 @@ static const swq_expr_node *GetColumnSubNode(const swq_expr_node *poNode) /* GetConstantSubNode() */ /***********************************************************************/ -static const swq_expr_node *GetConstantSubNode(const swq_expr_node *poNode) +/* static */ +inline const swq_expr_node * +OGRArrowLayer::GetConstantSubNode(const swq_expr_node *poNode) { if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2) { @@ -3162,7 +3166,8 @@ static const swq_expr_node *GetConstantSubNode(const swq_expr_node *poNode) /* IsComparisonOp() */ /***********************************************************************/ -static bool IsComparisonOp(int op) +/* static*/ +inline bool OGRArrowLayer::IsComparisonOp(int op) { return (op == SWQ_EQ || op == SWQ_NE || op == SWQ_LT || op == SWQ_LE || op == SWQ_GT || op == SWQ_GE); @@ -3858,9 +3863,12 @@ OGRArrowLayer::SetBatch(const std::shared_ptr &poBatch) m_poArrayYMaxFloat = nullptr; if (m_poBatch) + { m_poBatchColumns = m_poBatch->columns(); + SanityCheckOfSetBatch(); + } - if (m_poBatch && m_poFilterGeom) + if (m_poBatch && m_poFilterGeom && !m_bBaseArrowIgnoreSpatialFilterRect) { int iCol; if (m_bIgnoredFields) @@ -3959,6 +3967,67 @@ OGRArrowLayer::SetBatch(const std::shared_ptr &poBatch) } } +/************************************************************************/ +/* SanityCheckOfSetBatch() */ +/************************************************************************/ + +inline void OGRArrowLayer::SanityCheckOfSetBatch() const +{ +#ifdef DEBUG + CPLAssert(m_poBatch); + + const auto &poColumns = m_poBatch->columns(); + + // Sanity checks + CPLAssert(m_poBatch->num_columns() == (m_bIgnoredFields + ? m_nExpectedBatchColumns + : m_poSchema->num_fields())); + const auto &fields = m_poSchema->fields(); + + for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i) + { + int iCol; + if (m_bIgnoredFields) + { + iCol = m_anMapFieldIndexToArrayIndex[i]; + if (iCol < 0) + continue; + } + else + { + iCol = m_anMapFieldIndexToArrowColumn[i][0]; + } + CPL_IGNORE_RET_VAL(iCol); // to make cppcheck happy + + CPLAssert(iCol < static_cast(poColumns.size())); + CPLAssert(fields[m_anMapFieldIndexToArrowColumn[i][0]]->type()->id() == + poColumns[iCol]->type_id()); + } + + for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i) + { + int iCol; + if (m_bIgnoredFields) + { + iCol = m_anMapGeomFieldIndexToArrayIndex[i]; + if (iCol < 0) + continue; + } + else + { + iCol = m_anMapGeomFieldIndexToArrowColumn[i]; + } + CPL_IGNORE_RET_VAL(iCol); // to make cppcheck happy + + CPLAssert(iCol < static_cast(poColumns.size())); + CPLAssert(fields[m_anMapGeomFieldIndexToArrowColumn[i]]->type()->id() == + poColumns[iCol]->type_id()); + } +#else + CPL_IGNORE_RET_VAL(m_nExpectedBatchColumns); +#endif +} + /************************************************************************/ /* GetNextRawFeature() */ /************************************************************************/ @@ -3977,7 +4046,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature() // Evaluate spatial filter by computing the bounding box of each geometry // but without creating a OGRGeometry - if (m_poFilterGeom) + if (m_poFilterGeom && !m_bBaseArrowIgnoreSpatialFilterRect) { int iCol; if (m_bIgnoredFields) @@ -3989,7 +4058,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature() iCol = m_anMapGeomFieldIndexToArrowColumn[m_iGeomFieldFilter]; } - if (m_poArrayXMinFloat || m_poArrayXMinDouble) + if (iCol >= 0 && (m_poArrayXMinFloat || m_poArrayXMinDouble)) { OGREnvelope sEnvelopeSkipToNextFeatureDueToBBOX; const auto IntersectsBBOX = @@ -4813,13 +4882,8 @@ inline void OGRArrowLayer::SetSpatialFilter(int iGeomField, OGRGeometry *poGeomIn) { - if (iGeomField < 0 || (iGeomField >= GetLayerDefn()->GetGeomFieldCount() && - !(iGeomField == 0 && poGeomIn == nullptr))) - { - CPLError(CE_Failure, CPLE_AppDefined, - "Invalid geometry field index : %d", iGeomField); + if (!ValidateGeometryFieldIndexForSetSpatialFilter(iGeomField, poGeomIn)) return; - } // When changing filters, we need to invalidate cached batches, as // PostFilterArrowArray() has potentially modified array contents @@ -5518,6 +5582,10 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, } } + const bool bNeedsPostFilter = + (m_poAttrQuery && !m_bBaseArrowIgnoreAttributeFilter) || + (m_poFilterGeom && !m_bBaseArrowIgnoreSpatialFilter); + struct ArrowSchema schema; memset(&schema, 0, sizeof(schema)); auto status = arrow::ExportRecordBatch(*m_poBatch, out_array, &schema); @@ -5635,7 +5703,7 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream, for (int64_t i = 0; i < m_nIdxInBatch; ++i) IncrFeatureIdx(); - if (m_poAttrQuery || m_poFilterGeom) + if (bNeedsPostFilter) { CPLStringList aosOptions; if (m_iFIDArrowColumn < 0) diff --git a/ogr/ogrsf_frmts/generic/ogrlayer.cpp b/ogr/ogrsf_frmts/generic/ogrlayer.cpp index eb814d220e02..fdf592d0a931 100644 --- a/ogr/ogrsf_frmts/generic/ogrlayer.cpp +++ b/ogr/ogrsf_frmts/generic/ogrlayer.cpp @@ -1680,42 +1680,7 @@ int OGRLayer::InstallFilter(OGRGeometry *poFilter) m_pPreparedFilterGeom = OGRCreatePreparedGeometry(OGRGeometry::ToHandle(m_poFilterGeom)); - /* -------------------------------------------------------------------- */ - /* Now try to determine if the filter is really a rectangle. */ - /* -------------------------------------------------------------------- */ - if (wkbFlatten(m_poFilterGeom->getGeometryType()) != wkbPolygon) - return TRUE; - - OGRPolygon *poPoly = m_poFilterGeom->toPolygon(); - - if (poPoly->getNumInteriorRings() != 0) - return TRUE; - - OGRLinearRing *poRing = poPoly->getExteriorRing(); - if (poRing == nullptr) - return TRUE; - - if (poRing->getNumPoints() > 5 || poRing->getNumPoints() < 4) - return TRUE; - - // If the ring has 5 points, the last should be the first. - if (poRing->getNumPoints() == 5 && (poRing->getX(0) != poRing->getX(4) || - poRing->getY(0) != poRing->getY(4))) - return TRUE; - - // Polygon with first segment in "y" direction. - if (poRing->getX(0) == poRing->getX(1) && - poRing->getY(1) == poRing->getY(2) && - poRing->getX(2) == poRing->getX(3) && - poRing->getY(3) == poRing->getY(0)) - m_bFilterIsEnvelope = TRUE; - - // Polygon with first segment in "x" direction. - if (poRing->getY(0) == poRing->getY(1) && - poRing->getX(1) == poRing->getX(2) && - poRing->getY(2) == poRing->getY(3) && - poRing->getX(3) == poRing->getX(0)) - m_bFilterIsEnvelope = TRUE; + m_bFilterIsEnvelope = m_poFilterGeom->IsRectangle(); return TRUE; } @@ -1879,22 +1844,38 @@ bool OGRLayer::FilterWKBGeometry(const GByte *pabyWKB, size_t nWKBSize, bool bEnvelopeAlreadySet, OGREnvelope &sEnvelope) const { - if (!m_poFilterGeom) + OGRPreparedGeometry *pPreparedFilterGeom = m_pPreparedFilterGeom; + bool bRet = FilterWKBGeometry( + pabyWKB, nWKBSize, bEnvelopeAlreadySet, sEnvelope, m_poFilterGeom, + m_bFilterIsEnvelope, m_sFilterEnvelope, pPreparedFilterGeom); + const_cast(this)->m_pPreparedFilterGeom = pPreparedFilterGeom; + return bRet; +} + +/* static */ +bool OGRLayer::FilterWKBGeometry(const GByte *pabyWKB, size_t nWKBSize, + bool bEnvelopeAlreadySet, + OGREnvelope &sEnvelope, + const OGRGeometry *poFilterGeom, + bool bFilterIsEnvelope, + const OGREnvelope &sFilterEnvelope, + OGRPreparedGeometry *&pPreparedFilterGeom) +{ + if (!poFilterGeom) return true; if ((bEnvelopeAlreadySet || OGRWKBGetBoundingBox(pabyWKB, nWKBSize, sEnvelope)) && - m_sFilterEnvelope.Intersects(sEnvelope)) + sFilterEnvelope.Intersects(sEnvelope)) { - if (m_bFilterIsEnvelope && m_sFilterEnvelope.Contains(sEnvelope)) + if (bFilterIsEnvelope && sFilterEnvelope.Contains(sEnvelope)) { return true; } else { - if (m_bFilterIsEnvelope && - OGRWKBIntersectsPessimistic(pabyWKB, nWKBSize, - m_sFilterEnvelope)) + if (bFilterIsEnvelope && + OGRWKBIntersectsPessimistic(pabyWKB, nWKBSize, sFilterEnvelope)) { return true; } @@ -1905,12 +1886,19 @@ bool OGRLayer::FilterWKBGeometry(const GByte *pabyWKB, size_t nWKBSize, if (OGRGeometryFactory::createFromWkb(pabyWKB, nullptr, &poGeom, nWKBSize) == OGRERR_NONE) { - if (m_pPreparedFilterGeom) + if (!pPreparedFilterGeom) + { + pPreparedFilterGeom = + OGRCreatePreparedGeometry(OGRGeometry::ToHandle( + const_cast(poFilterGeom))); + } + if (pPreparedFilterGeom) ret = OGRPreparedGeometryIntersects( - m_pPreparedFilterGeom, - OGRGeometry::ToHandle(poGeom)); + pPreparedFilterGeom, + OGRGeometry::ToHandle( + const_cast(poGeom))); else - ret = m_poFilterGeom->Intersects(poGeom); + ret = poFilterGeom->Intersects(poGeom); } delete poGeom; return CPL_TO_BOOL(ret); diff --git a/ogr/ogrsf_frmts/ogrsf_frmts.h b/ogr/ogrsf_frmts/ogrsf_frmts.h index 47ec16626c09..e3089568cf52 100644 --- a/ogr/ogrsf_frmts/ogrsf_frmts.h +++ b/ogr/ogrsf_frmts/ogrsf_frmts.h @@ -387,6 +387,14 @@ class CPL_DLL OGRLayer : public GDALMajorObject bool FilterWKBGeometry(const GByte *pabyWKB, size_t nWKBSize, bool bEnvelopeAlreadySet, OGREnvelope &sEnvelope) const; + + static bool FilterWKBGeometry(const GByte *pabyWKB, size_t nWKBSize, + bool bEnvelopeAlreadySet, + OGREnvelope &sEnvelope, + const OGRGeometry *poFilterGeom, + bool bFilterIsEnvelope, + const OGREnvelope &sFilterEnvelope, + OGRPreparedGeometry *&poPreparedFilterGeom); //! @endcond /** Field name used by GetArrowSchema() for a FID column when diff --git a/ogr/ogrsf_frmts/parquet/ogr_include_parquet.h b/ogr/ogrsf_frmts/parquet/ogr_include_parquet.h index d2d08e4f30f7..0a56ea3f1447 100644 --- a/ogr/ogrsf_frmts/parquet/ogr_include_parquet.h +++ b/ogr/ogrsf_frmts/parquet/ogr_include_parquet.h @@ -61,6 +61,9 @@ #ifdef GDAL_USE_ARROWDATASET #include "arrow/filesystem/filesystem.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/registry.h" #include "arrow/dataset/dataset.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_parquet.h" diff --git a/ogr/ogrsf_frmts/parquet/ogr_parquet.h b/ogr/ogrsf_frmts/parquet/ogr_parquet.h index 8b6c84c08c65..9323f54bac2b 100644 --- a/ogr/ogrsf_frmts/parquet/ogr_parquet.h +++ b/ogr/ogrsf_frmts/parquet/ogr_parquet.h @@ -31,6 +31,8 @@ #include "ogrsf_frmts.h" +#include "cpl_json.h" + #include #include @@ -57,12 +59,23 @@ class OGRParquetLayerBase CPL_NON_FINAL : public OGRArrowLayer CPLStringList m_aosGeomPossibleNames{}; std::string m_osCRS{}; + static int GetNumCPUs(); + void LoadGeoMetadata( const std::shared_ptr &kv_metadata); bool DealWithGeometryColumn( int iFieldIdx, const std::shared_ptr &field, std::function computeGeometryTypeFun); + void InvalidateCachedBatches() override; + + static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef, + std::string &osBBOXColumn, + std::string &osXMin, + std::string &osYMin, + std::string &osXMax, + std::string &osYMax); + public: int TestCapability(const char *) override; @@ -97,11 +110,6 @@ class OGRParquetLayer final : public OGRParquetLayerBase int64_t m_nFeatureIdxSelected = 0; std::vector m_anRequestedParquetColumns{}; // only valid when // m_bIgnoredFields is set -#ifdef DEBUG - int m_nExpectedBatchColumns = - 0; // Should be equal to m_poBatch->num_columns() (when - // m_bIgnoredFields is set) -#endif CPLStringList m_aosFeatherMetadata{}; //! Describe the bbox column of a geometry column @@ -224,9 +232,24 @@ class OGRParquetLayer final : public OGRParquetLayerBase class OGRParquetDatasetLayer final : public OGRParquetLayerBase { + bool m_bIsVSI = false; + bool m_bRebuildScanner = true; + bool m_bSkipFilterGeometry = false; + std::shared_ptr m_poDataset{}; std::shared_ptr m_poScanner{}; + std::vector m_aosProjectedFields{}; void EstablishFeatureDefn(); + void + ProcessGeometryColumnCovering(const std::shared_ptr &field, + const CPLJSONObject &oJSONGeometryColumn); + + void BuildScanner(); + + //! Translate a OGR SQL expression into an Arrow one + // bFullyTranslated should be set to true before calling this method. + arrow::compute::Expression BuildArrowFilter(const swq_expr_node *poNode, + bool &bFullyTranslated); protected: std::string GetDriverUCName() const override @@ -236,22 +259,34 @@ class OGRParquetDatasetLayer final : public OGRParquetLayerBase bool ReadNextBatch() override; - void InvalidateCachedBatches() override; - bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override; public: OGRParquetDatasetLayer( - OGRParquetDataset *poDS, const char *pszLayerName, - const std::shared_ptr &scanner, - const std::shared_ptr &schema, + OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI, + const std::shared_ptr &dataset, CSLConstList papszOpenOptions); + OGRFeature *GetNextFeature() override; + GIntBig GetFeatureCount(int bForce) override; OGRErr GetExtent(OGREnvelope *psExtent, int bForce = TRUE) override; OGRErr GetExtent(int iGeomField, OGREnvelope *psExtent, int bForce = TRUE) override; + void SetSpatialFilter(OGRGeometry *poGeom) override + { + SetSpatialFilter(0, poGeom); + } + + void SetSpatialFilter(int iGeomField, OGRGeometry *poGeom) override; + + OGRErr SetAttributeFilter(const char *pszFilter) override; + + OGRErr SetIgnoredFields(CSLConstList papszFields) override; + + int TestCapability(const char *) override; + // TODO std::unique_ptr BuildDomain(const std::string & /*osDomainName*/, diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetdatasetlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetdatasetlayer.cpp index da81f47b0f44..d6c55f63cced 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetdatasetlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetdatasetlayer.cpp @@ -5,7 +5,7 @@ * Author: Even Rouault, * ****************************************************************************** - * Copyright (c) 2022, Planet Labs + * Copyright (c) 2022-2024, Planet Labs * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -29,32 +29,105 @@ #include "ogrsf_frmts.h" #include +#include #include #include #include +#include "cpl_time.h" +#include "ogr_api.h" + #include "ogr_parquet.h" #include "../arrow_common/ograrrowlayer.hpp" #include "../arrow_common/ograrrowdataset.hpp" +#if PARQUET_VERSION_MAJOR >= 13 +// Using field indices for FieldRef is only supported since +// https://github.com/apache/arrow/commit/10eedbe63c71f4cf8f0621f3a2304ab3168a2ae5 +#define SUPPORTS_INDICES_IN_FIELD_REF +#endif + +namespace cp = ::arrow::compute; + /************************************************************************/ /* OGRParquetLayer() */ /************************************************************************/ OGRParquetDatasetLayer::OGRParquetDatasetLayer( - OGRParquetDataset *poDS, const char *pszLayerName, - const std::shared_ptr &scanner, - const std::shared_ptr &schema, CSLConstList papszOpenOptions) + OGRParquetDataset *poDS, const char *pszLayerName, bool bIsVSI, + const std::shared_ptr &dataset, + CSLConstList papszOpenOptions) : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions), - m_poScanner(scanner) + m_bIsVSI(bIsVSI), m_poDataset(dataset) { - m_poSchema = schema; + m_poSchema = m_poDataset->schema(); EstablishFeatureDefn(); CPLAssert(static_cast(m_aeGeomEncoding.size()) == m_poFeatureDefn->GetGeomFieldCount()); } +/************************************************************************/ +/* ProcessGeometryColumnCovering() */ +/************************************************************************/ + +/** Process GeoParquet JSON geometry field object to extract information about + * its bounding box column, and appropriately fill m_oMapGeomFieldIndexToGeomColBBOX + * member with information on that bounding box column. + */ +void OGRParquetDatasetLayer::ProcessGeometryColumnCovering( + const std::shared_ptr &field, + const CPLJSONObject &oJSONGeometryColumn) +{ + std::string osBBOXColumn; + std::string osXMin, osYMin, osXMax, osYMax; + if (ParseGeometryColumnCovering(oJSONGeometryColumn, osBBOXColumn, osXMin, + osYMin, osXMax, osYMax)) + { + OGRArrowLayer::GeomColBBOX sDesc; + sDesc.iArrowCol = m_poSchema->GetFieldIndex(osBBOXColumn); + const auto fieldBBOX = m_poSchema->GetFieldByName(osBBOXColumn); + if (sDesc.iArrowCol >= 0 && fieldBBOX && + fieldBBOX->type()->id() == arrow::Type::STRUCT) + { + const auto fieldBBOXStruct = + std::static_pointer_cast(fieldBBOX->type()); + const auto fieldXMin = fieldBBOXStruct->GetFieldByName(osXMin); + const auto fieldYMin = fieldBBOXStruct->GetFieldByName(osYMin); + const auto fieldXMax = fieldBBOXStruct->GetFieldByName(osXMax); + const auto fieldYMax = fieldBBOXStruct->GetFieldByName(osYMax); + const int nXMinIdx = fieldBBOXStruct->GetFieldIndex(osXMin); + const int nYMinIdx = fieldBBOXStruct->GetFieldIndex(osYMin); + const int nXMaxIdx = fieldBBOXStruct->GetFieldIndex(osXMax); + const int nYMaxIdx = fieldBBOXStruct->GetFieldIndex(osYMax); + if (nXMinIdx >= 0 && nYMinIdx >= 0 && nXMaxIdx >= 0 && + nYMaxIdx >= 0 && fieldXMin && fieldYMin && fieldXMax && + fieldYMax && + (fieldXMin->type()->id() == arrow::Type::FLOAT || + fieldXMin->type()->id() == arrow::Type::DOUBLE) && + fieldXMin->type()->id() == fieldYMin->type()->id() && + fieldXMin->type()->id() == fieldXMax->type()->id() && + fieldXMin->type()->id() == fieldYMax->type()->id()) + { + CPLDebug("PARQUET", + "Bounding box column '%s' detected for " + "geometry column '%s'", + osBBOXColumn.c_str(), field->name().c_str()); + sDesc.iArrowSubfieldXMin = nXMinIdx; + sDesc.iArrowSubfieldYMin = nYMinIdx; + sDesc.iArrowSubfieldXMax = nXMaxIdx; + sDesc.iArrowSubfieldYMax = nYMaxIdx; + sDesc.bIsFloat = + (fieldXMin->type()->id() == arrow::Type::FLOAT); + + m_oMapGeomFieldIndexToGeomColBBOX + [m_poFeatureDefn->GetGeomFieldCount() - 1] = + std::move(sDesc); + } + } + } +} + /************************************************************************/ /* EstablishFeatureDefn() */ /************************************************************************/ @@ -69,6 +142,26 @@ void OGRParquetDatasetLayer::EstablishFeatureDefn() LoadGDALMetadata(kv_metadata.get()); + const bool bUseBBOX = + CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES")); + + // Keep track of declared bounding box columns in GeoParquet JSON metadata, + // in order not to expose them as regular fields. + std::set oSetBBOXColumns; + if (bUseBBOX) + { + for (const auto &iter : m_oMapGeometryColumns) + { + std::string osBBOXColumn; + std::string osXMin, osYMin, osXMax, osYMax; + if (ParseGeometryColumnCovering(iter.second, osBBOXColumn, osXMin, + osYMin, osXMax, osYMax)) + { + oSetBBOXColumns.insert(osBBOXColumn); + } + } + } + const auto &fields = m_poSchema->fields(); for (int i = 0; i < m_poSchema->num_fields(); ++i) { @@ -80,9 +173,23 @@ void OGRParquetDatasetLayer::EstablishFeatureDefn() continue; } + if (oSetBBOXColumns.find(field->name()) != oSetBBOXColumns.end()) + { + m_oSetBBoxArrowColumns.insert(i); + continue; + } + const bool bGeometryField = DealWithGeometryColumn(i, field, []() { return wkbUnknown; }); - if (!bGeometryField) + if (bGeometryField) + { + const auto oIter = m_oMapGeometryColumns.find(field->name()); + if (bUseBBOX && oIter != m_oMapGeometryColumns.end()) + { + ProcessGeometryColumnCovering(field, oIter->second); + } + } + else { CreateFieldFromSchema(field, {i}, oMapFieldNameToGDALSchemaFieldDefn); @@ -95,16 +202,713 @@ void OGRParquetDatasetLayer::EstablishFeatureDefn() m_poFeatureDefn->GetGeomFieldCount()); } +namespace +{ + +/************************************************************************/ +/* WKBGeometryOptionsType */ +/************************************************************************/ + +class WKBGeometryOptions; + +class WKBGeometryOptionsType : public cp::FunctionOptionsType +{ + WKBGeometryOptionsType() = default; + + static const WKBGeometryOptions &Cast(const cp::FunctionOptions &opts); + + public: + const char *type_name() const override + { + return "WKBGeometryOptionsType"; + } + + std::string Stringify(const cp::FunctionOptions &) const override; + bool Compare(const cp::FunctionOptions &, + const cp::FunctionOptions &) const override; + std::unique_ptr + Copy(const cp::FunctionOptions &) const override; + + static WKBGeometryOptionsType *GetSingleton() + { + static WKBGeometryOptionsType instance; + return &instance; + } +}; + +/************************************************************************/ +/* WKBGeometryOptions */ +/************************************************************************/ + +class WKBGeometryOptions : public cp::FunctionOptions +{ + + public: + explicit WKBGeometryOptions( + const std::vector &abyFilterGeomWkbIn = std::vector()) + : cp::FunctionOptions(WKBGeometryOptionsType::GetSingleton()), + abyFilterGeomWkb(abyFilterGeomWkbIn) + { + } + + bool operator==(const WKBGeometryOptions &other) const + { + return abyFilterGeomWkb == other.abyFilterGeomWkb; + } + + std::vector abyFilterGeomWkb; +}; + +const WKBGeometryOptions & +WKBGeometryOptionsType::Cast(const cp::FunctionOptions &opts) +{ + return *cpl::down_cast(&opts); +} + +bool WKBGeometryOptionsType::Compare(const cp::FunctionOptions &optsA, + const cp::FunctionOptions &optsB) const +{ + return Cast(optsA) == Cast(optsB); +} + +std::string +WKBGeometryOptionsType::Stringify(const cp::FunctionOptions &opts) const +{ + const auto &bboxOptions = Cast(opts); + std::string osRet(type_name()); + osRet += '-'; + for (GByte byVal : bboxOptions.abyFilterGeomWkb) + osRet += CPLSPrintf("%02X", byVal); + return osRet; +} + +std::unique_ptr +WKBGeometryOptionsType::Copy(const cp::FunctionOptions &opts) const +{ + return std::make_unique(Cast(opts)); +} + +/************************************************************************/ +/* OptionsWrapper */ +/************************************************************************/ + +/// KernelState adapter for the common case of kernels whose only +/// state is an instance of a subclass of FunctionOptions. +template struct OptionsWrapper : public cp::KernelState +{ + explicit OptionsWrapper(OptionsType optionsIn) + : options(std::move(optionsIn)) + { + } + + static arrow::Result> + Init(cp::KernelContext *, const cp::KernelInitArgs &args) + { + auto options = cpl::down_cast(args.options); + CPLAssert(options); + return std::make_unique(*options); + } + + static const OptionsType &Get(cp::KernelContext *ctx) + { + return cpl::down_cast(ctx->state())->options; + } + + OptionsType options; +}; +} // namespace + +/************************************************************************/ +/* ExecOGRWKBIntersects() */ +/************************************************************************/ + +static arrow::Status ExecOGRWKBIntersects(cp::KernelContext *ctx, + const cp::ExecSpan &batch, + cp::ExecResult *out) +{ + // Get filter geometry + const auto &opts = OptionsWrapper::Get(ctx); + OGRGeometry *poGeomTmp = nullptr; + OGRErr eErr = OGRGeometryFactory::createFromWkb( + opts.abyFilterGeomWkb.data(), nullptr, &poGeomTmp, + opts.abyFilterGeomWkb.size()); + CPL_IGNORE_RET_VAL(eErr); + CPLAssert(eErr == OGRERR_NONE); + CPLAssert(poGeomTmp != nullptr); + std::unique_ptr poFilterGeom(poGeomTmp); + OGREnvelope sFilterEnvelope; + poFilterGeom->getEnvelope(&sFilterEnvelope); + const bool bFilterIsEnvelope = poFilterGeom->IsRectangle(); + + // Deal with input array + CPLAssert(batch.num_values() == 1); + const arrow::ArraySpan &input = batch[0].array; + CPLAssert(input.type->id() == arrow::Type::BINARY); + // Packed array of bits + const auto pabyInputValidity = input.buffers[0].data; + const auto nInputOffsets = input.offset; + const auto panWkbOffsets = input.GetValues(1); + const auto pabyWkbArray = input.buffers[2].data; + + // Deal with output array + CPLAssert(out->type()->id() == arrow::Type::BOOL); + auto out_span = out->array_span(); + // Below array holds 8 bits per uint8_t + uint8_t *pabitsOutValues = out_span->buffers[1].data; + const auto nOutOffset = out_span->offset; + + // Iterate over WKB geometries + OGRPreparedGeometry *pPreparedFilterGeom = nullptr; + OGREnvelope sEnvelope; + for (int64_t i = 0; i < batch.length; ++i) + { + const bool bInputIsNull = + (pabyInputValidity && + arrow::bit_util::GetBit(pabyInputValidity, i + nInputOffsets) == + 0); + bool bOutputVal = false; + if (!bInputIsNull) + { + const GByte *pabyWkb = pabyWkbArray + panWkbOffsets[i]; + const size_t nWkbSize = panWkbOffsets[i + 1] - panWkbOffsets[i]; + bOutputVal = OGRLayer::FilterWKBGeometry( + pabyWkb, nWkbSize, + /* bEnvelopeAlreadySet = */ false, sEnvelope, + poFilterGeom.get(), bFilterIsEnvelope, sFilterEnvelope, + pPreparedFilterGeom); + } + if (bOutputVal) + arrow::bit_util::SetBit(pabitsOutValues, i + nOutOffset); + else + arrow::bit_util::ClearBit(pabitsOutValues, i + nOutOffset); + } + + // Cleanup + if (pPreparedFilterGeom) + OGRDestroyPreparedGeometry(pPreparedFilterGeom); + + return arrow::Status::OK(); +} + +/************************************************************************/ +/* RegisterOGRWKBIntersectsIfNeeded() */ +/************************************************************************/ + +static bool RegisterOGRWKBIntersectsIfNeeded() +{ + auto registry = cp::GetFunctionRegistry(); + bool bRet = + registry->GetFunction("OGRWKBIntersects").ValueOr(nullptr) != nullptr; + if (!bRet) + { + static const WKBGeometryOptions defaultOpts; + + // Below assert is completely useless but helps improve test coverage + CPLAssert(WKBGeometryOptionsType::GetSingleton()->Compare( + defaultOpts, *(WKBGeometryOptionsType::GetSingleton() + ->Copy(defaultOpts) + .get()))); + + auto func = std::make_shared( + "OGRWKBIntersects", cp::Arity::Unary(), cp::FunctionDoc(), + &defaultOpts); + cp::ScalarKernel kernel({arrow::binary()}, arrow::boolean(), + ExecOGRWKBIntersects, + OptionsWrapper::Init); + kernel.null_handling = cp::NullHandling::OUTPUT_NOT_NULL; + bRet = func->AddKernel(std::move(kernel)).ok() && + registry->AddFunction(std::move(func)).ok(); + } + return bRet; +} + +/************************************************************************/ +/* BuildScanner() */ +/************************************************************************/ + +void OGRParquetDatasetLayer::BuildScanner() +{ + m_bRebuildScanner = false; + m_bSkipFilterGeometry = false; + m_bBaseArrowIgnoreSpatialFilterRect = false; + m_bBaseArrowIgnoreSpatialFilter = false; + m_bBaseArrowIgnoreAttributeFilter = false; + + try + { + std::shared_ptr scannerBuilder; + PARQUET_ASSIGN_OR_THROW(scannerBuilder, m_poDataset->NewScan()); + assert(scannerBuilder); + + // We cannot use the shared memory pool. Otherwise we get random + // crashes in multi-threaded arrow code (apparently some cleanup code), + // that may used the memory pool after it has been destroyed. + // At least this was true with some older libarrow version + // PARQUET_THROW_NOT_OK(scannerBuilder->Pool(m_poMemoryPool)); + + if (m_bIsVSI) + { + const int nFragmentReadAhead = atoi( + CPLGetConfigOption("OGR_PARQUET_FRAGMENT_READ_AHEAD", "2")); + PARQUET_THROW_NOT_OK( + scannerBuilder->FragmentReadahead(nFragmentReadAhead)); + } + + const char *pszBatchSize = + CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr); + if (pszBatchSize) + { + PARQUET_THROW_NOT_OK( + scannerBuilder->BatchSize(CPLAtoGIntBig(pszBatchSize))); + } + + const int nNumCPUs = GetNumCPUs(); + const char *pszUseThreads = + CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr); + if (!pszUseThreads && nNumCPUs > 1) + { + pszUseThreads = "YES"; + } + if (CPLTestBool(pszUseThreads)) + { + PARQUET_THROW_NOT_OK(scannerBuilder->UseThreads(true)); + } + +#if PARQUET_VERSION_MAJOR >= 10 + const char *pszBatchReadAhead = + CPLGetConfigOption("OGR_PARQUET_BATCH_READ_AHEAD", nullptr); + if (pszBatchReadAhead) + { + PARQUET_THROW_NOT_OK( + scannerBuilder->BatchReadahead(atoi(pszBatchReadAhead))); + } +#endif + + cp::Expression expression; + if (m_poFilterGeom && !m_poFilterGeom->IsEmpty() && + CPLTestBool(CPLGetConfigOption( + "OGR_PARQUET_OPTIMIZED_SPATIAL_FILTER", "YES"))) + { + const auto oIter = + m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter); + if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end()) + { + // This actually requires Arrow >= 15 (https://github.com/apache/arrow/issues/39064) + // to be more efficient. +#ifdef SUPPORTS_INDICES_IN_FIELD_REF + const auto &oBBOXDef = oIter->second; + expression = cp::and_( + {cp::less_equal( + cp::field_ref(arrow::FieldRef( + oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldXMin)), + cp::literal(m_sFilterEnvelope.MaxX)), + cp::less_equal( + cp::field_ref(arrow::FieldRef( + oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldYMin)), + cp::literal(m_sFilterEnvelope.MaxY)), + cp::greater_equal( + cp::field_ref(arrow::FieldRef( + oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldXMax)), + cp::literal(m_sFilterEnvelope.MinX)), + cp::greater_equal( + cp::field_ref(arrow::FieldRef( + oBBOXDef.iArrowCol, oBBOXDef.iArrowSubfieldYMax)), + cp::literal(m_sFilterEnvelope.MinY))}); +#else + const auto oIter2 = m_oMapGeometryColumns.find( + m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter) + ->GetNameRef()); + std::string osBBOXColumn; + std::string osXMin, osYMin, osXMax, osYMax; + if (ParseGeometryColumnCovering(oIter2->second, osBBOXColumn, + osXMin, osYMin, osXMax, osYMax)) + { + expression = cp::and_( + {cp::less_equal(cp::field_ref(arrow::FieldRef( + osBBOXColumn, osXMin)), + cp::literal(m_sFilterEnvelope.MaxX)), + cp::less_equal(cp::field_ref(arrow::FieldRef( + osBBOXColumn, osYMin)), + cp::literal(m_sFilterEnvelope.MaxY)), + cp::greater_equal(cp::field_ref(arrow::FieldRef( + osBBOXColumn, osXMax)), + cp::literal(m_sFilterEnvelope.MinX)), + cp::greater_equal( + cp::field_ref( + arrow::FieldRef(osBBOXColumn, osYMax)), + cp::literal(m_sFilterEnvelope.MinY))}); + } +#endif + } + else if (m_iGeomFieldFilter >= 0 && + m_iGeomFieldFilter < + static_cast(m_aeGeomEncoding.size()) && + m_aeGeomEncoding[m_iGeomFieldFilter] == + OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT) + { + const int iCol = + m_anMapGeomFieldIndexToArrowColumn[m_iGeomFieldFilter]; + const auto &field = m_poSchema->fields()[iCol]; + auto type = field->type(); + std::vector fieldRefs; +#ifdef SUPPORTS_INDICES_IN_FIELD_REF + fieldRefs.emplace_back(iCol); +#else + fieldRefs.emplace_back(field->name()); +#endif + if (type->id() == arrow::Type::STRUCT) + { + const auto fieldStruct = + std::static_pointer_cast(type); + const auto fieldX = fieldStruct->GetFieldByName("x"); + const auto fieldY = fieldStruct->GetFieldByName("y"); + if (fieldX && fieldY) + { + auto fieldRefX(fieldRefs); + fieldRefX.emplace_back("x"); + auto fieldRefY(fieldRefs); + fieldRefY.emplace_back("y"); + expression = cp::and_( + {cp::less_equal( + cp::field_ref(arrow::FieldRef(fieldRefX)), + cp::literal(m_sFilterEnvelope.MaxX)), + cp::less_equal( + cp::field_ref(arrow::FieldRef(fieldRefY)), + cp::literal(m_sFilterEnvelope.MaxY)), + cp::greater_equal( + cp::field_ref(arrow::FieldRef(fieldRefX)), + cp::literal(m_sFilterEnvelope.MinX)), + cp::greater_equal( + cp::field_ref(arrow::FieldRef(fieldRefY)), + cp::literal(m_sFilterEnvelope.MinY))}); + } + } + } + else if (m_iGeomFieldFilter >= 0 && + m_iGeomFieldFilter < + static_cast(m_aeGeomEncoding.size()) && + m_aeGeomEncoding[m_iGeomFieldFilter] == + OGRArrowGeomEncoding::WKB) + { + const int iCol = + m_anMapGeomFieldIndexToArrowColumn[m_iGeomFieldFilter]; + const auto &field = m_poSchema->fields()[iCol]; + if (field->type()->id() == arrow::Type::BINARY && + RegisterOGRWKBIntersectsIfNeeded()) + { +#ifdef SUPPORTS_INDICES_IN_FIELD_REF + auto oFieldRef = arrow::FieldRef(iCol); +#else + auto oFieldRef = arrow::FieldRef(field->name()); +#endif + std::vector abyFilterGeomWkb; + abyFilterGeomWkb.resize(m_poFilterGeom->WkbSize()); + m_poFilterGeom->exportToWkb(wkbNDR, abyFilterGeomWkb.data(), + wkbVariantIso); + expression = + cp::call("OGRWKBIntersects", {cp::field_ref(oFieldRef)}, + WKBGeometryOptions(abyFilterGeomWkb)); + + if (expression.is_valid()) + { + m_bBaseArrowIgnoreSpatialFilterRect = true; + m_bBaseArrowIgnoreSpatialFilter = true; + m_bSkipFilterGeometry = true; + } + } + } + + if (expression.is_valid() && !m_bSkipFilterGeometry) + { + m_bBaseArrowIgnoreSpatialFilterRect = true; + + const bool bIsPoint = + wkbFlatten( + m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter) + ->GetType()) == wkbPoint; + m_bBaseArrowIgnoreSpatialFilter = + m_bFilterIsEnvelope && bIsPoint; + + m_bSkipFilterGeometry = + m_bFilterIsEnvelope && + (bIsPoint || + m_poFeatureDefn->GetGeomFieldDefn(m_iGeomFieldFilter) + ->IsIgnored()); + } + } + + if (m_poAttrQuery && + CPLTestBool(CPLGetConfigOption( + "OGR_PARQUET_OPTIMIZED_ATTRIBUTE_FILTER", "YES"))) + { + const swq_expr_node *poNode = + static_cast(m_poAttrQuery->GetSWQExpr()); + bool bFullyTranslated = true; + auto expressionFilter = BuildArrowFilter(poNode, bFullyTranslated); + if (expressionFilter.is_valid()) + { + if (bFullyTranslated) + { + CPLDebugOnly("PARQUET", + "Attribute filter fully translated to Arrow"); + m_asAttributeFilterConstraints.clear(); + m_bBaseArrowIgnoreAttributeFilter = true; + } + + if (expression.is_valid()) + expression = cp::and_(expression, expressionFilter); + else + expression = std::move(expressionFilter); + } + } + + if (expression.is_valid()) + { + PARQUET_THROW_NOT_OK(scannerBuilder->Filter(expression)); + } + + if (m_bIgnoredFields) + { +#ifdef DEBUG + std::string osFields; + for (const std::string &osField : m_aosProjectedFields) + { + if (!osFields.empty()) + osFields += ','; + osFields += osField; + } + CPLDebug("PARQUET", "Projected fields: %s", osFields.c_str()); +#endif + PARQUET_THROW_NOT_OK(scannerBuilder->Project(m_aosProjectedFields)); + } + + PARQUET_ASSIGN_OR_THROW(m_poScanner, scannerBuilder->Finish()); + } + catch (const std::exception &e) + { + CPLError(CE_Failure, CPLE_AppDefined, "Arrow/Parquet exception: %s", + e.what()); + } +} + +/************************************************************************/ +/* BuildArrowFilter() */ +/************************************************************************/ + +cp::Expression +OGRParquetDatasetLayer::BuildArrowFilter(const swq_expr_node *poNode, + bool &bFullyTranslated) +{ + if (poNode->eNodeType == SNT_OPERATION && poNode->nOperation == SWQ_AND && + poNode->nSubExprCount == 2) + { + auto sLeft = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + auto sRight = + BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated); + if (sLeft.is_valid() && sRight.is_valid()) + return cp::and_(sLeft, sRight); + if (sLeft.is_valid()) + return sLeft; + if (sRight.is_valid()) + return sRight; + } + + else if (poNode->eNodeType == SNT_OPERATION && + poNode->nOperation == SWQ_OR && poNode->nSubExprCount == 2) + { + auto sLeft = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + auto sRight = + BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated); + if (sLeft.is_valid() && sRight.is_valid()) + return cp::or_(sLeft, sRight); + } + + else if (poNode->eNodeType == SNT_OPERATION && + poNode->nOperation == SWQ_NOT && poNode->nSubExprCount == 1) + { + auto expr = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + if (expr.is_valid()) + return cp::not_(expr); + } + + else if (poNode->eNodeType == SNT_COLUMN) + { + if (poNode->field_index >= 0 && + poNode->field_index < m_poFeatureDefn->GetFieldCount()) + { + std::vector fieldRefs; +#ifdef SUPPORTS_INDICES_IN_FIELD_REF + for (int idx : m_anMapFieldIndexToArrowColumn[poNode->field_index]) + fieldRefs.emplace_back(idx); +#else + std::shared_ptr field; + for (int idx : m_anMapFieldIndexToArrowColumn[poNode->field_index]) + { + if (!field) + { + field = m_poSchema->fields()[idx]; + } + else + { + CPLAssert(field->type()->id() == arrow::Type::STRUCT); + const auto fieldStruct = + std::static_pointer_cast( + field->type()); + field = fieldStruct->fields()[idx]; + } + fieldRefs.emplace_back(field->name()); + } +#endif + auto expr = cp::field_ref(arrow::FieldRef(std::move(fieldRefs))); + + // Comparing a boolean column to 0 or 1 fails without explicit cast + if (m_poFeatureDefn->GetFieldDefn(poNode->field_index) + ->GetSubType() == OFSTBoolean) + { + expr = cp::call("cast", {expr}, + cp::CastOptions::Safe(arrow::uint8())); + } + return expr; + } + else if (poNode->field_index == + m_poFeatureDefn->GetFieldCount() + SPF_FID && + m_iFIDArrowColumn >= 0) + { +#ifdef SUPPORTS_INDICES_IN_FIELD_REF + return cp::field_ref(arrow::FieldRef(m_iFIDArrowColumn)); +#else + return cp::field_ref(arrow::FieldRef( + m_poSchema->fields()[m_iFIDArrowColumn]->name())); +#endif + } + } + + else if (poNode->eNodeType == SNT_CONSTANT) + { + switch (poNode->field_type) + { + case SWQ_INTEGER: + case SWQ_INTEGER64: + return cp::literal(static_cast(poNode->int_value)); + + case SWQ_FLOAT: + return cp::literal(poNode->float_value); + + case SWQ_STRING: + return cp::literal(poNode->string_value); + + case SWQ_TIMESTAMP: + { + OGRField sField; + if (OGRParseDate(poNode->string_value, &sField, 0)) + { + struct tm brokenDown; + brokenDown.tm_year = sField.Date.Year - 1900; + brokenDown.tm_mon = sField.Date.Month - 1; + brokenDown.tm_mday = sField.Date.Day; + brokenDown.tm_hour = sField.Date.Hour; + brokenDown.tm_min = sField.Date.Minute; + brokenDown.tm_sec = static_cast(sField.Date.Second); + int64_t nVal = + CPLYMDHMSToUnixTime(&brokenDown) * 1000 + + (static_cast(sField.Date.Second * 1000 + 0.5) % + 1000); + if (sField.Date.TZFlag > OGR_TZFLAG_MIXED_TZ) + { + // Convert for sField.Date.TZFlag to UTC + const int TZOffset = + (sField.Date.TZFlag - OGR_TZFLAG_UTC) * 15; + const int TZOffsetMS = TZOffset * 60 * 1000; + nVal -= TZOffsetMS; + return cp::literal(arrow::TimestampScalar( + nVal, arrow::TimeUnit::MILLI, "UTC")); + } + else + { + return cp::literal(arrow::TimestampScalar( + nVal, arrow::TimeUnit::MILLI)); + } + } + } + + default: + break; + } + } + + else if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2 && + IsComparisonOp(poNode->nOperation)) + { + auto sLeft = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + auto sRight = + BuildArrowFilter(poNode->papoSubExpr[1], bFullyTranslated); + if (sLeft.is_valid() && sRight.is_valid()) + { + if (poNode->nOperation == SWQ_EQ) + return cp::equal(sLeft, sRight); + if (poNode->nOperation == SWQ_LT) + return cp::less(sLeft, sRight); + if (poNode->nOperation == SWQ_LE) + return cp::less_equal(sLeft, sRight); + if (poNode->nOperation == SWQ_GT) + return cp::greater(sLeft, sRight); + if (poNode->nOperation == SWQ_GE) + return cp::greater_equal(sLeft, sRight); + if (poNode->nOperation == SWQ_NE) + return cp::not_equal(sLeft, sRight); + } + } + + else if (poNode->eNodeType == SNT_OPERATION && poNode->nSubExprCount == 2 && + (poNode->nOperation == SWQ_LIKE || + poNode->nOperation == SWQ_ILIKE) && + poNode->papoSubExpr[1]->eNodeType == SNT_CONSTANT && + poNode->papoSubExpr[1]->field_type == SWQ_STRING) + { + auto sLeft = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + if (sLeft.is_valid()) + { + if (cp::GetFunctionRegistry() + ->GetFunction("match_like") + .ValueOr(nullptr)) + { + // match_like is only available is Arrow built against RE2. + return cp::call( + "match_like", {sLeft}, + cp::MatchSubstringOptions( + poNode->papoSubExpr[1]->string_value, + /* ignore_case=*/poNode->nOperation == SWQ_ILIKE)); + } + } + } + + else if (poNode->eNodeType == SNT_OPERATION && + poNode->nOperation == SWQ_ISNULL && poNode->nSubExprCount == 1) + { + auto expr = BuildArrowFilter(poNode->papoSubExpr[0], bFullyTranslated); + if (expr.is_valid()) + return cp::is_null(expr); + } + + bFullyTranslated = false; + return {}; +} + /************************************************************************/ /* ReadNextBatch() */ /************************************************************************/ bool OGRParquetDatasetLayer::ReadNextBatch() { + if (m_bRebuildScanner) + BuildScanner(); + m_nIdxInBatch = 0; if (m_poRecordBatchReader == nullptr) { + if (!m_poScanner) + return false; auto result = m_poScanner->ToRecordBatchReader(); if (!result.ok()) { @@ -138,18 +942,35 @@ bool OGRParquetDatasetLayer::ReadNextBatch() } } while (poNextBatch->num_rows() == 0); + // CPLDebug("PARQUET", "Current batch has %d rows", int(poNextBatch->num_rows())); + SetBatch(poNextBatch); return true; } /************************************************************************/ -/* InvalidateCachedBatches() */ +/* GetNextFeature() */ /************************************************************************/ -void OGRParquetDatasetLayer::InvalidateCachedBatches() +OGRFeature *OGRParquetDatasetLayer::GetNextFeature() { - ResetReading(); + while (true) + { + OGRFeature *poFeature = GetNextRawFeature(); + if (poFeature == nullptr) + return nullptr; + + if ((m_poFilterGeom == nullptr || m_bSkipFilterGeometry || + FilterGeometry(poFeature->GetGeometryRef())) && + (m_poAttrQuery == nullptr || m_bBaseArrowIgnoreAttributeFilter || + m_poAttrQuery->Evaluate(poFeature))) + { + return poFeature; + } + else + delete poFeature; + } } /************************************************************************/ @@ -160,6 +981,10 @@ GIntBig OGRParquetDatasetLayer::GetFeatureCount(int bForce) { if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr) { + if (m_bRebuildScanner) + BuildScanner(); + if (!m_poScanner) + return -1; auto status = m_poScanner->CountRows(); if (status.ok()) return *status; @@ -222,7 +1047,7 @@ OGRErr OGRParquetDatasetLayer::GetExtent(int iGeomField, OGREnvelope *psExtent, auto oIter = m_oMapGeometryColumns.find(pszGeomFieldName); if (oIter != m_oMapGeometryColumns.end()) { - auto statusFragments = m_poScanner->dataset()->GetFragments(); + auto statusFragments = m_poDataset->GetFragments(); if (statusFragments.ok()) { *psExtent = OGREnvelope(); @@ -272,3 +1097,176 @@ OGRErr OGRParquetDatasetLayer::GetExtent(int iGeomField, OGREnvelope *psExtent, return OGRParquetLayerBase::GetExtent(iGeomField, psExtent, bForce); } + +/************************************************************************/ +/* SetSpatialFilter() */ +/************************************************************************/ + +void OGRParquetDatasetLayer::SetSpatialFilter(int iGeomField, + OGRGeometry *poGeomIn) + +{ + OGRParquetLayerBase::SetSpatialFilter(iGeomField, poGeomIn); + m_bRebuildScanner = true; + + // Full invalidation + InvalidateCachedBatches(); +} + +/************************************************************************/ +/* SetIgnoredFields() */ +/************************************************************************/ + +OGRErr OGRParquetDatasetLayer::SetIgnoredFields(CSLConstList papszFields) +{ + m_bRebuildScanner = true; + m_aosProjectedFields.clear(); + m_bIgnoredFields = false; + m_anMapFieldIndexToArrayIndex.clear(); + m_anMapGeomFieldIndexToArrayIndex.clear(); + m_nRequestedFIDColumn = -1; + OGRErr eErr = OGRParquetLayerBase::SetIgnoredFields(papszFields); + if (eErr == OGRERR_NONE) + { + m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr; + if (m_bIgnoredFields) + { + if (m_iFIDArrowColumn >= 0) + { + m_nRequestedFIDColumn = + static_cast(m_aosProjectedFields.size()); + m_aosProjectedFields.emplace_back(GetFIDColumn()); + } + + const auto &fields = m_poSchema->fields(); + for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i) + { + const auto &field = + fields[m_anMapFieldIndexToArrowColumn[i][0]]; + const auto eArrowType = field->type()->id(); + if (eArrowType == arrow::Type::STRUCT) + { + // For a struct, for the sake of simplicity in + // GetNextRawFeature(), as soon as one of the member if + // requested, request the struct field, so that the Arrow + // type doesn't change + bool bFoundNotIgnored = false; + for (int j = i; j < m_poFeatureDefn->GetFieldCount() && + m_anMapFieldIndexToArrowColumn[i][0] == + m_anMapFieldIndexToArrowColumn[j][0]; + ++j) + { + if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored()) + { + bFoundNotIgnored = true; + break; + } + } + if (bFoundNotIgnored) + { + int j; + for (j = i; j < m_poFeatureDefn->GetFieldCount() && + m_anMapFieldIndexToArrowColumn[i][0] == + m_anMapFieldIndexToArrowColumn[j][0]; + ++j) + { + if (!m_poFeatureDefn->GetFieldDefn(j)->IsIgnored()) + { + m_anMapFieldIndexToArrayIndex.push_back( + static_cast( + m_aosProjectedFields.size())); + } + else + { + m_anMapFieldIndexToArrayIndex.push_back(-1); + } + } + i = j - 1; + + m_aosProjectedFields.emplace_back(field->name()); + } + else + { + int j; + for (j = i; j < m_poFeatureDefn->GetFieldCount() && + m_anMapFieldIndexToArrowColumn[i][0] == + m_anMapFieldIndexToArrowColumn[j][0]; + ++j) + { + m_anMapFieldIndexToArrayIndex.push_back(-1); + } + i = j - 1; + } + } + else if (!m_poFeatureDefn->GetFieldDefn(i)->IsIgnored()) + { + m_anMapFieldIndexToArrayIndex.push_back( + static_cast(m_aosProjectedFields.size())); + m_aosProjectedFields.emplace_back(field->name()); + } + else + { + m_anMapFieldIndexToArrayIndex.push_back(-1); + } + } + + for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i) + { + const auto &field = + fields[m_anMapGeomFieldIndexToArrowColumn[i]]; + if (!m_poFeatureDefn->GetGeomFieldDefn(i)->IsIgnored()) + { + m_anMapGeomFieldIndexToArrayIndex.push_back( + static_cast(m_aosProjectedFields.size())); + m_aosProjectedFields.emplace_back(field->name()); + } + else + { + m_anMapGeomFieldIndexToArrayIndex.push_back(-1); + } + } + } + } + + m_nExpectedBatchColumns = + m_bIgnoredFields ? static_cast(m_aosProjectedFields.size()) : -1; + + // Full invalidation + InvalidateCachedBatches(); + + return eErr; +} + +/************************************************************************/ +/* TestCapability() */ +/************************************************************************/ + +int OGRParquetDatasetLayer::TestCapability(const char *pszCap) +{ + if (EQUAL(pszCap, OLCIgnoreFields)) + return true; + + if (EQUAL(pszCap, OLCFastSpatialFilter)) + { + if (m_iGeomFieldFilter >= 0 && + m_iGeomFieldFilter < static_cast(m_aeGeomEncoding.size()) && + m_aeGeomEncoding[m_iGeomFieldFilter] == + OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT) + { + return true; + } + // fallback to base method + } + + return OGRParquetLayerBase::TestCapability(pszCap); +} + +/***********************************************************************/ +/* SetAttributeFilter() */ +/***********************************************************************/ + +OGRErr OGRParquetDatasetLayer::SetAttributeFilter(const char *pszFilter) +{ + m_bRebuildScanner = true; + return OGRParquetLayerBase::SetAttributeFilter(pszFilter); +} diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp index 96719ea79db0..f3c0236128b5 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp @@ -246,73 +246,14 @@ static GDALDataset *OpenFromDatasetFactory( std::shared_ptr dataset; PARQUET_ASSIGN_OR_THROW(dataset, factory->Finish()); - std::shared_ptr scannerBuilder; - PARQUET_ASSIGN_OR_THROW(scannerBuilder, dataset->NewScan()); - auto poMemoryPool = std::shared_ptr( arrow::MemoryPool::CreateDefault().release()); - // We cannot use the above shared memory pool. Otherwise we get random - // crashes in multi-threaded arrow code (apparently some cleanup code), - // that may used the memory pool after it has been destroyed. - // PARQUET_THROW_NOT_OK(scannerBuilder->Pool(poMemoryPool.get())); - const bool bIsVSI = STARTS_WITH(osBasePath.c_str(), "/vsi"); - if (bIsVSI) - { - const int nFragmentReadAhead = - atoi(CPLGetConfigOption("OGR_PARQUET_FRAGMENT_READ_AHEAD", "2")); - PARQUET_THROW_NOT_OK( - scannerBuilder->FragmentReadahead(nFragmentReadAhead)); - - const char *pszBatchSize = - CPLGetConfigOption("OGR_PARQUET_BATCH_SIZE", nullptr); - if (pszBatchSize) - { - PARQUET_THROW_NOT_OK( - scannerBuilder->BatchSize(CPLAtoGIntBig(pszBatchSize))); - } - - const char *pszUseThreads = - CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr); - if (pszUseThreads) - { - PARQUET_THROW_NOT_OK( - scannerBuilder->UseThreads(CPLTestBool(pszUseThreads))); - } - - const char *pszNumThreads = - CPLGetConfigOption("GDAL_NUM_THREADS", nullptr); - int nNumThreads = 0; - if (pszNumThreads == nullptr) - nNumThreads = std::min(4, CPLGetNumCPUs()); - else - nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") - ? CPLGetNumCPUs() - : atoi(pszNumThreads); - if (nNumThreads > 1) - { - CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads)); - } - -#if PARQUET_VERSION_MAJOR >= 10 - const char *pszBatchReadAhead = - CPLGetConfigOption("OGR_PARQUET_BATCH_READ_AHEAD", nullptr); - if (pszBatchReadAhead) - { - PARQUET_THROW_NOT_OK( - scannerBuilder->BatchReadahead(atoi(pszBatchReadAhead))); - } -#endif - } - - std::shared_ptr scanner; - PARQUET_ASSIGN_OR_THROW(scanner, scannerBuilder->Finish()); - auto poDS = std::make_unique(poMemoryPool); auto poLayer = std::make_unique( - poDS.get(), CPLGetBasename(osBasePath.c_str()), scanner, - scannerBuilder->schema(), papszOpenOptions); + poDS.get(), CPLGetBasename(osBasePath.c_str()), bIsVSI, dataset, + papszOpenOptions); poDS->SetLayer(std::move(poLayer)); return poDS.release(); } diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp index 67938e383e1c..f09eb7989b61 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp @@ -83,6 +83,16 @@ void OGRParquetLayerBase::ResetReading() OGRArrowLayer::ResetReading(); } +/************************************************************************/ +/* InvalidateCachedBatches() */ +/************************************************************************/ + +void OGRParquetLayerBase::InvalidateCachedBatches() +{ + m_iRecordBatch = -1; + ResetReading(); +} + /************************************************************************/ /* LoadGeoMetadata() */ /************************************************************************/ @@ -135,12 +145,11 @@ void OGRParquetLayerBase::LoadGeoMetadata( /************************************************************************/ //! Parse bounding box column definition -static bool ParseGeometryColumnCovering(const CPLJSONObject &oJSONDef, - std::string &osBBOXColumn, - std::string &osXMin, - std::string &osYMin, - std::string &osXMax, - std::string &osYMax) +/*static */ +bool OGRParquetLayerBase::ParseGeometryColumnCovering( + const CPLJSONObject &oJSONDef, std::string &osBBOXColumn, + std::string &osXMin, std::string &osYMin, std::string &osXMax, + std::string &osYMax) { const auto oCovering = oJSONDef["covering"]; if (oCovering.IsValid() && @@ -470,9 +479,40 @@ int OGRParquetLayerBase::TestCapability(const char *pszCap) if (EQUAL(pszCap, OLCFastSetNextByIndex)) return true; + if (EQUAL(pszCap, OLCFastSpatialFilter)) + { + if (m_oMapGeomFieldIndexToGeomColBBOX.find(m_iGeomFieldFilter) != + m_oMapGeomFieldIndexToGeomColBBOX.end()) + { + return true; + } + return false; + } + return OGRArrowLayer::TestCapability(pszCap); } +/************************************************************************/ +/* GetNumCPUs() */ +/************************************************************************/ + +/* static */ +int OGRParquetLayerBase::GetNumCPUs() +{ + const char *pszNumThreads = CPLGetConfigOption("GDAL_NUM_THREADS", nullptr); + int nNumThreads = 0; + if (pszNumThreads == nullptr) + nNumThreads = std::min(4, CPLGetNumCPUs()); + else + nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") ? CPLGetNumCPUs() + : atoi(pszNumThreads); + if (nNumThreads > 1) + { + CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads)); + } + return nNumThreads; +} + /************************************************************************/ /* OGRParquetLayer() */ /************************************************************************/ @@ -489,16 +529,15 @@ OGRParquetLayer::OGRParquetLayer( if (pszParquetBatchSize) m_poArrowReader->set_batch_size(CPLAtoGIntBig(pszParquetBatchSize)); - const char *pszNumThreads = CPLGetConfigOption("GDAL_NUM_THREADS", nullptr); - int nNumThreads = 0; - if (pszNumThreads == nullptr) - nNumThreads = std::min(4, CPLGetNumCPUs()); - else - nNumThreads = EQUAL(pszNumThreads, "ALL_CPUS") ? CPLGetNumCPUs() - : atoi(pszNumThreads); - if (nNumThreads > 1) + const int nNumCPUs = GetNumCPUs(); + const char *pszUseThreads = + CPLGetConfigOption("OGR_PARQUET_USE_THREADS", nullptr); + if (!pszUseThreads && nNumCPUs > 1) + { + pszUseThreads = "YES"; + } + if (CPLTestBool(pszUseThreads)) { - CPL_IGNORE_RET_VAL(arrow::SetCpuThreadPoolCapacity(nNumThreads)); m_poArrowReader->set_use_threads(true); } @@ -529,8 +568,8 @@ void OGRParquetLayer::EstablishFeatureDefn() return; } - const bool bUseBBOX = CPLTestBool(CPLGetConfigOption( - ("OGR_" + GetDriverUCName() + "_USE_BBOX").c_str(), "YES")); + const bool bUseBBOX = + CPLTestBool(CPLGetConfigOption("OGR_PARQUET_USE_BBOX", "YES")); // Keep track of declared bounding box columns in GeoParquet JSON metadata, // in order not to expose them as regular fields. @@ -1436,18 +1475,7 @@ bool OGRParquetLayer::ReadNextBatch() m_anMapGeomFieldIndexToParquetColumns.size()) && m_anMapGeomFieldIndexToParquetColumns[m_iGeomFieldFilter].size() >= 2 && - (m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT || - m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_LINESTRING || - m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_POLYGON || - m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTIPOINT || - m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTILINESTRING || - m_aeGeomEncoding[m_iGeomFieldFilter] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_MULTIPOLYGON)); + OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter])); if (m_asAttributeFilterConstraints.empty() && !bUSEBBOXFields && !(bIsGeoArrowStruct && m_poFilterGeom)) @@ -1811,57 +1839,6 @@ bool OGRParquetLayer::ReadNextBatch() SetBatch(poNextBatch); -#ifdef DEBUG - const auto &poColumns = m_poBatch->columns(); - - // Sanity checks - CPLAssert(m_poBatch->num_columns() == (m_bIgnoredFields - ? m_nExpectedBatchColumns - : m_poSchema->num_fields())); - - for (int i = 0; i < m_poFeatureDefn->GetFieldCount(); ++i) - { - int iCol; - if (m_bIgnoredFields) - { - iCol = m_anMapFieldIndexToArrayIndex[i]; - if (iCol < 0) - continue; - } - else - { - iCol = m_anMapFieldIndexToArrowColumn[i][0]; - } - CPL_IGNORE_RET_VAL(iCol); // to make cppcheck happy - - CPLAssert(iCol < static_cast(poColumns.size())); - CPLAssert(m_poSchema->fields()[m_anMapFieldIndexToArrowColumn[i][0]] - ->type() - ->id() == poColumns[iCol]->type_id()); - } - - for (int i = 0; i < m_poFeatureDefn->GetGeomFieldCount(); ++i) - { - int iCol; - if (m_bIgnoredFields) - { - iCol = m_anMapGeomFieldIndexToArrayIndex[i]; - if (iCol < 0) - continue; - } - else - { - iCol = m_anMapGeomFieldIndexToArrowColumn[i]; - } - CPL_IGNORE_RET_VAL(iCol); // to make cppcheck happy - - CPLAssert(iCol < static_cast(poColumns.size())); - CPLAssert(m_poSchema->fields()[m_anMapGeomFieldIndexToArrowColumn[i]] - ->type() - ->id() == poColumns[iCol]->type_id()); - } -#endif - return true; } @@ -1871,9 +1848,8 @@ bool OGRParquetLayer::ReadNextBatch() void OGRParquetLayer::InvalidateCachedBatches() { - m_iRecordBatch = -1; m_bSingleBatch = false; - ResetReading(); + OGRParquetLayerBase::InvalidateCachedBatches(); } /************************************************************************/ @@ -1888,12 +1864,12 @@ OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields) m_anMapGeomFieldIndexToArrayIndex.clear(); m_nRequestedFIDColumn = -1; OGRErr eErr = OGRLayer::SetIgnoredFields(papszFields); + int nBatchColumns = 0; if (!m_bHasMissingMappingToParquet && eErr == OGRERR_NONE) { m_bIgnoredFields = papszFields != nullptr && papszFields[0] != nullptr; if (m_bIgnoredFields) { - int nBatchColumns = 0; if (m_iFIDParquetColumn >= 0) { m_nRequestedFIDColumn = nBatchColumns; @@ -2004,34 +1980,14 @@ OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields) m_oMapGeomFieldIndexToGeomColBBOXParquet.find(i); if (oIter != m_oMapGeomFieldIndexToGeomColBBOX.end() && oIterParquet != - m_oMapGeomFieldIndexToGeomColBBOXParquet.end()) + m_oMapGeomFieldIndexToGeomColBBOXParquet.end() && + !OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[i])) { - const bool bIsGeoArrowStruct = - (m_aeGeomEncoding[i] == - OGRArrowGeomEncoding::GEOARROW_STRUCT_POINT || - m_aeGeomEncoding[i] == - OGRArrowGeomEncoding:: - GEOARROW_STRUCT_LINESTRING || - m_aeGeomEncoding[i] == - OGRArrowGeomEncoding:: - GEOARROW_STRUCT_POLYGON || - m_aeGeomEncoding[i] == - OGRArrowGeomEncoding:: - GEOARROW_STRUCT_MULTIPOINT || - m_aeGeomEncoding[i] == - OGRArrowGeomEncoding:: - GEOARROW_STRUCT_MULTILINESTRING || - m_aeGeomEncoding[i] == - OGRArrowGeomEncoding:: - GEOARROW_STRUCT_MULTIPOLYGON); - if (!bIsGeoArrowStruct) - { - oIter->second.iArrayIdx = nBatchColumns++; - m_anRequestedParquetColumns.insert( - m_anRequestedParquetColumns.end(), - oIterParquet->second.anParquetCols.begin(), - oIterParquet->second.anParquetCols.end()); - } + oIter->second.iArrayIdx = nBatchColumns++; + m_anRequestedParquetColumns.insert( + m_anRequestedParquetColumns.end(), + oIterParquet->second.anParquetCols.begin(), + oIterParquet->second.anParquetCols.end()); } } else @@ -2043,12 +1999,11 @@ OGRErr OGRParquetLayer::SetIgnoredFields(CSLConstList papszFields) CPLAssert( static_cast(m_anMapGeomFieldIndexToArrayIndex.size()) == m_poFeatureDefn->GetGeomFieldCount()); -#ifdef DEBUG - m_nExpectedBatchColumns = nBatchColumns; -#endif } } + m_nExpectedBatchColumns = m_bIgnoredFields ? nBatchColumns : -1; + ComputeConstraintsArrayIdx(); // Full invalidation @@ -2153,6 +2108,17 @@ int OGRParquetLayer::TestCapability(const char *pszCap) if (EQUAL(pszCap, OLCIgnoreFields)) return !m_bHasMissingMappingToParquet; + if (EQUAL(pszCap, OLCFastSpatialFilter)) + { + if (m_iGeomFieldFilter >= 0 && + m_iGeomFieldFilter < static_cast(m_aeGeomEncoding.size()) && + OGRArrowIsGeoArrowStruct(m_aeGeomEncoding[m_iGeomFieldFilter])) + { + return true; + } + // fallback to base method + } + return OGRParquetLayerBase::TestCapability(pszCap); }