Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* <li>Driver creates BulkWriterContext using constructor</li>
* <li>Driver extracts BulkWriterConfig in {@link CassandraBulkSourceRelation} constructor</li>
* <li>BulkWriterConfig gets broadcast to executors</li>
* <li>Executors reconstruct BulkWriterContext via {@link BulkWriterContext#from(BulkWriterConfig)}</li>
* <li>Executors reconstruct BulkWriterContext via {@link BulkWriterConfig#toBulkWriterContext()}</li>
* </ol>
*
* <p>Broadcastable wrappers used in BulkWriterConfig:
Expand Down Expand Up @@ -112,7 +112,7 @@ protected AbstractBulkWriterContext(@NotNull BulkSparkConf conf,
/**
* Constructor for executor usage.
* Reconstructs components from broadcast configuration on executors.
* This is used by the factory method {@link BulkWriterContext#from(BulkWriterConfig)}.
* This is used by {@link BulkWriterConfig#toBulkWriterContext()}.
*
* @param config immutable configuration for the bulk writer with pre-computed values
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;

import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import org.jetbrains.annotations.NotNull;

/**
Expand All @@ -42,10 +43,10 @@
* and minimize Spark SizeEstimator overhead.
* <p>
* On executors, {@link BulkWriterContext} instances are reconstructed from this config using
* {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the broadcastable
* wrappers and reconstructs the full implementations with fresh data from Cassandra Sidecar.
* {@link #toBulkWriterContext()}, which creates the appropriate context implementation and
* reconstructs the full implementations with fresh data from Cassandra Sidecar.
*/
public final class BulkWriterConfig implements Serializable
public class BulkWriterConfig implements Serializable
{
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -111,4 +112,17 @@ public String getLowestCassandraVersion()
{
return lowestCassandraVersion;
}

/**
* Factory method that reconstructs a {@link BulkWriterContext} on executors from this broadcast config.
* Subclasses may override this to return custom context implementations for specialized reconstruction.
*
* @return a new BulkWriterContext instance appropriate for the current configuration
*/
public BulkWriterContext toBulkWriterContext()
{
return getConf().isCoordinatedWriteConfigured()
? new CassandraCoordinatedBulkWriterContext(this)
: new CassandraBulkWriterContext(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.spark.api.java.JavaSparkContext;

/**
* Context for bulk write operations, providing access to cluster, job, schema, and transport information.
* <p>
* Serialization Architecture:
* This interface does NOT extend Serializable. BulkWriterContext instances are never broadcast to executors.
* Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct BulkWriterContext instances
* from the config using the factory method {@link #from(BulkWriterConfig)}.
* from the config using {@link BulkWriterConfig#toBulkWriterContext()}.
* <p>
* The implementations ({@link CassandraBulkWriterContext}, {@link CassandraCoordinatedBulkWriterContext})
* do NOT have serialVersionUID fields as they are never serialized.
Expand All @@ -53,23 +54,12 @@ public interface BulkWriterContext
TransportContext transportContext();

/**
* Factory method to create a BulkWriterContext from a BulkWriterConfig on executors.
* This method reconstructs context instances on executors from the broadcast configuration.
* The driver creates contexts directly using constructors, not this method.
* Converts this context into an immutable {@link BulkWriterConfig} suitable for broadcasting to executors.
* Executors reconstruct a full {@link BulkWriterContext} from the config via
* {@link BulkWriterConfig#toBulkWriterContext()}.
*
* @param config the immutable configuration object broadcast from driver
* @return a new BulkWriterContext instance
* @param sparkContext the Spark context (used to obtain default parallelism)
* @return an immutable config containing all broadcastable state
*/
static BulkWriterContext from(BulkWriterConfig config)
{
BulkSparkConf conf = config.getConf();
if (conf.isCoordinatedWriteConfigured())
{
return new CassandraCoordinatedBulkWriterContext(config);
}
else
{
return new CassandraBulkWriterContext(config);
}
}
BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
Expand Down Expand Up @@ -90,69 +89,13 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s
this.sqlContext = sqlContext;
this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
// Extract immutable configuration from the context for broadcasting
BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism());
BulkWriterConfig config = writerContext.toBulkWriterConfigForBroadcasting(sparkContext);
this.broadcastConfig = sparkContext.<BulkWriterConfig>broadcast(config);
ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
this.writeValidator = new BulkWriteValidator(writerContext, failureHandler);
this.simpleTaskScheduler = new SimpleTaskScheduler();
}

/**
* Extracts immutable configuration from a BulkWriterContext for broadcasting.
* Creates BroadcastableCluster, BroadcastableJobInfo, and BroadcastableSchemaInfo
* to ensure zero transient fields and avoid Logger references in the broadcast object.
*/
private static BulkWriterConfig extractConfig(BulkWriterContext context, int sparkDefaultParallelism)
{
if (context instanceof AbstractBulkWriterContext)
{
AbstractBulkWriterContext abstractContext = (AbstractBulkWriterContext) context;
ClusterInfo originalClusterInfo = abstractContext.cluster();

// Create BroadcastableCluster to avoid transient fields in broadcast
IBroadcastableClusterInfo broadcastableClusterInfo;
if (originalClusterInfo instanceof CassandraClusterInfoGroup)
{
// Coordinated write scenario
@SuppressWarnings("unchecked")
CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo;
broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(
multiCluster,
abstractContext.bulkSparkConf()
);
}
else
{
// Single cluster scenario
broadcastableClusterInfo = BroadcastableClusterInfo.from(
originalClusterInfo,
abstractContext.bulkSparkConf()
);
}

// Create BroadcastableJobInfo to avoid Logger in TokenPartitioner
BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(
abstractContext.job(),
abstractContext.bulkSparkConf()
);

// Create BroadcastableSchemaInfo to avoid Logger in TableSchema
BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(
abstractContext.schema()
);

return new BulkWriterConfig(
abstractContext.bulkSparkConf(),
sparkDefaultParallelism,
broadcastableJobInfo,
broadcastableClusterInfo,
broadcastableSchemaInfo,
abstractContext.lowestCassandraVersion()
);
}
throw new IllegalArgumentException("Cannot extract config from context type: " + context.getClass().getName());
}

@Override
@NotNull
public SQLContext sqlContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.lang3.StringUtils;

import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;

Expand All @@ -45,7 +46,7 @@ protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
}

/**
* Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)} factory method.
* Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
* This constructor is only used on executors to reconstruct context from broadcast config.
*
* @param config immutable configuration for the bulk writer
Expand Down Expand Up @@ -81,4 +82,21 @@ protected MultiClusterContainer<UUID> generateRestoreJobIds()
{
return MultiClusterContainer.ofSingle(bridge().getTimeUUID());
}

@Override
public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
{
IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfo.from(cluster(), bulkSparkConf());
BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf());
BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema());

return new BulkWriterConfig(
bulkSparkConf(),
sparkContext.defaultParallelism(),
broadcastableJobInfo,
broadcastableClusterInfo,
broadcastableSchemaInfo,
lowestCassandraVersion()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,32 @@
/**
* Driver-only implementation of {@link ClusterInfo} for single cluster operations.
* <p>
* This class is NOT serialized and does NOT have a serialVersionUID.
* When broadcasting to executors, the driver extracts information from this class
* and creates a {@link BroadcastableClusterInfo} instance, which is then included
* in the {@link BulkWriterConfig} that gets broadcast.
* This class is NOT serialized. When broadcasting to executors, the driver extracts
* broadcast-safe fields via {@link BroadcastableClusterInfo#from(ClusterInfo, BulkSparkConf)}
* and includes the result in the {@link BulkWriterConfig} that gets broadcast.
* <p>
* This class implements Serializable only because the {@link ClusterInfo} interface
* requires it (for use as a field type in broadcast classes), but instances of this
* class are never directly serialized.
* On executors, a new instance is reconstructed from {@link BroadcastableClusterInfo}
* using {@link #CassandraClusterInfo(BroadcastableClusterInfo)}, reusing broadcast-safe
* fields and fetching other data fresh from Sidecar.
*
* @see BroadcastableClusterInfo for the broadcast-safe subset of fields
*/
public class CassandraClusterInfo implements ClusterInfo, Closeable
{
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfo.class);

// -- Broadcast-safe fields --
// Extracted by BroadcastableClusterInfo.from() and sent to executors.
// Changes here must be reflected in BroadcastableClusterInfo.
protected final BulkSparkConf conf;
protected final String clusterId;
protected String cassandraVersion;
protected Partitioner partitioner;

// -- Driver-only fields (not broadcast) --
// NOT included in BroadcastableClusterInfo. Either expensive to serialize
// (token mappings, schema) or non-serializable (CassandraContext, Futures).
// Executors reconstruct these fresh from Sidecar via CassandraClusterInfo(BroadcastableClusterInfo).
protected volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
protected volatile String keyspaceSchema;
protected volatile ReplicationFactor replicationFactor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class RecordWriter
*/
public RecordWriter(BulkWriterConfig config, String[] columnNames)
{
this(BulkWriterContext.from(config), columnNames, TaskContext::get, SortedSSTableWriter::new);
this(config.toBulkWriterContext(), columnNames, TaskContext::get, SortedSSTableWriter::new);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.cassandra.spark.bulkwriter.RingInstance;
import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo;
import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
Expand Down Expand Up @@ -136,7 +137,14 @@ public static CassandraClusterInfoGroup fromBulkSparkConf(BulkSparkConf conf, Fu
return new CassandraClusterInfoGroup(clusterInfos);
}

@VisibleForTesting // ONLY FOR TESTING
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing the annotation? I think it is still only used by test code

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is no longer test-only, custom IBroadcastableClusterInfo implementations that reconstruct cluster infos individually and wrap them in a group will override this

/**
* Creates a {@link CassandraClusterInfoGroup} from a pre-built list of {@link ClusterInfo} instances.
* This factory is intended for custom {@link IBroadcastableClusterInfo} implementations that reconstruct
* cluster infos individually and need to wrap them in a group.
*
* @param clusterInfos the list of already-reconstructed ClusterInfo instances
* @return a new CassandraClusterInfoGroup
*/
public static CassandraClusterInfoGroup createFrom(List<ClusterInfo> clusterInfos)
{
return new CassandraClusterInfoGroup(clusterInfos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
import org.apache.commons.lang3.StringUtils;

import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext;
import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
import org.apache.cassandra.spark.bulkwriter.BroadcastableJobInfo;
import org.apache.cassandra.spark.bulkwriter.BroadcastableSchemaInfo;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.DataTransport;
import org.apache.cassandra.spark.bulkwriter.IBroadcastableClusterInfo;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;

Expand All @@ -50,7 +55,7 @@ public CassandraCoordinatedBulkWriterContext(@NotNull BulkSparkConf conf,
}

/**
* Constructor used by {@link org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)} factory method.
* Constructor used by {@link BulkWriterConfig#toBulkWriterContext()}.
* This constructor is only used on executors to reconstruct context from broadcast config.
*
* @param config immutable configuration for the bulk writer
Expand Down Expand Up @@ -115,4 +120,22 @@ protected CassandraClusterInfoGroup clusterInfoGroup()
{
return (CassandraClusterInfoGroup) cluster();
}

@Override
public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext)
{
CassandraClusterInfoGroup multiCluster = clusterInfoGroup();
IBroadcastableClusterInfo broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(multiCluster, bulkSparkConf());
BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(job(), bulkSparkConf());
BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(schema());

return new BulkWriterConfig(
bulkSparkConf(),
sparkContext.defaultParallelism(),
broadcastableJobInfo,
broadcastableClusterInfo,
broadcastableSchemaInfo,
lowestCassandraVersion()
);
}
}
Loading
Loading