Commit

init
pomadchin authored and echeipesh committed Jan 15, 2018
1 parent a102ca8 commit a2eaf2d
Showing 9 changed files with 387 additions and 0 deletions.
19 changes: 19 additions & 0 deletions spark-pipeline/build.sbt
@@ -0,0 +1,19 @@
import Dependencies._

name := "geotrellis-spark-pipeline"
libraryDependencies ++= Seq(
"com.github.fge" % "json-schema-validator" % "2.2.6",
sparkCore % "provided",
scalatest % "test")

test in assembly := {}

assemblyMergeStrategy in assembly := {
case "reference.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case "META-INF/MANIFEST.MF" => MergeStrategy.discard
case "META-INF\\MANIFEST.MF" => MergeStrategy.discard
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.discard
case "META-INF/ECLIPSEF.SF" => MergeStrategy.discard
case _ => MergeStrategy.first
}
@@ -0,0 +1,11 @@
package geotrellis.spark.pipeline

trait PipelineExpr {
def ~(other: PipelineExpr): PipelineConstructor = this :: other :: Nil

def ~(other: Option[PipelineExpr]): PipelineConstructor =
other.fold(this :: Nil)(o => this :: o :: Nil)

val `type`: String
}
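
As a rough illustration (using case classes that appear later in this commit; the argument values here are hypothetical), the ~ operator chains two expressions into a PipelineConstructor, i.e. a List[PipelineExpr], and the Option overload simply drops a missing step:

// Sketch only: read and write are built from the Read/Write case classes defined below
val read = ReadHadoop("hadoop", "hdfs:///input")
val write = WriteHadoop("layer", "hadoop", "hdfs:///catalog", pyramid = true,
  keyIndexMethod = PipelineKeyIndexMethod("zorder"))

read ~ write       // List(read, write)
read ~ None        // List(read)
read ~ Some(write) // List(read, write)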

@@ -0,0 +1,8 @@
package geotrellis.spark.pipeline

case class PipelineKeyIndexMethod(
`type`: String,
timeTag: Option[String] = None,
timeFormat: Option[String] = None,
temporalResolution: Option[Int] = None
)
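
For instance (illustrative values only; this commit does not fix the set of valid names), the type field presumably names a key index method such as "zorder", with the optional fields configuring temporal keys:

// Illustrative sketch, not part of this commit
val spatialIndex  = PipelineKeyIndexMethod("zorder")
val temporalIndex = PipelineKeyIndexMethod("zorder", timeTag = Some("TIFFTAG_DATETIME"))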
75 changes: 75 additions & 0 deletions spark-pipeline/src/main/scala/geotrellis/spark/pipeline/Read.scala
@@ -0,0 +1,75 @@
package geotrellis.spark.pipeline

import java.net.URI

import geotrellis.proj4.CRS
import geotrellis.raster.CellGrid
import geotrellis.spark.Metadata
import geotrellis.util.Component
import geotrellis.vector.ProjectedExtent
import org.apache.spark.rdd.RDD

import scala.reflect._
import scala.reflect.runtime.universe._
import scala.util.Try

trait Read extends PipelineExpr {
val profile: String
val uri: String
val crs: Option[String]
val tag: Option[String]
val maxTileSize: Option[Int]
val partitions: Option[Int]
val clip: Boolean

def getURI = new URI(uri)
def getCRS = crs.map(c => Try(CRS.fromName(c)) getOrElse CRS.fromString(c))
def getTag = tag.getOrElse("default")

def eval[I, V]: RDD[(I, V)]
}

case class ReadHadoop(
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.hadoop"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}

case class ReadS3(
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.s3"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}

case class ReadFile(
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.file"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}
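
A brief note on getCRS above: it first tries to resolve the optional crs field as a well-known name via CRS.fromName and, if that fails, falls back to parsing it as a proj4 string via CRS.fromString. A hedged sketch with hypothetical values:

// Either spelling resolves to an Option[geotrellis.proj4.CRS] (values are hypothetical)
ReadHadoop("hadoop", "hdfs:///input", crs = Some("EPSG:3857")).getCRS
ReadHadoop("hadoop", "hdfs:///input", crs = Some("+proj=longlat +datum=WGS84 +no_defs")).getCRS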
@@ -0,0 +1,48 @@
package geotrellis.spark.pipeline

trait Reindex extends PipelineExpr {
val name: String
val profile: String
val uri: String
val keyIndexMethod: PipelineKeyIndexMethod
}

case class ReindexHadoop(
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.hadoop"
) extends Reindex

case class ReindexS3(
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.s3"
) extends Reindex

case class ReindexAccumulo(
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.accumulo"
) extends Reindex

case class ReindexCassandra(
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.cassandra"
) extends Reindex

case class ReindexHBase(
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.hbase"
) extends Reindex
@@ -0,0 +1,58 @@
package geotrellis.spark.pipeline

import geotrellis.raster.CellSize
import geotrellis.spark.Metadata
import org.apache.spark.rdd.RDD

trait Transform extends PipelineExpr

/** Rename Inputs into groups */
case class TransformGroup(
tags: List[String],
tag: Option[String],
`type`: String = "transform.group"
) extends Transform {
def eval[I, V](rdds: List[RDD[(I, V)]]): List[RDD[(I, V)]] = null
}

/** Merge inputs into a single Multiband RDD */
case class TransformMerge(
tags: List[String],
tag: String,
`type`: String = "transform.merge"
) extends Transform {
def eval[I, V, V2](rdds: List[RDD[(I, V)]]): List[RDD[(I, V2)]] = null
}

case class TransformMap(
func: String, // function name
tag: Option[String] = None,
`type`: String = "transform.map"
) extends Transform {
def eval[I, V](rdd: RDD[(I, V)]): RDD[(I, V)] = null
}

case class TransformBufferedReproject(
crs: String,
`type`: String = "transform.reproject.buffered"
) extends Transform {
def eval[I, K, V, M[_]](rdd: RDD[(I, V)]): RDD[(K, V)] with Metadata[M[K]] = null
}

case class TransformPerTileReproject(
crs: String,
`type`: String = "transform.reproject.per-tile"
) extends Transform {
def eval[K, V, M[_]](rdd: RDD[(K, V)] with Metadata[M[K]]): RDD[(K, V)] with Metadata[M[K]] = null
}

case class TransformTile(
resampleMethod: String = "nearest-neighbor", // nearest-neighbor | bilinear | cubic-convolution | cubic-spline | lanczos
layoutScheme: String = "zoomed", // floating | zoomed
tileSize: Option[Int] = None,
cellSize: Option[CellSize] = None,
partitions: Option[Int] = None,
`type`: String = "transform.tile"
) extends Transform
@@ -0,0 +1,64 @@
package geotrellis.spark.pipeline

trait Update extends PipelineExpr {
val name: String
val profile: String
val uri: String
val pyramid: Boolean // true | false
val maxZoom: Option[Int]
val `type`: String
}

case class UpdateFile(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.file"
) extends Update

case class UpdateHadoop(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.hadoop"
) extends Update

case class UpdateS3(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.s3"
) extends Update

case class UpdateAccumulo(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.accumulo"
) extends Update

case class UpdateCassandra(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.cassandra"
) extends Update

case class UpdateHBase(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
maxZoom: Option[Int] = None,
`type`: String = "update.hbase"
) extends Update
@@ -0,0 +1,71 @@
package geotrellis.spark.pipeline

import geotrellis.spark.TileLayerRDD

trait Write extends PipelineExpr {
val name: String
val profile: String
val uri: String
val pyramid: Boolean // true | false
val maxZoom: Option[Int]
val keyIndexMethod: PipelineKeyIndexMethod
}
case class WriteFile(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.file"
) extends Write

case class WriteHadoop(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.hadoop"
) extends Write

case class WriteS3(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.s3"
) extends Write

case class WriteAccumulo(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.accumulo"
) extends Write

case class WriteCassandra(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.cassandra"
) extends Write

case class WriteHBase(
name: String,
profile: String,
uri: String,
pyramid: Boolean,
keyIndexMethod: PipelineKeyIndexMethod,
maxZoom: Option[Int] = None,
`type`: String = "write.hbase"
) extends Write
@@ -0,0 +1,33 @@
package geotrellis.spark

import org.apache.spark.rdd.RDD

package object pipeline {
type PipelineConstructor = List[PipelineExpr]

implicit class PipelineMethods(pipeline: List[PipelineExpr]) {
lazy val (read, transform, write) = {
val map =
pipeline
.zipWithIndex
.groupBy(_._1.`type`.split("\\.").head)
.map { case (n, l) => n -> l.sortBy(_._2).map(_._1) }

(map.getOrElse("read", Nil), map.getOrElse("transform", Nil), map.getOrElse("write", Nil))
}

def eval[I, K, V, M[_]] = {
val reads: Map[String, RDD[(I, V)]] =
read.map { case r: Read => r.getTag -> r.eval[I, V] }.toMap

// first group operations
val group = transform.collect { case t: TransformGroup => t }
val merge = transform.collect { case t: TransformMerge => t }

// WIP: resolve the input RDDs referenced by each group's tags; merge handling and
// the rest of the evaluation are not implemented in this initial commit
group.map { t =>
t.tags.flatMap(reads.get(_))
}
}
}
}
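
To illustrate the split performed by PipelineMethods (a hedged sketch; the expressions and paths are hypothetical), operations are grouped by the prefix of their type field, preserving their original order:

// Sketch only: group-by on the "read" / "transform" / "write" prefixes
val pipeline: PipelineConstructor = List(
  ReadHadoop("hadoop", "hdfs:///input"),
  TransformBufferedReproject("EPSG:3857"),
  WriteHadoop("layer", "hadoop", "hdfs:///catalog", pyramid = true,
    keyIndexMethod = PipelineKeyIndexMethod("zorder"))
)

pipeline.read      // List(ReadHadoop(...)), from "read.hadoop"
pipeline.transform // List(TransformBufferedReproject(...)), from "transform.reproject.buffered"
pipeline.write     // List(WriteHadoop(...)), from "write.hadoop"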
