# Spark DataFrames and Magellan

We advise the read of the [sql programming guide/sql](https://spark.apache.org/docs/2.1.1/sql-programming-guide.html#sql) and visit [Magellan repository](https://github.com/harsha2010/magellan).

In [1]:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

In [2]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

Waiting for a Spark session to start...

sqlContext = org.apache.spark.sql.SQLContext@4ae17bc0




org.apache.spark.sql.SQLContext@4ae17bc0

In [3]:
var dir_path = "hdfs:///user/emma/ecolidar/"
var offline_dir_path = "hdfs:///user/emma/ecolidar/"
var las_cell = "C_25EZ2"
var parquet_file = las_cell + ".parquet"

dir_path = hdfs:///user/emma/ecolidar/
offline_dir_path = hdfs:///user/emma/ecolidar/
las_cell = C_25EZ2
parquet_file = C_25EZ2.parquet


C_25EZ2.parquet

## Load DataFrame

The dataframe is loaded from a Parquet file.

In [4]:
val df = sqlContext.read.parquet(parquet_file)

df = [intensity: smallint, raw_classification: tinyint ... 3 more fields]


[intensity: smallint, raw_classification: tinyint ... 3 more fields]

In [5]:
df.head(10)

0,1,2,3,4
308,2,125360792,488733112,423
45,1,125358802,488732300,4814
357,2,125361146,488733277,446
40,1,125358863,488732341,5336
421,2,125361495,488733437,439
443,2,125361860,488733607,423
12,1,125359004,488732432,6407
357,2,125362169,488733752,538
383,2,125362538,488733923,525
356,1,125362867,488734078,622


## Create a Temporary table or view

It is possible to create a temporary table or view from a dataframe.

In [6]:
df.createOrReplaceTempView(las_cell + "_tab")

## Show existent tables

In [7]:
sqlContext.sql("show tables").show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
|        |c_25ez2_tab|       true|
+--------+-----------+-----------+



## Run a simple SQL query

In [8]:
val res = sqlContext.sql("select * from " + las_cell + "_tab where intensity = 277 and z = -444 and x = 125377487 limit 10")

res = [intensity: smallint, raw_classification: tinyint ... 3 more fields]


[intensity: smallint, raw_classification: tinyint ... 3 more fields]

In [9]:
res.show()

+---------+------------------+---------+---------+----+                         
|intensity|raw_classification|        x|        y|   z|
+---------+------------------+---------+---------+----+
|      277|                 9|125377487|487500057|-444|
+---------+------------------+---------+---------+----+



# Magellan

In [25]:
import magellan.{Point, Polygon}
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.types._

## Data structures

In [16]:
//Only 2D points are supported.
val points = sc.parallelize(Seq((-1.0, -1.0), (-1.0, 1.0), (1.0, -1.0))).toDF("x", "y").select(point($"x", $"y").as("point"))

points.show()

|            point|
+-----------------+
|Point(-1.0, -1.0)|
| Point(-1.0, 1.0)|
| Point(1.0, -1.0)|
+-----------------+



points = [point: point]


[point: point]

## Create a list of points from a DataFrame

In [17]:
val lidar_points = df.select(point($"x", $"y").as("point"))

lidar_points.show()

+--------------------+
|               point|
+--------------------+
|Point(1.25360792E...|
|Point(1.25358802E...|
|Point(1.25361146E...|
|Point(1.25358863E...|
|Point(1.25361495E...|
|Point(1.2536186E8...|
|Point(1.25359004E...|
|Point(1.25362169E...|
|Point(1.25362538E...|
|Point(1.25362867E...|
|Point(1.25363318E...|
|Point(1.25363672E...|
|Point(1.25363609E...|
|Point(1.25364027E...|
|Point(1.25363769E...|
|Point(1.25364381E...|
|Point(1.25364341E...|
|Point(1.25364777E...|
|Point(1.25364499E...|
|Point(1.25365127E...|
+--------------------+
only showing top 20 rows



lidar_points = [point: point]


[point: point]

# Create Polygon

In [27]:
import magellan.{Point, Polygon}
case class PolygonRecord(polygon: Polygon)

defined class PolygonRecord


In [28]:
val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
val polygons = sc.parallelize(Seq(PolygonRecord(Polygon(Array(0), ring)))).toDF()

ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
polygons = [polygon: polygon]


[polygon: polygon]

# Within

In [30]:
lidar_points.join(polygons).where($"point" within $"polygon").show()

+-----+-------+                                                                 
|point|polygon|
+-----+-------+
+-----+-------+



# Intersects

In [31]:
lidar_points.join(polygons).where($"point" intersects $"polygon").show()

+-----+-------+                                                                 
|point|polygon|
+-----+-------+
+-----+-------+



# Contains

In [32]:
lidar_points.join(polygons).where($"point" >? $"polygon").show()

+-----+-------+                                                                 
|point|polygon|
+-----+-------+
+-----+-------+



# Indices and Joins

In [37]:
//To enable spatial joins in Magellan, add a spatial join rule to Spark by injecting the following code before the join:
var t0 : Long = 0
var t1 : Long = 0
magellan.Utils.injectRules(spark)

t0 = 0
t1 = 0


0

In [39]:
//Furthermore, during the join, you will need to provide Magellan a hint of the precision at which to create indices for the join
polygons = polygons.index(30)

Name: Compile Error
Message: <console>:65: error: reassignment to val
       polygons = polygons.index(30)
                ^

StackTrace: 

## Within using a indexed Join

In [41]:
t0 = System.nanoTime()
lidar_points.join(polygons).where($"point" within $"polygon").show()
t1 = System.nanoTime()
println("Elapsed time: " + ((t1 - t0) / 1000000) + " ms")

|point|polygon|
+-----+-------+
+-----+-------+

Elapsed time: 56119ms


t0 = 9300304969224
t1 = 9356424393008


9356424393008

In [42]:
t0 = System.nanoTime()
lidar_points.join(polygons).where($"point" within $"polygon").show()
t1 = System.nanoTime()
println("Elapsed time: " + ((t1 - t0) / 1000000) + " ms")

|point|polygon|
+-----+-------+
+-----+-------+

Elapsed time: 60003ms


t0 = 9361952487099
t1 = 9421956415493


9421956415493