Skip to content

Commit

Permalink
Try to reconnect etcd/mqtt on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Aug 27, 2017
1 parent 412a528 commit 0a15152
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tests/registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ def test_registry(self):

registry_test.clear()
states = registry_test.read()
assert states is None
assert states == {}
32 changes: 32 additions & 0 deletions tuxeatpi_common/etcd_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Module defintion function to get etcd client"""
import logging
import time

import aio_etcd
import etcd


def get_etcd_client(host, port):
"""Return an etcd client"""
logger = logging.getLogger(name="tep").getChild('etcd_client')
while True:
try:
etcd_client = etcd.Client(host=host, port=port)
break
except etcd.EtcdConnectionFailed:
logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
return etcd_client


def get_aioetcd_client(host, port):
"""Return an aioetcd client"""
logger = logging.getLogger(name="tep").getChild('etcd_client')
while True:
try:
etcd_client = aio_etcd.Client(host=host, port=port)
break
except etcd.EtcdConnectionFailed:
logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)
return etcd_client
3 changes: 2 additions & 1 deletion tuxeatpi_common/intents.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import etcd

from tuxeatpi_common.etcd_client import get_etcd_client
from tuxeatpi_common.error import TuxEatPiError


Expand All @@ -13,7 +14,7 @@ class IntentsHandler(object):
def __init__(self, intent_folder, component_name):
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
self.etcd_client = etcd.Client(host=self.host, port=self.port)
self.etcd_client = get_etcd_client(host=self.host, port=self.port)
self.root_key = "/intents"
self.logger = logging.getLogger(name="tep").getChild(component_name).getChild('intents')
self.folder = intent_folder
Expand Down
4 changes: 3 additions & 1 deletion tuxeatpi_common/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import etcd

from tuxeatpi_common.etcd_client import get_etcd_client


class MemoryHandler(object):
"""Memory handler class"""
Expand All @@ -12,7 +14,7 @@ def __init__(self, component_name):
self.root_key = os.path.join("/memory", component_name)
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
self.etcd_client = etcd.Client(host=self.host, port=self.port)
self.etcd_client = get_etcd_client(host=self.host, port=self.port)

def save(self, key, value):
"""Save something in memory"""
Expand Down
10 changes: 9 additions & 1 deletion tuxeatpi_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import time

import paho.mqtt.client as paho

Expand Down Expand Up @@ -87,7 +88,14 @@ def on_publish(self, client, userdata, mid): # pylint: disable=W0221,W0613
def run(self):
"""Run MQTT client"""
# TODO handle reconnect
self.connect(self.host, self.port, 60)
while True:
try:
self.connect(self.host, self.port, 60)
break
except ConnectionRefusedError:
self.logger.warning("Can not connect to etcd server, retrying in 5 seconds")
time.sleep(5)

for topic_name in self.topics:
self.subscribe(topic_name, 0)
self.logger.info("Subcribe to topic %s", topic_name)
Expand Down
7 changes: 5 additions & 2 deletions tuxeatpi_common/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import etcd

from tuxeatpi_common.etcd_client import get_etcd_client


class RegistryHandler(object):
"""Registry handler class"""
Expand All @@ -17,7 +19,7 @@ def __init__(self, component_name, component_version):
self.key = os.path.join(self.root_key, component_name)
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
self.etcd_client = etcd.Client(host=self.host, port=self.port)
self.etcd_client = get_etcd_client(host=self.host, port=self.port)
self.logger = logging.getLogger(name="tep").getChild(component_name).getChild('register')

def ping(self, state="ALIVE"):
Expand All @@ -37,7 +39,8 @@ def read(self):
data = json.loads(raw_data.value)
states[data['name']] = data
except etcd.EtcdKeyNotFound:
return
self.logger.warning("Registry folder not found in Etcd")
return {}
return states

def set_notalive(self, data):
Expand Down
8 changes: 5 additions & 3 deletions tuxeatpi_common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import os
import json

import aio_etcd
import etcd

from tuxeatpi_common.etcd_client import get_etcd_client
from tuxeatpi_common.etcd_client import get_aioetcd_client


class SettingsHandler(object):
"""Settings handler class"""
Expand All @@ -18,8 +20,8 @@ def __init__(self, component):
self.host = os.environ.get("TEP_ETCD_HOST", "127.0.0.1")
self.port = int(os.environ.get("TEP_ETCD_PORT", 2379))
# TODO use only one client !!
self.etcd_sender = etcd.Client(host=self.host, port=self.port)
self.etcd_client = aio_etcd.Client(host=self.host, port=self.port)
self.etcd_sender = get_etcd_client(host=self.host, port=self.port)
self.etcd_client = get_aioetcd_client(host=self.host, port=self.port)
self.logger = logging.getLogger(name="tep").getChild(component.name).getChild('settings')
self.language = None
self.nlu_engine = None
Expand Down

0 comments on commit 0a15152

Please sign in to comment.