Skip to content
Permalink
Browse files
HugeGraph-1280: add support for hbase backend
Change-Id: I9c2552ac76aa40009b5251ae62de51b9d5cd09a3
  • Loading branch information
javeme authored and Linary committed Aug 9, 2018
1 parent 30bbf0d commit 3f117b249c3474a7df35e4c9eb8c8d6ce24fadce
Showing 52 changed files with 2,324 additions and 501 deletions.
@@ -22,8 +22,8 @@
import java.util.Collection;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.serializer.TableBackendEntry;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.TableBackendEntry;
import com.baidu.hugegraph.type.HugeType;

public class CassandraBackendEntry extends TableBackendEntry {
@@ -20,30 +20,27 @@
package com.baidu.hugegraph.backend.store.cassandra;

import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendEntryIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;
import com.datastax.driver.core.ColumnDefinitions.Definition;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;

public class CassandraEntryIterator extends BackendEntryIterator<Row> {
public class CassandraEntryIterator extends BackendEntryIterator {

private final ResultSet results;
private final Iterator<Row> rows;
private final BiFunction<BackendEntry, BackendEntry, BackendEntry> merger;
private final BiFunction<BackendEntry, Row, BackendEntry> merger;

private long remaining;
private BackendEntry next;

public CassandraEntryIterator(ResultSet results, Query query,
BiFunction<BackendEntry, BackendEntry, BackendEntry> merger) {
BiFunction<BackendEntry, Row, BackendEntry> merger) {
super(query);
this.results = results;
this.rows = results.iterator();
@@ -53,7 +50,7 @@ public CassandraEntryIterator(ResultSet results, Query query,

this.skipOffset();

if (query.page() != null) {
if (query.paging()) {
E.checkState(this.remaining == query.limit() ||
results.isFullyFetched(),
"Unexpected fetched page size: %s", this.remaining);
@@ -77,19 +74,19 @@ protected final boolean fetch() {
if (this.query.paging()) {
this.remaining--;
}
CassandraBackendEntry e = this.row2Entry(this.rows.next());
BackendEntry merged = this.merger.apply(this.current, e);
Row row = this.rows.next();
BackendEntry merged = this.merger.apply(this.current, row);
if (this.current == null) {
// The first time to read
this.current = merged;
} else if (merged == this.current) {
// Does the next entry belongs to the current entry
// The next entry belongs to the current entry
assert merged != null;
} else {
// New entry
assert this.next == null;
this.next = merged;
return true;
break;
}
}
return this.current != null;
@@ -120,18 +117,4 @@ protected String pageState() {
}
return page.toString();
}

private CassandraBackendEntry row2Entry(Row row) {
HugeType type = this.query.resultType();
CassandraBackendEntry entry = new CassandraBackendEntry(type);

List<Definition> cols = row.getColumnDefinitions().asList();
for (Definition col : cols) {
String name = col.getName();
Object value = row.getObject(name);
entry.column(CassandraTable.parseKey(name), value);
}

return entry;
}
}
@@ -28,9 +28,9 @@
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.serializer.TableBackendEntry;
import com.baidu.hugegraph.backend.serializer.TableSerializer;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.TableBackendEntry;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.structure.HugeElement;
import com.baidu.hugegraph.structure.HugeProperty;
@@ -24,6 +24,7 @@
import java.util.List;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.BackendSession;
import com.baidu.hugegraph.backend.store.BackendSessionPool;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.util.E;
@@ -137,7 +138,7 @@ public final void checkSessionConnected() {
* The Session class is a wrapper of driver Session
* Expect every thread hold a its own session(wrapper)
*/
public final class Session extends BackendSessionPool.Session {
public final class Session extends BackendSession {

private com.datastax.driver.core.Session session;
private BatchStatement batch;
@@ -209,8 +209,8 @@ private void mutate(CassandraSessionPool.Session session,
}
break;
default:
throw new BackendException("Unsupported mutate type: %s",
item.action());
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
}
}

@@ -39,14 +39,17 @@
import com.baidu.hugegraph.backend.store.BackendTable;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.Shard;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.CopyUtil;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.datastax.driver.core.ColumnDefinitions.Definition;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.PagingStateException;
import com.datastax.driver.core.querybuilder.Clause;
@@ -330,7 +333,23 @@ protected static Object serializeValue(Object value) {
}

protected Iterator<BackendEntry> results2Entries(Query q, ResultSet r) {
return new CassandraEntryIterator(r, q, this::mergeEntries);
return new CassandraEntryIterator(r, q, (e1, row) -> {
CassandraBackendEntry e2 = row2Entry(q.resultType(), row);
return this.mergeEntries(e1, e2);
});
}

private static CassandraBackendEntry row2Entry(HugeType type, Row row) {
CassandraBackendEntry entry = new CassandraBackendEntry(type);

List<Definition> cols = row.getColumnDefinitions().asList();
for (Definition col : cols) {
String name = col.getName();
Object value = row.getObject(name);
entry.column(CassandraTable.parseKey(name), value);
}

return entry;
}

protected List<HugeKeys> pkColumnName() {
@@ -32,6 +32,7 @@
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.type.Shard;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.NumericUtil;
import com.google.common.collect.ImmutableList;
@@ -59,6 +60,9 @@ public enum RelationType {
NOT_IN("notin", (v1, v2) -> {
return !((Collection<?>) v2).contains(v1);
}),
PREFIX("prefix", (v1, v2) -> {
return Bytes.prefixWith(((Id) v2).asBytes(), ((Id) v1).asBytes()) ;
}),
CONTAINS("contains", (v1, v2) -> {
return ((Map<?, ?>) v1).containsValue(v2);
}),
@@ -216,6 +220,10 @@ public static Condition nin(HugeKeys key, List<?> value) {
return new SyspropRelation(key, RelationType.NOT_IN, value);
}

public static Condition prefix(HugeKeys key, Id value) {
return new SyspropRelation(key, RelationType.PREFIX, value);
}

public static Condition contains(HugeKeys key, Object value) {
return new SyspropRelation(key, RelationType.CONTAINS, value);
}
@@ -94,6 +94,10 @@ public ConditionQuery neq(HugeKeys key, Object value) {
return this.query(Condition.neq(key, value));
}

public ConditionQuery prefix(HugeKeys key, Id value) {
return this.query(Condition.prefix(key, value));
}

public ConditionQuery key(HugeKeys key, Object value) {
return this.query(Condition.containsKey(key, value));
}
@@ -29,6 +29,7 @@
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.StringEncoding;

@@ -41,6 +42,10 @@ public class BinaryBackendEntry implements BackendEntry {
private Id subId;
private final List<BackendColumn> columns;

public BinaryBackendEntry(HugeType type, byte[] bytes) {
this(type, BytesBuffer.wrap(bytes).parseId());
}

public BinaryBackendEntry(HugeType type, BinaryId id) {
this.type = type;
this.id = id;
@@ -87,10 +92,8 @@ public void column(BackendColumn column) {

public void column(byte[] name, byte[] value) {
E.checkNotNull(name, "name");
BackendColumn col = new BackendColumn();
col.name = name;
col.value = value != null ? value : EMPTY_BYTES;
this.columns.add(col);
value = value != null ? value : EMPTY_BYTES;
this.columns.add(BackendColumn.of(name, value));
}

@Override
@@ -169,6 +172,11 @@ public Id origin() {
return this.id;
}

@Override
public boolean number() {
return false;
}

@Override
public Object asObject() {
return this.asBytes();
@@ -184,21 +192,23 @@ public long asLong() {
throw new UnsupportedOperationException();
}

@Override
public boolean number() {
throw new UnsupportedOperationException();
}

@Override
public int compareTo(Id other) {
throw new UnsupportedOperationException();
return Bytes.compare(this.bytes, other.asBytes());
}

@Override
public byte[] asBytes() {
return this.bytes;
}

public byte[] asBytes(int offset) {
E.checkArgument(offset < this.bytes.length,
"Invalid offset %s, must be < length %s",
offset, this.bytes.length);
return Arrays.copyOfRange(this.bytes, offset, this.bytes.length);
}

@Override
public int length() {
return this.bytes.length;

0 comments on commit 3f117b2

Please sign in to comment.