Implement SQL connector preferredLocalParallelism option (#254)
This PR implements a new SQL mapping option to define the preferred
local parallelism for connectors that support configuring it. At the
moment, only the Kafka connector supports this. See linked issue for
more details.

Fixes #24357
Closes #26194
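
As a sketch of how the new option could be used once this change is in place (assuming a member with SQL and Jet enabled and the Kafka connector on the classpath; the example class, mapping name, columns, formats and broker address are placeholders, not taken from this change):

    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;

    public class PreferredLocalParallelismExample {
        public static void main(String[] args) {
            HazelcastInstance hz = Hazelcast.newHazelcastInstance();
            // Kafka mapping whose scan vertex asks for a local parallelism of 2.
            // Omitting 'preferredLocalParallelism' keeps the connector's default.
            hz.getSql().execute(
                    "CREATE MAPPING trades (__key INT, ticker VARCHAR) "
                            + "TYPE Kafka "
                            + "OPTIONS ("
                            + "  'keyFormat' = 'int',"
                            + "  'valueFormat' = 'json-flat',"
                            + "  'bootstrap.servers' = '127.0.0.1:9092',"
                            + "  'preferredLocalParallelism' = '2'"
                            + ")");
            hz.shutdown();
        }
    }

A value that does not parse as an integer fails with a NumberFormatException, as covered by the new KafkaTableTest cases below.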

Co-authored-by: Hugo Hromic <hhromic@gmail.com>
GitOrigin-RevId: d80406e5a54db45a2bedc435c1956d221cc42d6a
hhromic authored and actions-user committed Jan 29, 2024
1 parent f02d170 commit 4f9b037
Showing 6 changed files with 76 additions and 3 deletions.
@@ -180,6 +180,11 @@ public interface SqlConnector {
 
     String OPTION_TYPE_AVRO_SCHEMA = "avroSchema";
 
+    /**
+     * The preferred local parallelism for connectors that support configuring it.
+     */
+    String OPTION_PREFERRED_LOCAL_PARALLELISM = "preferredLocalParallelism";
+
     /**
      * Value for {@value #OPTION_KEY_FORMAT} and {@value #OPTION_VALUE_FORMAT}
      * for Java serialization.
@@ -21,7 +21,6 @@
 import com.hazelcast.jet.core.ProcessorMetaSupplier;
 import com.hazelcast.jet.core.Vertex;
 import com.hazelcast.jet.kafka.KafkaProcessors;
-import com.hazelcast.jet.kafka.impl.StreamKafkaP;
 import com.hazelcast.jet.pipeline.DataConnectionRef;
 import com.hazelcast.jet.sql.impl.connector.HazelcastRexNode;
 import com.hazelcast.jet.sql.impl.connector.SqlConnector;
@@ -144,7 +143,7 @@ public Vertex fullScanReader(
         return context.getDag().newUniqueVertex(
                 table.toString(),
                 ProcessorMetaSupplier.of(
-                        StreamKafkaP.PREFERRED_LOCAL_PARALLELISM,
+                        table.preferredLocalParallelism(),
                         new RowProjectorProcessorSupplier(
                                 table.kafkaConsumerProperties(),
                                 table.dataConnectionName(),
@@ -16,6 +16,7 @@
 
 package com.hazelcast.jet.sql.impl.connector.kafka;
 
+import com.hazelcast.jet.kafka.impl.StreamKafkaP;
 import com.hazelcast.jet.sql.impl.connector.SqlConnector;
 import com.hazelcast.jet.sql.impl.inject.UpsertTargetDescriptor;
 import com.hazelcast.jet.sql.impl.schema.JetTable;
@@ -115,6 +116,13 @@ QueryDataType[] types() {
         return getFields().stream().map(TableField::getType).toArray(QueryDataType[]::new);
     }
 
+    int preferredLocalParallelism() {
+        if (options.containsKey(SqlConnector.OPTION_PREFERRED_LOCAL_PARALLELISM)) {
+            return Integer.parseInt(options.get(SqlConnector.OPTION_PREFERRED_LOCAL_PARALLELISM));
+        }
+        return StreamKafkaP.PREFERRED_LOCAL_PARALLELISM;
+    }
+
     @Override
     public PlanObjectKey getObjectKey() {
         return new KafkaPlanObjectKey(getSchemaName(), getSqlName(), topicName(), getFields(), options);
@@ -37,6 +37,7 @@
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_AVRO_SCHEMA;
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_CLASS;
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_FORMAT;
+import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_PREFERRED_LOCAL_PARALLELISM;
 
 final class PropertiesResolver {
     static final String KEY_SERIALIZER = "key.serializer";
@@ -48,7 +49,8 @@ final class PropertiesResolver {
             OPTION_KEY_FORMAT,
             OPTION_KEY_CLASS,
             OPTION_VALUE_FORMAT,
-            OPTION_VALUE_CLASS
+            OPTION_VALUE_CLASS,
+            OPTION_PREFERRED_LOCAL_PARALLELISM
     );
 
     // using strings instead of canonical names to not fail without Kafka on the classpath
@@ -17,6 +17,7 @@
 package com.hazelcast.jet.sql.impl.connector.kafka;
 
 import com.google.common.collect.ImmutableList;
+import com.hazelcast.jet.kafka.impl.StreamKafkaP;
 import com.hazelcast.jet.sql.impl.connector.kafka.KafkaTable.KafkaPlanObjectKey;
 import com.hazelcast.sql.impl.schema.TableField;
 import com.hazelcast.sql.impl.type.QueryDataType;
@@ -27,9 +28,12 @@
 
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_PREFERRED_LOCAL_PARALLELISM;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 @RunWith(JUnitParamsRunner.class)
 public class KafkaTableTest {
@@ -72,4 +76,46 @@ public void test_objectKeyEquality(
         assertThat(k1.equals(k2)).isEqualTo(expected);
         assertThat(k1.hashCode() == k2.hashCode()).isEqualTo(expected);
     }
+
+    @SuppressWarnings("unused")
+    private Object[] preferredLocalParallelisms() {
+        return new Object[]{
+                new Object[]{"-2", -2, false},
+                new Object[]{"-1", -1, false},
+                new Object[]{"0", 0, false},
+                new Object[]{"1", 1, false},
+                new Object[]{"2", 2, false},
+                new Object[]{"not-an-int", null, true},
+                new Object[]{"3.14159", null, true},
+        };
+    }
+
+    @Test
+    @Parameters(method = "preferredLocalParallelisms")
+    public void when_preferredLocalParallelism_isDefined_then_parseInt(String plp, Integer expected, boolean shouldThrow) {
+        KafkaTable table = new KafkaTable(
+                null, null, null, null, null, null, null,
+                Map.of(OPTION_PREFERRED_LOCAL_PARALLELISM, plp),
+                null, null, null, null, null
+        );
+
+        if (shouldThrow) {
+            assertThatThrownBy(() -> table.preferredLocalParallelism())
+                    .isInstanceOf(NumberFormatException.class);
+        } else {
+            assertThat(table.preferredLocalParallelism()).isEqualTo(expected);
+        }
+    }
+
+    @Test
+    public void when_preferredLocalParallelism_isNotDefined_then_useDefault() {
+        KafkaTable table = new KafkaTable(
+                null, null, null, null, null, null, null,
+                emptyMap(),
+                null, null, null, null, null
+        );
+
+        assertThat(table.preferredLocalParallelism())
+                .isEqualTo(StreamKafkaP.PREFERRED_LOCAL_PARALLELISM);
+    }
 }
@@ -53,6 +53,7 @@
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_AVRO_SCHEMA;
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_CLASS;
 import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_VALUE_FORMAT;
+import static com.hazelcast.jet.sql.impl.connector.SqlConnector.OPTION_PREFERRED_LOCAL_PARALLELISM;
 import static com.hazelcast.jet.sql.impl.connector.kafka.PropertiesResolver.KEY_DESERIALIZER;
 import static com.hazelcast.jet.sql.impl.connector.kafka.PropertiesResolver.KEY_SERIALIZER;
 import static com.hazelcast.jet.sql.impl.connector.kafka.PropertiesResolver.VALUE_DESERIALIZER;
@@ -446,4 +447,16 @@ private static Properties resolveConsumerProperties(Map<String, String> options)
     private static Properties resolveProducerProperties(Map<String, String> options) {
         return PropertiesResolver.resolveProducerProperties(options, null, null);
     }
+
+    @Test
+    public void when_consumerProperties_preferredLocalParallelismPropertyIsDefined_then_itIsIgnored() {
+        assertThat(resolveConsumerProperties(Map.of(OPTION_PREFERRED_LOCAL_PARALLELISM, "-1")))
+                .containsExactlyEntriesOf(Map.of(KEY_DESERIALIZER, ByteArrayDeserializer.class.getCanonicalName()));
+    }
+
+    @Test
+    public void when_producerProperties_preferredLocalParallelismPropertyIsDefined_then_itIsIgnored() {
+        assertThat(resolveProducerProperties(Map.of(OPTION_PREFERRED_LOCAL_PARALLELISM, "-1")))
+                .containsExactlyEntriesOf(Map.of(KEY_SERIALIZER, ByteArraySerializer.class.getCanonicalName()));
+    }
 }
