CASSANALYTICS-168 Need the ability to broadcast and reconstruct subclasses on executors#205
CASSANALYTICS-168 Need the ability to broadcast and reconstruct subclasses on executors#205skoppu22 wants to merge 7 commits into
Conversation
|
|
||
| // Extract only broadcast-safe cluster metadata | ||
|
|
||
| // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable |
There was a problem hiding this comment.
Currently the distinction on what is transient and what is not is implicit, derived from the verbiage in this method. How can we instead make it clear within the ClusterInfo what fields are serializable and what are not? Brainstorming, thinking:
- javadoc comments
- usage of an
@Serializableand@Serialinterface (kind of overloading and using in a different way than the formal usage but would annotate the intent) - Adding our own
@Immutablestyle interface for something or otherwise denoting the fields final or pushing them to being final if appropriate
Having the serializability state of these fields denoted here in comments is brittle and runs a real risk of drift; changes in ClusterInfo could easily break these contracts in the future w/out another maintainer realizing it.
There was a problem hiding this comment.
Removed these comments. In CassandraClusterInfo, grouped fields by serializable state and added comments
| public BulkWriterContext toBulkWriterContext() | ||
| { | ||
| BulkSparkConf conf = getConf(); | ||
| if (conf.isCoordinatedWriteConfigured()) |
There was a problem hiding this comment.
stylistic nit: you could rewrite this as:
return conf.isCoordinatedWriteConfigured() ?
new CassandraCoordinatedBulkWriterContext(this) :
new CassandraBulkWriterContext(this);
Whether or not you think that's more clear is another story entirely. :)
|
|
||
| // Extract only broadcast-safe cluster metadata | ||
|
|
||
| // ClusterInfo has transient fields (CassandraContext, token mappings) that are not serializable |
There was a problem hiding this comment.
Same as above - how can we make this more explicit near the source of the data and its serializability (and reflect the downstream expectation of that serializability) instead of having that information and expectation only reflected here?
|
|
||
| BulkWriterContext context = customConfig.toBulkWriterContext(); | ||
| assertThat(context).isNotNull(); | ||
| // The OSS default would return CassandraBulkWriterContext or CassandraCoordinatedBulkWriterContext, |
There was a problem hiding this comment.
The OSS default <- this is the OSS project. Is this comment from another context and need to refine here?
| return new CassandraClusterInfoGroup(clusterInfos); | ||
| } | ||
|
|
||
| @VisibleForTesting // ONLY FOR TESTING |
There was a problem hiding this comment.
Why removing the annotation? I think it is still only used by test code
There was a problem hiding this comment.
This method is no longer test-only, custom IBroadcastableClusterInfo implementations that reconstruct cluster infos individually and wrap them in a group will override this
yifan-c
left a comment
There was a problem hiding this comment.
Some nits. The patch LGTM
| @Override | ||
| public BulkWriterConfig toBulkWriterConfigForBroadcasting(JavaSparkContext sparkContext) | ||
| { | ||
| CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) cluster(); |
| @Test | ||
| void testReconstructJobInfoOnExecutorCanBeOverridden() | ||
| { | ||
| JobInfo expectedJobInfo = mock(JobInfo.class); | ||
| ClusterInfo mockCluster = mock(ClusterInfo.class); | ||
|
|
||
| IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() | ||
| { | ||
| @Override | ||
| public Partitioner getPartitioner() | ||
| { | ||
| return Partitioner.Murmur3Partitioner; | ||
| } | ||
|
|
||
| @Override | ||
| public String getLowestCassandraVersion() | ||
| { | ||
| return "4.0.0"; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public String clusterId() | ||
| { | ||
| return null; | ||
| } | ||
|
|
||
| @NotNull | ||
| @Override | ||
| public BulkSparkConf getConf() | ||
| { | ||
| return mock(BulkSparkConf.class); | ||
| } | ||
|
|
||
| @Override | ||
| public ClusterInfo reconstruct() | ||
| { | ||
| return mockCluster; | ||
| } | ||
| }; | ||
|
|
||
| BulkSparkConf mockConf = mock(BulkSparkConf.class); | ||
| BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); | ||
| when(mockJobInfo.getConf()).thenReturn(mockConf); | ||
| when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); | ||
| BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); | ||
|
|
||
| BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); | ||
|
|
||
| // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo | ||
| TestBulkWriterContext context = new TestBulkWriterContext(config) | ||
| { | ||
| @Override | ||
| protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) | ||
| { | ||
| return expectedJobInfo; | ||
| } | ||
| }; | ||
|
|
||
| assertThat(context.job()).isSameAs(expectedJobInfo); | ||
| } |
There was a problem hiding this comment.
customBroadcastable is unnecessary. You can simply the test as the below.
| @Test | |
| void testReconstructJobInfoOnExecutorCanBeOverridden() | |
| { | |
| JobInfo expectedJobInfo = mock(JobInfo.class); | |
| ClusterInfo mockCluster = mock(ClusterInfo.class); | |
| IBroadcastableClusterInfo customBroadcastable = new IBroadcastableClusterInfo() | |
| { | |
| @Override | |
| public Partitioner getPartitioner() | |
| { | |
| return Partitioner.Murmur3Partitioner; | |
| } | |
| @Override | |
| public String getLowestCassandraVersion() | |
| { | |
| return "4.0.0"; | |
| } | |
| @Nullable | |
| @Override | |
| public String clusterId() | |
| { | |
| return null; | |
| } | |
| @NotNull | |
| @Override | |
| public BulkSparkConf getConf() | |
| { | |
| return mock(BulkSparkConf.class); | |
| } | |
| @Override | |
| public ClusterInfo reconstruct() | |
| { | |
| return mockCluster; | |
| } | |
| }; | |
| BulkSparkConf mockConf = mock(BulkSparkConf.class); | |
| BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); | |
| when(mockJobInfo.getConf()).thenReturn(mockConf); | |
| when(mockJobInfo.getRestoreJobIds()).thenReturn(MultiClusterContainer.ofSingle(UUID.randomUUID())); | |
| BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); | |
| BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, customBroadcastable, mockSchemaInfo, "4.0.0"); | |
| // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo | |
| TestBulkWriterContext context = new TestBulkWriterContext(config) | |
| { | |
| @Override | |
| protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) | |
| { | |
| return expectedJobInfo; | |
| } | |
| }; | |
| assertThat(context.job()).isSameAs(expectedJobInfo); | |
| } | |
| @Test | |
| void testReconstructJobInfoOnExecutorCanBeOverridden() | |
| { | |
| JobInfo expectedJobInfo = mock(JobInfo.class); | |
| BulkSparkConf mockConf = mock(BulkSparkConf.class); | |
| BroadcastableJobInfo mockJobInfo = mock(BroadcastableJobInfo.class); | |
| BroadcastableSchemaInfo mockSchemaInfo = mock(BroadcastableSchemaInfo.class); | |
| BulkWriterConfig config = new BulkWriterConfig(mockConf, 4, mockJobInfo, mock(IBroadcastableClusterInfo.class), mockSchemaInfo, "4.0.0"); | |
| // Subclass that overrides reconstructJobInfoOnExecutor to return custom JobInfo | |
| TestBulkWriterContext context = new TestBulkWriterContext(config) | |
| { | |
| @Override | |
| protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo) | |
| { | |
| return expectedJobInfo; | |
| } | |
| }; | |
| assertThat(context.job()).isSameAs(expectedJobInfo); | |
| } |
8132278 to
50090d7
Compare
50090d7 to
05811d5
Compare
|
Committed using the legacy way as the PR is missing the update in Commit is 652cc17 |
After the BulkWriterConfig broadcast refactor f960685, bulk writer’s context/cluster/config subclasses cannot be instantiated on executors. For any job whose driver-side context was instantiated from a subclass, the executor silently instantiates base class implementations. Hence need to add the ability to broadcast and reconstruct subclasses on executors.
Circle CI link: https://app.circleci.com/pipelines/github/skoppu22/cassandra-analytics/116/workflows/6b53f2bd-017f-4dbe-917f-8d6794dfd24b