Skip to content
Permalink
Browse files
Features:
  * Implement lookbehind via tracker callbacks
  * Improve DSL
  * Rename maxLts to peek
  * Split lts visitors from visitors
  * Allow create table if not exists
  * Allow sampler to be triggered at every LTS
  * Allow local state validator to always run
  * Add Staged Runner
  * Add wait for token ranges
  * Make keyspace DDL configurable
  * Rename PartitionVisitor to Visitor

Bugfixes:
  * Fix for queue draining
  * Fix distribution of the single-op values
  * Fix bug in schema helper: static columns are listed as duplicates

Patch by Alex Petrov for CASSANDRA-16262

Co-authored-by: Caleb Rackliffe <calebrackliffe@gmail.com>
  • Loading branch information
ifesdjeen and maedhroz committed Jan 10, 2022
1 parent 26bb669 commit 275f188660b66743bf3f055c8d7da438ad826061
Showing 54 changed files with 1,550 additions and 628 deletions.
@@ -76,6 +76,7 @@
mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
mapper.registerSubtypes(Configuration.SingleVisitRunnerConfig.class);
mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
mapper.registerSubtypes(Configuration.NoOpDataTrackerConfiguration.class);

@@ -106,6 +107,7 @@
public final SchemaProviderConfiguration schema_provider;

public final boolean drop_schema;
public final String keyspace_ddl;
public final boolean create_schema;
public final boolean truncate_table;

@@ -117,13 +119,11 @@
public final PDSelectorConfiguration partition_descriptor_selector;
public final CDSelectorConfiguration clustering_descriptor_selector;

public final long run_time;
public final TimeUnit run_time_unit;

@JsonCreator
public Configuration(@JsonProperty("seed") long seed,
@JsonProperty("schema_provider") SchemaProviderConfiguration schema_provider,
@JsonProperty("drop_schema") boolean drop_schema,
@JsonProperty("create_keyspace") String keyspace_ddl,
@JsonProperty("create_schema") boolean create_schema,
@JsonProperty("truncate_schema") boolean truncate_table,
@JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
@@ -132,12 +132,11 @@ public Configuration(@JsonProperty("seed") long seed,
@JsonProperty("system_under_test") SutConfiguration system_under_test,
@JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
@JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
@JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
@JsonProperty(value = "run_time", defaultValue = "2") long run_time,
@JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit run_time_unit)
@JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector)
{
this.seed = seed;
this.schema_provider = schema_provider;
this.keyspace_ddl = keyspace_ddl;
this.drop_schema = drop_schema;
this.create_schema = create_schema;
this.truncate_table = truncate_table;
@@ -147,8 +146,6 @@ public Configuration(@JsonProperty("seed") long seed,
this.data_tracker = data_tracker;
this.partition_descriptor_selector = partition_descriptor_selector;
this.clustering_descriptor_selector = clustering_descriptor_selector;
this.run_time = run_time;
this.run_time_unit = run_time_unit;
this.runner = runner;
}

@@ -233,14 +230,15 @@ public static Run createRun(Configuration snapshot)
OpSelectors.MonotonicClock clock = snapshot.clock.make();

MetricReporter metricReporter = snapshot.metric_reporter.make();
// TODO: parse schema
SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);

// TODO: validate that operation kind is compatible with schema, due to statics etc
SystemUnderTest sut = snapshot.system_under_test.make();

SchemaSpec schemaSpec = snapshot.schema_provider.make(seed, sut);
schemaSpec.validate();

OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
// TODO: validate that operation kind is compactible with schema, due to statics etc
SystemUnderTest sut = snapshot.system_under_test.make();

return new Run(rng,
clock,
@@ -263,6 +261,7 @@ public static Runner createRunner(Configuration config)
long seed;
SchemaProviderConfiguration schema_provider = new DefaultSchemaProviderConfiguration();

String keyspace_ddl;
boolean drop_schema;
boolean create_schema;
boolean truncate_table;
@@ -275,9 +274,6 @@ public static Runner createRunner(Configuration config)
PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value

long run_time = 2;
TimeUnit run_time_unit = TimeUnit.HOURS;

public ConfigurationBuilder setSeed(long seed)
{
this.seed = seed;
@@ -290,20 +286,19 @@ public ConfigurationBuilder setSchemaProvider(SchemaProviderConfiguration schema
return this;
}

public ConfigurationBuilder setRunTime(long runTime, TimeUnit runTimeUnit)
public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
{
this.run_time_unit = Objects.requireNonNull(runTimeUnit, "unit");
this.run_time = runTime;
this.data_tracker = tracker;
return this;
}


public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
public ConfigurationBuilder setDataTracker(String keyspace_ddl)
{
this.data_tracker = tracker;
this.keyspace_ddl = keyspace_ddl;
return this;
}


public ConfigurationBuilder setClock(ClockConfiguration clock)
{
this.clock = clock;
@@ -370,20 +365,16 @@ public Configuration build()
return new Configuration(seed,
schema_provider,
drop_schema,
keyspace_ddl,
create_schema,
truncate_table,
metric_reporter,
clock,

runner,
system_under_test,
data_tracker,

partition_descriptor_selector,
clustering_descriptor_selector,

run_time,
run_time_unit);
clustering_descriptor_selector);
}
}

@@ -403,8 +394,6 @@ public ConfigurationBuilder unbuild()
builder.partition_descriptor_selector = partition_descriptor_selector;
builder.clustering_descriptor_selector = clustering_descriptor_selector;

builder.run_time = run_time;
builder.run_time_unit = run_time_unit;
return builder;
}

@@ -425,36 +414,10 @@ public NoOpDataTrackerConfiguration()

public DataTracker make()
{
return new DataTracker()
{
public void started(long lts)
{
}

public void finished(long lts)
{

}

public long maxStarted()
{
return 0;
}

public long maxConsecutiveFinished()
{
return 0;
}

public DataTrackerConfiguration toConfig()
{
return null;
}
};
return DataTracker.NO_OP;
}
}


@JsonTypeName("default")
public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
{
@@ -562,38 +525,74 @@ public interface RunnerConfiguration extends Runner.RunnerFactory
public static class ConcurrentRunnerConfig implements RunnerConfiguration
{
public final int concurrency;
public final List<VisitorConfiguration> visitor_factories;

@JsonProperty(value = "visitors")
public final List<VisitorConfiguration> visitorFactories;

public final long run_time;
public final TimeUnit run_time_unit;

@JsonCreator
public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "4") int concurrency,
@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
@JsonProperty(value = "run_time", defaultValue = "2") long runtime,
@JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
{
this.concurrency = concurrency;
this.visitor_factories = visitors;
this.visitorFactories = visitors;
this.run_time = runtime;
this.run_time_unit = runtimeUnit;
}

@Override
public Runner make(Run run, Configuration config)
{
return new Runner.ConcurrentRunner(run, config, concurrency, visitor_factories);
return new Runner.ConcurrentRunner(run, config, concurrency, visitorFactories, run_time, run_time_unit);
}
}

@JsonTypeName("sequential")
public static class SequentialRunnerConfig implements RunnerConfiguration
{
public final List<VisitorConfiguration> visitor_factories;
@JsonProperty(value = "visitors")
public final List<VisitorConfiguration> visitorFactories;

public final long run_time;
public final TimeUnit run_time_unit;

@JsonCreator
public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
@JsonProperty(value = "run_time", defaultValue = "2") long runtime,
@JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
{
this.visitorFactories = visitors;
this.run_time = runtime;
this.run_time_unit = runtimeUnit;
}

@Override
public Runner make(Run run, Configuration config)
{
return new Runner.SequentialRunner(run, config, visitorFactories, run_time, run_time_unit);
}
}

@JsonTypeName("single")
public static class SingleVisitRunnerConfig implements RunnerConfiguration
{
@JsonProperty(value = "visitors")
public final List<VisitorConfiguration> visitorFactories;

@JsonCreator
public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
public SingleVisitRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
{
this.visitor_factories = visitors;
this.visitorFactories = visitors;
}

@Override
public Runner make(Run run, Configuration config)
{
return new Runner.SequentialRunner(run, config, visitor_factories);
return new Runner.SingleVisitRunner(run, config, visitorFactories);
}
}

@@ -675,7 +674,7 @@ public WeightedSelectorBuilder()
operation_kind_weights = new HashMap<>();
}

public WeightedSelectorBuilder addWeight(T v, int weight)
public WeightedSelectorBuilder<T> addWeight(T v, int weight)
{
operation_kind_weights.put(v, weight);
return this;
@@ -852,7 +851,7 @@ public OpSelectors.DescriptorSelector make(OpSelectors.Rng rng, SchemaSpec schem
}
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface DistributionConfig extends Distribution.DistributionFactory
{
}
@@ -942,7 +941,7 @@ public MutatingVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfig
@Override
public Visitor make(Run run)
{
return new MutatingVisitor(run, row_visitor);
return new MutatingVisitor(run, row_visitor::make);
}
}

@@ -960,7 +959,7 @@ public LoggingVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfig
@Override
public Visitor make(Run run)
{
return new LoggingVisitor(run, row_visitor);
return new LoggingVisitor(run, row_visitor::make);
}
}

@@ -969,6 +968,8 @@ public static class AllPartitionsValidatorConfiguration implements VisitorConfig
{
public final int concurrency;
public final int trigger_after;

@JsonProperty("model")
public final Configuration.ModelConfiguration modelConfiguration;

@JsonCreator
@@ -1055,7 +1056,7 @@ public interface SchemaProviderConfiguration extends SchemaSpec.SchemaSpecFactor
@JsonTypeName("default")
public static class DefaultSchemaProviderConfiguration implements SchemaProviderConfiguration
{
public SchemaSpec make(long seed)
public SchemaSpec make(long seed, SystemUnderTest sut)
{
return SchemaGenerators.defaultSchemaSpecGen("harry", "table0")
.inflate(seed);
@@ -1103,7 +1104,7 @@ public FixedSchemaProviderConfiguration(SchemaSpec schemaSpec,
this.regular_columns = regulars;
this.static_keys = statics;
}
public SchemaSpec make(long seed)
public SchemaSpec make(long seed, SystemUnderTest sut)
{
return schemaSpec;
}
@@ -1122,6 +1123,4 @@ public MetricReporter make()
return MetricReporter.NO_OP;
}
}

// TODO: schema provider by DDL
}
@@ -43,7 +43,6 @@ public Run(OpSelectors.Rng rng,
OpSelectors.MonotonicClock clock,
OpSelectors.PdSelector pdSelector,
OpSelectors.DescriptorSelector descriptorSelector,

SchemaSpec schemaSpec,
DataTracker tracker,
SystemUnderTest sut,
@@ -59,7 +58,6 @@ private Run(OpSelectors.Rng rng,
OpSelectors.PdSelector pdSelector,
OpSelectors.DescriptorSelector descriptorSelector,
QueryGenerator rangeSelector,

SchemaSpec schemaSpec,
DataTracker tracker,
SystemUnderTest sut,
@@ -44,6 +44,6 @@ public boolean canCorrupt(ResultSetRow row)

public CompiledStatement corrupt(ResultSetRow row)
{
return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.maxLts()) + 1);
return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.peek()));
}
}
@@ -77,7 +77,7 @@ public CompiledStatement corrupt(ResultSetRow row)
row.pd,
mask,
schema.regularAndStaticColumnsMask(),
clock.rts(clock.maxLts()) + 1);
clock.rts(clock.peek()));
}
}

@@ -96,6 +96,6 @@ public CompiledStatement corrupt(ResultSetRow row)
row.cd,
mask,
schema.regularAndStaticColumnsMask(),
clock.rts(clock.maxLts()) + 1);
clock.rts(clock.peek()));
}
}
@@ -72,6 +72,6 @@ public CompiledStatement corrupt(ResultSetRow row)
// We do not know LTS of the deleted row. We could try inferring it, but that
// still won't help since we can't use it anyways, since collisions between a
// written value and tombstone are resolved in favour of tombstone.
return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.maxLts()) + 1);
return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.peek()));
}
}

0 comments on commit 275f188

Please sign in to comment.