Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Update to new build scripts.

Describe ring can filter by dc and rack.
Filter out empty rows from AllRowsQuery
Added separate recipe to AllRowsReader
Added checkpoint manager to resume AllRowsQuery/Reader
  • Loading branch information...
commit a255e636942469091027401ea62bd7be2c36597e 1 parent 5c05d11
@elandau elandau authored
Showing with 1,767 additions and 239 deletions.
  1. +42 −28 build.gradle
  2. +1 −0  gradle.properties
  3. +12 −0 gradle/buildscript.gradle
  4. +23 −19 gradle/check.gradle
  5. +29 −18 gradle/convention.gradle
  6. +8 −4 gradle/license.gradle
  7. +27 −40 gradle/maven.gradle
  8. +1 −1  gradle/netflix-oss.gradle
  9. +64 −3 gradle/release.gradle
  10. +2 −2 gradle/wrapper/gradle-wrapper.properties
  11. +2 −2 gradlew
  12. +0 −1  netflix-oss.gradle
  13. +14 −3 pom.xml
  14. +0 −1  settings.gradle
  15. +8 −0 src/main/java/com/netflix/astyanax/CassandraOperationCategory.java
  16. +42 −1 src/main/java/com/netflix/astyanax/CassandraOperationType.java
  17. +6 −0 src/main/java/com/netflix/astyanax/ColumnListMutation.java
  18. +38 −1 src/main/java/com/netflix/astyanax/Keyspace.java
  19. +25 −7 src/main/java/com/netflix/astyanax/connectionpool/Host.java
  20. +10 −10 src/main/java/com/netflix/astyanax/connectionpool/impl/AbstractHostPartitionConnectionPool.java
  21. +1 −1  src/main/java/com/netflix/astyanax/connectionpool/impl/ConnectionPoolConfigurationImpl.java
  22. +48 −21 src/main/java/com/netflix/astyanax/connectionpool/impl/HostConnectionPoolPartition.java
  23. +11 −2 src/main/java/com/netflix/astyanax/connectionpool/impl/RoundRobinExecuteWithFailover.java
  24. +56 −23 src/main/java/com/netflix/astyanax/connectionpool/impl/TokenPartitionedTopology.java
  25. +31 −0 src/main/java/com/netflix/astyanax/ddl/ColumnDefinition.java
  26. +80 −0 src/main/java/com/netflix/astyanax/impl/AstyanaxCheckpointManager.java
  27. +4 −2 src/main/java/com/netflix/astyanax/impl/AstyanaxConfigurationImpl.java
  28. +24 −5 src/main/java/com/netflix/astyanax/impl/RingDescribeHostSupplier.java
  29. +73 −0 src/main/java/com/netflix/astyanax/partitioner/BOP20Partitioner.java
  30. +74 −0 src/main/java/com/netflix/astyanax/partitioner/BigInteger127Partitioner.java
  31. +57 −0 src/main/java/com/netflix/astyanax/partitioner/Partitioner.java
  32. +32 −0 src/main/java/com/netflix/astyanax/query/AllRowsQuery.java
  33. +44 −0 src/main/java/com/netflix/astyanax/query/CheckpointManager.java
  34. +2 −1  src/main/java/com/netflix/astyanax/query/ColumnFamilyQuery.java
  35. +515 −0 src/main/java/com/netflix/astyanax/recipes/reader/AllRowsReader.java
  36. +3 −2 src/main/java/com/netflix/astyanax/retry/BoundedExponentialBackoff.java
  37. +32 −0 src/main/java/com/netflix/astyanax/shallows/EmptyCheckpointManager.java
  38. +22 −0 src/main/java/com/netflix/astyanax/test/TestKeyspace.java
  39. +49 −1 src/main/java/com/netflix/astyanax/thrift/AbstractThriftAllRowsQueryImpl.java
  40. +39 −7 src/main/java/com/netflix/astyanax/thrift/ThriftAllRowsImpl.java
  41. +63 −15 src/main/java/com/netflix/astyanax/thrift/ThriftColumnFamilyQueryImpl.java
  42. +1 −1  src/main/java/com/netflix/astyanax/thrift/ThriftConverter.java
  43. +52 −12 src/main/java/com/netflix/astyanax/thrift/ThriftKeyspaceImpl.java
  44. +12 −1 src/main/java/com/netflix/astyanax/thrift/ThriftSyncConnectionFactoryImpl.java
  45. +33 −1 src/main/java/com/netflix/astyanax/thrift/ddl/ThriftColumnDefinitionImpl.java
  46. +21 −0 src/main/java/com/netflix/astyanax/util/BarrierCallableDecorator.java
  47. +16 −0 src/main/java/com/netflix/astyanax/util/Callables.java
  48. +14 −1 src/main/java/com/netflix/astyanax/util/TokenGenerator.java
  49. +2 −1  src/test/java/com/netflix/astyanax/connectionpool/impl/SimpleHostConnectionPoolTest.java
  50. +2 −1  src/test/java/com/netflix/astyanax/impl/RingDescribeHostSupplierTest.java
View
70 build.gradle
@@ -1,47 +1,61 @@
// Establish version and status
-ext.releaseVersion = '1.0.5'
-ext.githubProjectName = name
-group = 'com.netflix.astyanax'
+ext.githubProjectName = rootProject.name // Change if github project name is not the same as the root project's name
buildscript {
repositories { mavenCentral() }
+ apply from: file('gradle/buildscript.gradle'), to: buildscript
}
allprojects {
repositories { mavenCentral() }
}
-//apply from: file('gradle/release.gradle') // Not fully tested
apply from: file('gradle/convention.gradle')
apply from: file('gradle/maven.gradle')
apply from: file('gradle/check.gradle')
-//apply from: file('gradle/license.gradle') // Waiting for re-release
-
-// Closure to configure all the POM with extra info, common to all projects
-pom {
- project {
- url "https://github.com/Netflix/${githubProjectName}"
- scm {
- connection "scm:git:git@github.com:Netflix/${githubProjectName}.git"
- url "scm:git:git@github.com:Netflix/${githubProjectName}.git"
- developerConnection "scm:git:git@github.com:Netflix/${githubProjectName}.git"
- }
- issueManagement {
- system 'github'
- url "https://github.com/Netflix/${githubProjectName}/issues"
+apply from: file('gradle/license.gradle')
+apply from: file('gradle/release.gradle')
+
+subprojects {
+ // Closure to configure all the POM with extra info, common to all projects
+ pom {
+ project {
+ url "https://github.com/Netflix/${rootProject.githubProjectName}"
+ scm {
+ connection "scm:git:git@github.com:Netflix/${rootProject.githubProjectName}.git"
+ url "scm:git:git@github.com:Netflix/${rootProject.githubProjectName}.git"
+ developerConnection "scm:git:git@github.com:Netflix/${rootProject.githubProjectName}.git"
+ }
+ issueManagement {
+ system 'github'
+ url "https://github.com/Netflix/${rootProject.githubProjectName}/issues"
+ }
}
}
+
+ group = "com.netflix.${githubProjectName}" // TEMPLATE: Set to organization of project
+
+ dependencies {
+ compile 'javax.ws.rs:jsr311-api:1.1.1'
+ compile 'com.sun.jersey:jersey-core:1.11'
+ testCompile 'org.testng:testng:6.1.1'
+ testCompile 'org.mockito:mockito-core:1.8.5'
+ }
}
-dependencies {
- compile 'joda-time:joda-time:2.0'
- compile 'org.apache.servicemix.bundles:org.apache.servicemix.bundles.commons-csv:1.0-r706900_3'
- compile 'com.github.stephenc.high-scale-lib:high-scale-lib:1.1.2'
- compile 'com.google.guava:guava:11.0.2'
- compile 'org.apache.cassandra:cassandra-all:1.1.0'
- compile 'com.github.stephenc.eaio-uuid:uuid:3.2.0'
- compile 'org.slf4j:slf4j-api:1.6.4'
- compile 'org.codehaus.jettison:jettison:1.3.1'
- testCompile 'junit:junit:4.8.1'
+project(':') {
+ apply plugin: 'java'
+ dependencies {
+ compile 'joda-time:joda-time:2.0'
+ compile 'org.apache.servicemix.bundles:org.apache.servicemix.bundles.commons-csv:1.0-r706900_3'
+ compile 'com.github.stephenc.high-scale-lib:high-scale-lib:1.1.2'
+ compile 'com.google.guava:guava:11.0'
+ compile 'org.apache.cassandra:cassandra-all:1.1.0'
+ compile 'com.github.stephenc.eaio-uuid:uuid:3.2.0'
+ compile 'org.slf4j:slf4j-api:1.6.4'
+ compile 'org.codehaus.jettison:jettison:1.3.1'
+ compile 'commons-codec:commons-codec:1.6'
+ testCompile 'junit:junit:4.8.1'
+ }
}
View
1  gradle.properties
@@ -0,0 +1 @@
+version=1.54-SNAPSHOT
View
12 gradle/buildscript.gradle
@@ -0,0 +1,12 @@
+// Executed in context of buildscript
+repositories {
+ maven {
+ name 'build-repo'
+ url 'https://github.com/Netflix-Skunkworks/build-repo/raw/master/releases/'
+ }
+}
+dependencies {
+ classpath 'nl.javadude.gradle.plugins:license-gradle-plugin:0.6.0'
+ classpath 'com.mapvine:gradle-cobertura-plugin:0.1'
+ classpath 'gradle-release:gradle-release:1.0-SNAPSHOT'
+}
View
42 gradle/check.gradle
@@ -1,21 +1,25 @@
-// Checkstyle
-apply plugin: 'checkstyle'
-tasks.withType(Checkstyle) { ignoreFailures = true }
-checkstyle {
- ignoreFailures = true // Waiting on GRADLE-2163
- configFile = rootProject.file('codequality/checkstyle.xml')
-}
+subprojects {
+ // Checkstyle
+ apply plugin: 'checkstyle'
+ tasks.withType(Checkstyle) { ignoreFailures = true }
+ checkstyle {
+ ignoreFailures = true // Waiting on GRADLE-2163
+ configFile = rootProject.file('codequality/checkstyle.xml')
+ }
+
+ // FindBugs
+ apply plugin: 'findbugs'
+ //tasks.withType(Findbugs) { reports.html.enabled true }
-// FindBugs
-//apply plugin: 'findbugs'
-//findbugs {
-// ignoreFailures = true
-//}
-//tasks.withType(Findbugs) { reports.html.enabled true }
+ // PMD
+ apply plugin: 'pmd'
+ //tasks.withType(Pmd) { reports.html.enabled true }
-// PMD
-//apply plugin: 'pmd'
-//pmd {
-// ignoreFailures = true
-//}
-//tasks.withType(Pmd) { reports.html.enabled true }
+ apply plugin: 'cobertura'
+ cobertura {
+ sourceDirs = sourceSets.main.java.srcDirs
+ format = 'html'
+ includes = ['**/*.java', '**/*.groovy']
+ excludes = []
+ }
+}
View
47 gradle/convention.gradle
@@ -1,32 +1,43 @@
-ext.performingRelease = project.hasProperty('release') && Boolean.parseBoolean(project.release)
-def versionPostfix = performingRelease?'':'-SNAPSHOT'
+// For Artifactory
+rootProject.status = version.contains('-SNAPSHOT')?'snapshot':'release'
-apply plugin: 'java' // Plugin as major conventions
+subprojects { project ->
+ apply plugin: 'java' // Plugin as major conventions
-version = "${releaseVersion}${versionPostfix}"
+ version = rootProject.version
-sourceCompatibility = 1.6
+ sourceCompatibility = 1.6
-// GRADLE-2087 workaround, perform after java plugin
-status = performingRelease?'release':'snapshot'
+ // GRADLE-2087 workaround, perform after java plugin
+ status = rootProject.status
-task sourcesJar(type: Jar, dependsOn:classes) {
- classifier = 'sources'
- from sourceSets.main.allSource
-}
+ task sourcesJar(type: Jar, dependsOn:classes) {
+ classifier = 'sources'
+ from sourceSets.main.allSource
+ }
+
+ task javadocJar(type: Jar, dependsOn:javadoc) {
+ classifier = 'javadoc'
+ from javadoc.destinationDir
+ }
+
+ artifacts {
+ archives sourcesJar
+ archives javadocJar
+ }
-task javadocJar(type: Jar, dependsOn:javadoc) {
- classifier = 'javadoc'
- from javadoc.destinationDir
+ javadoc.doFirst { println "" }
}
-artifacts {
- archives sourcesJar
- archives javadocJar
+task aggregateJavadoc(type: Javadoc) {
+ description = 'Aggregate all subproject docs into a single docs directory'
+ source subprojects.collect {project -> project.sourceSets.main.allJava }
+ classpath = files(subprojects.collect {project -> project.sourceSets.main.compileClasspath})
+ destinationDir = new File(projectDir, 'doc')
}
// Generate wrapper, which is distributed as part of source to alleviate the need of installing gradle
task createWrapper(type: Wrapper) {
- gradleVersion = '1.0-milestone-9'
+ gradleVersion = '1.1'
}
View
12 gradle/license.gradle
@@ -1,5 +1,9 @@
-buildscript {
- dependencies { classpath 'nl.javadude.gradle.plugins:license-gradle-plugin:0.5' }
-}
+// Dependency for plugin was set in buildscript.gradle
-apply plugin: nl.javadude.gradle.plugins.license.LicensePlugin
+subprojects {
+apply plugin: 'license' //nl.javadude.gradle.plugins.license.LicensePlugin
+license {
+ header rootProject.file('codequality/HEADER')
+ ext.year = Calendar.getInstance().get(Calendar.YEAR)
+}
+}
View
67 gradle/maven.gradle
@@ -1,56 +1,43 @@
// Maven side of things
-apply plugin: 'maven' // Java plugin has to have been already applied for the conf2scope mappings to work
-apply plugin: 'signing'
+subprojects {
+ apply plugin: 'maven' // Java plugin has to have been already applied for the conf2scope mappings to work
+ apply plugin: 'signing'
-if (gradle.startParameter.taskNames.contains("uploadMavenCentral")) {
signing {
- required true
+ required { gradle.taskGraph.hasTask(uploadMavenCentral) }
sign configurations.archives
}
-} else {
- task signArchives {
- // do nothing
- }
-}
-/**
- * Publishing to Maven Central example provided from http://jedicoder.blogspot.com/2011/11/automated-gradle-project-deployment-to.html
- * artifactory will execute uploadArchives to force generation of ivy.xml, and we don't want that to trigger an upload to maven
- * central, so using custom upload task.
- */
-task uploadMavenCentral(type:Upload) {
- configuration = configurations.archives
- dependsOn 'signArchives'
- doFirst {
- repositories.mavenDeployer {
- beforeDeployment { org.gradle.api.artifacts.maven.MavenDeployment deployment -> signing.signPom(deployment) }
+ /**
+ * Publishing to Maven Central example provided from http://jedicoder.blogspot.com/2011/11/automated-gradle-project-deployment-to.html
+ */
+ task uploadMavenCentral(type:Upload, dependsOn: signArchives) {
+ configuration = configurations.archives
+ doFirst {
+ repositories.mavenDeployer {
+ beforeDeployment { org.gradle.api.artifacts.maven.MavenDeployment deployment -> signing.signPom(deployment) }
- // To test deployment locally, use the following instead of oss.sonatype.org
- //repository(url: "file://localhost/${rootProject.rootDir}/repo")
+ // To test deployment locally, use the following instead of oss.sonatype.org
+ //repository(url: "file://localhost/${rootProject.rootDir}/repo")
- repository(url: 'http://oss.sonatype.org/service/local/staging/deploy/maven2/') {
- authentication(userName: rootProject.sonatypeUsername, password: rootProject.sonatypePassword)
- }
+ repository(url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2') {
+ authentication(userName: rootProject.sonatypeUsername, password: rootProject.sonatypePassword)
+ }
- // Prevent datastamp from being appending to artifacts during deployment
- uniqueVersion = false
+ // Prevent datestamp from being appended to artifacts during deployment
+ uniqueVersion = false
- // Closure to configure all the POM with extra info, common to all projects
- pom.project {
- parent {
- groupId 'org.sonatype.oss'
- artifactId 'oss-parent'
- version '7'
- }
- licenses {
- license {
- name 'The Apache Software License, Version 2.0'
- url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
- distribution 'repo'
+ // Closure to configure all the POM with extra info, common to all projects
+ pom.project {
+ licenses {
+ license {
+ name 'The Apache Software License, Version 2.0'
+ url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+ distribution 'repo'
+ }
}
}
}
}
}
}
-
View
2  gradle/netflix-oss.gradle
@@ -1 +1 @@
-apply from: 'http://artifacts.netflix.com/build-local/artifactory.gradle'
+apply from: 'http://artifacts.netflix.com/gradle-netflix-local/artifactory.gradle'
View
67 gradle/release.gradle
@@ -1,6 +1,67 @@
-buildscript {
- dependencies { classpath group: 'no.entitas.gradle', name: 'gradle-release-plugin', version: '1.11' }
+apply plugin: 'release'
+
+// Ignore release plugin's task because it calls out via GradleBuild. This is a good place to put an email to send out
+task release(overwrite: true, dependsOn: commitNewVersion) << {
+ // This is a good place to put an email to send out
+}
+commitNewVersion.dependsOn updateVersion
+updateVersion.dependsOn createReleaseTag
+createReleaseTag.dependsOn preTagCommit
+def buildTasks = tasks.matching { it.name =~ /:build/ }
+preTagCommit.dependsOn buildTasks
+preTagCommit.dependsOn checkSnapshotDependencies
+//checkSnapshotDependencies.dependsOn confirmReleaseVersion // Introduced in 1.0, forces readLine
+//confirmReleaseVersion.dependsOn unSnapshotVersion
+checkSnapshotDependencies.dependsOn unSnapshotVersion // Remove once above is fixed
+unSnapshotVersion.dependsOn checkUpdateNeeded
+checkUpdateNeeded.dependsOn checkCommitNeeded
+checkCommitNeeded.dependsOn initScmPlugin
+
+// Call out to compile against internal repository
+task uploadArtifactory(type: GradleBuild) {
+ startParameter = project.gradle.startParameter.newInstance()
+ startParameter.addInitScript( file('gradle/netflix-oss.gradle') )
+ startParameter.getExcludedTaskNames().add('check')
+ tasks = [ 'build', 'artifactoryPublish' ]
}
-apply plugin: no.entitas.gradle.git.GitReleasePlugin // 'gitrelease'
+task releaseArtifactory(dependsOn: [checkSnapshotDependencies, uploadArtifactory])
+task buildWithArtifactory(type: GradleBuild) {
+ startParameter = project.gradle.startParameter.newInstance()
+ startParameter.addInitScript( file('gradle/netflix-oss.gradle') )
+ startParameter.getExcludedTaskNames().add('check')
+ tasks = [ 'build' ]
+}
+
+// Ensure upload happens before tagging but after all pre-checks
+//releaseArtifactory.dependsOn checkSnapshotDependencies
+createReleaseTag.dependsOn releaseArtifactory
+gradle.taskGraph.whenReady { taskGraph ->
+ if ( taskGraph.hasTask(uploadArtifactory) && rootProject.status == 'release' && !taskGraph.hasTask(':release') ) {
+ throw new GradleException('Project has to be release status releasing to Artifactory')
+ }
+}
+subprojects.each { project ->
+ project.uploadMavenCentral.dependsOn rootProject.checkSnapshotDependencies
+ rootProject.createReleaseTag.dependsOn project.uploadMavenCentral
+
+ gradle.taskGraph.whenReady { taskGraph ->
+ if ( taskGraph.hasTask(project.uploadMavenCentral) && !taskGraph.hasTask(':release') ) {
+ throw new GradleException('"release" task has to be run before uploading to Maven Central')
+ }
+ }
+}
+
+// Prevent plugin from asking for a version number interactively
+ext.'gradle.release.useAutomaticVersion' = "true"
+
+release {
+ // http://tellurianring.com/wiki/gradle/release
+ failOnCommitNeeded=true
+ failOnPublishNeeded=true
+ failOnUnversionedFiles=true
+ failOnUpdateNeeded=true
+ includeProjectNameInTag=true
+ requireBranch = null
+}
View
4 gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Wed Mar 14 13:19:56 PDT 2012
+#Tue Aug 14 16:28:54 PDT 2012
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=http\://services.gradle.org/distributions/gradle-1.0-milestone-9-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
View
4 gradlew
@@ -101,13 +101,13 @@ if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
- warn "Could not query businessSystem maximum file descriptor limit: $MAX_FD_LIMIT"
+ warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
- JAVA_OPTS="$JAVA_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
View
1  netflix-oss.gradle
@@ -1 +0,0 @@
-apply from: 'http://artifacts.netflix.com/gradle-netflix-local/artifactory.gradle'
View
17 pom.xml
@@ -48,7 +48,7 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
- <version>2.0</version>
+ <version>1.6.1</version>
</dependency>
<dependency>
@@ -64,6 +64,12 @@
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.6</version>
+ </dependency>
+
+ <dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.1.2</version>
@@ -72,7 +78,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>11.0.2</version>
+ <version>11.0</version>
</dependency>
<dependency>
@@ -80,7 +86,6 @@
<artifactId>cassandra-all</artifactId>
<version>1.1.0</version>
</dependency>
-
<dependency>
<groupId>com.github.stephenc.eaio-uuid</groupId>
@@ -102,6 +107,12 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
View
1  settings.gradle
@@ -1 +0,0 @@
-rootProject.name='astyanax'
View
8 src/main/java/com/netflix/astyanax/CassandraOperationCategory.java
@@ -0,0 +1,8 @@
+package com.netflix.astyanax;
+
+public enum CassandraOperationCategory {
+ READ,
+ WRITE,
+ OTHER,
+ CQL,
+}
View
43 src/main/java/com/netflix/astyanax/CassandraOperationType.java
@@ -1,5 +1,46 @@
package com.netflix.astyanax;
public enum CassandraOperationType {
- BATCH_MUTATE, GET_ROW, GET_ROWS_RANGE, GET_ROWS_SLICE, GET_ROWS_BY_INDEX, GET_COLUMN, CQL, DESCRIBE_RING, COUNTER_MUTATE, COLUMN_MUTATE, COLUMN_DELETE, COLUMN_INSERT, GET_COLUMN_COUNT, COPY_TO, DESCRIBE_KEYSPACE, TRUNCATE, DESCRIBE_CLUSTER, DESCRIBE_VERSION, DESCRIBE_SNITCH, DESCRIBE_PARTITIONER, DESCRIBE_SCHEMA_VERSION, GET_VERSION, DROP_COLUMN_FAMILY, DESCRIBE_KEYSPACES, DROP_KEYSPACE, ADD_COLUMN_FAMILY, UPDATE_COLUMN_FAMILY, ADD_KEYSPACE, UPDATE_KEYSPACE, SET_KEYSPACE, TEST,
+ BATCH_MUTATE(CassandraOperationCategory.WRITE),
+ GET_ROW(CassandraOperationCategory.READ),
+ GET_ROWS_RANGE(CassandraOperationCategory.READ),
+ GET_ROWS_SLICE(CassandraOperationCategory.READ),
+ GET_ROWS_BY_INDEX(CassandraOperationCategory.READ),
+ GET_COLUMN(CassandraOperationCategory.READ),
+ CQL(CassandraOperationCategory.CQL),
+ DESCRIBE_RING(CassandraOperationCategory.OTHER),
+ COUNTER_MUTATE(CassandraOperationCategory.WRITE),
+ COLUMN_MUTATE(CassandraOperationCategory.WRITE),
+ COLUMN_DELETE(CassandraOperationCategory.WRITE),
+ COLUMN_INSERT(CassandraOperationCategory.WRITE),
+ GET_COLUMN_COUNT(CassandraOperationCategory.READ),
+ COPY_TO(CassandraOperationCategory.WRITE),
+ DESCRIBE_KEYSPACE(CassandraOperationCategory.OTHER),
+ TRUNCATE(CassandraOperationCategory.OTHER),
+ DESCRIBE_CLUSTER(CassandraOperationCategory.OTHER),
+ DESCRIBE_VERSION(CassandraOperationCategory.OTHER),
+ DESCRIBE_SNITCH(CassandraOperationCategory.OTHER),
+ DESCRIBE_PARTITIONER(CassandraOperationCategory.OTHER),
+ DESCRIBE_SCHEMA_VERSION(CassandraOperationCategory.OTHER),
+ GET_VERSION(CassandraOperationCategory.OTHER),
+ DROP_COLUMN_FAMILY(CassandraOperationCategory.OTHER),
+ DESCRIBE_KEYSPACES(CassandraOperationCategory.OTHER),
+ DROP_KEYSPACE(CassandraOperationCategory.OTHER),
+ ADD_COLUMN_FAMILY(CassandraOperationCategory.OTHER),
+ UPDATE_COLUMN_FAMILY(CassandraOperationCategory.OTHER),
+ ADD_KEYSPACE(CassandraOperationCategory.OTHER),
+ UPDATE_KEYSPACE(CassandraOperationCategory.OTHER),
+ SET_KEYSPACE(CassandraOperationCategory.OTHER),
+ TEST(CassandraOperationCategory.OTHER),
+
+ ;
+ CassandraOperationCategory category;
+
+ CassandraOperationType(CassandraOperationCategory category) {
+ this.category = category;
+ }
+
+ public CassandraOperationCategory getCategory() {
+ return this.category;
+ }
}
View
6 src/main/java/com/netflix/astyanax/ColumnListMutation.java
@@ -86,6 +86,12 @@
ColumnListMutation<C> deleteColumn(C columnName);
+ /**
+ * Set the timestamp for all subsequent column operations in this ColumnListMutation.
+ * This timestamp does not affect the current timestamp for the entire MutationBatch
+ * @param timestamp
+ * @return
+ */
ColumnListMutation<C> setTimestamp(long timestamp);
/**
View
39 src/main/java/com/netflix/astyanax/Keyspace.java
@@ -51,7 +51,17 @@
String getKeyspaceName();
/**
- * Get a list of all tokens and their endpoints
+ * Describe the partitioner used by the cluster
+ *
+ * @return
+ * @throws ConnectionException
+ */
+ String describePartitioner() throws ConnectionException;
+
+ /**
+ * Get a list of all tokens and their endpoints. This call will return this list of ALL nodes
+ * in the cluster, including other regions. If you are only interested in the subset of
+ * nodes for a specific region then use describeRing(dc);
*
* @return
* @throws ConnectionException
@@ -59,6 +69,23 @@
List<TokenRange> describeRing() throws ConnectionException;
/**
+ * Get a list of all tokens and their endpoints for a specific dc only.
+ *
+ * @param dc - null for all dcs
+ * @return
+ * @throws ConnectionException
+ */
+ List<TokenRange> describeRing(String dc) throws ConnectionException;
+
+ /**
+ * Get a list of tokens and their endpoints for a specific dc/rack combination.
+ * @param dc
+ * @return
+ * @throws ConnectionException
+ */
+ List<TokenRange> describeRing(String dc, String rack) throws ConnectionException;
+
+ /**
* Describe the ring but use the last locally cached version if available.
* @param cached
* @return
@@ -134,6 +161,16 @@ SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) thro
*/
<K, C> OperationResult<Void> truncateColumnFamily(ColumnFamily<K, C> columnFamily) throws OperationException,
ConnectionException;
+
+ /**
+ * Delete all rows in a column family
+ *
+ * @param columnFamily
+ * @return
+ * @throws ConnectionException
+ * @throws OperationException
+ */
+ OperationResult<Void> truncateColumnFamily(String columnFamily) throws ConnectionException;
/**
* This method is used for testing purposes only. It is used to inject
View
32 src/main/java/com/netflix/astyanax/connectionpool/Host.java
@@ -23,6 +23,14 @@
import com.google.common.collect.Sets;
+/**
+ * Wrapper for the representation of the address for a cassandra node.
+ * This Host object is used by the connection pool to uniquely identify the host
+ * and track its connections.
+ *
+ * @author elandau
+ *
+ */
public class Host {
private final String host;
@@ -37,6 +45,9 @@
public static Pattern ipPattern = Pattern
.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$");
+ /**
+ * Empty host used in error return codes
+ */
private Host() {
this.host = "None";
this.ipAddress = "0.0.0.0";
@@ -45,10 +56,17 @@ private Host() {
this.url = String.format("%s:%d", this.host, this.port);
}
- public Host(String url2, int defaultPort) {
+ /**
+ * Construct a Host from a host:port combination. The defaultPort is provided in case
+ * the hostAndPort value does not have a port specified.
+ *
+ * @param hostAndPort
+ * @param defaultPort
+ */
+ public Host(String hostAndPort, int defaultPort) {
- String tempHost = parseHostFromUrl(url2);
- this.port = parsePortFromUrl(url2, defaultPort);
+ String tempHost = parseHostFromHostAndPort(hostAndPort);
+ this.port = parsePortFromHostAndPort(hostAndPort, defaultPort);
Matcher match = ipPattern.matcher(tempHost);
String workHost;
@@ -78,11 +96,11 @@ public Host(String url2, int defaultPort) {
/**
* Parse the hostname from a "hostname:port" formatted string
*
- * @param urlPort
+ * @param hostAndPort
* @return
*/
- public static String parseHostFromUrl(String urlPort) {
- return urlPort.lastIndexOf(':') > 0 ? urlPort.substring(0, urlPort.lastIndexOf(':')) : urlPort;
+ public static String parseHostFromHostAndPort(String hostAndPort) {
+ return hostAndPort.lastIndexOf(':') > 0 ? hostAndPort.substring(0, hostAndPort.lastIndexOf(':')) : hostAndPort;
}
/**
@@ -92,7 +110,7 @@ public static String parseHostFromUrl(String urlPort) {
* @param defaultPort
* @return
*/
- public static int parsePortFromUrl(String urlPort, int defaultPort) {
+ public static int parsePortFromHostAndPort(String urlPort, int defaultPort) {
return urlPort.lastIndexOf(':') > 0 ? Integer.valueOf(urlPort.substring(urlPort.lastIndexOf(':') + 1,
urlPort.length())) : defaultPort;
}
View
20 src/main/java/com/netflix/astyanax/connectionpool/impl/AbstractHostPartitionConnectionPool.java
@@ -48,7 +48,7 @@
*
* @author elandau
*
- * @param <CL>
+ * @param <CL>
*/
public abstract class AbstractHostPartitionConnectionPool<CL> implements ConnectionPool<CL>,
SimpleHostConnectionPool.Listener<CL> {
@@ -56,17 +56,17 @@
protected final ConnectionPoolConfiguration config;
protected final ConnectionFactory<CL> factory;
protected final ConnectionPoolMonitor monitor;
- private final LatencyScoreStrategy latencyScoreStrategy;
protected final Topology<CL> topology;
+ private final LatencyScoreStrategy latencyScoreStrategy;
public AbstractHostPartitionConnectionPool(ConnectionPoolConfiguration config, ConnectionFactory<CL> factory,
ConnectionPoolMonitor monitor) {
- this.config = config;
- this.factory = factory;
- this.hosts = new NonBlockingHashMap<Host, HostConnectionPool<CL>>();
- this.monitor = monitor;
+ this.config = config;
+ this.factory = factory;
+ this.hosts = new NonBlockingHashMap<Host, HostConnectionPool<CL>>();
+ this.monitor = monitor;
this.latencyScoreStrategy = config.getLatencyScoreStrategy();
- this.topology = new TokenPartitionedTopology<CL>(this.latencyScoreStrategy);
+ this.topology = new TokenPartitionedTopology<CL>(this.latencyScoreStrategy);
}
@Override
@@ -120,9 +120,9 @@ public void onHostUp(HostConnectionPool<CL> pool) {
}
@Override
- public final boolean addHost(Host host, boolean refresh) {
+ public final synchronized boolean addHost(Host host, boolean refresh) {
// Already exists
- if (hosts.get(host) != null) {
+ if (hosts.containsKey(host)) {
return false;
}
@@ -166,7 +166,7 @@ public boolean hasHost(Host host) {
}
@Override
- public boolean removeHost(Host host, boolean refresh) {
+ public synchronized boolean removeHost(Host host, boolean refresh) {
HostConnectionPool<CL> pool = hosts.remove(host);
if (pool != null) {
monitor.onHostRemoved(host);
View
2  src/main/java/com/netflix/astyanax/connectionpool/impl/ConnectionPoolConfigurationImpl.java
@@ -34,7 +34,7 @@
* Default values
*/
public static final int DEFAULT_MAX_TIME_WHEN_EXHAUSTED = 2000;
- public static final int DEFAULT_SOCKET_TIMEOUT = 11000; // ms
+ public static final int DEFAULT_SOCKET_TIMEOUT = 11000; // ms
public static final int DEFAULT_CONNECT_TIMEOUT = 2000; // ms
public static final int DEFAULT_MAX_ACTIVE_PER_PARTITION = 3;
public static final int DEFAULT_INIT_PER_PARTITION = 0;
View
69 src/main/java/com/netflix/astyanax/connectionpool/impl/HostConnectionPoolPartition.java
@@ -1,9 +1,7 @@
package com.netflix.astyanax.connectionpool.impl;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -16,37 +14,48 @@
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy;
+/**
+ * Collection of pools that own a partition of the ring
+ *
+ * @author elandau
+ *
+ * @param <CL>
+ */
public class HostConnectionPoolPartition<CL> {
- private final BigInteger id;
- private final AtomicBoolean prioritize = new AtomicBoolean(false);
- private final NonBlockingHashSet<HostConnectionPool<CL>> pools = new NonBlockingHashSet<HostConnectionPool<CL>>();
+ private final BigInteger id;
+ private final LatencyScoreStrategy strategy;
+ private final AtomicBoolean prioritize = new AtomicBoolean(false);
+ private final NonBlockingHashSet<HostConnectionPool<CL>> pools = new NonBlockingHashSet<HostConnectionPool<CL>>();
private final AtomicReference<List<HostConnectionPool<CL>>> activePools = new AtomicReference<List<HostConnectionPool<CL>>>();
- private final LatencyScoreStrategy strategy;
public HostConnectionPoolPartition(BigInteger id, LatencyScoreStrategy strategy) {
- this.id = id;
+ this.id = id;
this.strategy = strategy;
- this.activePools.set(new ArrayList<HostConnectionPool<CL>>());
+ this.activePools.set(Lists.<HostConnectionPool<CL>>newArrayList());
}
/**
* Sets all pools for this partition. Removes old partitions and adds new
* one.
*
- * @param pools
+ * @param newPools
*/
- public synchronized boolean setPools(Collection<HostConnectionPool<CL>> pools) {
+ public synchronized boolean setPools(Collection<HostConnectionPool<CL>> newPools) {
Set<HostConnectionPool<CL>> toRemove = Sets.newHashSet(this.pools);
+ // Add new pools not previously seen
boolean didChange = false;
- for (HostConnectionPool<CL> pool : pools) {
- didChange |= this.pools.add(pool);
+ for (HostConnectionPool<CL> pool : newPools) {
+ if (this.pools.add(pool))
+ didChange = true;
toRemove.remove(pool);
}
+ // Remove pools for hosts that no longer exist
for (HostConnectionPool<CL> pool : toRemove) {
- didChange |= this.pools.remove(pool);
+ if (this.pools.remove(pool))
+ didChange = true;
}
if (didChange)
@@ -54,6 +63,12 @@ public synchronized boolean setPools(Collection<HostConnectionPool<CL>> pools) {
return didChange;
}
+ /**
+ * Add a new pool to the partition. Checks to see if the pool already
+ * existed. If so then there is no need to refresh the pool.
+ * @param pool
+ * @return
+ */
public synchronized boolean addPool(HostConnectionPool<CL> pool) {
if (this.pools.add(pool)) {
refresh();
@@ -76,23 +91,35 @@ public synchronized boolean removePool(HostConnectionPool<CL> pool) {
* @return
*/
public BigInteger id() {
- return this.id;
+ return id;
}
+ /**
+ * Return the list of active hosts (i.e. hosts that are not down)
+ * @return
+ */
public List<HostConnectionPool<CL>> getPools() {
- return this.activePools.get();
+ return activePools.get();
}
+ /**
+     * If true then the hosts are sorted by order of priority where the
+ * first host gives the best performance
+ * @return
+ */
public boolean isSorted() {
return prioritize.get();
}
- public void refresh() {
- List<HostConnectionPool<CL>> pools = Lists.newArrayList(this.pools);
- Iterator<HostConnectionPool<CL>> iter = pools.iterator();
- while (iter.hasNext()) {
- if (iter.next().isShutdown())
- iter.remove();
+ /**
+ * Refresh the partition
+ */
+ public synchronized void refresh() {
+ List<HostConnectionPool<CL>> pools = Lists.newArrayList();
+ for (HostConnectionPool<CL> pool : this.pools) {
+ if (!pool.isShutdown()) {
+ pools.add(pool);
+ }
}
this.activePools.set(strategy.sortAndfilterPartition(pools, prioritize));
}
View
13 src/main/java/com/netflix/astyanax/connectionpool/impl/RoundRobinExecuteWithFailover.java
@@ -30,8 +30,12 @@ public RoundRobinExecuteWithFailover(ConnectionPoolConfiguration config, Connect
monitor.incNoHosts();
throw new NoAvailableHostsException("No hosts to borrow from");
}
-
+
size = pools.size();
+ if (size <= 0) {
+ throw new NoAvailableHostsException("Strange pool size: " + size);
+ }
+
retryCountdown = Math.min(config.getMaxFailoverCount(), size);
if (retryCountdown < 0)
retryCountdown = size;
@@ -42,7 +46,12 @@ else if (retryCountdown == 0)
}
public int getNextHostIndex() {
- return index++ % size;
+ try {
+ return index % size;
+ }
+ finally {
+ index++;
+ }
}
public boolean canRetry() {
View
79 src/main/java/com/netflix/astyanax/connectionpool/impl/TokenPartitionedTopology.java
@@ -1,7 +1,6 @@
package com.netflix.astyanax.connectionpool.impl;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -12,24 +11,47 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy;
+/**
+ * Partition hosts by start token. Each token may map to a list of partitions.
+ *
+ * @author elandau
+ *
+ * @param <CL>
+ */
public class TokenPartitionedTopology<CL> implements Topology<CL> {
- private AtomicReference<List<HostConnectionPoolPartition<CL>>> sortedRing = new AtomicReference<List<HostConnectionPoolPartition<CL>>>();
+ /**
+ * Sorted list of partitions. A binary search is performed on this list to determine
+ * the list of hosts that own the token range.
+ */
+ private AtomicReference<List<HostConnectionPoolPartition<CL>>> sortedRing
+ = new AtomicReference<List<HostConnectionPoolPartition<CL>>>();
- private Map<BigInteger, HostConnectionPoolPartition<CL>> tokens = Maps.newHashMap();
+ /**
+ * Lookup of start token to partition
+ */
+ private NonBlockingHashMap<BigInteger, HostConnectionPoolPartition<CL>> tokens
+ = new NonBlockingHashMap<BigInteger, HostConnectionPoolPartition<CL>>();
+ /**
+ * Partition which contains all hosts. This is the fallback partition when no tokens are provided.
+ */
private HostConnectionPoolPartition<CL> allPools;
+ /**
+ * Strategy used to score hosts within a partition.
+ */
private LatencyScoreStrategy strategy;
/**
- * Comparator used to find the connection pool to the host which owns a
- * specific token
+ * Comparator used to find the partition mapping to a token
*/
@SuppressWarnings("rawtypes")
private Comparator tokenSearchComparator = new Comparator() {
@@ -42,6 +64,9 @@ public int compare(Object arg0, Object arg1) {
}
};
+ /**
+     * Comparator used to sort partitions in token order.
+ */
@SuppressWarnings("rawtypes")
private Comparator partitionComparator = new Comparator() {
@SuppressWarnings("unchecked")
@@ -64,6 +89,10 @@ public TokenPartitionedTopology(LatencyScoreStrategy strategy) {
@SuppressWarnings("unchecked")
@Override
+ /**
+ * Update the list of pools using the provided mapping of start token to collection of hosts
+ * that own the token
+ */
public synchronized boolean setPools(Map<BigInteger, Collection<HostConnectionPool<CL>>> ring) {
// Temporary list of token that will be removed if not found in the new ring
Set<BigInteger> tokensToRemove = Sets.newHashSet(tokens.keySet());
@@ -74,52 +103,56 @@ public synchronized boolean setPools(Map<BigInteger, Collection<HostConnectionPo
// Iterate all tokens
for (Entry<BigInteger, Collection<HostConnectionPool<CL>>> entry : ring.entrySet()) {
BigInteger token = entry.getKey();
- tokensToRemove.remove(token);
- didChange |= allPools.addAll(entry.getValue());
+ if (entry.getValue() != null && !entry.getValue().isEmpty()) {
+ tokensToRemove.remove(token);
+
+ // Always add to the all pools
+ if (allPools.addAll(entry.getValue()))
+ didChange = true;
- if (entry.getValue() != null) {
- // Add a new collection or modify an existing one
+ // Add a new partition or modify an existing one
HostConnectionPoolPartition<CL> partition = tokens.get(token);
if (partition == null) {
partition = makePartition(token);
tokens.put(token, partition);
}
- didChange |= partition.setPools(entry.getValue());
+ if (partition.setPools(entry.getValue()))
+ didChange = true;
}
}
// Remove the tokens that are no longer in the ring
- didChange |= !tokensToRemove.isEmpty();
for (BigInteger token : tokensToRemove) {
tokens.remove(token);
+ didChange = true;
}
// Sort partitions by token
if (didChange) {
- List<HostConnectionPoolPartition<CL>> partitions = new ArrayList<HostConnectionPoolPartition<CL>>(
- tokens.values());
+ List<HostConnectionPoolPartition<CL>> partitions = Lists.newArrayList(tokens.values());
Collections.sort(partitions, partitionComparator);
this.allPools.setPools(allPools);
refresh();
- this.sortedRing.set(partitions);
+
+ this.sortedRing.set(Collections.unmodifiableList(partitions));
}
return didChange;
}
@Override
- public void resumePool(HostConnectionPool<CL> pool) {
+ public synchronized void resumePool(HostConnectionPool<CL> pool) {
refresh();
}
@Override
- public void suspendPool(HostConnectionPool<CL> pool) {
+ public synchronized void suspendPool(HostConnectionPool<CL> pool) {
refresh();
}
@Override
- public void refresh() {
+ public synchronized void refresh() {
for (HostConnectionPoolPartition<CL> partition : tokens.values()) {
partition.refresh();
}
@@ -129,7 +162,7 @@ public void refresh() {
@Override
public HostConnectionPoolPartition<CL> getPartition(BigInteger token) {
// First, get a copy of the partitions.
- List<HostConnectionPoolPartition<CL>> partitions = this.sortedRing.get();
+ List<HostConnectionPoolPartition<CL>> partitions = this.sortedRing.get();
// Must have a token otherwise we default to the base class
// implementation
if (token == null || partitions == null || partitions.isEmpty()) {
@@ -147,8 +180,8 @@ public void refresh() {
@SuppressWarnings("unchecked")
int partitionIndex = Collections.binarySearch(partitions, token, tokenSearchComparator);
if (partitionIndex < 0) {
- partitionIndex = -(partitionIndex + 1);
- }
+ partitionIndex = -(partitionIndex + 1);
+ }
return partitions.get(partitionIndex % partitions.size());
}
@@ -163,7 +196,7 @@ public int getPartitionCount() {
}
@Override
- public void removePool(HostConnectionPool<CL> pool) {
+ public synchronized void removePool(HostConnectionPool<CL> pool) {
allPools.removePool(pool);
for (HostConnectionPoolPartition<CL> partition : tokens.values()) {
partition.removePool(pool);
@@ -172,7 +205,7 @@ public void removePool(HostConnectionPool<CL> pool) {
}
@Override
- public void addPool(HostConnectionPool<CL> pool) {
+ public synchronized void addPool(HostConnectionPool<CL> pool) {
allPools.addPool(pool);
allPools.refresh();
}
View
31 src/main/java/com/netflix/astyanax/ddl/ColumnDefinition.java
@@ -16,6 +16,7 @@
package com.netflix.astyanax.ddl;
import java.nio.ByteBuffer;
+import java.util.Map;
/**
* Interface to get/set a single column definition. The column definition is
@@ -121,4 +122,34 @@
* @return
*/
boolean hasIndex();
+
+ /**
+ * Get a map of all options associated with this column
+ * @return
+ */
+ Map<String, String> getOptions();
+
+ /**
+ * Get an option
+ * @param name Option name (TODO: Document these)
+ * 'class_name' - com.datastax.bdp.cassandra.index.solr.SolrSecondaryIndex
+ * @param defaultValue Default value to return if option not found
+ * @return
+ */
+ String getOption(String name, String defaultValue);
+
+ /**
+ * Set all extra options for this column. Will override any previous values.
+ * @param index_options
+ * @return
+ */
+ ColumnDefinition setOptions(Map<String, String> index_options);
+
+ /**
+ * Set an option
+ * @param name
+ * @param value
+ * @return Previous value or null if not previously set
+ */
+ String setOption(String name, String value);
}
View
80 src/main/java/com/netflix/astyanax/impl/AstyanaxCheckpointManager.java
@@ -0,0 +1,80 @@
+package com.netflix.astyanax.impl;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.SortedMap;
+
+import com.google.common.collect.Maps;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.query.CheckpointManager;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+
+/**
+ * Track checkpoints in cassandra
+ *
+ * @author elandau
+ *
+ */
+public class AstyanaxCheckpointManager implements CheckpointManager {
+
+ private final ByteBuffer bbKey;
+ private final Keyspace keyspace;
+ private final ColumnFamily<ByteBuffer, String> columnFamily;
+
+ @SuppressWarnings("rawtypes")
+ private final static Comparator tokenComparator = new Comparator() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(Object arg0, Object arg1) {
+ return new BigInteger((String)arg0).compareTo(new BigInteger((String)arg1));
+ }
+ };
+
+ public AstyanaxCheckpointManager(Keyspace keyspace, String columnFamily, String id) {
+ this(keyspace, columnFamily, StringSerializer.get().toByteBuffer(id));
+ }
+
+ public AstyanaxCheckpointManager(Keyspace keyspace, String columnFamily, Long id) {
+ this(keyspace, columnFamily, LongSerializer.get().toByteBuffer(id));
+ }
+
+ public AstyanaxCheckpointManager(Keyspace keyspace, String columnFamily, ByteBuffer bbKey) {
+ this.keyspace = keyspace;
+ this.bbKey = bbKey;
+ this.columnFamily = ColumnFamily.newColumnFamily(columnFamily, ByteBufferSerializer.get(), StringSerializer.get());
+ }
+
+ @Override
+ public void trackCheckpoint(String startToken, String checkpointToken) throws ConnectionException {
+ keyspace.prepareColumnMutation(columnFamily, bbKey, startToken).putValue(checkpointToken, null).execute();
+ }
+
+ @Override
+ public String getCheckpoint(String startToken) throws ConnectionException {
+ try {
+ System.out.println("Looking for token " + startToken);
+ return keyspace.prepareQuery(columnFamily).getKey(bbKey).getColumn(startToken).execute().getResult().getStringValue();
+ }
+ catch (NotFoundException e) {
+ return startToken;
+ }
+ }
+
+ @Override
+ public SortedMap<String, String> getCheckpoints() throws ConnectionException {
+ SortedMap<String, String> checkpoints = Maps.newTreeMap(tokenComparator);
+ for (Column<String> column : keyspace.prepareQuery(columnFamily).getKey(bbKey).execute().getResult()) {
+ checkpoints.put(column.getName(), column.getStringValue());
+ }
+
+ return checkpoints;
+ }
+
+}
View
6 src/main/java/com/netflix/astyanax/impl/AstyanaxConfigurationImpl.java
@@ -19,8 +19,10 @@
private ConsistencyLevel defaultWriteConsistencyLevel = ConsistencyLevel.CL_ONE;
private Clock clock = new MicrosecondsSyncClock();
private RetryPolicy retryPolicy = RunOnce.get();
- private ExecutorService asyncExecutor = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setDaemon(true)
- .build());
+ private ExecutorService asyncExecutor = Executors.newFixedThreadPool(5,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("AstyanaxAsync-%d")
+ .build());
private NodeDiscoveryType discoveryType = NodeDiscoveryType.NONE;
private int discoveryIntervalInSeconds = 30;
private ConnectionPoolType connectionPoolType = ConnectionPoolType.ROUND_ROBIN;
View
29 src/main/java/com/netflix/astyanax/impl/RingDescribeHostSupplier.java
@@ -13,13 +13,32 @@
import com.netflix.astyanax.connectionpool.TokenRange;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+/**
+ * HostSupplier that uses existing hosts in the connection pool to execute a ring
+ * describe and get the entire list of hosts and their tokens from Cassandra.
+ *
+ * @author elandau
+ *
+ */
public class RingDescribeHostSupplier implements Supplier<Map<BigInteger, List<Host>>> {
- private final Keyspace keyspace;
- private final int defaultPort;
+ private final Keyspace keyspace;
+ private final int defaultPort;
+ private final String dc;
+ private final String rack;
+
+ public RingDescribeHostSupplier(Keyspace keyspace, int defaultPort, String dc, String rack) {
+ this.keyspace = keyspace;
+ this.defaultPort = defaultPort;
+ this.dc = dc;
+ this.rack = rack;
+ }
+ public RingDescribeHostSupplier(Keyspace keyspace, int defaultPort, String dc) {
+ this(keyspace, defaultPort, dc, null);
+ }
+
public RingDescribeHostSupplier(Keyspace keyspace, int defaultPort) {
- this.keyspace = keyspace;
- this.defaultPort = defaultPort;
+ this(keyspace, defaultPort, null, null);
}
@Override
@@ -27,7 +46,7 @@ public RingDescribeHostSupplier(Keyspace keyspace, int defaultPort) {
try {
Map<BigInteger, List<Host>> hosts = Maps.newLinkedHashMap();
- for (TokenRange range : keyspace.describeRing()) {
+ for (TokenRange range : keyspace.describeRing(dc, rack)) {
hosts.put(new BigInteger(range.getEndToken()),
Lists.transform(range.getEndpoints(), new Function<String, Host>() {
@Override
View
73 src/main/java/com/netflix/astyanax/partitioner/BOP20Partitioner.java
@@ -0,0 +1,73 @@
+package com.netflix.astyanax.partitioner;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.codec.binary.Hex;
+
+import com.google.common.collect.Lists;
+import com.netflix.astyanax.connectionpool.TokenRange;
+import com.netflix.astyanax.connectionpool.impl.TokenRangeImpl;
+
+public class BOP20Partitioner implements Partitioner {
+
+ public static String MINIMUM = "0000000000000000000000000000000000000000";
+ public static String MAXIMUM = "ffffffffffffffffffffffffffffffffffffffff";
+ public static final BigInteger ONE = new BigInteger("1", 16);
+ public static int KEY_LENGTH = 20;
+
+ @Override
+ public String getMinToken() {
+ return MINIMUM;
+ }
+
+ @Override
+ public String getMaxToken() {
+ return MAXIMUM;
+ }
+
+ @Override
+ public List<TokenRange> splitTokenRange(String first, String last, int count) {
+ List<TokenRange> tokens = Lists.newArrayList();
+ for (int i = 0; i < count; i++) {
+ String startToken = getTokenMinusOne(getSegmentToken(count, i, new BigInteger(first, 16), new BigInteger(last, 16)));
+ String endToken;
+ if (i == count-1 && last.equals(getMaxToken()))
+ endToken = getMinToken();
+ else
+ endToken = getSegmentToken(count, i+1, new BigInteger(first, 16), new BigInteger(last, 16));
+ tokens.add(new TokenRangeImpl(startToken, endToken, new ArrayList<String>()));
+ }
+ return tokens;
+ }
+
+ @Override
+ public List<TokenRange> splitTokenRange(int count) {
+ return splitTokenRange(getMinToken(), getMaxToken(), count);
+ }
+
+ @Override
+ public String getTokenForKey(ByteBuffer key) {
+ if (key.remaining() != KEY_LENGTH) {
+ throw new IllegalArgumentException("Key must be a 20 byte array");
+ }
+ return new String(Hex.encodeHexString(key.duplicate().array()));
+ }
+
+ @Override
+ public String getTokenMinusOne(String token) {
+ if (token.equals("0") || token.equals(MINIMUM))
+ return MAXIMUM;
+
+ return new BigInteger(token, 16).subtract(ONE).toString(16);
+ }
+
+ public static String getSegmentToken(int size, int position, BigInteger minInitialToken, BigInteger maxInitialToken ) {
+ BigInteger decValue = minInitialToken;
+ if (position != 0)
+ decValue = maxInitialToken.multiply(new BigInteger("" + position)).divide(new BigInteger("" + size));
+ return decValue.toString(16);
+ }
+}
View
74 src/main/java/com/netflix/astyanax/partitioner/BigInteger127Partitioner.java
@@ -0,0 +1,74 @@
+package com.netflix.astyanax.partitioner;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.dht.RandomPartitioner;
+
+import com.google.common.collect.Lists;
+import com.netflix.astyanax.connectionpool.TokenRange;
+import com.netflix.astyanax.connectionpool.impl.TokenRangeImpl;
+
+public class BigInteger127Partitioner implements Partitioner {
+
+ public static final BigInteger MINIMUM = new BigInteger("" + 0);
+ public static final BigInteger MAXIMUM = new BigInteger("" + 2).pow(127);
+ public static final BigInteger ONE = new BigInteger("1");
+
+ private static final RandomPartitioner partitioner = new RandomPartitioner();
+
+ @Override
+ public String getMinToken() {
+ return MINIMUM.toString();
+ }
+
+ @Override
+ public String getMaxToken() {
+ return MAXIMUM.toString();
+ }
+
+ @Override
+ public List<TokenRange> splitTokenRange(String first, String last, int count) {
+ List<TokenRange> tokens = Lists.newArrayList();
+ for (int i = 0; i < count; i++) {
+ String startToken = getTokenMinusOne(getSegmentToken(count, i, new BigInteger(first), new BigInteger(last)));
+ String endToken;
+ if (i == count-1 && last.equals(getMaxToken()))
+ endToken = getMinToken();
+ else
+ endToken = getSegmentToken(count, i+1, new BigInteger(first), new BigInteger(last));
+ tokens.add(new TokenRangeImpl(startToken, endToken, new ArrayList<String>()));
+ }
+ return tokens;
+ }
+
+ @Override
+ public List<TokenRange> splitTokenRange(int count) {
+ return splitTokenRange(getMinToken(), getMaxToken(), count);
+ }
+
+ @Override
+ public String getTokenForKey(ByteBuffer key) {
+ return partitioner.getToken(key).toString();
+ }
+
+ @Override
+ public String getTokenMinusOne(String token) {
+ BigInteger bigInt = new BigInteger(token);
+ // if zero rotate to the Maximum else minus one.
+ if (bigInt.equals(MINIMUM))
+ return MAXIMUM.toString();
+ else
+ return bigInt.subtract(ONE).toString();
+ }
+
+ public static String getSegmentToken(int size, int position, BigInteger minInitialToken, BigInteger maxInitialToken ) {
+ BigInteger decValue = minInitialToken;
+ if (position != 0)
+ decValue = maxInitialToken.multiply(new BigInteger("" + position)).divide(new BigInteger("" + size));
+ return decValue.toString();
+ }
+
+}
View
57 src/main/java/com/netflix/astyanax/partitioner/Partitioner.java
@@ -0,0 +1,57 @@
+package com.netflix.astyanax.partitioner;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.netflix.astyanax.connectionpool.TokenRange;
+
+/**
+ * Base interface for token partitioning utilities
+ *
+ * @author elandau
+ *
+ */
+public interface Partitioner {
+ /**
+ * Return the smallest token in the token space
+ * @return
+ */
+ String getMinToken();
+
+ /**
+ * Return the largest token in the token space
+ * @return
+ */
+ String getMaxToken();
+
+ /**
+ * Return the token immediately before this one
+ */
+ String getTokenMinusOne(String token);
+
+ /**
+ * Split the token range into N equal size segments and return the start token
+ * of each segment
+ *
+ * @param first
+ * @param last
+ * @param count
+ * @return
+ */
+ List<TokenRange> splitTokenRange(String first, String last, int count);
+
+ /**
+ * Split the entire token range into 'count' equal size segments
+ * @param count
+ * @return
+ */
+ List<TokenRange> splitTokenRange(int count);
+
+ /**
+     * Return the token for the specified key
+ * @param key
+ * @return
+ */
+ String getTokenForKey(ByteBuffer key);
+
+}
View
32 src/main/java/com/netflix/astyanax/query/AllRowsQuery.java
@@ -1,5 +1,6 @@
package com.netflix.astyanax.query;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -73,6 +74,13 @@
AllRowsQuery<K, C> setExceptionCallback(ExceptionCallback cb);
/**
+ * Use this checkpoint manager to keep track of progress as all rows are being iterated
+ * @param manager
+ * @return
+ */
+ AllRowsQuery<K, C> setCheckpointManager(CheckpointManager manager);
+
+ /**
* If true will repeat the last token in the previous block.
*
* @param repeatLastToken
@@ -81,6 +89,15 @@
AllRowsQuery<K, C> setRepeatLastToken(boolean repeatLastToken);
/**
+ * If set to false all empty rows will be filtered out internally.
+ * Default is false
+ *
+ * @param flag
+ * @return
+ */
+ AllRowsQuery<K, C> setIncludeEmptyRows(boolean flag);
+
+ /**
* Specify a non-contiguous set of columns to retrieve.
*
* @param columns
@@ -164,4 +181,19 @@
* @throws ConnectionException
*/
void executeWithCallback(RowCallback<K, C> callback) throws ConnectionException;
+
+ /**
+ * Execute the operation on a specific token range, instead of the entire range.
+     * Use this only in combination with setConcurrencyLevel being called otherwise
+ * it currently will not have any effect on the query. When using forTokenRange
+ * the specified token range will still be split into the number of threads
+ * specified by setConcurrencyLevel
+ *
+ * @param startToken
+ * @param endToken
+ * @return
+ */
+ AllRowsQuery<K, C> forTokenRange(BigInteger startToken, BigInteger endToken);
+
+ AllRowsQuery<K, C> forTokenRange(String startToken, String endToken);
}
View
44 src/main/java/com/netflix/astyanax/query/CheckpointManager.java
@@ -0,0 +1,44 @@
+package com.netflix.astyanax.query;
+
+import java.util.SortedMap;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+/**
+ * Interface for tracking checkpoints for a getAllRows query.
+ * The entire token range is split into a sorted set of start tokens. Each start token is
+ * mapped to a checkpoint with the following possible values
+ * 1. startToken - start of the token range
+ * 2. nextToken - the checkpoint equals the next token in the sorted set of start tokens. This means the range is done
+ * 3. > startToken AND < nextToken - a valid checkpoint
+ *
+ * @author elandau
+ *
+ */
+public interface CheckpointManager {
+ /**
+     * Track the checkpoint for a specific range
+ *
+ * @param start
+ * @param end
+ * @throws Exception
+ */
+ void trackCheckpoint(String startToken, String checkpointToken) throws Exception;
+
+ /**
+ * Get the next checkpoint after the specified token
+ *
+ * @param start
+ * @param end
+ * @return
+ */
+ String getCheckpoint(String startToken) throws Exception ;
+
+ /**
+ * Return a sorted map of start tokens to their checkpoint
+ * @return
+ * @throws ConnectionException
+ */
+ SortedMap<String, String> getCheckpoints() throws Exception;
+
+}
View
3  src/main/java/com/netflix/astyanax/query/ColumnFamilyQuery.java
@@ -76,6 +76,7 @@
* @return
*/
RowSliceQuery<K, C> getKeyRange(K startKey, K endKey, String startToken, String endToken, int count);
+
/**
* Query a non-contiguous set of keys.
*
@@ -99,7 +100,7 @@
* @return
*/
RowSliceQuery<K, C> getKeySlice(Iterable<K> keys);
-
+
/**
* Query to get an iterator to all rows in the column family
*
View
515 src/main/java/com/netflix/astyanax/recipes/reader/AllRowsReader.java
@@ -0,0 +1,515 @@
+package com.netflix.astyanax.recipes.reader;
+
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.TokenRange;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnSlice;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.partitioner.BigInteger127Partitioner;
+import com.netflix.astyanax.partitioner.Partitioner;
+import com.netflix.astyanax.query.CheckpointManager;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.util.Callables;
+
+/**
+ * Recipe that is used to read all rows from a column family.
+ *
+ * @author elandau
+ *
+ * @param <K>
+ * @param <C>
+ */
+public class AllRowsReader<K, C> implements Callable<Boolean> {
+ private static final Logger LOG = LoggerFactory.getLogger(AllRowsReader.class);
+
+ private static final Partitioner DEFAULT_PARTITIONER = new BigInteger127Partitioner();
+ private final static int DEFAULT_PAGE_SIZE = 100;
+
+ private final Keyspace keyspace;
+ private final ColumnFamily<K, C> columnFamily;
+
+ private final int pageSize;
+ private final Integer concurrencyLevel; // Default to null will force ring describe
+ private final ExecutorService executor;
+ private final CheckpointManager checkpointManager;
+ private final Function<Row<K,C>, Boolean> rowFunction;
+ private final boolean repeatLastToken;
+ private final ColumnSlice<C> columnSlice;
+ private final String startToken;
+ private final String endToken;
+ private final Boolean includeEmptyRows; // Default to null will discard tombstones
+ private final List<Future<Boolean>> futures = Lists.newArrayList();
+ private final AtomicBoolean cancelling = new AtomicBoolean(false);
+ private final Partitioner partitioner;
+
+ public static class Builder<K, C> {
+ private final Keyspace keyspace;
+ private final ColumnFamily<K, C> columnFamily;
+
+ private Partitioner partitioner = DEFAULT_PARTITIONER;
+ private int pageSize = DEFAULT_PAGE_SIZE;
+ private Integer concurrencyLevel; // Default to null will force ring describe
+ private ExecutorService executor;
+ private CheckpointManager checkpointManager;
+ private Function<Row<K,C>, Boolean> rowFunction;
+ private boolean repeatLastToken;
+ private ColumnSlice<C> columnSlice;
+ private String startToken;
+ private String endToken;
+ private Boolean includeEmptyRows; // Default to null will discard tombstones
+
+ public Builder(Keyspace ks, ColumnFamily<K, C> columnFamily) {
+ this.keyspace = ks;
+ this.columnFamily = columnFamily;
+ }
+
+ /**
+ * Maximum number of rows to return for each incremental query to Cassandra.
+ * This limit also represents the page size when paginating.
+ *
+         * @param pageSize
+ * @return
+ */
+ public Builder<K, C> withPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ return this;
+ }
+
+ /**
+ * Use this checkpoint manager to keep track of progress as all rows are being iterated
+         * @param checkpointManager
+ * @return
+ */
+ public Builder<K, C> withCheckpointManager(CheckpointManager checkpointManager) {
+ this.checkpointManager = checkpointManager;
+ return this;
+ }
+
+ /**
+ * If true will repeat the last token in the previous block when calling cassandra. This
+ * feature is off by default and is used to handle situations where different row keys map
+         * to the same token value and they are split on a page boundary. This may not be efficient
+ * since it requires the entire row data to be fetched (based on the column slice)
+ *
+ * @param repeatLastToken
+ * @return
+ */
+ public Builder<K, C> withRepeatLastToken(boolean repeatLastToken) {
+ this.repeatLastToken = repeatLastToken;
+ return this;
+ }
+
+ /**
+ * Specify a non-contiguous set of columns to retrieve.
+ *
+ * @param columns
+ * @return
+ */
+ public Builder<K, C> withColumnSlice(C... columns) {
+ this.columnSlice = new ColumnSlice<C>(ImmutableList.copyOf(columns));
+ return this;
+ }
+
+ /**
+ * Specify a non-contiguous set of columns to retrieve.
+ *
+ * @param columns
+ * @return
+ */
+ public Builder<K, C> withColumnSlice(Collection<C> columns) {
+ this.columnSlice = new ColumnSlice<C>(columns);
+ return this;
+ }
+
+ /**
+ * Use this when your application caches the column slice.
+ *
+ * @param slice
+ * @return
+ */
+ public Builder<K, C> withColumnSlice(ColumnSlice<C> columns) {
+ this.columnSlice = columns;
+ return this;
+ }
+
+ /**
+ * Specify a range of columns to return.
+ *
+ * @param startColumn
+ * First column in the range
+ * @param endColumn
+ * Last column in the range
+ * @param reversed
+ * True if the order should be reversed. Note that for reversed,
+ * startColumn should be greater than endColumn.
+ * @param count
+ * Maximum number of columns to return (similar to SQL LIMIT)
+ * @return
+ */
+ public Builder<K, C> withColumnRange(C startColumn, C endColumn, boolean reversed, int count) {
+ this.columnSlice = new ColumnSlice<C>(startColumn, endColumn).setReversed(reversed).setLimit(count);
+ return this;
+ }
+
+ /**
+ * Split the query into N threads with each thread processing an equal size chunk from the token range.
+ *
+ * Note that the actual number of threads is still limited by the available threads in the thread
+ * pool that was set with the AstyanaxConfiguration.
+ *
+         * @param concurrencyLevel
+ * @return
+ */
+ public Builder<K, C> withConcurrencyLevel(int concurrencyLevel) {
+ Preconditions.checkArgument(concurrencyLevel > 1, "Concurrency level must be > 1");
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ /**
+ * Execute the operation on a specific token range, instead of the entire range.
+         * Use this only in combination with setConcurrencyLevel being called otherwise
+ * it currently will not have any effect on the query. When using forTokenRange
+ * the specified token range will still be split into the number of threads
+ * specified by setConcurrencyLevel
+ *
+ * @param startToken
+ * @param endToken
+ * @return
+ */
+ public Builder<K, C> withTokenRange(BigInteger startToken, BigInteger endToken) {
+ this.startToken = startToken.toString();
+ this.endToken = endToken.toString();
+ return this;
+ }
+
+ public Builder<K, C> withTokenRange(String startToken, String endToken) {
+ this.startToken = startToken;
+ this.endToken = endToken;
+ return this;
+ }
+
+ /**
+ * Partitioner used to determine token ranges and how to break token ranges
+ * into sub parts. The default is BigInteger127Partitioner which is the
+ * RandomPartitioner in cassandra.
+ *
+ * @param partitioner
+ * @return
+ */
+ public Builder<K, C> withPartitioner(Partitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+ /**
+ * The default behavior is to exclude empty rows, other than when specifically asking
+ * for no columns back. Setting this to true will result in the row callback function
+ * being called for empty rows.
+ * @param flag
+ * @return
+ */
+ public Builder<K, C> withIncludeEmptyRows(Boolean flag) {
+ this.includeEmptyRows = flag;
+ return this;
+ }
+
+ /**
+  * Callback invoked for every row that is read. The implementation must be
+  * thread safe since multiple internal threads call it concurrently.
+  * Returning false from the callback aborts the whole operation.
+  *
+  * @param rowFunction
+  * @return this builder
+  */
+ public Builder<K, C> forEachRow(Function<Row<K,C>, Boolean> rowFunction) {
+     this.rowFunction = rowFunction;
+     return this;
+ }
+
+ /**
+  * Create the {@link AllRowsReader} from the accumulated builder state.
+  */
+ public AllRowsReader<K,C> build() {
+     return new AllRowsReader<K,C>(
+             keyspace, columnFamily,
+             concurrencyLevel, executor,
+             checkpointManager, rowFunction, columnSlice,
+             startToken, endToken,
+             includeEmptyRows,
+             pageSize, repeatLastToken,
+             partitioner);
+ }
+ }
+
+ /**
+  * Construct the reader directly; normally created through {@link Builder}.
+  * All arguments are simply captured; the only derived state is the
+  * effective includeEmptyRows flag (see resolveIncludeEmptyRows).
+  */
+ public AllRowsReader(Keyspace keyspace, ColumnFamily<K, C> columnFamily,
+         Integer concurrencyLevel,
+         ExecutorService executor,
+         CheckpointManager checkpointManager,
+         Function<Row<K, C>, Boolean> rowFunction,
+         ColumnSlice<C> columnSlice,
+         String startToken,
+         String endToken,
+         Boolean includeEmptyRows,
+         int pageSize,
+         boolean repeatLastToken,
+         Partitioner partitioner) {
+     this.keyspace          = keyspace;
+     this.columnFamily      = columnFamily;
+     this.concurrencyLevel  = concurrencyLevel;
+     this.executor          = executor;
+     this.checkpointManager = checkpointManager;
+     this.rowFunction       = rowFunction;
+     this.columnSlice       = columnSlice;
+     this.startToken        = startToken;
+     this.endToken          = endToken;
+     this.pageSize          = pageSize;
+     this.repeatLastToken   = repeatLastToken;
+     this.partitioner       = partitioner;
+     this.includeEmptyRows  = resolveIncludeEmptyRows(includeEmptyRows, columnSlice);
+ }
+
+ // Effective empty-row behavior: an explicitly set flag wins; otherwise empty
+ // rows are included only when the query asks for a zero-column slice (every
+ // row necessarily comes back without columns in that case); default is false.
+ private static boolean resolveIncludeEmptyRows(Boolean flag, ColumnSlice<?> slice) {
+     if (flag != null)
+         return flag;
+     return slice != null && slice.getColumns() == null && slice.getLimit() == 0;
+ }
+
+ /**
+  * Create a callable that reads all rows in the token range (startToken, endToken],
+  * paging pageSize rows at a time and invoking the row callback for each row.
+  * Progress is persisted through the checkpoint manager so an interrupted run
+  * can be resumed. Returns true when the range completes; false (after calling
+  * cancel()) on any failure or when the callback asks to stop.
+  */
+ private Callable<Boolean> makeTokenRangeTask(final String startToken, final String endToken) {
+     return new Callable<Boolean>() {
+         @Override
+         public Boolean call() {
+             try {
+                 String currentToken;
+                 try {
+                     currentToken = checkpointManager.getCheckpoint(startToken);
+                     if (currentToken == null) {
+                         // No checkpoint recorded yet: start from the beginning of
+                         // the range. (Previously this dereferenced null and crashed.)
+                         currentToken = startToken;
+                     }
+                     else if (currentToken.equals(endToken)) {
+                         // Range was fully processed by a previous run.
+                         return true;
+                     }
+                 } catch (Throwable t) {
+                     LOG.error("Failed to get checkpoint for startToken " + startToken, t);
+                     cancel();
+                     return false;
+                 }
+
+                 int localPageSize = pageSize;
+                 int rowsToSkip = 0;
+                 while (!Thread.currentThread().isInterrupted()) {
+                     RowSliceQuery<K, C> query = keyspace
+                             .prepareQuery(columnFamily).getKeyRange(null, null, currentToken, endToken, localPageSize);
+
+                     if (columnSlice != null)
+                         query.withColumnSlice(columnSlice);
+
+                     Rows<K, C> rows = query.execute().getResult();
+                     if (!rows.isEmpty()) {
+                         // Iterate through all the rows and notify the callback function
+                         for (Row<K,C> row : rows) {
+                             try {
+                                 // When repeating the last token, rowsToSkip > 0:
+                                 // skip the rows already delivered by the previous page.
+                                 if (rowsToSkip > 0) {
+                                     rowsToSkip--;
+                                     continue;
+                                 }
+                                 if (!includeEmptyRows && (row.getColumns() == null || row.getColumns().isEmpty()))
+                                     continue;
+                                 if (!rowFunction.apply(row)) {
+                                     cancel();
+                                     return false;
+                                 }
+                             }
+                             catch (Throwable t) {
+                                 // Log through the logger (was printStackTrace) so the
+                                 // failure surfaces in the application's logs.
+                                 LOG.error("Row callback failed", t);
+                                 cancel();
+                                 return false;
+                             }
+                         }
+
+                         // Get the next block
+                         if (rows.size() == localPageSize) {
+                             Row<K, C> lastRow = rows.getRowByIndex(rows.size() - 1);
+                             String lastToken = partitioner.getTokenForKey(lastRow.getRawKey());
+                             checkpointManager.trackCheckpoint(startToken, currentToken);
+                             if (repeatLastToken) {
+                                 // Start token is non-inclusive
+                                 currentToken = new BigInteger(lastToken).subtract(BigInteger.ONE).toString();
+
+                                 // Determine the number of rows to skip in the response. Since we are repeating the
+                                 // last token it's possible (although unlikely) that there is more than one key mapping to the
+                                 // token. We therefore count backwards the number of keys that have the same token and skip
+                                 // that number in the next iteration of the loop. If, for example, 3 keys matched but only 2 were
+                                 // returned in this iteration then the first 2 keys will be skipped from the next response.
+                                 rowsToSkip = 1;
+                                 for (int i = rows.size() - 2; i >= 0; i--, rowsToSkip++) {
+                                     if (!lastToken.equals(partitioner.getTokenForKey(rows.getRowByIndex(i).getRawKey()))) {
+                                         break;
+                                     }
+                                 }
+                                 LOG.info("Skipping : " + rowsToSkip);
+
+                                 // A whole page of one repeated token: grow the page so the
+                                 // next query can make forward progress.
+                                 if (rowsToSkip == localPageSize) {
+                                     localPageSize++;
+                                 }
+                             }
+                             else {
+                                 currentToken = lastToken;
+                             }
+
+                             continue;
+                         }
+                     }
+
+                     // We're done!
+                     checkpointManager.trackCheckpoint(startToken, endToken);
+                     return true;
+                 }
+                 // Interrupted before finishing the range.
+                 cancel();
+                 return false;
+             } catch (Throwable t) {
+                 LOG.error("Error process token/key range", t);
+                 cancel();
+                 return false;
+             }
+         }
+     };
+ }
+
+ /**
+ * Main execution block for the all rows query.
+ * <p>
+ * Builds one sub-task per token range — either by splitting the configured
+ * (or full) range into concurrencyLevel pieces, or by using the ring's own
+ * ranges from describeRing() — runs them all on a private daemon pool or the
+ * externally supplied executor, and returns the aggregate result from
+ * waitForTasksToFinish().
+ */
+ @Override
+ public Boolean call() throws Exception {
+ List<Callable<Boolean>> subtasks = Lists.newArrayList();
+
+ // We are iterating the entire ring using an arbitrary number of threads
+ if (this.concurrencyLevel != null) {
+ // NOTE(review): the endToken default is getMinToken(), the same as the
+ // startToken default — presumably the partitioner treats min..min as the
+ // full wrapped ring; confirm this shouldn't be getMaxToken().
+ List<TokenRange> tokens = partitioner.splitTokenRange(
+ startToken == null ? partitioner.getMinToken() : startToken,
+ endToken == null ? partitioner.getMinToken() : endToken,
+ this.concurrencyLevel);
+
+ for (TokenRange range : tokens) {
+ subtasks.add(makeTokenRangeTask(range.getStartToken(), range.getEndToken()));
+ }
+ }
+ // We are iterating through each token range
+ else {
+ List<TokenRange> ranges = keyspace.describeRing();
+ for (TokenRange range : ranges) {
+ // Step the start token back by one — presumably so the exclusive-start
+ // key-range query still covers the range's first token; verify against
+ // the partitioner's getTokenMinusOne contract.
+ subtasks.add(makeTokenRangeTask(partitioner.getTokenMinusOne(range.getStartToken()), range.getEndToken()));
+ }
+ }
+
+ try {
+ // Use a local executor
+ if (executor == null) {
+ // Daemon threads so an abandoned reader cannot keep the JVM alive.
+ ExecutorService localExecutor = Executors
+ .newFixedThreadPool(subtasks.size(),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("AstyanaxAllRowsReader-%d")
+ .build());
+
+
+ try {
+ futures.addAll(startTasks(localExecutor, subtasks));
+ return waitForTasksToFinish();
+ }
+ finally {
+ // Always tear down the private pool, interrupting any stragglers.
+ localExecutor.shutdownNow();
+ }
+ }
+ // Use an externally provided executor
+ else {
+ futures.addAll(startTasks(executor, subtasks));
+ return waitForTasksToFinish();
+ }
+ }
+ catch (Throwable t) {
+ LOG.warn("AllRowsReader terminated", t);
+ cancel();
+ return false;
+ }
+ }
+