Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ExpressionInfo {
"collection_funcs", "predicate_funcs", "conditional_funcs", "conversion_funcs",
"csv_funcs", "datetime_funcs", "generator_funcs", "hash_funcs", "json_funcs",
"lambda_funcs", "map_funcs", "math_funcs", "misc_funcs", "string_funcs", "struct_funcs",
"window_funcs", "xml_funcs", "table_funcs", "url_funcs", "variant_funcs"));
"window_funcs", "xml_funcs", "table_funcs", "url_funcs", "variant_funcs", "st_funcs"));

private static final Set<String> validSources =
new HashSet<>(Arrays.asList("built-in", "hive", "python_udf", "scala_udf", "sql_udf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ public Geography copy() {

// Returns a Geography object with the specified SRID value by parsing the input WKB.
public static Geography fromWkb(byte[] wkb, int srid) {
throw new UnsupportedOperationException("Geography WKB parsing is not yet supported.");
byte[] bytes = new byte[HEADER_SIZE + wkb.length];
ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
return fromBytes(bytes);
}

// Overload for the WKB reader where we use the default SRID for Geography.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ public Geometry copy() {

// Returns a Geometry object with the specified SRID value by parsing the input WKB.
public static Geometry fromWkb(byte[] wkb, int srid) {
throw new UnsupportedOperationException("Geometry WKB parsing is not yet supported.");
byte[] bytes = new byte[HEADER_SIZE + wkb.length];
ByteBuffer.wrap(bytes).order(DEFAULT_ENDIANNESS).putInt(srid);
System.arraycopy(wkb, 0, bytes, WKB_OFFSET, wkb.length);
return fromBytes(bytes);
}

// Overload for the WKB reader where we use the default SRID for Geometry.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.util;

import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

// This class defines static methods that used to implement ST expressions using `StaticInvoke`.
public final class STUtils {

/** Conversion methods from physical values to Geography/Geometry objects. */

// Converts a GEOGRAPHY from its physical value to the corresponding `Geography` object
static Geography fromPhysVal(GeographyVal value) {
return Geography.fromBytes(value.getBytes());
}

// Converts a GEOMETRY from its physical value to the corresponding `Geometry` object
static Geometry fromPhysVal(GeometryVal value) {
return Geometry.fromBytes(value.getBytes());
}

/** Conversion methods from Geography/Geometry objects to physical values. */

// Converts a `Geography` object to the corresponding GEOGRAPHY physical value.
static GeographyVal toPhysVal(Geography g) {
return g.getValue();
}

// Converts a `Geometry` object to the corresponding GEOMETRY physical value.
static GeometryVal toPhysVal(Geometry g) {
return g.getValue();
}

/** Methods for implementing ST expressions. */

// ST_AsBinary
public static byte[] stAsBinary(GeographyVal geo) {
return fromPhysVal(geo).toWkb();
}

public static byte[] stAsBinary(GeometryVal geo) {
return fromPhysVal(geo).toWkb();
}

// ST_GeogFromWKB
public static GeographyVal stGeogFromWKB(byte[] wkb) {
return toPhysVal(Geography.fromWkb(wkb));
}

// ST_GeomFromWKB
public static GeometryVal stGeomFromWKB(byte[] wkb) {
return toPhysVal(Geometry.fromWkb(wkb));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,11 @@ object FunctionRegistry {
expression[SchemaOfVariantAgg]("schema_of_variant_agg"),
expression[ToVariantObject]("to_variant_object"),

// Spatial
expression[ST_AsBinary]("st_asbinary"),
expression[ST_GeogFromWKB]("st_geogfromwkb"),
expression[ST_GeomFromWKB]("st_geomfromwkb"),

// cast
expression[Cast]("cast"),
// Cast aliases (SPARK-16730)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ object Literal {
case PhysicalNullType => true
case PhysicalShortType => v.isInstanceOf[Short]
case _: PhysicalStringType => v.isInstanceOf[UTF8String]
case _: PhysicalGeographyType => v.isInstanceOf[GeographyVal]
case _: PhysicalGeometryType => v.isInstanceOf[GeometryVal]
case PhysicalVariantType => v.isInstanceOf[VariantVal]
case st: PhysicalStructType =>
v.isInstanceOf[InternalRow] && {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.trees._
import org.apache.spark.sql.catalyst.util.{Geography, Geometry, STUtils}
import org.apache.spark.sql.types._


////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines expressions for geospatial operations.
////////////////////////////////////////////////////////////////////////////////////////////////////


// Useful constants for ST expressions.
private[sql] object ExpressionDefaults {
val DEFAULT_GEOGRAPHY_SRID: Int = Geography.DEFAULT_SRID
val DEFAULT_GEOMETRY_SRID: Int = Geometry.DEFAULT_SRID
}

/** ST writer expressions. */

/**
* Returns the input GEOGRAPHY or GEOMETRY value in WKB format.
* See https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry#Well-known_binary
* for more details on the WKB format.
*/
@ExpressionDescription(
usage = "_FUNC_(geo) - Returns the geospatial value (value of type GEOGRAPHY or GEOMETRY) "
+ "in WKB format.",
arguments = """
Arguments:
* geo - A geospatial value, either a GEOGRAPHY or a GEOMETRY.
""",
examples = """
Examples:
> SELECT hex(_FUNC_(st_geogfromwkb(X'0101000000000000000000F03F0000000000000040')));
0101000000000000000000F03F0000000000000040
> SELECT hex(_FUNC_(st_geomfromwkb(X'0101000000000000000000F03F0000000000000040')));
0101000000000000000000F03F0000000000000040
""",
since = "4.1.0",
group = "st_funcs"
)
case class ST_AsBinary(geo: Expression)
extends RuntimeReplaceable
with ImplicitCastInputTypes
with UnaryLike[Expression] {

override def inputTypes: Seq[AbstractDataType] = Seq(
TypeCollection(GeographyType, GeometryType)
)

override lazy val replacement: Expression = StaticInvoke(
classOf[STUtils],
BinaryType,
"stAsBinary",
Seq(geo),
returnNullable = false
)

override def prettyName: String = "st_asbinary"

override def child: Expression = geo

override protected def withNewChildInternal(newChild: Expression): ST_AsBinary =
copy(geo = newChild)
}

/** ST reader expressions. */

/**
* Parses the WKB description of a geography and returns the corresponding GEOGRAPHY value. The SRID
* value of the returned GEOGRAPHY value is 4326.
* See https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry#Well-known_binary
* for more details on the WKB format.
*/
@ExpressionDescription(
usage = "_FUNC_(wkb) - Parses the WKB description of a geography and returns the corresponding "
+ "GEOGRAPHY value.",
arguments = """
Arguments:
* wkb - A BINARY value in WKB format, representing a GEOGRAPHY value.
""",
examples = """
Examples:
> SELECT hex(st_asbinary(_FUNC_(X'0101000000000000000000F03F0000000000000040')));
0101000000000000000000F03F0000000000000040
""",
since = "4.1.0",
group = "st_funcs"
)
case class ST_GeogFromWKB(wkb: Expression)
extends RuntimeReplaceable
with ImplicitCastInputTypes
with UnaryLike[Expression] {

override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val replacement: Expression = StaticInvoke(
classOf[STUtils],
GeographyType(ExpressionDefaults.DEFAULT_GEOGRAPHY_SRID),
"stGeogFromWKB",
Seq(wkb),
returnNullable = false
)

override def prettyName: String = "st_geogfromwkb"

override def child: Expression = wkb

override protected def withNewChildInternal(newChild: Expression): ST_GeogFromWKB =
copy(wkb = newChild)
}

/**
* Parses the WKB description of a geometry and returns the corresponding GEOMETRY value. The SRID
* value of the returned GEOMETRY value is 0.
* See https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry#Well-known_binary
* for more details on the WKB format.
*/
@ExpressionDescription(
usage = "_FUNC_(wkb) - Parses the WKB description of a geometry and returns the corresponding "
+ "GEOMETRY value.",
arguments = """
Arguments:
* wkb - A BINARY value in WKB format, representing a GEOMETRY value.
""",
examples = """
Examples:
> SELECT hex(st_asbinary(_FUNC_(X'0101000000000000000000F03F0000000000000040')));
0101000000000000000000F03F0000000000000040
""",
since = "4.1.0",
group = "st_funcs"
)
case class ST_GeomFromWKB(wkb: Expression)
extends RuntimeReplaceable
with ImplicitCastInputTypes
with UnaryLike[Expression] {

override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

override lazy val replacement: Expression = StaticInvoke(
classOf[STUtils],
GeometryType(ExpressionDefaults.DEFAULT_GEOMETRY_SRID),
"stGeomFromWKB",
Seq(wkb),
returnNullable = false
)

override def prettyName: String = "st_geomfromwkb"

override def child: Expression = wkb

override protected def withNewChildInternal(newChild: Expression): ST_GeomFromWKB =
copy(wkb = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,25 @@ void testDefaultSrid() {
/** Tests for Geography WKB parsing. */

@Test
void testFromWkbWithSridUnsupported() {
void testFromWkbWithSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
() -> Geography.fromWkb(wkb, 0)
);
assertEquals("Geography WKB parsing is not yet supported.", exception.getMessage());
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
Geography geography = Geography.fromWkb(wkb, 4326);
assertNotNull(geography);
assertArrayEquals(wkb, geography.toWkb());
assertEquals(4326, geography.srid());
}

@Test
void testFromWkbNoSridUnsupported() {
void testFromWkbNoSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
() -> Geography.fromWkb(wkb)
);
assertEquals("Geography WKB parsing is not yet supported.", exception.getMessage());
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
Geography geography = Geography.fromWkb(wkb);
assertNotNull(geography);
assertArrayEquals(wkb, geography.toWkb());
assertEquals(4326, geography.srid());
}

/** Tests for Geography EWKB parsing. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,25 @@ void testDefaultSrid() {
/** Tests for Geometry WKB parsing. */

@Test
void testFromWkbWithSridUnsupported() {
void testFromWkbWithSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
() -> Geometry.fromWkb(wkb, 0)
);
assertEquals("Geometry WKB parsing is not yet supported.", exception.getMessage());
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
Geometry geometry = Geometry.fromWkb(wkb, 4326);
assertNotNull(geometry);
assertArrayEquals(wkb, geometry.toWkb());
assertEquals(4326, geometry.srid());
}

@Test
void testFromWkbNoSridUnsupported() {
void testFromWkbNoSridRudimentary() {
byte[] wkb = new byte[]{1, 2, 3};
UnsupportedOperationException exception = assertThrows(
UnsupportedOperationException.class,
() -> Geometry.fromWkb(wkb)
);
assertEquals("Geometry WKB parsing is not yet supported.", exception.getMessage());
// Note: This is a rudimentary WKB handling test; actual WKB parsing is not yet implemented.
// Once we implement the appropriate parsing logic, this test should be updated accordingly.
Geometry geometry = Geometry.fromWkb(wkb);
assertNotNull(geometry);
assertArrayEquals(wkb, geometry.toWkb());
assertEquals(0, geometry.srid());
}

/** Tests for Geometry EWKB parsing. */
Expand Down
Loading