-
Notifications
You must be signed in to change notification settings - Fork 654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SEDONA-30] Add raster data support in Sedona SQL #523
Changes from 27 commits
913ad3c
d29eaa2
42ce86a
54d189c
6153acd
1e9d54f
6b35f0b
d7ceb97
7d285e2
5c436dd
bcc72ca
57a44be
767bd80
44e3bea
7433c28
8d41782
c6c9b23
bf821cc
84c36c2
1c8e1a7
6630188
068ca92
9630d74
f5c6a00
53663cb
264b829
02afc7f
fa7d8b4
9aad497
7c67294
275dfa7
bcee4e5
2ee8363
0d28f47
2cd4e80
25bffd1
de420a0
ec3ccc2
931e2d7
47073ed
c0fb98e
93fc155
629d764
523a16a
56f6117
af5cb92
0218fb0
363f27b
8f51c26
84285a9
64e85cd
6d6a8af
8983548
4e5441f
7cc6584
80dc4ca
cfc8e57
996f3a3
2a80aab
bc2f8e2
10f98b6
1262428
908e7d6
2cb0a5a
2b0f7ee
6f066c2
9068d12
d4f4853
356aa26
0bd7f8d
85b0e2e
28f3262
a5a5c49
d3836c2
2e9e54c
32e7025
54ae903
ee4fb46
700fbd4
aae478b
b1e0496
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,4 +12,4 @@ | |
/site/ | ||
/.bloop/ | ||
/.metals/ | ||
/.vscode/ | ||
/.vscode/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,4 +2,5 @@ | |
bin | ||
/.settings | ||
/.classpath | ||
/.project | ||
/.project | ||
*.iml |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,7 +81,10 @@ object Catalog { | |
ST_IsRing, | ||
ST_FlipCoordinates, | ||
ST_LineSubstring, | ||
ST_LineInterpolatePoint | ||
ST_LineInterpolatePoint, | ||
ST_GeomFromRaster, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change the name to ST_GeomFromGeoTiff |
||
ST_DataframeFromRaster, | ||
ST_getBand | ||
) | ||
|
||
val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression | |
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback | ||
import org.apache.spark.sql.catalyst.util.GenericArrayData | ||
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT | ||
import org.apache.spark.sql.types.{DataType, Decimal} | ||
import org.apache.spark.sql.types._ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this change is not necessary, do not commit this change. You can replace the "_" with the original content There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
import org.apache.spark.unsafe.types.UTF8String | ||
import org.locationtech.jts.geom.{Coordinate, GeometryFactory} | ||
|
||
|
@@ -303,5 +303,4 @@ trait UserDataGeneratator { | |
} | ||
return userData | ||
} | ||
} | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't commit any change to this file |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,14 +34,13 @@ import org.geotools.geometry.jts.JTS | |
import org.geotools.referencing.CRS | ||
import org.locationtech.jts.algorithm.MinimumBoundingCircle | ||
import org.locationtech.jts.geom.{PrecisionModel, _} | ||
import org.locationtech.jts.io.WKBWriter | ||
import org.locationtech.jts.linearref.LengthIndexedLine | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. If this change is not necessary, do not commit this change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't commit any change to this file |
||
import org.locationtech.jts.operation.IsSimpleOp | ||
import org.locationtech.jts.operation.buffer.BufferParameters | ||
import org.locationtech.jts.operation.linemerge.LineMerger | ||
import org.locationtech.jts.operation.valid.IsValidOp | ||
import org.locationtech.jts.precision.GeometryPrecisionReducer | ||
import org.locationtech.jts.simplify.TopologyPreservingSimplifier | ||
import org.locationtech.jts.linearref.LengthIndexedLine | ||
import org.opengis.referencing.operation.MathTransform | ||
|
||
import java.util | ||
|
@@ -1122,4 +1121,4 @@ case class ST_FlipCoordinates(inputExpressions: Seq[Expression]) | |
override def dataType: DataType = GeometryUDT | ||
|
||
override def children: Seq[Expression] = inputExpressions | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.sedona_sql.expressions | ||
|
||
import org.apache.sedona.sql.utils.GeometrySerializer | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.expressions.Expression | ||
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback | ||
import org.apache.spark.sql.catalyst.util.GenericArrayData | ||
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT | ||
import org.apache.spark.sql.sedona_sql.expressions.implicits.GeometryEnhancer | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.unsafe.types.UTF8String | ||
import org.geotools.coverage.grid.io.{AbstractGridFormat, GridCoverage2DReader, GridFormatFinder, OverviewPolicy} | ||
import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D} | ||
import org.geotools.gce.geotiff.GeoTiffReader | ||
import org.geotools.geometry.jts.JTS | ||
import org.geotools.referencing.CRS | ||
import org.geotools.util.factory.Hints | ||
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory} | ||
import org.opengis.coverage.grid.{GridCoordinates, GridEnvelope} | ||
import org.opengis.parameter.{GeneralParameterValue, ParameterValue} | ||
|
||
import java.io.IOException | ||
import java.util | ||
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` | ||
|
||
// Fetches polygonal coordinates from a raster image | ||
|
||
case class ST_GeomFromRaster(inputExpressions: Seq[Expression]) | ||
extends Expression with CodegenFallback with UserDataGeneratator { | ||
override def nullable: Boolean = false | ||
|
||
override def eval(inputRow: InternalRow): Any = { | ||
// This is an expression which takes one input expressions | ||
assert(inputExpressions.length == 1) | ||
val geomString = inputExpressions(0).eval(inputRow).asInstanceOf[UTF8String].toString | ||
val geometry = readGeometry(geomString) | ||
new GenericArrayData(GeometrySerializer.serialize(geometry)) | ||
} | ||
|
||
private def readGeometry(url: String): Geometry = { | ||
|
||
val format = GridFormatFinder.findFormat(url) | ||
val hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true) | ||
val reader = format.getReader(url, hints) | ||
var coverage:GridCoverage2D = null | ||
|
||
try coverage = reader.read(null) | ||
catch { | ||
case giveUp: IOException => | ||
throw new RuntimeException(giveUp) | ||
} | ||
reader.dispose() | ||
val source = coverage.getCoordinateReferenceSystem | ||
val target = CRS.decode("EPSG:4326", true) | ||
jiayuasu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val targetCRS = CRS.findMathTransform(source, target) | ||
val gridRange2D = coverage.getGridGeometry.getGridRange | ||
val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getLow(1))) | ||
val polyCoordinates = new Array[Coordinate](5) | ||
var index = 0 | ||
|
||
for (point <- cords) { | ||
val coordinate2D = new GridCoordinates2D(point(0), point(1)) | ||
val result = coverage.getGridGeometry.gridToWorld(coordinate2D) | ||
polyCoordinates({ | ||
index += 1; index - 1 | ||
}) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1)) | ||
} | ||
|
||
polyCoordinates(index) = polyCoordinates(0) | ||
val factory = new GeometryFactory | ||
val polygon = JTS.transform(factory.createPolygon(polyCoordinates), targetCRS) | ||
|
||
polygon | ||
|
||
} | ||
override def dataType: DataType = GeometryUDT | ||
|
||
override def children: Seq[Expression] = inputExpressions | ||
} | ||
|
||
|
||
// Constructs a raster dataframe from a raster image which contains multiple columns such as Geometry, Band values etc | ||
case class ST_DataframeFromRaster(inputExpressions: Seq[Expression]) | ||
extends Expression with CodegenFallback with UserDataGeneratator { | ||
override def nullable: Boolean = false | ||
|
||
override def eval(inputRow: InternalRow): Any = { | ||
// This is an expression which takes one input expressions | ||
assert(inputExpressions.length == 2) | ||
val geomString = inputExpressions(0).eval(inputRow).asInstanceOf[UTF8String].toString | ||
val totalBands = inputExpressions(1).eval(inputRow).asInstanceOf[Int] | ||
val geometry = readGeometry(geomString) | ||
val bandvalues = getBands(geomString, totalBands).toArray | ||
returnValue(geometry.toGenericArrayData,bandvalues, 2) | ||
} | ||
|
||
private def readGeometry(url: String): Geometry = { | ||
jiayuasu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
val format = GridFormatFinder.findFormat(url) | ||
val hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true) | ||
val reader = format.getReader(url, hints) | ||
var coverage: GridCoverage2D = null | ||
|
||
try coverage = reader.read(null) | ||
catch { | ||
case giveUp: IOException => | ||
throw new RuntimeException(giveUp) | ||
} | ||
reader.dispose() | ||
val source = coverage.getCoordinateReferenceSystem | ||
val target = CRS.decode("EPSG:4326", true) | ||
val targetCRS = CRS.findMathTransform(source, target) | ||
jiayuasu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val gridRange2D = coverage.getGridGeometry.getGridRange | ||
val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getLow(1))) | ||
val polyCoordinates = new Array[Coordinate](5) | ||
var index = 0 | ||
|
||
for (point <- cords) { | ||
val coordinate2D = new GridCoordinates2D(point(0), point(1)) | ||
val result = coverage.getGridGeometry.gridToWorld(coordinate2D) | ||
polyCoordinates({ | ||
index += 1; | ||
index - 1 | ||
}) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1)) | ||
} | ||
|
||
polyCoordinates(index) = polyCoordinates(0) | ||
val factory = new GeometryFactory | ||
val polygon = JTS.transform(factory.createPolygon(polyCoordinates), targetCRS) | ||
|
||
polygon | ||
} | ||
|
||
private def getBands(url: String, bands:Int): List[Double] = { | ||
val policy: ParameterValue[OverviewPolicy] = AbstractGridFormat.OVERVIEW_POLICY.createValue | ||
policy.setValue(OverviewPolicy.IGNORE) | ||
|
||
val gridsize: ParameterValue[String] = AbstractGridFormat.SUGGESTED_TILE_SIZE.createValue | ||
|
||
val useJaiRead: ParameterValue[Boolean] = AbstractGridFormat.USE_JAI_IMAGEREAD.createValue.asInstanceOf[ParameterValue[Boolean]] | ||
useJaiRead.setValue(true) | ||
|
||
|
||
val reader: GridCoverage2DReader = new GeoTiffReader(url) | ||
val coverage: GridCoverage2D = reader.read(Array[GeneralParameterValue](policy, gridsize, useJaiRead)) | ||
|
||
val dimensions: GridEnvelope = reader.getOriginalGridRange | ||
val maxDimensions: GridCoordinates = dimensions.getHigh | ||
val w: Int = maxDimensions.getCoordinateValue(0) + 1 | ||
val h: Int = maxDimensions.getCoordinateValue(1) + 1 | ||
val numBands: Int = bands | ||
|
||
val bandValues: util.List[util.List[Double]] = new util.ArrayList[util.List[Double]](numBands) | ||
|
||
for (i <- 0 until numBands) { | ||
bandValues.add(new util.ArrayList[Double]) | ||
} | ||
|
||
for (i <- 0 until w) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this loop and the next loop can be merged There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jiayuasu The current loop returns an array of size equal to the number of bands for every pixel and I use it to construct a 2D array of size number of bands * number of pixels while the second loop flattens this array. If I need to merge both the loops, I have to keep a separate starting index for every band which may vary. |
||
for (j <- 0 until h) { | ||
val vals: Array[Double] = new Array[Double](numBands) | ||
coverage.evaluate(new GridCoordinates2D(i, j), vals) | ||
var band: Int = 0 | ||
for (pixel <- vals) { | ||
bandValues.get({ | ||
band += 1; band - 1 | ||
}).add(pixel) | ||
} | ||
} | ||
} | ||
|
||
bandValues.flatten.toList | ||
|
||
} | ||
|
||
// Dynamic results based on number of columns and type of structure | ||
private def returnValue(geometry:GenericArrayData, bands:Array[Double], count:Int): InternalRow = { | ||
|
||
val genData = new Array[GenericArrayData](count) | ||
genData(0) = geometry | ||
genData(1) = new GenericArrayData(bands) | ||
val result = InternalRow(genData.toList : _*) | ||
result | ||
} | ||
|
||
// Dynamic Schema generation using Number of Bands | ||
private def getSchema():DataType = { | ||
val mySchema = StructType(Array(StructField("Polygon", GeometryUDT, false),StructField("bands", ArrayType(DoubleType)))) | ||
mySchema | ||
} | ||
|
||
override def dataType: DataType = getSchema() | ||
|
||
override def children: Seq[Expression] = inputExpressions | ||
} | ||
|
||
// get a particular band from a raster dataframe | ||
case class ST_getBand(inputExpressions: Seq[Expression]) | ||
jiayuasu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
extends Expression with CodegenFallback with UserDataGeneratator { | ||
override def nullable: Boolean = false | ||
|
||
override def eval(inputRow: InternalRow): Any = { | ||
// This is an expression which takes one input expressions | ||
assert(inputExpressions.length == 3) | ||
val bandInfo = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray() | ||
val targetBand = inputExpressions(1).eval(inputRow).asInstanceOf[Int] | ||
val totalBands = inputExpressions(2).eval(inputRow).asInstanceOf[Int] | ||
val result = gettargetband(bandInfo, targetBand, totalBands) | ||
new GenericArrayData(result) | ||
} | ||
|
||
// fetch target band from the given array of bands | ||
private def gettargetband(bandinfo: Array[Double], targetband:Int, totalbands:Int): Array[Double] = { | ||
val sizeOfBand = bandinfo.length/totalbands | ||
val lowerBound = (targetband - 1)*sizeOfBand | ||
val upperBound = targetband*sizeOfBand-1 | ||
assert(bandinfo.slice(lowerBound,upperBound).length + 1==sizeOfBand) | ||
bandinfo.slice(lowerBound, upperBound) | ||
|
||
} | ||
|
||
override def dataType: DataType = ArrayType(DoubleType) | ||
|
||
override def children: Seq[Expression] = inputExpressions | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move all geotools dependency to the parent pom. See here: https://github.com/apache/incubator-sedona/blob/master/pom.xml#L120
Make sure you use the geotools scope variable for the scope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed