Support auto discover user tables for comparison
Patch by Yifan Cai; reviewed by Sam Tunnicliffe for CASSANDRA-15953

closes #10
yifan-c authored and beobal committed Jul 27, 2020
1 parent e9782c6 commit 2267933c3ac009808271e1831cf56258c99cf2d3
Showing 12 changed files with 489 additions and 20 deletions.
@@ -3,6 +3,11 @@
## Configuration
See `spark-job/localconfig.yaml` for an example config.

See `spark-job/localconfig-multi-keyspaces.yaml` for an example config that compares tables under multiple keyspaces.

See `spark-job/localconfig-auto-discover.yaml` for an example config that auto-discovers all user tables to compare.
The auto-discover mode excludes all system keyspaces, plus any keyspaces listed under `disallowed_keyspaces` in the yaml file.
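For illustration, such a config might look like the sketch below (`disallowed_keyspaces` is the field added by this commit; the keyspace name is hypothetical):

# keyspace_tables is omitted entirely, which enables auto discovery
disallowed_keyspaces:
- some_legacy_ks  # hypothetical keyspace to skip; system keyspaces are always excluded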

## Custom cluster providers
To make it easy to run in any environment the cluster providers are pluggable - there are two interfaces to implement.
First, the `ClusterProvider` interface is used to create a connection to the clusters, and it is configured using
@@ -50,6 +55,8 @@ $ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace2"
$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml
# If rows are created in "keyspace2", you can run the job with localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces. See the command below.
# $ spark-submit --verbose --files ./spark-job/localconfig-multi-keyspaces.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-multi-keyspaces.yaml
# To use the auto-discover mode, run the job with localconfig-auto-discover.yaml, which omits the keyspace_tables field.
# $ spark-submit --verbose --files ./spark-job/localconfig-auto-discover.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-auto-discover.yaml
# ... logs
INFO DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
## start api-server:
@@ -19,6 +19,7 @@

package org.apache.cassandra.diff.api;

import java.io.FileInputStream;
import java.io.IOException;

import com.google.common.collect.Lists;
@@ -35,7 +36,7 @@ public static void main(String[] args) throws IOException {
String filename = args[0];
JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean();

DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(filename));
DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(new FileInputStream(filename)));
factoryBean.setResourceProviders(Lists.newArrayList(new SingletonResourceProvider(diffResource),
new SingletonResourceProvider(new HealthResource())));
factoryBean.setAddress("http://localhost:8089/");
@@ -20,13 +20,46 @@
package org.apache.cassandra.diff;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public interface JobConfiguration extends Serializable {
List<KeyspaceTablePair> keyspaceTables();

default boolean shouldAutoDiscoverTables() {
List<KeyspaceTablePair> list = keyspaceTables();
return null == list || list.isEmpty();
}

/**
* @return qualified tables defined in the configuration. Return null if not defined.
*/
@Nullable List<KeyspaceTablePair> keyspaceTables();

/**
* @return a list of keyspace names that are disallowed for comparison. Return null if not defined.
*/
@Nullable List<String> disallowedKeyspaces();

/**
* @return filtered qualified tables based on the keyspaceTables and disallowedKeyspaces defined.
* Return null if keyspaceTables is not defined.
*/
@Nullable default List<KeyspaceTablePair> filteredKeyspaceTables() {
List<String> disallowedKeyspaces = disallowedKeyspaces();
List<KeyspaceTablePair> list = keyspaceTables();
if (disallowedKeyspaces != null && !disallowedKeyspaces.isEmpty() && list != null && !list.isEmpty()) {
Set<String> filter = new HashSet<>(disallowedKeyspaces);
return list.stream().filter(t -> !filter.contains(t.keyspace)).collect(Collectors.toList());
} else {
return list;
}
}

int splits();

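A minimal sketch of how a caller might combine these two defaults, assuming a hypothetical discoverUserTables helper that is not part of this commit:

import java.util.List;

// Sketch only; assumes it sits alongside JobConfiguration and KeyspaceTablePair.
public class TableResolverSketch {
    static List<KeyspaceTablePair> tablesToDiff(JobConfiguration conf) {
        if (conf.shouldAutoDiscoverTables()) {
            // keyspace_tables absent or empty: discover from the cluster schema,
            // skipping system keyspaces and anything in disallowed_keyspaces.
            return discoverUserTables(conf);
        }
        // keyspace_tables given: honour it, minus any disallowed keyspaces.
        return conf.filteredKeyspaceTables();
    }

    // Hypothetical placeholder; real discovery lives elsewhere in the job.
    private static List<KeyspaceTablePair> discoverUserTables(JobConfiguration conf) {
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}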
@@ -19,8 +19,7 @@

package org.apache.cassandra.diff;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.math.BigInteger;
import java.util.HashSet;
import java.util.List;
@@ -35,6 +34,7 @@
public class YamlJobConfiguration implements JobConfiguration {
public int splits = 10000;
public List<KeyspaceTablePair> keyspace_tables;
public List<String> disallowed_keyspaces;
public int buckets = 100;
public int rate_limit = 10000;
public String job_id = null;
@@ -48,20 +48,20 @@ public class YamlJobConfiguration implements JobConfiguration {
public String specific_tokens = null;
public String disallowed_tokens = null;

public static YamlJobConfiguration load(String file) {
public static YamlJobConfiguration load(InputStream inputStream) {
Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class,
Thread.currentThread().getContextClassLoader()));
try {
return yaml.loadAs(new FileInputStream(file), YamlJobConfiguration.class);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
return yaml.loadAs(inputStream, YamlJobConfiguration.class);
}

public List<KeyspaceTablePair> keyspaceTables() {
return keyspace_tables;
}

public List<String> disallowedKeyspaces() {
return disallowed_keyspaces;
}

public int splits() {
return splits;
}
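Since load now accepts any InputStream, callers choose the source themselves; a short sketch mirroring the two call sites touched by this commit (the file and resource names are placeholders):

import java.io.FileInputStream;
import java.io.IOException;

public class LoadSketch {
    public static void main(String[] args) throws IOException {
        // From a file on disk, as DiffJobsResource now does:
        JobConfiguration fromFile =
            YamlJobConfiguration.load(new FileInputStream("localconfig.yaml"));

        // From a classpath resource, as the updated unit test does:
        JobConfiguration fromResource = YamlJobConfiguration.load(
            LoadSketch.class.getClassLoader().getResourceAsStream("testconfig.yaml"));
    }
}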
@@ -6,11 +6,32 @@
public class YamlJobConfigurationTest {
@Test
public void testLoadYaml() {
JobConfiguration jobConfiguration = YamlJobConfiguration.load("src/test/resources/testconfig.yaml");
JobConfiguration jobConfiguration = load("testconfig.yaml");
Assert.assertEquals(3, jobConfiguration.keyspaceTables().size());
jobConfiguration.keyspaceTables().forEach(kt -> {
Assert.assertTrue("Keyspace segment is not loaded correctly", kt.keyspace.contains("ks"));
Assert.assertTrue("Table segment is not loaded correctly", kt.table.contains("tb"));
});
}

@Test
public void testLoadYamlWithKeyspaceTablesAbsent() {
JobConfiguration jobConfiguration = load("test_load_config_no_keyspace_tables.yaml");
Assert.assertNull(jobConfiguration.keyspaceTables());
Assert.assertNull(jobConfiguration.disallowedKeyspaces());
Assert.assertNull(jobConfiguration.filteredKeyspaceTables());
Assert.assertTrue(jobConfiguration.shouldAutoDiscoverTables());
}

@Test
public void testLoadYamlFilterOutDisallowedKeyspaces() {
JobConfiguration jobConfiguration = load("test_load_config_all_keyspaces_filtered_out.yaml");
Assert.assertNotNull(jobConfiguration.filteredKeyspaceTables());
Assert.assertTrue("All tables should be filtered out", jobConfiguration.filteredKeyspaceTables().isEmpty());
Assert.assertFalse("It should not be in the discover mode", jobConfiguration.shouldAutoDiscoverTables());
}

private JobConfiguration load(String filename) {
return YamlJobConfiguration.load(getClass().getClassLoader().getResourceAsStream(filename));
}
}
@@ -0,0 +1,57 @@
# List of keyspace.tables to diff
keyspace_tables:
- ks1.tb1
- ks1.tb2
- ks2.tb3

# Filtering out keyspaces that are also defined in keyspace_tables makes no practical sense;
# this file does so only for testing purposes.
disallowed_keyspaces:
- ks1
- ks2

# This is how many parts we split the full token range into.
# Each of these splits is then compared between the clusters
splits: 10000

# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
buckets: 100
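# e.g. the values above give splits / buckets = 10000 / 100 = 100, well under the 100k guideline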

# global rate limit - this is how many q/s you think the target clusters can handle
rate_limit: 10000

# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef

# Fetch size to use for the query fetching the tokens in the cluster
token_scan_fetch_size: 1000
# Fetch size to use for the queries fetching the rows of each partition
partition_read_fetch_size: 1000

read_timeout_millis: 10000
reverse_read_probability: 0.5
consistency_level: ALL
metadata_options:
keyspace: cassandradiff
replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
ttl: 31536000
should_init: true
cluster_config:
source:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_1"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
target:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_2"
contact_points: "127.0.0.1"
port: "9043"
dc: "datacenter1"
metadata:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
@@ -0,0 +1,45 @@
# This is how many parts we split the full token range into.
# Each of these splits is then compared between the clusters
splits: 10000

# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
buckets: 100

# global rate limit - this is how many q/s you think the target clusters can handle
rate_limit: 10000

# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef

# Fetch size to use for the query fetching the tokens in the cluster
token_scan_fetch_size: 1000
# Fetch size to use for the queries fetching the rows of each partition
partition_read_fetch_size: 1000

read_timeout_millis: 10000
reverse_read_probability: 0.5
consistency_level: ALL
metadata_options:
keyspace: cassandradiff
replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
ttl: 31536000
should_init: true
cluster_config:
source:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_1"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
target:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_2"
contact_points: "127.0.0.1"
port: "9043"
dc: "datacenter1"
metadata:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
@@ -0,0 +1,45 @@
# This is how many parts we split the full token range into.
# Each of these splits is then compared between the clusters
splits: 10000

# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
buckets: 100

# global rate limit - this is how many q/s you think the target clusters can handle
rate_limit: 10000

# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef

# Fetch size to use for the query fetching the tokens in the cluster
token_scan_fetch_size: 1000
# Fetch size to use for the queries fetching the rows of each partition
partition_read_fetch_size: 1000

read_timeout_millis: 10000
reverse_read_probability: 0.5
consistency_level: ALL
metadata_options:
keyspace: cassandradiff
replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
ttl: 31536000
should_init: true
cluster_config:
source:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_1"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
target:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test_2"
contact_points: "127.0.0.1"
port: "9043"
dc: "datacenter1"
metadata:
impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
name: "local_test"
contact_points: "127.0.0.1"
port: "9042"
dc: "datacenter1"
