From 7d268b91cdacb42068dbddd73935aa5355dad23f Mon Sep 17 00:00:00 2001 From: Ben Blamey Date: Mon, 20 Nov 2017 22:09:40 +0100 Subject: [PATCH 1/5] interesting model rest client --- example_interestingness.py | 43 ++++++++++++ haste_storage_client/core.py | 47 +++++++++++-- haste_storage_client/interestingness_model.py | 66 +++++++++++++++++++ setup.py | 2 +- 4 files changed, 150 insertions(+), 8 deletions(-) create mode 100644 example_interestingness.py create mode 100644 haste_storage_client/interestingness_model.py diff --git a/example_interestingness.py b/example_interestingness.py new file mode 100644 index 0000000..6687b19 --- /dev/null +++ b/example_interestingness.py @@ -0,0 +1,43 @@ +import time +import datetime +from haste_storage_client.core import HasteStorageClient +from haste_storage_client.interestingness_model import RestInterestingnessModel +from keystoneauth1.identity import v3 + +# Create a password auth plugin +# See: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password +auth = v3.Password(auth_url='https://foo.se:5000/v3/', + username='my_snic_username', + password='my_snic_password', + user_domain_name='foo', + project_name='my_project', + project_domain_name='some_domain') + +# Identifies both the experiment, and the session (ie. unique each time the stream starts), +# for example, this would be a good format - this needs to be generated at the stream edge. +stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + "_exp1" + +# Specify REST server with interesting model: +interestingnessModel = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') + +client = HasteStorageClient(stream_id, + 'localhost', # IP address of database server. + 27017, + auth, + interestingness_model=interestingnessModel) + +blob = b'this is a binary blob eg. image data.' +timestamp_cloud_edge = time.time() + +client.save(timestamp_cloud_edge, + (12.34, 56.78), + blob, + {'image_height_pixels': 300, # bag of extracted features here + 'image_width_pixels': 300, + 'number_of_green_pixels': 1234}) + +client.close() + + + +#interestingnessModel.interestingness(123456, (1, 2, 3), {'metadata_1': 1, 'bar': 2}) diff --git a/haste_storage_client/core.py b/haste_storage_client/core.py index 7625f14..726db73 100644 --- a/haste_storage_client/core.py +++ b/haste_storage_client/core.py @@ -4,11 +4,14 @@ class HasteStorageClient: + INTERESTINGNESS_DEFAULT = 1.0 + def __init__(self, stream_id, host, port, - keystone_auth): + keystone_auth, + interestingness_model=None): """ :param stream_id: String ID for the stream session - used to group all the data (unique for each execution of the experiment) @@ -16,10 +19,13 @@ def __init__(self, :param port: Database server port. Usually 27017. :param keystone_auth: OpenCloud keystone auth v3 password object, see: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password + :param interestingness_model: InterestingnessModel to determine interestingness of the document, + and hence the intended storage class. """ self.mongo_client = MongoClient(host, port) self.mongo_db = self.mongo_client.streams self.stream_id = stream_id + self.interestingness_model = interestingness_model keystone_session = session.Session(auth=keystone_auth) self.swift_conn = swiftclient.client.Connection(session=keystone_session) @@ -30,24 +36,51 @@ def save(self, blob, metadata): """ - :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. + :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. uniquely identifies the + document. :param location: n-tuple representing spatial information (eg. (x,y)). :param blob: binary blob (eg. image). :param metadata: dictionary containing extracted metadata (eg. image features). """ - blob_name = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp) - blob_location = 'swift' + + interestingness = self.__interestingness(location, metadata, unix_timestamp) + + blob_id, blob_location = self.__save_blob(blob, interestingness, unix_timestamp) result = self.mongo_db['strm_' + self.stream_id].insert({ 'timestamp': unix_timestamp, 'location': location, - 'blob_id': blob_name, + 'interestingness': interestingness, + 'blob_id': blob_id, 'blob_location': blob_location, 'metadata': metadata, }) - self.swift_conn.put_object('Haste_Stream_Storage', blob_name, blob) - def close(self): self.mongo_client.close() self.swift_conn.close() + + def __save_blob(self, blob, interestingness, unix_timestamp): + if interestingness > 0.1: + blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp) + blob_location = 'swift' + self.swift_conn.put_object('Haste_Stream_Storage', blob_id, blob) + else: + blob_id = None + blob_location = '(deleted)' + return blob_id, blob_location + + def __interestingness(self, location, metadata, unix_timestamp): + if self.interestingness_model is not None: + try: + result = self.interestingness_model.interestingness(unix_timestamp, + location, + metadata) + interestingness = result['interestingness'] + except Exception as ex: + print(ex) + print('falling back to ' + str(self.INTERESTINGNESS_DEFAULT)) + interestingness = self.INTERESTINGNESS_DEFAULT + else: + interestingness = self.INTERESTINGNESS_DEFAULT + return interestingness diff --git a/haste_storage_client/interestingness_model.py b/haste_storage_client/interestingness_model.py new file mode 100644 index 0000000..6c570e2 --- /dev/null +++ b/haste_storage_client/interestingness_model.py @@ -0,0 +1,66 @@ +import abc +import urllib.request +import urllib.parse +import json + + +class InterestingnessModel(object, metaclass=abc.ABCMeta): + @abc.abstractmethod + def interestingness(self, + unix_timestamp, + location, + metadata): + """ + :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. + :param location: n-tuple representing spatial information (eg. (x,y)). + :param metadata: dictionary containing extracted metadata (eg. image features). + """ + raise NotImplementedError('users must define interestingness(..) to use this base class') + + +class RestInterestingnessModel(InterestingnessModel): + + def __init__(self, url): + self.url = url + """ + :param url for Rest Service, should accept HTTP GET JSON of the format: + { + 'unix_timestamp': 1234.5678, + 'location': [0.1,2.3,4.5], + 'metadata': { + 'feature_1':'foo', + 'feature_2':42 + } + } + ...and respond with something like the following: + { + 'interestingness':0.5 + } + Where the interestingness is in the closed interval [0,1] + """ + + + def interestingness(self, + unix_timestamp, + location, + metadata): + """ + :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. + :param location: n-tuple representing spatial information (eg. (x,y)). + :param metadata: dictionary containing extracted metadata (eg. image features). + """ + + headers = {'User-Agent': 'haste_storage_client (0.x)', + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + + values = {'unix_timestamp': unix_timestamp, + 'location': location, + 'metadata': metadata} + + data = urllib.parse.urlencode(values) + req = urllib.request.Request(self.url + '?' + data, headers=headers) + with urllib.request.urlopen(req) as response: + response = response.read().decode("utf-8") + decoded = json.loads(response) + return {'interestingness': float(decoded['interestingness'])} diff --git a/setup.py b/setup.py index a015d29..cf9c0c0 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='haste_storage_client', - version='0.1', + version='0.2', py_modules=['haste_storage_client'], install_requires=[ 'pymongo', From 673ed1e127f056aa45c4c83558c75807ea3df838 Mon Sep 17 00:00:00 2001 From: Ben Blamey Date: Mon, 20 Nov 2017 22:13:48 +0100 Subject: [PATCH 2/5] tidy up --- example.py | 7 ++- example_interestingness.py | 43 ------------------- haste_storage_client/interestingness_model.py | 15 +++---- 3 files changed, 13 insertions(+), 52 deletions(-) delete mode 100644 example_interestingness.py diff --git a/example.py b/example.py index a956b85..723742a 100644 --- a/example.py +++ b/example.py @@ -1,6 +1,7 @@ import time import datetime from haste_storage_client.core import HasteStorageClient +from haste_storage_client.interestingness_model import RestInterestingnessModel from keystoneauth1.identity import v3 # Create a password auth plugin @@ -16,10 +17,14 @@ # for example, this would be a good format - this needs to be generated at the stream edge. stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + "_exp1" +# Optionally, specify REST server with interesting model: +interestingnessModel = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') + client = HasteStorageClient(stream_id, 'localhost', # IP address of database server. 27017, - auth) + auth, + interestingness_model=interestingnessModel) blob = b'this is a binary blob eg. image data.' timestamp_cloud_edge = time.time() diff --git a/example_interestingness.py b/example_interestingness.py deleted file mode 100644 index 6687b19..0000000 --- a/example_interestingness.py +++ /dev/null @@ -1,43 +0,0 @@ -import time -import datetime -from haste_storage_client.core import HasteStorageClient -from haste_storage_client.interestingness_model import RestInterestingnessModel -from keystoneauth1.identity import v3 - -# Create a password auth plugin -# See: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password -auth = v3.Password(auth_url='https://foo.se:5000/v3/', - username='my_snic_username', - password='my_snic_password', - user_domain_name='foo', - project_name='my_project', - project_domain_name='some_domain') - -# Identifies both the experiment, and the session (ie. unique each time the stream starts), -# for example, this would be a good format - this needs to be generated at the stream edge. -stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + "_exp1" - -# Specify REST server with interesting model: -interestingnessModel = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') - -client = HasteStorageClient(stream_id, - 'localhost', # IP address of database server. - 27017, - auth, - interestingness_model=interestingnessModel) - -blob = b'this is a binary blob eg. image data.' -timestamp_cloud_edge = time.time() - -client.save(timestamp_cloud_edge, - (12.34, 56.78), - blob, - {'image_height_pixels': 300, # bag of extracted features here - 'image_width_pixels': 300, - 'number_of_green_pixels': 1234}) - -client.close() - - - -#interestingnessModel.interestingness(123456, (1, 2, 3), {'metadata_1': 1, 'bar': 2}) diff --git a/haste_storage_client/interestingness_model.py b/haste_storage_client/interestingness_model.py index 6c570e2..7823ee4 100644 --- a/haste_storage_client/interestingness_model.py +++ b/haste_storage_client/interestingness_model.py @@ -25,21 +25,20 @@ def __init__(self, url): """ :param url for Rest Service, should accept HTTP GET JSON of the format: { - 'unix_timestamp': 1234.5678, - 'location': [0.1,2.3,4.5], - 'metadata': { - 'feature_1':'foo', - 'feature_2':42 + "unix_timestamp": 1234.5678, + "location": [0.1,2.3,4.5], + "metadata": { + "feature_1":"foo", + "feature_2":42 } } - ...and respond with something like the following: + ...and respond with: { - 'interestingness':0.5 + "interestingness":0.5 } Where the interestingness is in the closed interval [0,1] """ - def interestingness(self, unix_timestamp, location, From d7e2b095cf3b9a0952fd3b8f8a5ffcdf96a3f0a8 Mon Sep 17 00:00:00 2001 From: Ben Blamey Date: Mon, 20 Nov 2017 22:55:02 +0100 Subject: [PATCH 3/5] storage policy - WIP --- example.py | 4 ++- haste_storage_client/core.py | 48 +++++++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/example.py b/example.py index 723742a..6b37fa7 100644 --- a/example.py +++ b/example.py @@ -24,7 +24,9 @@ 'localhost', # IP address of database server. 27017, auth, - interestingness_model=interestingnessModel) + interestingness_model=interestingnessModel, + storage_policy=[(lambda x: x > 0.5, 'swift')], + default_storage_class='trash') blob = b'this is a binary blob eg. image data.' timestamp_cloud_edge = time.time() diff --git a/haste_storage_client/core.py b/haste_storage_client/core.py index 726db73..2db0e88 100644 --- a/haste_storage_client/core.py +++ b/haste_storage_client/core.py @@ -11,7 +11,9 @@ def __init__(self, host, port, keystone_auth, - interestingness_model=None): + interestingness_model=None, + storage_policy=None, + default_storage_class='swift'): """ :param stream_id: String ID for the stream session - used to group all the data (unique for each execution of the experiment) @@ -21,11 +23,20 @@ def __init__(self, see: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password :param interestingness_model: InterestingnessModel to determine interestingness of the document, and hence the intended storage class. + :param storage_policy: policy mapping interestingness to storage class(es). Supported are 'swift' and + None (meaning discard). + :param default_storage_class: default storage class if no matches in policy. + None means the document will be discarded. """ + if default_storage_class is None: + raise ValueError("default_storage_location cannot be None - did you mean 'trash'?") + self.mongo_client = MongoClient(host, port) self.mongo_db = self.mongo_client.streams self.stream_id = stream_id self.interestingness_model = interestingness_model + self.storage_policy = storage_policy + self.default_storage_class = default_storage_class keystone_session = session.Session(auth=keystone_auth) self.swift_conn = swiftclient.client.Connection(session=keystone_session) @@ -38,21 +49,23 @@ def save(self, """ :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. uniquely identifies the document. - :param location: n-tuple representing spatial information (eg. (x,y)). + :param location: n-tuple representing spatial information (eg. (bsc,y)). :param blob: binary blob (eg. image). :param metadata: dictionary containing extracted metadata (eg. image features). """ interestingness = self.__interestingness(location, metadata, unix_timestamp) + blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp) + blob_storage_classes = self.__save_blob(blob_id, blob, interestingness) - blob_id, blob_location = self.__save_blob(blob, interestingness, unix_timestamp) + blob_storage_classes = ['Trash' if bsc is None else bsc for bsc in blob_storage_classes] result = self.mongo_db['strm_' + self.stream_id].insert({ 'timestamp': unix_timestamp, 'location': location, 'interestingness': interestingness, 'blob_id': blob_id, - 'blob_location': blob_location, + 'blob_storage_classes': blob_storage_classes, 'metadata': metadata, }) @@ -60,15 +73,28 @@ def close(self): self.mongo_client.close() self.swift_conn.close() - def __save_blob(self, blob, interestingness, unix_timestamp): - if interestingness > 0.1: - blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp) - blob_location = 'swift' + def __save_blob(self, blob_id, blob, interestingness): + blob_locations = [] + if self.storage_policy is not None: + for interestingness_lambda, storage_class in self.storage_policy: + if interestingness_lambda(interestingness): + self.__save_blob_to_class(blob, blob_id, storage_class) + blob_locations.append(storage_class) + + if blob_locations.count == 0: + self.__save_blob_to_class(blob, blob_id, self.default_storage_class) + blob_locations.append(self.default_storage_class) + + return blob_locations + + def __save_blob_to_class(self, blob, blob_id, storage_class): + if storage_class is None: + # Implies discard. + pass + if storage_class == 'swift': self.swift_conn.put_object('Haste_Stream_Storage', blob_id, blob) else: - blob_id = None - blob_location = '(deleted)' - return blob_id, blob_location + raise ValueError('unknown storage class') def __interestingness(self, location, metadata, unix_timestamp): if self.interestingness_model is not None: From ce7de0834771896127ee43632b3d3fcc22c8e2e4 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Wed, 22 Nov 2017 12:18:45 +0000 Subject: [PATCH 4/5] move client config to a json configuration file or config dictionary --- config/haste_storage_client_config.json | 1 + example.py | 29 +++++++++++--------- haste_storage_client/core.py | 35 ++++++++++++++++++++----- 3 files changed, 46 insertions(+), 19 deletions(-) create mode 100644 config/haste_storage_client_config.json diff --git a/config/haste_storage_client_config.json b/config/haste_storage_client_config.json new file mode 100644 index 0000000..b6dfb99 --- /dev/null +++ b/config/haste_storage_client_config.json @@ -0,0 +1 @@ +{"haste_metadata_db_port": "27017", "os_swift_auth_credentials": {"username": xxxxx", "project_name": "xxxxx", "user_domain_name": xxxx", "auth_url": "xxxxx", "password": "xxxx", "project_domain_name": "xxxx"}, "haste_metadata_db_server": "130.xxx.yy.zz"} diff --git a/example.py b/example.py index 6b37fa7..33bf673 100644 --- a/example.py +++ b/example.py @@ -2,16 +2,22 @@ import datetime from haste_storage_client.core import HasteStorageClient from haste_storage_client.interestingness_model import RestInterestingnessModel -from keystoneauth1.identity import v3 -# Create a password auth plugin -# See: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password -auth = v3.Password(auth_url='https://foo.se:5000/v3/', - username='my_snic_username', - password='my_snic_password', - user_domain_name='foo', - project_name='my_project', - project_domain_name='some_domain') + + +haste_storage_client_config = { + + "haste_metadata_db_server":"haste storage server IP", + "haste_metadata_db_port":"haste storage server port", + "os_swift_auth_credentials":{ + "auth_url":'https://foo.se:5000/v3/', + "username":'my_snic_username', + "password":'my_snic_password', + "user_domain_name":'foo', + "project_name":'my_project', + "project_domain_name":'some_domain' + } +} # Identifies both the experiment, and the session (ie. unique each time the stream starts), # for example, this would be a good format - this needs to be generated at the stream edge. @@ -20,10 +26,9 @@ # Optionally, specify REST server with interesting model: interestingnessModel = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') + + client = HasteStorageClient(stream_id, - 'localhost', # IP address of database server. - 27017, - auth, interestingness_model=interestingnessModel, storage_policy=[(lambda x: x > 0.5, 'swift')], default_storage_class='trash') diff --git a/haste_storage_client/core.py b/haste_storage_client/core.py index 2db0e88..a9ccd7e 100644 --- a/haste_storage_client/core.py +++ b/haste_storage_client/core.py @@ -1,16 +1,16 @@ from pymongo import MongoClient from keystoneauth1 import session +from keystoneauth1.identity import v3 import swiftclient.client - +from os.path import expanduser +import json class HasteStorageClient: INTERESTINGNESS_DEFAULT = 1.0 def __init__(self, stream_id, - host, - port, - keystone_auth, + config = None, interestingness_model=None, storage_policy=None, default_storage_class='swift'): @@ -28,18 +28,39 @@ def __init__(self, :param default_storage_class: default storage class if no matches in policy. None means the document will be discarded. """ + + if config is None: + try: + config=self._get_config() + except: + raise ValueError("If config is None, provide a configuration file.") + if default_storage_class is None: raise ValueError("default_storage_location cannot be None - did you mean 'trash'?") - self.mongo_client = MongoClient(host, port) + + self.mongo_client = MongoClient(config["haste_metadata_db_server"], int(config["haste_metadata_db_port"])) self.mongo_db = self.mongo_client.streams self.stream_id = stream_id self.interestingness_model = interestingness_model self.storage_policy = storage_policy self.default_storage_class = default_storage_class + + # Establish a connection to the OpenStack Swift storage backend + self.swift_conn = self._get_os_swift_connection(config["os_swift_auth_credentials"]) - keystone_session = session.Session(auth=keystone_auth) - self.swift_conn = swiftclient.client.Connection(session=keystone_session) + + def _get_config(self): + home = expanduser("~") + default_config_dir = home+"/.haste" + with open(default_config_dir+"/haste_storage_client_config.json") as fh: + haste_storage_client_config = json.load(fh) + return haste_storage_client_config + + def _get_os_swift_connection(self, swift_auth_credentials): + auth = v3.Password(**swift_auth_credentials) + keystone_session = session.Session(auth=auth) + return swiftclient.client.Connection(session=keystone_session) def save(self, unix_timestamp, From 73f3499dc0c1be40d1c241e8b8f3ff2eb7ea62a4 Mon Sep 17 00:00:00 2001 From: Ben Blamey Date: Fri, 24 Nov 2017 23:21:31 +0100 Subject: [PATCH 5/5] tidy up --- .gitignore | 1 + config/haste_storage_client_config.json | 15 +- example.py | 41 ++--- haste_storage_client/core.py | 158 +++++++++--------- haste_storage_client/interestingness_model.py | 29 +--- readme.md | 6 +- 6 files changed, 126 insertions(+), 124 deletions(-) diff --git a/.gitignore b/.gitignore index f7c83f6..5c22d36 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **.iml __pycache__ **.egg-info +**.tmp \ No newline at end of file diff --git a/config/haste_storage_client_config.json b/config/haste_storage_client_config.json index b6dfb99..a7895ad 100644 --- a/config/haste_storage_client_config.json +++ b/config/haste_storage_client_config.json @@ -1 +1,14 @@ -{"haste_metadata_db_port": "27017", "os_swift_auth_credentials": {"username": xxxxx", "project_name": "xxxxx", "user_domain_name": xxxx", "auth_url": "xxxxx", "password": "xxxx", "project_domain_name": "xxxx"}, "haste_metadata_db_server": "130.xxx.yy.zz"} +{ + "haste_metadata_server": { + "host": "130.xxx.yy.zz", + "port": 27017 + }, + "os_swift": { + "username": "xxxxx", + "password": "xxxx", + "project_name": "xxxxx", + "user_domain_name": "xxxx", + "auth_url": "xxxxx", + "project_domain_name": "xxxx" + } +} diff --git a/example.py b/example.py index 33bf673..bb4ca01 100644 --- a/example.py +++ b/example.py @@ -1,37 +1,40 @@ import time import datetime -from haste_storage_client.core import HasteStorageClient +from haste_storage_client.core import HasteStorageClient, OS_SWIFT_STORAGE, TRASH from haste_storage_client.interestingness_model import RestInterestingnessModel - haste_storage_client_config = { - - "haste_metadata_db_server":"haste storage server IP", - "haste_metadata_db_port":"haste storage server port", - "os_swift_auth_credentials":{ - "auth_url":'https://foo.se:5000/v3/', - "username":'my_snic_username', - "password":'my_snic_password', - "user_domain_name":'foo', - "project_name":'my_project', - "project_domain_name":'some_domain' - } + 'haste_metadata_server': { + 'host': '130.xxx.yy.zz', + 'port': 27017 + }, + 'os_swift': { + # See: https://docs.openstack.org/keystoneauth/latest/ + # api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password + 'username': 'xxxxx', + 'password': 'xxxx', + 'project_name': 'xxxxx', + 'user_domain_name': 'xxxx', + 'auth_url': 'xxxxx', + 'project_domain_name': 'xxxx' + } } # Identifies both the experiment, and the session (ie. unique each time the stream starts), # for example, this would be a good format - this needs to be generated at the stream edge. -stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + "_exp1" +initials = 'jb' +stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + '_exp1_' + initials # Optionally, specify REST server with interesting model: -interestingnessModel = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') - +interestingness_model = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate') client = HasteStorageClient(stream_id, - interestingness_model=interestingnessModel, - storage_policy=[(lambda x: x > 0.5, 'swift')], - default_storage_class='trash') + config=haste_storage_client_config, + interestingness_model=interestingness_model, + storage_policy=[(0.5, 1.0, OS_SWIFT_STORAGE)], # map 0.5<=interestingness<=1.0 to OS swift. + default_storage=TRASH) # discard blobs which don't match the policy above. blob = b'this is a binary blob eg. image data.' timestamp_cloud_edge = time.time() diff --git a/haste_storage_client/core.py b/haste_storage_client/core.py index a9ccd7e..a78d06c 100644 --- a/haste_storage_client/core.py +++ b/haste_storage_client/core.py @@ -1,133 +1,135 @@ from pymongo import MongoClient from keystoneauth1 import session from keystoneauth1.identity import v3 -import swiftclient.client from os.path import expanduser +import swiftclient.client import json +OS_SWIFT_STORAGE = 'os_swift' +TRASH = 'trash' +INTERESTINGNESS_DEFAULT = 1.0 + + class HasteStorageClient: - INTERESTINGNESS_DEFAULT = 1.0 def __init__(self, stream_id, - config = None, + config=None, interestingness_model=None, storage_policy=None, - default_storage_class='swift'): + default_storage=OS_SWIFT_STORAGE): """ - :param stream_id: String ID for the stream session - used to group all the data - (unique for each execution of the experiment) - :param host: Hostname/IP of database server. - :param port: Database server port. Usually 27017. - :param keystone_auth: OpenCloud keystone auth v3 password object, - see: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password - :param interestingness_model: InterestingnessModel to determine interestingness of the document, - and hence the intended storage class. - :param storage_policy: policy mapping interestingness to storage class(es). Supported are 'swift' and - None (meaning discard). - :param default_storage_class: default storage class if no matches in policy. - None means the document will be discarded. + :param stream_id (str): ID for the stream session - used to group all the data for that streaming session. + *unique for each execution of the experiment*. + :param config (dict): dictionary of credentials and hostname for metadata server and storage, + see haste_storage_client_config.json for structure. + If `None`, will be read from ~/.haste/haste_storage_client_config.json + :param interestingness_model (InterestingnessModel): determines interestingness of the document, + and hence the intended storage platform(s) for the blob. + `None` implies all documents will have interestingness=1.0 + :param storage_policy (list): policy mapping (closed) intervals of interestingness to storage platforms, eg.: + [(0.5, 1.0, 'os_swift')] + Overlapping intervals mean that the blob will be saved to multiple classes. + Valid storage platforms are: 'os_swift' + If `None`, `default_storage` will be used. + :param default_storage (str): storage platform if no policy matches the interestingness level. + valid platforms are those for `storage_policy`, and 'trash' meaning discard the blob. """ if config is None: try: - config=self._get_config() + config = self.__read_config_file() except: - raise ValueError("If config is None, provide a configuration file.") - - if default_storage_class is None: + raise ValueError('If config is None, provide a configuration file.') + + if default_storage is None: raise ValueError("default_storage_location cannot be None - did you mean 'trash'?") - - self.mongo_client = MongoClient(config["haste_metadata_db_server"], int(config["haste_metadata_db_port"])) + self.mongo_client = MongoClient(config['haste_metadata_server']['host'], + config['haste_metadata_server']['port']) self.mongo_db = self.mongo_client.streams self.stream_id = stream_id self.interestingness_model = interestingness_model self.storage_policy = storage_policy - self.default_storage_class = default_storage_class - - # Establish a connection to the OpenStack Swift storage backend - self.swift_conn = self._get_os_swift_connection(config["os_swift_auth_credentials"]) - - - def _get_config(self): - home = expanduser("~") - default_config_dir = home+"/.haste" - with open(default_config_dir+"/haste_storage_client_config.json") as fh: + self.default_storage = default_storage + self.os_swift_conn = self.__open_os_swift_connection(config[OS_SWIFT_STORAGE]) + + @staticmethod + def __read_config_file(): + with open(expanduser('~/.haste/haste_storage_client_config.json')) as fh: haste_storage_client_config = json.load(fh) return haste_storage_client_config - - def _get_os_swift_connection(self, swift_auth_credentials): + + @staticmethod + def __open_os_swift_connection(swift_auth_credentials): auth = v3.Password(**swift_auth_credentials) keystone_session = session.Session(auth=auth) return swiftclient.client.Connection(session=keystone_session) - + def save(self, unix_timestamp, location, blob, metadata): """ - :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. uniquely identifies the - document. - :param location: n-tuple representing spatial information (eg. (bsc,y)). - :param blob: binary blob (eg. image). - :param metadata: dictionary containing extracted metadata (eg. image features). + :param unix_timestamp (float): should come from the cloud edge (eg. microscope). floating point. + *Uniquely identifies the document within the streaming session*. + :param location (tuple): spatial information (eg. (x,y)). + :param blob (byte array): binary blob (eg. image). + :param metadata (dict): extracted metadata (eg. image features). """ - interestingness = self.__interestingness(location, metadata, unix_timestamp) + interestingness = self.__get_interestingness(metadata) blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp) - blob_storage_classes = self.__save_blob(blob_id, blob, interestingness) + blob_storage_platforms = self.__save_blob(blob_id, blob, interestingness) + if len(blob_storage_platforms) == 0: + blob_id = '' - blob_storage_classes = ['Trash' if bsc is None else bsc for bsc in blob_storage_classes] + document = {'timestamp': unix_timestamp, + 'location': location, + 'interestingness': interestingness, + 'blob_id': blob_id, + 'blob_storage_platforms': blob_storage_platforms, + 'metadata': metadata, } + result = self.mongo_db['strm_' + self.stream_id].insert(document) - result = self.mongo_db['strm_' + self.stream_id].insert({ - 'timestamp': unix_timestamp, - 'location': location, - 'interestingness': interestingness, - 'blob_id': blob_id, - 'blob_storage_classes': blob_storage_classes, - 'metadata': metadata, - }) + return document def close(self): self.mongo_client.close() - self.swift_conn.close() + self.os_swift_conn.close() def __save_blob(self, blob_id, blob, interestingness): - blob_locations = [] + storage_platforms = [] if self.storage_policy is not None: - for interestingness_lambda, storage_class in self.storage_policy: - if interestingness_lambda(interestingness): - self.__save_blob_to_class(blob, blob_id, storage_class) - blob_locations.append(storage_class) - - if blob_locations.count == 0: - self.__save_blob_to_class(blob, blob_id, self.default_storage_class) - blob_locations.append(self.default_storage_class) - - return blob_locations - - def __save_blob_to_class(self, blob, blob_id, storage_class): - if storage_class is None: - # Implies discard. - pass - if storage_class == 'swift': - self.swift_conn.put_object('Haste_Stream_Storage', blob_id, blob) + for min_interestingness, max_interestingness, storage in self.storage_policy: + if min_interestingness <= interestingness <= max_interestingness and (storage not in storage_platforms): + self.__save_blob_to_platform(blob, blob_id, storage) + storage_platforms.append(storage) + + if len(storage_platforms) == 0 and self.default_storage != TRASH: + self.__save_blob_to_platform(blob, blob_id, self.default_storage) + storage_platforms.append(self.default_storage) + + return storage_platforms + + def __save_blob_to_platform(self, blob, blob_id, storage_platform): + if storage_platform == OS_SWIFT_STORAGE: + self.os_swift_conn.put_object('Haste_Stream_Storage', blob_id, blob) + elif storage_platform == TRASH: + raise ValueError('trash cannot be specified in a storage policy, only as a default') else: - raise ValueError('unknown storage class') + raise ValueError('unknown storage platform') - def __interestingness(self, location, metadata, unix_timestamp): + def __get_interestingness(self, metadata): if self.interestingness_model is not None: try: - result = self.interestingness_model.interestingness(unix_timestamp, - location, - metadata) + result = self.interestingness_model.interestingness(metadata) interestingness = result['interestingness'] except Exception as ex: print(ex) - print('falling back to ' + str(self.INTERESTINGNESS_DEFAULT)) - interestingness = self.INTERESTINGNESS_DEFAULT + print('interestingness - falling back to ' + str(INTERESTINGNESS_DEFAULT)) + interestingness = INTERESTINGNESS_DEFAULT else: - interestingness = self.INTERESTINGNESS_DEFAULT + interestingness = INTERESTINGNESS_DEFAULT return interestingness diff --git a/haste_storage_client/interestingness_model.py b/haste_storage_client/interestingness_model.py index 7823ee4..75866dc 100644 --- a/haste_storage_client/interestingness_model.py +++ b/haste_storage_client/interestingness_model.py @@ -7,12 +7,8 @@ class InterestingnessModel(object, metaclass=abc.ABCMeta): @abc.abstractmethod def interestingness(self, - unix_timestamp, - location, metadata): """ - :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. - :param location: n-tuple representing spatial information (eg. (x,y)). :param metadata: dictionary containing extracted metadata (eg. image features). """ raise NotImplementedError('users must define interestingness(..) to use this base class') @@ -23,29 +19,16 @@ class RestInterestingnessModel(InterestingnessModel): def __init__(self, url): self.url = url """ - :param url for Rest Service, should accept HTTP GET JSON of the format: - { - "unix_timestamp": 1234.5678, - "location": [0.1,2.3,4.5], - "metadata": { - "feature_1":"foo", - "feature_2":42 - } - } + :param url: should accept HTTP GET /foo?feature_1=1&feature_2=42 ...and respond with: { - "interestingness":0.5 + "interestingness": 0.5 } Where the interestingness is in the closed interval [0,1] """ - def interestingness(self, - unix_timestamp, - location, - metadata): + def interestingness(self, metadata): """ - :param unix_timestamp: should come from the cloud edge (eg. microscope). floating point. - :param location: n-tuple representing spatial information (eg. (x,y)). :param metadata: dictionary containing extracted metadata (eg. image features). """ @@ -53,11 +36,7 @@ def interestingness(self, 'Content-Type': 'application/json', 'Accept': 'application/json'} - values = {'unix_timestamp': unix_timestamp, - 'location': location, - 'metadata': metadata} - - data = urllib.parse.urlencode(values) + data = urllib.parse.urlencode(metadata) req = urllib.request.Request(self.url + '?' + data, headers=headers) with urllib.request.urlopen(req) as response: response = response.read().decode("utf-8") diff --git a/readme.md b/readme.md index a6de937..61ac720 100644 --- a/readme.md +++ b/readme.md @@ -20,6 +20,10 @@ pip3 install -e . ## Example See [example.py](example.py). +## Config +Optionally, place `haste_storage_client_config.json` in ~/.haste/ (or windows equivalent), +instead of specifying config in constructor. + ### Note It isn't possible to connect to the database server from outside the SNIC cloud, so for local dev/testing you'll -need to use port forwarding from another machine. https://help.ubuntu.com/community/SSH/OpenSSH/PortForwarding \ No newline at end of file +need to use port forwarding from another machine. https://help.ubuntu.com/community/SSH/OpenSSH/PortForwarding