Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Adding assoc schema APIs for rocksdb so that linkbench can run rocksdb.

Summary:
LinkStoreRocksDb.java contains the assoc-schema APIs that enable
linkbench to be run over rocksdb. swift has been incorporated to make the thrift
calls using a java client. It uses netty and is faster. Am still working on it. Checking it in so that it can be tested by others

Test Plan: ~/linkbench/bin/linkbench -c config/LinkConfigRocksDb.properties –l; <Run the rocks server in fbcode simultaneously>

Reviewers: dhruba, tarmstrong

Reviewed By: tarmstrong

CC: heyongqiang, sheki, zshao, kosievdmerwe, vamsi, andrewcox

Differential Revision: https://reviews.facebook.net/D8115
  • Loading branch information...
commit 7481a67cfad296eed94bd93ac8d3511c566c536b 1 parent 6438dae
@emayanke emayanke authored
Showing with 1,566 additions and 18 deletions.
  1. +1 −1  README.md
  2. +5 −0 bin/linkbench
  3. +12 −0 build.properties
  4. +53 −5 build.xml
  5. +111 −0 config/LinkConfigRocksDb.properties
  6. BIN  lib/slf4j-simple-1.5.8.jar
  7. +90 −0 pom.xml
  8. +1 −6 src/java/com/facebook/LinkBench/LinkBenchDriver.java
  9. +1 −1  src/java/com/facebook/LinkBench/LinkBenchLoad.java
  10. +3 −5 src/java/com/facebook/LinkBench/LinkBenchRequest.java
  11. +521 −0 src/java/com/facebook/LinkBench/LinkStoreRocksDb.java
  12. +21 −0 src/java/com/facebook/rocks/swift/AssocVisibility.java
  13. +21 −0 src/java/com/facebook/rocks/swift/Code.java
  14. +21 −0 src/java/com/facebook/rocks/swift/CompressionType.java
  15. +18 −0 src/java/com/facebook/rocks/swift/IOError.java
  16. +24 −0 src/java/com/facebook/rocks/swift/Kv.java
  17. +32 −0 src/java/com/facebook/rocks/swift/ReadOptions.java
  18. +24 −0 src/java/com/facebook/rocks/swift/ResultSnapshot.java
  19. +24 −0 src/java/com/facebook/rocks/swift/RetCode.java
  20. +26 −0 src/java/com/facebook/rocks/swift/RocksException.java
  21. +24 −0 src/java/com/facebook/rocks/swift/RocksGetResponse.java
  22. +24 −0 src/java/com/facebook/rocks/swift/RocksIterateResponse.java
  23. +112 −0 src/java/com/facebook/rocks/swift/RocksService.java
  24. +16 −0 src/java/com/facebook/rocks/swift/Snapshot.java
  25. +46 −0 src/java/com/facebook/rocks/swift/TaoAssocGetResult.java
  26. +24 −0 src/java/com/facebook/rocks/swift/WriteOptions.java
  27. +311 −0 src/java/com/facebook/rocks/swift/rocks.thrift
View
2  README.md
@@ -84,7 +84,7 @@ Prerequisites:
These instructions assume you are using a UNIX-like system such as a Linux distribution
or Mac OS X.
-**Java**: You will need a Java 6+ runtime environment. LinkBench by default
+**Java**: You will need a Java 7+ runtime environment. LinkBench by default
uses the version of Java on your path. You can override this by setting the
JAVA\_HOME environment variable to the directory of the desired
Java runtime version. You will also need a Java JDK to compile from source.
View
5 bin/linkbench
@@ -71,6 +71,11 @@ for f in $LINKBENCH_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
+# add libs to CLASSPATH
+for f in $LINKBENCH_HOME/lib/build/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
# add latest jar to CLASSPATH
CLASSPATH=${CLASSPATH}:dist/FacebookLinkBench.jar;
View
12 build.properties
@@ -0,0 +1,12 @@
+version=0.1-SNAPSHOT
+
+linkbench.groupId=com.facebook.linkbench
+linkbench.artifactId=linkbench
+linkbench.url=https://github.com/facebook/linkbench
+
+mvn.fb.repo=http://repo1.maven.org/maven2
+mvn.ant.task.url=${mvn.fb.repo}/org/apache/maven/maven-ant-tasks/${mvn.ant.task.version}
+mvn.ant.task.jar=maven-ant-tasks-${mvn.ant.task.version}.jar
+mvn.ant.task.version=2.1.3
+
+swift.version=0.2.0
View
58 build.xml
@@ -1,19 +1,24 @@
-<project name="FacebookLinkBench" default="dist" basedir=".">
+<project name="FacebookLinkBench" default="dist" basedir="."
+ xmlns:artifact="antlib:org.apache.maven.artifact.ant">
<property name="linkbench.version" value="0.1"/>
<description>
simple example build file
</description>
+
<!-- set global properties for this build -->
+ <property file="${basedir}/build.properties" />
<property name="src" location="src"/>
<property name="build" location="build"/>
+ <property name="build.lib" location="build/lib"/>
+ <property name="build.tools" location="build/tools"/>
<property name="dist" location="dist"/>
<property name="lib" location="lib"/>
<property name="test.output" location="test-output"/>
<property name="test.rawoutput" location="${test.output}/raw"/>
<property name="test.tmp" location="${test.output}/tmp"/>
<property name="test.report" location="${test.output}/report"/>
- <property name="linkbench.jar"
+ <property name="linkbench.jar"
value="${dist}/FacebookLinkBench-${linkbench.version}.jar"/>
<property name="linkbench.latest.jar" value="${dist}/FacebookLinkBench.jar"/>
@@ -26,16 +31,22 @@
<tstamp/>
<!-- Create the build directory structure used by compile -->
<mkdir dir="${build}"/>
+ <mkdir dir="${build.lib}"/>
+ <mkdir dir="${build.tools}"/>
</target>
- <target name="compile" depends="init"
+ <target name="compile" depends="init,mvn.init"
description="compile the source " >
<!-- Compile the java code from ${src} into ${build} -->
<javac includeantruntime="false" srcdir="${src}" destdir="${build}"
+ source="1.7" target="1.7"
debug="${debug}">
<classpath>
<pathelement path="${classpath}"/>
- <fileset dir="lib">
+ <fileset dir="${lib}">
+ <include name="**/*.jar"/>
+ </fileset>
+ <fileset dir="${build.lib}">
<include name="**/*.jar"/>
</fileset>
<pathelement location="classes"/>
@@ -73,7 +84,7 @@
<assertions> <enable /> </assertions>
<batchtest todir="${test.rawoutput}">
- <fileset dir="${build}">
+ <fileset dir="${build}">
<include name="**/*Test*.class"/>
<exclude name="**/LinkStoreHBaseGeneralAtomicityTesting.class"/>
<exclude name="**/*TestBase.class"/>
@@ -100,4 +111,41 @@
<delete dir="${dist}"/>
<delete dir="${test.output}"/>
</target>
+
+ <target name="proxy" if="proxy.enabled">
+ <setproxy proxyhost="${proxy.host}" proxyport="${proxy.port}"
+ proxyuser="${proxy.user}" proxypassword="${proxy.pass}"/>
+ </target>
+
+ <target name="mvn.ant.tasks.download" depends="init,mvn.ant.tasks.check,proxy" unless="mvn.ant.tasks.found">
+ <get src="${mvn.ant.task.url}/${mvn.ant.task.jar}" dest="${build.tools}/${mvn.ant.task.jar}" usetimestamp="true"/>
+ </target>
+
+ <target name="mvn.ant.tasks.check">
+ <condition property="mvn.ant.tasks.found">
+ <typefound uri="antlib:org.apache.maven.artifact.ant" name="artifact"/>
+ </condition>
+ </target>
+
+ <target name="mvn.init" depends="mvn.ant.tasks.download" unless="mvn.finished">
+ <!-- Download mvn ant tasks, download dependencies, and setup pom file -->
+ <typedef uri="antlib:org.apache.maven.artifact.ant" classpath="${build.tools}/${mvn.ant.task.jar}"/>
+
+ <!-- remote repositories used to download dependencies from -->
+ <artifact:remoteRepository id="fbnexus" url="${mvn.fb.repo}"/>
+
+ <!-- Download the dependencies -->
+ <artifact:dependencies filesetId="build-dependency-jars" settingsFile="${basedir}/settings.xml">
+ <pom file="pom.xml"/>
+ </artifact:dependencies>
+
+ <!-- Copy the dependencies to the build/lib dir -->
+ <copy todir="${build}/lib">
+ <fileset refid="build-dependency-jars"/>
+ <mapper type="flatten"/>
+ </copy>
+
+ <property name="mvn.finished" value="true"/>
+ </target>
+
</project>
View
111 config/LinkConfigRocksDb.properties
@@ -0,0 +1,111 @@
+# Sample RocksDb LinkBench configuration file.
+#
+# This file contains settings for the data store, as well as controlling
+# benchmark output and behavior. The workload is defined in a separate
+# file.
+#
+
+##########################
+# Workload Configuration #
+##########################
+
+# Path for workload properties file. Properties in this file will override
+# those in workload properties file.
+# Can be absolute path, or relative path from LinkBench home directory
+workload_file = config/FBWorkload.properties
+
+#################################
+# #
+# Data Source Configuration #
+# #
+#################################
+
+# Implementation of LinkStore and NodeStore to use
+linkstore = com.facebook.LinkBench.LinkStoreRocksDb
+nodestore = com.facebook.LinkBench.LinkStoreRocksDb
+
+# RocksDb connection information
+host = yourhostname.here
+port = 9090
+# dbid: the database name to use
+dbid = linkdb
+
+###############################
+# #
+# Logging and Stats Setup #
+# #
+###############################
+
+# This controls logging output. Settings are, in order of increasing
+# verbosity:
+# ERROR: only output serious errors
+# WARN: output warnings
+# INFO: output additional information such as progress
+# DEBUG: output high-level debugging information
+# TRACE: output more detailed lower-level debugging information
+debuglevel = INFO
+
+# display frequency of per-thread progress in seconds
+progressfreq = 300
+
+# display frequency of per-thread stats (latency, etc) in seconds
+displayfreq = 1800
+
+# display global load update (% complete, etc) after this many links loaded
+load_progress_interval = 50000
+
+# display global update on request phase (% complete, etc) after this many ops
+req_progress_interval = 10000
+
+# max number of samples to store for each per-thread statistic
+maxsamples = 10000
+
+###############################
+# #
+# Load Phase Configuration #
+# #
+###############################
+
+# number of threads to run during load phase
+loaders = 10
+
+# whether to generate graph nodes during load process
+generate_nodes = true
+
+# partition loading work into chunks of id1s of this size
+loader_chunk_size = 2048
+
+# seed for initial data load random number generation (optional)
+# load_random_seed = 12345
+
+##################################
+# #
+# Request Phase Configuration #
+# #
+##################################
+
+# number of threads to run during request phase
+requesters = 100
+
+# read + write requests per thread
+requests = 500000
+
+# request rate per thread. <= 0 means unthrottled requests, > 0 limits
+# the average request rate to that number of requests per second per thread,
+# with the inter-request intervals governed by an exponential distribution
+requestrate = 0
+
+# max duration in seconds for request phase of benchmark
+maxtime = 100000
+
+# warmup time in seconds. The benchmark is run for a warmup period
+# during which no statistics are recorded. This allows database caches,
+# etc to warm up.
+warmup_time = 0
+
+# seed for request random number generation (optional)
+# request_random_seed = 12345
+
+# maximum number of failures per requester to tolerate before aborting
+# negative number means never abort
+max_failed_requests = 100
View
BIN  lib/slf4j-simple-1.5.8.jar
Binary file not shown
View
90 pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.facebook.linkbench</groupId>
+ <artifactId>linkbench</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <url>https://github.com/facebook/linkbench</url>
+ <inceptionYear>2012</inceptionYear>
+
+ <developers>
+ <developer>
+ <id>tarmstrong</id>
+ <name>Tim Armstrong</name>
+ <email>tarmstrong@fb.com</email>
+ </developer>
+ <developer>
+ <id>dhruba</id>
+ <name>Dhruba Borthakur</name>
+ <email>dhruba@fb.com</email>
+ </developer>
+ <developer>
+ <id>amayank</id>
+ <name>Mayank Agarwal</name>
+ <email>amayank@fb.com</email>
+ </developer>
+ <developer>
+ <id>andrewcox</id>
+ <name>Andrew Cox</name>
+ <email>andrewcox@fb.com</email>
+ </developer>
+ </developers>
+
+ <scm>
+ <connection>scm:git:https://github.com/facebook/linkbench.git</connection>
+ <developerConnection>scm:git@github.com:facebook/linkbench.git</developerConnection>
+ <url>https://github.com/facebook/linkbench</url>
+ <tag>HEAD</tag>
+ </scm>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-codec</artifactId>
+ <version>0.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.swift</groupId>
+ <artifactId>swift-service</artifactId>
+ <version>0.2.0</version>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>nexus</id>
+ <!--Enable snapshots for the built in central repo to direct -->
+ <!--all requests to nexus via the mirror -->
+ <repositories>
+ <repository>
+ <id>central</id>
+ <url>http://repo1.maven.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>central</id>
+ <url>http://repo1.maven.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </pluginRepository>
+ </pluginRepositories>
+ </profile>
+ </profiles>
+
+</project>
View
7 src/java/com/facebook/LinkBench/LinkBenchDriver.java
@@ -215,7 +215,6 @@ void load() throws IOException, InterruptedException, Throwable {
logger.info("Skipping load data per the cmdline arg");
return;
}
-
// load data
int nLinkLoaders = ConfigUtil.getInt(props, Config.NUM_LOADERS);
@@ -259,10 +258,8 @@ void load() throws IOException, InterruptedException, Throwable {
loaders.add(new NodeLoader(props, logger, nodeStore, rng,
latencyStats, csvStreamFile, loaderId));
}
-
enqueueLoadWork(chunk_q, startid1, maxid1, nLinkLoaders,
new Random(masterRandom.nextLong()));
-
// run loaders
loadTracker.startTimer();
long loadTime = concurrentExec(loaders);
@@ -289,7 +286,7 @@ void load() throws IOException, InterruptedException, Throwable {
logger.info(String.format("LOAD PHASE COMPLETED. " +
" Loaded %d nodes (Expected %d)." +
" Loaded %d links (%.2f links per node). " +
- " Took %.1f seconds. Links/second = %d",
+ " Took %.1f seconds. Links/second = %d",
actualNodes, expectedNodes, actualLinks,
actualLinks / (double) actualNodes, loadTime_s,
(long) Math.round(actualLinks / loadTime_s)));
@@ -382,7 +379,6 @@ void sendrequests() throws IOException, InterruptedException, Throwable {
progress, new Random(masterRandom.nextLong()), i, nrequesters);
requesters.add(l);
}
-
progress.startTimer();
// run requesters
concurrentExec(requesters);
@@ -464,7 +460,6 @@ void drive() throws IOException, InterruptedException, Throwable {
public static void main(String[] args)
throws IOException, InterruptedException, Throwable {
processArgs(args);
-
LinkBenchDriver d = new LinkBenchDriver(configFile,
cmdLineProps, logFile);
try {
View
2  src/java/com/facebook/LinkBench/LinkBenchLoad.java
@@ -205,6 +205,7 @@ public void run() {
LoadChunk chunk;
try {
chunk = chunk_q.take();
+ //logger.info("chunk end="+chunk.end);
} catch (InterruptedException ie) {
logger.warn("InterruptedException not expected, try again", ie);
continue;
@@ -468,7 +469,6 @@ private void loadLink(Link link, long outlink_ix, long nlinks,
private void loadLinks(ArrayList<Link> loadBuffer) {
long timestart = System.nanoTime();
-
try {
// no inverses for now
int nlinks = loadBuffer.size();
View
8 src/java/com/facebook/LinkBench/LinkBenchRequest.java
@@ -509,9 +509,8 @@ private boolean oneRequest(boolean recordStats) {
long id2 = id2chooser.chooseForOp(rng, id1, link_type,
ID2Chooser.P_DELETE_EXIST);
starttime = System.nanoTime();
- linkStore.deleteLink(dbid, id1, link_type, id2,
- true, // no inverse
- false);
+ linkStore.deleteLink(dbid, id1, link_type, id2, true, // no inverse
+ false);
endtime = System.nanoTime();
if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
logger.trace("deleteLink id1=" + id1 + " link_type=" + link_type
@@ -678,8 +677,6 @@ private boolean oneRequest(boolean recordStats) {
linkStore.clearErrors(requesterID);
return false;
}
-
-
}
/**
@@ -840,6 +837,7 @@ public void run() {
}
}
+ this.linkStore.close();
// Do final update of statistics
progressTracker.update(requestsSinceLastUpdate);
displayStats(lastStatDisplay_ms, System.currentTimeMillis());
View
521 src/java/com/facebook/LinkBench/LinkStoreRocksDb.java
@@ -0,0 +1,521 @@
+/*
+ * Copyright 2012, Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.LinkBench;
+
+import com.facebook.rocks.swift.*;
+import com.facebook.swift.service.ThriftClientManager;
+import com.facebook.nifty.client.FramedClientConnector;
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.commons.codec.binary.Hex;
+
+import static com.google.common.net.HostAndPort.fromParts;
+
+/*
+ * This file implements Linkbench methods for loading/requesting data to rocksDb
+ * database server by calling thrift apis after creating 2 java thrift clients
+ * through swift : assocClient for the link operations and nodeClient for the
+ * node operations. assocClient interacts on port 'port' on the database =
+ * dbid + "assocs" AND nodeClient interacts on port 'port'+1 on the database=
+ * dbid + "nodes"
+ */
+
+public class LinkStoreRocksDb extends GraphStore {
+ private static final ThriftClientManager clientManager =
+ new ThriftClientManager();
+ private RocksService assocClient;
+ private RocksService nodeClient;
+ /* RocksDb database server configuration keys */
+ public static final String CONFIG_HOST = "host";
+ public static final String CONFIG_PORT = "port";
+ public static final String CONFIG_USER = "user";
+ public static final String CONFIG_PASSWORD = "password";
+
+ public static final int DEFAULT_BULKINSERT_SIZE = 1024;
+ private static final boolean INTERNAL_TESTING = false;
+
+ String host;
+ String user;
+ String pwd;
+ String port;
+
+ Level debuglevel;
+
+ int bulkInsertSize = DEFAULT_BULKINSERT_SIZE;
+
+ private final Logger logger = Logger.getLogger(ConfigUtil.LINKBENCH_LOGGER);
+
+ private void openConnection() {
+ try {
+ assocClient = clientManager.createClient(
+ new FramedClientConnector(fromParts(host, Integer.parseInt(port))),
+ RocksService.class).get();
+ nodeClient = clientManager.createClient(
+ new FramedClientConnector(fromParts(host, Integer.parseInt(port) + 1)),
+ RocksService.class).get();
+ } catch (Exception e) {
+ logger.error("Error in open!");
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (assocClient != null)
+ assocClient.close();
+ if (nodeClient != null)
+ nodeClient.close();
+ if (clientManager != null)
+ clientManager.close();
+ } catch (IOException ioex) {
+ logger.error("Error while closing client connection: " + ioex);
+ }
+ }
+
+ @Override
+ public void initialize(Properties p, Phase currentPhase, int threadId)
+ throws IOException, Exception {
+ host = ConfigUtil.getPropertyRequired(p, CONFIG_HOST);
+ port = ConfigUtil.getPropertyRequired(p, CONFIG_PORT);
+ openConnection();
+ debuglevel = ConfigUtil.getDebugLevel(p);
+ }
+
+ public LinkStoreRocksDb() {
+ super();
+ }
+
+ public LinkStoreRocksDb(Properties props) throws IOException, Exception {
+ super();
+ initialize(props, Phase.LOAD, 0);
+ }
+
+ public void clearErrors(int threadID) {
+ logger.warn("Reopening Rocksdb connection in threadID " + threadID);
+ try {
+ close();
+ openConnection();
+ } catch (Throwable e) {
+ logger.error("Error in Reopen!");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean addLink(String dbid, Link l, boolean noinverse) {
+ try {
+ return addLinkImpl(dbid, l, noinverse);
+ } catch (Exception ex) {
+ logger.error("addlink failed!");
+ return false;
+ }
+ }
+
+ private boolean addLinkImpl(String dbid, Link l, boolean noinverse)
+ throws Exception {
+
+ if (Level.DEBUG.isGreaterOrEqual(debuglevel)) {
+ logger.debug("addLink " + l.id1 +
+ "." + l.id2 +
+ "." + l.link_type);
+ }
+ AssocVisibility av = AssocVisibility.values()[l.visibility];
+ String s = "wormhole...";
+ dbid += "assocs";
+ return assocClient.TaoAssocPut(
+ dbid.getBytes(), l.link_type, l.id1, l.id2, l.time,
+ av, true, Long.valueOf(l.version), l.data, s.getBytes()) == 1;
+ }
+
+ /**
+ * Internal method: add links without updating the count
+ */
+ private boolean addLinksNoCount(String dbid, List<Link> links)
+ throws Exception {
+ if (links.size() == 0)
+ return false;
+ for (Link l:links) {
+ AssocVisibility av = AssocVisibility.values()[l.visibility];
+ String s = "wormhole...";
+ dbid += "assocs";
+ if (assocClient.TaoAssocPut(dbid.getBytes(), l.link_type, l.id1, l.id2,
+ l.time, av, false, Long.valueOf(l.version), l.data, s.getBytes()) == 1) {
+ logger.error("addLinksNoCount failed!");
+ return false;
+ }
+ }
+ return true;
+}
+
+ @Override
+ public boolean deleteLink(String dbid, long id1, long link_type, long id2,
+ boolean noinverse, boolean expunge) {
+ try {
+ return deleteLinkImpl(dbid, id1, link_type, id2, noinverse, expunge);
+ } catch (Exception ex) {
+ logger.error("deletelink failed!");
+ return false;
+ }
+ }
+
+ private boolean deleteLinkImpl(String dbid, long id1, long link_type,
+ long id2, boolean noinverse, boolean expunge) throws Exception {
+ if (Level.DEBUG.isGreaterOrEqual(debuglevel)) {
+ logger.debug("deleteLink " + id1 +
+ "." + id2 +
+ "." + link_type);
+ }
+ String s = "wormhole...";
+ dbid += "assocs";
+ return assocClient.TaoAssocDelete(dbid.getBytes() , link_type, id1, id2,
+ AssocVisibility.HARD__DELETE, true, s.getBytes()) == 1;
+ }
+
+ @Override
+ public boolean updateLink(String dbid, Link l, boolean noinverse)
+ throws Exception {
+ // Retry logic is in addLink
+ boolean added = addLink(dbid, l, noinverse);
+ return !added; // return true if updated instead of added
+ }
+
+
+ // lookup using id1, type, id2
+ @Override
+ public Link getLink(String dbid, long id1, long link_type, long id2) {
+ try {
+ return getLinkImpl(dbid, id1, link_type, id2);
+ } catch (Exception ex) {
+ logger.error("getLink failed!");
+ return null;
+ }
+ }
+
+ private Link getLinkImpl(String dbid, long id1, long link_type, long id2)
+ throws Exception {
+ Link res[] = multigetLinks(dbid, id1, link_type, new long[] {id2});
+ if (res == null)
+ return null;
+ assert(res.length <= 1);
+ return res.length == 0 ? null : res[0];
+ }
+
+
+ @Override
+ public Link[] multigetLinks(String dbid, long id1, long link_type,
+ long[] id2s) {
+ try {
+ return multigetLinksImpl(dbid, id1, link_type, id2s);
+ } catch (Exception ex) {
+ logger.error("multigetlinks failed!");
+ return null;
+ }
+ }
+
+ private Link[] multigetLinksImpl(String dbid, long id1, long link_type,
+ long[] id2s) throws Exception {
+ List<Long> l = new ArrayList<Long>();
+ for (int i = 0; i < id2s.length; i++) {
+ l.add(new Long(id2s[i]));
+ }
+ dbid += "assocs";
+ List<TaoAssocGetResult> tr = assocClient.TaoAssocGet(dbid.getBytes(),
+ link_type, id1, l);
+ Link results[] = new Link[tr.size()];
+ int i = 0;
+ for (TaoAssocGetResult tar : tr) {
+ results[i] = new Link(id1, link_type, tar.getId2(),
+ LinkStore.VISIBILITY_DEFAULT, tar.getData(),
+ (int)(tar.getDataVersion()), tar.getTime());
+ }
+ return results;
+ }
+
+ // lookup using just id1, type
+ @Override
+ public Link[] getLinkList(String dbid, long id1, long link_type)
+ throws Exception {
+ return getLinkListImpl(
+ dbid, id1, link_type, 0, Long.MAX_VALUE, 0, rangeLimit);
+ }
+
+ @Override
+ public Link[] getLinkList(String dbid, long id1, long link_type,
+ long minTimestamp, long maxTimestamp, int offset, int limit) {
+ try {
+ return getLinkListImpl(dbid, id1, link_type, minTimestamp,
+ maxTimestamp, offset, limit);
+ } catch (Exception ex) {
+ logger.error("getLinkList failed!");
+ return null;
+ }
+ }
+
+ private Link[] getLinkListImpl(String dbid, long id1, long link_type,
+ long minTimestamp, long maxTimestamp, int offset, int limit)
+ throws Exception {
+ dbid += "assocs";
+ List<TaoAssocGetResult> tr = assocClient.TaoAssocRangeGet(
+ dbid.getBytes(), link_type, id1, maxTimestamp, minTimestamp,
+ Long.valueOf(offset), Long.valueOf(limit));
+ Link results[] = new Link[tr.size()];
+ int i = 0;
+ for (TaoAssocGetResult tar : tr) {
+ results[i] = new Link(id1, link_type, tar.getId2(),
+ LinkStore.VISIBILITY_DEFAULT, tar.getData(),
+ (int)(tar.getDataVersion()), tar.getTime());
+ }
+ return results;
+ }
+
+ // count the #links
+ @Override
+ public long countLinks(String dbid, long id1, long link_type) {
+ try {
+ return countLinksImpl(dbid, id1, link_type);
+ } catch (Exception ex) {
+ logger.error("countLinks failed!");
+ return -1;
+ }
+ }
+
+ private long countLinksImpl(String dbid, long id1, long link_type)
+ throws Exception {
+ long count = 0;
+ dbid += "assocs";
+ count = assocClient.TaoAssocCount(dbid.getBytes(), link_type, id1);
+ boolean found = false;
+ if (count > 0) {
+ found = true;
+ }
+ if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
+ logger.trace("Count result: " + id1 + "," + link_type +
+ " is " + found + " and " + count);
+ }
+ return count;
+ }
+
+ @Override
+ public int bulkLoadBatchSize() {
+ return bulkInsertSize;
+ }
+
+ @Override
+ public void addBulkLinks(String dbid, List<Link> links, boolean noinverse) {
+ try {
+ addBulkLinksImpl(dbid, links, noinverse);
+ } catch (Exception ex) {
+ logger.error("addBulkLinks failed!");
+ }
+ }
+
+ private void addBulkLinksImpl(String dbid, List<Link> links,
+ boolean noinverse) throws Exception {
+ if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
+ logger.trace("addBulkLinks: " + links.size() + " links");
+ }
+ addLinksNoCount(dbid, links);
+ }
+
+ @Override
+ public void addBulkCounts(String dbid, List<LinkCount> counts) {
+ try {
+ addBulkCountsImpl(dbid, counts);
+ } catch (Exception ex) {
+ logger.error("addbulkCounts failed!");
+ }
+ }
+
+ private void addBulkCountsImpl(String dbid, List<LinkCount> counts)
+ throws Exception {
+ if (Level.TRACE.isGreaterOrEqual(debuglevel)) {
+ logger.trace("addBulkCounts: " + counts.size() + " link counts");
+ }
+ if (counts.size() == 0)
+ return;
+
+ WriteOptions wopts = new WriteOptions();
+ wopts.setSync(false);
+ List<Kv> batchCounts = new ArrayList<Kv>();
+ for (LinkCount count: counts) {
+ byte[] id1 = ByteBuffer.allocate(8).putLong(
+ count.id1).array();
+ byte[] linkType = ByteBuffer.allocate(8).putLong(
+ count.link_type).array();
+ byte[] ckey = new byte[id1.length + linkType.length + 1];
+ System.arraycopy(id1, 0, ckey, 0, id1.length);
+ System.arraycopy(linkType, 0, ckey, id1.length, linkType.length);
+ char c = 'c';
+ ckey[ckey.length - 1] = (byte)c;
+
+ byte[] countValue = ByteBuffer.allocate(8).putLong(
+ count.count).array();
+
+ Kv keyvalue = new Kv();
+ keyvalue.setKey(ckey);
+ keyvalue.setValue(countValue);
+ batchCounts.add(keyvalue);
+ }
+ dbid += "assocs";
+ assocClient.Write(dbid.getBytes(), batchCounts, wopts);
+ }
+
+ @Override
+ public void resetNodeStore(String dbid, long startID) throws Exception {
+ //doesn't have a defined utility for Rocksdb
+ }
+
+ @Override
+ public long addNode(String dbid, Node node) {
+ try {
+ return addNodeImpl(dbid, node);
+ } catch (Exception ex) {
+ logger.error("addNode failed!");
+ return -1;
+ }
+ }
+
+ private long addNodeImpl(String dbid, Node node) throws Exception {
+ long ids[] = bulkAddNodes(dbid, Collections.singletonList(node));
+ assert(ids.length == 1);
+ return ids[0];
+ }
+
+ @Override
+ public long[] bulkAddNodes(String dbid, List<Node> nodes) throws Exception {
+ try {
+ return bulkAddNodesImpl(dbid, nodes);
+ } catch (Exception ex) {
+ logger.error("bulkAddNodes failed!");
+ return null;
+ }
+ }
+
+ private long[] bulkAddNodesImpl(String dbid, List<Node> nodes)
+ throws Exception {
+ long newIds[] = new long[nodes.size()];
+ WriteOptions wopts = new WriteOptions();
+ wopts.setSync(false);
+ int i = 0;
+ for (Node n : nodes) {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream( );
+ outputStream.write(ByteBuffer.allocate(8).putInt(n.type).array());
+ outputStream.write(ByteBuffer.allocate(8).putLong(n.version).array());
+ outputStream.write(ByteBuffer.allocate(8).putInt(n.time).array());
+ outputStream.write(n.data);
+ byte[] idAsByte = ByteBuffer.allocate(8).putLong(n.id).array();
+ dbid += "nodes";
+ RetCode code = nodeClient.Put(
+ dbid.getBytes(), idAsByte, outputStream.toByteArray(), wopts);
+ if (code.getState() != Code.K_OK) {
+ throw new Exception();
+ }
+ newIds[i++] = n.id;
+ }
+ return newIds;
+ }
+
+ @Override
+ public Node getNode(String dbid, int type, long id) {
+ try {
+ return getNodeImpl(dbid, type, id);
+ } catch (Exception ex) {
+ logger.error("getnode failed!");
+ return null;
+ }
+ }
+
+ private Node getNodeImpl(String dbid, int type, long id) throws Exception {
+ ReadOptions ropts = new ReadOptions();
+ dbid += "nodes";
+ RocksGetResponse rgr =
+ nodeClient.Get(
+ dbid.getBytes(), ByteBuffer.allocate(8).putLong(id).array(), ropts);
+ if (rgr.getRetCode().getState() == (Code.K_NOT_FOUND)) {
+ return null; //Node was not found
+ } else if (rgr.getRetCode().getState() == (Code.K_OK)) {
+ byte[] rgrValue = rgr.getValue();
+ if (rgrValue.length < 24) {
+ logger.error("Fetched node does not have proper value");
+ return null;
+ }
+ int ntype = ByteBuffer.wrap(rgrValue, 0, 8).getInt();
+ long nversion = ByteBuffer.wrap(rgrValue, 8, 8).getInt();
+ int ntime = ByteBuffer.wrap(rgrValue, 16, 8).getInt();
+ byte[] ndata =
+ ByteBuffer.wrap(rgrValue, 24, rgrValue.length - 24).array();
+ return new Node(id, ntype, nversion, ntime, ndata);
+ } else {
+ logger.error("IO Error");
+ return null;
+ }
+ }
+
+ @Override
+ public boolean updateNode(String dbid, Node node) throws Exception {
+ try {
+ return updateNodeImpl(dbid, node);
+ } catch (Exception ex) {
+ logger.error("updateNode failed!");
+ return false;
+ }
+ }
+
+ private boolean updateNodeImpl(String dbid, Node node) throws Exception {
+ return addNode(dbid, node) == 1;
+ }
+
+ @Override
+ public boolean deleteNode(String dbid, int type, long id) throws Exception {
+ try {
+ return deleteNodeImpl(dbid, type, id);
+ } catch (Exception ex) {
+ logger.error("deleteNode failed!");
+ return false;
+ }
+ }
+
+ private boolean deleteNodeImpl(String dbid, int type, long id)
+ throws Exception {
+ WriteOptions wopts = new WriteOptions();
+ wopts.setSync(false);
+ dbid += "nodes";
+ return nodeClient.Delete(
+ dbid.getBytes(), ByteBuffer.allocate(8).putLong(id).array(),
+ wopts).getState() == (Code.K_OK);
+ }
+}
View
21 src/java/com/facebook/rocks/swift/AssocVisibility.java
@@ -0,0 +1,21 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+
+public enum AssocVisibility
+{
+VISIBLE(0), DELETED(1), UNUSED1(2), HIDDEN(3), HARD__DELETE(4);
+
+ private final int value;
+
+AssocVisibility(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
+}
View
21 src/java/com/facebook/rocks/swift/Code.java
@@ -0,0 +1,21 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+
+public enum Code
+{
+K_OK(0), K_END(1), K_NOT_FOUND(2), K_CORRUPTION(3), K_NOT_SUPPORTED(4), K_INVALID_ARGUMENT(5), K_IOERROR(6), K_SNAPSHOT_NOT_EXISTS(7), K_WRONG_SHARD(8), K_UNKNOWN_ERROR(9), K_CLIENT_ERROR(10);
+
+ private final int value;
+
+Code(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
+}
View
21 src/java/com/facebook/rocks/swift/CompressionType.java
@@ -0,0 +1,21 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+
+public enum CompressionType
+{
+K_NO_COMPRESSION(0), K_SNAPPY_COMPRESSION(1), K_ZLIB(2), K_BZIP2(3);
+
+ private final int value;
+
+CompressionType(int value)
+ {
+ this.value = value;
+ }
+
+ @ThriftEnumValue
+ public int getValue()
+ {
+ return value;
+ }
+}
View
18 src/java/com/facebook/rocks/swift/IOError.java
@@ -0,0 +1,18 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("IOError")
+public class IOError extends Exception
+{
+ private static final long serialVersionUID = 1L;
+
+private String message;
+
+@ThriftField(value=1, name="message")
+public String getMessage() { return message; }
+
+@ThriftField(value=1, name="message")
+public void setMessage(final String message) { this.message = message; }
+}
View
24 src/java/com/facebook/rocks/swift/Kv.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("Kv")
+public class Kv
+{
+private byte[] key;
+
+@ThriftField(value=1, name="key")
+public byte[] getKey() { return key; }
+
+@ThriftField(value=1, name="key")
+public void setKey(final byte[] key) { this.key = key; }
+
+private byte[] value;
+
+@ThriftField(value=2, name="value")
+public byte[] getValue() { return value; }
+
+@ThriftField(value=2, name="value")
+public void setValue(final byte[] value) { this.value = value; }
+}
View
32 src/java/com/facebook/rocks/swift/ReadOptions.java
@@ -0,0 +1,32 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("ReadOptions")
+public class ReadOptions
+{
+private boolean verifyChecksums;
+
+@ThriftField(value=1, name="verify_checksums")
+public boolean isVerifyChecksums() { return verifyChecksums; }
+
+@ThriftField(value=1, name="verify_checksums")
+public void setVerifyChecksums(final boolean verifyChecksums) { this.verifyChecksums = verifyChecksums; }
+
+private boolean fillCache;
+
+@ThriftField(value=2, name="fill_cache")
+public boolean isFillCache() { return fillCache; }
+
+@ThriftField(value=2, name="fill_cache")
+public void setFillCache(final boolean fillCache) { this.fillCache = fillCache; }
+
+private Snapshot snapshot;
+
+@ThriftField(value=3, name="snapshot")
+public Snapshot getSnapshot() { return snapshot; }
+
+@ThriftField(value=3, name="snapshot")
+public void setSnapshot(final Snapshot snapshot) { this.snapshot = snapshot; }
+}
View
24 src/java/com/facebook/rocks/swift/ResultSnapshot.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("ResultSnapshot")
+public class ResultSnapshot
+{
+private RetCode status;
+
+@ThriftField(value=1, name="status")
+public RetCode getStatus() { return status; }
+
+@ThriftField(value=1, name="status")
+public void setStatus(final RetCode status) { this.status = status; }
+
+private Snapshot snapshot;
+
+@ThriftField(value=2, name="snapshot")
+public Snapshot getSnapshot() { return snapshot; }
+
+@ThriftField(value=2, name="snapshot")
+public void setSnapshot(final Snapshot snapshot) { this.snapshot = snapshot; }
+}
View
24 src/java/com/facebook/rocks/swift/RetCode.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("RetCode")
+public class RetCode
+{
+private Code state;
+
+@ThriftField(value=1, name="state")
+public Code getState() { return state; }
+
+@ThriftField(value=1, name="state")
+public void setState(final Code state) { this.state = state; }
+
+private byte[] msg;
+
+@ThriftField(value=2, name="msg")
+public byte[] getMsg() { return msg; }
+
+@ThriftField(value=2, name="msg")
+public void setMsg(final byte[] msg) { this.msg = msg; }
+}
View
26 src/java/com/facebook/rocks/swift/RocksException.java
@@ -0,0 +1,26 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("RocksException")
+public class RocksException extends Exception
+{
+ private static final long serialVersionUID = 1L;
+
+private byte[] msg;
+
+@ThriftField(value=1, name="msg")
+public byte[] getMsg() { return msg; }
+
+@ThriftField(value=1, name="msg")
+public void setMsg(final byte[] msg) { this.msg = msg; }
+
+private Code errorCode;
+
+@ThriftField(value=2, name="errorCode")
+public Code getErrorCode() { return errorCode; }
+
+@ThriftField(value=2, name="errorCode")
+public void setErrorCode(final Code errorCode) { this.errorCode = errorCode; }
+}
View
24 src/java/com/facebook/rocks/swift/RocksGetResponse.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("RocksGetResponse")
+public class RocksGetResponse
+{
+private RetCode retCode;
+
+@ThriftField(value=1, name="retCode")
+public RetCode getRetCode() { return retCode; }
+
+@ThriftField(value=1, name="retCode")
+public void setRetCode(final RetCode retCode) { this.retCode = retCode; }
+
+private byte[] value;
+
+@ThriftField(value=2, name="value")
+public byte[] getValue() { return value; }
+
+@ThriftField(value=2, name="value")
+public void setValue(final byte[] value) { this.value = value; }
+}
View
24 src/java/com/facebook/rocks/swift/RocksIterateResponse.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("RocksIterateResponse")
+public class RocksIterateResponse
+{
+private RetCode status;
+
+@ThriftField(value=1, name="status")
+public RetCode getStatus() { return status; }
+
+@ThriftField(value=1, name="status")
+public void setStatus(final RetCode status) { this.status = status; }
+
+private List<Kv> data;
+
+@ThriftField(value=2, name="data")
+public List<Kv> getData() { return data; }
+
+@ThriftField(value=2, name="data")
+public void setData(final List<Kv> data) { this.data = data; }
+}
View
112 src/java/com/facebook/rocks/swift/RocksService.java
@@ -0,0 +1,112 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.service.ThriftException;
+import com.facebook.swift.service.ThriftMethod;
+import com.facebook.swift.service.ThriftService;
+
+import java.io.Closeable;
+
+import java.util.List;
+
+@ThriftService("RocksService")
+public interface RocksService extends Closeable {
+ @ThriftMethod(value = "Put")
+ RetCode Put(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "key") byte[] key,
+ @ThriftField(value = 3, name = "value") byte[] value,
+ @ThriftField(value = 4, name = "options") WriteOptions options);
+
+ @ThriftMethod(value = "Delete")
+ RetCode Delete(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "key") byte[] key,
+ @ThriftField(value = 3, name = "options") WriteOptions options);
+
+ @ThriftMethod(value = "Write")
+ RetCode Write(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "batch") List<Kv> batch,
+ @ThriftField(value = 3, name = "options") WriteOptions options);
+
+ @ThriftMethod(value = "Get")
+ RocksGetResponse Get(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "inputkey") byte[] inputkey,
+ @ThriftField(value = 3, name = "options") ReadOptions options);
+
+ @ThriftMethod(value = "Iterate")
+ RocksIterateResponse Iterate(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "startKey") byte[] startKey,
+ @ThriftField(value = 3, name = "endKey") byte[] endKey,
+ @ThriftField(value = 4, name = "options") ReadOptions options,
+ @ThriftField(value = 5, name = "max") int max);
+
+ @ThriftMethod(value = "CreateSnapshot")
+ ResultSnapshot CreateSnapshot(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "startKey") byte[] startKey);
+
+ @ThriftMethod(value = "ReleaseSnapshot")
+ RetCode ReleaseSnapshot(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "snapshot") Snapshot snapshot);
+
+ @ThriftMethod(value = "CompactRange")
+ RetCode CompactRange(@ThriftField(value = 1, name = "dbname") byte[] dbname,
+ @ThriftField(value = 2, name = "start") byte[] start,
+ @ThriftField(value = 3, name = "endhere") byte[] endhere);
+
+ @ThriftMethod(value = "Empty")
+ boolean Empty();
+
+ @ThriftMethod(value = "Noop")
+ void Noop();
+
+ @ThriftMethod(value = "TaoAssocPut",
+ exception = {@ThriftException(type = IOError.class, id = 1)})
+ long TaoAssocPut(@ThriftField(value = 1, name = "tableName") byte[] tableName,
+ @ThriftField(value = 2, name = "assocType") long assocType,
+ @ThriftField(value = 3, name = "id1") long id1,
+ @ThriftField(value = 4, name = "id2") long id2,
+ @ThriftField(value = 5, name = "timestamp") long timestamp,
+ @ThriftField(value = 6, name = "visibility") AssocVisibility visibility,
+ @ThriftField(value = 7, name = "update_count") boolean updateCount,
+ @ThriftField(value = 8, name = "dataVersion") long dataVersion,
+ @ThriftField(value = 9, name = "data") byte[] data,
+ @ThriftField(value = 10, name = "wormhole_comment") byte[] wormholeComment)
+ throws IOError;
+
+ @ThriftMethod(value = "TaoAssocDelete",
+ exception = {@ThriftException(type = IOError.class, id = 1)})
+ long TaoAssocDelete(@ThriftField(value = 1, name = "tableName") byte[] tableName,
+ @ThriftField(value = 2, name = "assocType") long assocType,
+ @ThriftField(value = 3, name = "id1") long id1,
+ @ThriftField(value = 4, name = "id2") long id2,
+ @ThriftField(value = 5, name = "visibility") AssocVisibility visibility,
+ @ThriftField(value = 6, name = "update_count") boolean updateCount,
+ @ThriftField(value = 7, name = "wormhole_comment") byte[] wormholeComment)
+ throws IOError;
+
+ @ThriftMethod(value = "TaoAssocRangeGet",
+ exception = {@ThriftException(type = IOError.class, id = 1)})
+ List<TaoAssocGetResult> TaoAssocRangeGet(@ThriftField(value = 1,
+ name = "tableName") byte[] tableName,
+ @ThriftField(value = 2, name = "assocType") long assocType,
+ @ThriftField(value = 3, name = "id1") long id1,
+ @ThriftField(value = 4, name = "start_time") long startTime,
+ @ThriftField(value = 5, name = "end_time") long endTime,
+ @ThriftField(value = 6, name = "offset") long offset,
+ @ThriftField(value = 7, name = "limit") long limit)
+ throws IOError;
+
+ @ThriftMethod(value = "TaoAssocGet",
+ exception = {@ThriftException(type = IOError.class, id = 1)})
+ List<TaoAssocGetResult> TaoAssocGet(@ThriftField(value = 1, name = "tableName") byte[] tableName,
+ @ThriftField(value = 2, name = "assocType") long assocType,
+ @ThriftField(value = 3, name = "id1") long id1,
+ @ThriftField(value = 4, name = "id2s") List<Long> id2s)
+ throws IOError;
+
+ @ThriftMethod(value = "TaoAssocCount",
+ exception = {@ThriftException(type = IOError.class, id = 1)})
+ long TaoAssocCount(@ThriftField(value = 1, name = "tableName") byte[] tableName,
+ @ThriftField(value = 2, name = "assocType") long assocType,
+ @ThriftField(value = 3, name = "id1") long id1)
+ throws IOError;
+}
View
16 src/java/com/facebook/rocks/swift/Snapshot.java
@@ -0,0 +1,16 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("Snapshot")
+public class Snapshot
+{
+private long snapshotid;
+
+@ThriftField(value=1, name="snapshotid")
+public long getSnapshotid() { return snapshotid; }
+
+@ThriftField(value=1, name="snapshotid")
+public void setSnapshotid(final long snapshotid) { this.snapshotid = snapshotid; }
+}
View
46 src/java/com/facebook/rocks/swift/TaoAssocGetResult.java
@@ -0,0 +1,46 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("TaoAssocGetResult")
+public class TaoAssocGetResult
+{
+private long id1, id2;
+
+@ThriftField(value=1, name="id1")
+public long getId1() { return id1; }
+
+@ThriftField(value=1, name="id1")
+public void setId1(final long id1) { this.id1 = id1; }
+
+@ThriftField(value=1, name="id2")
+public long getId2() { return id2; }
+
+@ThriftField(value=1, name="id2")
+public void setId2(final long id2) { this.id2 = id2; }
+
+private long time;
+
+@ThriftField(value=4, name="time")
+public long getTime() { return time; }
+
+@ThriftField(value=4, name="time")
+public void setTime(final long time) { this.time = time; }
+
+private long dataVersion;
+
+@ThriftField(value=5, name="dataVersion")
+public long getDataVersion() { return dataVersion; }
+
+@ThriftField(value=5, name="dataVersion")
+public void setDataVersion(final long dataVersion) { this.dataVersion = dataVersion; }
+
+private byte[] data;
+
+@ThriftField(value=6, name="data")
+public byte[] getData() { return data; }
+
+@ThriftField(value=6, name="data")
+public void setData(final byte[] data) { this.data = data; }
+}
View
24 src/java/com/facebook/rocks/swift/WriteOptions.java
@@ -0,0 +1,24 @@
+package com.facebook.rocks.swift;
+
+import com.facebook.swift.codec.*;
+import java.util.*;
+
+@ThriftStruct("WriteOptions")
+public class WriteOptions
+{
+private boolean sync;
+
+@ThriftField(value=1, name="sync")
+public boolean isSync() { return sync; }
+
+@ThriftField(value=1, name="sync")
+public void setSync(final boolean sync) { this.sync = sync; }
+
+private boolean disableWAL;
+
+@ThriftField(value=2, name="disableWAL")
+public boolean isDisableWAL() { return disableWAL; }
+
+@ThriftField(value=2, name="disableWAL")
+public void setDisableWAL(final boolean disableWAL) { this.disableWAL = disableWAL; }
+}
View
311 src/java/com/facebook/rocks/swift/rocks.thrift
@@ -0,0 +1,311 @@
+// Copyright 2012 Facebook
+//The thirft file that was used to auto-generate the code for RocksDb
+
+namespace cpp facebook.rocks
+namespace java facebook.rocks
+namespace php rocks
+namespace py rocks
+
+// Types
+typedef binary Text
+typedef binary Bytes
+
+typedef binary Slice
+
+enum Code {
+ kOk = 0,
+ kEnd = 1,
+ kNotFound = 2,
+ kCorruption = 3,
+ kNotSupported = 4,
+ kInvalidArgument = 5,
+ kIOError = 6,
+ kSnapshotNotExists = 7,
+ kWrongShard = 8,
+ kUnknownError = 9,
+ kClientError = 10,
+}
+
+struct RetCode {
+ 1: Code state,
+ 2: Slice msg,
+}
+
+exception RocksException {
+ 1:Text msg,
+ 2:Code errorCode
+}
+
+//
+// An IOError exception from an assoc operation
+//
+exception IOError {
+ 1:string message
+}
+
+// Different compression types supported
+enum CompressionType {
+ kNoCompression = 0x0,
+ kSnappyCompression = 0x1,
+ kZlib = 0x2,
+ kBZip2 = 0x3
+}
+
+/**
+ * Holds the assoc get result of a id2
+ */
+struct TaoAssocGetResult {
+ /** id2 of assoc */
+ 1:i64 id2,
+
+ /** time stamp of the assoc */
+ 4:i64 time,
+
+ /** version of the data blob */
+ 5:i64 dataVersion,
+
+ /** serialized data of the asoc */
+ 6:Text data,
+}
+
+
+struct RocksGetResponse {
+ 1: RetCode retCode,
+ 2: Slice value
+}
+
+struct kv {
+ 1:Slice key,
+ 2:Slice value
+}
+
+struct RocksIterateResponse {
+ 1: RetCode status,
+ 2: list<kv> data,
+}
+
+// Options for writing
+struct WriteOptions {
+ 1:bool sync,
+ 2:bool disableWAL,
+}
+
+struct Snapshot {
+ 1:i64 snapshotid // server generated
+}
+
+// Snapshot result
+struct ResultSnapshot {
+ 1:RetCode status,
+ 2:Snapshot snapshot
+}
+
+// Options for reading. If you do not have a
+// snapshot, set snapshot.snapshotid = 0
+struct ReadOptions {
+ 1:bool verify_checksums,
+ 2:bool fill_cache,
+ 3:Snapshot snapshot
+}
+
+//
+// Visibility state for assoc
+//
+enum AssocVisibility
+{
+ VISIBLE = 0, // live object, include in lookups and count
+ DELETED = 1, // exclude from lookup queries and count, ok to
+ // delete permanently from persistent store
+ UNUSED1 = 2, // not used
+ HIDDEN = 3, // exclude from lookup queries and count
+ UNUSED2 = 4, // not used
+ HARD_DELETE = 5 // deleted by calling expunge, will be swept
+ // as soon as possible
+}
+
+service RocksService {
+ // puts a key in the database
+ RetCode Put(1:Text dbname,
+ 2:Slice key,
+ 3:Slice value,
+ 4:WriteOptions options),
+
+ // deletes a key from the database
+ RetCode Delete(1:Text dbname,
+ 2:Slice key,
+ 3:WriteOptions options),
+
+ // writes batch of keys into the database
+ RetCode Write(1:Text dbname,
+ 2:list<kv> batch,
+ 3:WriteOptions options),
+
+ // fetch a key from the DB.
+ // RocksResponse.status == kNotFound means key does non exist
+ // RocksResponse.status == kOk means key is found
+ RocksGetResponse Get(1:Text dbname,
+ 2:Slice inputkey,
+ 3:ReadOptions options),
+
+ // fetch a range of KVs.
+ // startKey gives the start key.
+ // endKey gives the end key.
+ // RocksIterateResponse.status == kOK means more data.
+ // RocksIterateResponse.status == kEnd means no data.
+ // All other return status means errors.
+ RocksIterateResponse Iterate(1:Text dbname,
+ 2:Slice startKey,
+ 3:Slice endKey,
+ 4:ReadOptions options,
+ 5:i32 max),
+
+ // Create snapshot.
+ ResultSnapshot CreateSnapshot(1:Text dbname,
+ 2:Slice startKey),
+
+ // Release snapshots
+ RetCode ReleaseSnapshot(1:Text dbname,
+ 2:Snapshot snapshot),
+
+ // compact a range of keys
+ // begin.size == 0 to start at a range earlier than the first existing key
+ // end.size == 0 to end at a range later than the last existing key
+ RetCode CompactRange(1:Text dbname,
+ 2:Slice start,
+ 3:Slice endhere),
+
+ bool Empty(),
+
+ void Noop(),
+
+ /**
+ * TAO Assoc Put operation.
+ * Note that currently the argument visibility has no effect.
+ *
+ * @if update_count is true, then return the updated count for this assoc
+ * @if update_count is false, then return 0
+ * @return negative number if failure
+ */
+ i64 TaoAssocPut(
+ /** name of table */
+ 1:Text tableName,
+
+ /** type assoc */
+ 2:i64 assocType,
+
+ /** id1 of assoc */
+ 3:i64 id1,
+
+ /** id2 of assoc */
+ 4:i64 id2,
+
+ /** timestamp of assoc */
+ 5:i64 timestamp,
+
+ /** visibility */
+ 6:AssocVisibility visibility,
+
+ /** whether to keep the count or not */
+ 7:bool update_count,
+
+ /** version of the data blob */
+ 8:i64 dataVersion,
+
+ /** serialized data of assoc */
+ 9:Text data,
+
+ /** wormhole comment */
+ 10:Text wormhole_comment
+ ) throws (1:IOError io),
+
+ /**
+ * TAO Assoc Delete operation.
+ *
+ * @return the updated count for this assoc
+ */
+ i64 TaoAssocDelete(
+ /** name of table */
+ 1:Text tableName,
+
+ /** type assoc */
+ 2:i64 assocType,
+
+ /** id1 of assoc */
+ 3:i64 id1,
+
+ /** id2 of assoc */
+ 4:i64 id2,
+
+ /** visibility flag for this delete */
+ 5:AssocVisibility visibility,
+
+ /** whether to keep the count or not */
+ 6:bool update_count,
+
+ /** wormhole comment */
+ 7:Text wormhole_comment
+ ) throws (1:IOError io),
+
+ /**
+ * TAO Assoc RangeGet operation.
+ * Obtain assocs in bewteen start_time and end_time in reverse time order.
+ * The range check is inclusive: start_time >= time && time >= end_time.
+ * And yes, start_time >= end_time because this range scan is a backward
+ * scan in time, starting with most recent time and scanning backwards
+ * for the most recent n assocs.
+ */
+ list<TaoAssocGetResult> TaoAssocRangeGet(
+ /** name of table */
+ 1:Text tableName,
+
+ /** type of assoc */
+ 2:i64 assocType,
+
+ /** id1 of assoc */
+ 3:i64 id1,
+
+ /** maximum timestamp of assocs to retrieve */
+ 4:i64 start_time,
+
+ /** minimum timestamp of assocs to retrieve */
+ 5:i64 end_time,
+
+ /** number of assocs to skip from start */
+ 6:i64 offset,
+
+ /** max number of assocs (columns) returned */
+ 7:i64 limit
+ ) throws (1:IOError io),
+
+ /**
+ * TAO Assoc Get operation.
+ */
+ list<TaoAssocGetResult> TaoAssocGet(
+ /** name of table */
+ 1:Text tableName,
+
+ /** type of assoc */
+ 2:i64 assocType,
+
+ /** id1 of assoc */
+ 3:i64 id1,
+
+ /** list of id2 need to be fetch */
+ 4:list<i64> id2s
+ ) throws (1:IOError io),
+
+ /**
+ * TAO Assoc Count Get operation.
+ * Returns the number of assocs for given id1 and assoc type
+ */
+ i64 TaoAssocCount(
+ /** name of table */
+ 1:Text tableName,
+
+ /** type of assoc */
+ 2:i64 assocType,
+
+ /** id1 of assoc */
+ 3:i64 id1,
+ ) throws (1:IOError io),
+}
Please sign in to comment.
Something went wrong with that request. Please try again.