From 7390fd8208cdad17fe5e58dab4f7ff16acd05c17 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Thu, 7 May 2026 23:50:37 -0700 Subject: [PATCH] [GH-2911] Translate Tutorial / Programming Guide (part 2) to Chinese Phase 4b of #2867. Adds Chinese translations for 6 of the remaining 10 tutorial pages. The 4 largest pages (sql, rdd, raster, snowflake/sql; ~3,690 lines combined) are deferred to Phase 4c so each PR stays review-able. Files translated in this PR: - files/: geoparquet-sedona-spark, stac-sedona-spark - geopandas-shapely, geopandas-api - concepts/: spatial-joins - flink/: sql Combined with Phase 4a (#2931), 23 of 27 tutorial pages now have Chinese versions on the docs site. Code blocks, command snippets, version macros, admonitions, and tab attributes are preserved verbatim. Only prose, headings, table headers, and explanatory inline comments are translated. --- docs/tutorial/concepts/spatial-joins.zh.md | 418 ++++++++++++++ .../files/geoparquet-sedona-spark.zh.md | 352 ++++++++++++ docs/tutorial/files/stac-sedona-spark.zh.md | 468 ++++++++++++++++ docs/tutorial/flink/sql.zh.md | 511 ++++++++++++++++++ docs/tutorial/geopandas-api.zh.md | 398 ++++++++++++++ docs/tutorial/geopandas-shapely.zh.md | 371 +++++++++++++ 6 files changed, 2518 insertions(+) create mode 100644 docs/tutorial/concepts/spatial-joins.zh.md create mode 100644 docs/tutorial/files/geoparquet-sedona-spark.zh.md create mode 100644 docs/tutorial/files/stac-sedona-spark.zh.md create mode 100644 docs/tutorial/flink/sql.zh.md create mode 100644 docs/tutorial/geopandas-api.zh.md create mode 100644 docs/tutorial/geopandas-shapely.zh.md diff --git a/docs/tutorial/concepts/spatial-joins.zh.md b/docs/tutorial/concepts/spatial-joins.zh.md new file mode 100644 index 00000000000..ddd53a6f8eb --- /dev/null +++ b/docs/tutorial/concepts/spatial-joins.zh.md @@ -0,0 +1,418 @@ + + +# 在 Spark 上使用 Apache Sedona 进行空间连接 + +本文介绍如何使用 Apache Sedona 进行空间连接(spatial join)。您将了解到不同类型的空间连接以及如何高效执行它们。 + +页面中的示例较为基础,便于直观地说明空间连接的核心概念。文末还会针对真实业务规模的数据集,进一步阐述空间连接概念并强调若干关键的性能改进。 + +## 使用 Spark 进行 within 空间连接 + +下图包含 3 个点和 2 个多边形:`point_b` 位于 `polygon_y` 内部,`point_c` 位于 `polygon_x` 内部,`point_a` 不在任何多边形中。 + +![spatial join within](../../image/tutorial/concepts/spatial-join1.png) + +`points` 表存放点,`polygons` 表存放多边形。 + +执行该查询的 SQL 如下: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +JOIN polygons ON ST_Within(points.geometry, polygons.geometry); +``` + +结果如下: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| b| y| +| c| x| ++--------+----------+ +``` + +`point_a` 不在结果 DataFrame 中,因为它不在任何多边形内部。 + +如果使用 `LEFT JOIN`,可以更直观地看到 `point_a` 的 `polygon_id` 为 `NULL`: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +LEFT JOIN polygons ON ST_Within(points.geometry, polygons.geometry); +``` + +输出: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| a| NULL| +| b| y| +| c| x| ++--------+----------+ +``` + +`point_a` 的 `polygon_id` 为 `NULL`,因为它不在任何多边形内。 + +在生产应用中通常使用 `JOIN`。本文使用 `LEFT JOIN` 仅是为了展示哪些行没有匹配到。 + +上面的代码使用了 `ST_Within` 谓词,它与 `ST_Contains` 含义相同,只是参数顺序相反。下面是用 `ST_Contains` 写的等价查询: + +```sql +SELECT + points.id as point_id, + polygons.id as polygon_id +FROM points +LEFT JOIN polygons ON ST_Contains(polygons.geometry, points.geometry); +``` + +结果完全相同: + +``` ++--------+----------+ +|point_id|polygon_id| ++--------+----------+ +| a| NULL| +| b| y| +| c| x| ++--------+----------+ +``` + +## 使用 Spark 进行 crosses 空间连接 + +下图包含 1 个多边形和 2 条折线:`line_a` 与 `line_b` 都穿过 `polygon_x`,`line_c` 没有穿过 `polygon_x`。 + +![spatial join crosses](../../image/tutorial/concepts/spatial-join2.png) + +执行该空间连接的 SQL: + +```sql +SELECT + lines.id as line_id, + polygons.id as polygon_id +FROM lines +LEFT JOIN polygons ON ST_Crosses(lines.geometry, polygons.geometry); +``` + +结果: + +``` ++-------+----------+ +|line_id|polygon_id| ++-------+----------+ +| a| x| +| b| x| +| c| NULL| ++-------+----------+ +``` + +借助 `ST_Crosses` 的空间连接,可以筛选出与多边形相交的折线。 + +## 使用 Spark 进行 touches 空间连接 + +假设有 1 个多边形和 2 条折线:`line_a` 不接触多边形,`line_b` 接触多边形。如下图所示: + +![spatial join touches](../../image/tutorial/concepts/spatial-join3.png) + +我们用折线创建 `table_a`,用多边形创建 `table_b`,然后做连接。 + +多边形表内容如下: + +``` ++---+-----------------------------------+ +|id |geometry | ++---+-----------------------------------+ +|x |POLYGON ((6 2, 6 4, 8 4, 8 2, 6 2))| ++---+-----------------------------------+ +``` + +折线表内容如下: + +``` ++---+----------------------+ +|id |geometry | ++---+----------------------+ +|a |LINESTRING (2 4, 4 0) | +|b |LINESTRING (6 0, 10 4)| ++---+----------------------+ +``` + +匹配相互接触的连接: + +```python +sedona.sql(""" +SELECT + lines.id as line_id, + polygons.id as polygon_id +FROM lines +LEFT JOIN polygons ON ST_Touches(lines.geometry, polygons.geometry); +""").show() +``` + +连接结果: + +``` ++-------+----------+ +|line_id|polygon_id| ++-------+----------+ +| a| NULL| +| b| x| ++-------+----------+ +``` + +接下来看看如何用空间连接判断点是否在多边形内。 + +## 使用 Spark 进行 overlaps 空间连接 + +下图展示了 2 个多边形与若干图形:`polygon_a` 与 `polygon_x` 重叠;`line_b`、`line_c` 与 `point_d` 都不与 `polygon_y` 或 `polygon_x` 重叠。 + +![spatial join overlaps](../../image/tutorial/concepts/spatial-join4.png) + +执行该空间连接的 SQL: + +```sql +SELECT + shapes.id as shape_id, + polygons.id as polygon_id +FROM shapes +LEFT JOIN polygons ON ST_Overlaps(shapes.geometry, polygons.geometry); +``` + +结果: + +``` ++--------+----------+ +|shape_id|polygon_id| ++--------+----------+ +| a| x| +| b| NULL| +| c| NULL| +| d| NULL| ++--------+----------+ +``` + +## 使用 Spark 进行 K 近邻空间连接(KNN spatial join) + +假设有一张地址表与一张咖啡店表,希望为每个地址找到最近的两家咖啡店。 + +`addresses` 表包含 `latitude` 与 `longitude`: + +``` ++---+---------+--------+ +| id|longitude|latitude| ++---+---------+--------+ +| a1| 2.0| 3.0| +| a2| 5.0| 5.0| +| a3| 7.0| 2.0| ++---+---------+--------+ +``` + +`coffee_shops` 表同样包含 `latitude` 与 `longitude`: + +``` ++---+---------+--------+ +| id|longitude|latitude| ++---+---------+--------+ +| c1| 1.0| 4.0| +| c2| 3.0| 5.0| +| c3| 5.0| 1.0| +| c4| 8.0| 4.0| ++---+---------+--------+ +``` + +为每个地址计算最近的两家咖啡店: + +```sql +SELECT + addresses.id AS address_id, + coffee_shops.id AS coffee_shop_id +FROM addresses +JOIN coffee_shops +ON ST_KNN(addresses.geometry, coffee_shops.geometry, 2) +``` + +结果: + +``` ++----------+--------------+ +|address_id|coffee_shop_id| ++----------+--------------+ +| a1| c1| +| a1| c2| +| a2| c2| +| a2| c4| +| a3| c3| +| a3| c4| ++----------+--------------+ +``` + +可视化结果如下: + +![spatial join knn](../../image/tutorial/concepts/spatial-join5.png) + +可以一目了然地看出每个地址附近的两家咖啡店。 + +## 使用 Spark 进行空间距离连接 + +下图展示了 1 个点与若干公交站点。我们要找出与该点距离不超过 2.5 单位的所有公交站点。 + +![spatial distance join](../../image/tutorial/concepts/spatial-join6.png) + +可以看出 `t2` 与 `t3` 在 2.5 单位范围内。 + +points 表如下: + +``` ++---+-------------+ +| id| geometry| ++---+-------------+ +| p1|POINT (4.5 3)| ++---+-------------+ +``` + +transit 表如下: + +``` ++---+-----------+ +| id| geometry| ++---+-----------+ +| t1|POINT (1 4)| +| t2|POINT (3 4)| +| t3|POINT (5 2)| +| t4|POINT (8 4)| ++---+-----------+ +``` + +执行距离连接,找出与该点距离 ≤ 2.5 的公交站点: + +```sql +SELECT + points.id AS point_id, + transit.id AS transit_id +FROM points +JOIN transit +ON ST_DWithin(points.geometry, transit.geometry, 2.5) +``` + +结果: + +``` ++--------+----------+ +|point_id|transit_id| ++--------+----------+ +| p1| t2| +| p1| t3| ++--------+----------+ +``` + +Sedona 非常适合查找距离某点一定范围内的位置。 + +Sedona 使用的是欧氏距离,因此距离单位与原始坐标的 CRS 相同。如果要在 WGS84 坐标上以米为单位运算,请使用 `ST_DistanceSphere`、`ST_DistanceSpheroid` 或 `ST_DWithin(useSpheroid = true)`。 + +## 使用 Spark 进行空间范围连接 + +由 `ST_Intersects`、`ST_Contains`、`ST_Within`、`ST_DWithin`、`ST_Touches`、`ST_Crosses` 触发的连接都属于范围连接(range join)。本节再展示一个范围连接示例,其实前面已经介绍过多个范围连接。 + +假设有一张城市表与一张餐厅表,希望找出某城市内的所有餐厅。示例数据如下图: + +![spatial range join](../../image/tutorial/concepts/spatial-join7.png) + +3 家餐厅位于城市边界内,1 家餐厅在城市外。 + +cities 表: + +``` ++-----+--------------------+ +| id| geometry| ++-----+--------------------+ +|city1|POLYGON ((1 1, 1 ...| ++-----+--------------------+ +``` + +`restaurants` 表: + +``` ++---+-----------+ +| id| geometry| ++---+-----------+ +| r1|POINT (2 2)| +| r2|POINT (3 3)| +| r3|POINT (4 4)| +| r4|POINT (6 6)| ++---+-----------+ +``` + +执行范围连接: + +``` +SELECT + cities.id AS city_id, + restaurants.id AS restaurant_id +FROM cities +JOIN restaurants +ON ST_Intersects(restaurants.geometry, cities.geometry) +``` + +结果: + +``` ++-------+-------------+ +|city_id|restaurant_id| ++-------+-------------+ +| city1| r1| +| city1| r2| +| city1| r3| ++-------+-------------+ +``` + +范围连接在很多实际场景中都非常有用。 + +## Sedona 与 Apache Spark 上的空间连接优化 + +可以通过更优的文件格式、索引、查询写法等方式优化空间连接。 + +例如,假设您正在对存储为 GeoJSON 的两张宽表执行空间连接,且不需要全部输出列。GeoJSON 是行式存储,不支持列裁剪。把数据从 GeoJSON 切换到 GeoParquet 等列式格式即可享受列裁剪带来的关键性能收益。 + +更多查询优化方法请参阅 [此处](https://sedona.apache.org/latest/api/sql/Optimizer/)。 + +## 空间广播连接 + +Sedona 既能在单机运行,也能在多节点集群上运行。 + +可以想象,当数据分布在集群的不同机器上时,连接两个数据集会更慢——因为需要 shuffle,开销不小。 + +如果其中一张表很小,可以将其广播到集群中的所有机器,从而大幅提升连接速度。 + +通常只有相对较小的 DataFrame 才适合广播,详见 [此处说明](https://sedona.apache.org/latest/api/sql/Optimizer/?h=broadcast#broadcast-index-join)。 + +Sedona 会自动广播大小低于阈值的表,[详细信息见此](https://sedona.apache.org/latest/api/sql/Optimizer/#automatic-broadcast-index-join)。 + +## 结论 + +Apache Sedona 支持多种类型的空间连接。 + +空间连接是 Sedona 引擎的强项之一。其他引擎在执行空间连接时常常受到内存问题的困扰,而 Sedona 能够在海量数据集上完成空间连接。 diff --git a/docs/tutorial/files/geoparquet-sedona-spark.zh.md b/docs/tutorial/files/geoparquet-sedona-spark.zh.md new file mode 100644 index 00000000000..d2a8cec29c8 --- /dev/null +++ b/docs/tutorial/files/geoparquet-sedona-spark.zh.md @@ -0,0 +1,352 @@ + + +# 在 Spark 上使用 Apache Sedona 处理 GeoParquet + +本页介绍如何使用 Apache Sedona 与 GeoParquet 构建空间数据湖。 + +GeoParquet 在空间数据集场景下具有诸多优势: + +* 内置对几何列的支持 +* GeoParquet 是列式存储,支持列裁剪,仅使用部分列的查询会更快 +* 内嵌统计信息让某些查询能够直接跳过整块文件内容 +* 为每个 row group 存储边界框(“bbox”)元数据,可进行 row-group 级过滤 +* 列式格式利于压缩 +* 与 GeoJSON 或 CSV 不同,GeoParquet 在文件尾部携带 schema,无需推断或手动指定 + +下面看如何把 Sedona DataFrame 写成 GeoParquet 文件。 + +## 使用 Spark 把 Sedona DataFrame 写为 GeoParquet + +先创建一个 Sedona DataFrame: + +```python +df = sedona.createDataFrame( + [ + ("a", "LINESTRING(2.0 5.0,6.0 1.0)"), + ("b", "LINESTRING(7.0 4.0,9.0 2.0)"), + ("c", "LINESTRING(1.0 3.0,3.0 1.0)"), + ], + ["id", "geometry"], +) +df = df.withColumn("geometry", ST_GeomFromText(col("geometry"))) +``` + +将 DataFrame 写成 GeoParquet 文件: + +```python +df.write.format("geoparquet").mode("overwrite").save("/tmp/somewhere") +``` + +写出的文件如下: + +``` +somewhere/ + _SUCCESS + part-00000-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00003-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00007-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet + part-00011-1c13be9e-6d4c-401e-89d8-739000ad3aba-c000.snappy.parquet +``` + +Sedona 会并行写出多个 Parquet 文件,比写单一文件快得多。 + +## 使用 Spark 将 GeoParquet 读入 Sedona DataFrame + +将 GeoParquet 文件读入 Sedona DataFrame: + +```python +df = sedona.read.format("geoparquet").load("/tmp/somewhere") +df.show(truncate=False) +``` + +结果如下: + +``` ++---+---------------------+ +|id |geometry | ++---+---------------------+ +|a |LINESTRING (2 5, 6 1)| +|b |LINESTRING (7 4, 9 2)| +|c |LINESTRING (1 3, 3 1)| ++---+---------------------+ +``` + +Sedona 在底层执行该查询的过程大致如下: + +1. 从 GeoParquet 文件尾部读取 schema,因此无需 schema 推断。 +2. 收集每个文件的 bbox 元数据,判断哪些数据可以被跳过。 +3. 执行查询时跳过未使用的列。 + +得益于此,Sedona 在 GeoParquet 上的查询通常比在 CSV 等格式上更快、更稳定。CSV 既不在文件尾部存 schema,也不支持列裁剪,更没有用于数据跳过的 row-group 元数据。 + +## 检查 GeoParquet 元数据 + +自 v`1.5.1` 起,Sedona 提供了一个 Spark SQL 数据源 `"geoparquet.metadata"`,可用于检查 GeoParquet 元数据。返回的 DataFrame 中包含每个输入文件的 “geo” 元数据。 + +=== "Scala" + + ```scala + val df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +=== "Java" + + ```java + Dataset df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +=== "Python" + + ```python + df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1) + df.printSchema() + ``` + +输出如下: + +``` +root + |-- path: string (nullable = true) + |-- version: string (nullable = true) + |-- primary_column: string (nullable = true) + |-- columns: map (nullable = true) + | |-- key: string + | |-- value: struct (valueContainsNull = true) + | | |-- encoding: string (nullable = true) + | | |-- geometry_types: array (nullable = true) + | | | |-- element: string (containsNull = true) + | | |-- bbox: array (nullable = true) + | | | |-- element: double (containsNull = true) + | | |-- crs: string (nullable = true) +``` + +GeoParquet 文件尾部为每一列存储以下数据: + +* geometry_type:几何对象的类型,如点、多边形等 +* bbox:文件中对象的边界框 +* crs:坐标参考系 +* covering:包含 xmin、ymin、xmax、ymax 的结构体列,用于 GeoParquet 1.1 文件的 row group 统计信息 + +其他 Parquet 元数据存储在 Parquet 文件尾部,与普通 Parquet 文件保持一致。 + +如果输入的 Parquet 文件没有 GeoParquet 元数据,返回 DataFrame 中的 `version`、`primary_column` 与 `columns` 字段会为 `null`。 + +`geoparquet.metadata` 仅支持读取 GeoParquet 特有的元数据。如果需要读取通用 Parquet 文件的全部元数据,可以使用 [G-Research/spark-extension](https://github.com/G-Research/spark-extension/blob/13109b8e43dfba9272c85896ba5e30cfe280426f/PARQUET.md)。 + +我们来查看一下元数据的内容与 schema: + +``` +df.show(truncate=False) + ++-----------------------------+-------------------------------------------------------------------+ +|path |columns | ++-----------------------------+-------------------------------------------------------------------+ +|file:/part-00003-1c13.parquet|{geometry -> {WKB, [LineString], [2.0, 1.0, 6.0, 5.0], null, NULL}}| +|file:/part-00007-1c13.parquet|{geometry -> {WKB, [LineString], [7.0, 2.0, 9.0, 4.0], null, NULL}}| +|file:/part-00011-1c13.parquet|{geometry -> {WKB, [LineString], [1.0, 1.0, 3.0, 3.0], null, NULL}}| +|file:/part-00000-1c13.parquet|{geometry -> {WKB, [], [0.0, 0.0, 0.0, 0.0], null, NULL}} | ++-----------------------------+-------------------------------------------------------------------+ +``` + +`columns` 列保存了 GeoParquet 数据湖中各文件的边界框信息,可用于跳过整个文件或 row group。下文还会进一步介绍如何利用 bbox 元数据。 + +## 写出带 CRS 元数据的 GeoParquet + +自 v`1.5.1` 起,Sedona 支持以自定义的 GeoParquet 规范版本与 CRS 写出 GeoParquet 文件。默认的 GeoParquet 规范版本为 `1.1.0`(自 `v1.9.0` 起),默认 CRS 为 `null`。可按以下方式指定: + +```scala +val projjson = "{...}" // 所有几何列共享的 PROJJSON 字符串 +df.write.format("geoparquet") + .option("geoparquet.version", "1.0.0") + .option("geoparquet.crs", projjson) + .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet") +``` + +如果 GeoParquet 文件中存在多个几何列,可以为每个列分别指定 CRS。例如 `df` 中有两列几何列 `g0` 与 `g1`,希望分别指定它们的 CRS: + +```scala +val projjson_g0 = "{...}" // g0 的 PROJJSON 字符串 +val projjson_g1 = "{...}" // g1 的 PROJJSON 字符串 +df.write.format("geoparquet") + .option("geoparquet.version", "1.0.0") + .option("geoparquet.crs.g0", projjson_g0) + .option("geoparquet.crs.g1", projjson_g1) + .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet") +``` + +`geoparquet.crs` 与 `geoparquet.crs.` 可取以下值: + +* `"null"`:显式将 `crs` 字段置为 `null`,这是几何 SRID 为 0 时的默认行为。 +* `""`(空字符串):省略 `crs` 字段。对于支持 CRS 的实现而言,意味着使用 [OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84)。 +* `"{...}"`(PROJJSON 字符串):`crs` 字段会被设为表示该几何坐标参考系的 PROJJSON 对象。可在 https://epsg.io/ (页面底部点击 JSON 选项)查找特定 CRS 的 PROJJSON,也可以根据需要自定义 PROJJSON。 + +### 通过 SRID 自动推断 CRS + +未显式提供 `geoparquet.crs` 选项时,Sedona 会根据几何列的 SRID 自动推导出 CRS PROJJSON。例如,如果某列所有几何对象的 SRID 都是 32632(通过 [`ST_SetSRID`](../../api/sql/Spatial-Reference-System/ST_SetSRID.md) 设置),写入器会自动在 GeoParquet 元数据中生成 EPSG:32632 对应的 PROJJSON。当 SRID 为 4326 时,由于这就是 GeoParquet 的默认 CRS(OGC:CRS84),`crs` 字段会被省略。 + +* 若 SRID 为 0(未显式设置 SRID 的几何对象的默认值),`crs` 字段会被设为 `null`。 +* 若同一列中的几何对象 SRID 不一致,`crs` 字段默认置为 `null`。 +* 若显式提供了 `geoparquet.crs` 或 `geoparquet.crs.` 选项,则始终优先于 SRID 推导出的 CRS。 + +Sedona 的 GeoParquet 读写器**不会**校验坐标轴顺序(lon/lat 还是 lat/lon),假定使用者在读写时自行处理。可以使用 [`ST_FlipCoordinates`](../../api/sql/Geometry-Editors/ST_FlipCoordinates.md) 来交换几何对象的坐标轴顺序。 + +## 写出带 covering 元数据的 GeoParquet + +自 `v1.6.1` 起,Sedona 支持向几何列元数据写入 [`covering` 字段](https://github.com/opengeospatial/geoparquet/blob/v1.1.0/format-specs/geoparquet.md#covering)。该字段指定一列边界框列以加速空间数据检索。该边界框列必须是顶层结构体列,包含 `xmin`、`ymin`、`xmax`、`ymax` 四个子列。如果待写入的 DataFrame 已经包含这样的列,可通过 `.option("geoparquet.covering.", "")` 选项把 `covering` 元数据写入 GeoParquet: + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering.geometry", "bbox") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果 DataFrame 仅有一列几何列,可以省略列名直接使用 `geoparquet.covering`: + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering", "bbox") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果未设置 `geoparquet.covering` 选项,Sedona 在写出 GeoParquet `1.1.0` 时会自动为您填充 covering 元数据。 + +对每个几何列,Sedona 会使用 `_bbox` 作为 covering 列: + +* 如果 `_bbox` 已存在且是合法的 covering 结构体(`xmin`、`ymin`、`xmax`、`ymax`),Sedona 会复用它。 +* 如果 `_bbox` 不存在,Sedona 会在写出时自动生成。 + +显式的 `geoparquet.covering` 或 `geoparquet.covering.` 选项始终优先于上述默认行为。 + +可以通过 `geoparquet.covering.mode` 控制该默认行为: + +* `auto`(默认):在写 GeoParquet `1.1.0` 时启用 covering 元数据/列的自动生成。 +* `legacy`:禁用自动 covering 生成。 + +```scala +df.write.format("geoparquet") + .option("geoparquet.covering.mode", "legacy") + .save("/path/to/saved_geoparquet.parquet") +``` + +如果 DataFrame 中没有 covering 列,可以使用 Sedona 的 SQL 函数构造一个: + +```scala +val df_bbox = df.withColumn("bbox", expr("struct(ST_XMin(geometry) AS xmin, ST_YMin(geometry) AS ymin, ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS ymax)")) +df_bbox.write.format("geoparquet").option("geoparquet.covering.geometry", "bbox").save("/path/to/saved_geoparquet.parquet") +``` + +## 排序后再写出 GeoParquet + +为了最大程度地发挥 Sedona GeoParquet filter pushdown 的性能,建议先按几何对象的 geohash 值(参见 [ST_GeoHash](../../api/sql/Geometry-Output/ST_GeoHash.md))排序,再写出 GeoParquet。示例如下: + +``` +SELECT col1, col2, geom, ST_GeoHash(geom, 5) as geohash +FROM spatialDf +ORDER BY geohash +``` + +接下来更深入地看看 Sedona 是如何利用 GeoParquet 的 bbox 元数据来优化查询的。 + +## Sedona 如何在 Spark 上利用 GeoParquet 边界框(bbox)元数据 + +bbox 元数据描述了某个文件中所包含几何对象覆盖的范围。如果查询的区域不与某个文件的边界框相交,引擎在执行查询时即可整体跳过该文件——因为它不可能包含相关数据。 + +跳过整个文件可以极大提升性能,跳过的数据越多,查询就越快。 + +下面来看一个由若干点和三个边界框组成的数据集示例: + +![GeoParquet bbox](../../image/tutorial/files/geoparquet_bbox1.png) + +应用一个空间过滤器,只读取某个区域内的点: + +![GeoParquet bbox 过滤](../../image/tutorial/files/geoparquet_bbox2.png) + +查询代码: + +```python +my_shape = "POLYGON((4.0 3.5, 4.0 6.0, 8.0 6.0, 8.0 4.5, 4.0 3.5))" + +res = sedona.sql(f""" +select * +from points +where st_intersects(geometry, ST_GeomFromWKT('{my_shape}')) +""") +res.show(truncate=False) +``` + +结果如下: + +``` ++---+-----------+ +|id |geometry | ++---+-----------+ +|e |POINT (5 4)| +|f |POINT (6 4)| +|g |POINT (6 5)| ++---+-----------+ +``` + +执行该查询其实并不需要边界框 1 或 3 中的数据,只需要边界框 2 中的点。文件跳过让我们对这个查询只需读取一个文件而非三个。该数据湖中各文件的 bbox 如下: + +``` ++--------------------------------------------------------------+ +|columns | ++--------------------------------------------------------------+ +|{geometry -> {WKB, [Point], [2.0, 1.0, 3.0, 3.0], null, NULL}}| +|{geometry -> {WKB, [Point], [7.0, 1.0, 8.0, 2.0], null, NULL}}| +|{geometry -> {WKB, [Point], [5.0, 4.0, 6.0, 5.0], null, NULL}}| ++--------------------------------------------------------------+ +``` + +跳过整个文件能带来显著的性能收益。 + +## GeoParquet 的优势 + +如前所述,GeoParquet 相比 CSV、GeoJSON 等行式格式有诸多优势: + +* 列裁剪 +* row-group 过滤 +* schema 写在文件尾部 +* 列式数据布局,压缩效果良好 + +但这些优势仍不足以满足空间数据从业者所需的全部能力。 + +## GeoParquet 的局限 + +Parquet 与 GeoParquet 数据湖与其他数据湖一样,存在以下局限: + +* 不支持可靠事务 +* 不支持部分数据操作语言(DML),如 update、delete +* 没有并发保护 +* 与数据库相比,某些操作性能较差 + +为了克服这些局限,数据社区已逐步迁移到 Iceberg、Delta Lake、Hudi 等开放式表格式。 + +Iceberg 最近开始支持 geometry 与 geography 列,让数据从业者既能享受开放式表格式的特性,又能复用底层的 Parquet 文件。Iceberg 解决了 GeoParquet 数据湖的局限,支持可靠事务以及 update、delete 等常见操作。 + +## 结论 + +GeoParquet 是空间数据社区的一项重要进展。它为地理工程师提供了内嵌 schema、性能优化与对几何列的原生支持。 + +由于 Iceberg 现已具备 GeoParquet 的全部优秀特性甚至更多,空间数据工程师也可以平滑迁移到 Iceberg。Iceberg 提供了大量实用的开放式表格式特性,几乎在所有场景下都优于纯 GeoParquet(例外情况:永不变更的单文件数据集,或与其他引擎的兼容性需求)。 + +将工作流从 Shapefile、GeoPackage、CSV、GeoJSON 等传统格式迁移到 GeoParquet/Iceberg 等高性能格式,能立刻提升计算效率。 diff --git a/docs/tutorial/files/stac-sedona-spark.zh.md b/docs/tutorial/files/stac-sedona-spark.zh.md new file mode 100644 index 00000000000..6028159e793 --- /dev/null +++ b/docs/tutorial/files/stac-sedona-spark.zh.md @@ -0,0 +1,468 @@ + + +# 在 Spark 上使用 Apache Sedona 处理 STAC catalog + +STAC 数据源允许您从 SpatioTemporal Asset Catalog(STAC)API 读取数据。该数据源同时支持读取 STAC items 与 collections。 + +## 用法 + +要使用 STAC 数据源,可使用 `stac` 格式将 STAC catalog 加载到 Sedona DataFrame。`load` 的路径可以是本地的 STAC collection JSON 文件,也可以是 HTTP/HTTPS 端点。 + +从本地 collection 文件加载 STAC collection: + +```python +df = sedona.read.format("stac").load("/user/stac_collection.json") +df.printSchema() +df.show() +``` + +从 S3 上的 collection 文件加载: + +```python +df = sedona.read.format("stac").load( + "s3a://example.com/stac_bucket/stac_collection.json" +) +df.printSchema() +df.show() +``` + +也可以从 HTTP/HTTPS 端点加载: + +```python +df = sedona.read.format("stac").load( + "https://earth-search.aws.element84.com/v1/collections/sentinel-2-pre-c1-l2a" +) +df.printSchema() +df.show() +``` + +输出: + +``` +root + |-- stac_version: string (nullable = false) + |-- stac_extensions: array (nullable = true) + | |-- element: string (containsNull = true) + |-- type: string (nullable = false) + |-- id: string (nullable = false) + |-- bbox: array (nullable = true) + | |-- element: double (containsNull = true) + |-- geometry: geometry (nullable = true) + |-- title: string (nullable = true) + |-- description: string (nullable = true) + |-- datetime: timestamp (nullable = true) + |-- start_datetime: timestamp (nullable = true) + |-- end_datetime: timestamp (nullable = true) + |-- created: timestamp (nullable = true) + |-- updated: timestamp (nullable = true) + |-- platform: string (nullable = true) + |-- instruments: array (nullable = true) + | |-- element: string (containsNull = true) + |-- constellation: string (nullable = true) + |-- mission: string (nullable = true) + |-- grid:code: string (nullable = true) + |-- gsd: double (nullable = true) + |-- collection: string (nullable = true) + |-- links: array (nullable = true) + | |-- element: struct (containsNull = true) + | | |-- rel: string (nullable = true) + | | |-- href: string (nullable = true) + | | |-- type: string (nullable = true) + | | |-- title: string (nullable = true) + |-- assets: map (nullable = true) + | |-- key: string + | |-- value: struct (valueContainsNull = true) + | | |-- href: string (nullable = true) + | | |-- type: string (nullable = true) + | | |-- title: string (nullable = true) + | | |-- roles: array (nullable = true) + | | | |-- element: string (containsNull = true) + ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +|stac_version| stac_extensions| type| id| bbox| geometry|title|description| datetime|start_datetime|end_datetime| created| updated| platform|instruments|constellation|mission| gsd| collection| links| assets| ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYC_202212...|[-55.202493, 1.71...|POLYGON ((-55.201...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:13:...|2024-05-01 21:13:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZC_202212...|[-54.30394, 1.719...|POLYGON ((-54.302...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:39:...|2024-05-03 00:39:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBH_202212...|[-53.698196, 2.63...|POLYGON ((-53.698...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:26:...|2024-05-03 00:26:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYD_202212...|[-55.201423, 2.62...|POLYGON ((-55.199...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:10:...|2024-05-01 21:10:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZD_202212...|[-54.302336, 2.62...|POLYGON ((-54.299...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:12:...|2024-05-03 00:12:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBJ_202212...|[-53.700535, 2.63...|POLYGON ((-53.700...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:30:...|2024-05-03 00:30:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYE_202212...|[-55.199906, 3.52...|POLYGON ((-55.197...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:24:...|2024-05-01 21:24:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NZE_202212...|[-54.300062, 3.52...|POLYGON ((-54.296...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:14:...|2024-05-03 00:14:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T22NBK_202212...|[-53.703548, 3.52...|POLYGON ((-53.703...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-03 00:32:...|2024-05-03 00:32:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| +| 1.0.0|[https://stac-ext...|Feature|S2B_T21NYF_202212...|[-55.197941, 4.42...|POLYGON ((-55.195...| NULL| NULL|2022-12-05 14:11:...| NULL| NULL|2024-05-01 21:43:...|2024-05-01 21:43:...|sentinel-2b| [msi]| sentinel-2| NULL|NULL|sentinel-2-pre-c1...|[{self, https://e...|{red -> {https://...| ++------------+--------------------+-------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------+------------+--------------------+--------------------+-----------+-----------+-------------+-------+----+--------------------+--------------------+--------------------+ +``` + +## 谓词下推(Filter Pushdown) + +STAC 数据源支持把空间和时间过滤下推到底层数据源,减少需要读取的数据量。 + +### 空间过滤下推 + +空间过滤下推让数据源能够直接在数据源层应用空间谓词(如 `st_intersects`),从而减少传输与处理的数据量。 + +### 时间过滤下推 + +时间过滤下推让数据源能够直接在数据源层应用时间谓词(如 `BETWEEN`、`>=`、`<=`),同样可以减少传输与处理的数据量。 + +## 示例 + +下面的示例假设 STAC 数据源已经被加载到名为 `STAC_TABLE` 的表中。 + +### 不带过滤的 SQL Select + +```sql +SELECT id, datetime as dt, geometry, bbox FROM STAC_TABLE +``` + +### 带时间过滤的 SQL Select + +```sql + SELECT id, datetime as dt, geometry, bbox + FROM STAC_TABLE + WHERE datetime BETWEEN '2020-01-01' AND '2020-12-13' +``` + +该例中,数据源会把时间过滤下推到底层数据源。 + +### 带空间过滤的 SQL Select + +```sql + SELECT id, geometry + FROM STAC_TABLE + WHERE st_intersects(ST_GeomFromText('POLYGON((17 10, 18 10, 18 11, 17 11, 17 10))'), geometry) +``` + +该例中,数据源会把空间过滤下推到底层数据源。 + +### Sedona 的 STAC 读取器配置 + +在 Sedona 中使用 STAC 读取器时,可通过若干配置项控制读取行为,通常以 `Map[String, String]` 形式传入。下列是核心 Sedona 配置项: + +- **spark.sedona.stac.load.maxPartitionItemFiles**:单个分区中可包含的最大 item 文件数量,用于控制分区大小。默认 `-1`,表示由系统自动决定。 + +- **spark.sedona.stac.load.numPartitions**:为 STAC 数据创建的分区数,便于控制数据分布与并行度。默认 `-1`,表示由系统自动决定。 + +下面是 STAC 读取器的常见 reader option: + +- **itemsLimitMax**:从 STAC collection 加载的 item 数量上限,便于限制处理规模。默认 `-1`,表示加载全部 item。 + +- **itemsLoadProcessReportThreshold**:item 加载进度的报告阈值。默认 `1000000`,即每加载 100 万个 item 报告一次进度。 + +- **itemsLimitPerRequest**:单次 API 请求中获取的最大 item 数。默认 `10`。 + +- **headers**:STAC API 请求中携带的 HTTP 头,应是 JSON 编码的字符串,包含一系列键值对。可用于鉴权或自定义头。例如:`{"Authorization": "Basic "}` + +可以将这些配置合并到一个 `Map[String, String]` 中传给 STAC 读取器: + +```scala + def defaultSparkConfig: Map[String, String] = Map( + "spark.sedona.stac.load.maxPartitionItemFiles" -> "100", + "spark.sedona.stac.load.numPartitions" -> "10", + "spark.sedona.stac.load.itemsLimitMax" -> "20") + + val sparkSession: SparkSession = { + val builder = SedonaContext + .builder() + .master("local[*]") + defaultSparkConfig.foreach { case (key, value) => builder.config(key, value) } + builder.getOrCreate() + } + + df = sedona.read + .format("stac") + .option("itemsLimitMax", "100") + .option("itemsLoadProcessReportThreshold", "2000000") + .option("itemsLimitPerRequest", "100") + .load("https://earth-search.aws.element84.com/v1/collections/sentinel-2-pre-c1-l2a") +``` + +通过以上选项,可以对 Sedona 中 STAC 数据的读取与处理过程进行细粒度的控制。 + +## Python API + +Python API 让您可以通过 Client 类与 STAC API 交互。它提供了打开 STAC API 连接、获取 collection、按多种条件搜索 item 的方法。 + +### 示例代码 + +#### 初始化客户端 + +```python +from sedona.spark.stac import Client + +# 初始化客户端 +client = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1") +``` + +#### 在某个 collection 中按年份搜索 + +```python +items = client.search( + collection_id="aster-l1t", datetime="2020", return_dataframe=False +) +``` + +#### 在某个 collection 中按月份搜索并限制最大返回数 + +```python +items = client.search( + collection_id="aster-l1t", datetime="2020-05", return_dataframe=False, max_items=5 +) +``` + +#### 按 bbox 与时间区间搜索 + +```python +items = client.search( + collection_id="aster-l1t", + ids=["AST_L1T_00312272006020322_20150518201805"], + bbox=[-180.0, -90.0, 180.0, 90.0], + datetime=["2006-01-01T00:00:00Z", "2007-01-01T00:00:00Z"], + return_dataframe=False, +) +``` + +#### 按多个 bbox 搜索 + +```python +bbox_list = [[-180.0, -90.0, 180.0, 90.0], [-100.0, -50.0, 100.0, 50.0]] +items = client.search(collection_id="aster-l1t", bbox=bbox_list, return_dataframe=False) +``` + +#### 在多个时间区间内搜索并以 DataFrame 返回 + +```python +interval_list = [ + ["2020-01-01T00:00:00Z", "2020-06-01T00:00:00Z"], + ["2020-07-01T00:00:00Z", "2021-01-01T00:00:00Z"], +] +df = client.search( + collection_id="aster-l1t", datetime=interval_list, return_dataframe=True +) +df.show() +``` + +#### 同时使用 bbox 与时间将 item 写入 GeoParquet + +```python +# 将 DataFrame 中的 item 写入 GeoParquet,同时指定 bbox 与时间 +client.get_collection("aster-l1t").save_to_geoparquet( + output_path="/path/to/output", bbox=bbox_list, datetime="2020-05" +) +``` + +以上示例演示了如何使用 Client 类按各种条件检索 STAC collection,并以 PyStacItem 迭代器或 Spark DataFrame 的形式返回。 + +### 鉴权 + +许多 STAC 服务都需要鉴权才能访问数据。STAC 客户端支持多种鉴权方式:HTTP Basic Authentication、Bearer Token Authentication,以及自定义 HTTP 头。 + +#### Basic 认证 + +Basic 认证通常用于 API key 或用户名/密码组合。许多服务(如 Planet Labs)会把 API key 作为用户名,密码留空。 + +```python +from sedona.spark.stac import Client + +# 示例 1:使用 API key(常见模式) +client = Client.open("https://api.example.com/stac/v1") +client.with_basic_auth("your_api_key_here", "") + +# 在认证状态下搜索 item +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 示例 2:使用用户名与密码 +client = Client.open("https://api.example.com/stac/v1") +client.with_basic_auth("username", "password") + +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 示例 3:链式调用 +df = ( + Client.open("https://api.example.com/stac/v1") + .with_basic_auth("your_api_key", "") + .search(collection_id="example-collection", max_items=10) +) +df.show() +``` + +#### Bearer Token 认证 + +Bearer Token 认证常与 OAuth2 token 或 JWT token 一起使用。注意:某些服务可能仅支持特定的鉴权方式。 + +```python +from sedona.spark.stac import Client + +# 使用 bearer token +client = Client.open("https://api.example.com/stac/v1") +client.with_bearer_token("your_access_token_here") + +df = client.search(collection_id="example-collection", max_items=10) +df.show() + +# 链式调用 +df = ( + Client.open("https://api.example.com/stac/v1") + .with_bearer_token("your_token") + .search(collection_id="example-collection", max_items=10) +) +df.show() +``` + +#### 自定义 HTTP 头 + +也可以在创建客户端时直接传入自定义 HTTP 头,适用于鉴权方式较为特殊的服务。 + +```python +from sedona.spark.stac import Client + +# 使用自定义 HTTP 头 +headers = {"Authorization": "Bearer your_token_here", "X-Custom-Header": "custom_value"} +client = Client.open("https://api.example.com/stac/v1", headers=headers) + +df = client.search(collection_id="example-collection", max_items=10) +df.show() +``` + +#### 在 Scala DataSource 中使用鉴权 + +直接在 Scala 或 Spark SQL 中使用 STAC 数据源时,可以把鉴权 HTTP 头作为 JSON 编码的 option 传入: + +```python +import json +from pyspark.sql import SparkSession + +# 准备鉴权 HTTP 头 +headers = {"Authorization": "Basic "} +headers_json = json.dumps(headers) + +# 在鉴权状态下加载 STAC 数据 +df = ( + spark.read.format("stac") + .option("headers", headers_json) + .load("https://api.example.com/stac/v1/collections/example-collection") +) + +df.show() +``` + +```scala +// Scala 示例 +val headersJson = """{"Authorization":"Basic "}""" + +val df = sparkSession.read + .format("stac") + .option("headers", headersJson) + .load("https://api.example.com/stac/v1/collections/example-collection") + +df.show() +``` + +#### 注意事项 + +- **鉴权方式互斥**:设置新的鉴权方式会覆盖此前的 Authorization 头,但不会影响其他自定义 HTTP 头。 +- **HTTP 头会向下传播**:在 Client 上设置的 HTTP 头会自动应用到后续的 collection 与 item 请求。 +- **不同服务有不同的要求**:不同 STAC 服务可能要求不同的鉴权方式。例如 Planet Labs 在访问 collection 时需要 Basic 认证,而非 Bearer Token。 +- **向后兼容**:所有鉴权参数都是可选的。原本访问无需鉴权的公开 STAC 服务的代码不受影响,可继续使用。 + +### 方法 + +**`open(url: str, headers: Optional[dict] = None) -> Client`** +打开到指定 STAC API URL 的连接。 + +参数: + +* `url` (*str*):要连接的 STAC API URL。例如:`"https://planetarycomputer.microsoft.com/api/stac/v1"` +* `headers` (*Optional[dict]*):可选的 HTTP 头字典,用于鉴权或自定义头。例如:`{"Authorization": "Bearer token123"}` + +返回: + +* `Client`:连接到指定 URL 的 `Client` 实例。 + +--- + +**`with_basic_auth(username: str, password: str) -> Client`** +为客户端启用 HTTP Basic 认证。 + +该方法会将用户名与密码进行 Base64 编码,并添加相应的 Authorization 头。 + +参数: + +* `username` (*str*):用户名。对于 API key,此项通常就是 API key 本身。例如 `"your_api_key"`。 +* `password` (*str*):密码。对于 API key 通常留空。例如 `""`。 + +返回: + +* `Client`:返回自身以便链式调用。 + +--- + +**`with_bearer_token(token: str) -> Client`** +为客户端启用 Bearer Token 认证。 + +该方法会添加 Bearer Token 形式的 Authorization 头,常用于 OAuth2 与 API token 场景。 + +参数: + +* `token` (*str*):用于鉴权的 bearer token。例如 `"your_access_token_here"`。 + +返回: + +* `Client`:返回自身以便链式调用。 + +--- + +**`get_collection(collection_id: str) -> CollectionClient`** +按 collection ID 获取对应的 CollectionClient。 + +参数: + +* `collection_id` (*str*):collection 的 ID。例如 `"aster-l1t"`。 + +返回: + +* `CollectionClient`:对应 collection 的 `CollectionClient` 实例。 + +--- + +**`search(*ids: Union[str, list], collection_id: str, bbox: Optional[list] = None, datetime: Optional[Union[str, datetime.datetime, list]] = None, max_items: Optional[int] = None, return_dataframe: bool = True) -> Union[Iterator[PyStacItem], DataFrame]`** +在指定 collection 中按可选过滤条件搜索 item。 + +参数: + +* `ids` (*Union[str, list]*):可变数量的 item ID,用于过滤。例如 `"item_id1"` 或 `["item_id1", "item_id2"]`。 +* `collection_id` (*str*):要搜索的 collection ID。例如 `"aster-l1t"`。 +* `bbox` (*Optional[list]*):用于过滤 item 的边界框列表,每项形如 `[min_lon, min_lat, max_lon, max_lat]`。例如 `[[ -180.0, -90.0, 180.0, 90.0 ]]`。 +* `datetime` (*Optional[Union[str, datetime.datetime, list]]*):单个日期时间、符合 RFC 3339 的时间戳,或一组时间区间。例如 `"2020-01-01T00:00:00Z"`、`datetime.datetime(2020, 1, 1)`、`[["2020-01-01T00:00:00Z", "2021-01-01T00:00:00Z"]]`。 +* `max_items` (*Optional[int]*):返回 item 的最大数量。例如 `100`。 +* `return_dataframe` (*bool*):若为 `True`(默认),返回 Spark DataFrame,而不是 `PyStacItem` 迭代器。例如 `True`。 + +返回: + +* *Union[Iterator[PyStacItem], DataFrame]*:与过滤条件匹配的 `PyStacItem` 迭代器或 Spark DataFrame。 + +## 参考 + +- STAC 规范:https://stacspec.org/ + +- STAC Browser:https://github.com/radiantearth/stac-browser diff --git a/docs/tutorial/flink/sql.zh.md b/docs/tutorial/flink/sql.zh.md new file mode 100644 index 00000000000..5d7276c507e --- /dev/null +++ b/docs/tutorial/flink/sql.zh.md @@ -0,0 +1,511 @@ + + +本页介绍如何使用 SedonaSQL 管理空间数据。==示例代码以 Java 编写,但同样适用于 Scala==。 + +SedonaSQL 支持 SQL/MM Part3 空间 SQL 标准,提供四类 SQL 算子(如下文所示),所有算子都可以直接通过以下方式调用: + +```java +Table myTable = tableEnv.sqlQuery("YOUR_SQL") +``` + +SedonaSQL 详细的 API 说明请参阅 [SedonaSQL API](../../api/flink/Overview.md)。 + +## 配置依赖 + +1. 阅读 [Sedona Maven Central 坐标](../../setup/maven-coordinates.md) +2. 在 build.sbt 或 pom.xml 中添加 Sedona 依赖 +3. 在 build.sbt 或 pom.xml 中添加 [Flink 依赖](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/) +4. 参考 [SQL 示例项目](../demo.md) + +## 初始化 Stream Environment + +在程序起始处使用以下代码初始化 `StreamExecutionEnvironment`: + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); +StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); +``` + +## 初始化 SedonaContext + +在创建好 `StreamExecutionEnvironment` 与 `StreamTableEnvironment` 后,添加以下代码: + +==Sedona >= 1.4.1== + +```java +StreamTableEnvironment sedona = SedonaContext.create(env, tableEnv); +``` + +==Sedona <1.4.1== + +下面这种方式自 Sedona 1.4.1 起已弃用,请改用上面的方式创建 SedonaContext。 + +```java +SedonaFlinkRegistrator.registerType(env); +SedonaFlinkRegistrator.registerFunc(tableEnv); +``` + +!!!warning + Sedona 内置了一套高效的几何与索引序列化器,忘记启用它们会导致内存占用过高。 + +该函数会注册 Sedona 的用户自定义类型与用户自定义函数。 + +## 创建 Geometry 类型列 + +SedonaSQL 中所有几何运算都作用在 Geometry 类型对象上。因此在执行任何查询之前,需要先在 DataFrame 上构造一列 Geometry 类型列。 + +假设有一张如下的 Flink Table `tbl`: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_polygon | name_polygon | ++----+--------------------------------+--------------------------------+ +| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 | +| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 | +| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 | +| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 | +| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 | +| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 | +| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 | +| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 | +| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 | +| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 | ++----+--------------------------------+--------------------------------+ +10 rows in set +``` + +可以这样创建一张带有 Geometry 类型列的 Table: + +```java +sedona.createTemporaryView("myTable", tbl) +Table geomTbl = sedona.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) as geom_polygon, name_polygon FROM myTable") +geomTbl.execute().print() +``` + +输出如下: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_polygon | name_polygon | ++----+--------------------------------+--------------------------------+ +| +I | POLYGON ((-0.5 -0.5, -0.5 0... | polygon0 | +| +I | POLYGON ((0.5 0.5, 0.5 1.5,... | polygon1 | +| +I | POLYGON ((1.5 1.5, 1.5 2.5,... | polygon2 | +| +I | POLYGON ((2.5 2.5, 2.5 3.5,... | polygon3 | +| +I | POLYGON ((3.5 3.5, 3.5 4.5,... | polygon4 | +| +I | POLYGON ((4.5 4.5, 4.5 5.5,... | polygon5 | +| +I | POLYGON ((5.5 5.5, 5.5 6.5,... | polygon6 | +| +I | POLYGON ((6.5 6.5, 6.5 7.5,... | polygon7 | +| +I | POLYGON ((7.5 7.5, 7.5 8.5,... | polygon8 | +| +I | POLYGON ((8.5 8.5, 8.5 9.5,... | polygon9 | ++----+--------------------------------+--------------------------------+ +10 rows in set +``` + +虽然外观与输入一致,但 `geom_polygon` 列的类型已经变为 ==Geometry==。 + +可以打印 schema 进行验证: + +```java +geomTbl.printSchema() +``` + +输出: + +``` +( + `geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'), + `name_polygon` STRING +) +``` + +!!!note + SedonaSQL 提供了大量构造 Geometry 列的函数,详见 [SedonaSQL 构造器 API](../../api/flink/Geometry-Functions.md)。 + +## 转换坐标参考系 + +Sedona 不会自动管理一列 Geometry 中所有几何对象的坐标单位(基于度还是基于米)。SedonaSQL 中所有相关距离的单位与 Geometry 列中几何对象的单位保持一致。 + +如需对前面创建的 Geometry 列进行 CRS 转换,可使用以下代码: + +```java +Table geomTbl3857 = sedona.sqlQuery("SELECT ST_Transform(countyshape, "epsg:4326", "epsg:3857") AS geom_polygon, name_polygon FROM myTable") +geomTbl3857.execute().print() +``` + +`ST_Transform` 第一个 EPSG 代码 EPSG:4326 是源 CRS——也就是最常见的基于度的 CRS(WGS84)。 + +`ST_Transform` 第二个 EPSG 代码 EPSG:3857 是目标 CRS——最常见的基于米的 CRS。 + +`ST_Transform` 会把这些几何对象的 CRS 从 EPSG:4326 转换到 EPSG:3857。详细的 CRS 信息可以在 [EPSG.io](https://epsg.io/) 找到。 + +!!!note + 了解不同的空间查询谓词请阅读 [SedonaSQL ST_Transform API](../../api/flink/Spatial-Reference-System/ST_Transform.md)。 + +例如,存储美国坐标的 Table 转换前后会变为如下样子: + +转换前: + +``` ++----+--------------------------------+--------------------------------+ +| op | geom_point | name_point | ++----+--------------------------------+--------------------------------+ +| +I | POINT (32 -118) | point | +| +I | POINT (33 -117) | point | +| +I | POINT (34 -116) | point | +| +I | POINT (35 -115) | point | +| +I | POINT (36 -114) | point | +| +I | POINT (37 -113) | point | +| +I | POINT (38 -112) | point | +| +I | POINT (39 -111) | point | +| +I | POINT (40 -110) | point | +| +I | POINT (41 -109) | point | ++----+--------------------------------+--------------------------------+ +``` + +转换后: + +``` ++----+--------------------------------+--------------------------------+ +| op | _c0 | name_point | ++----+--------------------------------+--------------------------------+ +| +I | POINT (-13135699.91360628 3... | point | +| +I | POINT (-13024380.422813008 ... | point | +| +I | POINT (-12913060.932019735 ... | point | +| +I | POINT (-12801741.44122646 4... | point | +| +I | POINT (-12690421.950433187 ... | point | +| +I | POINT (-12579102.459639912 ... | point | +| +I | POINT (-12467782.96884664 4... | point | +| +I | POINT (-12356463.478053367 ... | point | +| +I | POINT (-12245143.987260092 ... | point | +| +I | POINT (-12133824.496466817 ... | point | ++----+--------------------------------+--------------------------------+ +``` + +构造完 Geometry 类型列后,即可执行各类空间查询。 + +## 范围查询 + +使用 ==ST_Contains==、==ST_Intersects== 等谓词可以在单列上执行范围查询。 + +下例查找位于给定多边形内的所有 county: + +```java +geomTable = sedona.sqlQuery( + " + SELECT * + FROM spatialdf + WHERE ST_Contains (ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) + ") +geomTable.execute().print() +``` + +!!!note + 了解不同的空间查询谓词请阅读 [SedonaSQL Predicate API](../../api/flink/Geometry-Functions.md#predicates)。 + +## KNN 查询 + +使用 ==ST_Distance== 计算距离并排序。 + +下面的代码返回距离给定多边形最近的 5 个对象: + +```java +geomTable = sedona.sqlQuery( + " + SELECT countyname, ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS distance + FROM geomTable + ORDER BY distance DESC + LIMIT 5 + ") +geomTable.execute().print() +``` + +## 连接查询 + +下面的等值连接借助 Flink 内置的 equi-join 算法。可以选择跳过 Sedona 的 refinement 步骤来牺牲一部分查询精度。完整示例可参考 [SQL 示例项目](../demo.md)。 + +操作步骤如下: + +### 1. 为两张表生成 S2 ID + +使用 [ST_S2CellIds](../../api/flink/Spatial-Indexing/ST_S2CellIDs.md) 为每个几何对象生成 cell ID,单个几何对象可能产生多个 ID。 + +```sql +SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray +FROM lefts +``` + +```sql +SELECT id, geom, name, ST_S2CellIDs(geom, 15) as idarray +FROM rights +``` + +### 2. 展开 ID 数组 + +S2 cell ID 是整数数组,需要把它们展开成多行,以便后续按 ID 进行连接。 + +``` +SELECT id, geom, name, cellId +FROM lefts CROSS JOIN UNNEST(lefts.idarray) AS tmpTbl1(cellId) +``` + +``` +SELECT id, geom, name, cellId +FROM rights CROSS JOIN UNNEST(rights.idarray) AS tmpTbl2(cellId) +``` + +### 3. 执行等值连接 + +按 S2 cellId 对两张表做等值连接: + +```sql +SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name +FROM lcs JOIN rcs ON lcs.cellId = rcs.cellId +``` + +### 4. 可选:refine 结果 + +由于 S2 cellId 的特性,连接结果可能存在少量假阳性,具体程度取决于 S2 level 的选择:level 越小,cell 越大,展开行数越少,但假阳性越多。 + +为了保证正确性,可使用任一 [空间谓词](../../api/sql/Geometry-Functions.md#predicates) 过滤掉假阳性,将下面的查询替换步骤 3 中的查询: + +```sql +SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name +FROM lcs, rcs +WHERE lcs.cellId = rcs.cellId AND ST_Contains(lcs.geom, rcs.geom) +``` + +可以看到,这里相比步骤 2 的查询多加了一个 `ST_Contains` 过滤来剔除假阳性,也可以使用 `ST_Intersects` 等其他谓词。 + +!!!tip + 如果不要求 100% 的准确度且希望更快的查询速度,可以跳过此步骤。 + +### 5. 可选:去重 + +由于生成 S2 cellId 时使用了展开操作,结果 DataFrame 中可能出现 `` 重复匹配。可以通过 GroupBy 查询去重: + +```sql +SELECT lcs_id, rcs_id, FIRST_VALUE(lcs_geom), FIRST_VALUE(lcs_name), first(rcs_geom), first(rcs_name) +FROM joinresult +GROUP BY (lcs_id, rcs_id) +``` + +`FIRST_VALUE` 函数用于在多个重复值中取第一个。 + +如果没有为每个几何对象提供唯一 ID,也可以直接按几何分组: + +```sql +SELECT lcs_geom, rcs_geom, first(lcs_name), first(rcs_name) +FROM joinresult +GROUP BY (lcs_geom, rcs_geom) +``` + +!!!note + 做点-多边形 join 时不会出现该问题,可以放心忽略;只有 polygon-polygon、polygon-linestring、linestring-linestring 等连接才会遇到此问题。 + +### 距离连接中的 S2 用法 + +S2 同样适用于距离连接。先使用 `ST_Buffer(geometry, distance)` 把其中一列原始几何对象包裹起来。如果原始几何对象是点,`ST_Buffer` 会把它们变成半径为 `distance` 的圆。 + +例如,在步骤 1 之前,先在左表上运行: + +```sql +SELECT id, ST_Buffer(geom, DISTANCE), name +FROM lefts +``` + +由于坐标位于经纬度系统中,`distance` 的单位应为度而不是米或英里。可以根据实际米值估算对应的度数,可借助 [此换算工具](https://lucidar.me/en/online-unit-converter-length-to-angle/convert-degrees-to-meters/#online-converter)。 + +## 把空间 Table 转换为空间 DataStream + +### 获取 DataStream + +使用 TableEnv 的 `toDataStream` 函数: + +```java +DataStream geomStream = sedona.toDataStream(geomTable) +``` + +### 取出 Geometry + +接着用 Map 从每个 Row 对象中取出 Geometry: + +```java +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = geomStream.map(new MapFunction() { + @Override + public Geometry map(Row value) throws Exception { + return (Geometry) value.getField(0); + } + }); +geometries.print(); +``` + +输出如下: + +``` +14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5)) +2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5)) +5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5)) +16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5)) +12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5)) +13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5)) +15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5)) +3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5)) +1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5)) +4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5)) +``` + +### 在 Geometry 中保存非空间属性 + +可以把其他非空间属性拼接到 Geometry 的 `userData` 字段中保存,便于后续恢复。`userData` 字段可以是任意对象类型。 + +```java +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = geomStream.map(new MapFunction() { + @Override + public Geometry map(Row value) throws Exception { + Geometry geom = (Geometry) value.getField(0); + geom.setUserData(value.getField(1)); + return geom; + } + }); +geometries.print(); +``` + +`print` 命令不会输出 `userData` 字段。可以这样获取: + +```java +import org.locationtech.jts.geom.Geometry; + +geometries.map(new MapFunction() { + @Override + public String map(Geometry value) throws Exception + { + return (String) value.getUserData(); + } + }).print(); +``` + +输出如下: + +``` +13> polygon9 +6> polygon2 +10> polygon6 +11> polygon7 +5> polygon1 +12> polygon8 +8> polygon4 +4> polygon0 +7> polygon3 +9> polygon5 +``` + +## 把空间 DataStream 转换为空间 Table + +### 使用 Sedona 的 FormatUtils 创建 Geometry + +* 从 WKT 字符串创建 Geometry + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false); + return formatUtils.readGeometry(value); + } + }) +``` + +* 从字符串 `1.1, 2.2` 创建 Point,使用 `,` 作为分隔符。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(",", false, GeometryType.POINT); + return formatUtils.readGeometry(value); + } + }) +``` + +* 从字符串 `1.1, 1.1, 10.1, 10.1` 创建 Polygon,表示以 (1.1, 1.1) 与 (10.1, 10.1) 为最小/最大角点的矩形。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Geometry; + +DataStream geometries = text.map(new MapFunction() { + @Override + public Geometry map(String value) throws Exception + { + // 在此实现 minX、minY、maxX、maxY 四个 double 值的解析逻辑 + ... + Coordinate[] coordinates = new Coordinate[5]; + coordinates[0] = new Coordinate(minX, minY); + coordinates[1] = new Coordinate(minX, maxY); + coordinates[2] = new Coordinate(maxX, maxY); + coordinates[3] = new Coordinate(maxX, minY); + coordinates[4] = coordinates[0]; + GeometryFactory geometryFactory = new GeometryFactory(); + return geometryFactory.createPolygon(coordinates); + } + }) +``` + +### 创建 Row 对象 + +把 Geometry 包进 Flink Row 中放入 `geomStream`。也可以在 Row 中放入其他属性。本例为所有几何对象赋了同一个常量 `myName`。 + +```java +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; +import org.apache.flink.types.Row; + +DataStream geomStream = text.map(new MapFunction() { + @Override + public Row map(String value) throws Exception + { + FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false); + return Row.of(formatUtils.readGeometry(value), "myName"); + } + }) +``` + +### 得到空间 Table + +使用 TableEnv 的 `fromDataStream` 函数,并指定列名 `geom` 与 `geom_name`: + +```java +Table geomTable = sedona.fromDataStream(geomStream, "geom", "geom_name") +``` diff --git a/docs/tutorial/geopandas-api.zh.md b/docs/tutorial/geopandas-api.zh.md new file mode 100644 index 00000000000..fe6b3568613 --- /dev/null +++ b/docs/tutorial/geopandas-api.zh.md @@ -0,0 +1,398 @@ + + +# Apache Sedona 上的 GeoPandas API + +Apache Sedona 上的 GeoPandas API 提供了与 GeoPandas 一致的接口,可以让您的地理空间分析突破单机的局限。该 API 把熟悉的 GeoPandas DataFrame 语法与 Apache Sedona 在 Apache Spark 上的分布式处理能力结合起来,让您能够使用同样的代码模式处理行星级(planetary-scale)的数据集。 + +## 概览 + +### 什么是 Apache Sedona 的 GeoPandas API? + +Apache Sedona 的 GeoPandas API 是一层兼容层,让您能够在分布式地理空间数据上使用 GeoPandas 风格的操作。您原有的 GeoPandas 代码不再受限于单机处理能力,可以充分利用 Apache Spark 集群进行大规模地理空间分析。 + +### 主要优势 + +- **熟悉的 API**:使用您已经熟悉的 GeoPandas 语法与方法 +- **分布式处理**:突破单机限制,处理大规模数据集 +- **惰性求值**:受益于 Apache Sedona 的查询优化与延迟执行 +- **高性能**:利用分布式计算高效完成复杂的地理空间运算 +- **平滑迁移**:以最少的代码改动迁移现有 GeoPandas 工作流 + +## 准备工作 + +Apache Sedona 的 GeoPandas API 通过 PySpark 的 pandas-on-Spark 集成自动管理 SparkSession。准备方式有两种: + +### 方式 1:自动创建 SparkSession(推荐) + +GeoPandas API 会自动使用 PySpark 的默认 SparkSession: + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet + +# 无需显式配置 SparkSession,使用默认会话 +# API 会自动初始化 Sedona context +``` + +### 方式 2:手动配置 SparkSession + +如果需要自定义 SparkSession,或需要在某些环境下显式控制: + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet +from sedona.spark import SedonaContext + +# 创建并配置 SparkSession +config = SedonaContext.builder().getOrCreate() +sedona = SedonaContext.create(config) + +# GeoPandas API 会使用上面配置好的会话 +``` + +### 方式 3:复用现有 SparkSession + +如果已有 SparkSession(如 Databricks、EMR 等托管环境): + +```python +from sedona.spark.geopandas import GeoDataFrame, read_parquet +from sedona.spark import SedonaContext + +# 复用现有 SparkSession(如 Databricks 中的 `spark`) +sedona = SedonaContext.create(spark) # `spark` 即已有会话 +``` + +### SparkSession 是如何被管理的 + +GeoPandas API 借助 PySpark 的 pandas-on-Spark 自动管理 SparkSession 生命周期: + +1. **默认会话**:导入 `sedona.spark.geopandas` 时,会自动通过 `pyspark.pandas.utils.default_session()` 获取 PySpark 的默认会话。 + +2. **自动注册 Sedona 函数**:必要时 API 会自动把 Sedona 的空间函数与优化注册到 SparkSession。 + +3. **透明集成**:所有 GeoPandas 操作在底层都被翻译为 Spark SQL 操作,使用所配置的 SparkSession 执行。 + +4. **无需手动管理 context**:与传统的 Sedona 用法不同,您通常不需要显式调用 `SedonaContext.create()`,除非有自定义配置需求。 + +这种设计让 API 更加易用,把 SparkSession 管理的复杂度隐藏起来,同时仍能充分发挥分布式处理的能力。 + +### S3 配置 + +在使用 S3 数据时,GeoPandas API 使用 Spark 内置的 S3 支持,而不是 s3fs 等外部库。可通过 Spark 配置开启对公开 S3 桶的匿名访问: + +```python +from sedona.spark import SedonaContext + +# 公开 S3 桶的匿名访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.bucket-name.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) +``` + +需要鉴权访问 S3 时,请使用合适的 AWS 凭据提供者: + +```python +# 使用 IAM 角色(在 EC2/EMR 上推荐) +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "com.amazonaws.auth.InstanceProfileCredentialsProvider", + ) + .getOrCreate() +) + +# 使用 access key(不推荐用于生产环境) +config = ( + SedonaContext.builder() + .config("spark.hadoop.fs.s3a.access.key", "your-access-key") + .config("spark.hadoop.fs.s3a.secret.key", "your-secret-key") + .getOrCreate() +) +``` + +## 基本用法 + +### 引入 API + +不再直接导入 GeoPandas,而是从 Sedona 的 GeoPandas 模块导入: + +```python +# 传统的 GeoPandas 导入 +# import geopandas as gpd + +# Sedona GeoPandas API 的导入 +import sedona.spark.geopandas as gpd + +# 或 +from sedona.spark.geopandas import GeoDataFrame, read_parquet +``` + +### 读取数据 + +API 支持读取多种地理空间格式,包括来自云存储的 Parquet 文件。要以匿名凭据访问 S3,请配置 Spark 使用匿名 AWS 凭据: + +```python +from sedona.spark import SedonaContext + +# 配置 Spark 进行匿名 S3 访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) + +# 直接从 S3 加载 GeoParquet 文件 +s3_path = "s3://wherobots-examples/data/onboarding_1/nyc_buildings.parquet" +nyc_buildings = gpd.read_parquet(s3_path) + +# 显示基本信息 +print(f"Dataset shape: {nyc_buildings.shape}") +print(f"Columns: {nyc_buildings.columns.tolist()}") +nyc_buildings.head() +``` + +### 空间过滤 + +使用空间索引与过滤方法。注意:当前版本尚未实现 `cx` 空间索引: + +```python +from shapely.geometry import box + +# 中央公园的边界框 +central_park_bbox = box( + -73.973, + 40.764, # 左下角(经度,纬度) + -73.951, + 40.789, # 右上角(经度,纬度) +) + +# 使用空间索引筛选边界框内的建筑 +# 注意:该写法需要把数据收集到 driver 才能进行空间过滤 +# 对于大规模数据,建议改用空间连接(spatial join) +buildings_sample = nyc_buildings.sample(1000) # 演示用:抽样 1000 行 +central_park_buildings = buildings_sample[ + buildings_sample.geometry.intersects(central_park_bbox) +] + +# 显示结果 +print( + central_park_buildings[["BUILD_ID", "PROP_ADDR", "height_val", "geometry"]].head() +) +``` + +**针对大规模数据的替代方案——使用空间连接:** + +```python +# 用边界框创建一个 GeoDataFrame +bbox_gdf = gpd.GeoDataFrame({"id": [1]}, geometry=[central_park_bbox], crs="EPSG:4326") + +# 使用空间连接筛选边界框内的建筑 +central_park_buildings = nyc_buildings.sjoin(bbox_gdf, predicate="intersects") +``` + +## 高级操作 + +### 空间连接 + +可使用与 GeoPandas 相同的语法执行空间连接: + +```python +# 加载两份数据集 +left_df = gpd.read_parquet("s3://bucket/left_data.parquet") +right_df = gpd.read_parquet("s3://bucket/right_data.parquet") + +# 使用 distance 谓词的空间连接 +result = left_df.sjoin(right_df, predicate="dwithin", distance=50) + +# 其他空间谓词 +intersects_result = left_df.sjoin(right_df, predicate="intersects") +contains_result = left_df.sjoin(right_df, predicate="contains") +``` + +### 坐标参考系操作 + +在不同坐标参考系(CRS)之间转换几何对象: + +```python +# 设置初始 CRS +buildings = gpd.read_parquet("buildings.parquet") +buildings = buildings.set_crs("EPSG:4326") + +# 转换为投影坐标系以便计算面积 +buildings_projected = buildings.to_crs("EPSG:3857") + +# 计算面积 +buildings_projected["area"] = buildings_projected.geometry.area +``` + +### 几何运算 + +应用几何变换与分析: + +```python +# 缓冲区操作 +buffered = buildings.geometry.buffer(100) # 100 米缓冲 + +# 几何属性 +buildings["is_valid"] = buildings.geometry.is_valid +buildings["is_simple"] = buildings.geometry.is_simple +buildings["bounds"] = buildings.geometry.bounds + +# 距离计算 +from shapely.geometry import Point + +reference_point = Point(-73.9857, 40.7484) # 时代广场 +buildings["distance_to_times_square"] = buildings.geometry.distance(reference_point) + +# 面积与周长(需要使用投影 CRS) +buildings_projected = buildings.to_crs("EPSG:3857") # Web Mercator +buildings_projected["area"] = buildings_projected.geometry.area +buildings_projected["perimeter"] = buildings_projected.geometry.length +``` + +## 性能考虑 + +### 何时仍使用传统 GeoPandas: + +- 小数据集(< 1GB) +- 对本地数据的简单操作 +- 需要完整的功能覆盖 +- 单机处理已经足够 + +### 何时使用 Apache Sedona 的 GeoPandas API: + +- 大规模数据集(> 1GB) +- 复杂的地理空间分析 +- 需要分布式处理 +- 数据保存在云存储(S3、HDFS 等) + +## 已支持的操作 + +Apache Sedona 的 GeoPandas API 已实现 **39 个 GeoSeries 函数** 与 **10 个 GeoDataFrame 函数**,覆盖了 GeoPandas 中最常用的操作: + +### 数据 I/O + +- `read_parquet()` —— 读取 GeoParquet 文件 +- `read_file()` —— 读取多种地理空间格式 +- `to_parquet()` —— 写出为 Parquet 格式 + +### 空间操作 + +- `sjoin()` —— 多种谓词的空间连接 +- `buffer()` —— 几何缓冲 +- `distance()` —— 距离计算 +- `intersects()`、`contains()`、`within()` —— 空间谓词 +- `sindex` —— 空间索引(功能有限) + +### CRS 操作 + +- `set_crs()` —— 设置坐标参考系 +- `to_crs()` —— 在 CRS 之间转换 +- `crs` —— 访问 CRS 信息 + +### 几何属性 + +- `area`、`length`、`bounds` —— 几何度量 +- `is_valid`、`is_simple`、`is_empty` —— 几何校验 +- `centroid`、`envelope`、`boundary` —— 几何属性 +- `x`、`y`、`z`、`has_z` —— 坐标访问 +- `total_bounds`、`estimate_utm_crs` —— 包围盒与 CRS 工具 + +### 空间运算 + +- `buffer()` —— 几何缓冲 +- `distance()` —— 距离计算 +- `intersects()`、`contains()`、`within()` —— 空间谓词 +- `intersection()` —— 几何相交 +- `make_valid()` —— 几何校验与修复 +- `sindex` —— 空间索引(功能有限) + +### 数据转换 + +- `to_geopandas()` —— 转换为传统 GeoPandas +- `to_wkb()`、`to_wkt()` —— 转换为 WKB/WKT +- `from_xy()` —— 通过坐标创建几何 +- `geom_type` —— 获取几何类型 + +## 完整工作流示例 + +```python +import sedona.spark.geopandas as gpd +from sedona.spark import SedonaContext + +# 配置 Spark 进行匿名 S3 访问 +config = ( + SedonaContext.builder() + .config( + "spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider", + ) + .getOrCreate() +) + +sedona = SedonaContext.create(config) + +# 加载数据 +DATA_DIR = "s3://wherobots-examples/data/geopandas_blog/" +overture_size = "1M" +postal_codes_path = DATA_DIR + "postal-code/" +overture_path = DATA_DIR + overture_size + "/" + "overture-buildings/" + +postal_codes = gpd.read_parquet(postal_codes_path) +buildings = gpd.read_parquet(overture_path) + +# 空间分析 +buildings = buildings.set_crs("EPSG:4326") +buildings_projected = buildings.to_crs("EPSG:3857") + +# 计算面积并过滤 +buildings_projected["area"] = buildings_projected.geometry.area +large_buildings = buildings_projected[buildings_projected["area"] > 1000] + +result = large_buildings.sjoin(postal_codes, predicate="intersects") + +# 按邮政编码聚合 +summary = ( + result.groupby("postal_code") + .agg({"area": "sum", "BUILD_ID": "count"}) + .rename(columns={"BUILD_ID": "building_count"}) +) + +print(summary.head()) +``` + +## 资源与贡献 + +完整且最新的 API 文档(包含方法签名、参数与示例)请参阅: + +**📚 [GeoPandas API 文档](https://sedona.apache.org/latest/api/pydocs/sedona.spark.geopandas.html)** + +Apache Sedona 的 GeoPandas API 是一个开源项目,欢迎参与贡献:可在 [GitHub issue tracker](https://github.com/apache/sedona/issues/2230) 报告 bug、提出新需求或贡献代码。更多贡献指南请参阅 [贡献者指南](../community/develop.md)。 diff --git a/docs/tutorial/geopandas-shapely.zh.md b/docs/tutorial/geopandas-shapely.zh.md new file mode 100644 index 00000000000..1442effd259 --- /dev/null +++ b/docs/tutorial/geopandas-shapely.zh.md @@ -0,0 +1,371 @@ + + +# 与 GeoPandas 和 Shapely 配合使用 + +!!! note + Sedona 1.6.0 之前的版本仅支持 Shapely 1.x。如需使用 Shapely 2.x,请使用 1.6.0 及以上的 Sedona。 + + 如果您使用 Sedona < 1.6.0,请安装 GeoPandas <= `0.11.1`,因为 GeoPandas > 0.11.1 会自动安装 Shapely 2.0;如果使用 Shapely,请安装 <= `1.8.5`。 + +## 与 GeoPandas 互通 + +Sedona Python 已实现序列化器与反序列化器,可以将 Sedona Geometry 对象转换为 Shapely 的 BaseGeometry。基于此,您可以用 GeoPandas 从文件加载数据(驱动可参考 Fiona 支持的列表),并基于 GeoDataFrame 创建 Spark DataFrame。 + +### 从 GeoPandas 到 Sedona DataFrame + +使用 GeoPandas 的 `read_file` 方法从 shapefile 加载数据,并基于 GeoDataFrame 创建 Spark DataFrame: + +```python +import geopandas as gpd +from sedona.spark import * + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +gdf = gpd.read_file("gis_osm_pois_free_1.shp") + +sedona.createDataFrame(gdf).show() +``` + +输出如下: + +``` + ++---------+----+-----------+--------------------+--------------------+ +| osm_id|code| fclass| name| geometry| ++---------+----+-----------+--------------------+--------------------+ +| 26860257|2422| camp_site| de Kroon|POINT (15.3393145...| +| 26860294|2406| chalet| Leśne Ustronie|POINT (14.8709625...| +| 29947493|2402| motel| null|POINT (15.0946636...| +| 29947498|2602| atm| null|POINT (15.0732014...| +| 29947499|2401| hotel| null|POINT (15.0696777...| +| 29947505|2401| hotel| null|POINT (15.0155749...| ++---------+----+-----------+--------------------+--------------------+ + +``` + +如需借助 Arrow 优化加快转换速度,可以使用 `create_spatial_dataframe`,它接收 SparkSession 与 GeoDataFrame 作为参数,返回 Sedona DataFrame: + +```python +def create_spatial_dataframe( + spark: SparkSession, gdf: gpd.GeoDataFrame +) -> DataFrame: ... +``` + +- spark:SparkSession +- gdf:gpd.GeoDataFrame +- 返回:DataFrame + +示例: + +```python +from sedona.spark.geoarrow import create_spatial_dataframe + +create_spatial_dataframe(spark, gdf) +``` + +### 从 Sedona DataFrame 到 GeoPandas + +用 Spark 读取数据并转换为 GeoPandas: + +```python +import geopandas as gpd +from sedona.spark import * + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +counties = ( + sedona.read.option("delimiter", "|").option("header", "true").csv("counties.csv") +) + +counties.createOrReplaceTempView("county") + +counties_geom = sedona.sql("SELECT *, st_geomFromWKT(geom) as geometry from county") + +df = counties_geom.toPandas() +gdf = gpd.GeoDataFrame(df, geometry="geometry") + +gdf.plot( + figsize=(10, 8), + column="value", + legend=True, + cmap="YlOrBr", + scheme="quantiles", + edgecolor="lightgray", +) +``` + +
+
+ +![poland_image](https://user-images.githubusercontent.com/22958216/67603296-c08b4680-f778-11e9-8cde-d2e14ffbba3b.png) + +
+
+ +也可以尝试通过 GeoArrow 转换为 GeoPandas,对于大型结果集这种方式可能显著更快(要求 geopandas >= 1.0)。 + +```python +import geopandas as gpd +from sedona.spark import dataframe_to_arrow + +config = SedonaContext.builder().getOrCreate() + +sedona = SedonaContext.create(config) + +test_wkt = ["POINT (0 1)", "LINESTRING (0 1, 2 3)"] +df = sedona.createDataFrame(zip(test_wkt), ["wkt"]).selectExpr( + "ST_GeomFromText(wkt) as geom" +) + +gpd.GeoDataFrame.from_arrow(dataframe_to_arrow(df)) +``` + +## 与 Shapely 对象互通 + +### 支持的 Shapely 对象 + +| Shapely 对象 | 是否支持 | +|--------------------|--------------------| +| Point | :heavy_check_mark: | +| MultiPoint | :heavy_check_mark: | +| LineString | :heavy_check_mark: | +| MultiLinestring | :heavy_check_mark: | +| Polygon | :heavy_check_mark: | +| MultiPolygon | :heavy_check_mark: | +| GeometryCollection | :heavy_check_mark: | + +要基于上述几何类型创建 Spark DataFrame,请使用 `sedona.sql.types` 模块下的 GeometryType。该转换支持包含 Shapely 对象的 list 或 tuple。 + +针对包含整型 id 与几何类型的目标表,schema 可以这样定义: + +```python +from pyspark.sql.types import IntegerType, StructField, StructType + +from sedona.spark import * + +schema = StructType( + [ + StructField("id", IntegerType(), False), + StructField("geom", GeometryType(), False), + ] +) +``` + +同样,包含几何类型列的 Spark DataFrame 可以通过 collect 方法转换为 Shapely 对象列表。 + +### Point 示例 + +```python +from shapely.geometry import Point + +data = [[1, Point(21.0, 52.0)], [1, Point(23.0, 42.0)], [1, Point(26.0, 32.0)]] + + +gdf = sedona.createDataFrame(data, schema) + +gdf.show() +``` + +``` ++---+-------------+ +| id| geom| ++---+-------------+ +| 1|POINT (21 52)| +| 1|POINT (23 42)| +| 1|POINT (26 32)| ++---+-------------+ +``` + +```python +gdf.printSchema() +``` + +``` +root + |-- id: integer (nullable = false) + |-- geom: geometry (nullable = false) +``` + +### MultiPoint 示例 + +```python3 +from shapely.geometry import MultiPoint + +data = [[1, MultiPoint([[19.511463, 51.765158], [19.446408, 51.779752]])]] + +gdf = sedona.createDataFrame(data, schema).show(1, False) +``` + +``` + ++---+---------------------------------------------------------+ +|id |geom | ++---+---------------------------------------------------------+ +|1 |MULTIPOINT ((19.511463 51.765158), (19.446408 51.779752))| ++---+---------------------------------------------------------+ + + +``` + +### LineString 示例 + +```python3 +from shapely.geometry import LineString + +line = [(40, 40), (30, 30), (40, 20), (30, 10)] + +data = [[1, LineString(line)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+--------------------------------+ +|id |geom | ++---+--------------------------------+ +|1 |LINESTRING (10 10, 20 20, 10 40)| ++---+--------------------------------+ + +``` + +### MultiLineString 示例 + +```python3 +from shapely.geometry import MultiLineString + +line1 = [(10, 10), (20, 20), (10, 40)] +line2 = [(40, 40), (30, 30), (40, 20), (30, 10)] + +data = [[1, MultiLineString([line1, line2])]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+---------------------------------------------------------------------+ +|id |geom | ++---+---------------------------------------------------------------------+ +|1 |MULTILINESTRING ((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))| ++---+---------------------------------------------------------------------+ + +``` + +### Polygon 示例 + +```python3 +from shapely.geometry import Polygon + +polygon = Polygon( + [ + [19.51121, 51.76426], + [19.51056, 51.76583], + [19.51216, 51.76599], + [19.51280, 51.76448], + [19.51121, 51.76426], + ] +) + +data = [[1, polygon]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+--------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+--------------------------------------------------------------------------------------------------------+ +|1 |POLYGON ((19.51121 51.76426, 19.51056 51.76583, 19.51216 51.76599, 19.5128 51.76448, 19.51121 51.76426))| ++---+--------------------------------------------------------------------------------------------------------+ + +``` + +### MultiPolygon 示例 + +```python3 +from shapely.geometry import MultiPolygon + +exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)] +interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)] + +exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)] + +polygons = [Polygon(exterior_p1, [interior_p1]), Polygon(exterior_p2)] + +data = [[1, MultiPolygon(polygons)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` + ++---+----------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+----------------------------------------------------------------------------------------------------------+ +|1 |MULTIPOLYGON (((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1.5 1, 1.5 1.5, 1 1.5, 1 1)), ((0 0, 0 1, 1 1, 1 0, 0 0)))| ++---+----------------------------------------------------------------------------------------------------------+ + +``` + +### GeometryCollection 示例 + +```python3 +from shapely.geometry import GeometryCollection, Point, LineString, Polygon + +exterior_p1 = [(0, 0), (0, 2), (2, 2), (2, 0), (0, 0)] +interior_p1 = [(1, 1), (1, 1.5), (1.5, 1.5), (1.5, 1), (1, 1)] +exterior_p2 = [(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)] + +geoms = [ + Polygon(exterior_p1, [interior_p1]), + Polygon(exterior_p2), + Point(1, 1), + LineString([(0, 0), (1, 1), (2, 2)]), +] + +data = [[1, GeometryCollection(geoms)]] + +gdf = sedona.createDataFrame(data, schema) + +gdf.show(1, False) +``` + +``` ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +|id |geom | ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +|1 |GEOMETRYCOLLECTION (POLYGON ((0 0, 0 2, 2 2, 2 0, 0 0), (1 1, 1 1.5, 1.5 1.5, 1.5 1, 1 1)), POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0)), POINT (1 1), LINESTRING (0 0, 1 1, 2 2))| ++---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +```