diff --git a/README.md b/README.md index fe95bca1..f753956b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ ecosystem into InfluxDB using Grafana for a frontend Requirements: * [Python 3.6.7+](https://www.python.org/downloads/release/python-367/) * [Python3-pip](https://pip.pypa.io/en/stable/installing/) -* [InfluxDB 1.8.x](https://www.influxdata.com/) +* [InfluxDB 1.8.x or 2.x](https://www.influxdata.com/) * [Grafana](https://grafana.com/)
@@ -50,7 +50,7 @@ Please read [Asking for Support](https://wiki.cajun.pro/books/varken/chapter/ask ### InfluxDB [InfluxDB Installation Documentation](https://wiki.cajun.pro/books/varken/page/influxdb-d1f) -Note: Only v1.8.x is currently supported. +Note: Only v1.8.x or v2.x are supported. Influxdb is required but not packaged as part of Varken. Varken will create its database on its own. If you choose to give varken user permissions that diff --git a/data/varken.example.ini b/data/varken.example.ini index b32eab6d..a63285c3 100644 --- a/data/varken.example.ini +++ b/data/varken.example.ini @@ -16,6 +16,7 @@ ssl = false verify_ssl = false username = root password = root +org = - [tautulli-1] url = tautulli.domain.tld:8181 diff --git a/requirements.txt b/requirements.txt index 523e4279..4477bd88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ requests==2.25.1 geoip2==2.9.0 influxdb==5.2.0 +influxdb-client==1.30.0 schedule==0.6.0 distro==1.4.0 urllib3==1.26.5 diff --git a/varken/dbmanager.py b/varken/dbmanager.py index c832fdf3..8250ce3c 100644 --- a/varken/dbmanager.py +++ b/varken/dbmanager.py @@ -1,45 +1,91 @@ +import re from sys import exit from logging import getLogger -from influxdb import InfluxDBClient -from requests.exceptions import ConnectionError -from influxdb.exceptions import InfluxDBServerError +from influxdb_client import InfluxDBClient, BucketRetentionRules +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.exceptions import InfluxDBError +from urllib3.exceptions import NewConnectionError class DBManager(object): def __init__(self, server): self.server = server self.logger = getLogger() + self.bucket = "varken" + if self.server.url == "influxdb.domain.tld": self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") exit() - self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, - password=self.server.password, ssl=self.server.ssl, database='varken', - verify_ssl=self.server.verify_ssl) + + url = self.server.url + if 'http' not in url: + scheme = 'http' + if self.server.ssl: + scheme = 'https' + url = "{}://{}:{}".format(scheme, self.server.url, self.server.port) + token = f'{self.server.username}:{self.server.password}' + + self.influx = InfluxDBClient(url=url, token=token, + verify_ssl=self.server.verify_ssl, org=self.server.org) + try: - version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] + version = self.influx.version() self.logger.info('Influxdb version: %s', version) - except ConnectionError: - self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname") + match = re.match(r'v?(\d+)\.', version) + if match: + self.version = int(match[1]) + self.logger.info("Using InfluxDB API v%s", self.version) + else: + self.logger.critical("Unknown influxdb version") + exit(1) + except NewConnectionError: + self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid") exit(1) - databases = [db['name'] for db in self.influx.get_list_database()] + if self.version >= 2: + # If we pass username/password to a v1 server, it breaks :( + self.influx = InfluxDBClient(url=url, username=self.server.username, + password=self.server.password, + verify_ssl=self.server.verify_ssl, org=self.server.org) + self.create_v2_bucket() + else: + self.create_v1_database() - if 'varken' not in databases: + def create_v2_bucket(self): + if not self.influx.buckets_api().find_bucket_by_name(self.bucket): + self.logger.info("Creating varken bucket") + + retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30, + shard_group_duration_seconds=60 * 60) + self.influx.buckets_api().create_bucket(bucket_name=self.bucket, + retention_rules=retention) + + def create_v1_database(self): + from influxdb import InfluxDBClient + client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, + password=self.server.password, ssl=self.server.ssl, database=self.bucket, + verify_ssl=self.server.verify_ssl) + databases = [db['name'] for db in client.get_list_database()] + + if self.bucket not in databases: self.logger.info("Creating varken database") - self.influx.create_database('varken') + client.create_database(self.bucket) retention_policies = [policy['name'] for policy in - self.influx.get_list_retention_policies(database='varken')] + client.get_list_retention_policies(database=self.bucket)] if 'varken 30d-1h' not in retention_policies: self.logger.info("Creating varken retention policy (30d-1h)") - self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', - database='varken', default=True, shard_duration='1h') + client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', + database=self.bucket, default=True, shard_duration='1h') + + self.bucket = f'{self.bucket}/varken 30d-1h' def write_points(self, data): d = data self.logger.debug('Writing Data to InfluxDB %s', d) + write_api = self.influx.write_api(write_options=SYNCHRONOUS) try: - self.influx.write_points(d) - except (InfluxDBServerError, ConnectionError) as e: + write_api.write(bucket=self.bucket, record=data) + except (InfluxDBError, NewConnectionError) as e: self.logger.error('Error writing data to influxdb. Dropping this set of data. ' 'Check your database! Error: %s', e) diff --git a/varken/iniparser.py b/varken/iniparser.py index bcb3b37d..3eca865f 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -154,13 +154,15 @@ def parse_opts(self, read_file=False): username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) + + org = env.get('VRKN_INFLUXDB_ORG', self.config.get('influxdb', 'org')) except NoOptionError as e: self.logger.error('Missing key in %s. Error: %s', "influxdb", e) self.rectify_ini() return self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl, org=org) # Check for all enabled services for service in self.services: diff --git a/varken/structures.py b/varken/structures.py index e3ee0940..2ec78ff9 100644 --- a/varken/structures.py +++ b/varken/structures.py @@ -18,6 +18,7 @@ class InfluxServer(NamedTuple): url: str = 'localhost' username: str = 'root' verify_ssl: bool = False + org: str = '-' class SonarrServer(NamedTuple): diff --git a/varken/tautulli.py b/varken/tautulli.py index 746685fd..371ece45 100644 --- a/varken/tautulli.py +++ b/varken/tautulli.py @@ -2,7 +2,7 @@ from requests import Session, Request from geoip2.errors import AddressNotFoundError from datetime import datetime, timezone, date, timedelta -from influxdb.exceptions import InfluxDBClientError +from influxdb_client.client.exceptions import InfluxDBError from varken.structures import TautulliStream from varken.helpers import hashit, connection_handler, itemgetter_with_default @@ -363,7 +363,7 @@ def get_historical(self, days=30): ) try: self.dbmanager.write_points(influx_payload) - except InfluxDBClientError as e: + except InfluxDBError as e: if "beyond retention policy" in str(e): self.logger.debug('Only imported 30 days of data per retention policy') else: