Skip to content

Commit

Permalink
RANGER-3982: updated Python client to support Ranger KMS REST APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
mneethiraj committed Dec 1, 2022
1 parent 32687a1 commit d0c6bdb
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 18 deletions.
110 changes: 109 additions & 1 deletion intg/src/main/python/README.md
Expand Up @@ -35,7 +35,7 @@ Verify if apache-ranger client is installed:

Package Version
------------ ---------
apache-ranger 0.0.7
apache-ranger 0.0.8
```

## Usage
Expand Down Expand Up @@ -120,4 +120,112 @@ ranger.delete_service_by_id(created_service.id)
print(' deleted service: id=' + str(created_service.id))

```

```python test_ranger_kms.py```
```python
# test_ranger_kms.py
from apache_ranger.client.ranger_kms_client import RangerKMSClient
from apache_ranger.client.ranger_client import HadoopSimpleAuth
from apache_ranger.model.ranger_kms import RangerKey
import time


##
## Step 1: create a client to connect to Apache Ranger KMS
##
kms_url = 'http://localhost:9292'
kms_auth = HadoopSimpleAuth('keyadmin')

# For Kerberos authentication
#
# from requests_kerberos import HTTPKerberosAuth
#
# kms_auth = HTTPKerberosAuth()
#
# For HTTP Basic authentication
#
# kms_auth = ('keyadmin', 'rangerR0cks!')

kms_client = RangerKMSClient(kms_url, kms_auth)



##
## Step 2: Let's call KMS APIs
##

kms_status = kms_client.kms_status()
print('kms_status():', kms_status)
print()

key_name = 'test_' + str(int(time.time() * 1000))

key = kms_client.create_key(RangerKey({'name':key_name}))
print('create_key(' + key_name + '):', key)
print()

rollover_key = kms_client.rollover_key(key_name, key.material)
print('rollover_key(' + key_name + '):', rollover_key)
print()

kms_client.invalidate_cache_for_key(key_name)
print('invalidate_cache_for_key(' + key_name + ')')
print()

key_metadata = kms_client.get_key_metadata(key_name)
print('get_key_metadata(' + key_name + '):', key_metadata)
print()

current_key = kms_client.get_current_key(key_name)
print('get_current_key(' + key_name + '):', current_key)
print()

encrypted_keys = kms_client.generate_encrypted_key(key_name, 6)
print('generate_encrypted_key(' + key_name + ', ' + str(6) + '):')
for i in range(len(encrypted_keys)):
encrypted_key = encrypted_keys[i]
decrypted_key = kms_client.decrypt_encrypted_key(key_name, encrypted_key.versionName, encrypted_key.iv, encrypted_key.encryptedKeyVersion.material)
reencrypted_key = kms_client.reencrypt_encrypted_key(key_name, encrypted_key.versionName, encrypted_key.iv, encrypted_key.encryptedKeyVersion.material)
print(' encrypted_keys[' + str(i) + ']: ', encrypted_key)
print(' decrypted_key[' + str(i) + ']: ', decrypted_key)
print(' reencrypted_key[' + str(i) + ']:', reencrypted_key)
print()

reencrypted_keys = kms_client.batch_reencrypt_encrypted_keys(key_name, encrypted_keys)
print('batch_reencrypt_encrypted_keys(' + key_name + ', ' + str(len(encrypted_keys)) + '):')
for i in range(len(reencrypted_keys)):
print(' batch_reencrypt_encrypted_key[' + str(i) + ']:', reencrypted_keys[i])
print()

key_versions = kms_client.get_key_versions(key_name)
print('get_key_versions(' + key_name + '):', len(key_versions))
for i in range(len(key_versions)):
print(' key_versions[' + str(i) + ']:', key_versions[i])
print()

for i in range(len(key_versions)):
key_version = kms_client.get_key_version(key_versions[i].versionName)
print('get_key_version(' + str(i) + '):', key_version)
print()

key_names = kms_client.get_key_names()
print('get_key_names():', len(key_names))
for i in range(len(key_names)):
print(' key_name[' + str(i) + ']:', key_names[i])
print()

keys_metadata = kms_client.get_keys_metadata(key_names)
print('get_keys_metadata(' + str(key_names) + '):', len(keys_metadata))
for i in range(len(keys_metadata)):
print(' key_metadata[' + str(i) + ']:', keys_metadata[i])
print()

key = kms_client.get_key(key_name)
print('get_key(' + key_name + '):', key)
print()

kms_client.delete_key(key_name)
print('delete_key(' + key_name + ')')
```

For more examples, checkout `sample-client` python project in [ranger-examples](https://github.com/apache/ranger/blob/master/ranger-examples/sample-client/src/main/python/sample_client.py) module.
29 changes: 26 additions & 3 deletions intg/src/main/python/apache_ranger/client/ranger_client.py
Expand Up @@ -30,10 +30,14 @@
from apache_ranger.utils import *
from requests import Session
from requests import Response
from requests.auth import AuthBase
from urllib.parse import urlencode
from urllib.parse import urljoin

LOG = logging.getLogger(__name__)

QUERY_PARAM_USER_DOT_NAME = 'user.name'.encode("utf-8")


class RangerClient:
def __init__(self, url, auth):
Expand Down Expand Up @@ -368,6 +372,21 @@ def delete_policy_deltas(self, days, reloadServicePoliciesCache):
DELETE_POLICY_DELTAS = API(URI_POLICY_DELTAS, HttpMethod.DELETE, HTTPStatus.NO_CONTENT)



class HadoopSimpleAuth(AuthBase):
def __init__(self, user_name):
self.user_name = user_name.encode("utf-8")

def __call__(self, req):
sep_char = '?'

if req.url.find('?') != -1:
sep_char = '&'

req.url = req.url + sep_char + urlencode({ QUERY_PARAM_USER_DOT_NAME: self.user_name })

return req

class Message(RangerBase):
def __init__(self, attrs=None):
if attrs is None:
Expand Down Expand Up @@ -449,17 +468,21 @@ def call_api(self, api, query_params=None, request_data=None):
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("<== __call_api(%s, %s, %s), result=%s", vars(api), params, request_data, response)

LOG.debug(response.json())
LOG.debug(response.content)

ret = response.json()
if response.content:
try:
ret = response.json()
except Exception:
ret = response.content
except Exception as e:
print(e)

LOG.exception("Exception occurred while parsing response with msg: %s", e)

raise RangerServiceException(api, response)
elif response.status_code == HTTPStatus.SERVICE_UNAVAILABLE:
LOG.error("Ranger admin unavailable. HTTP Status: %s", HTTPStatus.SERVICE_UNAVAILABLE)
LOG.error("Ranger server at %s unavailable. HTTP Status: %s", self.url, HTTPStatus.SERVICE_UNAVAILABLE)

ret = None
elif response.status_code == HTTPStatus.NOT_FOUND:
Expand Down
157 changes: 157 additions & 0 deletions intg/src/main/python/apache_ranger/client/ranger_kms_client.py
@@ -0,0 +1,157 @@
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import json
import logging
from apache_ranger.exceptions import RangerServiceException
from apache_ranger.client.ranger_client import RangerClientHttp
from apache_ranger.model.ranger_kms import RangerKey
from apache_ranger.model.ranger_kms import RangerKeyVersion
from apache_ranger.model.ranger_kms import RangerKeyMetadata
from apache_ranger.model.ranger_kms import RangerEncryptedKeyVersion
from apache_ranger.utils import *

LOG = logging.getLogger(__name__)

#
# Python client for KMS REST APIs
# More details in https://hadoop.apache.org/docs/current/hadoop-kms/index.html#KMS_HTTP_REST_API
#
class RangerKMSClient:
def __init__(self, url, auth):
self.client_http = RangerClientHttp(url, auth)

logging.getLogger("requests").setLevel(logging.WARNING)


def create_key(self, key):
resp = self.client_http.call_api(RangerKMSClient.CREATE_KEY, request_data=key)

return type_coerce(resp, RangerKeyVersion)

def rollover_key(self, key_name, material=None):
resp = self.client_http.call_api(RangerKMSClient.ROLLOVER_KEY.format_path({ 'name': key_name }), request_data={ 'material': material})

return type_coerce(resp, RangerKeyVersion)

def invalidate_cache_for_key(self, key_name):
self.client_http.call_api(RangerKMSClient.INVALIDATE_CACHE_FOR_KEY.format_path({ 'name': key_name }))

def delete_key(self, key_name):
self.client_http.call_api(RangerKMSClient.DELETE_KEY.format_path({ 'name': key_name }))

def get_key_metadata(self, key_name):
resp = self.client_http.call_api(RangerKMSClient.GET_KEY_METADATA.format_path({ 'name': key_name }))

return type_coerce(resp, RangerKeyMetadata)

def get_current_key(self, key_name):
resp = self.client_http.call_api(RangerKMSClient.GET_CURRENT_KEY.format_path({ 'name': key_name }))

return type_coerce(resp, RangerKeyVersion)

def generate_encrypted_key(self, key_name, num_keys):
resp = self.client_http.call_api(RangerKMSClient.GENERATE_ENCRYPTED_KEY.format_path({'name': key_name}), query_params={'eek_op': 'generate', 'num_keys': num_keys})

return type_coerce_list(resp, RangerEncryptedKeyVersion)

def decrypt_encrypted_key(self, key_name, version_name, iv, material):
resp = self.client_http.call_api(RangerKMSClient.DECRYPT_ENCRYPTED_KEY.format_path({'version_name': version_name}), request_data={'name': key_name, 'iv': iv, 'material': material}, query_params={'eek_op': 'decrypt'})

return type_coerce(resp, RangerKeyVersion)

def reencrypt_encrypted_key(self, key_name, version_name, iv, material):
resp = self.client_http.call_api(RangerKMSClient.REENCRYPT_ENCRYPTED_KEY.format_path({'version_name': version_name}), request_data={'name': key_name, 'iv': iv, 'material': material}, query_params={'eek_op': 'reencrypt'})

return type_coerce(resp, RangerEncryptedKeyVersion)

def batch_reencrypt_encrypted_keys(self, key_name, encrypted_key_versions):
resp = self.client_http.call_api(RangerKMSClient.BATCH_REENCRYPT_ENCRYPTED_KEYS.format_path({'name': key_name}), request_data=encrypted_key_versions)

return type_coerce_list(resp, RangerEncryptedKeyVersion)

def get_key_version(self, version_name):
resp = self.client_http.call_api(RangerKMSClient.GET_KEY_VERSION.format_path({ 'version_name': version_name }))

return type_coerce(resp, RangerKeyVersion)

def get_key_versions(self, key_name):
resp = self.client_http.call_api(RangerKMSClient.GET_KEY_VERSIONS.format_path({ 'name': key_name}))

return type_coerce_list(resp, RangerKeyVersion)

def get_key_names(self):
resp = self.client_http.call_api(RangerKMSClient.GET_KEYS_NAMES)

return resp

def get_keys_metadata(self, key_names):
resp = self.client_http.call_api(RangerKMSClient.GET_KEYS_METADATA, query_params={'key': key_names})

return type_coerce_list(resp, RangerKeyMetadata)

# Ranger KMS
def get_key(self, key_name):
resp = self.client_http.call_api(RangerKMSClient.GET_KEY.format_path({ 'name': key_name }))

return type_coerce(resp, RangerKeyMetadata)

# Ranger KMS
def kms_status(self):
resp = self.client_http.call_api(RangerKMSClient.KMS_STATUS)

return resp

# URIs
URI_KEYS = "kms/v1/keys"
URI_KEY_BY_NAME = "kms/v1/key/{name}"
URI_KEY_INVALIDATE_CACHE = URI_KEY_BY_NAME + "/_invalidatecache"
URI_KEY_METADATA = URI_KEY_BY_NAME + "/_metadata"
URI_CURRENT_KEY = URI_KEY_BY_NAME + "/_currentversion"
URI_KEY_EEK = URI_KEY_BY_NAME + "/_eek"
URI_BATCH_REENCRYPT_KEYS = URI_KEY_BY_NAME + "/_reencryptbatch"
URI_KEY_VERSIONS = URI_KEY_BY_NAME + "/_versions"
URI_KEY_VERSION_BY_NAME = "kms/v1/keyversion/{version_name}"
URI_KEY_VERSION_BY_NAME_EEK = URI_KEY_VERSION_BY_NAME + "/_eek"
URI_KEYS_NAMES = URI_KEYS + "/names"
URI_KEYS_METADATA = URI_KEYS + "/metadata"

# Ranger KMS
URI_KMS_STATUS = "kms/api/status"


# APIs
CREATE_KEY = API(URI_KEYS, HttpMethod.POST, HTTPStatus.CREATED)
ROLLOVER_KEY = API(URI_KEY_BY_NAME, HttpMethod.POST, HTTPStatus.OK)
INVALIDATE_CACHE_FOR_KEY = API(URI_KEY_INVALIDATE_CACHE, HttpMethod.POST, HTTPStatus.OK)
DELETE_KEY = API(URI_KEY_BY_NAME, HttpMethod.DELETE, HTTPStatus.OK)
GET_KEY_METADATA = API(URI_KEY_METADATA, HttpMethod.GET, HTTPStatus.OK)
GET_CURRENT_KEY = API(URI_CURRENT_KEY, HttpMethod.GET, HTTPStatus.OK)
GENERATE_ENCRYPTED_KEY = API(URI_KEY_EEK, HttpMethod.GET, HTTPStatus.OK)
DECRYPT_ENCRYPTED_KEY = API(URI_KEY_VERSION_BY_NAME_EEK, HttpMethod.POST, HTTPStatus.OK)
REENCRYPT_ENCRYPTED_KEY = API(URI_KEY_VERSION_BY_NAME_EEK, HttpMethod.POST, HTTPStatus.OK)
BATCH_REENCRYPT_ENCRYPTED_KEYS = API(URI_BATCH_REENCRYPT_KEYS, HttpMethod.POST, HTTPStatus.OK)
GET_KEY_VERSION = API(URI_KEY_VERSION_BY_NAME, HttpMethod.GET, HTTPStatus.OK)
GET_KEY_VERSIONS = API(URI_KEY_VERSIONS, HttpMethod.GET, HTTPStatus.OK)
GET_KEYS_NAMES = API(URI_KEYS_NAMES, HttpMethod.GET, HTTPStatus.OK)
GET_KEYS_METADATA = API(URI_KEYS_METADATA, HttpMethod.GET, HTTPStatus.OK)

# Ranger KMS
GET_KEY = API(URI_KEY_BY_NAME, HttpMethod.GET, HTTPStatus.OK)
KMS_STATUS = API(URI_KMS_STATUS, HttpMethod.GET, HTTPStatus.OK)
11 changes: 8 additions & 3 deletions intg/src/main/python/apache_ranger/exceptions.py
Expand Up @@ -36,10 +36,15 @@ def __init__(self, api, response):
print(response)

if api is not None and response is not None:
respJson = response.json()
if response.content:
try:
respJson = response.json()
self.msgDesc = respJson['msgDesc'] if respJson is not None and 'msgDesc' in respJson else None
self.messageList = respJson['messageList'] if respJson is not None and 'messageList' in respJson else None
except Exception:
self.msgDesc = response.content
self.messageList = [ response.content ]

self.statusCode = response.status_code
self.msgDesc = respJson['msgDesc'] if respJson is not None and 'msgDesc' in respJson else None
self.messageList = respJson['messageList'] if respJson is not None and 'messageList' in respJson else None

Exception.__init__(self, "{} {} failed: expected_status={}, status={}, message={}".format(self.method, self.path, self.expected_status, self.statusCode, self.msgDesc))
17 changes: 12 additions & 5 deletions intg/src/main/python/apache_ranger/model/ranger_base.py
Expand Up @@ -29,18 +29,25 @@ def __getattr__(self, attr):
return self.get(attr)

def __setattr__(self, key, value):
self.__setitem__(key, value)
if value is None:
self.__delitem__(key)
else:
self.__setitem__(key, value)

def __setitem__(self, key, value):
super(RangerBase, self).__setitem__(key, value)
self.__dict__.update({key: value})
if value is None:
self.__delitem__(key)
else:
super(RangerBase, self).__setitem__(key, value)
self.__dict__.update({key: value})

def __delattr__(self, item):
self.__delitem__(item)

def __delitem__(self, key):
super(RangerBase, self).__delitem__(key)
del self.__dict__[key]
if key in self.__dict__:
super(RangerBase, self).__delitem__(key)
del self.__dict__[key]

def __repr__(self):
return json.dumps(self)
Expand Down

0 comments on commit d0c6bdb

Please sign in to comment.