From c92bb4e4a5684611dc65b53c5791fae4bce5d94b Mon Sep 17 00:00:00 2001 From: aviau Date: Fri, 20 Feb 2015 11:37:12 -0500 Subject: [PATCH] Moving towards v0.9.0: removed deprecated endpoints, fixed write_points, and more... --- influxdb/client.py | 615 ++++------------------------------ tests/influxdb/client_test.py | 403 +++++----------------- 2 files changed, 151 insertions(+), 867 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 6fcae994..4e4ffcbf 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -18,7 +18,7 @@ class InfluxDBClientError(Exception): - "Raised when an error occurs in the request" + """Raised when an error occurs in the request""" def __init__(self, content, code): super(InfluxDBClientError, self).__init__( "{0}: {1}".format(code, content)) @@ -99,7 +99,13 @@ def __init__(self, 'Content-type': 'application/json', 'Accept': 'text/plain'} - # Change member variables + @staticmethod + def _get_values_from_query_response(response): + """Returns a list of values from a query response""" + values = [ + value[0] for value in response['results'][0]['rows'][0]['values'] + ] + return values def switch_database(self, database): """ @@ -112,18 +118,6 @@ def switch_database(self, database): """ self._database = database - def switch_db(self, database): - """ - DEPRECATED. Change client database. - - """ - warnings.warn( - "switch_db is deprecated, and will be removed " - "in future versions. Please use " - "``InfluxDBClient.switch_database(database)`` instead.", - FutureWarning) - return self.switch_database(database) - def switch_user(self, username, password): """ switch_user() @@ -173,173 +167,98 @@ def request(self, url, method='GET', params=None, data=None, else: raise InfluxDBClientError(response.content, response.status_code) - def write(self, data): - """ Provided as convenience for influxdb v0.9.0, this may change. """ + def write(self, data, params=None, expected_response_code=200): + """ Write to influxdb """ self.request( url="write", method='POST', - params=None, + params=params, data=data, - expected_response_code=200 + expected_response_code=expected_response_code ) return True - # Writing Data - # - # Assuming you have a database named foo_production you can write data - # by doing a POST to /db/foo_production/series?u=some_user&p=some_password - # with a JSON body of points. - - def write_points(self, data, time_precision='s', *args, **kwargs): + def query(self, + query, + params={}, + expected_response_code=200, + database=None): """ - Write to multiple time series names. + Query data - :param data: A list of dicts. - :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' - or 'u'. - :param batch_size: [Optional] Value to write the points in batches - instead of all at one time. Useful for when doing data dumps from - one database to another or when doing a massive write operation - :type batch_size: int + :param params: Additional parameters to be passed to requests. + :param database: Database to query, default to None. + :param expected_response_code: Expected response code. Defaults to 200. """ - def list_chunks(l, n): - """ Yield successive n-sized chunks from l. - """ - for i in xrange(0, len(l), n): - yield l[i:i + n] - - batch_size = kwargs.get('batch_size') - if batch_size: - for item in data: - name = item.get('name') - columns = item.get('columns') - point_list = item.get('points') - - for batch in list_chunks(point_list, batch_size): - item = [{ - "points": batch, - "name": name, - "columns": columns - }] - self._write_points( - data=item, - time_precision=time_precision) - - return True - - return self._write_points(data=data, time_precision=time_precision) - - def write_points_with_precision(self, data, time_precision='s'): - """ - DEPRECATED. Write to multiple time series names + params['q'] = query + if database: + params['db'] = database + + response = self.request( + url="query", + method='GET', + params=params, + data=None, + expected_response_code=expected_response_code + ) + return response.json() + def write_points(self, + points, + time_precision=None, + database=None, + retention_policy=None, + *args, + **kwargs): """ - warnings.warn( - "write_points_with_precision is deprecated, and will be removed " - "in future versions. Please use " - "``InfluxDBClient.write_points(time_precision='..')`` instead.", - FutureWarning) - return self._write_points(data=data, time_precision=time_precision) - - def _write_points(self, data, time_precision): - if time_precision not in ['s', 'm', 'ms', 'u']: + Write to multiple time series names. + + :param points: A list of dicts. + :param time_precision: [Optional, default None] Either 's', 'm', 'ms' + or 'u'. + :param database The database to write the points to. Defaults to + the client's current db. + :param retention_policy The retention policy for the points. + """ + #TODO: re-implement chunks. + return self._write_points(points=points, + time_precision=time_precision, + database=database, + retention_policy=retention_policy) + + def _write_points(self, points, time_precision, database, retention_policy): + if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]: raise Exception( - "Invalid time precision is given. (use 's', 'm', 'ms' or 'u')") + "Invalid time precision is given. (use 'n', 'u', 'ms', 's', 'm' or 'h')") - if self.use_udp and time_precision != 's': + if self.use_udp and (time_precision != 's' or None): raise Exception( "InfluxDB only supports seconds precision for udp writes" ) - url = "db/{0}/series".format(self._database) - - params = { - 'time_precision': time_precision + data = { + 'points': points } + if time_precision: + data['precision'] = time_precision + + if retention_policy: + data['retentionPolicy'] = retention_policy + + data['database'] = database or self._database + if self.use_udp: self.send_packet(data) else: - self.request( - url=url, - method='POST', - params=params, + self.write( data=data, expected_response_code=200 ) return True - # One Time Deletes - - def delete_points(self, name): - """ - Delete an entire series - """ - url = "db/{0}/series/{1}".format(self._database, name) - - self.request( - url=url, - method='DELETE', - expected_response_code=204 - ) - - return True - - # Regularly Scheduled Deletes - - def create_scheduled_delete(self, json_body): - """ - TODO: Create scheduled delete - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - # get list of deletes - # curl http://localhost:8086/db/site_dev/scheduled_deletes - # - # remove a regularly scheduled delete - # curl -X DELETE http://localhost:8086/db/site_dev/scheduled_deletes/:id - - def get_list_scheduled_delete(self): - """ - TODO: Get list of scheduled deletes - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def remove_scheduled_delete(self, delete_id): - """ - TODO: Remove scheduled delete - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def query(self, query, time_precision='s', chunked=False): - """ - Quering data - - :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' - or 'u'. - :param chunked: [Optional, default=False] True if the data shall be - retrieved in chunks, False otherwise. - """ - return self._query(query, time_precision=time_precision, - chunked=chunked) - - # Querying Data - # - # GET db/:name/series. It takes five parameters def _query(self, query, time_precision='s', chunked=False): if time_precision not in ['s', 'm', 'ms', 'u']: raise Exception( @@ -371,404 +290,14 @@ def _query(self, query, time_precision='s', chunked=False): else: return response.json() - # Creating and Dropping Databases - # - # ### create a database - # curl -X POST http://localhost:8086/db -d '{"name": "site_development"}' - # - # ### drop a database - # curl -X DELETE http://localhost:8086/db/site_development - - def create_database(self, database): - """ - create_database() - - Create a database on the InfluxDB server. - - :param database: the name of the database to create - :type database: string - :rtype: boolean - """ - url = "db" - - data = {'name': database} - - self.request( - url=url, - method='POST', - data=data, - expected_response_code=201 - ) - - return True - - def delete_database(self, database): - """ - delete_database() - - Drop a database on the InfluxDB server. - - :param database: the name of the database to delete - :type database: string - :rtype: boolean - """ - url = "db/{0}".format(database) - - self.request( - url=url, - method='DELETE', - expected_response_code=204 - ) - - return True - - # ### get list of databases - # curl -X GET http://localhost:8086/db - def get_list_database(self): """ Get the list of databases """ - url = "db" - - response = self.request( - url=url, - method='GET', - expected_response_code=200 - ) - - return response.json() - - def get_database_list(self): - """ - DEPRECATED. Get the list of databases - - """ - warnings.warn( - "get_database_list is deprecated, and will be removed " - "in future versions. Please use " - "``InfluxDBClient.get_list_database`` instead.", - FutureWarning) - return self.get_list_database() - - def delete_series(self, series): - """ - delete_series() - - Drop a series on the InfluxDB server. - - :param series: the name of the series to delete - :type series: string - :rtype: boolean - """ - url = "db/{0}/series/{1}".format( - self._database, - series + return self._get_values_from_query_response( + self.query("SHOW DATABASES") ) - self.request( - url=url, - method='DELETE', - expected_response_code=204 - ) - - return True - - def get_list_series(self): - """ - Get a list of all time series in a database - """ - - response = self._query('list series') - - series_list = [] - for series in response[0]['points']: - series_list.append(series[1]) - - return series_list - - def get_list_continuous_queries(self): - """ - Get a list of continuous queries - """ - - response = self._query('list continuous queries') - queries_list = [] - for query in response[0]['points']: - queries_list.append(query[2]) - - return queries_list - - # Security - # get list of cluster admins - # curl http://localhost:8086/cluster_admins?u=root&p=root - - # add cluster admin - # curl -X POST http://localhost:8086/cluster_admins?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update cluster admin password - # curl -X POST http://localhost:8086/cluster_admins/paul?u=root&p=root \ - # -d '{"password": "new pass"}' - - # delete cluster admin - # curl -X DELETE http://localhost:8086/cluster_admins/paul?u=root&p=root - - # Database admins, with a database name of site_dev - # get list of database admins - # curl http://localhost:8086/db/site_dev/admins?u=root&p=root - - # add database admin - # curl -X POST http://localhost:8086/db/site_dev/admins?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update database admin password - # curl -X POST http://localhost:8086/db/site_dev/admins/paul?u=root&p=root\ - # -d '{"password": "new pass"}' - - # delete database admin - # curl -X DELETE \ - # http://localhost:8086/db/site_dev/admins/paul?u=root&p=root - - def get_list_cluster_admins(self): - """ - Get list of cluster admins - """ - response = self.request( - url="cluster_admins", - method='GET', - expected_response_code=200 - ) - - return response.json() - - def add_cluster_admin(self, new_username, new_password): - """ - Add cluster admin - """ - data = { - 'name': new_username, - 'password': new_password - } - - self.request( - url="cluster_admins", - method='POST', - data=data, - expected_response_code=200 - ) - - return True - - def update_cluster_admin_password(self, username, new_password): - """ - Update cluster admin password - """ - url = "cluster_admins/{0}".format(username) - - data = { - 'password': new_password - } - - self.request( - url=url, - method='POST', - data=data, - expected_response_code=200 - ) - - return True - - def delete_cluster_admin(self, username): - """ - Delete cluster admin - """ - url = "cluster_admins/{0}".format(username) - - self.request( - url=url, - method='DELETE', - expected_response_code=200 - ) - - return True - - def set_database_admin(self, username): - """ - Set user as database admin - """ - return self.alter_database_admin(username, True) - - def unset_database_admin(self, username): - """ - Unset user as database admin - """ - return self.alter_database_admin(username, False) - - def alter_database_admin(self, username, is_admin): - url = "db/{0}/users/{1}".format(self._database, username) - - data = {'admin': is_admin} - - self.request( - url=url, - method='POST', - data=data, - expected_response_code=200 - ) - - return True - - def get_list_database_admins(self): - """ - TODO: Get list of database admins - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def add_database_admin(self, new_username, new_password): - """ - TODO: Add cluster admin - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def update_database_admin_password(self, username, new_password): - """ - TODO: Update database admin password - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def delete_database_admin(self, username): - """ - TODO: Delete database admin - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - ### - # Limiting User Access - - # Database users - # get list of database users - # curl http://localhost:8086/db/site_dev/users?u=root&p=root - - # add database user - # curl -X POST http://localhost:8086/db/site_dev/users?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update database user password - # curl -X POST http://localhost:8086/db/site_dev/users/paul?u=root&p=root \ - # -d '{"password": "new pass"}' - - # delete database user - # curl -X DELETE http://localhost:8086/db/site_dev/users/paul?u=root&p=root - - def get_database_users(self): - """ - Get list of database users - """ - url = "db/{0}/users".format(self._database) - - response = self.request( - url=url, - method='GET', - expected_response_code=200 - ) - - return response.json() - - def add_database_user(self, new_username, new_password, permissions=None): - """ - Add database user - - :param permissions: A ``(readFrom, writeTo)`` tuple - """ - url = "db/{0}/users".format(self._database) - - data = { - 'name': new_username, - 'password': new_password - } - - if permissions: - try: - data['readFrom'], data['writeTo'] = permissions - except (ValueError, TypeError): - raise TypeError( - "'permissions' must be (readFrom, writeTo) tuple" - ) - - self.request( - url=url, - method='POST', - data=data, - expected_response_code=200 - ) - - return True - - def update_database_user_password(self, username, new_password): - """ - Update password - """ - url = "db/{0}/users/{1}".format(self._database, username) - - data = { - 'password': new_password - } - - self.request( - url=url, - method='POST', - data=data, - expected_response_code=200 - ) - - if username == self._username: - self._password = new_password - - return True - - def delete_database_user(self, username): - """ - Delete database user - """ - url = "db/{0}/users/{1}".format(self._database, username) - - self.request( - url=url, - method='DELETE', - expected_response_code=200 - ) - - return True - - # update the user by POSTing to db/site_dev/users/paul - - def update_permission(self, username, json_body): - """ - TODO: Update read/write permission - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - def send_packet(self, packet): data = json.dumps(packet) byte = data.encode('utf-8') diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 8cec9657..7b6bb2d2 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -90,12 +90,6 @@ def test_switch_database(self): cli.switch_database('another_database') assert cli._database == 'another_database' - @raises(FutureWarning) - def test_switch_db_deprecated(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'database') - cli.switch_db('another_database') - assert cli._database == 'another_database' - def test_switch_user(self): cli = InfluxDBClient('host', 8086, 'username', 'password', 'database') cli.switch_user('another_username', 'another_password') @@ -134,19 +128,36 @@ def test_write_points(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/db/db/series" + "http://localhost:8086/write" ) cli = InfluxDBClient(database='db') cli.write_points( - self.dummy_points - ) - - self.assertListEqual( - json.loads(m.last_request.body), - self.dummy_points + self.dummy_points, ) + self.assertDictEqual( + {u'points': + [ + { + u'points': [ + [u'1', 1, 1.0], + [u'2', 2, 2.0] + ], + u'name': u'foo', + u'columns': [ + u'column_one', + u'column_two', + u'column_three' + ] + } + ], + u'database': u'db', + }, + json.loads(m.last_request.body) + ) + + @unittest.skip('Not implemented for 0.9') def test_write_points_string(self): with requests_mock.Mocker() as m: m.register_uri( @@ -164,6 +175,7 @@ def test_write_points_string(self): self.dummy_points ) + @unittest.skip('Not implemented for 0.9') def test_write_points_batch(self): with _mocked_session('post', 200, self.dummy_points): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') @@ -172,6 +184,7 @@ def test_write_points_batch(self): batch_size=2 ) is True + @unittest.skip('Not implemented for 0.9') def test_write_points_udp(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(('0.0.0.0', 4444)) @@ -187,6 +200,7 @@ def test_write_points_udp(self): assert self.dummy_points == \ json.loads(received_data.decode(), strict=True) + @unittest.skip('Not implemented for 0.9') def test_write_bad_precision_udp(self): cli = InfluxDBClient( 'localhost', 8086, 'root', 'root', @@ -209,15 +223,45 @@ def test_write_points_fails(self): cli.write_points([]) def test_write_points_with_precision(self): - with _mocked_session('post', 200, self.dummy_points): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - assert cli.write_points(self.dummy_points) is True + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/write" + ) + + cli = InfluxDBClient(database='db') + cli.write_points( + self.dummy_points, + time_precision='n' + ) + + self.assertDictEqual( + {u'points': + [ + { + u'points': [ + [u'1', 1, 1.0], + [u'2', 2, 2.0] + ], + u'name': u'foo', + u'columns': [ + u'column_one', + u'column_two', + u'column_three' + ] + } + ], + u'database': u'db', + u'precision': u'n', + }, + json.loads(m.last_request.body) + ) def test_write_points_bad_precision(self): cli = InfluxDBClient() with self.assertRaisesRegexp( Exception, - "Invalid time precision is given. \(use 's', 'm', 'ms' or 'u'\)" + "Invalid time precision is given. \(use 'n', 'u', 'ms', 's', 'm' or 'h'\)" ): cli.write_points( self.dummy_points, @@ -230,38 +274,6 @@ def test_write_points_with_precision_fails(self): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') cli.write_points_with_precision([]) - def test_delete_points(self): - with _mocked_session('delete', 204) as mocked: - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - assert cli.delete_points("foo") is True - - assert len(mocked.call_args_list) == 1 - args, kwds = mocked.call_args_list[0] - - assert kwds['params'] == {'u': 'username', 'p': 'password'} - assert kwds['url'] == 'http://host:8086/db/db/series/foo' - - @raises(Exception) - def test_delete_points_with_wrong_name(self): - with _mocked_session('delete', 400): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.delete_points("nonexist") - - @raises(NotImplementedError) - def test_create_scheduled_delete(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.create_scheduled_delete([]) - - @raises(NotImplementedError) - def test_get_list_scheduled_delete(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.get_list_scheduled_delete() - - @raises(NotImplementedError) - def test_remove_scheduled_delete(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.remove_scheduled_delete(1) - def test_query(self): data = [ { @@ -278,6 +290,7 @@ def test_query(self): result = cli.query('select column_one from foo;') assert len(result[0]['points']) == 4 + @unittest.skip('Not implemented for 0.9') def test_query_chunked(self): cli = InfluxDBClient(database='db') example_object = { @@ -316,14 +329,7 @@ def test_query_fail(self): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') cli.query('select column_one from foo;') - def test_query_bad_precision(self): - cli = InfluxDBClient() - with self.assertRaisesRegexp( - Exception, - "Invalid time precision is given. \(use 's', 'm', 'ms' or 'u'\)" - ): - cli.query('select column_one from foo', time_precision='g') - + @unittest.skip('Not implemented for 0.9') def test_create_database(self): with _mocked_session('post', 201, {"name": "new_db"}): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') @@ -335,6 +341,7 @@ def test_create_database_fails(self): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') cli.create_database('new_db') + @unittest.skip('Not implemented for 0.9') def test_delete_database(self): with _mocked_session('delete', 204): cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') @@ -347,13 +354,21 @@ def test_delete_database_fails(self): cli.delete_database('old_db') def test_get_list_database(self): - data = [ - {"name": "a_db"} - ] - with _mocked_session('get', 200, data): + data = { + "results": + [ + {"rows": [ + {"columns": ["name"], + "values":[["mydb"], ["myotherdb"]]}]} + ] + } + + with _mocked_session('get', 200, json.dumps(data)): cli = InfluxDBClient('host', 8086, 'username', 'password') - assert len(cli.get_list_database()) == 1 - assert cli.get_list_database()[0]['name'] == 'a_db' + self.assertListEqual( + cli.get_list_database(), + [u'mydb', u'myotherdb'] + ) @raises(Exception) def test_get_list_database_fails(self): @@ -361,27 +376,7 @@ def test_get_list_database_fails(self): cli = InfluxDBClient('host', 8086, 'username', 'password') cli.get_list_database() - @raises(FutureWarning) - def test_get_database_list_deprecated(self): - data = [ - {"name": "a_db"} - ] - with _mocked_session('get', 200, data): - cli = InfluxDBClient('host', 8086, 'username', 'password') - assert len(cli.get_database_list()) == 1 - assert cli.get_database_list()[0]['name'] == 'a_db' - - def test_delete_series(self): - with _mocked_session('delete', 204): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.delete_series('old_series') - - @raises(Exception) - def test_delete_series_fails(self): - with _mocked_session('delete', 401): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.delete_series('old_series') - + @unittest.skip('Not implemented for 0.9') def test_get_series_list(self): cli = InfluxDBClient(database='db') @@ -400,243 +395,3 @@ def test_get_series_list(self): cli.get_list_series(), ['foo', 'bar'] ) - - def test_get_continuous_queries(self): - cli = InfluxDBClient(database='db') - - with requests_mock.Mocker() as m: - - # Tip: put this in a json linter! - example_response = '[ { "name": "continuous queries", "columns"' \ - ': [ "time", "id", "query" ], "points": [ [ ' \ - '0, 1, "select foo(bar,95) from \\"foo_bar' \ - 's\\" group by time(5m) into response_times.' \ - 'percentiles.5m.95" ], [ 0, 2, "select perce' \ - 'ntile(value,95) from \\"response_times\\" g' \ - 'roup by time(5m) into response_times.percen' \ - 'tiles.5m.95" ] ] } ]' - - m.register_uri( - requests_mock.GET, - "http://localhost:8086/db/db/series", - text=example_response - ) - - self.assertListEqual( - cli.get_list_continuous_queries(), - [ - 'select foo(bar,95) from "foo_bars" group ' - 'by time(5m) into response_times.percentiles.5m.95', - - 'select percentile(value,95) from "response_times" group ' - 'by time(5m) into response_times.percentiles.5m.95' - ] - ) - - def test_get_list_cluster_admins(self): - pass - - def test_add_cluster_admin(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/cluster_admins" - ) - - cli = InfluxDBClient(database='db') - cli.add_cluster_admin( - new_username='paul', - new_password='laup' - ) - - self.assertDictEqual( - json.loads(m.last_request.body), - { - 'name': 'paul', - 'password': 'laup' - } - ) - - def test_update_cluster_admin_password(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/cluster_admins/paul" - ) - - cli = InfluxDBClient(database='db') - cli.update_cluster_admin_password( - username='paul', - new_password='laup' - ) - - self.assertDictEqual( - json.loads(m.last_request.body), - {'password': 'laup'} - ) - - def test_delete_cluster_admin(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.DELETE, - "http://localhost:8086/cluster_admins/paul", - status_code=200, - ) - - cli = InfluxDBClient(database='db') - cli.delete_cluster_admin(username='paul') - - self.assertIsNone(m.last_request.body) - - def test_set_database_admin(self): - pass - - def test_unset_database_admin(self): - pass - - def test_alter_database_admin(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/db/db/users/paul" - ) - - cli = InfluxDBClient(database='db') - cli.alter_database_admin( - username='paul', - is_admin=False - ) - - self.assertDictEqual( - json.loads(m.last_request.body), - { - 'admin': False - } - ) - - @raises(NotImplementedError) - def test_get_list_database_admins(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.get_list_database_admins() - - @raises(NotImplementedError) - def test_add_database_admin(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.add_database_admin('admin', 'admin_secret_password') - - @raises(NotImplementedError) - def test_update_database_admin_password(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.update_database_admin_password('admin', 'admin_secret_password') - - @raises(NotImplementedError) - def test_delete_database_admin(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.delete_database_admin('admin') - - def test_get_database_users(self): - cli = InfluxDBClient('localhost', 8086, 'username', 'password', 'db') - - example_response = \ - '[{"name":"paul","isAdmin":false,"writeTo":".*","readFrom":".*"},'\ - '{"name":"bobby","isAdmin":false,"writeTo":".*","readFrom":".*"}]' - - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.GET, - "http://localhost:8086/db/db/users", - text=example_response - ) - users = cli.get_database_users() - - self.assertEqual(json.loads(example_response), users) - - def test_add_database_user(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/db/db/users" - ) - cli = InfluxDBClient(database='db') - cli.add_database_user( - new_username='paul', - new_password='laup', - permissions=('.*', '.*') - ) - - self.assertDictEqual( - json.loads(m.last_request.body), - { - 'writeTo': '.*', - 'password': 'laup', - 'readFrom': '.*', - 'name': 'paul' - } - ) - - def test_add_database_user_bad_permissions(self): - cli = InfluxDBClient() - - with self.assertRaisesRegexp( - Exception, - "'permissions' must be \(readFrom, writeTo\) tuple" - ): - cli.add_database_user( - new_password='paul', - new_username='paul', - permissions=('hello', 'hello', 'hello') - ) - - def test_update_database_user_password(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/db/db/users/paul" - ) - - cli = InfluxDBClient(database='db') - cli.update_database_user_password( - username='paul', - new_password='laup' - ) - - self.assertDictEqual( - json.loads(m.last_request.body), - {'password': 'laup'} - ) - - def test_update_database_user_password_current_user(self): - cli = InfluxDBClient( - username='root', - password='hello', - database='database' - ) - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.POST, - "http://localhost:8086/db/database/users/root" - ) - - cli.update_database_user_password( - username='root', - new_password='bye' - ) - - self.assertEqual(cli._password, 'bye') - - def test_delete_database_user(self): - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.DELETE, - "http://localhost:8086/db/db/users/paul" - ) - - cli = InfluxDBClient(database='db') - cli.delete_database_user(username='paul') - - self.assertIsNone(m.last_request.body) - - @raises(NotImplementedError) - def test_update_permission(self): - cli = InfluxDBClient('host', 8086, 'username', 'password', 'db') - cli.update_permission('admin', [])