Skip to content

Commit

Permalink
Merge pull request #996 from datastax/python-895_require-only-one-res…
Browse files Browse the repository at this point in the history
…olved-contact-point

Python 895 require only one resolved contact point
  • Loading branch information
mambocab committed Nov 5, 2018
2 parents 16ad9ee + 9c47bcc commit 8c1a909
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -9,6 +9,7 @@ Bug Fixes
* Fix OSS driver's virtual table support against DSE 6.0.X and future server releases (PYTHON-1020)
* ResultSet.one() fails if the row_factory is using a generator (PYTHON-1026)
* Log profile name on attempt to create existing profile (PYTHON-944)
* Cluster instantiation fails if any contact points' hostname resolution fails (PYTHON-895)

Other
-----
Expand Down
10 changes: 10 additions & 0 deletions cassandra/__init__.py
Expand Up @@ -686,3 +686,13 @@ class UnsupportedOperation(DriverException):
for more details.
"""
pass


class UnresolvableContactPoints(DriverException):
"""
The driver was unable to resolve any provided hostnames.
Note that this is *not* raised when a :class:`.Cluster` is created with no
contact points, only when lookup fails for all hosts
"""
pass
41 changes: 35 additions & 6 deletions cassandra/cluster.py
Expand Up @@ -43,7 +43,8 @@

from cassandra import (ConsistencyLevel, AuthenticationFailed,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException, ProtocolVersion)
SchemaTargetType, DriverException, ProtocolVersion,
UnresolvableContactPoints)
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported)
from cassandra.cqltypes import UserType
Expand Down Expand Up @@ -86,6 +87,7 @@ def _is_gevent_monkey_patched():
import gevent.socket
return socket.socket is gevent.socket.socket


# default to gevent when we are monkey patched with gevent, eventlet when
# monkey patched with eventlet, otherwise if libev is available, use that as
# the default because it's fastest. Otherwise, use asyncore.
Expand Down Expand Up @@ -181,6 +183,7 @@ def _shutdown_clusters():
for cluster in clusters:
cluster.shutdown()


atexit.register(_shutdown_clusters)


Expand All @@ -190,6 +193,35 @@ def default_lbp_factory():
return DCAwareRoundRobinPolicy()


def _addrinfo_or_none(contact_point, port):
"""
A helper function that wraps socket.getaddrinfo and returns None
when it fails to, e.g. resolve one of the hostnames. Used to address
PYTHON-895.
"""
try:
return socket.getaddrinfo(contact_point, port,
socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror:
log.debug('Could not resolve hostname "{}" '
'with port {}'.format(contact_point, port))
return None


def _resolve_contact_points(contact_points, port):
resolved = tuple(_addrinfo_or_none(p, port)
for p in contact_points)

if resolved and all((x is None for x in resolved)):
raise UnresolvableContactPoints(contact_points, port)

resolved = tuple(r for r in resolved if r is not None)

return [endpoint[4][0]
for addrinfo in resolved
for endpoint in addrinfo]


class ExecutionProfile(object):
load_balancing_policy = None
"""
Expand Down Expand Up @@ -822,8 +854,8 @@ def __init__(self,

self.port = port

self.contact_points_resolved = [endpoint[4][0] for a in self.contact_points
for endpoint in socket.getaddrinfo(a, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)]
self.contact_points_resolved = _resolve_contact_points(self.contact_points,
self.port)

self.compression = compression

Expand Down Expand Up @@ -1086,7 +1118,6 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
if not_done:
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")


def get_min_requests_per_connection(self, host_distance):
return self._min_requests_per_connection[host_distance]

Expand Down Expand Up @@ -2029,7 +2060,6 @@ def default_serial_consistency_level(self, cl):
.. versionadded:: 3.8.0
"""


encoder = None
"""
A :class:`~cassandra.encoder.Encoder` instance that will be used when
Expand Down Expand Up @@ -2219,7 +2249,6 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
load_balancing_policy = execution_profile.load_balancing_policy
spec_exec_policy = execution_profile.speculative_execution_policy


fetch_size = query.fetch_size
if fetch_size is FETCH_SIZE_UNSET and self._protocol_version >= 2:
fetch_size = self.default_fetch_size
Expand Down
12 changes: 8 additions & 4 deletions tests/integration/simulacron/__init__.py
Expand Up @@ -35,20 +35,24 @@ def tearDown(self):


class SimulacronCluster(SimulacronBase):

cluster, connect = None, True

@classmethod
def setUpClass(cls):
if SIMULACRON_JAR is None or CASSANDRA_VERSION < Version("2.1"):
return

start_and_prime_singledc()
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
cls.session = cls.cluster.connect(wait_for_all_pools=True)
if cls.connect:
cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION, compression=False)
cls.session = cls.cluster.connect(wait_for_all_pools=True)

@classmethod
def tearDownClass(cls):
if SIMULACRON_JAR is None or CASSANDRA_VERSION < Version("2.1"):
return

cls.cluster.shutdown()
if cls.cluster:
cls.cluster.shutdown()
stop_simulacron()

29 changes: 27 additions & 2 deletions tests/integration/simulacron/test_cluster.py
Expand Up @@ -17,10 +17,13 @@
import unittest # noqa

from tests.integration.simulacron import SimulacronCluster
from tests.integration import requiressimulacron
from tests.integration import (requiressimulacron, PROTOCOL_VERSION)
from tests.integration.simulacron.utils import prime_query

from cassandra import WriteTimeout, WriteType, ConsistencyLevel
from cassandra import (WriteTimeout, WriteType,
ConsistencyLevel, UnresolvableContactPoints)
from cassandra.cluster import Cluster


@requiressimulacron
class ClusterTests(SimulacronCluster):
Expand Down Expand Up @@ -53,3 +56,25 @@ def test_writetimeout(self):
self.assertIn(consistency, str(wt))
self.assertIn(str(received_responses), str(wt))
self.assertIn(str(required_responses), str(wt))


@requiressimulacron
class ClusterDNSResolutionTests(SimulacronCluster):

connect = False

def tearDown(self):
if self.cluster:
self.cluster.shutdown()

def test_connection_with_one_unresolvable_contact_point(self):
# shouldn't raise anything due to name resolution failures
self.cluster = Cluster(['127.0.0.1', 'dns.invalid'],
protocol_version=PROTOCOL_VERSION,
compression=False)

def test_connection_with_only_unresolvable_contact_points(self):
with self.assertRaises(UnresolvableContactPoints):
self.cluster = Cluster(['dns.invalid'],
protocol_version=PROTOCOL_VERSION,
compression=False)

0 comments on commit 8c1a909

Please sign in to comment.