
geospark geomesa interoperation #253

Open
geoHeil opened this issue Jul 11, 2018 · 12 comments

@geoHeil commented Jul 11, 2018

As a data scientist I want to be able to mix and match spatial libraries for Spark. Currently it is rather an either/or (XOR) choice: the libraries do not integrate with each other and have overlapping classes and UDF function names.

In particular, I want to be able to easily integrate GeoSpark and GeoMesa.

One possibility could be to write my own UDF registrator, based on https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/main/scala/org/datasyslab/geosparksql/UDF/UdfRegistrator.scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.datasyslab.geosparksql.UDF.Catalog

def registerAll(sparkSession: SparkSession): Unit = {
  // Prefix every GeoSpark function with "geospark_" to avoid name clashes with GeoMesa.
  Catalog.expressions.foreach(f => FunctionRegistry.builtin.registerFunction("geospark_" + f.getClass.getSimpleName.dropRight(1), f))
  Catalog.aggregateExpressions.foreach(f => sparkSession.udf.register("geospark_" + f.getClass.getSimpleName, f))
}

However, this still does not handle the overlapping classes (JTS, GeoTools).

@geoHeil commented Jul 12, 2018

The suggested workaround seems to work only partially.
When the UDFs are not renamed, the calls fall back to GeoMesa's functions.

But when the UDFs are renamed as well (I actually want the speedup of GeoSpark), the functions do not seem to be registered properly:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'geospark_ST_Point'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0
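A possible explanation, assuming Spark 2.x internals: the session's function registry is a clone of FunctionRegistry.builtin taken when the session state is created, so functions added to the builtin registry afterwards stay invisible to an already-running session. A minimal sketch that registers into the session's own registry instead (same shape as the registrator above, using the String-named registerFunction overload from that snippet; newer Spark versions take a FunctionIdentifier):

import org.apache.spark.sql.SparkSession
import org.datasyslab.geosparksql.UDF.Catalog

def registerAllPrefixed(sparkSession: SparkSession): Unit = {
  // sessionState.functionRegistry is what the analyzer actually consults;
  // FunctionRegistry.builtin is only copied into it once, at session creation.
  Catalog.expressions.foreach { f =>
    sparkSession.sessionState.functionRegistry
      .registerFunction("geospark_" + f.getClass.getSimpleName.dropRight(1), f)
  }
  Catalog.aggregateExpressions.foreach { f =>
    sparkSession.udf.register("geospark_" + f.getClass.getSimpleName, f)
  }
}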
@geoHeil commented Jul 12, 2018

A reproducible example can be found at https://github.com/geoHeil/geomesa-geospark

@geoHeil commented Jul 17, 2018

I get the following problems: a clash of classes,

18-07-17 21:36:03 WARN UDTRegistration: Cannot register UDT for com.vividsolutions.jts.geom.Geometry, which is already registered.

when changing the dependency scope from compileOnly to compile and executing in IDEA. Executing the fat jar from the build tool in a shell fails with a timeout.

@jiayuasu (Member) commented Jul 18, 2018

@geoHeil This is probably because GeoMesa also has its own custom Geometry kryo serializer, which does the same job as GeoSpark's. GeoSpark wrote a bunch of code to put spatial indexes and geometries into an array. Since both libraries use JTS geometries, this can conflict.
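A sketch of where this conflict surfaces in configuration (assuming the two kryo registrator class names below; spark.kryo.registrator accepts a comma-separated list, and the last serializer registered for com.vividsolutions.jts.geom.Geometry wins):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("geomesa-geospark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Both registrators claim JTS Geometry; listing both means one silently
  // overrides the other's serializer registration.
  .set("spark.kryo.registrator",
    "org.datasyslab.geospark.serde.GeoSparkKryoRegistrator," +
      "org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator")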

@geoHeil commented Jul 18, 2018

See the latest updates to https://github.com/geoHeil/geomesa-geospark

Some problems remain:

  • 18/07/18 21:13:33 WARN UDTRegistration: Cannot register UDT for com.vividsolutions.jts.geom.Geometry, which is already registered. — how can this be fixed easily? Shading JTS and the registrator does not seem to be a maintainable idea.
  • Understanding why the registration order is important, and why, with GeoMesa registered first and GeoSpark second, the error is:
    Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.UnsafeArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
  • Query plans are impacted: GeoSpark's optimizations are only used when it is not used in conjunction with GeoMesa (make runGeosparkSolo). See the comparison below.

Comparison:

geospark & geomesa — regular join:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#120L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#124L])
      +- *Project
         +- BroadcastNestedLoopJoin BuildRight, Inner, org.apache.spark.sql.geosparksql.expressions.ST_Contains$
            :- LocalTableScan [geom_polygons#72]
            +- BroadcastExchange IdentityBroadcastMode
               +- LocalTableScan [geom_points#60]

geospark solo — optimized range join:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#81L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#85L])
      +- *Project
         +- RangeJoin geom_polygons#43: geometry, geom_points#31: geometry, false
            :- LocalTableScan [geom_polygons#43]
            +- LocalTableScan [geom_points#31]
@geoHeil commented Jul 18, 2018

@jiayuasu do you believe this conflict is what causes Spark to fall back to regular, i.e. no longer optimized, joins?

@jiayuasu (Member) commented Jul 18, 2018

@geoHeil You can probably try to register the GeoSpark join strategy manually: https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/main/scala/org/datasyslab/geosparksql/utils/GeoSparkSQLRegistrator.scala

In other words, add the following lines:

import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector

sparkSession.experimental.extraStrategies = JoinQueryDetector :: Nil
@geoHeil commented Jul 18, 2018

@jiayuasu thanks a lot. That was indeed what was lacking from my registrator; now the optimized range joins are used as well.

Do you have an opinion regarding UDT registration / clashing class names? Or a better idea than my shading one above?

geospark

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.index.SpatialIndex
import org.apache.spark.sql.types.UDTRegistration
import org.apache.spark.sql.geosparksql.UDT.{GeometryUDT, IndexUDT}

UDTRegistration.register(classOf[Geometry].getName, classOf[GeometryUDT].getName)
UDTRegistration.register(classOf[SpatialIndex].getName, classOf[IndexUDT].getName)

geomesa

import com.vividsolutions.jts.geom._
import org.apache.spark.sql.jts._
import org.apache.spark.sql.types.UserDefinedType

val typeMap: Map[Class[_], Class[_ <: UserDefinedType[_]]] = Map(
  classOf[Geometry]           -> classOf[GeometryUDT],
  classOf[Point]              -> classOf[PointUDT],
  classOf[LineString]         -> classOf[LineStringUDT],
  classOf[Polygon]            -> classOf[PolygonUDT],
  classOf[MultiPoint]         -> classOf[MultiPointUDT],
  classOf[MultiLineString]    -> classOf[MultiLineStringUDT],
  classOf[MultiPolygon]       -> classOf[MultiPolygonUDT],
  classOf[GeometryCollection] -> classOf[GeometryCollectionUDT]
)

  1. Is there a good way to merge the registrations, i.e. prevent double registration — or can the warnings simply be ignored? (See the sketch below.)
  2. Is GeoSpark adding some custom code (https://github.com/jiayuasu/JTSplus) under the namespace com.vividsolutions.jts.*? I.e., when the order is geospark first, geomesa second, is no functionality lost, so that any warning regarding double registration of the types could be ignored?
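For question 1, a minimal sketch of guarding against double registration with Spark's UDTRegistration.exists (shown for GeoSpark's two entries; the helper name is made up):

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.index.SpatialIndex
import org.apache.spark.sql.geosparksql.UDT.{GeometryUDT, IndexUDT}
import org.apache.spark.sql.types.UDTRegistration

// Register a UDT only if no other library has claimed the user class yet;
// running this after GeoMesa's registration no longer warns.
def registerIfAbsent(userClass: Class[_], udtClass: Class[_]): Unit =
  if (!UDTRegistration.exists(userClass.getName)) {
    UDTRegistration.register(userClass.getName, udtClass.getName)
  }

registerIfAbsent(classOf[Geometry], classOf[GeometryUDT])
registerIfAbsent(classOf[SpatialIndex], classOf[IndexUDT])

Caveat: whichever UDT wins also decides the serialized representation, so the losing library's expressions may still fail on rows they did not encode themselves — which would be consistent with the UnsafeArrayData ClassCastException above.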
@geoHeil commented Jul 19, 2018

GeoMesa serializes using JTS WKB:

override def serialize(obj: T): InternalRow = {
  new GenericInternalRow(Array[Any](WKBUtils.write(obj)))
}

override def sqlType: DataType = StructType(Seq(
  StructField("wkb", DataTypes.BinaryType)
))

override def deserialize(datum: Any): T = {
  val ir = datum.asInstanceOf[InternalRow]
  WKBUtils.read(ir.getBinary(0)).asInstanceOf[T]
}

GeoSpark uses kryo:

def serialize(geometry: Geometry): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val kryo = new Kryo()
  val geometrySerde = new GeometrySerde()
  val output = new Output(out)
  geometrySerde.write(kryo, output, geometry)
  output.close()
  return out.toByteArray
}

def deserialize(values: ArrayData): Geometry = {
  val in = new ByteArrayInputStream(values.toByteArray())
  val kryo = new Kryo()
  val geometrySerde = new GeometrySerde()
  val input = new Input(in)
  val geometry = geometrySerde.read(kryo, input, classOf[Geometry])
  input.close()
  return geometry.asInstanceOf[Geometry]
}

Is there any problem if JTS geometries (from GeoMesa) are serialized via the GeoSpark serializer? Any problems regarding efficiency?

@geoHeil commented Jul 19, 2018

According to James (from the GeoMesa gitter chat):

One strategy that might work would be for GeoSpark and GeoMesa to agree on the classnames for UDT registrations,
and then for end users to register ONE AND ONLY ONE set of the UDTs...
the UDFs could be based on those classnames, and there's a fighting chance that'd let someone 'mix and match' (as well as combine UDFs between packages).

Is there some interest from both projects in collaborating here?

@geoHeil commented Jul 31, 2018

cannot resolve 'CAST(`hw_aggreagtion_area` AS ARRAY<TINYINT>)' due to data type mismatch: cannot cast org.apache.spark.sql.jts.PointUDT@449554e8 to org.apache.spark.sql.geosparksql.UDT.GeometryUDT@3f2c1eb5;

This is the clashing-UDT problem again: a GeoSpark expression receives a column typed with GeoMesa's UDT. How could this be resolved (quickly)?
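One possible stopgap (a sketch, not a proper fix; the helper name is made up): carry geometries across the UDT boundary as WKT text. This assumes GeoMesa's st_asText and GeoSpark's ST_GeomFromWKT — geospark_ST_GeomFromWKT under the renaming scheme above — are both registered:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Re-encode a GeoMesa-typed geometry column as a GeoSpark-typed one via WKT.
// Costly (a text round trip per row), but it sidesteps the UDT cast entirely.
def bridgeToGeoSpark(df: DataFrame, geomCol: String): DataFrame =
  df.withColumn(s"${geomCol}_wkt", expr(s"st_asText($geomCol)"))
    .withColumn(s"${geomCol}_geospark", expr(s"geospark_ST_GeomFromWKT(${geomCol}_wkt)"))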

@jnh5y commented Jun 3, 2019

@geoHeil your blog post about this work is great!

@jiayuasu any thoughts on integration points?
