In [17]:
sc

res10: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2f097a1b


### 1. read points 

In [18]:
// Load one dataset 
val pointsPath="hdfs:////user/ds503/project3/Points.csv"
val points=sc.textFile(pointsPath)

pointsPath: String = hdfs:////user/ds503/project3/Points.csv
points: org.apache.spark.rdd.RDD[String] = hdfs:////user/ds503/project3/Points.csv MapPartitionsRDD[42] at textFile at <console>:29


### 2. define two helper function

In [19]:
// Input point x, y
// Output cell ID
val getCellID: (Int, Int) => Int = {
    case (x, y) => {
        val cell_x = ((x-1) / 20) + 1 
        val cell_y = ((y-1) / 20) + 1 
        cell_x + (500 - cell_y) * 500

    }
}

getCellID: (Int, Int) => Int = <function2>


In [20]:
// Input cell ID
// Output cell neighbour ID array
val getNeighbour: Int => Array[Int] = c =>  {
    
    val upLeft = c - 501
    val up = c - 500
    val upRight = c - 499

    val left = c - 1
    val right = c + 1

    val lowLeft = c + 499
    val low = c + 500
    val lowRight = c + 501
    
    if(c % 500 == 1) {
        Array(up, upRight, right, low, lowRight).filter(x => {x >= 1 & x <= 250000})
    } else if(c % 500 == 0){
        Array(upLeft, up, left, lowLeft, low).filter(x => {x >= 1 & x <= 250000})
    } else{
        Array(upLeft, up, upRight, left, right, lowLeft, low, lowRight).filter(x => {x >= 1 & x <= 250000})
    }

}

getNeighbour: Int => Array[Int] = <function1>


### 3. parse point
from string to tuple (x, y)

In [21]:
val pointsXY = points.map(x => {
    val field = x.split(",")
    (field(0).toInt, field(1).toInt)
})
pointsXY.take(2)

pointsXY: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[43] at map at <console>:28
res11: Array[(Int, Int)] = Array((2535,8149), (155,6776))


### 4. compute cell points count 
(cellID, pointCount)

In [22]:
val cellPointsCount = pointsXY.map(x=>{
    (getCellID(x._1, x._2), 1)
}).reduceByKey((x, y) => x + y)
cellPointsCount.take(5)

2019-03-30 00:02:55 WARN  Executor:66 - Managed memory leak detected; size = 5243080 bytes, TID = 55


cellPointsCount: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[45] at reduceByKey at <console>:32
res12: Array[(Int, Int)] = Array((185012,46), (65722,39), (186574,67), (108150,51), (129434,47))


### 5. compute cell neighbour count
(cellID, neighbourCount)

In [23]:
val cellNeigCount = cellPointsCount.map(x => {
    val cellID = x._1
    val cellNeigCount = getNeighbour(cellID).length
    (cellID, cellNeigCount)
})
cellNeigCount.take(5)

2019-03-30 00:02:55 WARN  Executor:66 - Managed memory leak detected; size = 5243080 bytes, TID = 56


cellNeigCount: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[46] at map at <console>:30
res13: Array[(Int, Int)] = Array((185012,8), (65722,8), (186574,8), (108150,8), (129434,8))


### 6. compute cell neighbour point avg count
(cellID, neighbourAvgPointCount)

In [24]:
val cellNeigFlatCount = cellPointsCount.flatMap(x=>{
    val cellID = x._1
    val cellNeigID = getNeighbour(cellID)
    for(oneNeig <- cellNeigID)
        yield (oneNeig, cellID)
}).leftOuterJoin(cellPointsCount)
cellNeigFlatCount.take(10)

2019-03-30 00:02:58 WARN  Executor:66 - Managed memory leak detected; size = 45759468 bytes, TID = 59


cellNeigFlatCount: org.apache.spark.rdd.RDD[(Int, (Int, Option[Int]))] = MapPartitionsRDD[50] at leftOuterJoin at <console>:35
res14: Array[(Int, (Int, Option[Int]))] = Array((185012,(184512,Some(46))), (185012,(185512,Some(46))), (185012,(185013,Some(46))), (185012,(185511,Some(46))), (185012,(185011,Some(46))), (185012,(184513,Some(46))), (185012,(184511,Some(46))), (185012,(185513,Some(46))), (65722,(66222,Some(39))), (65722,(65222,Some(39))))


In [25]:
// (cellID, neighbourPointCount)
val cellNeigPointCount = cellNeigFlatCount.map(x => {
    val cellID = x._2._1
//     val cellNeigPointCount = if(x._2._2 == None) 0 else x._2._2.get
    val cellNeigPointCount = if(x._2._2 == None) 0 else x._2._2.get
    (cellID, cellNeigPointCount)
}).reduceByKey((x, y) => (x + y))
cellNeigPointCount.take(5)

2019-03-30 00:03:01 WARN  Executor:66 - Managed memory leak detected; size = 5243780 bytes, TID = 62


cellNeigPointCount: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[52] at reduceByKey at <console>:34
res15: Array[(Int, Int)] = Array((185012,316), (65722,366), (186574,355), (108150,348), (129434,347))


In [26]:
val cellNeigAvgCount = cellNeigPointCount.join(cellNeigCount)
                                         .map(x => (x._1, (x._2._1.toDouble/x._2._2.toDouble)))
cellNeigAvgCount.take(5)


2019-03-30 00:03:02 WARN  Executor:66 - Managed memory leak detected; size = 15748364 bytes, TID = 65


cellNeigAvgCount: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[56] at map at <console>:31
res16: Array[(Int, Double)] = Array((185012,39.5), (65722,45.75), (186574,44.375), (108150,43.5), (129434,43.375))


### 7. compute cell density

In [27]:
val cellDensity = cellPointsCount.join(cellNeigAvgCount)
                                 .map(x => (x._1, (x._2._1.toDouble/x._2._2.toDouble)))
                                 .sortBy(-_._2)


cellDensity: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[65] at sortBy at <console>:30


In [28]:
cellDensity.take(10)

res17: Array[(Int, Double)] = Array((44947,2.032154340836013), (68303,1.8313253012048192), (46501,1.8206521739130437), (241132,1.7969230769230768), (104811,1.7655172413793103), (6457,1.7610062893081762), (55493,1.7476923076923077), (226717,1.741046831955923), (84024,1.7379310344827585), (142898,1.732484076433121))


### 8. Top10 Density 

In [29]:
val top10 = sc.parallelize(cellDensity.take(10))

top10: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[66] at parallelize at <console>:27


In [30]:
top10.sortBy(-_._2).foreach(println)

(241132,1.7969230769230768)
(84024,1.7379310344827585)
(104811,1.7655172413793103)
(142898,1.732484076433121)
(44947,2.032154340836013)
(68303,1.8313253012048192)
(46501,1.8206521739130437)
(6457,1.7610062893081762)
(55493,1.7476923076923077)
(226717,1.741046831955923)


### 9. TOP k grid cells w.r.t their Relative-Density Scores
(cellID, cellDensity, numOfNeighbours, neighbourID, neighbourDensity)

In [31]:
val rdd = top10.flatMap(x => {
    val cellNeig = getNeighbour(x._1)
    for(oneNeig <- cellNeig)
        yield (oneNeig, (x._1, x._2, cellNeig.length))
}).leftOuterJoin(cellDensity).map{
    case (neigID, ((cellID, cellDensity, neigCount), neigDensity)) => {
        (cellID, cellDensity, neigCount, neigID, neigDensity.get)
    }
}.sortBy(_._1)

rdd: org.apache.spark.rdd.RDD[(Int, Double, Int, Int, Double)] = MapPartitionsRDD[81] at sortBy at <console>:38


In [32]:
rdd.collect().foreach(println)

(6457,1.7610062893081762,8,5956,0.8465608465608465)
(6457,1.7610062893081762,8,6956,0.6736842105263158)
(6457,1.7610062893081762,8,6456,0.8467966573816156)
(6457,1.7610062893081762,8,6957,1.0386740331491713)
(6457,1.7610062893081762,8,5957,0.7466666666666667)
(6457,1.7610062893081762,8,5958,0.8541666666666666)
(6457,1.7610062893081762,8,6958,1.0162162162162163)
(6457,1.7610062893081762,8,6458,0.8085106382978723)
(44947,2.032154340836013,8,44448,0.6253369272237197)
(44947,2.032154340836013,8,44948,1.0823529411764705)
(44947,2.032154340836013,8,45448,0.6772486772486772)
(44947,2.032154340836013,8,44946,0.7411444141689373)
(44947,2.032154340836013,8,45446,0.9824561403508771)
(44947,2.032154340836013,8,44446,1.1685393258426966)
(44947,2.032154340836013,8,45447,0.7091412742382271)
(44947,2.032154340836013,8,44447,0.9859943977591037)
(46501,1.8206521739130437,5,47001,0.7296137339055794)
(46501,1.8206521739130437,5,46001,1.0267857142857144)
(46501,1.8206521739130437,5,46002,0.6067415730337079