diff --git a/docs/tutorial/concepts/spatial-joins.zh.md b/docs/tutorial/concepts/spatial-joins.zh.md new file mode 100644 index 00000000000..ddd53a6f8eb --- /dev/null +++ b/docs/tutorial/concepts/spatial-joins.zh.md @@ -0,0 +1,418 @@ + + +# 在 Spark 上使用 Apache Sedona 进行空间连接 + +本文介绍如何使用 Apache Sedona 进行空间连接(spatial join)。您将了解到不同类型的空间连接以及如何高效执行它们。 + +页面中的示例较为基础,便于直观地说明空间连接的核心概念。文末还会针对真实业务规模的数据集,进一步阐述空间连接概念并强调若干关键的性能改进。 + +## 使用 Spark 进行 within 空间连接 + +下图包含 3 个点和 2 个多边形:`point_b` 位于 `polygon_y` 内部,`point_c` 位于 `polygon_x` 内部,`point_a` 不在任何多边形中。 + +![spatial join within](../../image/tutorial/concepts/spatial-join1.png) + +`points` 表存放点,`polygons` 表存放多边形。 + +执行该查询的 SQL 如下: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +JOIN polygons ON ST_Within(points.geometry, polygons.geometry); +``` + +结果如下: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| b| y| +| c| x| ++--------+----------+ +``` + +`point_a` 不在结果 DataFrame 中,因为它不在任何多边形内部。 + +如果使用 `LEFT JOIN`,可以更直观地看到 `point_a` 的 `polygon_id` 为 `NULL`: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +LEFT JOIN polygons ON ST_Within(points.geometry, polygons.geometry); +``` + +输出: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| a| NULL| +| b| y| +| c| x| ++--------+----------+ +``` + +`point_a` 的 `polygon_id` 为 `NULL`,因为它不在任何多边形内。 + +在生产应用中通常使用 `JOIN`。本文使用 `LEFT JOIN` 仅是为了展示哪些行没有匹配到。 + +上面的代码使用了 `ST_Within` 谓词,它与 `ST_Contains` 含义相同,只是参数顺序相反。下面是用 `ST_Contains` 写的等价查询: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +LEFT JOIN polygons ON ST_Contains(polygons.geometry, points.geometry); +``` + +结果完全相同: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| a| NULL| +| b| y| +| c| x| ++--------+----------+ +``` + +## 使用 Spark 进行 crosses 空间连接 + +下图包含 1 个多边形和 2 条折线:`line_a` 与 `line_b` 都穿过 `polygon_x`,`line_c` 没有穿过 `polygon_x`。 + +![spatial join crosses](../../image/tutorial/concepts/spatial-join2.png) + +执行该空间连接的 SQL: + +```sql +SELECT + lines.id as line_id, + polygons.id as polygon_id +FROM lines +LEFT JOIN polygons ON ST_Crosses(lines.geometry, polygons.geometry); +``` + +结果: + +``` ++-------+----------+ +|line_id|polygon_id| ++-------+----------+ +| a| x| +| b| x| +| c| NULL| ++-------+----------+ +``` + +借助 `ST_Crosses` 的空间连接,可以筛选出与多边形相交的折线。 + +## 使用 Spark 进行 touches 空间连接 + +假设有 1 个多边形和 2 条折线:`line_a` 不接触多边形,`line_b` 接触多边形。如下图所示: + +![spatial join touches](../../image/tutorial/concepts/spatial-join3.png) + +我们用折线创建 `table_a`,用多边形创建 `table_b`,然后做连接。 + +多边形表内容如下: + +``` ++---+-----------------------------------+ +|id |geometry | ++---+-----------------------------------+ +|x |POLYGON ((6 2, 6 4, 8 4, 8 2, 6 2))| ++---+-----------------------------------+ +``` + +折线表内容如下: + +``` ++---+----------------------+ +|id |geometry | ++---+----------------------+ +|a |LINESTRING (2 4, 4 0) | +|b |LINESTRING (6 0, 10 4)| ++---+----------------------+ +``` + +匹配相互接触的连接: + +```python +sedona.sql(""" +SELECT + lines.id as line_id, + polygons.id as polygon_id +FROM lines +LEFT JOIN polygons ON ST_Touches(lines.geometry, polygons.geometry); +""").show() +``` + +连接结果: + +``` ++-------+----------+ +|line_id|polygon_id| ++-------+----------+ +| a| NULL| +| b| x| ++-------+----------+ +``` + +接下来看看如何用空间连接判断点是否在多边形内。 + +## 使用 Spark 进行 overlaps 空间连接 + +下图展示了 2 个多边形与若干图形:`polygon_a` 与 `polygon_x` 重叠;`line_b`、`line_c` 与 `point_d` 都不与 `polygon_y` 或 `polygon_x` 重叠。 + +![spatial join overlaps](../../image/tutorial/concepts/spatial-join4.png) + +执行该空间连接的 SQL: + +```sql +SELECT + shapes.id as shape_id, + polygons.id as polygon_id +FROM shapes +LEFT JOIN polygons ON ST_Overlaps(shapes.geometry, polygons.geometry); +``` + +结果: + +``` ++--------+----------+ +|shape_id|polygon_id| ++--------+----------+ +| a| x| +| b| NULL| +| c| NULL| +| d| NULL| ++--------+----------+ +``` + +## 使用 Spark 进行 K 近邻空间连接(KNN spatial join) + +假设有一张地址表与一张咖啡店表,希望为每个地址找到最近的两家咖啡店。 + +`addresses` 表包含 `latitude` 与 `longitude`: + +``` ++---+---------+--------+ +| id|longitude|latitude| ++---+---------+--------+ +| a1| 2.0| 3.0| +| a2| 5.0| 5.0| +| a3| 7.0| 2.0| ++---+---------+--------+ +``` + +`coffee_shops` 表同样包含 `latitude` 与 `longitude`: + +``` ++---+---------+--------+ +| id|longitude|latitude| ++---+---------+--------+ +| c1| 1.0| 4.0| +| c2| 3.0| 5.0| +| c3| 5.0| 1.0| +| c4| 8.0| 4.0| ++---+---------+--------+ +``` + +为每个地址计算最近的两家咖啡店: + +```sql +SELECT + addresses.id AS address_id, + coffee_shops.id AS coffee_shop_id +FROM addresses +JOIN coffee_shops +ON ST_KNN(addresses.geometry, coffee_shops.geometry, 2) +``` + +结果: + +``` ++----------+--------------+ +|address_id|coffee_shop_id| ++----------+--------------+ +| a1| c1| +| a1| c2| +| a2| c2| +| a2| c4| +| a3| c3| +| a3| c4| ++----------+--------------+ +``` + +可视化结果如下: + +![spatial join knn](../../image/tutorial/concepts/spatial-join5.png) + +可以一目了然地看出每个地址附近的两家咖啡店。 + +## 使用 Spark 进行空间距离连接 + +下图展示了 1 个点与若干公交站点。我们要找出与该点距离不超过 2.5 单位的所有公交站点。 + +![spatial distance join](../../image/tutorial/concepts/spatial-join6.png) + +可以看出 `t2` 与 `t3` 在 2.5 单位范围内。 + +points 表如下: + +``` ++---+-------------+ +| id| geometry| ++---+-------------+ +| p1|POINT (4.5 3)| ++---+-------------+ +``` + +transit 表如下: + +``` ++---+-----------+ +| id| geometry| ++---+-----------+ +| t1|POINT (1 4)| +| t2|POINT (3 4)| +| t3|POINT (5 2)| +| t4|POINT (8 4)| ++---+-----------+ +``` + +执行距离连接,找出与该点距离 ≤ 2.5 的公交站点: + +```sql +SELECT + points.id AS point_id, + transit.id AS transit_id +FROM points +JOIN transit +ON ST_DWithin(points.geometry, transit.geometry, 2.5) +``` + +结果: + +``` ++--------+----------+ +|point_id|transit_id| ++--------+----------+ +| p1| t2| +| p1| t3| ++--------+----------+ +``` + +Sedona 非常适合查找距离某点一定范围内的位置。 + +Sedona 使用的是欧氏距离,因此距离单位与原始坐标的 CRS 相同。如果要在 WGS84 坐标上以米为单位运算,请使用 `ST_DistanceSphere`、`ST_DistanceSpheroid` 或 `ST_DWithin(useSpheroid = true)`。 + +## 使用 Spark 进行空间范围连接 + +由 `ST_Intersects`、`ST_Contains`、`ST_Within`、`ST_DWithin`、`ST_Touches`、`ST_Crosses` 触发的连接都属于范围连接(range join)。本节再展示一个范围连接示例,其实前面已经介绍过多个范围连接。 + +假设有一张城市表与一张餐厅表,希望找出某城市内的所有餐厅。示例数据如下图: + +![spatial range join](../../image/tutorial/concepts/spatial-join7.png) + +3 家餐厅位于城市边界内,1 家餐厅在城市外。 + +cities 表: + +``` ++-----+--------------------+ +| id| geometry| ++-----+--------------------+ +|city1|POLYGON ((1 1, 1 ...| ++-----+--------------------+ +``` + +`restaurants` 表: + +``` ++---+-----------+ +| id| geometry| ++---+-----------+ +| r1|POINT (2 2)| +| r2|POINT (3 3)| +| r3|POINT (4 4)| +| r4|POINT (6 6)| ++---+-----------+ +``` + +执行范围连接: + +``` +SELECT + cities.id AS city_id, + restaurants.id AS restaurant_id +FROM cities +JOIN restaurants +ON ST_Intersects(restaurants.geometry, cities.geometry) +``` + +结果: + +``` ++-------+-------------+ +|city_id|restaurant_id| ++-------+-------------+ +| city1| r1| +| city1| r2| +| city1| r3| ++-------+-------------+ +``` + +范围连接在很多实际场景中都非常有用。 + +## Sedona 与 Apache Spark 上的空间连接优化 + +可以通过更优的文件格式、索引、查询写法等方式优化空间连接。 + +例如,假设您正在对存储为 GeoJSON 的两张宽表执行空间连接,且不需要全部输出列。GeoJSON 是行式存储,不支持列裁剪。把数据从 GeoJSON 切换到 GeoParquet 等列式格式即可享受列裁剪带来的关键性能收益。 + +更多查询优化方法请参阅 [此处](https://sedona.apache.org/latest/api/sql/Optimizer/)。 + +## 空间广播连接 + +Sedona 既能在单机运行,也能在多节点集群上运行。 + +可以想象,当数据分布在集群的不同机器上时,连接两个数据集会更慢——因为需要 shuffle,开销不小。 + +如果其中一张表很小,可以将其广播到集群中的所有机器,从而大幅提升连接速度。 + +通常只有相对较小的 DataFrame 才适合广播,详见 [此处说明](https://sedona.apache.org/latest/api/sql/Optimizer/?h=broadcast#broadcast-index-join)。 + +Sedona 会自动广播大小低于阈值的表,[详细信息见此](https://sedona.apache.org/latest/api/sql/Optimizer/#automatic-broadcast-index-join)。 + +## 结论 + +Apache Sedona 支持多种类型的空间连接。 + +空间连接是 Sedona 引擎的强项之一。其他引擎在执行空间连接时常常受到内存问题的困扰,而 Sedona 能够在海量数据集上完成空间连接。 diff --git a/docs/tutorial/files/geoparquet-sedona-spark.zh.md b/docs/tutorial/files/geoparquet-sedona-spark.zh.md new file mode 100644 index 00000000000..d2a8cec29c8 --- /dev/null +++ b/docs/tutorial/files/geoparquet-sedona-spark.zh.md @@ -0,0 +1,352 @@ + + +# 在 Spark 上使用 Apache Sedona 处理 GeoParquet + +本页介绍如何使用 Apache Sedona 与 GeoParquet 构建空间数据湖。 + +GeoParquet 在空间数据集场景下具有诸多优势: + +* 内置对几何列的支持 +* GeoParquet 是列式存储,支持列裁剪,仅使用部分列的查询会更快 +* 内嵌统计信息让某些查询能够直接跳过整块文件内容 +* 为每个 row group 存储边界框(“bbox”)元数据,可进行 row-group 级过滤 +* 列式格式利于压缩 +* 与 GeoJSON 或 CSV 不同,GeoParquet 在文件尾部携带 schema,无需推断或手动指定 + +下面看如何把 Sedona DataFrame 写成 GeoParquet 文件。 + +## 使用 Spark 把 Sedona DataFrame 写为 GeoParquet + +先创建一个 Sedona DataFrame: + +```python +df = sedona.createDataFrame( + [ + ("a", "LINESTRING(2.0 5.0,6.0 1.0)"), + ("b", "LINESTRING(7.0 4.0,9.0 2.0)"), + ("c", "LINESTRING(1.0 3.0,3.0 1.0)"), + ], + ["id", "geometry"], +) +df = df.withColumn("geometry", ST_GeomFromText(col("geometry"))) +``` + +将 DataFrame 写成 GeoParquet 文件: + +```python +df.write.format("geoparquet").mode("overwrite").save("/tmp/somewhere") +``` + +写出的文件如下: + +``` +somewhere/ + _SUCCESS + part-00000-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00003-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00007-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00011-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet +``` + +Sedona 会并行写出多个 Parquet 文件,比写单一文件快得多。 + +## 使用 Spark 将 GeoParquet 读入 Sedona DataFrame + +将 GeoParquet 文件读入 Sedona DataFrame: + +```python +df = sedona.read.format("geoparquet").load("/tmp/somewhere") +df.show(truncate=False) +``` + +结果如下: + +``` ++---+---------------------+ +|id |geometry | ++---+---------------------+ +|a |LINESTRING (2 5, 6 1)| +|b |LINESTRING (7 4, 9 2)| +|c |LINESTRING (1 3, 3 1)| ++---+---------------------+ +``` + +Sedona 在底层执行该查询的过程大致如下: + +1. 从 GeoParquet 文件尾部读取 schema,因此无需 schema 推断。 +2. 收集每个文件的 bbox 元数据,判断哪些数据可以被跳过。 +3. 执行查询时跳过未使用的列。 + +得益于此,Sedona 在 GeoParquet 上的查询通常比在 CSV 等格式上更快、更稳定。CSV 既不在文件尾部存 schema,也不支持列裁剪,更没有用于数据跳过的 row-group 元数据。 + +## 检查 GeoParquet 元数据 + +自 v`1.5.1` 起,Sedona 提供了一个 Spark SQL 数据源 `"geoparquet.metadata"`,可用于检查 GeoParquet 元数据。返回的 DataFrame 中包含每个输入文件的 “geo” 元数据。 + +=== "Scala" + + ```scala + val df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +=== "Java" + + ```java + Dataset df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +=== "Python" + + ```python + df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +输出如下: + +``` +root + |-- path: string (nullable = true) + |-- version: string (nullable = true) + |-- primary_column: string (nullable = true) + |-- columns: map (nullable = true) + | |-- key: string + | |-- value: struct (valueContainsNull = true) + | | |-- encoding: string (nullable = true) + | | |-- geometry_types: array (nullable = true) + | | | |-- element: string (containsNull = true) + | | |-- bbox: array (nullable = true) + | | | |-- element: double (containsNull = true) + | | |-- crs: string (nullable = true) +``` + +GeoParquet 文件尾部为每一列存储以下数据: + +* geometry_type:几何对象的类型,如点、多边形等 +* bbox:文件中对象的边界框 +* crs:坐标参考系 +* covering:包含 xmin、ymin、xmax、ymax 的结构体列,用于 GeoParquet 1.1 文件的 row group 统计信息 + +其他 Parquet 元数据存储在 Parquet 文件尾部,与普通 Parquet 文件保持一致。 + +如果输入的 Parquet 文件没有 GeoParquet 元数据,返回 DataFrame 中的 `version`、`primary_column` 与 `columns` 字段会为 `null`。 + +`geoparquet.metadata` 仅支持读取 GeoParquet 特有的元数据。如果需要读取通用 Parquet 文件的全部元数据,可以使用 [G-Research/spark-extension](https://github.com/G-Research/spark-extension/blob/13109b8e43dfba9272c85896ba5e30cfe280426f/PARQUET.md)。 + +我们来查看一下元数据的内容与 schema: + +``` +df.show(truncate=False) + ++-----------------------------+-------------------------------------------------------------------+ +|path |columns | ++-----------------------------+-------------------------------------------------------------------+ +|file:/part-00003-1c13.parquet|{geometry -> {WKB, [LineString], [2.0, 1.0, 6.0, 5.0], null, NULL}}| +|file:/part-00007-1c13.parquet|{geometry -> {WKB, [LineString], [7.0, 2.0, 9.0, 4.0], null, NULL}}| +|file:/part-00011-1c13.parquet|{geometry -> {WKB, [LineString], [1.0, 1.0, 3.0, 3.0], null, NULL}}| +|file:/part-00000-1c13.parquet|{geometry -> {WKB, [], [0.0, 0.0, 0.0, 0.0], null, NULL}} | ++-----------------------------+-------------------------------------------------------------------+ +``` + +`columns` 列保存了 GeoParquet 数据湖中各文件的边界框信息,可用于跳过整个文件或 row group。下文还会进一步介绍如何利用 bbox 元数据。 + +## 写出带 CRS 元数据的 GeoParquet + +自 v`1.5.1` 起,Sedona 支持以自定义的 GeoParquet 规范版本与 CRS 写出 GeoParquet 文件。默认的 GeoParquet 规范版本为 `1.1.0`(自 `v1.9.0` 起),默认 CRS 为 `null`。可按以下方式指定: + +```scala +val projjson = "{...}" // 所有几何列共享的 PROJJSON 字符串 +df.write.format("geoparquet") + .option("geoparquet.version", "1.0.0") + .option("geoparquet.crs", projjson) + .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet") +``` + +如果 GeoParquet 文件中存在多个几何列,可以为每个列分别指定 CRS。例如 `df` 中有两列几何列 `g0` 与 `g1`,希望分别指定它们的 CRS: + +```scala +val projjson_g0 = "{...}" // g0 的 PROJJSON 字符串 +val projjson_g1 = "{...}" // g1 的 PROJJSON 字符串 +df.write.format("geoparquet") + .option("geoparquet.version", "1.0.0") + .option("geoparquet.crs.g0", projjson_g0) + .option("geoparquet.crs.g1", projjson_g1) + .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet") +``` + +`geoparquet.crs` 与 `geoparquet.crs.` 可取以下值: + +* `"null"`:显式将 `crs` 字段置为 `null`,这是几何 SRID 为 0 时的默认行为。 +* `""`(空字符串):省略 `crs` 字段。对于支持 CRS 的实现而言,意味着使用 [OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84)。 +* `"{...}"`(PROJJSON 字符串):`crs` 字段会被设为表示该几何坐标参考系的 PROJJSON 对象。可在 https://epsg.io/ (页面底部点击 JSON 选项)查找特定 CRS 的 PROJJSON,也可以根据需要自定义 PROJJSON。 + +### 通过 SRID 自动推断 CRS + +未显式提供 `geoparquet.crs` 选项时,Sedona 会根据几何列的 SRID 自动推导出 CRS PROJJSON。例如,如果某列所有几何对象的 SRID 都是 32632(通过 [`ST_SetSRID`](../../api/sql/Spatial-Reference-System/ST_SetSRID.md) 设置),写入器会自动在 GeoParquet 元数据中生成 EPSG:32632 对应的 PROJJSON。当 SRID 为 4326 时,由于这就是 GeoParquet 的默认 CRS(OGC:CRS84),`crs` 字段会被省略。 + +* 若 SRID 为 0(未显式设置 SRID 的几何对象的默认值),`crs` 字段会被设为 `null`。 +* 若同一列中的几何对象 SRID 不一致,`crs` 字段默认置为 `null`。 +* 若显式提供了 `geoparquet.crs` 或 `geoparquet.crs.` 选项,则始终优先于 SRID 推导出的 CRS。 + +Sedona 的 GeoParquet 读写器**不会**校验坐标轴顺序(lon/lat 还是 lat/lon),假定使用者在读写时自行处理。可以使用 [`ST_FlipCoordinates`](../../api/sql/Geometry-Editors/ST_FlipCoordinates.md) 来交换几何对象的坐标轴顺序。 + +## 写出带 covering 元数据的 GeoParquet + +自 `v1.6.1` 起,Sedona 支持向几何列元数据写入 [`covering` 字段](https://github.com/opengeospatial/geoparquet/blob/v1.1.0/format-specs/geoparquet.md#covering)。该字段指定一列边界框列以加速空间数据检索。该边界框列必须是顶层结构体列,包含 `xmin`、`ymin`、`xmax`、`ymax` 四个子列。如果待写入的 DataFrame 已经包含这样的列,可通过 `.option("geoparquet.covering.", "")` 选项把 `covering` 元数据写入 GeoParquet: + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering.geometry", "bbox") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果 DataFrame 仅有一列几何列,可以省略列名直接使用 `geoparquet.covering`: + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering", "bbox") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果未设置 `geoparquet.covering` 选项,Sedona 在写出 GeoParquet `1.1.0` 时会自动为您填充 covering 元数据。 + +对每个几何列,Sedona 会使用 `_bbox` 作为 covering 列: + +* 如果 `_bbox` 已存在且是合法的 covering 结构体(`xmin`、`ymin`、`xmax`、`ymax`),Sedona 会复用它。 +* 如果 `_bbox` 不存在,Sedona 会在写出时自动生成。 + +显式的 `geoparquet.covering` 或 `geoparquet.covering.` 选项始终优先于上述默认行为。 + +可以通过 `geoparquet.covering.mode` 控制该默认行为: + +* `auto`(默认):在写 GeoParquet `1.1.0` 时启用 covering 元数据/列的自动生成。 +* `legacy`:禁用自动 covering 生成。 + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering.mode", "legacy") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果 DataFrame 中没有 covering 列,可以使用 Sedona 的 SQL 函数构造一个: + +```scala +val df_bbox = df.withColumn("bbox", expr("struct(ST_XMin(geometry) AS xmin, ST_YMin(geometry) AS ymin, ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS ymax)")) +df_bbox.write.format("geoparquet").option("geoparquet.covering.geometry", "bbox").save("/path/to/saved_geoparquet.parquet") +``` + +## 排序后再写出 GeoParquet + +为了最大程度地发挥 Sedona GeoParquet filter pushdown 的性能,建议先按几何对象的 geohash 值(参见 [ST_GeoHash](../../api/sql/Geometry-Output/ST_GeoHash.md))排序,再写出 GeoParquet。示例如下: + +``` +SELECT col1, col2, geom, ST_GeoHash(geom, 5) as geohash +FROM spatialDf +ORDER BY geohash +``` + +接下来更深入地看看 Sedona 是如何利用 GeoParquet 的 bbox 元数据来优化查询的。 + +## Sedona 如何在 Spark 上利用 GeoParquet 边界框(bbox)元数据 + +bbox 元数据描述了某个文件中所包含几何对象覆盖的范围。如果查询的区域不与某个文件的边界框相交,引擎在执行查询时即可整体跳过该文件——因为它不可能包含相关数据。 + +跳过整个文件可以极大提升性能,跳过的数据越多,查询就越快。 + +下面来看一个由若干点和三个边界框组成的数据集示例: + +![GeoParquet bbox](../../image/tutorial/files/geoparquet_bbox1.png) + +应用一个空间过滤器,只读取某个区域内的点: + +![GeoParquet bbox 过滤](../../image/tutorial/files/geoparquet_bbox2.png) + +查询代码: + +```python +my_shape = "POLYGON((4.0 3.5, 4.0 6.0, 8.0 6.0, 8.0 4.5, 4.0 3.5))" + +res = sedona.sql(f""" +select * +from points +where st_intersects(geometry, ST_GeomFromWKT('{my_shape}')) +""") +res.show(truncate=False) +``` + +结果如下: + +``` ++---+-----------+ +|id |geometry | ++---+-----------+ +|e |POINT (5 4)| +|f |POINT (6 4)| +|g |POINT (6 5)| ++---+-----------+ +``` + +执行该查询其实并不需要边界框 1 或 3 中的数据,只需要边界框 2 中的点。文件跳过让我们对这个查询只需读取一个文件而非三个。该数据湖中各文件的 bbox 如下: + +``` ++--------------------------------------------------------------+ +|columns | ++--------------------------------------------------------------+ +|{geometry -> {WKB, [Point], [2.0, 1.0, 3.0, 3.0], null, NULL}}| +|{geometry -> {WKB, [Point], [7.0, 1.0, 8.0, 2.0], null, NULL}}| +|{geometry -> {WKB, [Point], [5.0, 4.0, 6.0, 5.0], null, NULL}}| ++--------------------------------------------------------------+ +``` + +跳过整个文件能带来显著的性能收益。 + +## GeoParquet 的优势 + +如前所述,GeoParquet 相比 CSV、GeoJSON 等行式格式有诸多优势: + +* 列裁剪 +* row-group 过滤 +* schema 写在文件尾部 +* 列式数据布局,压缩效果良好 + +但这些优势仍不足以满足空间数据从业者所需的全部能力。 + +## GeoParquet 的局限 + +Parquet 与 GeoParquet 数据湖与其他数据湖一样,存在以下局限: + +* 不支持可靠事务 +* 不支持部分数据操作语言(DML),如 update、delete +* 没有并发保护 +* 与数据库相比,某些操作性能较差 + +为了克服这些局限,数据社区已逐步迁移到 Iceberg、Delta Lake、Hudi 等开放式表格式。 + +Iceberg 最近开始支持 geometry 与 geography 列,让数据从业者既能享受开放式表格式的特性,又能复用底层的 Parquet 文件。Iceberg 解决了 GeoParquet 数据湖的局限,支持可靠事务以及 update、delete 等常见操作。 + +## 结论 + +GeoParquet 是空间数据社区的一项重要进展。它为地理工程师提供了内嵌 schema、性能优化与对几何列的原生支持。 + +由于 Iceberg 现已具备 GeoParquet 的全部优秀特性甚至更多,空间数据工程师也可以平滑迁移到 Iceberg。Iceberg 提供了大量实用的开放式表格式特性,几乎在所有场景下都优于纯 GeoParquet(例外情况:永不变更的单文件数据集,或与其他引擎的兼容性需求)。 + +将工作流从 Shapefile、GeoPackage、CSV、GeoJSON 等传统格式迁移到 GeoParquet/Iceberg 等高性能格式,能立刻提升计算效率。 diff --git a/docs/tutorial/files/stac-sedona-spark.zh.md b/docs/tutorial/files/stac-sedona-spark.zh.md new file mode 100644 index 00000000000..6028159e793 --- /dev/null +++ b/docs/tutorial/files/stac-sedona-spark.zh.md @@ -0,0 +1,468 @@ + + +# 在 Spark 上使用 Apache Sedona 处理 STAC catalog + +STAC 数据源允许您从 SpatioTemporal Asset Catalog(STAC)API 读取数据。该数据源同时支持读取 STAC items 与 collections。 + +## 用法 + +要使用 STAC 数据源,可使用 `stac` 格式将 STAC catalog 加载到 Sedona DataFrame。`load` 的路径可以是本地的 STAC collection JSON 文件,也可以是 HTTP/HTTPS 端点。 + +从本地 collection 文件加载 STAC collection: + +```python +df = sedona.read.format("stac").load("/user/stac_collection.json") +df.printSchema() +df.show() +``` + +从 S3 上的 collection 文件加载: + +```python +df = sedona.read.format("stac").load( + "s3a://example.com/stac_bucket/stac_collection.json" +) +df.printSchema() +df.show() +``` + +也可以从 HTTP/HTTPS 端点加载: + +```python +df = sedona.read.format("stac").load( + "https://earth-search.aws.element84.com/v1/collections/sentinel-2-pre-c1-l2a" +) +df.printSchema() +df.show() +``` + +输出: + +``` +root + |-- stac_version: string (nullable = false) + |-- stac_extensions: array (nullable = true) + | |-- element: string (containsNull = true) + |-- type: string (nullable = false) + |-- id: string (nullable = false) + |-- bbox: array (nullable = true) + | |-- element: double (containsNull = true) + |-- geometry: geometry (nullable = true) + |-- title: string (nullable = true) + |-- description: string (nullable = true) + |-- datetime: timestamp (nullable = true) + |-- start_datetime: timestamp (nullable = true) + |-- end_datetime: timestamp (nullable = true) + |-- created: timestamp (nullable = true) + |-- updated: timestamp (nullable = true) + |-- platform: string (nullable = true) + |-- instruments: array (nullable = true) + | |-- element: string (containsNull = true) + |-- constellation: string (nullable = true) + |-- mission: string (nullable = true) + |-- grid:code: string (nullable = true) + |-- gsd: double (nullable = true) + |-- collection: string (nullable = true) + |-- links: array (nullable = true) + | |-- element: struct (containsNull = true) + | | |-- rel: string (nullable = true) + | | |-- href: string (nullable = true) + | | |-- type: string (nullable = true) + | | |-- title: string (nullable = true) + |-- assets: map (nullable = true) + | |-- key: string + | |-- value: struct (valueContainsNull = true) + | | |-- href: string (nullable = true) + | | |-- type: string (nullable = true) + | | |-- title: string (nullable = true) + | | |-- roles: array (nullable = true) + | | | |-- element: string (containsNull = true) + ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +|stac_version| stac_extensions| type| id| bbox| geometry|title|description| datetime|start_datetime|end_datetime| created| updated| platform|instruments|constellation|mission| gsd| collection| links| assets| ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYC_202212...|[-55.202493, 1.71...|POLYGON ((-55.201...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:13:...|2024-05-01 21:13:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZC_202212...|[-54.30394, 1.719...|POLYGON ((-54.302...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:39:...|2024-05-03 00:39:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBH_202212...|[-53.698196, 2.63...|POLYGON ((-53.698...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:26:...|2024-05-03 00:26:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYD_202212...|[-55.201423, 2.62...|POLYGON ((-55.199...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:10:...|2024-05-01 21:10:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZD_202212...|[-54.302336, 2.62...|POLYGON ((-54.299...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:12:...|2024-05-03 00:12:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBJ_202212...|[-53.700535, 2.63...|POLYGON ((-53.700...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:30:...|2024-05-03 00:30:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYE_202212...|[-55.199906, 3.52...|POLYGON ((-55.197...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:24:...|2024-05-01 21:24:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZE_202212...|[-54.300062, 3.52...|POLYGON ((-54.296...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:14:...|2024-05-03 00:14:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBK_202212...|[-53.703548, 3.52...|POLYGON ((-53.703...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:32:...|2024-05-03 00:32:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYF_202212...|[-55.197941, 4.42...|POLYGON ((-55.195...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:43:...|2024-05-01 21:43:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +``` + +## 谓词下推(Filter Pushdown) + +STAC 数据源支持把空间和时间过滤下推到底层数据源,减少需要读取的数据量。 + +### 空间过滤下推 + +空间过滤下推让数据源能够直接在数据源层应用空间谓词(如 `st_intersects`),从而减少传输与处理的数据量。 + +### 时间过滤下推 + +时间过滤下推让数据源能够直接在数据源层应用时间谓词(如 `BETWEEN`、`>=`、`<=`),同样可以减少传输与处理的数据量。 + +## 示例 + +下面的示例假设 STAC 数据源已经被加载到名为 `STAC_TABLE` 的表中。 + +### 不带过滤的 SQL Select + +```sql +SELECT id, datetime as dt, geometry, bbox FROM STAC_TABLE +``` + +### 带时间过滤的 SQL Select + +```sql + SELECT id, datetime as dt, geometry, bbox + FROM STAC_TABLE + WHERE datetime BETWEEN '2020-01-01' AND '2020-12-13' +``` + +该例中,数据源会把时间过滤下推到底层数据源。 + +### 带空间过滤的 SQL Select + +```sql + SELECT id, geometry + FROM STAC_TABLE + WHERE st_intersects(ST_GeomFromText('POLYGON((17 10, 18 10, 18 11, 17 11, 17 10))'), geometry) +``` + +该例中,数据源会把空间过滤下推到底层数据源。 + +### Sedona 的 STAC 读取器配置 + +在 Sedona 中使用 STAC 读取器时,可通过若干配置项控制读取行为,通常以 `Map[String, String]` 形式传入。下列是核心 Sedona 配置项: + +- **spark.sedona.stac.load.maxPartitionItemFiles**:单个分区中可包含的最大 item 文件数量,用于控制分区大小。默认 `-1`,表示由系统自动决定。 + +- **spark.sedona.stac.load.numPartitions**:为 STAC 数据创建的分区数,便于控制数据分布与并行度。默认 `-1`,表示由系统自动决定。 + +下面是 STAC 读取器的常见 reader option: + +- **itemsLimitMax**:从 STAC collection 加载的 item 数量上限,便于限制处理规模。默认 `-1`,表示加载全部 item。 + +- **itemsLoadProcessReportThreshold**:item 加载进度的报告阈值。默认 `1000000`,即每加载 100 万个 item 报告一次进度。 + +- **itemsLimitPerRequest**:单次 API 请求中获取的最大 item 数。默认 `10`。 + +- **headers**:STAC API 请求中携带的 HTTP 头,应是 JSON 编码的字符串,包含一系列键值对。可用于鉴权或自定义头。例如:`{"Authorization": "Basic "}` + +可以将这些配置合并到一个 `Map[String, String]` 中传给 STAC 读取器: + +```scala + def defaultSparkConfig: Map[String, String] = Map( + "spark.sedona.stac.load.maxPartitionItemFiles" -> "100", + "spark.sedona.stac.load.numPartitions" -> "10", + "spark.sedona.stac.load.itemsLimitMax" -> "20") + + val sparkSession: SparkSession = { + val builder = SedonaContext + .builder() + .master("local[*]") + defaultSparkConfig.foreach { case (key, value) => builder.config(key, value) } + builder.getOrCreate() + } + + df = sedona.read + .format("stac") + .option("itemsLimitMax", "100") + .option("itemsLoadProcessReportThreshold", "2000000") + .option("itemsLimitPerRequest", "100") + .load("https://earth-search.aws.element84.com/v1/collections/sentinel-2-pre-c1-l2a") +``` + +通过以上选项,可以对 Sedona 中 STAC 数据的读取与处理过程进行细粒度的控制。 + +## Python API + +Python API 让您可以通过 Client 类与 STAC API 交互。它提供了打开 STAC API 连接、获取 collection、按多种条件搜索 item 的方法。 + +### 示例代码 + +#### 初始化客户端 + +```python +from sedona.spark.stac import Client + +# 初始化客户端 +client = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1") +``` + +#### 在某个 collection 中按年份搜索 + +```python +items = client.search( + collection_id="aster-l1t", datetime="2020", return_dataframe=False +) +``` + +#### 在某个 collection 中按月份搜索并限制最大返回数 + +```python +items = client.search( + collection_id="aster-l1t", datetime="2020-05", return_dataframe=False, max_items=5 +) +``` + +#### 按 bbox 与时间区间搜索 + +```python +items = client.search( + collection_id="aster-l1t", + ids=["AST_L1T_00312272006020322_20150518201805"], + bbox=[-180.0, -90.0, 180.0, 90.0], + datetime=["2006-01-01T00:00:00Z", "2007-01-01T00:00:00Z"], + return_dataframe=False, +) +``` + +#### 按多个 bbox 搜索 + +```python +bbox_list = [[-180.0, -90.0, 180.0, 90.0], [-100.0, -50.0, 100.0, 50.0]] +items = client.search(collection_id="aster-l1t", bbox=bbox_list, return_dataframe=False) +``` + +#### 在多个时间区间内搜索并以 DataFrame 返回 + +```python +interval_list = [ + ["2020-01-01T00:00:00Z", "2020-06-01T00:00:00Z"], + ["2020-07-01T00:00:00Z", "2021-01-01T00:00:00Z"], +] +df = client.search( + collection_id="aster-l1t", datetime=interval_list, return_dataframe=True +) +df.show() +``` + +#### 同时使用 bbox 与时间将 item 写入 GeoParquet + +```python +# 将 DataFrame 中的 item 写入 GeoParquet,同时指定 bbox 与时间 +client.get_collection("aster-l1t").save_to_geoparquet( + output_path="/path/to/output", bbox=bbox_list, datetime="2020-05" +) +``` + +以上示例演示了如何使用 Client 类按各种条件检索 STAC collection,并以 PyStacItem 迭代器或 Spark DataFrame 的形式返回。 + +### 鉴权 + +许多 STAC 服务都需要鉴权才能访问数据。STAC 客户端支持多种鉴权方式:HTTP Basic Authentication、Bearer Token Authentication,以及自定义 HTTP 头。 + +#### Basic 认证 + +Basic 认证通常用于 API key 或用户名/密码组合。许多服务(如 Planet Labs)会把 API key 作为用户名,密码留空。 + +```python +from sedona.spark.stac import Client + +# 示例 1:使用 API key(常见模式) +client = Client.open("https://api.example.com/stac/v1") +client.with_basic_auth("your_api_key_here", "") + +# 在认证状态下搜索 item +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 示例 2:使用用户名与密码 +client = Client.open("https://api.example.com/stac/v1") +client.with_basic_auth("username", "password") + +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 示例 3:链式调用 +df = ( + Client.open("https://api.example.com/stac/v1") + .with_basic_auth("your_api_key", "") + .search(collection_id="example-collection", max_items=10) +) +df.show() +``` + +#### Bearer Token 认证 + +Bearer Token 认证常与 OAuth2 token 或 JWT token 一起使用。注意:某些服务可能仅支持特定的鉴权方式。 + +```python +from sedona.spark.stac import Client + +# 使用 bearer token +client = Client.open("https://api.example.com/stac/v1") +client.with_bearer_token("your_access_token_here") + +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 链式调用 +df = ( + Client.open("https://api.example.com/stac/v1") + .with_bearer_token("your_token") + .search(collection_id="example-collection", max_items=10) +) +df.show() +``` + +#### 自定义 HTTP 头 + +也可以在创建客户端时直接传入自定义 HTTP 头,适用于鉴权方式较为特殊的服务。 + +```python +from sedona.spark.stac import Client + +# 使用自定义 HTTP 头 +headers = {"Authorization": "Bearer your_token_here", "X-Custom-Header": "custom_value"} +client = Client.open("https://api.example.com/stac/v1", headers=headers) + +df = client.search(collection_id="example-collection", max_items=10) +df.show() +``` + +#### 在 Scala DataSource 中使用鉴权 + +直接在 Scala 或 Spark SQL 中使用 STAC 数据源时,可以把鉴权 HTTP 头作为 JSON 编码的 option 传入: + +```python +import json +from pyspark.sql import SparkSession + +# 准备鉴权 HTTP 头 +headers = {"Authorization": "Basic "} +headers_json = json.dumps(headers) + +# 在鉴权状态下加载 STAC 数据 +df = ( + spark.read.format("stac") + .option("headers", headers_json) + .load("https://api.example.com/stac/v1/collections/example-collection") +) + +df.show() +``` + +```scala +// Scala 示例 +val headersJson = """{"Authorization":"Basic "}""" + +val df = sparkSession.read + .format("stac") + .option("headers", headersJson) + .load("https://api.example.com/stac/v1/collections/example-collection") + +df.show() +``` + +#### 注意事项 + +- **鉴权方式互斥**:设置新的鉴权方式会覆盖此前的 Authorization 头,但不会影响其他自定义 HTTP 头。 +- **HTTP 头会向下传播**:在 Client 上设置的 HTTP 头会自动应用到后续的 collection 与 item 请求。 +- **不同服务有不同的要求**:不同 STAC 服务可能要求不同的鉴权方式。例如 Planet Labs 在访问 collection 时需要 Basic 认证,而非 Bearer Token。 +- **向后兼容**:所有鉴权参数都是可选的。原本访问无需鉴权的公开 STAC 服务的代码不受影响,可继续使用。 + +### 方法 + +**`open(url: str, headers: Optional[dict] = None) -> Client`** +打开到指定 STAC API URL 的连接。 + +参数: + +* `url` (*str*):要连接的 STAC API URL。例如:`"https://planetarycomputer.microsoft.com/api/stac/v1"` +* `headers` (*Optional[dict]*):可选的 HTTP 头字典,用于鉴权或自定义头。例如:`{"Authorization": "Bearer token123"}` + +返回: + +* `Client`:连接到指定 URL 的 `Client` 实例。 + +--- + +**`with_basic_auth(username: str, password: str) -> Client`** +为客户端启用 HTTP Basic 认证。 + +该方法会将用户名与密码进行 Base64 编码,并添加相应的 Authorization 头。 + +参数: + +* `username` (*str*):用户名。对于 API key,此项通常就是 API key 本身。例如 `"your_api_key"`。 +* `password` (*str*):密码。对于 API key 通常留空。例如 `""`。 + +返回: + +* `Client`:返回自身以便链式调用。 + +--- + +**`with_bearer_token(token: str) -> Client`** +为客户端启用 Bearer Token 认证。 + +该方法会添加 Bearer Token 形式的 Authorization 头,常用于 OAuth2 与 API token 场景。 + +参数: + +* `token` (*str*):用于鉴权的 bearer token。例如 `"your_access_token_here"`。 + +返回: + +* `Client`:返回自身以便链式调用。 + +--- + +**`get_collection(collection_id: str) -> CollectionClient`** +按 collection ID 获取对应的 CollectionClient。 + +参数: + +* `collection_id` (*str*):collection 的 ID。例如 `"aster-l1t"`。 + +返回: + +* `CollectionClient`:对应 collection 的 `CollectionClient` 实例。 + +--- + +**`search(*ids: Union[str, list], collection_id: str, bbox: Optional[list] = None, datetime: Optional[Union[str, datetime.datetime, list]] = None, max_items: Optional[int] = None, return_dataframe: bool = True) -> Union[Iterator[PyStacItem], DataFrame]`** +在指定 collection 中按可选过滤条件搜索 item。 + +参数: + +* `ids` (*Union[str, list]*):可变数量的 item ID,用于过滤。例如 `"item_id1"` 或 `["item_id1", "item_id2"]`。 +* `collection_id` (*str*):要搜索的 collection ID。例如 `"aster-l1t"`。 +* `bbox` (*Optional[list]*):用于过滤 item 的边界框列表,每项形如 `[min_lon, min_lat, max_lon, max_lat]`。例如 `[[ -180.0, -90.0, 180.0, 90.0 ]]`。 +* `datetime` (*Optional[Union[str, datetime.datetime, list]]*):单个日期时间、符合 RFC 3339 的时间戳,或一组时间区间。例如 `"2020-01-01T00:00:00Z"`、`datetime.datetime(2020, 1, 1)`、`[["2020-01-01T00:00:00Z", "2021-01-01T00:00:00Z"]]`。 +* `max_items` (*Optional[int]*):返回 item 的最大数量。例如 `100`。 +* `return_dataframe` (*bool*):若为 `True`(默认),返回 Spark DataFrame,而不是 `PyStacItem` 迭代器。例如 `True`。 + +返回: + +* *Union[Iterator[PyStacItem], DataFrame]*:与过滤条件匹配的 `PyStacItem` 迭代器或 Spark DataFrame。 + +## 参考 + +- STAC 规范:https://stacspec.org/ + +- STAC Browser:https://github.com/radiantearth/stac-browser diff --git a/docs/tutorial/flink/sql.zh.md b/docs/tutorial/flink/sql.zh.md new file mode 100644 index 00000000000..5d7276c507e --- /dev/null +++ b/docs/tutorial/flink/sql.zh.md @@ -0,0 +1,511 @@ + + +本页介绍如何使用 SedonaSQL 管理空间数据。==示例代码以 Java 编写,但同样适用于 Scala==。 + +SedonaSQL 支持 SQL/MM Part3 空间 SQL 标准,提供四类 SQL 算子(如下文所示),所有算子都可以直接通过以下方式调用: + +```java +Table myTable = tableEnv.sqlQuery("YOUR_SQL") +``` + +SedonaSQL 详细的 API 说明请参阅 [SedonaSQL API](../../api/flink/Overview.md)。 + +## 配置依赖 + +1. 阅读 [Sedona Maven Central 坐标](../../setup/maven-coordinates.md) +2. 在 build.sbt 或 pom.xml 中添加 Sedona 依赖 +3. 在 build.sbt 或 pom.xml 中添加 [Flink 依赖](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/) +4. 参考 [SQL 示例项目](../demo.md) + +## 初始化 Stream Environment + +在程序起始处使用以下代码初始化 `StreamExecutionEnvironment`: + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); +StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); +``` + +## 初始化 SedonaContext + +在创建好 `StreamExecutionEnvironment` 与 `StreamTableEnvironment` 后,添加以下代码: + +==Sedona >= 1.4.1== + +```java +StreamTableEnvironment sedona = SedonaContext.create(env, tableEnv); +``` + +==Sedona <1.4.1== + +下面这种方式自 Sedona 1.4.1 起已弃用,请改用上面的方式创建 SedonaContext。 + +```java +SedonaFlinkRegistrator.registerType(env); +SedonaFlinkRegistrator.registerFunc(tableEnv); +``` + +!!!warning + Sedona 内置了一套高效的几何与索引序列化器,忘记启用它们会导致内存占用过高。 + +该函数会注册 Sedona 的用户自定义类型与用户自定义函数。 + +## 创建 Geometry 类型列 + +SedonaSQL 中所有几何运算都作用在 Geometry 类型对象上。因此在执行任何查询之前,需要先在 DataFrame 上构造一列 Geometry 类型列。 + +假设有一张如下的 Flink Table `tbl`: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_polygon | name_polygon | ++----+--------------------------------+--------------------------------+ +| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 | +| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 | +| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 | +| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 | +| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 | +| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 | +| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 | +| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 | +| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 | +| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 | ++----+--------------------------------+--------------------------------+ +10 rows in set +``` + +可以这样创建一张带有 Geometry 类型列的 Table: + +```java +sedona.createTemporaryView("myTable", tbl) +Table geomTbl = sedona.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) as geom_polygon, name_polygon FROM myTable") +geomTbl.execute().print() +``` + +输出如下: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_polygon | name_polygon | ++----+--------------------------------+--------------------------------+ +| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 | +| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 | +| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 | +| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 | +| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 | +| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 | +| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 | +| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 | +| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 | +| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 | ++----+--------------------------------+--------------------------------+ +10 rows in set +``` + +虽然外观与输入一致,但 `geom_polygon` 列的类型已经变为 ==Geometry==。 + +可以打印 schema 进行验证: + +```java +geomTbl.printSchema() +``` + +输出: + +``` +( + `geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'), + `name_polygon` STRING +) +``` + +!!!note + SedonaSQL 提供了大量构造 Geometry 列的函数,详见 [SedonaSQL 构造器 API](../../api/flink/Geometry-Functions.md)。 + +## 转换坐标参考系 + +Sedona 不会自动管理一列 Geometry 中所有几何对象的坐标单位(基于度还是基于米)。SedonaSQL 中所有相关距离的单位与 Geometry 列中几何对象的单位保持一致。 + +如需对前面创建的 Geometry 列进行 CRS 转换,可使用以下代码: + +```java +Table geomTbl3857 = sedona.sqlQuery("SELECT ST_Transform(countyshape, "epsg:4326", "epsg:3857") AS geom_polygon, name_polygon FROM myTable") +geomTbl3857.execute().print() +``` + +`ST_Transform` 第一个 EPSG 代码 EPSG:4326 是源 CRS——也就是最常见的基于度的 CRS(WGS84)。 + +`ST_Transform` 第二个 EPSG 代码 EPSG:3857 是目标 CRS——最常见的基于米的 CRS。 + +`ST_Transform` 会把这些几何对象的 CRS 从 EPSG:4326 转换到 EPSG:3857。详细的 CRS 信息可以在 [EPSG.io](https://epsg.io/) 找到。 + +!!!note + 了解不同的空间查询谓词请阅读 [SedonaSQL ST_Transform API](../../api/flink/Spatial-Reference-System/ST_Transform.md)。 + +例如,存储美国坐标的 Table 转换前后会变为如下样子: + +转换前: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_point | name_point | ++----+--------------------------------+--------------------------------+ +| +I | POINT (32 -118) | point | +| +I | POINT (33 -117) | point | +| +I | POINT (34 -116) | point | +| +I | POINT (35 -115) | point | +| +I | POINT (36 -114) | point | +| +I | POINT (37 -113) | point | +| +I | POINT (38 -112) | point | +| +I | POINT (39 -111) | point | +| +I | POINT (40 -110) | point | +| +I | POINT (41 -109) | point | ++----+--------------------------------+--------------------------------+ +``` + +转换后: + +``` ++----+--------------------------------+--------------------------------+ +| op | _c0 | name_point | ++----+--------------------------------+--------------------------------+ +| +I | POINT (-13135699.91360628 3... | point | +| +I | POINT (-13024380.422813008 ... | point | +| +I | POINT (-12913060.932019735 ... | point | +| +I | POINT (-12801741.44122646 4... | point | +| +I | POINT (-12690421.950433187 ... | point | +| +I | POINT (-12579102.459639912 ... | point | +| +I | POINT (-12467782.96884664 4... | point | +| +I | POINT (-12356463.478053367 ... | point | +| +I | POINT (-12245143.987260092 ... | point | +| +I | POINT (-12133824.496466817 ... | point | ++----+--------------------------------+--------------------------------+ +``` + +构造完 Geometry 类型列后,即可执行各类空间查询。 + +## 范围查询 + +使用 ==ST_Contains==、==ST_Intersects== 等谓词可以在单列上执行范围查询。 + +下例查找位于给定多边形内的所有 county: + +```java +geomTable = sedona.sqlQuery( + " + SELECT * + FROM spatialdf + WHERE ST_Contains (ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) + ") +geomTable.execute().print() +``` + +!!!note + 了解不同的空间查询谓词请阅读 [SedonaSQL Predicate API](../../api/flink/Geometry-Functions.md#predicates)。 + +## KNN 查询 + +使用 ==ST_Distance== 计算距离并排序。 + +下面的代码返回距离给定多边形最近的 5 个对象: + +```java +geomTable = sedona.sqlQuery( + " + SELECT countyname, ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS distance + FROM geomTable + ORDER BY distance DESC + LIMIT 5 + ") +geomTable.execute().print() +``` + +## 连接查询 + +下面的等值连接借助 Flink 内置的 equi-join 算法。可以选择跳过 Sedona 的 refinement 步骤来牺牲一部分查询精度。完整示例可参考 [SQL 示例项目](../demo.md)。 + +操作步骤如下: + +### 1. 为两张表生成 S2 ID + +使用 [ST_S2CellIds](../../api/flink/Spatial-Indexing/ST_S2CellIDs.md) 为每个几何对象生成 cell ID,单个几何对象可能产生多个 ID。 + +```sql +SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray +FROM lefts +``` + +```sql +SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray +FROM rights +``` + +### 2. 展开 ID 数组 + +S2 cell ID 是整数数组,需要把它们展开成多行,以便后续按 ID 进行连接。 + +``` +SELECT id, geom, name, cellId +FROM lefts CROSS JOIN UNNEST(lefts.idarray) AS tmpTbl1(cellId) +``` + +``` +SELECT id, geom, name, cellId +FROM rights CROSS JOIN UNNEST(rights.idarray) AS tmpTbl2(cellId) +``` + +### 3. 执行等值连接 + +按 S2 cellId 对两张表做等值连接: + +```sql +SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name +FROM lcs JOIN rcs ON lcs.cellId = rcs.cellId +``` + +### 4. 可选:refine 结果 + +由于 S2 cellId 的特性,连接结果可能存在少量假阳性,具体程度取决于 S2 level 的选择:level 越小,cell 越大,展开行数越少,但假阳性越多。 + +为了保证正确性,可使用任一 [空间谓词](../../api/sql/Geometry-Functions.md#predicates) 过滤掉假阳性,将下面的查询替换步骤 3 中的查询: + +```sql +SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name +FROM lcs, rcs +WHERE lcs.cellId = rcs.cellId AND ST_Contains(lcs.geom, rcs.geom) +``` + +可以看到,这里相比步骤 2 的查询多加了一个 `ST_Contains` 过滤来剔除假阳性,也可以使用 `ST_Intersects` 等其他谓词。 + +!!!tip + 如果不要求 100% 的准确度且希望更快的查询速度,可以跳过此步骤。 + +### 5. 可选:去重 + +由于生成 S2 cellId 时使用了展开操作,结果 DataFrame 中可能出现 `` 重复匹配。可以通过 GroupBy 查询去重: + +```sql +SELECT lcs_id, rcs_id, FIRST_VALUE(lcs_geom), FIRST_VALUE(lcs_name), first(rcs_geom), first(rcs_name) +FROM joinresult +GROUP BY (lcs_id, rcs_id) +``` + +`FIRST_VALUE` 函数用于在多个重复值中取第一个。 + +如果没有为每个几何对象提供唯一 ID,也可以直接按几何分组: + +```sql +SELECT lcs_geom, rcs_geom, first(lcs_name), first(rcs_name) +FROM joinresult +GROUP BY (lcs_geom, rcs_geom) +``` + +!!!note + 做点-多边形 join 时不会出现该问题,可以放心忽略;只有 polygon-polygon、polygon-linestring、linestring-linestring 等连接才会遇到此问题。 + +### 距离连接中的 S2 用法 + +S2 同样适用于距离连接。先使用 `ST_Buffer(geometry, distance)` 把其中一列原始几何对象包裹起来。如果原始几何对象是点,`ST_Buffer` 会把它们变成半径为 `distance` 的圆。 + +例如,在步骤 1 之前,先在左表上运行: + +```sql +SELECT id, ST_Buffer(geom, DISTANCE), name +FROM lefts +``` + +由于坐标位于经纬度系统中,`distance` 的单位应为度而不是米或英里。可以根据实际米值估算对应的度数,可借助 [此换算工具](https://lucidar.me/en/online-unit-converter-length-to-angle/convert-degrees-to-meters/#online-converter)。 + +## 把空间 Table 转换为空间 DataStream + +### 获取 DataStream + +使用 TableEnv 的 `toDataStream` 函数: + +```java +DataStream geomStream = sedona.toDataStream(geomTable) +``` + +### 取出 Geometry + +接着用 Map 从每个 Row 对象中取出 Geometry: + +```java +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = geomStream.map(new MapFunction() { + @Override + public Geometry map(Row value) throws Exception { + return (Geometry) value.getField(0); + } + }); +geometries.print(); +``` + +输出如下: + +``` +14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5)) +2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5)) +5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5)) +16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5)) +12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5)) +13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5)) +15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5)) +3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5)) +1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5)) +4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5)) +``` + +### 在 Geometry 中保存非空间属性 + +可以把其他非空间属性拼接到 Geometry 的 `userData` 字段中保存,便于后续恢复。`userData` 字段可以是任意对象类型。 + +```java +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = geomStream.map(new MapFunction() { + @Override + public Geometry map(Row value) throws Exception { + Geometry geom = (Geometry) value.getField(0); + geom.setUserData(value.getField(1)); + return geom; + } + }); +geometries.print(); +``` + +`print` 命令不会输出 `userData` 字段。可以这样获取: + +```java +import org.locationtech.jts.geom.Geometry; + +geometries.map(new MapFunction() { + @Override + public String map(Geometry value) throws Exception + { + return (String) value.getUserData(); + } + }).print(); +``` + +输出如下: + +``` +13> polygon9 +6> polygon2 +10> polygon6 +11> polygon7 +5> polygon1 +12> polygon8 +8> polygon4 +4> polygon0 +7> polygon3 +9> polygon5 +``` + +## 把空间 DataStream 转换为空间 Table + +### 使用 Sedona 的 FormatUtils 创建 Geometry + +* 从 WKT 字符串创建 Geometry + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false); + return formatUtils.readGeometry(value); + } + }) +``` + +* 从字符串 `1.1, 2.2` 创建 Point,使用 `,` 作为分隔符。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(",", false, GeometryType.POINT); + return formatUtils.readGeometry(value); + } + }) +``` + +* 从字符串 `1.1, 1.1, 10.1, 10.1` 创建 Polygon,表示以 (1.1, 1.1) 与 (10.1, 10.1) 为最小/最大角点的矩形。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + // 在此实现 minX、minY、maxX、maxY 四个 double 值的解析逻辑 + ... + Coordinate[] coordinates = new Coordinate[5]; + coordinates[0] = new Coordinate(minX, minY); + coordinates[1] = new Coordinate(minX, maxY); + coordinates[2] = new Coordinate(maxX, maxY); + coordinates[3] = new Coordinate(maxX, minY); + coordinates[4] = coordinates[0]; + GeometryFactory geometryFactory = new GeometryFactory(); + return geometryFactory.createPolygon(coordinates); + } + }) +``` + +### 创建 Row 对象 + +把 Geometry 包进 Flink Row 中放入 `geomStream`。也可以在 Row 中放入其他属性。本例为所有几何对象赋了同一个常量 `myName`。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; +import org.apache.flink.types.Row; + +DataStream geomStream = text.map(new MapFunction() { + @Override + public Row map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false); + return Row.of(formatUtils.readGeometry(value), "myName"); + } + }) +``` + +### 得到空间 Table + +使用 TableEnv 的 `fromDataStream` 函数,并指定列名 `geom` 与 `geom_name`: + +```java +Table geomTable = sedona.fromDataStream(geomStream, "geom", "geom_name") +``` diff --git a/docs/tutorial/geopandas-api.zh.md b/docs/tutorial/geopandas-api.zh.md new file mode 100644 index 00000000000..fe6b3568613 --- /dev/null +++ b/docs/tutorial/geopandas-api.zh.md @@ -0,0 +1,398 @@ + + +# Apache Sedona 上的 GeoPandas API + +Apache Sedona 上的 GeoPandas API 提供了与 GeoPandas 一致的接口,可以让您的地理空间分析突破单机的局限。该 API 把熟悉的 GeoPandas DataFrame 语法与 Apache Sedona 在 Apache Spark 上的分布式处理能力结合起来,让您能够使用同样的代码模式处理行星级(planetary-scale)的数据集。 + +## 概览 + +### 什么是 Apache Sedona 的 GeoPandas API? + +Apache Sedona 的 GeoPandas API 是一层兼容层,让您能够在分布式地理空间数据上使用 GeoPandas 风格的操作。您原有的 GeoPandas 代码不再受限于单机处理能力,可以充分利用 Apache Spark 集群进行大规模地理空间分析。 + +### 主要优势 + +- **熟悉的 API**:使用您已经熟悉的 GeoPandas 语法与方法 +- **分布式处理**:突破单机限制,处理大规模数据集 +- **惰性求值**:受益于 Apache Sedona 的查询优化与延迟执行 +- **高性能**:利用分布式计算高效完成复杂的地理空间运算 +- **平滑迁移**:以最少的代码改动迁移现有 GeoPandas 工作流 + +## 准备工作 + +Apache Sedona 的 GeoPandas API 通过 PySpark 的 pandas-on-Spark 集成自动管理 SparkSession。准备方式有两种: + +### 方式 1:自动创建 SparkSession(推荐) + +GeoPandas API 会自动使用 PySpark 的默认 SparkSession: + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet + +# 无需显式配置 SparkSession,使用默认会话 +# API 会自动初始化 Sedona context +``` + +### 方式 2:手动配置 SparkSession + +如果需要自定义 SparkSession,或需要在某些环境下显式控制: + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet +from sedona.spark import SedonaContext + +# 创建并配置 SparkSession +config = SedonaContext.builder().getOrCreate() +sedona = SedonaContext.create(config) + +# GeoPandas API 会使用上面配置好的会话 +``` + +### 方式 3:复用现有 SparkSession + +如果已有 SparkSession(如 Databricks、EMR 等托管环境): + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet +from sedona.spark import SedonaContext + +# 复用现有 SparkSession(如 Databricks 中的 `spark`) +sedona = SedonaContext.create(spark) # `spark` 即已有会话 +``` + +### SparkSession 是如何被管理的 + +GeoPandas API 借助 PySpark 的 pandas-on-Spark 自动管理 SparkSession 生命周期: + +1. **默认会话**:导入 `sedona.spark.geopandas` 时,会自动通过 `pyspark.pandas.utils.default_session()` 获取 PySpark 的默认会话。 + +2. **自动注册 Sedona 函数**:必要时 API 会自动把 Sedona 的空间函数与优化注册到 SparkSession。 + +3. **透明集成**:所有 GeoPandas 操作在底层都被翻译为 Spark SQL 操作,使用所配置的 SparkSession 执行。 + +4. **无需手动管理 context**:与传统的 Sedona 用法不同,您通常不需要显式调用 `SedonaContext.create()`,除非有自定义配置需求。 + +这种设计让 API 更加易用,把 SparkSession 管理的复杂度隐藏起来,同时仍能充分发挥分布式处理的能力。 + +### S3 配置 + +在使用 S3 数据时,GeoPandas API 使用 Spark 内置的 S3 支持,而不是 s3fs 等外部库。可通过 Spark 配置开启对公开 S3 桶的匿名访问: + +```python +from sedona.spark import SedonaContext + +# 公开 S3 桶的匿名访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.bucket-name.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) +``` + +需要鉴权访问 S3 时,请使用合适的 AWS 凭据提供者: + +```python +# 使用 IAM 角色(在 EC2/EMR 上推荐) +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.InstanceProfileCredentialsProvider", + ) + .getOrCreate() +) + +# 使用 access key(不推荐用于生产环境) +config = ( + SedonaContext.builder() + .config("spark.hadoop.fs.s3a.access.key", "your-access-key") + .config("spark.hadoop.fs.s3a.secret.key", "your-secret-key") + .getOrCreate() +) +``` + +## 基本用法 + +### 引入 API + +不再直接导入 GeoPandas,而是从 Sedona 的 GeoPandas 模块导入: + +```python +# 传统的 GeoPandas 导入 +# import geopandas as gpd + +# Sedona GeoPandas API 的导入 +import sedona.spark.geopandas as gpd + +# 或 +from sedona.spark.geopandas import GeoDataFrame, read_parquet +``` + +### 读取数据 + +API 支持读取多种地理空间格式,包括来自云存储的 Parquet 文件。要以匿名凭据访问 S3,请配置 Spark 使用匿名 AWS 凭据: + +```python +from sedona.spark import SedonaContext + +# 配置 Spark 进行匿名 S3 访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) + +# 直接从 S3 加载 GeoParquet 文件 +s3_path = "s3://wherobots-examples/data/onboarding_1/nyc_buildings.parquet" +nyc_buildings = gpd.read_parquet(s3_path) + +# 显示基本信息 +print(f"Dataset shape: {nyc_buildings.shape}") +print(f"Columns: {nyc_buildings.columns.tolist()}") +nyc_buildings.head() +``` + +### 空间过滤 + +使用空间索引与过滤方法。注意:当前版本尚未实现 `cx` 空间索引: + +```python +from shapely.geometry import box + +# 中央公园的边界框 +central_park_bbox = box( + -73.973, + 40.764, # 左下角(经度,纬度) + -73.951, + 40.789, # 右上角(经度,纬度) +) + +# 使用空间索引筛选边界框内的建筑 +# 注意:该写法需要把数据收集到 driver 才能进行空间过滤 +# 对于大规模数据,建议改用空间连接(spatial join) +buildings_sample = nyc_buildings.sample(1000) # 演示用:抽样 1000 行 +central_park_buildings = buildings_sample[ + buildings_sample.geometry.intersects(central_park_bbox) +] + +# 显示结果 +print( + central_park_buildings[["BUILD_ID", "PROP_ADDR", "height_val", "geometry"]].head() +) +``` + +**针对大规模数据的替代方案——使用空间连接:** + +```python +# 用边界框创建一个 GeoDataFrame +bbox_gdf = gpd.GeoDataFrame({"id": [1]}, geometry=[central_park_bbox], crs="EPSG:4326") + +# 使用空间连接筛选边界框内的建筑 +central_park_buildings = nyc_buildings.sjoin(bbox_gdf, predicate="intersects") +``` + +## 高级操作 + +### 空间连接 + +可使用与 GeoPandas 相同的语法执行空间连接: + +```python +# 加载两份数据集 +left_df = gpd.read_parquet("s3://bucket/left_data.parquet") +right_df = gpd.read_parquet("s3://bucket/right_data.parquet") + +# 使用 distance 谓词的空间连接 +result = left_df.sjoin(right_df, predicate="dwithin", distance=50) + +# 其他空间谓词 +intersects_result = left_df.sjoin(right_df, predicate="intersects") +contains_result = left_df.sjoin(right_df, predicate="contains") +``` + +### 坐标参考系操作 + +在不同坐标参考系(CRS)之间转换几何对象: + +```python +# 设置初始 CRS +buildings = gpd.read_parquet("buildings.parquet") +buildings = buildings.set_crs("EPSG:4326") + +# 转换为投影坐标系以便计算面积 +buildings_projected = buildings.to_crs("EPSG:3857") + +# 计算面积 +buildings_projected["area"] = buildings_projected.geometry.area +``` + +### 几何运算 + +应用几何变换与分析: + +```python +# 缓冲区操作 +buffered = buildings.geometry.buffer(100) # 100 米缓冲 + +# 几何属性 +buildings["is_valid"] = buildings.geometry.is_valid +buildings["is_simple"] = buildings.geometry.is_simple +buildings["bounds"] = buildings.geometry.bounds + +# 距离计算 +from shapely.geometry import Point + +reference_point = Point(-73.9857, 40.7484) # 时代广场 +buildings["distance_to_times_square"] = buildings.geometry.distance(reference_point) + +# 面积与周长(需要使用投影 CRS) +buildings_projected = buildings.to_crs("EPSG:3857") # Web Mercator +buildings_projected["area"] = buildings_projected.geometry.area +buildings_projected["perimeter"] = buildings_projected.geometry.length +``` + +## 性能考虑 + +### 何时仍使用传统 GeoPandas: + +- 小数据集(< 1GB) +- 对本地数据的简单操作 +- 需要完整的功能覆盖 +- 单机处理已经足够 + +### 何时使用 Apache Sedona 的 GeoPandas API: + +- 大规模数据集(> 1GB) +- 复杂的地理空间分析 +- 需要分布式处理 +- 数据保存在云存储(S3、HDFS 等) + +## 已支持的操作 + +Apache Sedona 的 GeoPandas API 已实现 **39 个 GeoSeries 函数** 与 **10 个 GeoDataFrame 函数**,覆盖了 GeoPandas 中最常用的操作: + +### 数据 I/O + +- `read_parquet()` —— 读取 GeoParquet 文件 +- `read_file()` —— 读取多种地理空间格式 +- `to_parquet()` —— 写出为 Parquet 格式 + +### 空间操作 + +- `sjoin()` —— 多种谓词的空间连接 +- `buffer()` —— 几何缓冲 +- `distance()` —— 距离计算 +- `intersects()`、`contains()`、`within()` —— 空间谓词 +- `sindex` —— 空间索引(功能有限) + +### CRS 操作 + +- `set_crs()` —— 设置坐标参考系 +- `to_crs()` —— 在 CRS 之间转换 +- `crs` —— 访问 CRS 信息 + +### 几何属性 + +- `area`、`length`、`bounds` —— 几何度量 +- `is_valid`、`is_simple`、`is_empty` —— 几何校验 +- `centroid`、`envelope`、`boundary` —— 几何属性 +- `x`、`y`、`z`、`has_z` —— 坐标访问 +- `total_bounds`、`estimate_utm_crs` —— 包围盒与 CRS 工具 + +### 空间运算 + +- `buffer()` —— 几何缓冲 +- `distance()` —— 距离计算 +- `intersects()`、`contains()`、`within()` —— 空间谓词 +- `intersection()` —— 几何相交 +- `make_valid()` —— 几何校验与修复 +- `sindex` —— 空间索引(功能有限) + +### 数据转换 + +- `to_geopandas()` —— 转换为传统 GeoPandas +- `to_wkb()`、`to_wkt()` —— 转换为 WKB/WKT +- `from_xy()` —— 通过坐标创建几何 +- `geom_type` —— 获取几何类型 + +## 完整工作流示例 + +```python +import sedona.spark.geopandas as gpd +from sedona.spark import SedonaContext + +# 配置 Spark 进行匿名 S3 访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) + +# 加载数据 +DATA_DIR = "s3://wherobots-examples/data/geopandas_blog/" +overture_size = "1M" +postal_codes_path = DATA_DIR + "postal-code/" +overture_path = DATA_DIR + overture_size + "/" + "overture-buildings/" + +postal_codes = gpd.read_parquet(postal_codes_path) +buildings = gpd.read_parquet(overture_path) + +# 空间分析 +buildings = buildings.set_crs("EPSG:4326") +buildings_projected = buildings.to_crs("EPSG:3857") + +# 计算面积并过滤 +buildings_projected["area"] = buildings_projected.geometry.area +large_buildings = buildings_projected[buildings_projected["area"] > 1000] + +result = large_buildings.sjoin(postal_codes, predicate="intersects") + +# 按邮政编码聚合 +summary = ( + result.groupby("postal_code") + .agg({"area": "sum", "BUILD_ID": "count"}) + .rename(columns={"BUILD_ID": "building_count"}) +) + +print(summary.head()) +``` + +## 资源与贡献 + +完整且最新的 API 文档(包含方法签名、参数与示例)请参阅: + +**📚 [GeoPandas API 文档](https://sedona.apache.org/latest/api/pydocs/sedona.spark.geopandas.html)** + +Apache Sedona 的 GeoPandas API 是一个开源项目,欢迎参与贡献:可在 [GitHub issue tracker](https://github.com/apache/sedona/issues/2230) 报告 bug、提出新需求或贡献代码。更多贡献指南请参阅 [贡献者指南](../community/develop.md)。 diff --git a/docs/tutorial/geopandas-shapely.zh.md b/docs/tutorial/geopandas-shapely.zh.md new file mode 100644 index 00000000000..1442effd259 --- /dev/null +++ b/docs/tutorial/geopandas-shapely.zh.md @@ -0,0 +1,371 @@ + + +# 与 GeoPandas 和 Shapely 配合使用 + +!!! note + Sedona 1.6.0 之前的版本仅支持 Shapely 1.x。如需使用 Shapely 2.x,请使用 1.6.0 及以上的 Sedona。 + + 如果您使用 Sedona < 1.6.0,请安装 GeoPandas <= `0.11.1`,因为 GeoPandas > 0.11.1 会自动安装 Shapely 2.0;如果使用 Shapely,请安装 <= `1.8.5`。 + +## 与 GeoPandas 互通 + +Sedona Python 已实现序列化器与反序列化器,可以将 Sedona Geometry 对象转换为 Shapely 的 BaseGeometry。基于此,您可以用 GeoPandas 从文件加载数据(驱动可参考 Fiona 支持的列表),并基于 GeoDataFrame 创建 Spark DataFrame。 + +### 从 GeoPandas 到 Sedona DataFrame + +使用 GeoPandas 的 `read_file` 方法从 shapefile 加载数据,并基于 GeoDataFrame 创建 Spark DataFrame: + +```python +import geopandas as gpd +from sedona.spark import * + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +gdf = gpd.read_file("gis_osm_pois_free_1.shp") + +sedona.createDataFrame(gdf).show() +``` + +输出如下: + +``` + ++---------+----+-----------+--------------------+--------------------+ +| osm_id|code| fclass| name| geometry| ++---------+----+-----------+--------------------+--------------------+ +| 26860257|2422| camp_site| de Kroon|POINT (15.3393145...| +| 26860294|2406| chalet| Leśne Ustronie|POINT (14.8709625...| +| 29947493|2402| motel| null|POINT (15.0946636...| +| 29947498|2602| atm| null|POINT (15.0732014...| +| 29947499|2401| hotel| null|POINT (15.0696777...| +| 29947505|2401| hotel| null|POINT (15.0155749...| ++---------+----+-----------+--------------------+--------------------+ + +``` + +如需借助 Arrow 优化加快转换速度,可以使用 `create_spatial_dataframe`,它接收 SparkSession 与 GeoDataFrame 作为参数,返回 Sedona DataFrame: + +```python +def create_spatial_dataframe( + spark: SparkSession, gdf: gpd.GeoDataFrame +) -> DataFrame: ... +``` + +- spark:SparkSession +- gdf:gpd.GeoDataFrame +- 返回:DataFrame + +示例: + +```python +from sedona.spark.geoarrow import create_spatial_dataframe + +create_spatial_dataframe(spark, gdf) +``` + +### 从 Sedona DataFrame 到 GeoPandas + +用 Spark 读取数据并转换为 GeoPandas: + +```python +import geopandas as gpd +from sedona.spark import * + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +counties = ( + sedona.read.option("delimiter", "|").option("header", "true").csv("counties.csv") +) + +counties.createOrReplaceTempView("county") + +counties_geom = sedona.sql("SELECT *, st_geomFromWKT(geom) as geometry from county") + +df = counties_geom.toPandas() +gdf = gpd.GeoDataFrame(df, geometry="geometry") + +gdf.plot( + figsize=(10, 8), + column="value", + legend=True, + cmap="YlOrBr", + scheme="quantiles", + edgecolor="lightgray", +) +``` + +
+
+ +![poland_image](https://user-images.githubusercontent.com/22958216/67603296-c08b4680-f778-11e9-8cde-d2e14ffbba3b.png) + +
+
+ +也可以尝试通过 GeoArrow 转换为 GeoPandas,对于大型结果集这种方式可能显著更快(要求 geopandas >= 1.0)。 + +```python +import geopandas as gpd +from sedona.spark import dataframe_to_arrow + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +test_wkt = ["POINT (0 1)", "LINESTRING (0 1, 2 3)"] +df = sedona.createDataFrame(zip(test_wkt), ["wkt"]).selectExpr( + "ST_GeomFromText(wkt) as geom" +) + +gpd.GeoDataFrame.from_arrow(dataframe_to_arrow(df)) +``` + +## 与 Shapely 对象互通 + +### 支持的 Shapely 对象 + +| Shapely 对象 | 是否支持 | +|--------------------|--------------------| +| Point | :heavy_check_mark: | +| MultiPoint | :heavy_check_mark: | +| LineString | :heavy_check_mark: | +| MultiLinestring | :heavy_check_mark: | +| Polygon | :heavy_check_mark: | +| MultiPolygon | :heavy_check_mark: | +| GeometryCollection | :heavy_check_mark: | + +要基于上述几何类型创建 Spark DataFrame,请使用 `sedona.sql.types` 模块下的 GeometryType。该转换支持包含 Shapely 对象的 list 或 tuple。 + +针对包含整型 id 与几何类型的目标表,schema 可以这样定义: + +```python +from pyspark.sql.types import IntegerType, StructField, StructType + +from sedona.spark import * + +schema = StructType( + [ + StructField("id", IntegerType(), False), + StructField("geom", GeometryType(), False), + ] +) +``` + +同样,包含几何类型列的 Spark DataFrame 可以通过 collect 方法转换为 Shapely 对象列表。 + +### Point 示例 + +```python +from shapely.geometry import Point + +data = [[1, Point(21.0, 52.0)], [1, Point(23.0, 42.0)], [1, Point(26.0, 32.0)]] + + +gdf = sedona.createDataFrame(data, schema) + +gdf.show() +``` + +``` ++---+-------------+ +| id| geom| ++---+-------------+ +| 1|POINT (21 52)| +| 1|POINT (23 42)| +| 1|POINT (26 32)| ++---+-------------+ +``` + +```python +gdf.printSchema() +``` + +``` +root + |-- id: integer (nullable = false) + |-- geom: geometry (nullable = false) +``` + +### MultiPoint 示例 + +```python3 +from shapely.geometry import MultiPoint + +data = [[1, MultiPoint([[19.511463, 51.765158], [19.446408, 51.779752]])]] + +gdf = sedona.createDataFrame(data, schema).show(1, False) +``` + +``` + ++---+---------------------------------------------------------+ +|id |geom | ++---+---------------------------------------------------------+ +|1 |MULTIPOINT ((19.511463 51.765158), (19.446408 51.779752))| ++---+---------------------------------------------------------+ + + +``` + +### LineString 示例 + +```python3 +from shapely.geometry import LineString + +line = [(40, 40), (30, 30), (40, 20), (30, 10)] + +data = [[1, LineString(line)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+--------------------------------+ +|id |geom | ++---+--------------------------------+ +|1 |LINESTRING (10 10, 20 20, 10 40)| ++---+--------------------------------+ + +``` + +### MultiLineString 示例 + +```python3 +from shapely.geometry import MultiLineString + +line1 = [(10, 10), (20, 20), (10, 40)] +line2 = [(40, 40), (30, 30), (40, 20), (30, 10)] + +data = [[1, MultiLineString([line1, line2])]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+---------------------------------------------------------------------+ +|id |geom | ++---+---------------------------------------------------------------------+ +|1 |MULTILINESTRING ((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))| ++---+---------------------------------------------------------------------+ + +``` + +### Polygon 示例 + +```python3 +from shapely.geometry import Polygon + +polygon = Polygon( + [ + [19.51121, 51.76426], + [19.51056, 51.76583], + [19.51216, 51.76599], + [19.51280, 51.76448], + [19.51121, 51.76426], + ] +) + +data = [[1, polygon]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+--------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+--------------------------------------------------------------------------------------------------------+ +|1 |POLYGON ((19.51121 51.76426, 19.51056 51.76583, 19.51216 51.76599, 19.5128 51.76448, 19.51121 51.76426))| ++---+--------------------------------------------------------------------------------------------------------+ + +``` + +### MultiPolygon 示例 + +```python3 +from shapely.geometry import MultiPolygon + +exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)] +interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)] + +exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)] + +polygons = [Polygon(exterior_p1, [interior_p1]), Polygon(exterior_p2)] + +data = [[1, MultiPolygon(polygons)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+----------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+----------------------------------------------------------------------------------------------------------+ +|1 |MULTIPOLYGON (((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1.5 1, 1.5 1.5, 1 1.5, 1 1)), ((0 0, 0 1, 1 1, 1 0, 0 0)))| ++---+----------------------------------------------------------------------------------------------------------+ + +``` + +### GeometryCollection 示例 + +```python3 +from shapely.geometry import GeometryCollection, Point, LineString, Polygon + +exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)] +interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)] +exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)] + +geoms = [ + Polygon(exterior_p1, [interior_p1]), + Polygon(exterior_p2), + Point(1, 1), + LineString([(0, 0), (1, 1), (2, 2)]), +] + +data = [[1, GeometryCollection(geoms)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +|1 |GEOMETRYCOLLECTION (POLYGON ((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1 1.5, 1.5 1.5, 1.5 1, 1 1)), POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0)), POINT (1 1), LINESTRING (0 0, 1 1, 2 2))| ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +```