/
utils.py
66 lines (51 loc) · 1.82 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Utilities for working with influxdb."""
import logging
from threading import Thread
from django.conf import settings
from influxdb import InfluxDBClient
logger = logging.getLogger(__name__)
def get_client():
"""Returns an ``InfluxDBClient`` instance."""
return InfluxDBClient(
settings.INFLUXDB_HOST,
settings.INFLUXDB_PORT,
settings.INFLUXDB_USER,
settings.INFLUXDB_PASSWORD,
settings.INFLUXDB_DATABASE,
timeout=settings.INFLUXDB_TIMEOUT,
ssl=getattr(settings, 'INFLUXDB_SSL', False),
verify_ssl=getattr(settings, 'INFLUXDB_VERIFY_SSL', False),
)
def query(query):
"""Wrapper around ``InfluxDBClient.query()``."""
client = get_client()
return client.query(query)
def write_points(data, force_disable_threading=False):
"""
Writes a series to influxdb.
:param data: Array of dicts, as required by
https://github.com/influxdb/influxdb-python
:param force_disable_threading: When being called from the Celery task, we
set this to `True` so that the user doesn't accidentally use Celery and
threading at the same time.
"""
if getattr(settings, 'INFLUXDB_DISABLED', False):
return
client = get_client()
use_threading = getattr(settings, 'INFLUXDB_USE_THREADING', False)
if force_disable_threading:
use_threading = False
if use_threading is True:
thread = Thread(target=process_points, args=(client, data, ))
thread.start()
else:
process_points(client, data)
def process_points(client, data): # pragma: no cover
"""Method to be called via threading module."""
try:
client.write_points(data)
except Exception as err:
if getattr(settings, 'INFLUXDB_FAIL_SILENTLY', True):
logger.error(err)
else:
raise err