Skip to content

Commit

Permalink
Add UInt64AddOperator to rocksjava (#4448)
Browse files Browse the repository at this point in the history
Summary:
Closes #4447
Pull Request resolved: #4448

Differential Revision: D10351852

Pulled By: ajkr

fbshipit-source-id: 18287b5190ae0b8153ce425da9a0bdfe1af88c34
  • Loading branch information
John Calcote authored and facebook-github-bot committed Oct 13, 2018
1 parent 6f8d4bd commit 9c20797
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 1 deletion.
2 changes: 2 additions & 0 deletions java/CMakeLists.txt
Expand Up @@ -132,6 +132,7 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.TransactionLogIterator
org.rocksdb.TransactionOptions
org.rocksdb.TtlDB
org.rocksdb.UInt64AddOperator
org.rocksdb.VectorMemTableConfig
org.rocksdb.WBWIRocksIterator
org.rocksdb.WriteBatch
Expand Down Expand Up @@ -294,6 +295,7 @@ add_jar(
src/test/java/org/rocksdb/RocksDBExceptionTest.java
src/test/java/org/rocksdb/RocksMemoryResource.java
src/test/java/org/rocksdb/SnapshotTest.java
src/main/java/org/rocksdb/UInt64AddOperator.java
src/test/java/org/rocksdb/WriteBatchTest.java
src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java
src/test/java/org/rocksdb/util/WriteBatchGetter.java
Expand Down
1 change: 1 addition & 0 deletions java/Makefile
Expand Up @@ -62,6 +62,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.Snapshot\
org.rocksdb.StringAppendOperator\
org.rocksdb.UInt64AddOperator\
org.rocksdb.WriteBatch\
org.rocksdb.WriteBatch.Handler\
org.rocksdb.WriteOptions\
Expand Down
26 changes: 26 additions & 0 deletions java/rocksjni/merge_operator.cc
Expand Up @@ -13,6 +13,7 @@
#include <string>

#include "include/org_rocksdb_StringAppendOperator.h"
#include "include/org_rocksdb_UInt64AddOperator.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
Expand Down Expand Up @@ -47,3 +48,28 @@ void Java_org_rocksdb_StringAppendOperator_disposeInternal(JNIEnv* /*env*/,
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete sptr_string_append_op; // delete std::shared_ptr
}

/*
* Class: org_rocksdb_UInt64AddOperator
* Method: newSharedUInt64AddOperator
* Signature: ()J
*/
jlong Java_org_rocksdb_UInt64AddOperator_newSharedUInt64AddOperator(
JNIEnv* /*env*/, jclass /*jclazz*/) {
auto* sptr_uint64_add_op = new std::shared_ptr<rocksdb::MergeOperator>(
rocksdb::MergeOperators::CreateUInt64AddOperator());
return reinterpret_cast<jlong>(sptr_uint64_add_op);
}

/*
* Class: org_rocksdb_UInt64AddOperator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_UInt64AddOperator_disposeInternal(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jhandle) {
auto* sptr_uint64_add_op =
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete sptr_uint64_add_op; // delete std::shared_ptr
}
19 changes: 19 additions & 0 deletions java/src/main/java/org/rocksdb/UInt64AddOperator.java
@@ -0,0 +1,19 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* Uint64AddOperator is a merge operator that accumlates a long
* integer value.
*/
public class UInt64AddOperator extends MergeOperator {
public UInt64AddOperator() {
super(newSharedUInt64AddOperator());
}

private native static long newSharedUInt64AddOperator();
@Override protected final native void disposeInternal(final long handle);
}
202 changes: 201 additions & 1 deletion java/src/test/java/org/rocksdb/MergeTest.java
Expand Up @@ -5,6 +5,7 @@

package org.rocksdb;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
Expand Down Expand Up @@ -44,6 +45,38 @@ public void stringOption()
}
}

private byte[] longToByteArray(long l) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
buf.putLong(l);
return buf.array();
}

private long longFromByteArray(byte[] a) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
buf.put(a);
buf.flip();
return buf.getLong();
}

@Test
public void uint64AddOption()
throws InterruptedException, RocksDBException {
try (final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperatorName("uint64add");
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
// writing (long)100 under key
db.put("key".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge("key".getBytes(), longToByteArray(1));

final byte[] value = db.get("key".getBytes());
final long longValue = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
}
}

@Test
public void cFStringOption()
throws InterruptedException, RocksDBException {
Expand Down Expand Up @@ -86,6 +119,48 @@ public void cFStringOption()
}
}

@Test
public void cFUInt64AddOption()
throws InterruptedException, RocksDBException {

try (final ColumnFamilyOptions cfOpt1 = new ColumnFamilyOptions()
.setMergeOperatorName("uint64add");
final ColumnFamilyOptions cfOpt2 = new ColumnFamilyOptions()
.setMergeOperatorName("uint64add")
) {
final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt1),
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt2)
);

final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
try (final DBOptions opt = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(), cfDescriptors,
columnFamilyHandleList)) {
try {
// writing (long)100 under key
db.put(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(1));

byte[] value = db.get(columnFamilyHandleList.get(1),
"cfkey".getBytes());
long longValue = longFromByteArray(value);
assertThat(longValue).isEqualTo(101);
} finally {
for (final ColumnFamilyHandle handle : columnFamilyHandleList) {
handle.close();
}
}
}
}
}

@Test
public void operatorOption()
throws InterruptedException, RocksDBException {
Expand All @@ -108,6 +183,28 @@ public void operatorOption()
}
}

@Test
public void uint64AddOperatorOption()
throws InterruptedException, RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator();
final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
// Writing (long)100 under key
db.put("key".getBytes(), longToByteArray(100));

// Writing (long)1 under key
db.merge("key".getBytes(), longToByteArray(1));

final byte[] value = db.get("key".getBytes());
final long longValue = longFromByteArray(value);

assertThat(longValue).isEqualTo(101);
}
}

@Test
public void cFOperatorOption()
throws InterruptedException, RocksDBException {
Expand Down Expand Up @@ -170,6 +267,68 @@ public void cFOperatorOption()
}
}

@Test
public void cFUInt64AddOperatorOption()
throws InterruptedException, RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator();
final ColumnFamilyOptions cfOpt1 = new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator);
final ColumnFamilyOptions cfOpt2 = new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator)
) {
final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpt1),
new ColumnFamilyDescriptor("new_cf".getBytes(), cfOpt2)
);
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
try (final DBOptions opt = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath(), cfDescriptors,
columnFamilyHandleList)
) {
try {
// writing (long)100 under key
db.put(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(100));
// merge (long)1 under key
db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), longToByteArray(1));
byte[] value = db.get(columnFamilyHandleList.get(1),
"cfkey".getBytes());
long longValue = longFromByteArray(value);

// Test also with createColumnFamily
try (final ColumnFamilyOptions cfHandleOpts =
new ColumnFamilyOptions()
.setMergeOperator(uint64AddOperator);
final ColumnFamilyHandle cfHandle =
db.createColumnFamily(
new ColumnFamilyDescriptor("new_cf2".getBytes(),
cfHandleOpts))
) {
// writing (long)200 under cfkey2
db.put(cfHandle, "cfkey2".getBytes(), longToByteArray(200));
// merge (long)50 under cfkey2
db.merge(cfHandle, new WriteOptions(), "cfkey2".getBytes(),
longToByteArray(50));
value = db.get(cfHandle, "cfkey2".getBytes());
long longValueTmpCf = longFromByteArray(value);

assertThat(longValue).isEqualTo(101);
assertThat(longValueTmpCf).isEqualTo(250);
}
} finally {
for (final ColumnFamilyHandle columnFamilyHandle :
columnFamilyHandleList) {
columnFamilyHandle.close();
}
}
}
}
}

@Test
public void operatorGcBehaviour()
throws RocksDBException {
Expand All @@ -182,7 +341,6 @@ public void operatorGcBehaviour()
//no-op
}


// test reuse
try (final Options opt = new Options()
.setMergeOperator(stringAppendOperator);
Expand Down Expand Up @@ -213,6 +371,48 @@ public void operatorGcBehaviour()
}
}

@Test
public void uint64AddOperatorGcBehaviour()
throws RocksDBException {
try (final UInt64AddOperator uint64AddOperator = new UInt64AddOperator()) {
try (final Options opt = new Options()
.setCreateIfMissing(true)
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}

// test reuse
try (final Options opt = new Options()
.setMergeOperator(uint64AddOperator);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}

// test param init
try (final UInt64AddOperator uint64AddOperator2 = new UInt64AddOperator();
final Options opt = new Options()
.setMergeOperator(uint64AddOperator2);
final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}

// test replace one with another merge operator instance
try (final Options opt = new Options()
.setMergeOperator(uint64AddOperator);
final UInt64AddOperator newUInt64AddOperator = new UInt64AddOperator()) {
opt.setMergeOperator(newUInt64AddOperator);
try (final RocksDB db = RocksDB.open(opt,
dbFolder.getRoot().getAbsolutePath())) {
//no-op
}
}
}
}

@Test
public void emptyStringInSetMergeOperatorByName() {
try (final Options opt = new Options()
Expand Down

0 comments on commit 9c20797

Please sign in to comment.