Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
**.iml
__pycache__
**.egg-info
**.tmp
14 changes: 14 additions & 0 deletions config/haste_storage_client_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"haste_metadata_server": {
"host": "130.xxx.yy.zz",
"port": 27017
},
"os_swift": {
"username": "xxxxx",
"password": "xxxx",
"project_name": "xxxxx",
"user_domain_name": "xxxx",
"auth_url": "xxxxx",
"project_domain_name": "xxxx"
}
}
45 changes: 30 additions & 15 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,40 @@
import time
import datetime
from haste_storage_client.core import HasteStorageClient
from keystoneauth1.identity import v3

# Create a password auth plugin
# See: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password
auth = v3.Password(auth_url='https://foo.se:5000/v3/',
username='my_snic_username',
password='my_snic_password',
user_domain_name='foo',
project_name='my_project',
project_domain_name='some_domain')
from haste_storage_client.core import HasteStorageClient, OS_SWIFT_STORAGE, TRASH
from haste_storage_client.interestingness_model import RestInterestingnessModel


haste_storage_client_config = {
'haste_metadata_server': {
'host': '130.xxx.yy.zz',
'port': 27017
},
'os_swift': {
# See: https://docs.openstack.org/keystoneauth/latest/
# api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password
'username': 'xxxxx',
'password': 'xxxx',
'project_name': 'xxxxx',
'user_domain_name': 'xxxx',
'auth_url': 'xxxxx',
'project_domain_name': 'xxxx'
}
}

# Identifies both the experiment, and the session (ie. unique each time the stream starts),
# for example, this would be a good format - this needs to be generated at the stream edge.
stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + "_exp1"
initials = 'jb'
stream_id = datetime.datetime.today().strftime('%Y_%m_%d__%H_%M_%S') + '_exp1_' + initials

# Optionally, specify REST server with interesting model:
interestingness_model = RestInterestingnessModel('http://localhost:5000/model/api/v0.1/evaluate')


client = HasteStorageClient(stream_id,
'localhost', # IP address of database server.
27017,
auth)
config=haste_storage_client_config,
interestingness_model=interestingness_model,
storage_policy=[(0.5, 1.0, OS_SWIFT_STORAGE)], # map 0.5<=interestingness<=1.0 to OS swift.
default_storage=TRASH) # discard blobs which don't match the policy above.

blob = b'this is a binary blob eg. image data.'
timestamp_cloud_edge = time.time()
Expand Down
136 changes: 109 additions & 27 deletions haste_storage_client/core.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,135 @@
from pymongo import MongoClient
from keystoneauth1 import session
from keystoneauth1.identity import v3
from os.path import expanduser
import swiftclient.client
import json

OS_SWIFT_STORAGE = 'os_swift'
TRASH = 'trash'
INTERESTINGNESS_DEFAULT = 1.0


class HasteStorageClient:

def __init__(self,
stream_id,
host,
port,
keystone_auth):
config=None,
interestingness_model=None,
storage_policy=None,
default_storage=OS_SWIFT_STORAGE):
"""
:param stream_id: String ID for the stream session - used to group all the data
(unique for each execution of the experiment)
:param host: Hostname/IP of database server.
:param port: Database server port. Usually 27017.
:param keystone_auth: OpenCloud keystone auth v3 password object,
see: https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.identity.v3.html#module-keystoneauth1.identity.v3.password
:param stream_id (str): ID for the stream session - used to group all the data for that streaming session.
*unique for each execution of the experiment*.
:param config (dict): dictionary of credentials and hostname for metadata server and storage,
see haste_storage_client_config.json for structure.
If `None`, will be read from ~/.haste/haste_storage_client_config.json
:param interestingness_model (InterestingnessModel): determines interestingness of the document,
and hence the intended storage platform(s) for the blob.
`None` implies all documents will have interestingness=1.0
:param storage_policy (list): policy mapping (closed) intervals of interestingness to storage platforms, eg.:
[(0.5, 1.0, 'os_swift')]
Overlapping intervals mean that the blob will be saved to multiple classes.
Valid storage platforms are: 'os_swift'
If `None`, `default_storage` will be used.
:param default_storage (str): storage platform if no policy matches the interestingness level.
valid platforms are those for `storage_policy`, and 'trash' meaning discard the blob.
"""
self.mongo_client = MongoClient(host, port)

if config is None:
try:
config = self.__read_config_file()
except:
raise ValueError('If config is None, provide a configuration file.')

if default_storage is None:
raise ValueError("default_storage_location cannot be None - did you mean 'trash'?")

self.mongo_client = MongoClient(config['haste_metadata_server']['host'],
config['haste_metadata_server']['port'])
self.mongo_db = self.mongo_client.streams
self.stream_id = stream_id
self.interestingness_model = interestingness_model
self.storage_policy = storage_policy
self.default_storage = default_storage
self.os_swift_conn = self.__open_os_swift_connection(config[OS_SWIFT_STORAGE])

keystone_session = session.Session(auth=keystone_auth)
self.swift_conn = swiftclient.client.Connection(session=keystone_session)
@staticmethod
def __read_config_file():
    """Load the client configuration from the default per-user location.

    :return: dict parsed from ~/.haste/haste_storage_client_config.json
        (see that file for the expected structure).
    """
    config_path = expanduser('~/.haste/haste_storage_client_config.json')
    with open(config_path) as config_file:
        return json.load(config_file)

@staticmethod
def __open_os_swift_connection(swift_auth_credentials):
    """Authenticate with Keystone (v3 password auth) and open a Swift connection.

    :param swift_auth_credentials: dict of keyword arguments for
        keystoneauth1.identity.v3.Password (username, password, auth_url, ...).
    :return: an authenticated swiftclient Connection.
    """
    password_auth = v3.Password(**swift_auth_credentials)
    auth_session = session.Session(auth=password_auth)
    return swiftclient.client.Connection(session=auth_session)

def save(self,
unix_timestamp,
location,
blob,
metadata):
"""
:param unix_timestamp: should come from the cloud edge (eg. microscope). floating point.
:param location: n-tuple representing spatial information (eg. (x,y)).
:param blob: binary blob (eg. image).
:param metadata: dictionary containing extracted metadata (eg. image features).
:param unix_timestamp (float): should come from the cloud edge (eg. microscope). floating point.
*Uniquely identifies the document within the streaming session*.
:param location (tuple): spatial information (eg. (x,y)).
:param blob (byte array): binary blob (eg. image).
:param metadata (dict): extracted metadata (eg. image features).
"""
blob_name = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp)
blob_location = 'swift'

result = self.mongo_db['strm_' + self.stream_id].insert({
'timestamp': unix_timestamp,
'location': location,
'blob_id': blob_name,
'blob_location': blob_location,
'metadata': metadata,
})
interestingness = self.__get_interestingness(metadata)
blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp)
blob_storage_platforms = self.__save_blob(blob_id, blob, interestingness)
if len(blob_storage_platforms) == 0:
blob_id = ''

document = {'timestamp': unix_timestamp,
'location': location,
'interestingness': interestingness,
'blob_id': blob_id,
'blob_storage_platforms': blob_storage_platforms,
'metadata': metadata, }
result = self.mongo_db['strm_' + self.stream_id].insert(document)

self.swift_conn.put_object('Haste_Stream_Storage', blob_name, blob)
return document

def close(self):
self.mongo_client.close()
self.swift_conn.close()
self.os_swift_conn.close()

def __save_blob(self, blob_id, blob, interestingness):
    """Save the blob to every storage platform whose policy interval matches.

    Intervals are closed; overlapping intervals save to multiple platforms,
    but each platform at most once. If no interval matched and the default
    storage is not 'trash', the blob falls through to the default storage.

    :return: list of platforms the blob was actually saved to (possibly empty).
    """
    saved_to = []
    for low, high, platform in (self.storage_policy or []):
        if low <= interestingness <= high and platform not in saved_to:
            self.__save_blob_to_platform(blob, blob_id, platform)
            saved_to.append(platform)

    # Fallback: nothing matched the policy, and the default isn't 'discard'.
    if not saved_to and self.default_storage != TRASH:
        self.__save_blob_to_platform(blob, blob_id, self.default_storage)
        saved_to.append(self.default_storage)

    return saved_to

def __save_blob_to_platform(self, blob, blob_id, storage_platform):
    """Dispatch the blob to one concrete storage backend.

    :raises ValueError: if the platform is 'trash' (only valid as a default)
        or is not a known storage platform.
    """
    if storage_platform == TRASH:
        raise ValueError('trash cannot be specified in a storage policy, only as a default')
    if storage_platform != OS_SWIFT_STORAGE:
        raise ValueError('unknown storage platform')
    self.os_swift_conn.put_object('Haste_Stream_Storage', blob_id, blob)

def __get_interestingness(self, metadata):
if self.interestingness_model is not None:
try:
result = self.interestingness_model.interestingness(metadata)
interestingness = result['interestingness']
except Exception as ex:
print(ex)
print('interestingness - falling back to ' + str(INTERESTINGNESS_DEFAULT))
interestingness = INTERESTINGNESS_DEFAULT
else:
interestingness = INTERESTINGNESS_DEFAULT
return interestingness
44 changes: 44 additions & 0 deletions haste_storage_client/interestingness_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import abc
import urllib.request
import urllib.parse
import json


class InterestingnessModel(abc.ABC):
    """Abstract interface for models that score how 'interesting' a document is."""

    @abc.abstractmethod
    def interestingness(self,
                        metadata):
        """
        Score the document described by the given metadata.

        :param metadata: dictionary containing extracted metadata (eg. image features).
        :return: dict with key 'interestingness' mapping to a float in the
            closed interval [0, 1].
        """
        raise NotImplementedError('users must define interestingness(..) to use this base class')


class RestInterestingnessModel(InterestingnessModel):
    """Interestingness model backed by a REST endpoint."""

    def __init__(self, url):
        """
        :param url: should accept HTTP GET /foo?feature_1=1&feature_2=42
            ...and respond with:
            {
                "interestingness": 0.5
            }
            Where the interestingness is in the closed interval [0,1]
        """
        # NOTE: the docstring was previously placed *after* this assignment,
        # making it a dead string literal rather than a real docstring.
        self.url = url

    def interestingness(self, metadata):
        """
        :param metadata: dictionary containing extracted metadata (eg. image features).
        :return: dict with key 'interestingness' mapping to a float in [0, 1].
        :raises urllib.error.URLError: if the endpoint is unreachable.
        """
        headers = {'User-Agent': 'haste_storage_client (0.x)',
                   'Content-Type': 'application/json',
                   'Accept': 'application/json'}

        # Features are sent as a query string (GET), not a JSON body.
        query = urllib.parse.urlencode(metadata)
        request = urllib.request.Request(self.url + '?' + query, headers=headers)
        with urllib.request.urlopen(request) as response:
            body = response.read().decode("utf-8")
        decoded = json.loads(body)
        return {'interestingness': float(decoded['interestingness'])}
6 changes: 5 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pip3 install -e .
## Example
See [example.py](example.py).

## Config
Optionally, place `haste_storage_client_config.json` in ~/.haste/ (or windows equivalent),
instead of specifying config in constructor.

### Note
It isn't possible to connect to the database server from outside the SNIC cloud, so for local dev/testing you'll
need to use port forwarding from another machine. https://help.ubuntu.com/community/SSH/OpenSSH/PortForwarding
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from distutils.core import setup

setup(name='haste_storage_client',
version='0.1',
version='0.2',
py_modules=['haste_storage_client'],
install_requires=[
'pymongo',
Expand Down