Skip to content

Commit

Permalink
Add protocol_version to conn_config for Cassandrahook (#11036)
Browse files Browse the repository at this point in the history
  • Loading branch information
Songkran Nethan committed Oct 14, 2020
1 parent 545ba8e commit 0646849
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 1 deletion.
4 changes: 4 additions & 0 deletions airflow/providers/apache/cassandra/hooks/cassandra.py
Expand Up @@ -113,6 +113,10 @@ def __init__(self, cassandra_conn_id: str = 'cassandra_default'):
if ssl_options:
conn_config['ssl_options'] = ssl_options

protocol_version = conn.extra_dejson.get('protocol_version', None)
if protocol_version:
conn_config['protocol_version'] = protocol_version

self.cluster = Cluster(**conn_config)
self.keyspace = conn.schema
self.session = None
Expand Down
1 change: 1 addition & 0 deletions docs/howto/connection/cassandra.rst
Expand Up @@ -53,6 +53,7 @@ Extra (optional)
``RoundRobinPolicy``, ``DCAwareRoundRobinPolicy``, ``WhiteListRoundRobinPolicy`` and ``TokenAwarePolicy``. ``RoundRobinPolicy`` is the default load balancing policy.
* ``load_balancing_policy_args`` - This parameter specifies the arguments for the load balancing policy being used.
* ``cql_version`` - This parameter specifies the CQL version of cassandra.
* ``protocol_version`` - This parameter specifies the maximum version of the native protocol to use.
* ``ssl_options`` - This parameter specifies the details related to SSL, if it's enabled in Cassandra.


Expand Down
3 changes: 2 additions & 1 deletion tests/providers/apache/cassandra/hooks/test_cassandra.py
Expand Up @@ -43,7 +43,7 @@ def setUp(self):
host='host-1,host-2',
port='9042',
schema='test_keyspace',
extra='{"load_balancing_policy":"TokenAwarePolicy"}',
extra='{"load_balancing_policy":"TokenAwarePolicy","protocol_version":4}',
)
)
db.merge_conn(
Expand Down Expand Up @@ -84,6 +84,7 @@ def test_get_conn(self):
cluster = hook.get_cluster()
self.assertEqual(cluster.contact_points, ['host-1', 'host-2'])
self.assertEqual(cluster.port, 9042)
self.assertEqual(cluster.protocol_version, 4)
self.assertTrue(isinstance(cluster.load_balancing_policy, TokenAwarePolicy))

def test_get_lb_policy_with_no_args(self):
Expand Down

0 comments on commit 0646849

Please sign in to comment.