Skip to content

Commit

Permalink
CASSANDRA-18154: CEP-15: Enhance returning SELECT to allow partition …
Browse files Browse the repository at this point in the history
…and clustering IN clauses to return multiple partitions/rows
  • Loading branch information
dcapwell committed Jan 23, 2023
1 parent 56d7e69 commit 0ddfbbd
Show file tree
Hide file tree
Showing 29 changed files with 1,809 additions and 329 deletions.
5 changes: 5 additions & 0 deletions .build/cassandra-build-deps-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,10 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
<groupId>accord</groupId>
<artifactId>accord</artifactId>
<classifier>tests</classifier>
</dependency>
</dependencies>
</project>
2 changes: 1 addition & 1 deletion .build/include-accord.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"

accord_repo='https://github.com/apache/cassandra-accord.git'
accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de'
accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512'
accord_src="$bin/cassandra-accord"

checkout() {
Expand Down
21 changes: 21 additions & 0 deletions .build/parent-pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,27 @@
<artifactId>accord</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>accord</groupId>
<artifactId>accord</artifactId>
<version>1.0-SNAPSHOT</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>org.junit.jupiter</artifactId>
<groupId>junit-jupiter-api</groupId>
</exclusion>
<exclusion>
<artifactId>org.junit.jupiter</artifactId>
<groupId>junit-jupiter-engine</groupId>
</exclusion>
<exclusion>
<artifactId>ch.qos.logback</artifactId>
<groupId>logback-classic</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
Expand Down
6 changes: 3 additions & 3 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@
<jvmarg value="-ea"/>
<jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/>
<jvmarg value="-Dcassandra.debugrefcount=true"/>
<jvmarg value="-Xss256k"/>
<jvmarg value="-Xss384k"/>
<!-- When we do classloader manipulation SoftReferences can cause memory leaks
that can OOM our test runs. The next two settings informs our GC
algorithm to limit the metaspace size and clean up SoftReferences
Expand Down Expand Up @@ -1366,7 +1366,7 @@
<jvmarg value="-Djava.awt.headless=true"/>
<jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
<jvmarg value="-ea"/>
<jvmarg value="-Xss256k"/>
<jvmarg value="-Xss384k"/>
<jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
<jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
<jvmarg value="-Dcassandra.skip_sync=true" />
Expand Down Expand Up @@ -1412,7 +1412,7 @@
<jvmarg value="-Djava.awt.headless=true"/>
<jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
<jvmarg value="-ea"/>
<jvmarg value="-Xss256k"/>
<jvmarg value="-Xss384k"/>
<jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
<jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
<jvmarg value="-Dcassandra.skip_sync=true" />
Expand Down
1 change: 1 addition & 0 deletions ide/idea-iml-file.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<sourceFolder url="file://$MODULE_DIR$/test/simulator/asm" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/simulator/bootstrap" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/simulator/main" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/simulator/test" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
<sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/.idea" />
Expand Down
35 changes: 34 additions & 1 deletion ide/idea/workspace.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,40 @@
<option name="MAIN_CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="TEST_OBJECT" value="class" />
<option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMetaspaceSize=384M -XX:SoftRefLRUPolicyMSPerMB=0 -XX:HeapDumpPath=build/test -Dcassandra.strict.runtime.checks=true -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.tolerate_sstable_size=true -Dcassandra.skip_sync=true -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true" />
<option name="VM_PARAMETERS" value="
-ea
-Djava.library.path=$PROJECT_DIR$/lib/sigar-bin
-Djava.security.egd=file:/dev/urandom
-XX:-BackgroundCompilation
-XX:-TieredCompilation
-XX:ActiveProcessorCount=4
-XX:CICompilerCount=1
-XX:HeapDumpPath=build/test
-XX:MaxMetaspaceSize=384M
-XX:ReservedCodeCacheSize=256M
-XX:SoftRefLRUPolicyMSPerMB=0
-XX:Tier4CompileThreshold=1000
-Xbootclasspath/a:$PROJECT_DIR$/build/test/lib/jars/simulator-bootstrap.jar
-Xmx8G
-Xss384k
-javaagent:$PROJECT_DIR$/build/test/lib/jars/simulator-asm.jar
-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml
-Dcassandra.debugrefcount=false
-Dcassandra.logdir=$PROJECT_DIR$/build/test/logs
-Dcassandra.memtable_row_overhead_computation_step=100
-Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true
-Dcassandra.ring_delay_ms=10000
-Dcassandra.skip_sync=true
-Dcassandra.strict.runtime.checks=true
-Dcassandra.test.simulator.determinismcheck=strict
-Dcassandra.test.sstableformatdevelopment=true
-Dcassandra.tolerate_sstable_size=true
-Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables
-Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables
-Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml
-Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables
" />
<option name="PARAMETERS" value="" />
<fork_mode value="class" />
<option name="WORKING_DIRECTORY" value="" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.metrics;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

public class AccordClientRequestMetrics extends ClientRequestMetrics
{
public final Meter preempts;
public final Histogram keySize;

public AccordClientRequestMetrics(String scope)
{
super(scope);

preempts = Metrics.meter(factory.createMetricName("Preempts"));
keySize = Metrics.histogram(factory.createMetricName("KeySizeHistogram"), false);
}

@Override
public void release()
{
super.release();
Metrics.remove(factory.createMetricName("Preempts"));
Metrics.remove(factory.createMetricName("KeySizeHistogram"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ private CommandListener maybeWrapListener(CommandListener listener)
if (listener instanceof AccordCommandsForKey)
return new ListenerProxy.CommandsForKeyListenerProxy(((AccordCommandsForKey) listener).key());

//TODO - Support accord.messages.Defer

throw new RuntimeException("Unhandled non-transient listener: " + listener);
}

Expand Down
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/service/accord/AccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;

import accord.api.Result;
import accord.coordinate.Preempted;
import accord.coordinate.Timeout;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
Expand All @@ -40,6 +41,7 @@
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.metrics.AccordClientRequestMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter;
Expand All @@ -53,9 +55,13 @@

import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;

public class AccordService implements Shutdownable
{
public static final AccordClientRequestMetrics readMetrics = new AccordClientRequestMetrics("AccordRead");
public static final AccordClientRequestMetrics writeMetrics = new AccordClientRequestMetrics("AccordWrite");

public final Node node;
private final Shutdownable nodeShutdown;
private final AccordMessageSink messageSink;
Expand Down Expand Up @@ -122,8 +128,11 @@ public static long nowInMicros()
*/
public TxnData coordinate(Txn txn, ConsistencyLevel consistencyLevel)
{
AccordClientRequestMetrics metrics = txn.isWrite() ? writeMetrics : readMetrics;
final long startNanos = nanoTime();
try
{
metrics.keySize.update(txn.keys().size());
Future<Result> future = node.coordinate(txn);
Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
return (TxnData) result;
Expand All @@ -132,17 +141,35 @@ public TxnData coordinate(Txn txn, ConsistencyLevel consistencyLevel)
{
Throwable cause = e.getCause();
if (cause instanceof Timeout)
{
metrics.timeouts.mark();
throw throwTimeout(txn, consistencyLevel);
}
if (cause instanceof Preempted)
{
metrics.preempts.mark();
//TODO need to improve
// Coordinator "could" query the accord state to see whats going on but that doesn't exist yet.
// Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match
throw throwTimeout(txn, consistencyLevel);
}
metrics.failures.mark();
throw new RuntimeException(cause);
}
catch (InterruptedException e)
{
metrics.failures.mark();
throw new UncheckedInterruptedException(e);
}
catch (TimeoutException e)
{
metrics.timeouts.mark();
throw throwTimeout(txn, consistencyLevel);
}
finally
{
metrics.addNano(nanoTime() - startNanos);
}
}

private static RuntimeException throwTimeout(Txn txn, ConsistencyLevel consistencyLevel)
Expand Down
47 changes: 47 additions & 0 deletions src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.utils.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.AbstractMatcherFilter;
import ch.qos.logback.core.spi.FilterReply;

public class ClassNameFilter extends AbstractMatcherFilter<ILoggingEvent>
{
String loggerName;

public void setLoggerName(String loggerName)
{
this.loggerName = loggerName;
}

@Override
public FilterReply decide(ILoggingEvent event)
{
if (!isStarted()) return FilterReply.NEUTRAL;
if (event.getLoggerName().equals(loggerName)) return onMatch;
return onMismatch;
}

@Override
public void start()
{
if (loggerName != null) super.start();
}
}
14 changes: 14 additions & 0 deletions test/conf/logback-simulator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@
<!-- Shutdown hook ensures that async appender flushes -->
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>

<appender name="HISTORYLOG" class="ch.qos.logback.core.FileAppender">
<file>./build/test/logs/simulator/${run_start}-${run_seed}/history.log</file>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
<immediateFlush>true</immediateFlush>
<filter class="org.apache.cassandra.utils.logging.ClassNameFilter">
<loggerName>org.apache.cassandra.simulator.paxos.LoggingHistoryValidator</loggerName>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
<file>./build/test/logs/simulator/${run_start}-${run_seed}/${instance_id}/system.log</file>
<encoder>
Expand Down Expand Up @@ -56,6 +69,7 @@
<root level="INFO">
<appender-ref ref="INSTANCEFILE" />
<appender-ref ref="STDOUT" />
<appender-ref ref="HISTORYLOG" />
</root>
</configuration>

0 comments on commit 0ddfbbd

Please sign in to comment.