VectorPipe

VectorPipe (VP) is a library for working with OpenStreetMap (OSM) vector data and writing geometries to vector tile layers. It is powered by GeoTrellis and Apache Spark.

OSM provides a wealth of data with broad coverage and a deep history. This comes at the price of very large size, which can make the power of OSM difficult to access. VectorPipe helps by making OSM processing possible in Apache Spark, leveraging large computing clusters to churn through large inputs such as an OSM full history file.

For those cases where an application needs to process incoming changes, VP also provides streaming Spark DataSources for changesets, OsmChange files, and Augmented diffs generated by Overpass.

For ease of use, the output of VP imports is a Spark DataFrame containing columns of JTS Geometry objects, enabled by the user-defined types provided by GeoMesa. That package also provides functions for manipulating those geometries via Spark SQL directives.

The final important contribution is a set of functions for exporting geometries to vector tiles. This leans on the geotrellis-vectortile package.

Getting Started

Add the following to your build.sbt:

libraryDependencies += "com.azavea.geotrellis" %% "vectorpipe" % "2.2.0"

Note: VectorPipe releases for version 2.0.0+ are hosted on Sonatype. If you need earlier releases, they can be found on Bintray. If using SBT for older releases, you will also need to include resolvers ++= Resolver.bintrayRepo("azavea", "maven") in your build.sbt.
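For a standalone project, a fuller build.sbt might look like the sketch below. The project name, the Scala patch version, and the Spark patch version are assumptions chosen to match the Scala 2.11 artifact and the Spark 2.3 recommendation used elsewhere in this README. Spark is marked Provided on the assumption that spark-shell or the cluster supplies it at run time.

```scala
// build.sbt (sketch; name and patch versions are assumptions)
name := "vectorpipe-demo"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "com.azavea.geotrellis" %% "vectorpipe" % "2.2.0",
  // Supplied by the cluster or spark-shell at run time, so not bundled
  "org.apache.spark" %% "spark-sql" % "2.3.2" % Provided
)
```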

With a REPL

The fastest way to get started with VectorPipe in a REPL is to invoke spark-shell:

spark-shell --packages com.azavea.geotrellis:vectorpipe_2.11:2.2.0

This will download the required components and set up a REPL with VectorPipe available. At that point, you may issue

// Make JTS types available to Spark
import org.locationtech.geomesa.spark.jts._
spark.withJTS

import vectorpipe._

and begin using the package.

A Note on Cluster Computing

Your local machine is probably insufficient for dealing with very large OSM files. We recommend using Amazon's Elastic MapReduce (EMR) service to provision substantial clusters of computing resources. You'll want to supply Spark, Hive, and Hadoop to your cluster, with Spark version 2.3. Creating a cluster with an EMR version between 5.13 and 5.19 should suffice. From there, ssh into the master node and run spark-shell as above for an interactive environment, or use spark-submit for batch jobs. (You may submit Steps to the EMR cluster using spark-submit as well.)

Importing Data

Batch analysis can be performed in a few different ways. Perhaps the fastest way is to procure an OSM PBF file from a source such as Geofabrik, which supplies various extracts of OSM, including the full planet's worth of data.

VectorPipe does not provide the means to directly read these OSM PBF files, however, and a conversion to a useful file format will thus be needed. We suggest using osm2orc to convert your source file to the ORC format which can be read natively via Spark:

val df = spark.read.orc(path)

The resulting DataFrame can be processed with VectorPipe.
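Before handing the frame to VectorPipe, it can help to check what was loaded. The sketch below counts elements per OSM type. It assumes the frame has a string column named type holding values such as node, way, and relation, which is what osm2orc output is expected to contain; countByType is a hypothetical helper name, not part of VectorPipe.

```scala
import org.apache.spark.sql.DataFrame

// Count elements per OSM type.
// Assumes a string column named `type` (an assumption about the ORC schema).
def countByType(osm: DataFrame): DataFrame =
  osm.groupBy("type").count()

// Usage in spark-shell, with `df` read from ORC as above:
//   df.printSchema()
//   countByType(df).show()
```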

It is also possible to read from a cache of OsmChange files directly rather than convert the PBF file:

import vectorpipe.sources.Source
val df = spark.read
              .format(Source.Changes)
              .options(Map[String, String](
                Source.BaseURI -> "https://download.geofabrik.de/europe/isle-of-man-updates/",
                Source.StartSequence -> "2080",
                Source.EndSequence -> "2174",
                Source.BatchSize -> "1"))
              .load
              .persist // recommended to avoid rereading

(Note that the start and end sequence numbers will shift over time for Geofabrik. Navigate to the base URI to determine the current values; otherwise timeouts may occur.) This may issue errors, but should complete. Reading OsmChange files is much slower and touchier than using ORC files, but it stands as an option.

[It is also possible to build a dataframe from a stream of changesets in a similar manner as above. Changesets carry additional metadata regarding the author of the changes, but none of the geometric information. These tables can be joined on changeset.]
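As a rough illustration of joining on changeset, the sketch below attaches the changeset author to each changed element. The column names (changeset on the changes frame; id and user on the changesets frame) are assumptions made for illustration and should be checked against the actual schemas with printSchema. withAuthors is a hypothetical helper.

```scala
import org.apache.spark.sql.DataFrame

// Attach changeset-level metadata to element-level changes.
// Column names here are assumptions; verify them with printSchema().
def withAuthors(changes: DataFrame, changesets: DataFrame): DataFrame = {
  val authors = changesets.select(
    changesets("id").as("changeset"),
    changesets("user").as("author"))
  // Left join keeps changes whose changeset metadata has not been loaded
  changes.join(authors, Seq("changeset"), "left")
}
```

A left join is used so that changes are not silently dropped when the changeset stream lags behind the change stream.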

In either case, a useful place to start is to convert the incoming dataframe into a more usable format. We recommend calling

val geoms = OSM.toGeometry(df)

which will produce a frame consisting of "top-level" entities, which is to say nodes that don't participate in a way, ways that don't participate in relations, and a subset of the relations from the OSM data. The resulting dataframe will represent these entities with JTS geometries in the geom column.

The toGeometry function keeps elements that fit one of the following descriptions:

  • points from tagged nodes (including tags that really ought to be dropped—e.g. source=*);
  • polygons derived from ways with tags that cause them to be considered as areas;
  • lines from ways lacking area tags;
  • multipolygons from multipolygon or boundary relations; and
  • multilinestrings from route relations.
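To see how a given extract is distributed across the categories above, one option is to group by geometry type. This sketch uses st_geometryType from the GeoMesa Spark JTS functions and assumes the geometry column is named geom, as described above; geometryTypeCounts is a hypothetical helper.

```scala
import org.apache.spark.sql.DataFrame
import org.locationtech.geomesa.spark.jts._

// Count rows per JTS geometry type (Point, LineString, Polygon, ...).
// Assumes JTS types were registered with spark.withJTS.
def geometryTypeCounts(geoms: DataFrame): DataFrame =
  geoms
    .groupBy(st_geometryType(geoms("geom")).as("geometry_type"))
    .count()

// Usage: geometryTypeCounts(geoms).show()
```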

It is also possible to filter the results based on information in the tags. For instance, all buildings can be found as

import vectorpipe.functions.osm._
val buildings = geoms.filter(isBuilding('tags))
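For tags without a ready-made predicate, plain Spark column operations may be enough. The sketch below assumes the tags column is a string-to-string map; withTag is a hypothetical helper, not part of VectorPipe.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Keep rows whose `tags` map holds the given key with the given value.
// Assumes `tags` is a map<string,string> column.
def withTag(geoms: DataFrame, key: String, value: String): DataFrame =
  geoms.filter(col("tags").getItem(key) === value)

// Usage: val schools = withTag(geoms, "amenity", "school")
```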

Again, the JTS user defined types allow for easier manipulation of and calculation from geometric types. See here for a list of functions that operate on geometries.
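As one small example of those functions, the sketch below adds an area column using st_area. It assumes the geometry column is named geom; withArea is a hypothetical helper. The area is computed in the units of the coordinate system, so for longitude/latitude data it is in squared degrees and is mainly useful for relative comparisons.

```scala
import org.apache.spark.sql.DataFrame
import org.locationtech.geomesa.spark.jts._

// Add a planar area column computed from the `geom` column.
// Units follow the coordinate system (squared degrees for lon/lat data).
def withArea(geoms: DataFrame): DataFrame =
  geoms.withColumn("area", st_area(geoms("geom")))

// Usage: withArea(buildings).orderBy($"area".desc).show(10)
```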

A Note on Geocoding

VectorPipe provides the means to tag geometries with the country codes of the countries they interact with, but it does not provide the boundaries used to do the coding. That gives the user the option to select geometries appropriate to the task at hand—low resolution geometries for less fussy applications, high resolution when precision is important.

In order for an application to make use of vectorpipe.util.Geocode, it must supply a countries.geojson in the root of its project's resources directory. That GeoJSON file must contain a FeatureCollection, with each entry having an ADM0_A3 entry in its properties list.

One may employ the Natural Earth Admin 0 resource for low-precision tasks, or use something like the Global LSIB Polygons for more precise tasks (though the latter resource does not tag its elements with the ADM0_A3 three-letter codes, so some preprocessing would be required).
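A candidate countries.geojson can be sanity-checked before it is bundled. The sketch below uses Spark's JSON reader to list features that lack an ADM0_A3 property. featuresMissingCode is a hypothetical helper, and the path in the usage comment assumes a standard sbt resources layout. Note that if no feature at all carries ADM0_A3, the column will not exist in the inferred schema and the filter will fail with an analysis error, which is itself a sign that preprocessing is needed.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

// Return the features of a FeatureCollection that have no ADM0_A3 property.
def featuresMissingCode(collection: DataFrame): DataFrame =
  collection
    .select(explode(col("features")).as("feature"))
    .filter(col("feature.properties.ADM0_A3").isNull)

// Usage (path is an assumption):
//   val fc = spark.read.option("multiLine", "true")
//     .json("src/main/resources/countries.geojson")
//   featuresMissingCode(fc).count
```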

The internal package

While most users will rely solely on the features exposed by the OSM object, finer-grained control of the output (for example, if one does not need relations) is available through the vectorpipe.internal package.

There is a significant caveat here: imported OSM dataframes can arrive in one of two schemas, which differ in the type of a sub-field of the members list. This can cause errors of the form

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Byte

when using the internal package methods.

These type problems can be fixed by calling vectorpipe.functions.osm.ensureCompressedMembers on the input OSM data frame before passing to any relation-generating functions, such as reconstructRelationGeometries. Top-level functions in the OSM object handle this conversion for you. Note that this only affects the data frames carrying the initially imported OSM data.
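In practice the fix is a single call made before any vectorpipe.internal functions are used. The sketch below assumes ensureCompressedMembers takes a DataFrame and returns a DataFrame, which the description above suggests but does not spell out; prepareForInternal is a hypothetical wrapper name.

```scala
import org.apache.spark.sql.DataFrame
import vectorpipe.functions.osm.ensureCompressedMembers

// Normalize the type of the `members` sub-field on freshly imported OSM data.
// Assumes a DataFrame => DataFrame signature (not confirmed by this README).
def prepareForInternal(osm: DataFrame): DataFrame =
  ensureCompressedMembers(osm)

// Usage: val prepared = prepareForInternal(df)
// then pass `prepared` to relation-generating functions.
```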

Local Development

If you intend to contribute to VectorPipe, you may need to work with a development version. In that case, instead of loading a published release, you will need to build a fat jar using

./sbt assembly

and following that,

spark-shell --jars target/scala-2.11/vectorpipe.jar

IntelliJ IDEA

When developing with IntelliJ IDEA, the sbt plugin will see Spark dependencies as provided, which will prevent them from being indexed properly, resulting in errors / warnings within the IDE. To fix this, create idea.sbt at the root of the project:

import Dependencies._

lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  libraryDependencies ++= Seq(
    sparkSql % Compile
  )
)