Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws_wrapper/failover_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from aws_wrapper.utils.properties import Properties, WrapperProperties
from aws_wrapper.utils.rds_url_type import RdsUrlType
from aws_wrapper.utils.rdsutils import RdsUtils
from aws_wrapper.utils.subscribed_method_utils import SubscribedMethodUtils
from aws_wrapper.utils.utils import SubscribedMethodUtils
from aws_wrapper.writer_failover_handler import (WriterFailoverHandler,
WriterFailoverHandlerImpl)

Expand Down
357 changes: 309 additions & 48 deletions aws_wrapper/host_monitoring_plugin.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions aws_wrapper/plugin_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def force_refresh_host_list(self, connection: Optional[Connection] = None):
def connect(self, host_info: HostInfo, props: Properties) -> Connection:
...

def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Event) -> Connection:
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Optional[Event]) -> Connection:
...

def set_availability(self, host_aliases: FrozenSet[str], availability: HostAvailability):
Expand Down Expand Up @@ -263,7 +263,7 @@ def connect(self, host_info: HostInfo, props: Properties) -> Connection:
plugin_manager: PluginManager = self._container.plugin_manager
return plugin_manager.connect(host_info, props, self.current_connection is None)

def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Event) -> Connection:
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Optional[Event]) -> Connection:
plugin_manager: PluginManager = self._container.plugin_manager
return plugin_manager.force_connect(host_info, props, self.current_connection is None)

Expand Down
12 changes: 11 additions & 1 deletion aws_wrapper/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ HostMonitoringPlugin.ActivatedMonitoring=[HostMonitoringPlugin] Executing method
HostMonitoringPlugin.ClusterEndpointHostInfo=[HostMonitoringPlugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
HostMonitoringPlugin.MonitoringDeactivated=[HostMonitoringPlugin] Monitoring deactivated for method '{}'.
HostMonitoringPlugin.NullConnection=[HostMonitoringPlugin] Attempted to execute method '{}' but the current connection is None.
HostMonitoringPlugin.NullHostInfo=[HostMonitoringPlugin] Attempted to execute method '{}' but the current host info is None.
HostMonitoringPlugin.NullHostInfo=[HostMonitoringPlugin] Could not find HostInfo to monitor for the current connection.
HostMonitoringPlugin.NullHostInfoForMethod=[HostMonitoringPlugin] Attempted to execute method '{}' but the current host info is None.
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.
HostMonitoringPlugin.ErrorIdentifyingConnection=[HostMonitoringPlugin] An error occurred while identifying the connection database instance: '{}'.
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
Expand All @@ -57,6 +58,11 @@ HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.

IamPlugin.IsNullOrEmpty=[IamPlugin] Property "{}" is null or empty.

Monitor.NullContext=[Monitor] Parameter 'context' should not evaluate to None.
Monitor.NullDialect=[Monitor] The host monitoring plugin tried to check the connection status but was unable to identify the target driver dialect.
Monitor.OpeningMonitorConnection=[Monitor] Opening a monitoring connection to '{}'.
Monitor.OpenedMonitorConnection=[Monitor] Opened a monitoring connection to '{}'.

MonitorContext.ExceptionAbortingConnection=[MonitorContext] An exception occurred while attempting to abort the monitored connection: '{}'.
MonitorContext.HostAvailable=[MonitorContext] Host '{}' is *available*.
MonitorContext.HostUnavailable=[MonitorContext] Host '{}' is *unavailable*.
Expand All @@ -66,6 +72,10 @@ MonitorService.EmptyAliasSet=[MonitorService] Empty alias set passed for '{}'. T
MonitorService.ErrorPopulatingAliases=[MonitorService] An error occurred while populating aliases: '{}'.
MonitorService.NullDialect=[MonitorService] The host monitoring plugin tried to monitor the connection but was unable to identify the target driver dialect.

MonitoringThreadContainer.EmptyNodeKeys=[MonitorThreadContainer] The provided host_aliases set was empty.
MonitoringThreadContainer.ErrorGettingMonitor=[MonitorThreadContainer] Unable to find or create monitor for host with aliases '{}'.
MonitoringThreadContainer.NullMonitorReturnedFromSupplier=[MonitorThreadContainer] The monitor supplier passed into get_or_create_monitor returned None.

Plugin.UnsupportedMethod=[Plugin] '{}' is not supported by this plugin.

PluginManager.InvalidPlugin=[PluginManager] Invalid plugin requested: '{}'.
Expand Down
51 changes: 51 additions & 0 deletions aws_wrapper/utils/atomic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Lock


class AtomicInt:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does there already exist a library that can do this for us?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked around, found this. Looks like you can use pypy but it looks like pypy is a different implementation of python instead of just a library. Not sure if we want to switch to get their implementation. Maybe we could create another task to investigate if it is viable or if there is a different library out there

def __init__(self, initial_value: int = 0):
self._value = initial_value
self._lock: Lock = Lock()

def get(self):
with self._lock:
return self._value

def set(self, value: int):
with self._lock:
self._value = value

def get_and_increment(self):
with self._lock:
value = self._value
self._value += 1
return value

def increment_and_get(self):
with self._lock:
self._value += 1
return self._value

def get_and_decrement(self):
with self._lock:
value = self._value
self._value -= 1
return value

def decrement_and_get(self):
with self._lock:
self._value -= 1
return self._value
83 changes: 83 additions & 0 deletions aws_wrapper/utils/concurrent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Lock
from typing import Callable, Generic, List, Optional, TypeVar

K = TypeVar('K')
V = TypeVar('V')


class ConcurrentDict(Generic[K, V]):
def __init__(self):
self._dict = dict()
self._lock = Lock()

def __len__(self):
return len(self._dict)

def get(self, key: K, default_value: Optional[V] = None) -> Optional[V]:
return self._dict.get(key, default_value)

def clear(self):
self._dict.clear()

def compute_if_present(self, key: K, remapping_func: Callable) -> Optional[V]:
with self._lock:
existing_value = self._dict.get(key)
if existing_value is None:
return None
new_value = remapping_func(key, existing_value)
if new_value is not None:
self._dict[key] = new_value
return new_value
else:
self._dict.pop(key, None)
return None

def compute_if_absent(self, key: K, mapping_func: Callable) -> Optional[V]:
with self._lock:
value = self._dict.get(key)
if value is None:
new_value = mapping_func(key)
if new_value is not None:
self._dict[key] = new_value
return new_value
return value

def put_if_absent(self, key: K, new_value: V) -> V:
with self._lock:
existing_value = self._dict.get(key)
if existing_value is None:
self._dict[key] = new_value
return new_value
return existing_value

def remove_if(self, predicate: Callable) -> bool:
with self._lock:
original_len = len(self._dict)
self._dict = {key: value for key, value in self._dict.items() if not predicate(key, value)}
return len(self._dict) < original_len

def remove_matching_values(self, removal_values: List[V]) -> bool:
with self._lock:
original_len = len(self._dict)
self._dict = {key: value for key, value in self._dict.items() if value not in removal_values}
return len(self._dict) < original_len

def apply_if(self, predicate: Callable, apply: Callable):
with self._lock:
for key, value in self._dict.items():
if predicate(key, value):
apply(key, value)
2 changes: 1 addition & 1 deletion aws_wrapper/utils/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


class Properties(Dict[str, str]):
...
pass


class WrapperProperty:
Expand Down
28 changes: 28 additions & 0 deletions aws_wrapper/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from queue import Empty, Queue
from typing import Optional


Expand All @@ -35,3 +36,30 @@ def log_topology(hosts: list, message_prefix: Optional[str] = None):
msg = "\n\t".join(["<null>" if not host else str(host) for host in hosts])
prefix = "" if not message_prefix else message_prefix + " "
return prefix + f"Topology: {{\n\t{msg}}}"


class SubscribedMethodUtils:
# TODO: check for missing network methods
NETWORK_BOUND_METHODS = {
"Connection.commit",
"Connection.rollback",
"Cursor.callproc",
"Cursor.execute",
"Cursor.executemany"
}


class QueueUtils:
@staticmethod
def get(q: Queue):
try:
return q.get_nowait()
except Empty:
return None

@staticmethod
def clear(q: Queue):
with q.mutex:
q.queue.clear()
q.all_tasks_done.notify_all()
q.unfinished_tasks = 0
114 changes: 114 additions & 0 deletions unit_testing/test_atomic_int.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor

from aws_wrapper.utils.atomic import AtomicInt


def test_set_and_get():
n = AtomicInt(1)
assert 1 == n.get()
n.set(3)
assert 3 == n.get()


def test_get_and_increment():
n = AtomicInt()
assert 0 == n.get_and_increment()
assert 1 == n.get_and_increment()
n.set(5)
assert 5 == n.get_and_increment()


def test_get_and_increment__multithreaded():
n = AtomicInt()
num_threads = 50

def get_and_increment_thread(atomic_num: AtomicInt):
atomic_num.get_and_increment()

with ThreadPoolExecutor(num_threads) as executor:
for _ in range(num_threads):
executor.submit(get_and_increment_thread, n)

assert num_threads == n.get()


def test_increment_and_get():
n = AtomicInt()
assert 1 == n.increment_and_get()
assert 2 == n.increment_and_get()
assert 2 == n.get()
n.set(5)
assert 6 == n.increment_and_get()


def test_increment_and_get__multithreaded():
n = AtomicInt()
num_threads = 50

def increment_and_get_thread(atomic_num: AtomicInt):
atomic_num.increment_and_get()

with ThreadPoolExecutor(num_threads) as executor:
for _ in range(num_threads):
executor.submit(increment_and_get_thread, n)

assert num_threads == n.get()


def test_get_and_decrement():
n = AtomicInt()
assert 0 == n.get_and_decrement()
assert -1 == n.get_and_decrement()
n.set(5)
assert 5 == n.get_and_decrement()


def test_get_and_decrement__multithreaded():
num_threads = 50
n = AtomicInt(num_threads)

def get_and_decrement_thread(atomic_num: AtomicInt):
atomic_num.get_and_decrement()

with ThreadPoolExecutor(num_threads) as executor:
for _ in range(num_threads):
executor.submit(get_and_decrement_thread, n)

assert 0 == n.get()


def test_decrement_and_get():
n = AtomicInt()
assert -1 == n.decrement_and_get()
assert -2 == n.decrement_and_get()
assert -2 == n.get()
n.set(5)
assert 4 == n.decrement_and_get()


def test_decrement_and_get__multithreaded():
num_threads = 50
n = AtomicInt(num_threads)

def decrement_and_get_thread(atomic_num: AtomicInt):
atomic_num.decrement_and_get()

with ThreadPoolExecutor(num_threads) as executor:
for _ in range(num_threads):
executor.submit(decrement_and_get_thread, n)

assert 0 == n.get()
Loading