TINKERPOP-1033: Store sideEffects as a persisted RDD #192

Merged
merged 15 commits on Jan 8, 2016
Changes from all commits
5 changes: 5 additions & 0 deletions CHANGELOG.asciidoc
@@ -26,6 +26,11 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* It is possible to completely avoid using HDFS with Spark if `PersistedInputRDD` and `PersistedOutputRDD` are leveraged.
* `InputRDD` and `OutputRDD` can now process both graphs and memory (i.e. sideEffects).
* Removed Groovy-specific meta-programming overloads for handling Hadoop `FileSystem` (instead, it's all accessible via `FileSystemStorage`).
* Added `FileSystemStorage` and `SparkContextStorage` which both implement the new `Storage` API.
* Added `Storage` to the gremlin-core io-package which providers can implement to allow conventional access to data sources (e.g. `ls()`, `rm()`, `cp()`, etc.).
* Bumped to Spark 1.5.2.
* Bumped to Groovy 2.4.5.
* Execute the `LifeCycle.beforeEval()` in the same thread that `eval()` is executed in for `GremlinExecutor`.
104 changes: 49 additions & 55 deletions docs/src/reference/implementations.asciidoc
@@ -1213,30 +1213,8 @@ Using a Persisted Context

It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration.
Similarly, `PersistedInputRDD` is used with respective `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.

There is a static `spark` object that can be used to manage persisted RDDs much like `hdfs` is used to manage HDFS files (see <<interacting-with-hdfs, Interacting with HDFS>>).

[gremlin-groovy]
----
spark.create('local[4]') // the SparkContext location (master)
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.persistContext',true)
graph.configuration().setProperty('gremlin.spark.graphOutputRDD','org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD')
graph.configuration().setProperty('gremlin.hadoop.outputLocation','pageRankGraph')
graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().create()).submit().get()
spark.ls()
graph.configuration().setProperty('gremlin.hadoop.outputLocation','peerPressureGraph')
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create()).submit().get()
spark.ls()
spark.rm('pageRankGraph')
spark.head('peerPressureGraph')
spark.describe('peerPressureGraph')
spark.rm('peerPressureGraph')
spark.ls()
spark.close()
----
The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration. Similarly, `PersistedInputRDD` is used with respective
`gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.

When using a persistent `SparkContext`, the configuration of the original `SparkContext` will be inherited by all threaded
references to it. The exceptions to this rule are those properties which have a specific thread-local effect.
@@ -1247,6 +1225,8 @@
. spark.job.interruptOnCancel
. spark.scheduler.pool

Finally, there is a `spark` object that can be used to manage persisted RDDs (see <<interacting-with-spark, Interacting with Spark>>).

Loading with BulkLoaderVertexProgram
++++++++++++++++++++++++++++++++++++

@@ -1256,7 +1236,7 @@ Grateful Dead graph from HadoopGraph into TinkerGraph over Spark:

[gremlin-groovy]
----
hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
@@ -1279,10 +1259,8 @@ graph.close()
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=data/grateful-dead.kryo
gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

#
@@ -1385,7 +1363,7 @@ the Grateful Dead graph from HadoopGraph into TinkerGraph over Giraph:

[gremlin-groovy]
----
hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
@@ -1409,10 +1387,8 @@ graph.close()
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=data/grateful-dead.kryo
gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

#
@@ -1477,12 +1453,14 @@ simple (easy to create and parse).
The data below represents an adjacency list representation of the classic TinkerGraph toy graph in GraphSON format.

[source,json]
----
{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}
{"id":2,"label":"person","inE":{"knows":[{"id":7,"outV":1,"properties":{"weight":0.5}}]},"properties":{"name":[{"id":2,"value":"vadas"}],"age":[{"id":3,"value":27}]}}
{"id":3,"label":"software","inE":{"created":[{"id":9,"outV":1,"properties":{"weight":0.4}},{"id":11,"outV":4,"properties":{"weight":0.4}},{"id":12,"outV":6,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":4,"value":"lop"}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","inE":{"knows":[{"id":8,"outV":1,"properties":{"weight":1.0}}]},"outE":{"created":[{"id":10,"inV":5,"properties":{"weight":1.0}},{"id":11,"inV":3,"properties":{"weight":0.4}}]},"properties":{"name":[{"id":6,"value":"josh"}],"age":[{"id":7,"value":32}]}}
{"id":5,"label":"software","inE":{"created":[{"id":10,"outV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":8,"value":"ripple"}],"lang":[{"id":9,"value":"java"}]}}
{"id":6,"label":"person","outE":{"created":[{"id":12,"inV":3,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":10,"value":"peter"}],"age":[{"id":11,"value":35}]}}
----
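
Adjacency-list GraphSON like the above can be read by `SparkGraphComputer` via `GraphSONInputFormat`. The following is a minimal sketch, not an executed example: the `tinkerpop-modern.json` input location and the reuse of `conf/hadoop/hadoop-gryo.properties` are illustrative assumptions.

[source,groovy]
----
// assumes a GraphSON adjacency-list file is available at the configured input location
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphInputFormat', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat')
graph.configuration().setProperty('gremlin.hadoop.inputLocation', 'tinkerpop-modern.json')
g = graph.traversal(computer(SparkGraphComputer))
g.V().count()
----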

[[script-io-format]]
Script I/O Format
@@ -1575,42 +1553,58 @@ def stringify(vertex) {
return [v, outE].join('\t')
}



Storage Systems
~~~~~~~~~~~~~~~

Hadoop-Gremlin provides two implementations of the `Storage` API (see the orientation sketch after this list):

* `FileSystemStorage`: Access HDFS and local file system data.
* `SparkContextStorage`: Access Spark persisted RDD data.
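
In the Gremlin Console, these implementations back the `hdfs`, `local`, and `spark` objects used in the sections below (a minimal orientation sketch, assuming the Hadoop and Spark console plugins are active):

[source,groovy]
----
hdfs.ls()     // FileSystemStorage over HDFS
local.ls()    // FileSystemStorage over the local file system
spark.ls()    // SparkContextStorage over RDDs persisted in the SparkContext
----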

[[interacting-with-hdfs]]
Interacting with HDFS
~~~~~~~~~~~~~~~~~~~~~
^^^^^^^^^^^^^^^^^^^^^

The distributed file system of Hadoop is called link:http://en.wikipedia.org/wiki/Apache_Hadoop#Hadoop_distributed_file_system[HDFS].
The results of any OLAP operation are stored in HDFS accessible via `hdfs`.
The results of any OLAP operation are stored in HDFS accessible via `hdfs`. For local file system access, there is `local`.

[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
:remote connect tinkerpop.hadoop graph g
:> g.V().group().by{it.value('name')[1]}.by('name')
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
hdfs.ls()
hdfs.ls('output')
hdfs.ls('output/~reducing')
hdfs.head('output/~reducing', ObjectWritable)
hdfs.head('output', GryoInputFormat)
hdfs.head('output', 'clusterCount', SequenceFileInputFormat)
hdfs.rm('output')
hdfs.ls()
----

The available HDFS methods are itemized below. Note that these methods are also available for the `local` variable:
[[interacting-with-spark]]
Interacting with Spark
^^^^^^^^^^^^^^^^^^^^^^

[width="100%",cols="13,10",options="header"]
|=========================================================
| Method| Description
|hdfs.ls(String path)| List the contents of the supplied directory.
|hdfs.cp(String from, String to)| Copy the source path to the target path.
|hdfs.exists(String path)| Whether the specified path exists.
|hdfs.rm(String path)| Remove the specified path.
|hdfs.rmr(String path)| Remove the specified path and its contents recursively.
|hdfs.copyToLocal(String from, String to)| Copy the specified HDFS path to the specified local path.
|hdfs.copyFromLocal(String from, String to)| Copy the specified local path to the specified HDFS path.
|hdfs.mergeToLocal(String from, String to)| Merge the files in the path to the specified local path.
|hdfs.head(String path)| Display the data in the path as text.
|hdfs.head(String path, int lineCount)| Display only the first `lineCount` lines in the path as text.
|hdfs.head(String path, int totalKeyValues, Class<Writable> writableClass)| Display the path, interpreting the key values as the respective writable.
|=========================================================
If a Spark context is persisted, then Spark RDDs will remain in the Spark cache and be accessible over subsequent jobs.
RDDs are retrieved and saved to the `SparkContext` via `PersistedInputRDD` and `PersistedOutputRDD`, respectively.
Persisted RDDs can be accessed using the `spark` object.

[gremlin-groovy]
----
Spark.create('local[4]')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.graphOutputRDD', PersistedOutputRDD.class.getCanonicalName())
graph.configuration().clearProperty('gremlin.hadoop.graphOutputFormat')
graph.configuration().setProperty('gremlin.spark.persistContext',true)
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
spark.ls()
spark.ls('output')
spark.head('output', PersistedInputRDD)
spark.head('output', 'clusterCount', PersistedInputRDD)
spark.rm('output')
spark.ls()
----

A Command Line Example
~~~~~~~~~~~~~~~~~~~~~~
26 changes: 26 additions & 0 deletions docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -32,6 +32,32 @@ Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.1.1-inc
Upgrading for Users
~~~~~~~~~~~~~~~~~~~

Storage I/O
^^^^^^^^^^^

The `gremlin-core` io-package now has a `Storage` interface. The methods that were available via `hdfs`
(e.g. `rm()`, `ls()`, `head()`, etc.) are now part of `Storage`. Both HDFS and Spark implement `Storage` via
`FileSystemStorage` and `SparkContextStorage`, respectively. `SparkContextStorage` adds support for interacting with
persisted RDDs in the Spark cache.

This update changed a few of the file handling methods. As it stands, these changes only affect manual Gremlin Console
usage, as HDFS support was previously provided via Groovy meta-programming. Thus, these are not "code-based" breaking changes.

* `hdfs.rmr()` no longer exists. `hdfs.rm()` is now recursive. Simply change all references to `rmr()` to `rm()` for identical behavior.
* `hdfs.head(location,lines,writableClass)` no longer exists.
** For graph locations, use `hdfs.head(location,writableClass,lines)`.
** For memory locations, use `hdfs.head(location,memoryKey,writableClass,lines)`.
* `hdfs.head(...,ObjectWritable)` no longer exists. Use `SequenceFileInputFormat` as the parsing class instead (see the before/after sketch below).
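
Put together, the bullets above amount to the following before/after sketch (the `output` location, the `clusterCount` memory key, and the line count are illustrative values):

[source,groovy]
----
// 3.1.0-incubating style
hdfs.rmr('output')                                        // recursive remove
hdfs.head('output/~clusterCount', 10, ObjectWritable)     // head(location, lines, writableClass)

// 3.1.1-incubating style
hdfs.rm('output')                                         // rm() is now recursive
hdfs.head('output', 'clusterCount', SequenceFileInputFormat, 10)  // head(location, memoryKey, writableClass, lines)
----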

Given that HDFS (and now Spark) interactions are possible via `Storage` and no longer via Groovy meta-programming,
developers can use these `Storage` implementations in their Java code. In fact, `Storage` has greatly simplified
complex file/RDD operations in both `GiraphGraphComputer` and `SparkGraphComputer`.
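
In their own Java (or Groovy) code, developers can use the same calls that back the console objects. A minimal sketch follows; the static `FileSystemStorage.open(Configuration)` factory and the `ConfUtil.makeHadoopConfiguration()` helper are assumptions here, so consult the javadoc for the exact signatures.

[source,groovy]
----
// illustrative sketch -- the factory and helper names are assumptions, not confirmed API
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration()))
storage.ls()            // the same listing the console exposes as hdfs.ls()
storage.rm('output')    // recursive removal (rmr() no longer exists)
----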

Finally, note that the following low-level/internal classes have been removed: `HadoopLoader` and `HDFSTools`.

See: link:https://issues.apache.org/jira/browse/TINKERPOP-1033[TINKERPOP-1033],
link:https://issues.apache.org/jira/browse/TINKERPOP-1023[TINKERPOP-1023]

Gremlin Server Transaction Management
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
