Skip to content

Commit

Permalink
Improve Etcd connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Sep 4, 2017
1 parent 980b0de commit 091a5b8
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 100 deletions.
16 changes: 11 additions & 5 deletions tests/intents_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

from tuxeatpi_common.error import TuxEatPiError
from tuxeatpi_common.intents import IntentsHandler
from tuxeatpi_common.etcd_client import EtcdWrapper


class TestIntents(object):

@classmethod
def setup_class(self):
self.intents_test = IntentsHandler("tests/intents", "test_intents")
etcd_host = None
etcd_port = None
self.etcd_wrapper = EtcdWrapper(etcd_host, etcd_port)
self.intents_test = IntentsHandler("tests/intents", "test_intents", self.etcd_wrapper)
self.test = "NOK"
self.thread = threading.Thread(target=self.watch_etcd, args=(self,))

Expand All @@ -35,7 +39,7 @@ def test_intents(self):
assert len(intents) == 1
assert intents[0].value == "NLU test file\n"

resp = self.intents_test.read("bad_key", wait=True, timeout=1)
resp = self.intents_test.read("bad_key", wait=False, timeout=5)
assert resp is None

self.thread = self.thread.start()
Expand All @@ -46,11 +50,13 @@ def test_intents(self):
class TestBadIntents(object):

def test_dialog(self):

intents_test = IntentsHandler("tests", "test_intents")
etcd_host = None
etcd_port = None
etcd_wrapper = EtcdWrapper(etcd_host, etcd_port)
intents_test = IntentsHandler("tests", "test_intents", etcd_wrapper)
with pytest.raises(TuxEatPiError) as exp:
intents_test.save("intents_test.py")

intents_test = IntentsHandler("tests/badintents", "test_intents")
intents_test = IntentsHandler("tests/badintents", "test_intents", etcd_wrapper)
ret = intents_test.save("nlu_test")
assert ret is None
9 changes: 6 additions & 3 deletions tests/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

import pytest

from tuxeatpi_common.error import TuxEatPiError
from tuxeatpi_common.memory import MemoryHandler
from tuxeatpi_common.etcd_client import EtcdWrapper


class TestMemory(object):

def test_memory(self):
# Create bad message
memory_test = MemoryHandler("test_memory")
etcd_host = "127.0.0.1"
etcd_port = 2379
self.etcd_wrapper = EtcdWrapper(etcd_host, etcd_port)
memory_test = MemoryHandler("test_memory", self.etcd_wrapper)
key = "mykey"
value = "myvalue"
memory_test.save(key, value)
Expand All @@ -25,4 +28,4 @@ def test_memory(self):
memory_test.delete(key)

resp = memory_test.read(key)
assert resp is None
assert resp == {}
6 changes: 5 additions & 1 deletion tests/registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
import pytest

from tuxeatpi_common.registry import RegistryHandler
from tuxeatpi_common.etcd_client import EtcdWrapper


class TestRegistry(object):

def test_registry(self):
# Create bad message
registry_test = RegistryHandler("test_registry", "0.1")
etcd_host = None
etcd_port = None
etcd_wrapper = EtcdWrapper(etcd_host, etcd_port)
registry_test = RegistryHandler("test_registry", "0.1", etcd_wrapper)
registry_test.ping()
states = registry_test.read()
assert 'test_registry' in states
Expand Down
11 changes: 8 additions & 3 deletions tuxeatpi_common/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from tuxeatpi_common.settings import SettingsHandler
from tuxeatpi_common.intents import IntentsHandler
from tuxeatpi_common.registry import RegistryHandler
from tuxeatpi_common.etcd_client import EtcdWrapper


class TepBaseDaemon(object):
Expand All @@ -36,20 +37,24 @@ def __init__(self, name, workdir, intents_folder, dialog_folder, logging_level=l
self._mqtt_sender = MqttSender(self)
# Set the main loop to ON
self._run_main_loop = True
# Etcd
etcd_host = None
etcd_port = None
self.etcd_wrapper = EtcdWrapper(etcd_host, etcd_port)
# Initializer
self._initializer = Initializer(self)
# SubTasker
self._tasks_thread = SubTasker(self)
# Dialogs
self.dialogs = DialogsHandler(dialog_folder, self.name)
# Memory
self.memory = MemoryHandler(self.name)
self.memory = MemoryHandler(self.name, self.etcd_wrapper)
# Intents
self.intents = IntentsHandler(intents_folder, self.name)
self.intents = IntentsHandler(intents_folder, self.name, self.etcd_wrapper)
# Settings
self.settings = SettingsHandler(self)
# Registry
self.registry = RegistryHandler(self.name, self.version)
self.registry = RegistryHandler(self.name, self.version, self.etcd_wrapper)
# Add signal handler
signal.signal(signal.SIGINT, self.signal_handler)

Expand Down
126 changes: 108 additions & 18 deletions tuxeatpi_common/etcd_client.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,122 @@
"""Module defintion function to get etcd client"""
import logging
import os
import time
import json
import urllib3

import aiohttp
import aio_etcd
import etcd


def get_etcd_client(host, port):
"""Return an etcd client"""
logger = logging.getLogger(name="tep").getChild('etcd_client')
while True:
class EtcdWrapper(object):
"""Etcd Wrapper class to handle sync and async requests"""

def __init__(self, host=None, port=None):
# Get logger
self.logger = logging.getLogger(name="tep").getChild('etcd_client')
# Set host
if host is None:
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
else:
self.host = host
# Set port
if port is None:
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
else:
self.port = port
# Get clients
self.sync_client = None
self.sync_sender = None
self.async_client = None
self._connect()

def _connect(self):
"""Return an etcd client"""
# Get sync client receiver
while True:
try:
etcd_client = etcd.Client(self.host, self.port)
# Test connection
etcd_client.cluster_version # pylint: disable=W0104
# Save client
self.sync_client = etcd_client
break
except (etcd.EtcdConnectionFailed, urllib3.exceptions.MaxRetryError):
self.logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
# Get sync client sender
while True:
try:
etcd_sender = etcd.Client(self.host, self.port)
# Test connection
etcd_sender.cluster_version # pylint: disable=W0104
# Save client
self.sync_sender = etcd_sender
break
except (etcd.EtcdConnectionFailed, urllib3.exceptions.MaxRetryError):
self.logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
# Get async etcd client
self.async_client = aio_etcd.Client(self.host, self.port)

def read(self, key, recursive=False, wait=False, timeout=60):
"""Sync Etcd read operation"""
try:
etcd_client = etcd.Client(host=host, port=port)
break
return self.sync_client.read(key, recursive=recursive, wait=wait, timeout=timeout)
except etcd.EtcdKeyNotFound:
self.logger.warning("key %s not found in Etcd", key)
return None
except etcd.EtcdConnectionFailed:
logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
return etcd_client
# TODO Retry ? or just pass ?
self.logger.error("Can not read to etcd %s:%s with key %s. Connection lost ?",
self.host, self.port, key)
return None

def eternal_watch(self, key, recursive=False):
"""Sync Etcd watch operation"""
try:
return self.sync_client.eternal_watch(key, recursive=recursive)
except etcd.EtcdKeyNotFound:
self.logger.warning("key %s not found in Etcd", key)
return None
except etcd.EtcdConnectionFailed:
# TODO Retry ? or just pass ?
self.logger.error("Can not read to etcd %s:%s with key %s. Connection lost ?",
self.host, self.port, key)
return None

def get_aioetcd_client(host, port):
"""Return an aioetcd client"""
logger = logging.getLogger(name="tep").getChild('etcd_client')
while True:
def write(self, key, value, serialize=True):
"""Sync Etcd write operation"""
if serialize:
data = json.dumps(value)
else:
data = value
try:
etcd_client = aio_etcd.Client(host=host, port=port)
break
return self.sync_sender.write(key, data)
except etcd.EtcdConnectionFailed:
logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
return etcd_client
# TODO Retry ? or just pass ?
self.logger.error("Can not write to etcd %s:%s with key %s. Connection lost ?",
self.host, self.port, key)

def delete(self, key, recursive=False):
"""Sync Etcd delete operation"""
try:
self.sync_client.delete(key, recursive=recursive)
except etcd.EtcdKeyNotFound:
# TODO log
pass

async def async_read(self, key, wait=False):
"""Async Etcd read operation"""
try:
return await self.async_client.read(key, wait=wait)
except aio_etcd.EtcdKeyNotFound:
self.logger.warning("key %s not found in Etcd", key)
return None
except aiohttp.client_exceptions.ClientPayloadError:
# TODO Retry ? or just pass ?
self.logger.error("Can not read to etcd %s:%s with key %s. Connection lost ?",
self.host, self.port, key)
return None
25 changes: 9 additions & 16 deletions tuxeatpi_common/intents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@
import logging
import os

import etcd

from tuxeatpi_common.etcd_client import get_etcd_client
from tuxeatpi_common.error import TuxEatPiError


class IntentsHandler(object):
"""Intents handler class"""

def __init__(self, intent_folder, component_name):
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
self.etcd_client = get_etcd_client(host=self.host, port=self.port)
self.root_key = "/intents"
def __init__(self, intent_folder, component_name, etcd_wrapper):
self.logger = logging.getLogger(name="tep").getChild(component_name).getChild('intents')
self.etcd_wrapper = etcd_wrapper
self.root_key = "/intents"
self.folder = intent_folder
self.name = component_name

Expand Down Expand Up @@ -46,23 +41,21 @@ def save(self, nlu_engine):
intent_name,
self.name,
intent_file.name)
self.etcd_client.write(key, intent_data)
self.logger.info("Intent %s saved", intent_id)
if intent_data:
self.etcd_wrapper.write(key, intent_data, serialize=False)
self.logger.info("Intent %s saved", intent_id)

def read(self, nlu_engine, recursive=True, wait=True, timeout=30):
"""Read intent in etcd"""
key = os.path.join(self.root_key,
nlu_engine,
)
try:
return self.etcd_client.read(key, recursive=recursive,
wait=wait, timeout=timeout)
except etcd.EtcdWatchTimedOut:
return
return self.etcd_wrapper.read(key, recursive=recursive,
wait=wait, timeout=timeout)

def eternal_watch(self, nlu_engine, recursive=True):
"""Watch for changes in etcd"""
key = os.path.join(self.root_key,
nlu_engine,
)
return self.etcd_client.eternal_watch(key, recursive=recursive)
return self.etcd_wrapper.eternal_watch(key, recursive=recursive)
22 changes: 8 additions & 14 deletions tuxeatpi_common/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,30 @@
import json
import os

import etcd

from tuxeatpi_common.etcd_client import get_etcd_client


class MemoryHandler(object):
"""Memory handler class"""

def __init__(self, component_name):
def __init__(self, component_name, etcd_wrapper):
self.root_key = os.path.join("/memory", component_name)
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
self.etcd_client = get_etcd_client(host=self.host, port=self.port)
self.etcd_wrapper = etcd_wrapper

def save(self, key, value):
"""Save something in memory"""
key = os.path.join(self.root_key, key)
self.etcd_client.write(key, json.dumps(value))
self.etcd_wrapper.write(key, value)

def read(self, key):
"""Read something in memory"""
key = os.path.join(self.root_key, key)
try:
data = self.etcd_wrapper.read(key)
if data is not None:
# TODO return only the value
# FIXME: [E1101(no-member), ...] Instance of 'EtcdResult' has no 'value' member
return json.loads(self.etcd_client.read(key).value) # pylint: disable=E1101
except etcd.EtcdKeyNotFound:
return
return json.loads(data.value) # pylint: disable=E1101
return {}

def delete(self, key):
"""Delete something in memory"""
key = os.path.join(self.root_key, key)
self.etcd_client.delete(key, recursive=True)
self.etcd_wrapper.delete(key, recursive=True)

0 comments on commit 091a5b8

Please sign in to comment.