Skip to content

Commit

Permalink
Merge pull request #13 from AIAScience/simplify-store-and-retrieve-da…
Browse files Browse the repository at this point in the history
…tasets

Add convenience functions for storing and retrieving datasets
  • Loading branch information
meklitAIA committed Aug 14, 2019
2 parents b2ef4db + 123c803 commit 06af74f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 20 deletions.
11 changes: 11 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,21 @@ Installation

Usage

.. code:: python
import mflux_ai
mflux_ai.set_env_vars(token="insert_your_token_here")
Storing and retrieving datasets

.. code:: python
my_dataset = np.zeros(shape=(10000, 100), dtype=np.float32)
dataset_filename = "my-dataset.pkl"
mflux_ai.put_dataset(my_dataset, dataset_filename)
my_loaded_dataset = mflux_ai.get_dataset(dataset_filename)
assert_array_equal(my_dataset, my_loaded_dataset)
87 changes: 87 additions & 0 deletions mflux_ai/mflux_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
import json
import os

import joblib
import requests
import tempfile

from minio import Minio
from minio.error import BucketAlreadyOwnedByYou, BucketAlreadyExists, ResponseError

SERVER_HOST = "https://www.mflux.ai"
_minio_client = None


class MfluxClient(object):
Expand Down Expand Up @@ -73,3 +79,84 @@ def save_cache_to_file(self):
json.dump(self.variables, cache_file)
except IOError:
print("Could not create file:", self.cache_file_name)


def get_minio_client():
    """
    Return a Minio instance. If one has been created earlier, return the same instance.

    The client is configured from the environment variables MLFLOW_S3_ENDPOINT_URL,
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. All three are checked on every call,
    including calls that return the cached instance.

    :return: The module-wide Minio client
    :raises AssertionError: If any of the required environment variables is not set
    """
    global _minio_client

    required_env_vars = (
        "MLFLOW_S3_ENDPOINT_URL",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
    )
    missing = [name for name in required_env_vars if os.environ.get(name) is None]
    if missing:
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
        # The exception type stays AssertionError to remain compatible with callers.
        raise AssertionError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    if _minio_client is None:
        # Minio expects a bare "host:port" endpoint, so strip the scheme and the
        # trailing slash after the port.
        endpoint = (
            os.environ["MLFLOW_S3_ENDPOINT_URL"]
            .replace("http://", "")
            .replace(":9000/", ":9000")
        )
        _minio_client = Minio(
            endpoint,
            access_key=os.environ["AWS_ACCESS_KEY_ID"],
            secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
            secure=False,
        )

    return _minio_client


def ensure_bucket_exists(bucket_name):
    """
    Make sure a bucket called bucket_name exists, creating it when necessary.

    :param bucket_name: Name of the bucket to create if it is missing
    :return: None
    """
    client = get_minio_client()

    try:
        client.make_bucket(bucket_name)
    except (BucketAlreadyOwnedByYou, BucketAlreadyExists):
        # The bucket is already there, which is exactly the state we want.
        return
    # Any other error (e.g. ResponseError) propagates to the caller untouched.


def put_dataset(value, object_name, bucket_name="datasets"):
    """
    Store an object/dataset in the MFlux.ai cloud. It gets pickled by joblib.dump.

    :param value: The object/dataset to store. It should be picklable.
    :param object_name: The name of the dataset
    :param bucket_name: (Optional, defaults to "datasets") Name of the bucket to store the
        object/dataset in. Think of it as a folder. This name must not contain underscores. For
        more info on bucket name restrictions, see
        https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
        If the bucket does not already exist, it will be created.
    :return: None
    """
    minio_client = get_minio_client()

    ensure_bucket_exists(bucket_name)

    # Serialize into a private scratch directory that is removed afterwards, so the
    # pickle is not left behind in the shared temp dir and concurrent calls with the
    # same object_name cannot overwrite each other's file.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Only the basename is used locally: an object_name such as "a/b.pkl" would
        # otherwise point into a sub-directory that does not exist. Keeping the
        # original extension preserves joblib's extension-based compressor choice.
        local_name = os.path.basename(object_name) or "dataset"
        tmp_file_path = os.path.join(tmp_dir, local_name)
        joblib.dump(value, tmp_file_path, compress=True)

        try:
            minio_client.fput_object(
                bucket_name, object_name=object_name, file_path=tmp_file_path
            )
        except ResponseError as err:
            # NOTE(review): upload failures are only printed, not raised, so callers
            # cannot detect them. Kept as-is for backward compatibility; consider
            # re-raising, as get_dataset does.
            print(err)


def get_dataset(object_name, bucket_name="datasets"):
    """
    Retrieve the object/dataset with the given object_name from the MFlux.ai cloud. The object
    gets unpickled by joblib.load.

    :param object_name: The name of the dataset to retrieve
    :param bucket_name: (Optional, defaults to "datasets") Name of the bucket the
        object/dataset is stored in
    :return: The unpickled object/dataset
    :raises ResponseError: If the object storage server reports an error
    """
    # NOTE(review): joblib.load unpickles the downloaded data; only retrieve datasets
    # from buckets whose contents you trust.
    minio_client = get_minio_client()

    # Download into a private scratch directory that is removed once the object has
    # been loaded, so no copy of the dataset lingers in the shared temp dir.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Use only the basename locally so an object_name containing "/" does not
        # point into a non-existent sub-directory.
        local_name = os.path.basename(object_name) or "dataset"
        downloaded_file_path = os.path.join(tmp_dir, local_name)

        data = minio_client.get_object(bucket_name, object_name)
        try:
            with open(downloaded_file_path, "wb") as file_data:
                for chunk in data.stream(32 * 1024):
                    file_data.write(chunk)
        finally:
            # Always hand the HTTP connection back to the pool, even if the
            # download or the file write fails part-way through.
            data.close()
            data.release_conn()

        return joblib.load(downloaded_file_path)
40 changes: 20 additions & 20 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,42 @@

from setuptools import setup, find_packages

with open('README.rst') as readme_file:
with open("README.rst") as readme_file:
readme = readme_file.read()

with open('HISTORY.rst') as history_file:
with open("HISTORY.rst") as history_file:
history = history_file.read()

requirements = ['requests', ]
requirements = ["joblib<1", "minio>=4,<5", "requests"]

setup_requirements = ['pytest-runner', ]
setup_requirements = ["pytest-runner"]

test_requirements = ['pytest', ]
test_requirements = ["pytest"]

setup(
author="AIA Science AS",
author_email='mflux.ai@aiascience.com',
author_email="mflux.ai@aiascience.com",
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Natural Language :: English',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
],
description="Open source code for the mflux-ai python package",
install_requires=requirements,
license="BSD license",
long_description=readme + '\n\n' + history,
long_description=readme + "\n\n" + history,
include_package_data=True,
keywords='mflux_ai',
name='mflux-ai',
packages=find_packages(include=['mflux_ai']),
keywords="mflux_ai",
name="mflux-ai",
packages=find_packages(include=["mflux_ai"]),
setup_requires=setup_requirements,
test_suite='tests',
test_suite="tests",
tests_require=test_requirements,
url='https://github.com/AIAScience/mflux-ai-python',
version='0.1.1',
Expand Down

0 comments on commit 06af74f

Please sign in to comment.