Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oscar_python/_providers/_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def upload_file(self, local_path, remote_path):
bucket_name = remote_path.split('/')[0]
file_key = remote_path.split('/', 1)[1]
file_name = local_path.split('/')[-1]
print("Uploading to bucket '{0}' with key '{1}'".format(bucket_name,file_key))
print("Uploading to bucket '{0}' with key '{1}'".format(bucket_name, file_key))
with open(local_path, 'rb') as data:
try:
self.client.upload_fileobj(data, bucket_name, file_key + "/" + file_name)
Expand Down
26 changes: 22 additions & 4 deletions oscar_python/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import base64
import json
import os
import requests
import liboidcagent as agent
Expand All @@ -32,14 +33,16 @@ def make_request(c, path, method, **kwargs):
url = c.endpoint+path

if method in ["post", "put"]:
if "token" in kwargs.keys() and kwargs["token"]:
if "token" in kwargs.keys() and kwargs["token"]:
headers = get_headers_with_token(kwargs["token"])
req_kwargs = {"headers": headers, "verify": c.ssl, "timeout": timeout}
if "data" in kwargs.keys() and kwargs["data"]:
result = requests.request(method, url, headers=headers, verify=c.ssl, data=kwargs["data"], timeout=timeout)
req_kwargs["data"] = kwargs["data"]
result = requests.request(method, url, **req_kwargs)
else:
result = requests.request(method, url, headers=headers, verify=c.ssl, timeout=timeout)

if "handle" in kwargs.keys() and kwargs["handle"] == False:
if "handle" in kwargs.keys() and kwargs["handle"] is False:
return result

result.raise_for_status()
Expand Down Expand Up @@ -94,7 +97,7 @@ def decode_b64(b64_str, file_out):
except ValueError:
print('Error decoding output: Invalid base64 string.')
except OSError:
print('Error decoding output: Failed to write decoded data to file.')
print('Error decoding output: Failed to write decoded data to file.')


def encode_input(data):
Expand All @@ -111,6 +114,21 @@ def encode_input(data):
return base64.b64encode(message_bytes)


def load_config(config_path, cluster_name="default"):
with open(config_path) as f:
config = json.load(f)
cluster = config["clusters"][cluster_name]
opts = {
"cluster_id": cluster_name,
"endpoint": cluster["endpoint"],
"ssl": cluster.get("ssl", True),
}
for key in ("user", "password", "shortname", "oidc_token", "refresh_token"):
if key in cluster:
opts[key] = cluster[key]
return opts


def decode_output(output, file_path):
if isBase64(output):
decode_b64(output, file_path)
Expand Down
128 changes: 122 additions & 6 deletions oscar_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
_SVC_PATH = "/system/services"
_LOGS_PATH = "/system/logs"
_RUN_PATH = "/run"
_STATUS_PATH="/system/status"
_STATUS_PATH = "/system/status"
_HEALTH_PATH = "/health"
_VOLUMES_PATH = "/system/volumes"
_BUCKETS_PATH = "/system/buckets"
_METRICS_PATH = "/system/metrics"
_QUOTAS_USER_PATH = "/system/quotas/user"


# _JOB_PATH = "/job"
Expand Down Expand Up @@ -101,13 +106,13 @@ def get_access_token(self):
""" Creates a generic storage client to interact with the storage providers
defined on a specific service of the refered OSCAR cluster """
def create_storage_client(self, svc=None):
if svc != None:
if svc is not None:
return Storage(
client_obj=self, svc_name=svc)
else:
return Storage(
client_obj=self)

""" Function to get cluster info """
def get_cluster_info(self):
return utils.make_request(self, _INFO_PATH, _GET)
Expand Down Expand Up @@ -137,17 +142,18 @@ def _check_fdl_definition(self, fdl_path):
raise Exception("FDL clusterID does not match current clusterID: {0}".format(err))
try:
if os.path.isabs(svc["script"]):
script_path = svc["script"]
script_path = svc["script"]
else:
fdl_directory = os.path.dirname(fdl_path)
script_path = os.path.join(fdl_directory, svc['script'])
with open(script_path) as s:
svc["script"] = s.read()
except IOError as e:
except IOError:
raise Exception("Couldn't read script")

# cpu parameter has to be string on the request
if type(svc["cpu"]) is int or type(svc["cpu"]) is float: svc["cpu"] = str(svc["cpu"])
if type(svc["cpu"]) is int or type(svc["cpu"]) is float:
svc["cpu"] = str(svc["cpu"])

except ValueError as err:
print(err)
Expand Down Expand Up @@ -198,6 +204,8 @@ def remove_service(self, name):
return utils.make_request(self, _SVC_PATH+"/"+name, _DELETE)

def _get_token(self, svc):
if self._AUTH_TYPE != 'basicauth':
return self.get_access_token()
service = utils.make_request(self, _SVC_PATH+"/"+svc, _GET)
service = json.loads(service.text)
return service["token"]
Expand All @@ -224,3 +232,111 @@ def remove_job(self, svc, job):
""" Remove all service jobs """
def remove_all_jobs(self, svc):
return utils.make_request(self, _LOGS_PATH+"/"+svc, _DELETE)

""" Check cluster health """
def health_check(self):
return utils.make_request(self, _HEALTH_PATH, _GET)

""" Get deployment status of a service """
def get_deployment_status(self, name):
return utils.make_request(self, _SVC_PATH + "/" + name + "/deployment", _GET)

""" Get deployment logs of a service """
def get_deployment_logs(self, name):
return utils.make_request(self, _SVC_PATH + "/" + name + "/deployment/logs", _GET)

""" List all managed volumes """
def list_volumes(self):
return utils.make_request(self, _VOLUMES_PATH, _GET)

""" Create a new managed volume """
def create_volume(self, name, size):
data = json.dumps({"name": name, "size": size})
return utils.make_request(self, _VOLUMES_PATH, _POST, data=data)

""" Get a specific managed volume """
def get_volume(self, name):
return utils.make_request(self, _VOLUMES_PATH + "/" + name, _GET)

""" Delete a managed volume """
def delete_volume(self, name):
return utils.make_request(self, _VOLUMES_PATH + "/" + name, _DELETE)

""" Create a bucket """
def create_bucket(self, name, visibility="private", allowed_users=None):
data = json.dumps({
"bucket_name": name,
"visibility": visibility,
"allowed_users": allowed_users or []
})
return utils.make_request(self, _BUCKETS_PATH, _POST, data=data)

""" Update a bucket """
def update_bucket(self, name, visibility, allowed_users=None):
data = json.dumps({
"bucket_name": name,
"visibility": visibility,
"allowed_users": allowed_users or []
})
return utils.make_request(self, _BUCKETS_PATH, _PUT, data=data)

""" List all buckets """
def list_buckets(self):
return utils.make_request(self, _BUCKETS_PATH, _GET)

""" Get a specific bucket """
def get_bucket(self, name):
return utils.make_request(self, _BUCKETS_PATH + "/" + name, _GET)

""" Delete a bucket """
def delete_bucket(self, name):
return utils.make_request(self, _BUCKETS_PATH + "/" + name, _DELETE)

""" Get a presigned URL for a bucket file """
def presign_bucket(self, name, object_key, operation="download", expires=0, content_type="", extra_headers=None):
path = _BUCKETS_PATH + "/" + name + "/presign"
data = json.dumps({
"object_key": object_key,
"operation": operation,
"expires": expires,
"content_type": content_type,
"extra_headers": extra_headers or {},
})
return utils.make_request(self, path, _POST, data=data)

""" Get system logs (admin only) """
def get_system_logs(self, timestamps=False, previous=False):
path = _LOGS_PATH
params = []
if timestamps:
params.append("timestamps=true")
if previous:
params.append("previous=true")
if params:
path += "?" + "&".join(params)
return utils.make_request(self, path, _GET)

""" Get metrics summary """
def get_metrics_summary(self):
return utils.make_request(self, _METRICS_PATH, _GET)

""" Get metrics breakdown """
def get_metrics_breakdown(self, group_by="service"):
return utils.make_request(self, _METRICS_PATH + "/breakdown?group_by=" + group_by, _GET)

""" Get metrics for a specific service """
def get_service_metrics(self, service_name):
return utils.make_request(self, _METRICS_PATH + "/" + service_name, _GET)

""" Get own quota """
def get_own_quota(self):
return utils.make_request(self, _QUOTAS_USER_PATH, _GET)

""" Get quota for a specific user """
def get_user_quota(self, user_id):
return utils.make_request(self, _QUOTAS_USER_PATH + "/" + user_id, _GET)

""" Update quota for a user """
def update_user_quota(self, user_id, cpu, memory):
data = json.dumps({"cpu": cpu, "memory": memory})
return utils.make_request(self, _QUOTAS_USER_PATH + "/" + user_id, _PUT, data=data)
2 changes: 1 addition & 1 deletion oscar_python/local_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from client import Client

client = Client("oscar-gpu-cluster","https://focused-boyd8.im.grycap.net", "oscar", "oscar123", True)
client = Client("oscar-gpu-cluster", "https://focused-boyd8.im.grycap.net", "oscar", "oscar123", True)

res = client.remove_service("cowsay")
if res:
Expand Down
7 changes: 3 additions & 4 deletions oscar_python/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

# TODO check returns from functions
class Storage:
def __init__(self, client_obj, svc_name = None) -> None:
def __init__(self, client_obj, svc_name=None) -> None:
self.client_obj = client_obj
self.storage_providers = {}
if svc_name != None:
if svc_name is not None:
self.svc_name = svc_name
self._store_provider_from_service()
self._store_default_minio_provider()
Expand All @@ -46,13 +46,12 @@ def _store_provider_from_service(self):

""" Function to store the user credentials for the default MinIO provider """
def _store_default_minio_provider(self):
config = utils.make_request(self.client_obj, _CONFIG_PATH + "/" , _GET)
config = utils.make_request(self.client_obj, _CONFIG_PATH + "/", _GET)
if _MINIO in self.storage_providers:
self.storage_providers[_MINIO]["default"] = json.loads(config.text)["minio_provider"]
else:
default = {"default": json.loads(config.text)["minio_provider"]}
self.storage_providers[_MINIO] = default


""" Function to retreive credentials of a specific storage provider """
def _get_provider_creds(self, provider, provider_name):
Expand Down
Loading
Loading