Skip to content

Commit

Permalink
Merge pull request #297 from uber/jg_172_c3
Browse files Browse the repository at this point in the history
Implemented a new record reader to extract JanusGraph data from Cassandra
Fixes #172
  • Loading branch information
pluradj committed Aug 13, 2017
2 parents c225e53 + 2af763d commit 170f424
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 7 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Joe Ferner <joe@fernsroth.com>
Joshua Shinavier <josh@fortytwo.net>
Justin Corpron <justin.corpron@justinc-677.glassdoor.local>
Karthik Ramachandran <kramachandran@iqt.com>
Kedar Mhaswade <kedar.mhaswade@gmail.com>
ksenji <ksenji@ebay.com>
Leifur Halldor Asgeirsson <leifurhauks@gmail.com>
Mark McCraw <mark.mccraw@sas.com>
Expand Down
1 change: 1 addition & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ It also includes software from other open source projects including, but not lim
* Apache Solr [http://lucene.apache.org/solr/]
* Apache TinkerPop [http://tinkerpop.apache.org/]
* Astyanax [https://github.com/Netflix/astyanax]
* DataStax Driver for Apache Cassandra [https://github.com/datastax/java-driver]
* EasyMock [http://easymock.org/]
* Elasticsearch [https://www.elastic.co/]
* Google Guava [https://github.com/google/guava]
Expand Down
9 changes: 9 additions & 0 deletions TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,12 @@ To run Elasicsearch tests using an embedded Elasticsearch Docker container, use
```bash
mvn clean install -pl janusgraph-es -Pes-docker
```

### Running Hadoop tests with Cassandra-3 using CQL record reader (requires Docker)

To run Hadoop tests with Cassandra-3 using the CQL record reader, start a Cassandra-3 Docker container and run tests with `-DskipCassandra3=false`. Note core HBase and Cassandra-2 Hadoop tests must be skipped when an external Cassandra instance is running.

```bash
docker run --name jg-cassandra -p 9160:9160 -p 9042:9042 -e CASSANDRA_START_RPC=true -d cassandra:3.10
mvn clean install -pl :janusgraph-hadoop-2 -DskipHBase -DskipCassandra -DskipCassandra3=false
```
2 changes: 0 additions & 2 deletions janusgraph-cql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
<url>http://janusgraph.org</url>

<properties>
<cassandra-driver.version>3.1.4</cassandra-driver.version>
<vavr.version>0.9.0</vavr.version>

<test.byteorderedpartitioner>byteorderedpartitioner</test.byteorderedpartitioner>
<test.murmur>murmur</test.murmur>
<test.murmur-ssl>murmur-ssl</test.murmur-ssl>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

#
# JanusGraph Cassandra InputFormat configuration
#
janusgraphmr.ioformat.conf.storage.backend=cassandra
janusgraphmr.ioformat.conf.storage.hostname=localhost
janusgraphmr.ioformat.conf.storage.port=9160
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=janusgraph

#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

#
# SparkGraphComputer Configuration
#
spark.master=local[4]
spark.serializer=org.apache.spark.serializer.KryoSerializer
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.hadoop.formats.cassandra;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.StaticBuffer;

import java.io.IOException;

/**
* Wraps a ColumnFamilyInputFormat and converts CFIF's binary types to JanusGraph's binary types.
*/
public class Cassandra3BinaryInputFormat extends CassandraBinaryInputFormat {

@Override
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
janusgraphRecordReader = new CqlBridgeRecordReader(); // See issue 172
return janusgraphRecordReader;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.hadoop.formats.cassandra;

import org.janusgraph.diskstorage.configuration.ConfigOption;
import org.janusgraph.hadoop.formats.util.GiraphInputFormat;

import static org.janusgraph.diskstorage.cassandra.AbstractCassandraStoreManager.*;

public class Cassandra3InputFormat extends GiraphInputFormat {

public Cassandra3InputFormat() {
super(new Cassandra3BinaryInputFormat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class CassandraBinaryInputFormat extends AbstractBinaryInputFormat {

private final ColumnFamilyInputFormat columnFamilyInputFormat = new ColumnFamilyInputFormat();
private ColumnFamilyRecordReader columnFamilyRecordReader;
private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;

public RecordReader<StaticBuffer, Iterable<Entry>> getRecordReader() {
return janusgraphRecordReader;
Expand All @@ -66,9 +66,9 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOExceptio
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
columnFamilyRecordReader =
(ColumnFamilyRecordReader)columnFamilyInputFormat.createRecordReader(inputSplit, taskAttemptContext);
(ColumnFamilyRecordReader) columnFamilyInputFormat.createRecordReader(inputSplit, taskAttemptContext);
janusgraphRecordReader =
new CassandraBinaryRecordReader(columnFamilyRecordReader);
new CassandraBinaryRecordReader(columnFamilyRecordReader);
return janusgraphRecordReader;
}

Expand Down
Loading

0 comments on commit 170f424

Please sign in to comment.