Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
**.iml
__pycache__
**.egg-info
**.tmp
**.tmp
.DS_Store
3 changes: 1 addition & 2 deletions config/haste_storage_client_config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"haste_metadata_server": {
"host": "130.xxx.yy.zz",
"port": 27017
"connection_string": "mongodb://130.xxx.yy.zz:27017"
},
"os_swift": {
"username": "xxxxx",
Expand Down
8 changes: 4 additions & 4 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

haste_storage_client_config = {
'haste_metadata_server': {
'host': '130.xxx.yy.zz',
'port': 27017
# See: https://docs.mongodb.com/manual/reference/connection-string/
'connection_string': 'mongodb://130.xxx.yy.zz:27017'
},
'os_swift': {
# See: https://docs.openstack.org/keystoneauth/latest/
Expand Down Expand Up @@ -36,12 +36,12 @@
storage_policy=[(0.5, 1.0, OS_SWIFT_STORAGE)], # map 0.5<=interestingness<=1.0 to OS swift.
default_storage=TRASH) # discard blobs which don't match the policy above.

blob = b'this is a binary blob eg. image data.'
blob_bytes = b'this is a binary blob eg. image data.'
timestamp_cloud_edge = time.time()

client.save(timestamp_cloud_edge,
(12.34, 56.78),
blob,
blob_bytes,
{'image_height_pixels': 300, # bag of extracted features here
'image_width_pixels': 300,
'number_of_green_pixels': 1234})
Expand Down
34 changes: 13 additions & 21 deletions haste_storage_client/core.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from pymongo import MongoClient
from keystoneauth1 import session
from keystoneauth1.identity import v3
from os.path import expanduser
import swiftclient.client
from .storage import OsSwiftStorage
import json

OS_SWIFT_STORAGE = 'os_swift'
Expand All @@ -23,6 +21,7 @@ def __init__(self,
*unique for each execution of the experiment*.
:param config (dict): dictionary of credentials and hostname for metadata server and storage,
see haste_storage_client_config.json for structure.
MongoDB connection string format, see: https://docs.mongodb.com/manual/reference/connection-string/
If `None`, will be read from ~/.haste/haste_storage_client_config.json
:param interestingness_model (InterestingnessModel): determines interestingness of the document,
and hence the intended storage platform(s) for the blob.
Expand All @@ -45,43 +44,36 @@ def __init__(self,
if default_storage is None:
raise ValueError("default_storage_location cannot be None - did you mean 'trash'?")

self.mongo_client = MongoClient(config['haste_metadata_server']['host'],
config['haste_metadata_server']['port'])
self.mongo_client = MongoClient(config['haste_metadata_server']['connection_string'])
self.mongo_db = self.mongo_client.streams
self.stream_id = stream_id
self.interestingness_model = interestingness_model
self.storage_policy = storage_policy
self.default_storage = default_storage
self.os_swift_conn = self.__open_os_swift_connection(config[OS_SWIFT_STORAGE])
self.os_swift_storage = OsSwiftStorage(config[OS_SWIFT_STORAGE])

@staticmethod
def __read_config_file():
    """
    Load the client configuration from ~/.haste/haste_storage_client_config.json.

    :return: the parsed JSON content of the configuration file.
    """
    config_path = expanduser('~/.haste/haste_storage_client_config.json')
    with open(config_path) as config_file:
        return json.load(config_file)

@staticmethod
def __open_os_swift_connection(swift_auth_credentials):
    """
    Open a Swift connection authenticated through a keystone v3 password session.

    :param swift_auth_credentials (dict): keyword arguments for ``v3.Password``.
    :return: a ``swiftclient.client.Connection`` bound to the new keystone session.
    """
    password_auth = v3.Password(**swift_auth_credentials)
    return swiftclient.client.Connection(session=session.Session(auth=password_auth))

def save(self,
unix_timestamp,
location,
blob,
blob_bytes,
metadata):
"""
:param unix_timestamp (float): should come from the cloud edge (eg. microscope). floating point.
*Uniquely identifies the document within the streaming session*.
:param location (tuple): spatial information (eg. (x,y)).
:param blob (byte array): binary blob (eg. image).
:param blob_bytes (byte array): binary blob (eg. image).
:param metadata (dict): extracted metadata (eg. image features).
"""

interestingness = self.__get_interestingness(metadata)
blob_id = 'strm_' + self.stream_id + '_ts_' + str(unix_timestamp)
blob_storage_platforms = self.__save_blob(blob_id, blob, interestingness)
blob_storage_platforms = self.__save_blob(blob_id, blob_bytes, interestingness)
if len(blob_storage_platforms) == 0:
blob_id = ''

Expand All @@ -97,25 +89,25 @@ def save(self,

def close(self):
self.mongo_client.close()
self.os_swift_conn.close()
self.os_swift_storage.close()

def __save_blob(self, blob_id, blob, interestingness):
def __save_blob(self, blob_id, blob_bytes, interestingness):
storage_platforms = []
if self.storage_policy is not None:
for min_interestingness, max_interestingness, storage in self.storage_policy:
if min_interestingness <= interestingness <= max_interestingness and (storage not in storage_platforms):
self.__save_blob_to_platform(blob, blob_id, storage)
self.__save_blob_to_platform(blob_bytes, blob_id, storage)
storage_platforms.append(storage)

if len(storage_platforms) == 0 and self.default_storage != TRASH:
self.__save_blob_to_platform(blob, blob_id, self.default_storage)
self.__save_blob_to_platform(blob_bytes, blob_id, self.default_storage)
storage_platforms.append(self.default_storage)

return storage_platforms

def __save_blob_to_platform(self, blob, blob_id, storage_platform):
def __save_blob_to_platform(self, blob_bytes, blob_id, storage_platform):
if storage_platform == OS_SWIFT_STORAGE:
self.os_swift_conn.put_object('Haste_Stream_Storage', blob_id, blob)
self.os_swift_storage.save_blob(blob_bytes, blob_id)
elif storage_platform == TRASH:
raise ValueError('trash cannot be specified in a storage policy, only as a default')
else:
Expand Down
88 changes: 44 additions & 44 deletions haste_storage_client/interestingness_model.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,44 @@
import abc
import urllib.request
import urllib.parse
import json


class InterestingnessModel(metaclass=abc.ABCMeta):
    """Abstract interface for models that determine the interestingness of a document."""

    @abc.abstractmethod
    def interestingness(self, metadata):
        """
        Determine the interestingness of a document from its metadata.

        :param metadata: dictionary containing extracted metadata (eg. image features).
        :raises NotImplementedError: always, in this base implementation.
        """
        message = 'users must define interestingness(..) to use this base class'
        raise NotImplementedError(message)


class RestInterestingnessModel(InterestingnessModel):
    """Interestingness model that obtains the score from an HTTP endpoint."""

    def __init__(self, url):
        """
        :param url: should accept HTTP GET /foo?feature_1=1&feature_2=42
            ...and respond with:
            {
              "interestingness": 0.5
            }
            Where the interestingness is in the closed interval [0,1]
        """
        self.url = url

    def interestingness(self, metadata):
        """
        Send the metadata to the endpoint as URL query parameters and return its score.

        :param metadata: dictionary containing extracted metadata (eg. image features).
        :return: dict of the form ``{'interestingness': <float>}``.
        """
        headers = {'User-Agent': 'haste_storage_client (0.x)',
                   'Content-Type': 'application/json',
                   'Accept': 'application/json'}

        query = urllib.parse.urlencode(metadata)
        req = urllib.request.Request(self.url + '?' + query, headers=headers)
        # NOTE(review): no timeout is passed to urlopen, so a stalled server can block this call.
        with urllib.request.urlopen(req) as response:
            # Keep the response object and its decoded body under separate names.
            body = response.read().decode("utf-8")
        decoded = json.loads(body)
        return {'interestingness': float(decoded['interestingness'])}
import abc
import urllib.request
import urllib.parse
import json
class InterestingnessModel(metaclass=abc.ABCMeta):
    """Abstract interface for models that determine the interestingness of a document."""

    @abc.abstractmethod
    def interestingness(self, metadata):
        """
        Determine the interestingness of a document from its metadata.

        :param metadata: dictionary containing extracted metadata (eg. image features).
        :raises NotImplementedError: always, in this base implementation.
        """
        message = 'users must define interestingness(..) to use this base class'
        raise NotImplementedError(message)
class RestInterestingnessModel(InterestingnessModel):
    """Interestingness model that obtains the score from an HTTP endpoint."""

    def __init__(self, url):
        """
        :param url: should accept HTTP GET /foo?feature_1=1&feature_2=42
            ...and respond with:
            {
              "interestingness": 0.5
            }
            Where the interestingness is in the closed interval [0,1]
        """
        self.url = url

    def interestingness(self, metadata):
        """
        Send the metadata to the endpoint as URL query parameters and return its score.

        :param metadata: dictionary containing extracted metadata (eg. image features).
        :return: dict of the form ``{'interestingness': <float>}``.
        """
        headers = {'User-Agent': 'haste_storage_client (0.x)',
                   'Content-Type': 'application/json',
                   'Accept': 'application/json'}

        query = urllib.parse.urlencode(metadata)
        req = urllib.request.Request(self.url + '?' + query, headers=headers)
        # NOTE(review): no timeout is passed to urlopen, so a stalled server can block this call.
        with urllib.request.urlopen(req) as response:
            # Keep the response object and its decoded body under separate names.
            body = response.read().decode("utf-8")
        decoded = json.loads(body)
        return {'interestingness': float(decoded['interestingness'])}
59 changes: 59 additions & 0 deletions haste_storage_client/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from keystoneauth1 import session
from keystoneauth1.identity import v3
import time
import abc
import swiftclient.client


class Storage(metaclass=abc.ABCMeta):
    """Abstract interface for a backend that stores binary blobs by identifier."""

    @abc.abstractmethod
    def save_blob(self, blob_bytes, blob_id):
        """
        Store a blob under the given identifier.

        :param blob_bytes: binary content of the blob.
        :param blob_id: identifier to store the blob under.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError('users must define method to use base class')

    @abc.abstractmethod
    def close(self):
        """
        Release whatever the backend holds open.

        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError('users must define method to use base class')


# The auth token expires after 24 hours by default, but refresh more frequently:
OS_SWIFT_CONN_MAX_LIFETIME_SECONDS = 60 * 60


class OsSwiftStorage(Storage):
    """
    Storage backend that writes blobs to the 'Haste_Stream_Storage' OpenStack Swift container.

    The connection is re-created (re-authenticating) once it is older than
    OS_SWIFT_CONN_MAX_LIFETIME_SECONDS, so that a long-running client keeps working.
    """

    def __init__(self, config):
        """
        :param config (dict): keyword arguments passed to keystoneauth1 ``v3.Password``.
        """
        self.config = config
        self.conn = None
        # time.monotonic() reading taken when the current connection was opened,
        # or None while disconnected. This is not a wall-clock timestamp.
        self.conn_timestamp_connected = None
        # Try to connect now, to fail fast:
        self.__reauthenticate_if_needed()

    def save_blob(self, blob_bytes, blob_id):
        """
        Upload a blob to the 'Haste_Stream_Storage' container, reconnecting first if needed.

        :param blob_bytes: binary content; a ``bytearray`` is converted to ``bytes`` before upload.
        :param blob_id: object name to store the blob under.
        """
        self.__reauthenticate_if_needed()

        if isinstance(blob_bytes, bytearray):
            # Workaround a bug in the OpenStack client - 'bytearray' is not properly handled as a content.
            # (see https://bugs.launchpad.net/python-swiftclient/+bug/1741991)
            blob_bytes = bytes(blob_bytes)

        self.conn.put_object('Haste_Stream_Storage', blob_id, blob_bytes)

    def close(self):
        """Close the connection, if any. Safe to call more than once."""
        if self.conn is not None:
            self.conn.close()
            # Forget the closed connection so that a later save_blob() opens a fresh one.
            self.conn = None
            self.conn_timestamp_connected = None

    def __reauthenticate_if_needed(self):
        """Open a new connection when there is none or the current one exceeded its lifetime."""
        # Connection age is measured on the monotonic clock: adjustments to the system
        # (wall) clock must neither force nor postpone re-authentication.
        if self.conn is not None \
                and self.conn_timestamp_connected is not None \
                and time.monotonic() - self.conn_timestamp_connected <= OS_SWIFT_CONN_MAX_LIFETIME_SECONDS:
            return

        print('HasteStorageClient: (re)connecting os_swift...')

        # Drop the expired connection (if any) before opening a new one.
        self.close()

        auth = v3.Password(**self.config)
        keystone_session = session.Session(auth=auth)
        self.conn = swiftclient.client.Connection(session=keystone_session)
        self.conn_timestamp_connected = time.monotonic()
6 changes: 5 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ For now, this simply calls the MongoDB and Swift Container clients. Python 3.x.
## Installation
For installation in [development mode](https://setuptools.readthedocs.io/en/latest/setuptools.html#development-mode):
```
git clone https://github.com/benblamey/HasteStorageClient.git
git clone https://github.com/HASTE-project/HasteStorageClient.git
cd HasteStorageClient
pip3 install -e .
```
Expand All @@ -27,3 +27,7 @@ instead of specifying config in constructor.
### Note
It isn't possible to connect to the database server from outside the SNIC cloud, so for local dev/testing you'll
need to use port forwarding from another machine (see https://help.ubuntu.com/community/SSH/OpenSSH/PortForwarding).


## Contributors
Ben Blamey, Andreas Hellander
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from distutils.core import setup

setup(name='haste_storage_client',
version='0.2',
py_modules=['haste_storage_client'],
install_requires=[
'pymongo',
Expand Down