From b037b605489bd81329f02644753e37d534027d72 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Thu, 7 May 2026 17:19:10 +0100 Subject: [PATCH 1/7] Enable extensibility for bulk writer broadcast/reconstruction --- .../bulkwriter/AbstractBulkWriterContext.java | 4 +- .../spark/bulkwriter/BulkWriterConfig.java | 26 +- .../spark/bulkwriter/BulkWriterContext.java | 26 +- .../CassandraBulkSourceRelation.java | 58 +--- .../CassandraBulkWriterContext.java | 29 +- .../spark/bulkwriter/RecordWriter.java | 2 +- .../CassandraClusterInfoGroup.java | 9 +- ...CassandraCoordinatedBulkWriterContext.java | 34 ++- .../BulkWriterConfigExtensibilityTest.java | 254 ++++++++++++++++++ .../bulkwriter/MockBulkWriterContext.java | 7 + 10 files changed, 365 insertions(+), 84 deletions(-) create mode 100644 cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java index b6eff3c80..fd2210d0b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java @@ -54,7 +54,7 @@ *
  • Driver creates BulkWriterContext using constructor
  • *
  • Driver extracts BulkWriterConfig in {@link CassandraBulkSourceRelation} constructor
  • *
  • BulkWriterConfig gets broadcast to executors
  • - *
  • Executors reconstruct BulkWriterContext via {@link BulkWriterContext#from(BulkWriterConfig)}
  • + *
  • Executors reconstruct BulkWriterContext via {@link BulkWriterConfig#toBulkWriterContext()}
  • * * *

    Broadcastable wrappers used in BulkWriterConfig: @@ -112,7 +112,7 @@ protected AbstractBulkWriterContext(@NotNull BulkSparkConf conf, /** * Constructor for executor usage. * Reconstructs components from broadcast configuration on executors. - * This is used by the factory method {@link BulkWriterContext#from(BulkWriterConfig)}. + * This is used by {@link BulkWriterConfig#toBulkWriterContext()}. * * @param config immutable configuration for the bulk writer with pre-computed values */ diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java index e2a3283aa..4359e1bcd 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java @@ -21,6 +21,7 @@ import java.io.Serializable; +import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext; import org.jetbrains.annotations.NotNull; /** @@ -42,10 +43,10 @@ * and minimize Spark SizeEstimator overhead. *

    * On executors, {@link BulkWriterContext} instances are reconstructed from this config using - * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the broadcastable - * wrappers and reconstructs the full implementations with fresh data from Cassandra Sidecar. + * {@link #toBulkWriterContext()}, which creates the appropriate context implementation and + * reconstructs the full implementations with fresh data from Cassandra Sidecar. */ -public final class BulkWriterConfig implements Serializable +public class BulkWriterConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -111,4 +112,23 @@ public String getLowestCassandraVersion() { return lowestCassandraVersion; } + + /** + * Factory method that reconstructs a {@link BulkWriterContext} on executors from this broadcast config. + * Subclasses may override this to return custom context implementations for specialized reconstruction. + * + * @return a new BulkWriterContext instance appropriate for the current configuration + */ + public BulkWriterContext toBulkWriterContext() + { + BulkSparkConf conf = getConf(); + if (conf.isCoordinatedWriteConfigured()) + { + return new CassandraCoordinatedBulkWriterContext(this); + } + else + { + return new CassandraBulkWriterContext(this); + } + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java index a76b7c60f..8f240215c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java @@ -22,6 +22,7 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext; import org.apache.cassandra.spark.common.stats.JobStatsPublisher; import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.spark.api.java.JavaSparkContext; /** * Context for bulk write operations, providing access to cluster, job, schema, and transport information. @@ -29,7 +30,7 @@ * Serialization Architecture: * This interface does NOT extend Serializable. BulkWriterContext instances are never broadcast to executors. * Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct BulkWriterContext instances - * from the config using the factory method {@link #from(BulkWriterConfig)}. + * from the config using {@link BulkWriterConfig#toBulkWriterContext()}. *

    * The implementations ({@link CassandraBulkWriterContext}, {@link CassandraCoordinatedBulkWriterContext}) * do NOT have serialVersionUID fields as they are never serialized. @@ -53,23 +54,12 @@ public interface BulkWriterContext TransportContext transportContext(); /** - * Factory method to create a BulkWriterContext from a BulkWriterConfig on executors. - * This method reconstructs context instances on executors from the broadcast configuration. - * The driver creates contexts directly using constructors, not this method. + * Converts this context into an immutable {@link BulkWriterConfig} suitable for broadcasting to executors. + * Executors reconstruct a full {@link BulkWriterContext} from the config via + * {@link BulkWriterConfig#toBulkWriterContext()}. * - * @param config the immutable configuration object broadcast from driver - * @return a new BulkWriterContext instance + * @param sparkContext the Spark context (used to obtain default parallelism) + * @return an immutable config containing all broadcastable state */ - static BulkWriterContext from(BulkWriterConfig config) - { - BulkSparkConf conf = config.getConf(); - if (conf.isCoordinatedWriteConfigured()) - { - return new CassandraCoordinatedBulkWriterContext(config); - } - else - { - return new CassandraBulkWriterContext(config); - } - } + BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index f6323af36..d6c8fbab8 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -90,69 +90,13 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s this.sqlContext = sqlContext; this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()); // Extract immutable configuration from the context for broadcasting - BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism()); + BulkWriterConfig config = writerContext.toBulkWriterConfigForBroadcasting(sparkContext); this.broadcastConfig = sparkContext.broadcast(config); ReplicaAwareFailureHandler failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); this.simpleTaskScheduler = new SimpleTaskScheduler(); } - /** - * Extracts immutable configuration from a BulkWriterContext for broadcasting. - * Creates BroadcastableCluster, BroadcastableJobInfo, and BroadcastableSchemaInfo - * to ensure zero transient fields and avoid Logger references in the broadcast object. - */ - private static BulkWriterConfig extractConfig(BulkWriterContext context, int sparkDefaultParallelism) - { - if (context instanceof AbstractBulkWriterContext) - { - AbstractBulkWriterContext abstractContext = (AbstractBulkWriterContext) context; - ClusterInfo originalClusterInfo = abstractContext.cluster(); - - // Create BroadcastableCluster to avoid transient fields in broadcast - IBroadcastableClusterInfo broadcastableClusterInfo; - if (originalClusterInfo instanceof CassandraClusterInfoGroup) - { - // Coordinated write scenario - @SuppressWarnings("unchecked") - CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo; - broadcastableClusterInfo = BroadcastableClusterInfoGroup.from( - multiCluster, - abstractContext.bulkSparkConf() - ); - } - else - { - // Single cluster scenario - broadcastableClusterInfo = BroadcastableClusterInfo.from( - originalClusterInfo, - abstractContext.bulkSparkConf() - ); - } - - // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner - BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from( - abstractContext.job(), - abstractContext.bulkSparkConf() - ); - - // Create BroadcastableSchemaInfo to avoid Logger in TableSchema - BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from( - abstractContext.schema() - ); - - return new BulkWriterConfig( - abstractContext.bulkSparkConf(), - sparkDefaultParallelism, - broadcastableJobInfo, - broadcastableClusterInfo, - broadcastableSchemaInfo, - abstractContext.lowestCassandraVersion() - ); - } - throw new IllegalArgumentException("Cannot extract config from context type: " + context.getClass().getName()); - } - @Override @NotNull public SQLContext sqlContext() diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 5c5168374..84146eb5f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -45,7 +46,7 @@ protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf, } /** - * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)} factory method. + * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}. * This constructor is only used on executors to reconstruct context from broadcast config. * * @param config immutable configuration for the bulk writer @@ -81,4 +82,30 @@ protected MultiClusterContainer generateRestoreJobIds() { return MultiClusterContainer.ofSingle(bridge().getTimeUUID()); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + ClusterInfo originalClusterInfo = cluster(); + + // Extract only broadcast-safe cluster metadata + + // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable + IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfo.from(originalClusterInfo, bulkSparkConf()); + + // TokenPartitioner contains a Logger field that is not serializable and expensive for SizeEstimator + BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); + + // TableSchema contains a Logger field that is not serializable and expensive for SizeEstimator + BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); + + return new BulkWriterConfig( + bulkSparkConf(), + sparkContext.defaultParallelism(), + broadcastableJobInfo, + broadcastableClusterInfo, + broadcastableSchemaInfo, + lowestCassandraVersion() + ); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index 33d094e41..d707a651b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -93,7 +93,7 @@ public class RecordWriter */ public RecordWriter(BulkWriterConfig config, String[] columnNames) { - this(BulkWriterContext.from(config), columnNames, TaskContext::get, SortedSSTableWriter::new); + this(config.toBulkWriterContext(), columnNames, TaskContext::get, SortedSSTableWriter::new); } @VisibleForTesting diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java index 9fd870cdc..128fdaca1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java @@ -136,7 +136,14 @@ public static CassandraClusterInfoGroup fromBulkSparkConf(BulkSparkConf conf, Fu return new CassandraClusterInfoGroup(clusterInfos); } - @VisibleForTesting // ONLY FOR TESTING + /** + * Creates a {@link CassandraClusterInfoGroup} from a pre-built list of {@link ClusterInfo} instances. + * This factory is intended for custom {@link IBroadcastableClusterInfo} implementations that reconstruct + * cluster infos individually and need to wrap them in a group. + * + * @param clusterInfos the list of already-reconstructed ClusterInfo instances + * @return a new CassandraClusterInfoGroup + */ public static CassandraClusterInfoGroup createFrom(List clusterInfos) { return new CassandraClusterInfoGroup(clusterInfos); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java index 5329fc213..2c18297db 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java @@ -25,10 +25,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext; +import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.BroadcastableJobInfo; +import org.apache.cassandra.spark.bulkwriter.BroadcastableSchemaInfo; import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig; import org.apache.cassandra.spark.bulkwriter.ClusterInfo; import org.apache.cassandra.spark.bulkwriter.DataTransport; +import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -50,7 +55,7 @@ public CassandraCoordinatedBulkWriterContext(@NotNull BulkSparkConf conf, } /** - * Constructor used by {@link org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)} factory method. + * Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}. * This constructor is only used on executors to reconstruct context from broadcast config. * * @param config immutable configuration for the bulk writer @@ -115,4 +120,31 @@ protected CassandraClusterInfoGroup clusterInfoGroup() { return (CassandraClusterInfoGroup) cluster(); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + ClusterInfo originalClusterInfo = cluster(); + + // Extract only broadcast-safe cluster metadata + + // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable + CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo; + IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf()); + + // TokenPartitioner contains a Logger field that is not serializable and expensive for SizeEstimator + BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); + + // TableSchema contains a Logger field that is not serializable and expensive for SizeEstimator + BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); + + return new BulkWriterConfig( + bulkSparkConf(), + sparkContext.defaultParallelism(), + broadcastableJobInfo, + broadcastableClusterInfo, + broadcastableSchemaInfo, + lowestCassandraVersion() + ); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java new file mode 100644 index 000000000..c718708c1 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.util.UUID; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer; +import org.apache.cassandra.spark.common.stats.JobStatsPublisher; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests that verify the extensibility contract for the bulk writer broadcast/reconstruction chain. + * These tests prove that downstream implementations can: + *

    + */ +class BulkWriterConfigExtensibilityTest +{ + @Test + void testToBulkWriterContextCanBeOverridden() + { + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + IBroadcastableClusterInfo mockClusterInfo = mock(IBroadcastableClusterInfo.class); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + + // A custom BulkWriterConfig subclass overriding toBulkWriterContext() + BulkWriterConfig customConfig = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockClusterInfo, mockSchemaInfo, "4.0.0") + { + @Override + public BulkWriterContext toBulkWriterContext() + { + return mock(BulkWriterContext.class); + } + }; + + BulkWriterContext context = customConfig.toBulkWriterContext(); + assertThat(context).isNotNull(); + // The OSS default would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext, + // but our subclass returns a mock — proving the override is dispatched. + assertThat(context).isNotInstanceOf(CassandraBulkWriterContext.class); + } + + @Test + void testCustomIBroadcastableClusterInfoReconstructIsCalled() + { + ClusterInfo expectedCluster = mock(ClusterInfo.class); + + // Custom IBroadcastableClusterInfo whose reconstruct() returns a specific ClusterInfo + IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() + { + @Override + public Partitioner getPartitioner() + { + return Partitioner.Murmur3Partitioner; + } + + @Override + public String getLowestCassandraVersion() + { + return "4.0.0"; + } + + @Nullable + @Override + public String clusterId() + { + return null; + } + + @NotNull + @Override + public BulkSparkConf getConf() + { + return mock(BulkSparkConf.class); + } + + @Override + public ClusterInfo reconstruct() + { + return expectedCluster; + } + }; + + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + when(mockJobInfo.getConf()).thenReturn(mockConf); + when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); + + // Use a test subclass that overrides expensive methods to avoid needing real infrastructure + TestBulkWriterContext context = new TestBulkWriterContext(config); + + assertThat(context.cluster()).isSameAs(expectedCluster); + } + + @Test + void testReconstructJobInfoOnExecutorCanBeOverridden() + { + JobInfo expectedJobInfo = mock(JobInfo.class); + ClusterInfo mockCluster = mock(ClusterInfo.class); + + IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() + { + @Override + public Partitioner getPartitioner() + { + return Partitioner.Murmur3Partitioner; + } + + @Override + public String getLowestCassandraVersion() + { + return "4.0.0"; + } + + @Nullable + @Override + public String clusterId() + { + return null; + } + + @NotNull + @Override + public BulkSparkConf getConf() + { + return mock(BulkSparkConf.class); + } + + @Override + public ClusterInfo reconstruct() + { + return mockCluster; + } + }; + + BulkSparkConf mockConf = mock(BulkSparkConf.class); + BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); + when(mockJobInfo.getConf()).thenReturn(mockConf); + when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); + BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); + + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); + + // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo + TestBulkWriterContext context = new TestBulkWriterContext(config) + { + @Override + protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) + { + return expectedJobInfo; + } + }; + + assertThat(context.job()).isSameAs(expectedJobInfo); + } + + /** + * Minimal AbstractBulkWriterContext subclass for testing executor-side reconstruction + * without requiring real Cassandra infrastructure. + */ + private static class TestBulkWriterContext extends AbstractBulkWriterContext + { + TestBulkWriterContext(@NotNull BulkWriterConfig config) + { + super(config); + } + + @Override + protected ClusterInfo buildClusterInfo() + { + throw new UnsupportedOperationException("Driver-only"); + } + + @Override + protected void validateKeyspaceReplication() + { + } + + @Override + protected MultiClusterContainer generateRestoreJobIds() + { + throw new UnsupportedOperationException("Driver-only"); + } + + @Override + protected CassandraBridge buildCassandraBridge() + { + return mock(CassandraBridge.class); + } + + @Override + protected TransportContext buildTransportContext(boolean isOnDriver) + { + return mock(TransportContext.class); + } + + @Override + protected JobStatsPublisher buildJobStatsPublisher() + { + return mock(JobStatsPublisher.class); + } + + @Override + protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) + { + return mock(JobInfo.class); + } + + @Override + protected SchemaInfo reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo) + { + return mock(SchemaInfo.class); + } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(org.apache.spark.api.java.JavaSparkContext sparkContext) + { + throw new UnsupportedOperationException("Not needed for test"); + } + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index d9cefac43..cb5564f0c 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -60,6 +60,7 @@ import org.apache.cassandra.spark.exception.SidecarApiCallException; import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; import org.apache.cassandra.spark.validation.StartupValidator; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; @@ -551,4 +552,10 @@ public void startupValidate() { StartupValidator.instance().perform(); } + + @Override + public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) + { + throw new UnsupportedOperationException("Not implemented in mock"); + } } From 3a6ca2af215e911c645ff4afe43c7aa46f4ad281 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Fri, 8 May 2026 13:13:25 +0100 Subject: [PATCH 2/7] remove import --- .../cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java | 1 - .../cloudstorage/coordinated/CassandraClusterInfoGroup.java | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index d6c8fbab8..44bd5e166 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -42,7 +42,6 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult; import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator; import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator; -import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java index 128fdaca1..4a434a7a5 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java @@ -48,6 +48,7 @@ import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo; import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo; import org.apache.cassandra.spark.bulkwriter.WriteAvailability; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; From bbf1f421da789bb727bef45b9529ccc96761dd21 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Fri, 8 May 2026 18:17:14 +0100 Subject: [PATCH 3/7] resolve comments --- .../spark/bulkwriter/BulkWriterConfig.java | 12 +++------- .../CassandraBulkWriterContext.java | 11 +--------- .../bulkwriter/CassandraClusterInfo.java | 22 +++++++++++++------ ...CassandraCoordinatedBulkWriterContext.java | 11 +--------- .../BulkWriterConfigExtensibilityTest.java | 2 +- .../BulkReaderMultiDCConsistencyTest.java | 3 +-- 6 files changed, 22 insertions(+), 39 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java index 4359e1bcd..c58753f1d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java @@ -121,14 +121,8 @@ public String getLowestCassandraVersion() */ public BulkWriterContext toBulkWriterContext() { - BulkSparkConf conf = getConf(); - if (conf.isCoordinatedWriteConfigured()) - { - return new CassandraCoordinatedBulkWriterContext(this); - } - else - { - return new CassandraBulkWriterContext(this); - } + return getConf().isCoordinatedWriteConfigured() + ? new CassandraCoordinatedBulkWriterContext(this) + : new CassandraBulkWriterContext(this); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 84146eb5f..f54b7190e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -86,17 +86,8 @@ protected MultiClusterContainer generateRestoreJobIds() @Override public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) { - ClusterInfo originalClusterInfo = cluster(); - - // Extract only broadcast-safe cluster metadata - - // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable - IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfo.from(originalClusterInfo, bulkSparkConf()); - - // TokenPartitioner contains a Logger field that is not serializable and expensive for SizeEstimator + IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfo.from(cluster(), bulkSparkConf()); BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); - - // TableSchema contains a Logger field that is not serializable and expensive for SizeEstimator BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); return new BulkWriterConfig( diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index ad143fb0c..a9c120871 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -65,24 +65,32 @@ /** * Driver-only implementation of {@link ClusterInfo} for single cluster operations. *

    - * This class is NOT serialized and does NOT have a serialVersionUID. - * When broadcasting to executors, the driver extracts information from this class - * and creates a {@link BroadcastableClusterInfo} instance, which is then included - * in the {@link BulkWriterConfig} that gets broadcast. + * This class is NOT serialized. When broadcasting to executors, the driver extracts + * broadcast-safe fields via {@link BroadcastableClusterInfo#from(ClusterInfo, BulkSparkConf)} + * and includes the result in the {@link BulkWriterConfig} that gets broadcast. *

    - * This class implements Serializable only because the {@link ClusterInfo} interface - * requires it (for use as a field type in broadcast classes), but instances of this - * class are never directly serialized. + * On executors, a new instance is reconstructed from {@link BroadcastableClusterInfo} + * using {@link #CassandraClusterInfo(BroadcastableClusterInfo)}, reusing broadcast-safe + * fields and fetching other data fresh from Sidecar. + * + * @see BroadcastableClusterInfo for the broadcast-safe subset of fields */ public class CassandraClusterInfo implements ClusterInfo, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfo.class); + // -- Broadcast-safe fields -- + // Extracted by BroadcastableClusterInfo.from() and sent to executors. + // Changes here must be reflected in BroadcastableClusterInfo. protected final BulkSparkConf conf; protected final String clusterId; protected String cassandraVersion; protected Partitioner partitioner; + // -- Driver-only fields (not broadcast) -- + // NOT included in BroadcastableClusterInfo. Either expensive to serialize + // (token mappings, schema) or non-serializable (CassandraContext, Futures). + // Executors reconstruct these fresh from Sidecar via CassandraClusterInfo(BroadcastableClusterInfo). protected volatile TokenRangeMapping tokenRangeReplicas; protected volatile String keyspaceSchema; protected volatile ReplicationFactor replicationFactor; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java index 2c18297db..827bad10e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java @@ -124,18 +124,9 @@ protected CassandraClusterInfoGroup clusterInfoGroup() @Override public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) { - ClusterInfo originalClusterInfo = cluster(); - - // Extract only broadcast-safe cluster metadata - - // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable - CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo; + CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) cluster(); IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf()); - - // TokenPartitioner contains a Logger field that is not serializable and expensive for SizeEstimator BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); - - // TableSchema contains a Logger field that is not serializable and expensive for SizeEstimator BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); return new BulkWriterConfig( diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java index c718708c1..3c8a478bb 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java @@ -65,7 +65,7 @@ public BulkWriterContext toBulkWriterContext() BulkWriterContext context = customConfig.toBulkWriterContext(); assertThat(context).isNotNull(); - // The OSS default would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext, + // The base class would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext, // but our subclass returns a mock — proving the override is dispatched. assertThat(context).isNotInstanceOf(CassandraBulkWriterContext.class); } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java index fd87abd59..ba54f53cc 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.stream.Collectors; -import com.datastax.driver.core.exceptions.ReadTimeoutException; import org.junit.jupiter.api.Test; import net.bytebuddy.ByteBuddy; @@ -204,7 +203,7 @@ void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException } catch (Exception e) { - if (attempt == 10 || !(e instanceof ReadTimeoutException)) + if (attempt == 10 || !e.getClass().getName().endsWith("ReadTimeoutException")) { throw e; } From 190b988ffa9ab30d6e9f56a08c5e9e266b9fd58f Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Fri, 8 May 2026 18:34:07 +0100 Subject: [PATCH 4/7] add comment --- .../cassandra/analytics/BulkReaderMultiDCConsistencyTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java index ba54f53cc..80867b2dd 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java @@ -203,6 +203,9 @@ void eachQuorumIsNotQuorum() throws IOException, NoSuchMethodException } catch (Exception e) { + // ReadTimeoutException here is of type org.apache.cassandra.exceptions.ReadTimeoutException + // which is not available on the integration-tests compile classpath + // Hence checking for class name instead of using instanceof if (attempt == 10 || !e.getClass().getName().endsWith("ReadTimeoutException")) { throw e; From 1f8bc5540e65149c20deba05131645a2743953a4 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 12 May 2026 12:36:29 +0100 Subject: [PATCH 5/7] minor comments --- ...CassandraCoordinatedBulkWriterContext.java | 2 +- .../BulkWriterConfigExtensibilityTest.java | 87 +------------------ 2 files changed, 5 insertions(+), 84 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java index 827bad10e..9b00c7a1d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java @@ -124,7 +124,7 @@ protected CassandraClusterInfoGroup clusterInfoGroup() @Override public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) { - CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) cluster(); + CassandraClusterInfoGroup multiCluster = clusterInfoGroup(); IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf()); BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf()); BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema()); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java index 3c8a478bb..380c44600 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfigExtensibilityTest.java @@ -26,9 +26,7 @@ import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer; import org.apache.cassandra.spark.common.stats.JobStatsPublisher; -import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -74,52 +72,15 @@ public BulkWriterContext toBulkWriterContext() void testCustomIBroadcastableClusterInfoReconstructIsCalled() { ClusterInfo expectedCluster = mock(ClusterInfo.class); - - // Custom IBroadcastableClusterInfo whose reconstruct() returns a specific ClusterInfo - IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() - { - @Override - public Partitioner getPartitioner() - { - return Partitioner.Murmur3Partitioner; - } - - @Override - public String getLowestCassandraVersion() - { - return "4.0.0"; - } - - @Nullable - @Override - public String clusterId() - { - return null; - } - - @NotNull - @Override - public BulkSparkConf getConf() - { - return mock(BulkSparkConf.class); - } - - @Override - public ClusterInfo reconstruct() - { - return expectedCluster; - } - }; + IBroadcastableClusterInfo mockBroadcastable = mock(IBroadcastableClusterInfo.class); + when(mockBroadcastable.reconstruct()).thenReturn(expectedCluster); BulkSparkConf mockConf = mock(BulkSparkConf.class); BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); - when(mockJobInfo.getConf()).thenReturn(mockConf); - when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); - BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, mockBroadcastable, mockSchemaInfo, "4.0.0"); - // Use a test subclass that overrides expensive methods to avoid needing real infrastructure TestBulkWriterContext context = new TestBulkWriterContext(config); assertThat(context.cluster()).isSameAs(expectedCluster); @@ -129,50 +90,10 @@ public ClusterInfo reconstruct() void testReconstructJobInfoOnExecutorCanBeOverridden() { JobInfo expectedJobInfo = mock(JobInfo.class); - ClusterInfo mockCluster = mock(ClusterInfo.class); - - IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() - { - @Override - public Partitioner getPartitioner() - { - return Partitioner.Murmur3Partitioner; - } - - @Override - public String getLowestCassandraVersion() - { - return "4.0.0"; - } - - @Nullable - @Override - public String clusterId() - { - return null; - } - - @NotNull - @Override - public BulkSparkConf getConf() - { - return mock(BulkSparkConf.class); - } - - @Override - public ClusterInfo reconstruct() - { - return mockCluster; - } - }; - BulkSparkConf mockConf = mock(BulkSparkConf.class); BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); - when(mockJobInfo.getConf()).thenReturn(mockConf); - when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); - - BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); + BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, mock(IBroadcastableClusterInfo.class), mockSchemaInfo, "4.0.0"); // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo TestBulkWriterContext context = new TestBulkWriterContext(config) From 05811d5d75d1352562e76c1f99c7b0a683df2d89 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 12 May 2026 14:17:57 +0100 Subject: [PATCH 6/7] fix maven too many requests error --- scripts/build-dtest-jars.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh index fe7d4a969..56154f5df 100755 --- a/scripts/build-dtest-jars.sh +++ b/scripts/build-dtest-jars.sh @@ -107,6 +107,9 @@ exit 0 git clean -fd CASSANDRA_VERSION=$(cat build.xml | grep 'property name="base.version"' | awk -F "\"" '{print $4}') # Loop to prevent failure due to maven-ant-tasks not downloading a jar. + # Sleep between retries to allow Maven Central rate limit window to reset. + # Successfully downloaded artifacts remain cached in ~/.m2/repository, + # so subsequent attempts only need to fetch the ones that failed. for x in $(seq 1 3); do if [ -f "${DTEST_JAR_DIR}/dtest-${CASSANDRA_VERSION}.jar" ]; then RETURN="0" @@ -117,6 +120,8 @@ exit 0 if [ "${RETURN}" -eq "0" ]; then break fi + echo "Attempt ${x} failed; sleeping 600s before retry to reset Maven Central rate limit..." + sleep 600 fi done # Exit, if we didn't build successfully From 7352f6699389062e4ec1c104ffecbfd96a0d9347 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 12 May 2026 15:29:28 +0100 Subject: [PATCH 7/7] remove sleep --- scripts/build-dtest-jars.sh | 5 ----- 1 file changed, 5 deletions(-) diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh index 56154f5df..fe7d4a969 100755 --- a/scripts/build-dtest-jars.sh +++ b/scripts/build-dtest-jars.sh @@ -107,9 +107,6 @@ exit 0 git clean -fd CASSANDRA_VERSION=$(cat build.xml | grep 'property name="base.version"' | awk -F "\"" '{print $4}') # Loop to prevent failure due to maven-ant-tasks not downloading a jar. - # Sleep between retries to allow Maven Central rate limit window to reset. - # Successfully downloaded artifacts remain cached in ~/.m2/repository, - # so subsequent attempts only need to fetch the ones that failed. for x in $(seq 1 3); do if [ -f "${DTEST_JAR_DIR}/dtest-${CASSANDRA_VERSION}.jar" ]; then RETURN="0" @@ -120,8 +117,6 @@ exit 0 if [ "${RETURN}" -eq "0" ]; then break fi - echo "Attempt ${x} failed; sleeping 600s before retry to reset Maven Central rate limit..." - sleep 600 fi done # Exit, if we didn't build successfully