Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support InfluxDB v2 #242

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ecosystem into InfluxDB using Grafana for a frontend
Requirements:
* [Python 3.6.7+](https://www.python.org/downloads/release/python-367/)
* [Python3-pip](https://pip.pypa.io/en/stable/installing/)
* [InfluxDB 1.8.x](https://www.influxdata.com/)
* [InfluxDB 1.8.x or 2.x](https://www.influxdata.com/)
* [Grafana](https://grafana.com/)

<p align="center">
Expand Down Expand Up @@ -50,7 +50,7 @@ Please read [Asking for Support](https://wiki.cajun.pro/books/varken/chapter/ask

### InfluxDB
[InfluxDB Installation Documentation](https://wiki.cajun.pro/books/varken/page/influxdb-d1f)
Note: Only v1.8.x is currently supported.
Note: Only v1.8.x or v2.x are supported.

Influxdb is required but not packaged as part of Varken. Varken will create
its database on its own. If you choose to give varken user permissions that
Expand Down
1 change: 1 addition & 0 deletions data/varken.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ssl = false
verify_ssl = false
username = root
password = root
org = -

[tautulli-1]
url = tautulli.domain.tld:8181
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
requests==2.25.1
geoip2==2.9.0
influxdb==5.2.0
influxdb-client==1.30.0
schedule==0.6.0
distro==1.4.0
urllib3==1.26.5
80 changes: 63 additions & 17 deletions varken/dbmanager.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,91 @@
import re
from sys import exit
from logging import getLogger
from influxdb import InfluxDBClient
from requests.exceptions import ConnectionError
from influxdb.exceptions import InfluxDBServerError
from influxdb_client import InfluxDBClient, BucketRetentionRules
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.exceptions import InfluxDBError
from urllib3.exceptions import NewConnectionError


class DBManager(object):
def __init__(self, server):
self.server = server
self.logger = getLogger()
self.bucket = "varken"

if self.server.url == "influxdb.domain.tld":
self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration")
exit()
self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username,
password=self.server.password, ssl=self.server.ssl, database='varken',
verify_ssl=self.server.verify_ssl)

url = self.server.url
if 'http' not in url:
scheme = 'http'
if self.server.ssl:
scheme = 'https'
url = "{}://{}:{}".format(scheme, self.server.url, self.server.port)
token = f'{self.server.username}:{self.server.password}'

self.influx = InfluxDBClient(url=url, token=token,
verify_ssl=self.server.verify_ssl, org=self.server.org)

try:
version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
version = self.influx.version()
self.logger.info('Influxdb version: %s', version)
except ConnectionError:
self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname")
match = re.match(r'v?(\d+)\.', version)
if match:
self.version = int(match[1])
self.logger.info("Using InfluxDB API v%s", self.version)
else:
self.logger.critical("Unknown influxdb version")
exit(1)
except NewConnectionError:
self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid")
exit(1)

databases = [db['name'] for db in self.influx.get_list_database()]
if self.version >= 2:
# If we pass username/password to a v1 server, it breaks :(
self.influx = InfluxDBClient(url=url, username=self.server.username,
password=self.server.password,
verify_ssl=self.server.verify_ssl, org=self.server.org)
self.create_v2_bucket()
else:
self.create_v1_database()

if 'varken' not in databases:
def create_v2_bucket(self):
if not self.influx.buckets_api().find_bucket_by_name(self.bucket):
self.logger.info("Creating varken bucket")

retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30,
shard_group_duration_seconds=60 * 60)
self.influx.buckets_api().create_bucket(bucket_name=self.bucket,
retention_rules=retention)

def create_v1_database(self):
from influxdb import InfluxDBClient
client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username,
password=self.server.password, ssl=self.server.ssl, database=self.bucket,
verify_ssl=self.server.verify_ssl)
databases = [db['name'] for db in client.get_list_database()]

if self.bucket not in databases:
self.logger.info("Creating varken database")
self.influx.create_database('varken')
client.create_database(self.bucket)

retention_policies = [policy['name'] for policy in
self.influx.get_list_retention_policies(database='varken')]
client.get_list_retention_policies(database=self.bucket)]
if 'varken 30d-1h' not in retention_policies:
self.logger.info("Creating varken retention policy (30d-1h)")
self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1',
database='varken', default=True, shard_duration='1h')
client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1',
database=self.bucket, default=True, shard_duration='1h')

self.bucket = f'{self.bucket}/varken 30d-1h'

def write_points(self, data):
d = data
self.logger.debug('Writing Data to InfluxDB %s', d)
write_api = self.influx.write_api(write_options=SYNCHRONOUS)
try:
self.influx.write_points(d)
except (InfluxDBServerError, ConnectionError) as e:
write_api.write(bucket=self.bucket, record=data)
except (InfluxDBError, NewConnectionError) as e:
self.logger.error('Error writing data to influxdb. Dropping this set of data. '
'Check your database! Error: %s', e)
4 changes: 3 additions & 1 deletion varken/iniparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,15 @@ def parse_opts(self, read_file=False):

username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username'))
password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password'))

org = env.get('VRKN_INFLUXDB_ORG', self.config.get('influxdb', 'org'))
except NoOptionError as e:
self.logger.error('Missing key in %s. Error: %s', "influxdb", e)
self.rectify_ini()
return

self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl,
verify_ssl=verify_ssl)
verify_ssl=verify_ssl, org=org)

# Check for all enabled services
for service in self.services:
Expand Down
1 change: 1 addition & 0 deletions varken/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class InfluxServer(NamedTuple):
url: str = 'localhost'
username: str = 'root'
verify_ssl: bool = False
org: str = '-'


class SonarrServer(NamedTuple):
Expand Down
4 changes: 2 additions & 2 deletions varken/tautulli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from requests import Session, Request
from geoip2.errors import AddressNotFoundError
from datetime import datetime, timezone, date, timedelta
from influxdb.exceptions import InfluxDBClientError
from influxdb_client.client.exceptions import InfluxDBError

from varken.structures import TautulliStream
from varken.helpers import hashit, connection_handler, itemgetter_with_default
Expand Down Expand Up @@ -363,7 +363,7 @@ def get_historical(self, days=30):
)
try:
self.dbmanager.write_points(influx_payload)
except InfluxDBClientError as e:
except InfluxDBError as e:
if "beyond retention policy" in str(e):
self.logger.debug('Only imported 30 days of data per retention policy')
else:
Expand Down