Can't zip RDDs with unequal partitions #142

Closed
SiefSeif opened this issue Oct 24, 2017 · 5 comments

SiefSeif commented Oct 24, 2017

I'm trying to join a PointRDD and a PolygonRDD following testSpatialJoinQueryUsingIndex() in ScalaExample.scala, but I get the error below on the JoinQuery line. I tried changing the number of partitions, but the error persists. I'm using the point and polygon CSV datasets from GeoSpark's test folder, which contain 20k points and 13,361 polygons. Versions: GeoSpark 0.8.2, Spark 1.6. Could you tell me what's wrong? Thank you.

```
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:526)
    at org.datasyslab.geospark.spatialPartitioning.DuplicatesHandler.removeDuplicatesGeometryByPolygon(DuplicatesHandler.java:32)
    at org.datasyslab.geospark.spatialOperator.JoinQuery.SpatialJoinQuery(JoinQuery.java:318)
    at GeoSpark$.main(GeoSpark.scala:42)
    at GeoSpark.main(GeoSpark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
```

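Here's my code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.datasyslab.geospark.enums.{FileDataSplitter, GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.spatialRDD.{PointRDD, PolygonRDD}

object GeoSpark {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hive SQL").setMaster("local[*]").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    // Load the test CSVs (point offset 0, CSV splitter, carry input data, 10 partitions)
    val pointRDD = new PointRDD(sc, "/home/cloudera/Desktop/SmallTest/crs-test-point.csv", 0, FileDataSplitter.CSV, true, 10, StorageLevel.MEMORY_ONLY)

    val polygonRDD = new PolygonRDD(sc, "/home/cloudera/Desktop/SmallTest/crs-test-polygon.csv", FileDataSplitter.CSV, true, 10, StorageLevel.MEMORY_ONLY)

    // Set the partitioning type.
    // NOTE: each call builds its own quadtree from that RDD's data, so the two
    // RDDs can end up with different numbers of partitions.
    pointRDD.spatialPartitioning(GridType.QUADTREE)
    polygonRDD.spatialPartitioning(GridType.QUADTREE)

    // Build an R-tree index on the spatially partitioned points
    pointRDD.buildIndex(IndexType.RTREE, true)

    pointRDD.indexedRDD.persist(StorageLevel.MEMORY_ONLY)
    polygonRDD.spatialPartitionedRDD.persist(StorageLevel.MEMORY_ONLY)

    // useIndex = true, considerBoundaryIntersection = false
    val joinResult = JoinQuery.SpatialJoinQuery(pointRDD, polygonRDD, true, false).collect()

    println(joinResult)
  }
}
```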

SiefSeif (Author) commented Oct 24, 2017

I found the problem: it was in spatialPartitioning. The ScalaExample.scala test passes the PointRDD's partitioner to the PolygonRDD using the getPartitioner() method, but I can't find getPartitioner() on my PointRDD, so I passed GridType.QUADTREE to the PolygonRDD again instead, and that was the mistake. My solution was to pass the pointRDD's partitionTree field instead, as follows:

```scala
// Build the quadtree from the points, then reuse the same tree for the polygons
pointRDD.spatialPartitioning(GridType.QUADTREE)
polygonRDD.spatialPartitioning(pointRDD.partitionTree)
```
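
To see why sharing one partitioner fixes the error, here is a small self-contained Scala sketch. It is a toy model, not GeoSpark's implementation: TreePartitioner, build, partitionBy and zipPartitions are hypothetical names, the "tree" is a 1-D binary split standing in for a quadtree, and each polygon is reduced to a single coordinate. The property it shares with the real situation is that a tree built from a dataset has a data-dependent number of leaves, and a partition-by-partition zip (the check that fails in the stack trace above) requires both sides to have the same number of partitions.

```scala
import scala.util.Try

// Toy 1-D model of the issue; NOT GeoSpark code. All names here are hypothetical.
object PartitionerSketch {

  // A partitioner defined by sorted leaf boundaries: leaf i covers [bounds(i), bounds(i + 1)).
  final case class TreePartitioner(bounds: Vector[Double]) {
    def numPartitions: Int = bounds.size - 1

    // Leaf index for x; values outside the range are clamped to the first/last leaf.
    def partitionOf(x: Double): Int = {
      val i = bounds.lastIndexWhere(_ <= x)
      math.min(math.max(i, 0), numPartitions - 1)
    }
  }

  // Builds a binary-split "tree" from the data itself: a cell is halved until it
  // holds at most `capacity` values (or maxDepth is reached), so the number of
  // leaves depends on how the data is distributed.
  def build(data: Seq[Double], lo: Double, hi: Double, capacity: Int, maxDepth: Int = 16): TreePartitioner = {
    def split(l: Double, h: Double, xs: Seq[Double], depth: Int): Vector[Double] =
      if (xs.size <= capacity || depth >= maxDepth) Vector(l)
      else {
        val mid = (l + h) / 2
        val (left, right) = xs.partition(_ < mid)
        split(l, mid, left, depth + 1) ++ split(mid, h, right, depth + 1)
      }
    TreePartitioner(split(lo, hi, data, 0) :+ hi)
  }

  // Groups data into the partitioner's leaves (one Seq per partition).
  def partitionBy(data: Seq[Double], p: TreePartitioner): Vector[Seq[Double]] =
    Vector.tabulate(p.numPartitions)(i => data.filter(x => p.partitionOf(x) == i))

  // Pairs partition i of `a` with partition i of `b`, rejecting unequal counts
  // with the message seen in the issue.
  def zipPartitions[A, B, C](a: Vector[Seq[A]], b: Vector[Seq[B]])(f: (Seq[A], Seq[B]) => C): Vector[C] = {
    if (a.size != b.size)
      throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
    a.zip(b).map { case (x, y) => f(x, y) }
  }

  def main(args: Array[String]): Unit = {
    // 1-D stand-ins: evenly spread "points" and clustered "polygons".
    val points   = Seq(0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5)
    val polygons = Seq(0.1, 0.2, 0.3, 7.9)

    val pointTree   = build(points, 0.0, 8.0, capacity = 2)
    val polygonTree = build(polygons, 0.0, 8.0, capacity = 2)
    println(s"point partitions = ${pointTree.numPartitions}, polygon partitions = ${polygonTree.numPartitions}")

    // Like the original code: each side gets its own tree, so the zip fails.
    val broken = Try(zipPartitions(partitionBy(points, pointTree), partitionBy(polygons, polygonTree))(_.size * _.size))
    println(s"independent trees: $broken")

    // Like the fix: the polygons reuse the point tree, so partition counts match.
    val fixed = zipPartitions(partitionBy(points, pointTree), partitionBy(polygons, pointTree))(_.size * _.size)
    println(s"shared tree, candidate pairs per partition: $fixed")
  }
}
```

With these inputs the point tree has 4 leaves and the polygon tree has 6, so the first zip fails with the same message as in the issue, while reusing the point tree yields Vector(6, 0, 0, 2) candidate pairs per partition.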

jiayuasu (Member) commented

@SiefSeif We probably have some out-of-date API examples. We will fix them; sorry about that. But it sounds like you have solved the problem?

SiefSeif (Author) commented Oct 26, 2017

Yes, I did. But is this the best way to do it?

jiayuasu (Member) commented

Hi @SiefSeif, using pointRDD.partitionTree gives the best performance, but the intended way is to call getPartitioner().

SiefSeif (Author) commented Nov 1, 2017

Yes, I used that as a workaround because I couldn't find getPartitioner() on PointRDD.

jiayuasu added a commit that referenced this issue Apr 18, 2024
* add ST_Polygonize

* add docs

* Fix python test

* Fix tests

* fix python test

* fix python test

* fix tests

* Update docs

* fix test

* fix test

* fix test

* fix test

* fix tests

* fix tests

* fix test

* fix test

* fix test

* fix test

* Fix test

* Fix typo

* Fix docs

---------

Co-authored-by: Jia Yu <jiayu@wherobots.com>
jiayuasu added a commit that referenced this issue Apr 18, 2024
* [TASK-19] Add ST_Polygonize (#142)

* Update ST_Polygonize version to OSS Sedona 1.6.0

---------

Co-authored-by: Pranav Toggi <prantogg@gmail.com>