In [11]:
import $ivy.`org.apache.spark::spark-sql:2.4.0` 
import $ivy.`sh.almond::ammonite-spark:0.4.0`
import $ivy.`org.datasyslab:geospark:1.2.0`
import org.apache.log4j.{Level, Logger}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
import com.vividsolutions.jts.geom.{GeometryFactory, Geometry}
import com.vividsolutions.jts.io.WKTReader
import scala.collection.JavaConverters._
import java.io._

Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._

val spark = AmmoniteSparkSession.builder()
    .config("spark.serializer",classOf[KryoSerializer].getName)
    .master("local[*]").appName("Area_interpolate")
    .getOrCreate()

import spark.implicits._ 
val geofactory: GeometryFactory = new GeometryFactory()
val appID = spark.sparkContext.applicationId
val gridType = GridType.QUADTREE
val indexType = IndexType.QUADTREE
val partitions = 1

Creating SparkSession


[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                
[39m
[32mimport [39m[36m$ivy.$                              
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.serializer.KryoSerializer
[39m
[32mimport [39m[36morg.apache.spark.storage.StorageLevel
[39m
[32mimport [39m[36morg.apache.spark.rdd.RDD
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.datasyslab.geospark.spatialRDD.SpatialRDD
[39m
[32mimport [39m[36morg.datasyslab.geospark.enums.{GridType, IndexType}
[39m
[32mimport [39m[36morg.datasyslab.geospark.spatialOperator.JoinQuery
[39m
[32mimport [39m[36morg.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
[39m
[32mimport [39m[36mcom.vividsolutions.jts.geom.{GeometryFactory, Geometry}
[39m
[32mimport [39m[36mcom.vividsolutions.jts.io.WKTReader
[39m
[32mimpo

In [12]:
def area_table(sourceRDD: SpatialRDD[Geometry], targetRDD: SpatialRDD[Geometry]): RDD[(Int, Int, Double)] = {
     // Doing spatial join...                                                                                                                                                                                                                                                   
     val considerBoundaryIntersection = true // Only return gemeotries fully covered by each query window in queryWindowRDD                                                                                                                                                     
     val buildOnSpatialPartitionedRDD = true // Set to TRUE only if run join query                                                                                                                                                                                              
     val usingIndex = true

     sourceRDD.analyze()
     sourceRDD.spatialPartitioning(gridType, partitions)
     targetRDD.spatialPartitioning(sourceRDD.getPartitioner)
     sourceRDD.buildIndex(indexType, buildOnSpatialPartitionedRDD)

     val joined = JoinQuery.SpatialJoinQuery(targetRDD, sourceRDD, usingIndex, considerBoundaryIntersection)
     val nJoined = joined.count()
     
     // Flattening join results...                                                                                                                                                                                                                                              
     val flattened = joined.rdd.flatMap{ pair =>
       val a = pair._1
       pair._2.asScala.map(b => (a, b))                                                                                                                                                                                                                                         
     }                                                                                                                                                                                                                                                                          
     val nFlattened = flattened.count()
     
     // Computing intersection area...                                                                                                                                                                                                                                          
     val areal = flattened.map{ pair =>
       val source_id  = pair._1.getUserData.toString().split("\t")(0).toInt
       val target_id  = pair._2.getUserData.toString().split("\t")(0).toInt
       val area = pair._1.intersection(pair._2).getArea
       (source_id, target_id, area)
     }                                                                                                                                                                                                                                                                          
     val nAreaTable = areal.count()
     
     areal
   }


defined [32mfunction[39m [36marea_table[39m

In [13]:
import spark.implicits._ 
def area_interpolate(spark: SparkSession, sourceRDD: SpatialRDD[Geometry], targetRDD: SpatialRDD[Geometry], extensive_variables: List[String]): Unit = {
     val areas = area_table(sourceRDD, targetRDD).toDF("SID", "TID", "area")
     areas.show(truncate = false)

     val extensiveAttributes = sourceRDD.rawSpatialRDD.rdd.map{ s =>                                                                                                                                                                                                                                                         
       val attr = s.getUserData().toString().split("\t")                                                                                                                                                                                                                                                                     
       val id = attr(0).toInt                                                                                                                                                                                                                                                                                                
       val tarea = s.getArea()                                                                                                                                                                                                                                                                                               
       val population = attr(1).toInt                                                                                                                                                                                                                                                                                        
       val income = attr(3).toDouble                                                                                                                                                                                                                                                                                         
       (id, tarea, population, income)                                                                                                                                                                                                                                                                                       
     }.toDF("ID", "tarea", "population", "income")

     val table_extensive = areas.join(extensiveAttributes, $"SID" === $"ID")
       .withColumn("tpopulation", $"area" / $"tarea" * $"population")
       .withColumn("tincome", $"area" / $"tarea" * $"income")

     table_extensive.orderBy($"SID").show(truncate = false)

     val target_extensive = table_extensive.select("TID", "tpopulation", "tincome")
       .groupBy($"TID")
       .agg(
         sum($"tpopulation").as("population"),
         sum($"tincome").as("income")
       )

     target_extensive.orderBy($"TID").show(truncate = false)

     val intensiveAttributes = sourceRDD.rawSpatialRDD.rdd.map{ s =>                                                                                                                                                                                                                                                         
       val attr = s.getUserData().toString().split("\t")                                                                                                                                                                                                                                                                     
       val id = attr(0).toInt                                                                                                                                                                                                                                                                                                
       val pci = attr(2).toDouble                                                                                                                                                                                                                                                                                            
       (id, pci)                                                                                                                                                                                                                                                                                                             
     }.toDF("IDS", "pci")
     val targetAreas = targetRDD.rawSpatialRDD.rdd.map{ t =>                                                                                                                                                                                                                                                                 
       val attr = t.getUserData().toString().split("\t")                                                                                                                                                                                                                                                                     
       val id = attr(0).toInt                                                                                                                                                                                                                                                                                                
       val tarea = t.getArea()                                                                                                                                                                                                                                                                                               
       (id, tarea)                                                                                                                                                                                                                                                                                                           
     }.toDF("IDT", "tarea")

     val table_intensive = areas.join(targetAreas, $"TID" === $"IDT", "left_outer")
       .join(intensiveAttributes, $"SID" === $"IDS", "left_outer")
       .withColumn("tpci", $"area" / $"tarea" * $"pci")

     table_intensive.orderBy($"TID").show(truncate = false)

     val target_intensive = table_intensive.select("TID", "tpci")
       .groupBy($"TID")
       .agg(
         sum($"tpci").as("pci")
       )

     target_intensive.orderBy($"TID").show(truncate = false)

   }

[32mimport [39m[36mspark.implicits._ 
[39m
defined [32mfunction[39m [36marea_interpolate[39m

In [15]:
     // Reading source...                                                                                                                                                                                                                                                                                                    
     val source = "/home/acald013/RIDIR/Datasets/A.wkt"
     val sourceRDD = new SpatialRDD[Geometry]()
     val sourceWKT = spark.read.option("header", "false").option("delimiter", "\t").
       csv(source).rdd.
       map{ s =>
         val geom = new WKTReader(geofactory).read(s.getString(0))
         val id = s.getString(1)
         val population = s.getString(2).toInt
         val pci = s.getString(3).toDouble
         val income = population * pci
         val userData = s"$id\t$population\t$pci\t$income"
         geom.setUserData(userData)
         geom
       }                                                                                                                                                                                                                                                                                                                     
     sourceRDD.setRawSpatialRDD(sourceWKT)
     val nSourceRDD = sourceRDD.rawSpatialRDD.rdd.count()

[36msource[39m: [32mString[39m = [32m"/home/acald013/RIDIR/Datasets/A.wkt"[39m
[36msourceRDD[39m: [32mSpatialRDD[39m[[32mGeometry[39m] = org.datasyslab.geospark.spatialRDD.SpatialRDD@22f9fc50
[36msourceWKT[39m: [32mRDD[39m[[32mGeometry[39m] = MapPartitionsRDD[27] at map at cmd14.sc:5
[36mnSourceRDD[39m: [32mLong[39m = [32m2L[39m

In [16]:
     // Reading target...     
    val target = "/home/acald013/RIDIR/Datasets/B.wkt"
    val targetRDD = new SpatialRDD[Geometry]()
    val targetWKT = spark.read.option("header", "false").option("delimiter", "\t").
       csv(target).rdd.
       map{ s =>
         val geom = new WKTReader(geofactory).read(s.getString(0))
         val id = s.getString(1)
         geom.setUserData(id)
         geom
       }                                                                                                                                                                                                                                                                                                                     
     targetRDD.setRawSpatialRDD(targetWKT)
     val nTargetRDD = targetRDD.rawSpatialRDD.rdd.count()

[36mtarget[39m: [32mString[39m = [32m"/home/acald013/RIDIR/Datasets/B.wkt"[39m
[36mtargetRDD[39m: [32mSpatialRDD[39m[[32mGeometry[39m] = org.datasyslab.geospark.spatialRDD.SpatialRDD@263604e
[36mtargetWKT[39m: [32mRDD[39m[[32mGeometry[39m] = MapPartitionsRDD[41] at map at cmd15.sc:5
[36mnTargetRDD[39m: [32mLong[39m = [32m3L[39m

In [None]:
     // Calling area_table method...                                                                                                                                                                                                                                                                                         
     val extensive = List("population", "income")
     area_interpolate(spark, sourceRDD, targetRDD, extensive)

+---+---+----+
|SID|TID|area|
+---+---+----+
|1  |2  |25.0|
|1  |1  |25.0|
|2  |1  |10.0|
|2  |2  |25.0|
|2  |3  |15.0|
+---+---+----+

+---+---+----+---+-----+----------+-------+-----------+-------+
|SID|TID|area|ID |tarea|population|income |tpopulation|tincome|
+---+---+----+---+-----+----------+-------+-----------+-------+
|1  |2  |25.0|1  |50.0 |500       |37500.0|250.0      |18750.0|
|1  |1  |25.0|1  |50.0 |500       |37500.0|250.0      |18750.0|
|2  |1  |10.0|2  |50.0 |200       |20000.0|40.0       |4000.0 |
|2  |2  |25.0|2  |50.0 |200       |20000.0|100.0      |10000.0|
|2  |3  |15.0|2  |50.0 |200       |20000.0|60.0       |6000.0 |
+---+---+----+---+-----+----------+-------+-----------+-------+

