diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraConfig.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraConfig.scala index 821f284..6932c44 100644 --- a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraConfig.scala +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/CassandraConfig.scala @@ -15,5 +15,6 @@ object CassandraConfig { .setIfMissing("spark.cassandra.auth.username", CassandraUsername) .setIfMissing("spark.cassandra.auth.password", CassandraPassword) .setIfMissing("spark.cassandra.connection.keep_alive_ms", envOrElse("CASSANDRA_KEEP_ALIVE_MS", (batchDuration.milliseconds * 2).toString)) + .setIfMissing("spark.cassandra.connection.factory", "com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra.FortisConnectionFactory") } -} \ No newline at end of file +} diff --git a/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/FortisConnectionFactory.scala b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/FortisConnectionFactory.scala new file mode 100644 index 0000000..afbcffe --- /dev/null +++ b/src/main/scala/com/microsoft/partnercatalyst/fortis/spark/sinks/cassandra/FortisConnectionFactory.scala @@ -0,0 +1,14 @@ +package com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra + +import com.datastax.driver.core.Cluster +import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, TokenAwarePolicy} +import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory} + +object FortisConnectionFactory extends CassandraConnectionFactory { + override def createCluster(conf: CassandraConnectorConf): Cluster = { + DefaultConnectionFactory + .clusterBuilder(conf) + .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().build())) + .build() + } +}