Skip to content

Commit

Permalink
Merge pull request #21 from exponential-ventures/removing-catalysis-dependency
Browse files Browse the repository at this point in the history

Removing the Catalysis dependency
  • Loading branch information
nathanmartins committed Aug 12, 2020
2 parents f04ebb5 + 4d941b1 commit fa8fb4e
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 114 deletions.
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
asynctest==0.13.0
aurum==0.1.14
catalysis-client==1.0.1
aurum==0.1.15
freezegun==0.3.15
numpy==1.18.1
pandas==1.0.1
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@

setup(
name='stripping',
version='0.1.9',
version='0.1.10',
description='An easy to use pipeline solution for AI/ML experiments',
author='Adriano Marques, Nathan Martins, Thales Ribeiro',
author_email='adriano@xnv.io, nathan@xnv.io, thales@xnv.io',
python_requires='>=3.7.0',
install_requires=['catalysis-client', 'numpy'],
install_requires=['aurum', 'numpy'],
include_package_data=True,
license="GNU LGPLv3",
url='https://github.com/exponential-ventures/stripping',
Expand Down
16 changes: 14 additions & 2 deletions stripping/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,22 @@

import os

from catalysis.common.configuration import ClientConfiguration

from .cache import StepCache
from .executor import Stripping, Context
from .logging import Logging

logging = Logging().get_logger()

# Catalysis is an optional dependency: degrade gracefully when it is missing
# or fails to load, instead of hard-failing at import time.
try:
    from catalysis.common.configuration import ClientConfiguration

    has_catalysis = True
except ImportError as error:
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")
except Exception as error:
    # A failure other than a missing package still means Catalysis is
    # unusable. Set the flag anyway — the original bare `pass` left
    # `has_catalysis` undefined, so later `if not has_catalysis` checks
    # would raise NameError — and record why.
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")


def setup_stripping(cache_dir: str = None):
if cache_dir is None:
Expand All @@ -45,6 +53,8 @@ def setup_stripping(cache_dir: str = None):


def setup_stripping_with_catalysis(catalysis_credential_name: str, cache_dir: str = None):
if not has_catalysis:
raise RuntimeError("Catalysis is not available")
if cache_dir is None:
cache_dir = fetch_catalysis_default_location(catalysis_credential_name)

Expand All @@ -54,6 +64,8 @@ def setup_stripping_with_catalysis(catalysis_credential_name: str, cache_dir: st


def fetch_catalysis_default_location(catalysis_credential_name: str):
if not has_catalysis:
raise RuntimeError("Catalysis is not available")
credential = ClientConfiguration().get_credential(catalysis_credential_name)
if 'path' not in credential.keys():
raise RuntimeError(
Expand Down
19 changes: 13 additions & 6 deletions stripping/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@
from .exceptions import StepNotCached
from .singleton import SingletonDecorator
from .storage import CacheStorage
from catalysis.storage.storage_client import StorageClient

# Catalysis is an optional dependency: the storage client is only wired in
# when the package is importable.
try:
    from catalysis.storage.storage_client import StorageClient

    has_catalysis = True
except ImportError as error:
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")
except Exception as error:
    # Any other load failure also means Catalysis cannot be used. Set the
    # flag instead of the original silent `pass`, which left
    # `has_catalysis` undefined and caused NameError at first use.
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")

ACCESS = 'access'
DIR_PATH = 'path'
Expand Down Expand Up @@ -89,7 +98,7 @@ def __init__(self, catalysis_credential_name: str = ''):
self.__cached_dirs = {}
self.catalysis_client = None

if catalysis_credential_name != '':
if has_catalysis and catalysis_credential_name != '':
self.catalysis_client = StorageClient(catalysis_credential_name)

def add_dir(self, cache_dir):
Expand All @@ -107,8 +116,7 @@ async def force_delete(self, cache_dir):
logging.info('<!> {} deleted'.format(cache_dir))

if cache_dir in self.__cached_dirs:
del(self.__cached_dirs[cache_dir])

del (self.__cached_dirs[cache_dir])

async def strategy(self):
"""
Expand All @@ -123,7 +131,7 @@ async def strategy(self):
self.__cached_dirs[d] = {}
for dir_path in glob('{}/*'.format(d)):
self.__cached_dirs[d][dir_path] = {}
self.__cached_dirs[d][dir_path][ACCESS] = await self.__last_access( dir_path)
self.__cached_dirs[d][dir_path][ACCESS] = await self.__last_access(dir_path)
if self.__cached_dirs[d][dir_path][ACCESS] <= three_months_ago_timestamp:
await self.force_delete(dir_path)
await asyncio.sleep(0.2)
Expand Down Expand Up @@ -164,4 +172,3 @@ async def percentage_disk_free_space(self):
total = stats.f_frsize * stats.f_blocks
free = stats.f_frsize * stats.f_bavail
return (free / total) * 100

17 changes: 12 additions & 5 deletions stripping/elemental/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@


import json
from pandas.core.frame import DataFrame
import logging
from datetime import datetime

from catalysis.storage import StorageClient
# Catalysis is an optional dependency; report generation falls back to the
# local filesystem when it is unavailable.
try:
    from catalysis.storage import StorageClient

    has_catalysis = True
except ImportError as error:
    has_catalysis = False
    # logging.warn is a deprecated alias of logging.warning
    logging.warning(f"Not using Catalysis: {str(error)}")
except Exception as error:
    # A non-import failure also leaves Catalysis unusable. The original
    # bare `pass` left `has_catalysis` undefined here (NameError later);
    # set it and record the cause.
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")

STOUT = 'stdout'
FILE = 'file'
Expand All @@ -52,8 +60,7 @@ def column_selection(self, columns: list) -> None:
def filters(self, *filters):
self.__filters = filters

def report(self, report_name, path="/tmp/elemental_report.txt", report_type=STOUT,
catalysis_client: StorageClient = None):
def report(self, report_name, path="/tmp/elemental_report.txt", report_type=STOUT, catalysis_client=None):
self._path = path
self._report_type = report_type
self._report_name = report_name
Expand Down Expand Up @@ -88,7 +95,7 @@ def _elemental_report(self) -> None:
f.write(self._generate_json_report(self._report_name))
else:
f.write(self._generate_report(self._report_name))
else:
elif has_catalysis:
with self._catalysis_client.open(self._path, '+w') as f:
if self._report_type == JSON:
f.write(self._generate_json_report(self._report_name))
Expand Down
125 changes: 49 additions & 76 deletions stripping/executor.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
#!/usr/bin/env python3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##
## Authors: Adriano Marques
## Nathan Martins
## Thales Ribeiro
## ----------------
## | |
## | CONFIDENTIAL |
## | |
## ----------------
##
## Copyright (C) 2019 Exponential Ventures LLC
## Copyright Exponential Ventures LLC (C), 2019 All Rights Reserved
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Library General Public
## License as published by the Free Software Foundation; either
## version 2 of the License, or (at your option) any later version.
## Author: Adriano Marques <adriano@xnv.io>
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## Library General Public License for more details.
##
## You should have received a copy of the GNU Library General Public
## License along with this library; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
## If you do not have a written authorization to read this code
## PERMANENTLY REMOVE IT FROM YOUR SYSTEM IMMEDIATELY.
##


Expand All @@ -32,7 +26,6 @@

import numpy as np
import pandas as pd
from catalysis.storage import StorageClient

from .cache import StepCache
from .singleton import SingletonDecorator
Expand All @@ -46,6 +39,17 @@
except Exception as error:
pass

# Catalysis is an optional dependency: Context serialization only special-cases
# StorageClient attributes when the package is importable.
try:
    from catalysis.storage import StorageClient

    has_catalysis = True
except ImportError as error:
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")
except Exception as error:
    # Any other load failure means Catalysis is unusable as well. Set the
    # flag — the original silent `pass` left `has_catalysis` undefined,
    # so `if has_catalysis and ...` checks would raise NameError — and
    # record why.
    has_catalysis = False
    logging.warning(f"Not using Catalysis: {str(error)}")


@SingletonDecorator
class Context:
Expand All @@ -70,14 +74,13 @@ def __getattr__(self, attr_name):
else:

if os.path.exists(attr_file_name):
res = self._deserialize(attr_file_name)
setattr(self, attr_name, res)
return res
self._deserialize(attr_file_name)
return getattr(self, attr_name)

logging.warning(f"Attribute '{attr_name}' was not found.")
raise AttributeError(f"Attribute '{attr_name}' was not found.")

def serialize(self) -> None:

for attr in dir(self):
if attr.startswith("_") or attr == 'self':
continue
Expand All @@ -86,7 +89,7 @@ def serialize(self) -> None:
if inspect.ismethod(attribute):
continue

if isinstance(attribute, StorageClient):
if has_catalysis and isinstance(attribute, StorageClient):
continue

context_file_name = os.path.join(self.__context_location, attr)
Expand All @@ -103,20 +106,13 @@ def serialize(self) -> None:
logging.debug(f"Context Attribute '{attr}' is a python object of type '{type(attribute)}'.")
attr_file.write(pickle.dumps(attribute))
else:

if isinstance(attribute, pd.DataFrame):
logging.debug(f"Context Attribute '{attr}' is a Pandas DataFrame")
attribute.to_pickle(context_file_name)
else:

with open(context_file_name, 'wb') as attr_file:
if isinstance(attribute, np.ndarray):
logging.debug(f"Context Attribute '{attr}' is a numpy array.")
np.save(attr_file, attribute)
else:
logging.debug(
f" Context Attribute '{attr}' is a python object of type '{type(attribute)}'.")
pickle.dump(attribute, attr_file)
with open(context_file_name, 'wb') as attr_file:
if isinstance(attribute, np.ndarray):
logging.debug(f"Context Attribute '{attr}' is a numpy array.")
np.save(attr_file, attribute)
else:
logging.debug(f" Context Attribute '{attr}' is a python object of type '{type(attribute)}'.")
pickle.dump(attribute, attr_file)

def deserialize(self) -> None:

Expand All @@ -129,10 +125,9 @@ def deserialize(self) -> None:
self._deserialize(os.path.join(self.__context_location, attr_file_name))

def _deserialize(self, attr_file_name):
logging.debug(f"Deserializing context attribute from '{attr_file_name}'")
logging.info(f"Deserializing context attribute from '{attr_file_name}'")

# TODO Refactor this to be more elegant
# TODO Add 'pd.read_pickle' support
if self.catalysis_client is not None:
with self.catalysis_client.open(attr_file_name, 'rb') as attr_file:
try:
Expand All @@ -146,26 +141,17 @@ def _deserialize(self, attr_file_name):
setattr(self, attr_file_name, np.load(attr_file))
logging.debug(f"Successfully deserialized '{attr_file_name}' as a numpy array.")
else:

try:
return pd.read_pickle(attr_file_name)
except:

deserializing_methods = [
pickle.load,
np.load,
]

with open(attr_file_name, 'rb') as attr_file:

for m in deserializing_methods:

try:
return m(attr_file)
except:
pass

raise AttributeError(f"Unable to deserialize {attr_file}")
with open(attr_file_name, 'rb') as attr_file:
try:
logging.debug(f"Attempting to deserialize '{attr_file_name}' with pickle...")
setattr(self, attr_file_name, pickle.load(attr_file))
logging.debug(
f"Successfully deserialized '{attr_file_name}' as a python object of "
f"type '{type(getattr(self, attr_file_name))}'")
except Exception:
logging.debug(f"Attempting to deserialize '{attr_file_name}' with numpy...")
setattr(self, attr_file_name, np.load(attr_file))
logging.debug(f"Successfully deserialized '{attr_file_name}' as a numpy array.")


@SingletonDecorator
Expand Down Expand Up @@ -259,20 +245,7 @@ def get_chained_step(self, current_step):

@staticmethod
def commit_aurum(step_name: str) -> None:
if 'aurum' in sys.modules.keys():

try:
au.base.git.add_dirs(['.'])
au.base.git.commit(
commit_message=f"Auto commit step:{step_name}",
)
logging.info(f"step {step_name} has been committed in the repository")
except au.base.git.GitCommandError as e:
logging.warning(f"failed to commit to local repository: {e}")
return

try:
au.base.git.push()
logging.info(f"step {step_name} has been saved in the remote repository")
except au.base.git.GitCommandError as e:
logging.warning(f"failed to push to remote repository: {e}")
if 'au' in sys.modules:
au.base.git.commit(step_name)
au.base.git.push()
logging.info(f"step {step_name} has been saved in the Aurum's repository")
9 changes: 7 additions & 2 deletions stripping/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
from tempfile import TemporaryFile
from typing import Iterable

from catalysis.storage import StorageClient
# Catalysis is an optional dependency: cache storage works against the local
# filesystem when the package cannot be imported.
try:
    from catalysis.storage import StorageClient
except ImportError:
    has_catalysis = False
else:
    has_catalysis = True

from .exceptions import StepNotCached

Expand All @@ -46,7 +51,7 @@ class CacheStorage:
def __init__(self, cache_dir: str, catalysis_credential_name: str = '') -> None:
self.cache_dir = cache_dir

if catalysis_credential_name != '':
if has_catalysis and catalysis_credential_name != '':
self.catalysis_client = StorageClient(catalysis_credential_name)
else:
self.catalysis_client = None
Expand Down
File renamed without changes.
Loading

0 comments on commit fa8fb4e

Please sign in to comment.