From ad6e6b05dd6a3deb047a26d0af732b8deaf7df0a Mon Sep 17 00:00:00 2001 From: Mingde Peng Date: Fri, 1 Jul 2022 11:02:22 +0800 Subject: [PATCH] [FLINK-27964][python] Support Cassandra connector in Python DataStream API --- .../cassandra/SimpleClusterBuilder.java | 507 ++++++++++++++++++ .../datastream/connectors/cassandra.py | 351 +++++++++++- .../connectors/tests/test_cassandra.py | 86 +++ .../connectors/tests/test_connectors.py | 30 -- 4 files changed, 939 insertions(+), 35 deletions(-) create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java create mode 100644 flink-python/pyflink/datastream/connectors/tests/test_cassandra.py diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java new file mode 100644 index 0000000000000..e71e74d9b6985 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metrics; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.policies.AddressTranslator; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.NoSpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.Policies; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +/** A Simple ClusterBuilder which is currently used in PyFlink Cassandra connector. 
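+ *
+ * <p>A minimal usage sketch (the contact point and credentials are placeholders; each option is
+ * recorded in {@code clusterMap} and only applied when {@link #buildCluster} runs):
+ *
+ * <pre>{@code
+ * SimpleClusterBuilder builder =
+ *         new SimpleClusterBuilder()
+ *                 .addContactPoint("127.0.0.1")
+ *                 .withPort(9042)
+ *                 .withCredentials("user", "password");
+ * }</pre>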
+ */
+public class SimpleClusterBuilder extends ClusterBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Map holding the configuration used to build the cluster
+    public final Map<String, Object> clusterMap;
+    // Keys identifying the ClusterBuilder options
+    private static final String CLUSTER_NAME = "clusterName";
+    private static final String CLUSTER_PORT = "clusterPort";
+    private static final String ALLOW_BETA_PROTOCOL_VERSION = "allowBetaProtocolVersion";
+    private static final String MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "maxSchemaAgreementWaitSeconds";
+    private static final String CONTACT_POINT = "contactPoint";
+    private static final String CONTACT_POINTS = "contactPoints";
+    private static final String CONTACT_POINTS_WITH_PORTS = "contactPointsWithPorts";
+    private static final String LOAD_BALANCING_POLICY = "loadBalancingPolicy";
+    private static final String RECONNECTION_POLICY = "reconnectionPolicy";
+    private static final String RETRY_POLICY = "retryPolicy";
+    private static final String SPECULATIVE_EXECUTION_POLICY = "speculativeExecutionPolicy";
+    private static final String CREDENTIALS = "credentials";
+    private static final String WITHOUT_METRICS = "withoutMetrics";
+    private static final String WITHOUT_JMX_REPORTING = "withoutJMXReporting";
+    private static final String NO_COMPACT = "noCompact";
+    // LoadBalancingPolicies supported by PyFlink
+    private static final String ROUND_ROBIN_POLICY = "roundRobinPolicy";
+    private static final String DC_AWARE_ROUND_ROBIN_POLICY = "dCAwareRoundRobinPolicy";
+    // ReconnectionPolicies supported by PyFlink
+    private static final String EXPONENTIAL_RECONNECTION_POLICY = "exponentialReconnectionPolicy";
+    private static final String CONSTANT_RECONNECTION_POLICY = "constantReconnectionPolicy";
+    // RetryPolicies supported by PyFlink
+    private static final String CONSISTENCY_RETRY_POLICY = "consistencyRetryPolicy";
+    private static final String FALLTHROUGH_RETRY_POLICY = "fallthroughRetryPolicy";
+    // SpeculativeExecutionPolicies supported by PyFlink
+    private static final String NO_SPECULATIVE_EXECUTION_POLICY = "noSpeculativeExecutionPolicy";
+    private static final String CONSTANT_SPECULATIVE_EXECUTION_POLICY =
+            "constantSpeculativeExecutionPolicy";
+    // Marker value indicating that a flag option has been set
+    private static final String TRUE_INDEX = "true";
+
+    public SimpleClusterBuilder() {
+        clusterMap = new HashMap<>();
+    }
+
+    /**
+     * An optional name for the created cluster.
+     *
+     * <p>Note: this is not related to the Cassandra cluster name (though you are free to provide
+     * the same name). See {@link Cluster#getClusterName} for details.
+     *
+     * <p>If you use this method and create more than one Cluster instance in the same JVM (which
+     * should be avoided unless you need to connect to multiple Cassandra clusters), you should
+     * make sure each Cluster instance gets a unique name, or you may have a problem with JMX
+     * reporting.
+     *
+     * @param name the cluster name to use for the created Cluster instance.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withClusterName(String name) {
+        this.clusterMap.put(CLUSTER_NAME, name);
+        return this;
+    }
+
+    /**
+     * The port to use to connect to the Cassandra host.
+     *
+     * <p>If not set through this method, the default port (9042) will be used instead.
+     *
+     * @param port the port to set.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withPort(int port) {
+        this.clusterMap.put(CLUSTER_PORT, port);
+        return this;
+    }
+
+    /**
+     * Creates the cluster connection using the latest development protocol version, which is
+     * currently in beta. Calling this method will result in setting the USE_BETA flag in all
+     * outgoing messages, which allows the server to negotiate the supported protocol version even
+     * if it is currently in beta.
+     *
+     * <p>This feature is only available starting with version {@link ProtocolVersion#V5 V5}.
+     *
+     * <p>Use with caution; refer to the server and protocol documentation for details on the
+     * latest protocol version.
+     *
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder allowBetaProtocolVersion() {
+        this.clusterMap.put(ALLOW_BETA_PROTOCOL_VERSION, TRUE_INDEX);
+        return this;
+    }
+
+    /**
+     * Sets the maximum time to wait for schema agreement before returning from a DDL query.
+     *
+     * <p>If not set through this method, the default value (10 seconds) will be used.
+     *
+     * @param maxSchemaAgreementWaitSeconds the new value to set.
+     * @return this Builder.
+     * @throws IllegalStateException if the provided value is zero or less.
+     */
+    public SimpleClusterBuilder withMaxSchemaAgreementWaitSeconds(
+            int maxSchemaAgreementWaitSeconds) {
+        this.clusterMap.put(MAX_SCHEMA_AGREEMENT_WAIT_SECONDS, maxSchemaAgreementWaitSeconds);
+        return this;
+    }
+
+    /**
+     * Adds a contact point - or many, if the given address resolves to multiple InetAddresses
+     * (A records).
+     *
+     * <p>Contact points are addresses of Cassandra nodes that the driver uses to discover the
+     * cluster topology. Only one contact point is required (the driver will retrieve the
+     * addresses of the other nodes automatically), but it is usually a good idea to provide more
+     * than one contact point, because if that single contact point is unavailable, the driver
+     * cannot initialize itself correctly.
+     *
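+     * <p>For example (the host name is illustrative and {@code builder} is any
+     * SimpleClusterBuilder instance):
+     *
+     * <pre>{@code
+     * builder.addContactPoint("cassandra-1.example.com");
+     * }</pre>
+     *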
+     * <p>Note that by default (that is, unless you use the {@link #withLoadBalancingPolicy}
+     * method of this builder), the first successfully contacted host will be used to define the
+     * local datacenter for the client. It follows that if you are running Cassandra in a
+     * multiple-datacenter setting, it is a good idea to only provide contact points that are in
+     * the same datacenter as the client, or to manually provide the load balancing policy that
+     * suits your needs.
+     *
+     * <p>If the host name points to a DNS record with multiple A records, all InetAddresses
+     * returned will be used. Make sure that all resulting InetAddresses point to the same cluster
+     * and datacenter.
+     *
+     * @param address the address of the node(s) to connect to.
+     * @return this Builder.
+     * @throws IllegalArgumentException if the given {@code address} could not be resolved.
+     * @throws SecurityException if a security manager is present and permission to resolve the host
+     *     name is denied.
+     */
+    public SimpleClusterBuilder addContactPoint(String address) {
+        this.clusterMap.put(CONTACT_POINT, address);
+        return this;
+    }
+
+    /**
+     * Adds contact points.
+     *
+     * <p>See {@link Cluster.Builder#addContactPoint} for more details on contact points.
+     *
+     * <p>Note that all contact points must be resolvable; if any of them cannot be resolved,
+     * this method will fail.
+     *
+     * @param addresses addresses of the nodes to add as contact points.
+     * @return this Builder.
+     * @throws IllegalArgumentException if any of the given {@code addresses} could not be resolved.
+     * @throws SecurityException if a security manager is present and permission to resolve the host
+     *     name is denied.
+     * @see Cluster.Builder#addContactPoint
+     */
+    public SimpleClusterBuilder addContactPoints(String... addresses) {
+        this.clusterMap.put(CONTACT_POINTS, addresses);
+        return this;
+    }
+
+    /**
+     * Adds contact points.
+     *
+     * <p>See {@link Cluster.Builder#addContactPoint} for more details on contact points. Unlike
+     * the other {@code addContactPoints} methods, this method allows you to provide a different
+     * port for each contact point. Since Cassandra nodes must always all listen on the same port,
+     * this is rarely what you want and most users should prefer the other {@code addContactPoints}
+     * methods to this one. However, this can be useful if the Cassandra nodes are behind a router
+     * and are not accessed directly. Note that if you are in this situation (Cassandra nodes are
+     * behind a router, not directly accessible), you almost surely want to provide a specific
+     * {@link AddressTranslator} (through {@link #withAddressTranslator}) to translate actual
+     * Cassandra node addresses to the addresses the driver should use, otherwise the driver will
+     * not be able to auto-detect new nodes (and will generally not function optimally).
+     *
+     * @param addresses addresses of the nodes to add as contact points.
+     * @return this Builder
+     * @see Cluster.Builder#addContactPoint
+     */
+    public SimpleClusterBuilder addContactPointsWithPorts(InetSocketAddress... addresses) {
+        this.clusterMap.put(CONTACT_POINTS_WITH_PORTS, addresses);
+        return this;
+    }
+
+    /**
+     * Configures the load balancing policy to use for the new cluster.
+     *
+     * <p>If no load balancing policy is set through this method, {@link
+     * Policies#defaultLoadBalancingPolicy} will be used instead.
+     *
+     * @param policy the load balancing policy to use.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withLoadBalancingPolicy(LoadBalancingPolicy policy) {
+        if (policy instanceof RoundRobinPolicy) {
+            this.clusterMap.put(LOAD_BALANCING_POLICY, ROUND_ROBIN_POLICY);
+        } else if (policy instanceof DCAwareRoundRobinPolicy) {
+            this.clusterMap.put(LOAD_BALANCING_POLICY, DC_AWARE_ROUND_ROBIN_POLICY);
+        }
+        return this;
+    }
+
+    /**
+     * Configures the reconnection policy to use for the new cluster.
+     *
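+     * <p>For example (a sketch; the delays shown mirror the defaults used elsewhere in this patch
+     * and are illustrative):
+     *
+     * <pre>{@code
+     * builder.withReconnectionPolicy(new ExponentialReconnectionPolicy(1000L, 600000L));
+     * }</pre>
+     *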
+     * <p>If no reconnection policy is set through this method, {@link
+     * Policies#DEFAULT_RECONNECTION_POLICY} will be used instead.
+     *
+     * @param policy the reconnection policy to use.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withReconnectionPolicy(ReconnectionPolicy policy) {
+        StringBuilder builder = new StringBuilder();
+
+        if (policy instanceof ExponentialReconnectionPolicy) {
+            long baseDelayMs = ((ExponentialReconnectionPolicy) policy).getBaseDelayMs();
+            long maxDelayMs = ((ExponentialReconnectionPolicy) policy).getMaxDelayMs();
+            String policyInfo =
+                    builder.append(EXPONENTIAL_RECONNECTION_POLICY)
+                            .append("|")
+                            .append(baseDelayMs)
+                            .append("|")
+                            .append(maxDelayMs)
+                            .toString();
+            this.clusterMap.put(RECONNECTION_POLICY, policyInfo);
+        } else if (policy instanceof ConstantReconnectionPolicy) {
+            long constantDelayMs = ((ConstantReconnectionPolicy) policy).getConstantDelayMs();
+            String policyInfo =
+                    builder.append(CONSTANT_RECONNECTION_POLICY)
+                            .append("|")
+                            .append(constantDelayMs)
+                            .toString();
+            this.clusterMap.put(RECONNECTION_POLICY, policyInfo);
+        }
+        return this;
+    }
+
+    /**
+     * Configures the retry policy to use for the new cluster.
+     *
+     * <p>If no retry policy is set through this method, {@link Policies#DEFAULT_RETRY_POLICY}
+     * will be used instead.
+     *
+     * @param policy the retry policy to use.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withRetryPolicy(RetryPolicy policy) {
+        if (policy instanceof DefaultRetryPolicy) {
+            this.clusterMap.put(RETRY_POLICY, CONSISTENCY_RETRY_POLICY);
+        } else if (policy instanceof FallthroughRetryPolicy) {
+            this.clusterMap.put(RETRY_POLICY, FALLTHROUGH_RETRY_POLICY);
+        }
+        return this;
+    }
+
+    /**
+     * Configures the speculative execution policy to use for the new cluster.
+     *
+     * <p>If no policy is set through this method, {@link
+     * Policies#defaultSpeculativeExecutionPolicy()} will be used instead.
+     *
+     * @param policy the policy to use.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withSpeculativeExecutionPolicy(SpeculativeExecutionPolicy policy) {
+        if (policy instanceof NoSpeculativeExecutionPolicy) {
+            this.clusterMap.put(SPECULATIVE_EXECUTION_POLICY, NO_SPECULATIVE_EXECUTION_POLICY);
+        } else if (policy instanceof ConstantSpeculativeExecutionPolicyExt) {
+            long delayMillis = ((ConstantSpeculativeExecutionPolicyExt) policy).getDelayMillis();
+            int maxExecutions = ((ConstantSpeculativeExecutionPolicyExt) policy).getMaxExecutions();
+            String policyInfo =
+                    CONSTANT_SPECULATIVE_EXECUTION_POLICY + "|" + delayMillis + "|" + maxExecutions;
+            this.clusterMap.put(SPECULATIVE_EXECUTION_POLICY, policyInfo);
+        }
+        return this;
+    }
+
+    /**
+     * Uses the provided credentials when connecting to Cassandra hosts.
+     *
+     * <p>This should be used if the Cassandra cluster has been configured to use the {@code
+     * PasswordAuthenticator}. If the default {@code AllowAllAuthenticator} is used instead, using
+     * this method has no effect.
+     *
+     * @param username the username to use to login to Cassandra hosts.
+     * @param password the password corresponding to {@code username}.
+     * @return this Builder.
+     */
+    public SimpleClusterBuilder withCredentials(String username, String password) {
+        String credentialsInfo = CREDENTIALS + "|" + username + "|" + password;
+        this.clusterMap.put(CREDENTIALS, credentialsInfo);
+        return this;
+    }
+
+    /**
+     * Disables metrics collection for the created cluster (metrics are enabled by default
+     * otherwise).
+     *
+     * @return this builder.
+     */
+    public SimpleClusterBuilder withoutMetrics() {
+        this.clusterMap.put(WITHOUT_METRICS, TRUE_INDEX);
+        return this;
+    }
+
+    /**
+     * Disables JMX reporting of the metrics.
+     *
+     * <p>
JMX reporting is enabled by default (see {@link Metrics}) but can be disabled using this + * option. If metrics are disabled, this is a no-op. + * + * @return this builder. + */ + public SimpleClusterBuilder withoutJMXReporting() { + this.clusterMap.put(WITHOUT_JMX_REPORTING, TRUE_INDEX); + return this; + } + + /** + * Enables the NO_COMPACT startup option. + * + * @return this builder. + * @see CASSANDRA-10857 + */ + public SimpleClusterBuilder withNoCompact() { + this.clusterMap.put(NO_COMPACT, TRUE_INDEX); + return this; + } + + /** + * Configures the connection to Cassandra. The configuration is done by calling methods on the + * builder object and finalizing the configuration with build(). + * + * @param builder connection builder + * @return configured connection + */ + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + String clusterName = (String) this.clusterMap.get(CLUSTER_NAME); + String contactPoint = (String) this.clusterMap.get(CONTACT_POINT); + String[] contactPoints = (String[]) this.clusterMap.get(CONTACT_POINTS); + String loadBalancingPolicy = (String) this.clusterMap.get(LOAD_BALANCING_POLICY); + String reconnectionPolicy = (String) this.clusterMap.get(RECONNECTION_POLICY); + String retryPolicy = (String) this.clusterMap.get(RETRY_POLICY); + String credentials = (String) this.clusterMap.get(CREDENTIALS); + String metrics = (String) this.clusterMap.get(WITHOUT_METRICS); + String noCompact = (String) this.clusterMap.get(NO_COMPACT); + String jMXReporting = (String) this.clusterMap.get(WITHOUT_JMX_REPORTING); + String allowBetaProtocolVersion = (String) this.clusterMap.get(ALLOW_BETA_PROTOCOL_VERSION); + Integer port = (Integer) this.clusterMap.get(CLUSTER_PORT); + String speculativeExecutionPolicy = + (String) this.clusterMap.get(SPECULATIVE_EXECUTION_POLICY); + InetSocketAddress[] contactPointsWithPorts = + (InetSocketAddress[]) this.clusterMap.get(CONTACT_POINTS_WITH_PORTS); + Integer maxSchemaAgreementWaitSeconds = + (Integer) this.clusterMap.get(MAX_SCHEMA_AGREEMENT_WAIT_SECONDS); + if (clusterName != null) { + builder.withClusterName(clusterName); + } + if (port != null) { + builder.withPort(port); + } + if (allowBetaProtocolVersion != null) { + builder.allowBetaProtocolVersion(); + } + if (maxSchemaAgreementWaitSeconds != null) { + builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds); + } + if (contactPoint != null) { + builder.addContactPoint(contactPoint); + } + if (contactPoints != null) { + builder.addContactPoints(contactPoints); + } + if (contactPointsWithPorts != null) { + builder.addContactPointsWithPorts(contactPointsWithPorts); + } + if (loadBalancingPolicy != null) { + if (ROUND_ROBIN_POLICY.equals(loadBalancingPolicy)) { + builder.withLoadBalancingPolicy(new RoundRobinPolicy()); + } else if (DC_AWARE_ROUND_ROBIN_POLICY.equals(loadBalancingPolicy)) { + builder.withLoadBalancingPolicy( + new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())); + } + } + if (reconnectionPolicy != null) { + String[] split = reconnectionPolicy.split("\\|"); + String name = split[0]; + + if (EXPONENTIAL_RECONNECTION_POLICY.equals(name)) { + int baseDelayMs = Integer.parseInt(split[1]); + int maxDelayMs = Integer.parseInt(split[2]); + builder.withReconnectionPolicy( + new ExponentialReconnectionPolicy(baseDelayMs, maxDelayMs)); + } else if (CONSTANT_RECONNECTION_POLICY.equals(name)) { + int constantDelayMs = Integer.parseInt(split[1]); + builder.withReconnectionPolicy(new ConstantReconnectionPolicy(constantDelayMs)); + } + } + if 
(retryPolicy != null) { + if (CONSISTENCY_RETRY_POLICY.equals(retryPolicy)) { + builder.withRetryPolicy(Policies.defaultRetryPolicy()); + } else if (FALLTHROUGH_RETRY_POLICY.equals(retryPolicy)) { + builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); + } + } + if (speculativeExecutionPolicy != null) { + if (NO_SPECULATIVE_EXECUTION_POLICY.equals(speculativeExecutionPolicy)) { + builder.withSpeculativeExecutionPolicy(NoSpeculativeExecutionPolicy.INSTANCE); + } + } + if (credentials != null) { + String[] split = credentials.split("\\|"); + String userName = split[1]; + String passWord = split[2]; + builder.withCredentials(userName, passWord); + } + if (jMXReporting != null) { + builder.withoutJMXReporting(); + } + if (metrics != null) { + builder.withoutMetrics(); + } + if (noCompact != null) { + builder.withNoCompact(); + } + return builder.build(); + } +} + +/** Extend ConstantSpeculativeExecutionPolicy. */ +class ConstantSpeculativeExecutionPolicyExt extends ConstantSpeculativeExecutionPolicy { + /** the number of speculative executions. Must be strictly positive. */ + private final int maxExecutions; + /** + * The delay between each speculative execution. Must be >= 0. A zero delay means it should + * immediately send `maxSpeculativeExecutions` requests along with the original request. + */ + private final long delayMillis; + /** + * Builds a new instance. + * + * @param delayMillis the delay between each speculative execution. Must be >= 0. A zero delay + * means it should immediately send `maxSpeculativeExecutions` requests along with the + * original request. + * @param maxExecutions the number of speculative executions. Must be strictly positive. + * @throws IllegalArgumentException if one of the arguments does not respect the preconditions + * above. + */ + public ConstantSpeculativeExecutionPolicyExt(long delayMillis, int maxExecutions) { + super(delayMillis, maxExecutions); + this.delayMillis = delayMillis; + this.maxExecutions = maxExecutions; + } + + /** Get the number of speculative executions. */ + public int getMaxExecutions() { + return maxExecutions; + } + + /** Get the delay between each speculative execution. */ + public long getDelayMillis() { + return delayMillis; + } +} diff --git a/flink-python/pyflink/datastream/connectors/cassandra.py b/flink-python/pyflink/datastream/connectors/cassandra.py index 4cca90753e5b9..820b96e539a8c 100644 --- a/flink-python/pyflink/datastream/connectors/cassandra.py +++ b/flink-python/pyflink/datastream/connectors/cassandra.py @@ -19,13 +19,19 @@ from pyflink.common import Duration from pyflink.java_gateway import get_gateway +from pyflink.util.java_utils import to_jarray __all__ = ['ConsistencyLevel', 'MapperOptions', + 'LoadBalancingPolicy', + 'ReconnectionPolicy', + 'RetryPolicy', + 'SpeculativeExecutionPolicy', 'ClusterBuilder', 'CassandraCommitter', - 'CassandraFailureHandler'] + 'CassandraFailureHandler' + ] # ---- Classes introduced to construct the MapperOptions ---- @@ -126,14 +132,349 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions': return self +# ---- Classes introduced to construct the ClusterBuilder ---- + + +class LoadBalancingPolicy(object): + """ + The policy that decides which Cassandra hosts to contact for each new query. + + The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy + is expected to exclude down hosts from query plans. 
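+
+    Example (a sketch; both factory methods below are defined on this class):
+    ::
+
+        >>> policy = LoadBalancingPolicy.round_robin_policy()
+        >>> default_policy = LoadBalancingPolicy.dc_aware_round_robin_policy()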
+ """ + + def __init__(self, j_load_balancing_policy): + self._j_load_balancing_policy = j_load_balancing_policy + + @staticmethod + def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy': + """ + A DCAwareRoundRobinPolicy with token awareness. + + This is also the default load balancing policy. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy()) + + @staticmethod + def round_robin_policy() -> 'LoadBalancingPolicy': + """ + A Round-robin load balancing policy. + + This policy queries nodes in a round-robin fashion. For a given query, if an host fail, the + next one (following the round-robin order) is tried, until all hosts have been tried. + + This policy is not datacenter aware and will include every known Cassandra hosts in its + round-robin algorithm. If you use multiple datacenter this will be inefficient, and you will + want to use the DCAwareRoundRobinPolicy load balancing policy instead. + """ + JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy + return LoadBalancingPolicy(JRoundRobinPolicy()) + + +class ReconnectionPolicy(object): + """ + Policy that decides how often the reconnection to a dead node is attempted. + + Note that if the driver receives a push notification from the Cassandra cluster that a node is + UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be + created (in effect, the driver reset the scheduler). + + The default ExponentialReconnectionPolicy policy is usually adequate. + """ + + def __init__(self, j_reconnection_policy): + self._j_reconnection_policy = j_reconnection_policy + + @staticmethod + def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \ + -> 'ReconnectionPolicy': + """ + The default load reconnection policy. + + A reconnection policy that waits exponentially longer between each reconnection attempt + (but keeps a constant delay once a maximum delay is reached). + """ + JExponentialReconnectionPolicy = get_gateway().jvm. \ + com.datastax.driver.core.policies.ExponentialReconnectionPolicy + return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms)) + + @staticmethod + def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy': + """ + A reconnection policy that waits a constant time between each reconnection attempt. + """ + JConstantReconnectionPolicy = get_gateway().jvm.\ + com.datastax.driver.core.policies.ConstantReconnectionPolicy + return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms)) + + +class RetryPolicy(object): + """ + A policy that defines a default behavior to adopt when a request fails. + + There are three possible decisions: + - RETHROW: no retry should be attempted and an exception should be thrown. + - RETRY: the operation will be retried. The consistency level of the retry should be specified. + - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the + operation that triggered the Cassandra exception will return an empty result set. + """ + + def __init__(self, j_retry_policy): + self._j_retry_policy = j_retry_policy + + @staticmethod + def consistency_retry_policy() -> 'RetryPolicy': + """ + The default retry policy. + + This policy retries queries in only two cases: + - On a read timeout, retries once on the same host if enough replicas replied but data was + not retrieved. 
+        - On a write timeout, retries once on the same host if we timeout while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, the query is retried on the next host.
+        It does not retry on read or write failures.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """
+        A retry policy that never retries (nor ignores).
+        """
+        JFallthroughRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies. \
+            FallthroughRetryPolicy
+        return RetryPolicy(JFallthroughRetryPolicy.INSTANCE)
+
+    @staticmethod
+    def logging_retry_policy(policy: 'RetryPolicy') -> 'RetryPolicy':
+        """
+        Creates a new RetryPolicy that logs the decision of the given policy.
+        """
+        JLoggingRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies. \
+            LoggingRetryPolicy
+        return RetryPolicy(JLoggingRetryPolicy(policy._j_retry_policy))
+
+
+class SpeculativeExecutionPolicy(object):
+    """
+    The policy that decides if the driver will send speculative queries to the next hosts when
+    the current host takes too long to respond.
+
+    Note that only idempotent statements will be speculatively retried.
+    """
+
+    def __init__(self, j_speculative_execution_policy):
+        self._j_speculative_execution_policy = j_speculative_execution_policy
+
+    @staticmethod
+    def no_speculative_execution_policy() -> 'SpeculativeExecutionPolicy':
+        """
+        The default speculative execution policy.
+
+        A SpeculativeExecutionPolicy that never schedules speculative executions.
+        """
+        JNoSpeculativeExecutionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.NoSpeculativeExecutionPolicy
+        return SpeculativeExecutionPolicy(JNoSpeculativeExecutionPolicy.INSTANCE)
+
+    @staticmethod
+    def constant_speculative_execution_policy(constant_delay_millis: int,
+                                              max_speculative_executions: int) \
+            -> 'SpeculativeExecutionPolicy':
+        """
+        A SpeculativeExecutionPolicy that schedules a given number of speculative executions,
+        separated by a fixed delay.
+        """
+        JConstantSpeculativeExecutionPolicy = get_gateway().jvm. \
+            org.apache.flink.streaming.connectors.cassandra.ConstantSpeculativeExecutionPolicyExt
+        return SpeculativeExecutionPolicy(
+            JConstantSpeculativeExecutionPolicy(constant_delay_millis, max_speculative_executions))
+
+
 class ClusterBuilder(object):
     """
     This class is used to configure a Cluster after deployment. The cluster represents the
     connection that will be established to Cassandra.
+
+    A simple way to construct a ClusterBuilder.
+
+    Example:
+    ::
+
+        >>> cluster_builder = ClusterBuilder() \\
+        ...     .add_contact_points("127.0.0.1") \\
+        ...     .with_port(9042) \\
+        ...     .with_cluster_name("cluster_name") \\
+        ...     .with_credentials("user", "password")
     """
 
-    def __init__(self, j_cluster_builder):
-        self._j_cluster_builder = j_cluster_builder
+    def __init__(self):
+        JSimpleClusterBuilder = get_gateway().jvm.\
+            org.apache.flink.streaming.connectors.cassandra.SimpleClusterBuilder
+        self._j_cluster_builder = JSimpleClusterBuilder()
+
+    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
+        """
+        An optional name for the created cluster.
+
+        Note: this is not related to the Cassandra cluster name (though you are free to provide
+        the same name).
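+
+        Example (the name is illustrative):
+        ::
+
+            >>> builder = ClusterBuilder().with_cluster_name("pyflink-cluster")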
+ """ + self._j_cluster_builder.withClusterName(name) + return self + + def with_port(self, port: int) -> 'ClusterBuilder': + """ + The port to use to connect to the Cassandra host. + + If not set through this method, the default port (9042) will be used instead. + """ + self._j_cluster_builder.withPort(port) + return self + + def allow_beta_protocol_version(self) -> 'ClusterBuilder': + """ + Create cluster connection using latest development protocol version, which is currently in + beta. Calling this method will result into setting USE_BETA flag in all outgoing messages, + which allows server to negotiate the supported protocol version even if it is currently in + beta. + + This feature is only available starting with version V5. + + Use with caution, refer to the server and protocol documentation for the details on latest + protocol version. + """ + self._j_cluster_builder.allowBetaProtocolVersion() + return self + + def with_max_schema_agreement_wait_seconds(self, max_schema_agreement_wait_seconds: int) \ + -> 'ClusterBuilder': + """ + Sets the maximum time to wait for schema agreement before returning from a DDL query. + + If not set through this method, the default value (10 seconds) will be used. + """ + self._j_cluster_builder.withMaxSchemaAgreementWaitSeconds(max_schema_agreement_wait_seconds) + return self + + def add_contact_point(self, address: str) -> 'ClusterBuilder': + """ + Adds a contact point. + """ + self._j_cluster_builder.addContactPoint(address) + return self + + def add_contact_points(self, *addresses: str) -> 'ClusterBuilder': + """ + Adds contact points. + """ + self._j_cluster_builder.addContactPoints(to_jarray(get_gateway().jvm.String, addresses)) + return self + + def add_contact_points_with_ports(self, *addresses: str) -> 'ClusterBuilder': + """ + Adds contact points with ports. + + Note: All the addresses should be the following format: "hostname:port". + """ + JInetSocketAddress = get_gateway().jvm.java.net.InetSocketAddress + + j_addresses = [] + for add in addresses: + split = add.split(":") + hostname = split[0] + port = int(split[1]) + j_addresses.append(JInetSocketAddress(hostname, port)) + + j_inet_socket_address_array = to_jarray(JInetSocketAddress, j_addresses) + self._j_cluster_builder.addContactPointsWithPorts(j_inet_socket_address_array) + return self + + def with_load_balancing_policy(self, policy: LoadBalancingPolicy) -> 'ClusterBuilder': + """ + Configures the load balancing policy to use for the new cluster. + + If no load balancing policy is set through this method, the DCAwareRoundRobinPolicy will be + used instead. + """ + self._j_cluster_builder.withLoadBalancingPolicy(policy._j_load_balancing_policy) + return self + + def with_reconnection_policy(self, policy: ReconnectionPolicy) -> 'ClusterBuilder': + """ + Configures the reconnection policy to use for the new cluster. + + If no reconnection policy is set through this method, the ExponentialReconnectionPolicy + will be used instead. + """ + self._j_cluster_builder.withReconnectionPolicy(policy._j_reconnection_policy) + return self + + def with_retry_policy(self, policy: RetryPolicy) -> 'ClusterBuilder': + """ + Configures the retry policy to use for the new cluster. + + If no retry policy is set through this method, Consistency Retry policy will be used + instead. 
+ """ + self._j_cluster_builder.withRetryPolicy(policy._j_retry_policy) + return self + + def with_speculative_execution_policy(self, policy: SpeculativeExecutionPolicy) \ + -> 'ClusterBuilder': + """ + Configures the speculative execution policy to use for the new cluster. + """ + self._j_cluster_builder.withSpeculativeExecutionPolicy( + policy._j_speculative_execution_policy) + return self + + def with_credentials(self, username: str, password: str) -> 'ClusterBuilder': + """ + Uses the provided credentials when connecting to Cassandra hosts. + + This should be used if the Cassandra cluster has been configured to use the + PasswordAuthenticator. If the default AllowAllAuthenticator is used instead, using this + method has no effect. + """ + self._j_cluster_builder.withCredentials(username, password) + return self + + def without_metrics(self) -> 'ClusterBuilder': + """ + Disables metrics collection for the created cluster (metrics are enabled by default + otherwise). + """ + self._j_cluster_builder.withoutMetrics() + return self + + def without_jmx_reporting(self) -> 'ClusterBuilder': + """ + Disables JMX reporting of the metrics. + + JMX reporting is enabled by default but can be disabled using this option. If metrics are + disabled, this is a no-op + """ + self._j_cluster_builder.withoutJMXReporting() + return self + + def with_no_compact(self) -> 'ClusterBuilder': + """ + Enables the NO_COMPACT startup option. + + When this option is supplied, SELECT, UPDATE, DELETE and BATCH statements on COMPACT STORAGE + tables function in "compatibility" mode which allows seeing these tables as if they were + "regular" CQL tables. + + This option only effects interactions with tables using COMPACT STORAGE and is only + supported by C* 4.0+ and DSE 6.0+. + """ + self._j_cluster_builder.withNoCompact() + return self class CassandraCommitter(object): @@ -220,7 +561,7 @@ def set_uid_hash(self, uid_hash: str) -> 'CassandraSink': identifying an operator through the default hash mechanics fails (e.g. because of changes between Flink versions). - Note that this should be used as a workaround or for trouble shooting. The provided hash + Note that this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail. @@ -354,7 +695,7 @@ def enable_ignore_null_fields(self) -> 'CassandraSink.CassandraSinkBuilder': """ Enables ignoring null values, treats null values as unset and avoids writing null fields and creating tombstones. - This call has no effect if CassandraSinkBuilder.enableWriteAheadLog() is called. + This call has no effect if CassandraSinkBuilder.enable_write_ahead_log() is called. """ self._j_cassandra_sink_builder.enableIgnoreNullFields() return self diff --git a/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py b/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py new file mode 100644 index 0000000000000..0bba38087c9ee --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/tests/test_cassandra.py @@ -0,0 +1,86 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.common import Types +from pyflink.datastream.connectors import CassandraSink +from pyflink.datastream.connectors.cassandra import MapperOptions, ConsistencyLevel, \ + ClusterBuilder, LoadBalancingPolicy, ReconnectionPolicy, RetryPolicy, SpeculativeExecutionPolicy +from pyflink.datastream.connectors.tests.test_connectors import ConnectorTestBase + + +class CassandraSinkTest(ConnectorTestBase): + @classmethod + def _get_jars_relative_path(cls): + return '/flink-connectors/flink-connector-cassandra' + + def test_cassandra_sink(self): + type_info = Types.ROW([Types.STRING(), Types.INT()]) + ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deefg', 4)], + type_info=type_info) + cassandra_sink_builder = CassandraSink.add_sink(ds) + + cassandra_sink = cassandra_sink_builder \ + .set_host('localhost', 9876) \ + .set_query('query') \ + .enable_ignore_null_fields() \ + .set_mapper_options(MapperOptions() + .ttl(1) + .timestamp(100) + .tracing(True) + .if_not_exists(False) + .consistency_level(ConsistencyLevel.ANY) + .save_null_fields(True)) \ + .set_max_concurrent_requests(1000) \ + .build() + + cassandra_sink.name('cassandra_sink').set_parallelism(3) + + plan = eval(self.env.get_execution_plan()) + self.assertEqual("Sink: cassandra_sink", plan['nodes'][1]['type']) + self.assertEqual(3, plan['nodes'][1]['parallelism']) + + def test_cassandra_sink_with_cluster(self): + type_info = Types.ROW([Types.STRING(), Types.STRING()]) + ds = self.env.from_collection([('a', "1"), ('b', "2"), ('c', "3"), ('d', "4")], + type_info=type_info) + cassandra_sink_builder = CassandraSink.add_sink(ds) + + cluster_builder = ClusterBuilder() \ + .with_cluster_name("cluster name") \ + .with_port(9042) \ + .with_max_schema_agreement_wait_seconds(3600) \ + .add_contact_points("127.0.0.1", "127.0.0.2") \ + .with_load_balancing_policy(LoadBalancingPolicy.dc_aware_round_robin_policy()) \ + .with_reconnection_policy(ReconnectionPolicy.exponential_reconnection_policy()) \ + .with_retry_policy(RetryPolicy.consistency_retry_policy()) \ + .with_speculative_execution_policy(SpeculativeExecutionPolicy. 
+ constant_speculative_execution_policy(1, 2)) \ + .with_credentials("user", "pwd") \ + .without_metrics() \ + .without_jmx_reporting() \ + .with_no_compact() + + cassandra_sink = cassandra_sink_builder \ + .set_cluster_builder(cluster_builder) \ + .set_query('query') \ + .build() + + cassandra_sink.name('cassandra_sink').set_parallelism(3) + + plan = eval(self.env.get_execution_plan()) + self.assertEqual("Sink: cassandra_sink", plan['nodes'][1]['type']) + self.assertEqual(3, plan['nodes'][1]['parallelism']) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py index 4bb7f0084b8c6..cb513af282279 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_connectors.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_connectors.py @@ -30,7 +30,6 @@ RMQConnectionConfig, PulsarSource, StartCursor, PulsarDeserializationSchema, StopCursor, \ SubscriptionType, PulsarSink, PulsarSerializationSchema, DeliveryGuarantee, TopicRoutingMode, \ MessageDelayer, FlinkKinesisConsumer, KinesisStreamsSink, KinesisFirehoseSink -from pyflink.datastream.connectors.cassandra import CassandraSink, MapperOptions, ConsistencyLevel from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction from pyflink.java_gateway import get_gateway @@ -669,32 +668,3 @@ def test_kinesis_firehose_sink(self): self.assertEqual( get_field_value(kinesis_firehose_sink.get_java_function(), 'deliveryStreamName'), 'stream-1') - - -class CassandraSinkTest(PyFlinkStreamingTestCase): - - def test_cassandra_sink(self): - type_info = Types.ROW([Types.STRING(), Types.INT()]) - ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)], - type_info=type_info) - cassandra_sink_builder = CassandraSink.add_sink(ds) - - cassandra_sink = cassandra_sink_builder\ - .set_host('localhost', 9876) \ - .set_query('query') \ - .enable_ignore_null_fields() \ - .set_mapper_options(MapperOptions() - .ttl(1) - .timestamp(100) - .tracing(True) - .if_not_exists(False) - .consistency_level(ConsistencyLevel.ANY) - .save_null_fields(True)) \ - .set_max_concurrent_requests(1000) \ - .build() - - cassandra_sink.name('cassandra_sink').set_parallelism(3) - - plan = eval(self.env.get_execution_plan()) - self.assertEqual("Sink: cassandra_sink", plan['nodes'][1]['type']) - self.assertEqual(3, plan['nodes'][1]['parallelism'])
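
For reference, a minimal end-to-end sketch of the Python API added by this patch (it mirrors
test_cassandra_sink_with_cluster above; the contact point, port, keyspace, and query are
placeholders for a reachable Cassandra setup):

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import CassandraSink
    from pyflink.datastream.connectors.cassandra import ClusterBuilder, RetryPolicy

    env = StreamExecutionEnvironment.get_execution_environment()
    # The flink-connector-cassandra jar must be on the classpath, e.g. via env.add_jars(...).
    ds = env.from_collection([('a', 1), ('b', 2)],
                             type_info=Types.ROW([Types.STRING(), Types.INT()]))

    cluster_builder = ClusterBuilder() \
        .add_contact_points("127.0.0.1") \
        .with_port(9042) \
        .with_retry_policy(RetryPolicy.consistency_retry_policy())

    CassandraSink.add_sink(ds) \
        .set_cluster_builder(cluster_builder) \
        .set_query("INSERT INTO my_keyspace.my_table (f0, f1) VALUES (?, ?);") \
        .build() \
        .name("cassandra_sink")

    env.execute("cassandra_sink_example")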