Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clowder #236

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
2 changes: 2 additions & 0 deletions pyincore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
from pyincore.client import Client
from pyincore.client import IncoreClient
from pyincore.client import InsecureIncoreClient
from pyincore.client import ClowderClient
from pyincore.hazardservice import HazardService
from pyincore.utils.expressioneval import Parser
from pyincore.dataservice import DataService
from pyincore.utils.geoutil import GeoUtil
from pyincore.utils.networkutil import NetworkUtil
from pyincore.dataservice import DataService
from pyincore.clowderdataservice import ClowderDataService
from pyincore.fragilityservice import FragilityService
from pyincore.repairservice import RepairService
from pyincore.restorationservice import RestorationService
Expand Down
212 changes: 211 additions & 1 deletion pyincore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ def retrieve_token_from_file(self):

Returns:
None if token file does not exist
dict: Dictionary containing authorization in the format "bearer access_token" if file exists, None otherwise
dict: Dictionary containing authorization in the format "bearer access_token" if file exists,
None otherwise

"""
if not os.path.isfile(self.token_file):
Expand Down Expand Up @@ -414,6 +415,215 @@ def clear_cache(self):
return


class ClowderClient(Client):
    """Clowder service client class. It contains token and service root url.

    Args:
        service_url (str): Service url. Falls back to the production Clowder url
            (``pyglobals.CLOWDER_API_PROD_URL``) when None or blank.
        token_file_name (str): Path to file containing the authorization token
            (Clowder API key). Defaults to a hidden file named after the hashed
            service url inside the pyincore user cache.

    """

    def __init__(self, service_url: str = None, token_file_name: str = None):
        super().__init__()
        if service_url is None or len(service_url.strip()) == 0:
            service_url = pyglobals.CLOWDER_API_PROD_URL
        self.service_url = service_url

        # hashlib requires bytes array for hash operations; the hash keys the
        # per-service cache directory and the default token file name
        byte_url_string = str.encode(self.service_url)
        self.hashed_service_url = hashlib.sha256(byte_url_string).hexdigest()

        # record this service url in the service.json hash registry
        self.create_service_json_entry()

        # construct local directory and filename
        cache_data = pyglobals.PYINCORE_USER_DATA_CACHE
        if not os.path.exists(cache_data):
            os.makedirs(cache_data)

        self.hashed_svc_data_dir = os.path.join(cache_data, self.hashed_service_url)

        if not os.path.exists(self.hashed_svc_data_dir):
            os.makedirs(self.hashed_svc_data_dir)

        # store the token file in the respective repository's directory
        if token_file_name is None or len(token_file_name.strip()) == 0:
            token_file_name = "." + self.hashed_service_url + "_token"
        self.token_file = os.path.join(pyglobals.PYINCORE_USER_CACHE, token_file_name)

        # read the apikey from the first line of the token file
        if os.path.isfile(self.token_file):
            with open(self.token_file, 'r') as f:
                lines = f.read().splitlines()
            if not lines:
                # an empty token file previously surfaced as a bare
                # "IndexError: list index out of range"; keep the exception
                # type but give the user an actionable message
                raise IndexError("Token file " + self.token_file + " is empty.")
            authorization = lines[0]

            self.session.headers["X-API-Key"] = authorization
            print("Connection successful to Clowder services.", "pyIncore version detected:", pyglobals.PACKAGE_VERSION)

        else:
            print("Username/Password login not supported yet for Clowder services.")

    # TODO check if we can use password/username to login to clowder
    def login(self):
        """Prompt for credentials and attempt a password-grant token login.

        NOTE(review): this relies on ``self.token_url`` and
        ``pyglobals.CLIENT_ID``, neither of which is set anywhere in this
        class -- confirm against the base ``Client``/``IncoreClient`` before
        relying on this code path.

        Returns:
            bool: True when authentication succeeds; exits the process on
            repeated failure.

        """
        for attempt in range(pyglobals.MAX_LOGIN_ATTEMPTS):
            try:
                username = input("Enter username: ")
                password = getpass.getpass("Enter password: ")
            except EOFError as e:
                logger.warning(e)
                raise e
            r = requests.post(self.token_url, data={'grant_type': 'password',
                                                    'client_id': pyglobals.CLIENT_ID,
                                                    'username': username, 'password': password})
            if r.status_code == 200:
                token = r.json()
                if token is None or token["access_token"] is None:
                    logger.warning("Authentication Failed.")
                    exit(0)
                authorization = str("bearer " + token["access_token"])
                self.store_authorization_in_file(authorization)
                self.session.headers['Authorization'] = authorization
                return True
            logger.warning("Authentication failed, attempting login again.")

        logger.warning("Authentication failed.")
        exit(0)

    def store_authorization_in_file(self, authorization: str):
        """Store the access token in local file. If the file does not exist, this function creates it.

        Args:
            authorization (str): An authorization in the format "bearer access_token".

        """
        try:
            with open(self.token_file, 'w') as f:
                f.write(authorization)
        except IOError as e:
            # best-effort persistence: log but do not fail the login
            logger.warning(e)

    def get(self, url: str, params=None, timeout=(30, 600), **kwargs):
        """Get server connection response.

        Args:
            url (str): Service url.
            params (obj): Session parameters.
            timeout (tuple[int, int]): Connect and read timeouts in seconds.
            **kwargs: A dictionary of external parameters.

        Returns:
            obj: HTTP response.

        Raises:
            RuntimeError: If the server rejects the API key (HTTP 401).

        """
        r = self.session.get(url, params=params, timeout=timeout, **kwargs)

        if r.status_code == 401:
            # a bare ``raise`` here had no active exception and itself failed
            # with "RuntimeError: No active exception to re-raise"; raise an
            # explicit RuntimeError with a meaningful message instead
            raise RuntimeError("Request to " + url + " failed: invalid or missing API key (HTTP 401).")

        return self.return_http_response(r)

    def post(self, url: str, data=None, json=None, timeout=(30, 600), **kwargs):
        """Post data on the server.

        Args:
            url (str): Service url.
            data (obj): Data to be posted on the server.
            json (obj): Description of the data, metadata json.
            timeout (tuple[int, int]): Connect and read timeouts in seconds.
            **kwargs: A dictionary of external parameters.

        Returns:
            obj: HTTP response.

        Raises:
            RuntimeError: If the server rejects the API key (HTTP 401).

        """
        r = self.session.post(url, data=data, json=json, timeout=timeout, **kwargs)

        if r.status_code == 401:
            # see get(): replace the broken bare ``raise`` with a real error
            raise RuntimeError("Request to " + url + " failed: invalid or missing API key (HTTP 401).")

        return self.return_http_response(r)

    def put(self, url: str, data=None, timeout=(30, 600), **kwargs):
        """Put data on the server.

        Args:
            url (str): Service url.
            data (obj): Data to be put on the server.
            timeout (tuple[int, int]): Connect and read timeouts in seconds.
            **kwargs: A dictionary of external parameters.

        Returns:
            obj: HTTP response.

        Raises:
            RuntimeError: If the server rejects the API key (HTTP 401).

        """
        r = self.session.put(url, data=data, timeout=timeout, **kwargs)

        if r.status_code == 401:
            # see get(): replace the broken bare ``raise`` with a real error
            raise RuntimeError("Request to " + url + " failed: invalid or missing API key (HTTP 401).")

        return self.return_http_response(r)

    def delete(self, url: str, timeout=(30, 600), **kwargs):
        """Delete data on the server.

        Args:
            url (str): Service url.
            timeout (tuple[int, int]): Connect and read timeouts in seconds.
            **kwargs: A dictionary of external parameters.

        Returns:
            obj: HTTP response.

        Raises:
            RuntimeError: If the server rejects the API key (HTTP 401).

        """
        r = self.session.delete(url, timeout=timeout, **kwargs)

        if r.status_code == 401:
            # see get(): replace the broken bare ``raise`` with a real error
            raise RuntimeError("Request to " + url + " failed: invalid or missing API key (HTTP 401).")

        return self.return_http_response(r)

    def create_service_json_entry(self):
        """Register this service's hashed url in the service.json registry."""
        update_hash_entry("add", hashed_url=self.hashed_service_url, service_url=self.service_url)

    @staticmethod
    def clear_root_cache():
        """Remove every cached service directory and reset the hash registry."""
        # incase cache_data folder doesn't exist
        if not os.path.isdir(pyglobals.PYINCORE_USER_DATA_CACHE):
            logger.warning("User data cache does not exist")
            return None

        # remove each top-level per-service directory under the root cache
        for root, dirs, files in os.walk(pyglobals.PYINCORE_USER_DATA_CACHE):
            for d in dirs:
                shutil.rmtree(os.path.join(root, d))
        update_hash_entry("clear")
        return None

    def clear_cache(self):
        """
        This function helps clear the data cache for a specific repository or the entire cache

        Returns: None

        """
        # incase cache_data folder doesn't exist
        if not os.path.isdir(pyglobals.PYINCORE_USER_DATA_CACHE):
            logger.warning("User data cache does not exist")
            return None

        if not os.path.isdir(self.hashed_svc_data_dir):
            logger.warning("Cached folder doesn't exist")
            return None

        shutil.rmtree(self.hashed_svc_data_dir)
        # clear entry from service.json
        update_hash_entry("edit", hashed_url=self.hashed_service_url)
        return


class InsecureIncoreClient(Client):
"""IN-CORE service client class that bypasses Ambassador auth. It contains token and service root url.

Expand Down
142 changes: 142 additions & 0 deletions pyincore/clowderdataservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright (c) 2019 University of Illinois and others. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Mozilla Public License v2.0 which accompanies this distribution,
# and is available at https://www.mozilla.org/en-US/MPL/2.0/


import os
import re
import zipfile
from urllib.parse import urljoin

from pyincore import ClowderClient


class ClowderDataService:
    """Clowder Data service client.

    Args:
        client (ClowderClient): Service authentication.

    """

    def __init__(self, client: ClowderClient):
        self.client = client
        self.base_url = urljoin(client.service_url, 'api/datasets/')
        self.files_url = urljoin(client.service_url, 'api/files/')

    def get_dataset_metadata(self, dataset_id: str):
        """Retrieve metadata from clowder data service. Dataset API endpoint is called.

        Args:
            dataset_id (str): ID of the Dataset.

        Returns:
            obj: Parsed JSON-LD metadata for the dataset.

        """
        # construct url with service, dataset api, and id
        url = urljoin(self.base_url, dataset_id + "/metadata.jsonld")
        r = self.client.get(url)
        return r.json()

    def get_dataset_blob(self, dataset_id: str, join=None):
        """Retrieve a blob of the dataset. Blob API endpoint is called.

        Args:
            dataset_id (str): ID of the Dataset.
            join (bool): Add join parameter if True. Default None.

        Returns:
            str: Folder or file name.

        """
        local_filename = None

        # add another layer of dataset id folder to differentiate datasets with the same filename
        cache_data_dir = os.path.join(self.client.hashed_svc_data_dir, dataset_id)

        # if cache_data_dir doesn't exist create one
        if not os.path.exists(cache_data_dir):
            os.makedirs(cache_data_dir)
            # for consistency check to ensure the repository hash is recorded in service.json
            self.client.create_service_json_entry()

            # BUG FIX: join was previously dropped on the first download;
            # forward it so the caller's choice is honored
            local_filename = self.download_dataset_blob(cache_data_dir, dataset_id, join)

        # if cache_data_dir exist, check if id folder and zip file exist inside
        else:
            for fname in os.listdir(cache_data_dir):
                if fname.endswith('.zip'):
                    local_filename = os.path.join(cache_data_dir, fname)
                    print('Dataset already exists locally. Reading from local cached zip.')
            if not local_filename:
                local_filename = self.download_dataset_blob(cache_data_dir, dataset_id, join)

        folder = self.unzip_dataset(local_filename)
        if folder is not None:
            return folder
        else:
            return local_filename

    def download_dataset_blob(self, cache_data_dir: str, dataset_id: str, join=None):
        """Download the dataset zip from Clowder into the cache directory.

        Args:
            cache_data_dir (str): Directory in which the downloaded file is stored.
            dataset_id (str): ID of the Dataset.
            join (bool): Add join=true/false query parameter when not None.

        Returns:
            str: Full path of the downloaded file.

        """
        # construct url for file download
        url = urljoin(self.base_url, dataset_id + '/download')

        kwargs = {"stream": True}
        if join is None:
            r = self.client.get(url, **kwargs)
        else:
            payload = {}
            if join is True:
                payload['join'] = 'true'
            elif join is False:
                payload['join'] = 'false'
            r = self.client.get(url, params=payload, **kwargs)

        # extract filename from the Content-Disposition header (case-insensitive lookup)
        disposition = ""
        for key in r.headers.keys():
            if key.lower() == 'content-disposition':
                disposition = r.headers[key]
        fname = re.findall(r"filename\**=(.+)", disposition)

        if len(fname) > 0:
            local_filename = os.path.join(cache_data_dir, self._parse_disposition_filename(fname[0]))
        else:
            # BUG FIX: the fallback name was not joined with cache_data_dir,
            # so the zip landed in the current working directory and the
            # cache lookup in get_dataset_blob never found it
            local_filename = os.path.join(cache_data_dir, dataset_id + ".zip")

        # stream to disk in chunks, skipping keep-alive (empty) chunks
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)

        return local_filename

    @staticmethod
    def _parse_disposition_filename(raw: str):
        """Return the bare filename from a Content-Disposition value.

        Handles both ``filename="name.zip"`` and the RFC 5987 form
        ``filename*=UTF-8''name.zip``. The previous implementation used
        ``str.strip('UTF-8')``, which strips those *characters* from both
        ends and mangles filenames beginning or ending with U, T, F, -, or 8.

        Args:
            raw (str): Text captured after ``filename=`` / ``filename*=``.

        Returns:
            str: The decoded filename.

        """
        name = raw.strip().strip('"')
        prefix = "UTF-8''"
        if name.startswith(prefix):
            name = name[len(prefix):]
        return name

    def unzip_dataset(self, local_filename: str):
        """Unzip the dataset zip file.

        Args:
            local_filename (str): Name of the Dataset.

        Returns:
            str: Folder name with unzipped files, or None if not a zip file.

        """
        foldername, file_extension = os.path.splitext(local_filename)
        # if it is not a zip file, no unzip
        if not file_extension.lower() == '.zip':
            print('It is not a zip file; no unzip')
            return None
        # if the unzipped folder already exists, reuse it instead of re-extracting
        if os.path.isdir(foldername):
            print('Unzipped folder found in the local cache. Reading from it...')
            return foldername
        os.makedirs(foldername)

        # context manager guarantees the archive handle is closed even on error
        with zipfile.ZipFile(local_filename, 'r') as zip_ref:
            zip_ref.extractall(foldername)
        return foldername
Loading