diff --git a/doframework/api.py b/doframework/api.py
index d0d0010..9c71191 100644
--- a/doframework/api.py
+++ b/doframework/api.py
@@ -8,80 +8,102 @@
 import ray
 import rayvens
-import ibm_boto3
-import boto3
+from doframework.core.inputs import get_configs
+from doframework.core.storage import Storage, CamelKeysDict
 from doframework.flow.objectives import generate_objective, calculate_objectives
 from doframework.flow.datasets import generate_dataset
 from doframework.flow.solutions import generate_solution, files_from_data
 from doframework.flow.metrics import generate_metric, files_from_solution
+from doframework.flow.mock import generate_objective_mock, generate_dataset_mock, generate_solution_mock, generate_metric_mock, sleep_time
 
 #################################################################################################################
 #################################################################################################################
 
-def _get_s3_object(configs):
-
-    s3 = configs['s3']
-
-    assert 'cloud_service_provider' in s3, 'Missing s3:cloud_service_provider in configs.'
-    assert s3['cloud_service_provider'] in ['ibm','aws'], 'cloud_service_provider in configs must be either `aws` or `ibm`.'
-
-    if s3['cloud_service_provider'] == 'ibm':
-
-        return ibm_boto3.resource(service_name='s3',
-                                  region_name=s3['region'],
-                                  endpoint_url=s3['endpoint_url'],
-                                  aws_access_key_id=s3['aws_access_key_id'],
-                                  aws_secret_access_key=s3['aws_secret_access_key'])
+Args = namedtuple(
+    'Args',[
+        'objectives',
+        'datasets',
+        'feasibility_regions',
+        'run_mode',
+        'distribute',
+        'mcmc',
+        'logger',
+        'mock',
+        'after_idle_for',
+        'rayvens_logs',
+        'alg_num_cpus'
+    ]
+)
 
-    if s3['cloud_service_provider'] == 'aws':
+GenerateFunctionsDict = {
+    'objective': generate_objective,
+    'objective_mock': generate_objective_mock,
+    'data' : generate_dataset,
+    'data_mock' : generate_dataset_mock,
+    'solution' : generate_solution,
+    'solution_mock' : generate_solution_mock,
+    'metric' : generate_metric,
+    'metric_mock' : generate_metric_mock,
+}
 
-        return boto3.resource(service_name='s3',
-                              region_name=s3['region'],
-                              aws_access_key_id=s3['aws_access_key_id'],
-                              aws_secret_access_key=s3['aws_secret_access_key'])
+#################################################################################################################
+#################################################################################################################
 
-def _get_buckets(configs):
+def _get_source_config(from_bucket, to_bucket, configs):
 
-    s3_buckets = _get_s3_object(configs).buckets.all()
-    s3_buckets = [bucket.name for bucket in s3_buckets]
+    d = {}
 
-    return {name: bucket for name, bucket in configs['s3']['buckets'].items() if bucket in s3_buckets}
-
-def _get_source_config(from_bucket, to_bucket, configs):
+    if 's3' in configs:
 
-    s3 = configs['s3']
+        s3 = configs['s3']
 
-    d = dict(kind='cloud-object-storage-source',
-             name='source',
-             bucket_name=from_bucket,
-             access_key_id=s3['aws_access_key_id'],
-             secret_access_key=s3['aws_secret_access_key'],
-             region=s3['region'],
-             move_after_read=to_bucket
-             )
+        d = dict(kind='cloud-object-storage-source',
+                 bucket_name=from_bucket,
+                 access_key_id=s3['aws_access_key_id'],
+                 secret_access_key=s3['aws_secret_access_key'],
+                 region=s3['region'],
+                 move_after_read=to_bucket
+                 )
 
-    if 'endpoint_url' in s3:
+        if 'endpoint_url' in s3:
 
-        d = {**d,**dict(endpoint=s3['endpoint_url'])}
+            d = {**d,**dict(endpoint=s3['endpoint_url'])}
+
+    if 'local' in configs:
+
+        d = dict(kind='file-source',
+                 path=from_bucket,
+                 keep_file=False,
+                 move_after_read=to_bucket
+                 )
 
     return d
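Note on the hunk above: `_get_source_config` now returns one of two rayvens source configurations, keyed off whether the user configs carry an `s3` or a `local` field. A minimal sketch of the two shapes, assuming a hypothetical configs dict and hypothetical bucket names (illustration only, not part of the patch):

    # Hypothetical configs; real ones come from yaml.safe_load of the user's configs.yaml.
    s3_configs = {'s3': {'aws_access_key_id': 'KEY', 'aws_secret_access_key': 'SECRET', 'region': 'us-east-1'}}
    local_configs = {'local': {'buckets': {}}}

    _get_source_config('dof-inputs', 'dof-inputs-dest', s3_configs)
    # {'kind': 'cloud-object-storage-source', 'bucket_name': 'dof-inputs',
    #  'access_key_id': 'KEY', 'secret_access_key': 'SECRET',
    #  'region': 'us-east-1', 'move_after_read': 'dof-inputs-dest'}

    _get_source_config('/data/inputs', '/data/inputs_dest', local_configs)
    # {'kind': 'file-source', 'path': '/data/inputs', 'keep_file': False,
    #  'move_after_read': '/data/inputs_dest'}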
 
 def _get_sink_config(to_bucket, configs):
 
-    s3 = configs['s3']
+    d = {}
 
-    d = dict(kind='cloud-object-storage-sink',
-             name='sink',
-             bucket_name=to_bucket,
-             access_key_id=s3['aws_access_key_id'],
-             secret_access_key=s3['aws_secret_access_key'],
-             region=s3['region']
-             )
+    if 's3' in configs:
+
+        s3 = configs['s3']
 
-    if 'endpoint_url' in s3:
+        d = dict(kind='cloud-object-storage-sink',
+                 bucket_name=to_bucket,
+                 access_key_id=s3['aws_access_key_id'],
+                 secret_access_key=s3['aws_secret_access_key'],
+                 region=s3['region']
+                 )
 
-        d = {**d,**dict(endpoint=s3['endpoint_url'])}
+        if 'endpoint_url' in s3:
+
+            d = {**d,**dict(endpoint=s3['endpoint_url'])}
+
+    if 'local' in configs:
+
+        d = dict(kind='file-sink',
+                 path=to_bucket
+                 )
 
     return d
 
@@ -127,10 +149,29 @@ def _get_event_type(process_type,args):
         print('({}) ERROR ... '.format(process_type) + e)
     return None
 
+def _get_event(process_type,process_json,event_type,args):
+    try:
+        if event_type == 'json':
+            process_input = json.loads(process_json['body'])
+        elif event_type == 'csv':
+            process_input = pd.read_csv(StringIO(process_json['body']))
+        else:
+            process_input = None
+        return process_input
+    except json.JSONDecodeError as e:
+        if args.logger:
+            print('({}) ERROR ... Error occurred while decoding file {} in json loads.'.format(process_type,process_json['filename']))
+            print('({}) ERROR ... '.format(process_type) + str(e))
+    except Exception as e:
+        if args.logger:
+            print('({}) ERROR ... Error occurred when extracting event content.'.format(process_type))
+            print('({}) ERROR ... '.format(process_type) + str(e))
+    return None
+
 def _number_of_iterations(process_input, args, process_type):
     try:
         if process_type == 'input':
-            n = calculate_objectives(process_input,args)
+            n = args.objectives if args.mock else calculate_objectives(process_input,args)
         elif process_type == 'objective':
             n = args.datasets
         elif process_type == 'data':
@@ -140,6 +181,10 @@ def _number_of_iterations(process_input, args, process_type):
         else:
             n = None
         return n
+    except KeyError as e:
+        if args.logger:
+            print('({}) ERROR ... Error occurred when calculating n in number_of_iterations.'.format(process_type))
+            print('({}) ERROR ... '.format(process_type) + str(e))
     except Exception as e:
         if args.logger:
             print('({}) ERROR ... Error occurred when calculating n in number_of_iterations.'.format(process_type))
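The `n` computed here drives the fan-out of the pipeline: each event spawns `n` downstream tasks. A sketch of the resulting arithmetic, using the defaults of the mock example further down (values are hypothetical CLI inputs):

    objectives, datasets, feasibility_regions = 3, 2, 1   # mock example defaults
    n_objectives = objectives                             # per meta-input json
    n_datasets = n_objectives * datasets                  # one 'objective' event each
    n_solutions = n_datasets * feasibility_regions        # one 'data' event each
    n_metrics = n_solutions                               # one metric per solution
    assert (n_objectives, n_datasets, n_solutions, n_metrics) == (3, 6, 6, 6)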
@@ -154,14 +199,15 @@ def _get_extra_input(input_name, process_type, configs, args, buckets):
         extra = {}
     elif process_type == 'data':
         files = files_from_data(input_name)
-        objective = _get_s3_object(configs).Bucket(buckets['objectives_dest']).Object(files['objective']).get()
-        extra = {'objective': json.load(objective['Body'])}
+        storage = Storage(configs)
+        objective = storage.get(buckets['objectives_dest'],files['objective'])
+        extra = {'objective': json.load(objective)}
     elif process_type == 'solution':
         files = files_from_solution(input_name)
-        s3 = _get_s3_object(configs)
-        objective = s3.Bucket(buckets['objectives_dest']).Object(files['objective']).get()
-        data = s3.Bucket(buckets['data_dest']).Object(files['data']).get()
-        extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective['Body']), 'data': pd.read_csv(data['Body'])}
+        storage = Storage(configs)
+        objective = storage.get(buckets['objectives_dest'],files['objective'])
+        data = storage.get(buckets['data_dest'],files['data'])
+        extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective), 'data': pd.read_csv(data)}
     else:
         extra = None
     return extra
@@ -189,13 +235,7 @@ def inner(context, event):
             assert ('body' in process_json) and ('filename' in process_json), 'Missing fields body and / or filename in event json.'
 
             event_type = _get_event_type(process_type,args)
-
-            if event_type == 'json':
-                process_input = json.loads(process_json['body'])
-            elif event_type == 'csv':
-                process_input = pd.read_csv(StringIO(process_json['body']))
-            else:
-                process_input = None
+            process_input = _get_event(process_type,process_json,event_type,args)
             assert process_input is not None, 'Unable to extract process input. Perhaps illegal event_type={}.'.format(event_type)
             if args.logger:
                 print('({}) INFO ... Process successfully extracted event of type {}.'.format(process_type, event_type))
@@ -218,7 +258,7 @@ def inner(context, event):
                     print('({}) INFO ... Process working on event {} uses extra input {}.'.format(process_type,input_name,list(extra.keys())))
                 else:
                     print('({}) INFO ... Process working on event {} does not require extra input.'.format(process_type,input_name))
-            extra = {**extra,**kwargs}
+            extra = {**extra,**kwargs,**configs,**{'mock': args.mock}}
 
             if args.distribute:
                 _ = [f_dist.remote(context, process_input, input_name, **extra) for _ in range(n)]
@@ -249,16 +289,15 @@ def inner(context, event):
 def resolve(predict_optimize):
 
     def inner(process_input, input_name, **kwargs):
-
-        return generate_solution(predict_optimize, process_input, input_name, **kwargs)
+
+        key = 'solution_mock' if 'mock' in kwargs and kwargs['mock'] else 'solution'
+        return GenerateFunctionsDict[key](predict_optimize, process_input, input_name, **kwargs)
 
     return inner
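`resolve` is the public hook for user algorithms: it wraps a predict-then-optimize function so the framework routes it through `generate_solution`, or through `generate_solution_mock` when `mock=True` is threaded through kwargs. A sketch of the intended usage, mirroring examples/doframework_mock.py below; the body is a trivial stand-in, not a real algorithm:

    import numpy as np
    import doframework as dof

    @dof.resolve
    def my_algorithm(data: np.array, constraints: np.array, **kwargs):
        # fit a model on data, then optimize it over the constraint polytope;
        # trivial stand-in returning (arg, val, model)
        return data[:, :-1].mean(axis=0), 0.0, None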
 
 #################################################################################################################
 #################################################################################################################
 
-Args = namedtuple('Args',['objectives','datasets','feasibility_regions','run_mode','distribute','mcmc','logger','after_idle_for','rayvens_logs','alg_num_cpus'])
-
 def run(generate_user_solution, configs_file, **kwargs):
 
     objectives = int(kwargs['objectives']) if 'objectives' in kwargs else 1
@@ -268,11 +307,12 @@ def run(generate_user_solution, configs_file, **kwargs):
     distribute = kwargs['distribute'] if 'distribute' in kwargs else True
     mcmc = kwargs['mcmc'] if 'mcmc' in kwargs else False
     logger = kwargs['logger'] if 'logger' in kwargs else True
+    mock = kwargs['mock'] if 'mock' in kwargs else False
     after_idle_for = kwargs['after_idle_for'] if 'after_idle_for' in kwargs else 200
     rayvens_logs = kwargs['rayvens_logs'] if 'rayvens_logs' in kwargs else False
     alg_num_cpus = int(kwargs['alg_num_cpus']) if 'alg_num_cpus' in kwargs else 1
 
-    args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, after_idle_for, rayvens_logs, alg_num_cpus)
+    args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, mock, after_idle_for, rayvens_logs, alg_num_cpus)
 
     if args.run_mode == 'operator':
         ray.init(address='auto')
@@ -283,45 +323,64 @@ def run(generate_user_solution, configs_file, **kwargs):
 
     if args.logger:
         print('({}) INFO ... Running simulation with args objectives={o} datasets={s} feasibility_regions={r} distribute={d} run_mode={m} logger={l}'.format('root', o=args.objectives, s=args.datasets, r=args.feasibility_regions, d=args.distribute, m=args.run_mode, l=args.logger))
 
-    with open(configs_file,'r') as file:
-        try:
-            configs = yaml.safe_load(file)
-        except yaml.YAMLError as e:
-            if args.logger:
-                print('({}) ERROR ... Could not load configs yaml.'.format('root'))
-                print(e)
-            raise e
+    if args.mock:
+        if args.logger: print('({}) INFO ... Running in MOCK mode.'.format('root'))
 
-    buckets = _get_buckets(configs)
+    configs = get_configs(configs_file)
+    storage = Storage(configs)
+    buckets = storage.buckets()
 
     @ray.remote(num_cpus=1)
     @_process('input', configs, args, buckets)
     def generate_objectives(context, process_input, input_name, **kwargs):
-        objective, generated_file = generate_objective(process_input, input_name,**kwargs)
-        event = rayvens.OutputEvent(json.dumps(objective),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+        key = 'objective_mock' if 'mock' in kwargs and kwargs['mock'] else 'objective'
+        objective, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(objective),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated objective {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('input',generated_file))
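The `any(...) and not all(...)` guard above (repeated in every generate_* task) is an exclusive-or over the two storage modes, and the string arithmetic picks the single active key. An equivalent, more explicit sketch:

    def _exactly_one_storage_mode(kwargs: dict) -> bool:
        return ('local' in kwargs) != ('s3' in kwargs)

    # string multiplication by a bool keeps exactly one of the two names:
    assert 'local' * True + 's3' * False == 'local'
    assert 'local' * False + 's3' * True == 's3'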
 
     @ray.remote(num_cpus=1)
     @_process('objective', configs, args, buckets)
     def generate_datasets(context, process_input, input_name, **kwargs):
-        df, generated_file = generate_dataset(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(df.to_csv(index=False),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+        key = 'data_mock' if 'mock' in kwargs and kwargs['mock'] else 'data'
+        df, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(df.to_csv(index=False),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated dataset {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('objective',generated_file))
 
     @ray.remote(num_cpus=args.alg_num_cpus)
     @_process('data', configs, args, buckets, **kwargs)
     def generate_solutions(context, process_input, input_name, **kwargs):
         solution, generated_file = generate_user_solution(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(json.dumps(solution),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(solution),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated solution {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('data',generated_file))
 
     @ray.remote(num_cpus=1)
     @_process('solution', configs, args, buckets)
     def generate_metrics(context, process_input, input_name, **kwargs):
-        metric, generated_file = generate_metric(process_input, input_name, **kwargs)
-        event = rayvens.OutputEvent(json.dumps(metric),{"CamelAwsS3Key": generated_file})
-        context.publish(event)
-
+        key = 'metric_mock' if 'mock' in kwargs and kwargs['mock'] else 'metric'
+        metric, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)
+
+        if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
+            key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
+            event = rayvens.OutputEvent(json.dumps(metric),{CamelKeysDict[key]: generated_file})
+            context.publish(event)
+        else:
+            print('({}) ERROR ... generated metric {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('solution',generated_file))
+
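The header key looked up in `CamelKeysDict` presumably tells the Camel endpoint how to name the written object: `CamelAwsS3Key` for an S3 object key, `CamelFileName` for a file sink. A sketch of the publish step in isolation (the `context` handle is injected by `@_process`; this is an assumption-laden illustration, not patch code):

    import json
    import rayvens

    payload = json.dumps({'objective_id': 'negzm03t'})
    event = rayvens.OutputEvent(payload, {'CamelFileName': 'objective_negzm03t.json'})
    # context.publish(event)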
     sources = ['inputs', 'objectives', 'data', 'solutions']
     targets = ['objectives', 'data', 'solutions', 'metrics_dest']
     operators = [generate_objectives, generate_datasets, generate_solutions, generate_metrics]
diff --git a/doframework/core/inputs.py b/doframework/core/inputs.py
index a516a52..1bf1f89 100644
--- a/doframework/core/inputs.py
+++ b/doframework/core/inputs.py
@@ -19,6 +19,7 @@
 import random
 import string
 import logging
+import yaml
 
 from typing import Optional
 from doframework.core.utils import logger
@@ -75,13 +76,43 @@ def parse_vertex_num(sim_input: dict) -> int:
 
     return num
 
+def get_configs(configs_file, is_logger: bool=True):
+
+    with open(configs_file,'r') as file:
+        try:
+            configs = yaml.safe_load(file)
+            return configs
+        except yaml.YAMLError as e:
+            if is_logger:
+                print('({}) ERROR ... Could not load configs yaml. Check your path.'.format('root'))
+                print(e)
+            raise e
+
+def legit_configs(configs):
+
+    assert any(['s3' in configs, 'local' in configs]), 'configs file requires either an `s3` or a `local` field.'
+    assert not all(['s3' in configs, 'local' in configs]), 'choose either `s3` or `local` sources and targets in configs file, but not both.'
+
+    key = 's3'*('s3' in configs) + 'local'*('local' in configs)
+    storage = configs[key]
+
+    assert 'buckets' in storage, 'configs file requires a list of storage resources under the field `buckets`.'
+
+    if key=='s3':
+
+        assert 'cloud_service_provider' in storage, 'Missing s3:cloud_service_provider in configs.'
+        assert storage['cloud_service_provider'] in ['ibm','aws'], 'cloud_service_provider in configs must be either `aws` or `ibm`.'
+        assert 'region' in storage, 'Missing `region` field under `s3`.'
+        assert 'aws_access_key_id' in storage, 'Missing `aws_access_key_id` field under `s3`.'
+        assert 'aws_secret_access_key' in storage, 'Missing `aws_secret_access_key` field under `s3`.'
+        if storage['cloud_service_provider'] == 'ibm':
+            assert 'endpoint_url' in storage, 'Missing `endpoint_url` field under `s3`.'
+
 #### TODO: assert if f[vertices] or omega has vertices and omega[vertices] has position then num of points > dim
 #### assert if f[vertices] has position, then omega has vertices and omega[vertices] has either num or position
 #### assert if f[values] has coeffs, then omega has vertices and omega[vertices] has either num or position
 
 @logger
-def legit_input(sim_input: dict,
-                logger_name: Optional[str]=None,
-                is_raised: Optional[bool]=False):
+def legit_input(sim_input: dict, logger_name: Optional[str]=None, is_raised: Optional[bool]=False):
     '''
     Test input validity.
@@ -172,7 +203,6 @@ def generate_id(N: int=8) -> str:
 
     return ''.join(random.choices(string.ascii_lowercase + string.digits, k=N))
 
-
 def setup_logger(logger_name: str, log_file: str, level=logging.DEBUG, to_stream: bool=False):
 
     l = logging.getLogger(logger_name)
diff --git a/doframework/core/storage.py b/doframework/core/storage.py
new file mode 100644
index 0000000..e6c3aee
--- /dev/null
+++ b/doframework/core/storage.py
@@ -0,0 +1,138 @@
+from dataclasses import dataclass, field
+import os
+import json
+from pathlib import Path
+import ibm_boto3
+import boto3
+
+from doframework.core.inputs import legit_configs
+
+CamelKeysDict = {'local': "CamelFileName", 's3': "CamelAwsS3Key"}
+
+def _get_s3_object(configs):
+
+    assert 's3' in configs, 'The `_get_s3_object` function assumes `s3` sources and targets.'
+
+    s3 = configs['s3']
+
+    if s3['cloud_service_provider'] == 'ibm':
+
+        return ibm_boto3.resource(service_name='s3',
+                                  region_name=s3['region'],
+                                  endpoint_url=s3['endpoint_url'],
+                                  aws_access_key_id=s3['aws_access_key_id'],
+                                  aws_secret_access_key=s3['aws_secret_access_key'])
+
+    if s3['cloud_service_provider'] == 'aws':
+
+        return boto3.resource(service_name='s3',
+                              region_name=s3['region'],
+                              aws_access_key_id=s3['aws_access_key_id'],
+                              aws_secret_access_key=s3['aws_secret_access_key'])
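For reference, two minimal configs dicts that pass `legit_configs` above — the shape `yaml.safe_load` would produce from a user's configs.yaml; bucket names and paths are hypothetical:

    s3_configs = {
        's3': {
            'buckets': {'inputs': 'dof-inputs', 'inputs_dest': 'dof-inputs-dest'},
            'cloud_service_provider': 'aws',
            'region': 'us-east-1',
            'aws_access_key_id': 'KEY',
            'aws_secret_access_key': 'SECRET',
        }
    }
    local_configs = {
        'local': {
            'buckets': {'inputs': '/data/inputs', 'inputs_dest': '/data/inputs_dest'},
        }
    }
    # legit_configs(s3_configs); legit_configs(local_configs)  # both pass;
    # supplying both `s3` and `local` at once raises an AssertionError.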
+
+@dataclass
+class Storage:
+    '''
+    Class for storage object: either an S3 object or a local file system. Initializes with user configs.
+    '''
+
+    configs: dict
+    storage_buckets: dict = field(init=False)
+    missing_buckets: dict = field(init=False)
+
+    def __post_init__(self):
+
+        legit_configs(self.configs)
+
+        if 's3' in self.configs:
+            s3 = self.configs['s3']
+            buckets_obj = _get_s3_object(self.configs).buckets.all()
+            buckets_list = [bucket.name for bucket in buckets_obj]
+            self.storage_buckets = {name: bucket for name, bucket in s3['buckets'].items() \
+                                    if bucket in buckets_list}
+            self.missing_buckets = {name: bucket for name, bucket in s3['buckets'].items() \
+                                    if bucket not in buckets_list}
+        if 'local' in self.configs:
+            local = self.configs['local']
+            self.storage_buckets = {name: bucket for name, bucket in local['buckets'].items() \
+                                    if Path(bucket).is_dir()}
+            self.missing_buckets = {name: bucket for name, bucket in local['buckets'].items() \
+                                    if not Path(bucket).is_dir()}
+
+    def buckets(self):
+        return self.storage_buckets
+
+    def missing(self):
+        return self.missing_buckets
+
+    def get(self,bucket,filename):
+
+        body_or_path = None
+
+        if bucket in self.storage_buckets.values():
+            if 's3' in self.configs:
+                body_or_path = _get_s3_object(self.configs).Bucket(bucket).Object(filename).get()['Body']
+            if 'local' in self.configs:
+                body_or_path = open(os.path.join(bucket,filename),'r')
+
+        return body_or_path
+
+    def put(self,bucket,content,name,content_type):
+
+        assert bucket in self.storage_buckets.values(), \
+            'The bucket you provided is not on the storage bucket list. Find the list by running storage.buckets().'
+        assert content_type in ['json','csv'], \
+            'put method uploads either jsonable (e.g., dict) or csv-esque (e.g., pd.DataFrame) content.'
+
+        success = False
+
+        try:
+
+            if 's3' in self.configs:
+
+                if content_type == 'json':
+                    _get_s3_object(self.configs).Bucket(bucket).put_object(
+                        Body=json.dumps(content),
+                        Key=name
+                    )
+                if content_type == 'csv':
+                    _get_s3_object(self.configs).Bucket(bucket).put_object(
+                        Body=content.to_csv(index=False),
+                        Key=name
+                    )
+
+                success = True
+
+            if 'local' in self.configs:
+
+                if content_type == 'json':
+                    with open(os.path.join(bucket,name), "w") as path: json.dump(content, path)
+                if content_type == 'csv':
+                    content.to_csv(os.path.join(bucket,name),index=False)
+
+                success = True
+
+        except Exception as e:
+
+            print(e)
+
+        return success
+
+    def count(self,bucket,extension):
+
+        assert bucket in self.storage_buckets.values(), \
+            'The bucket you provided is not on the storage bucket list. Find the list by running storage.buckets().'
+        assert extension in ['json','csv'], \
+            'count method counts either json or csv files in given bucket. Provide either extension=`json` or extension=`csv`.'
+
+        n = None
+
+        if 's3' in self.configs:
+
+            objects = [f for f in _get_s3_object(self.configs).Bucket(bucket).objects.all() if f.key.endswith(extension)]
+            n = len(objects)
+
+        if 'local' in self.configs:
+
+            objects = [f for f in Path(bucket).rglob('*') if Path(f).suffix in [f'.{extension}']]
+            n = len(objects)
+
+        return n
\ No newline at end of file
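A short usage sketch of the `Storage` API defined above, against a local layout (paths are hypothetical and must be existing directories, or they land in `missing()`):

    from doframework.core.storage import Storage

    configs = {'local': {'buckets': {'inputs': '/data/inputs', 'inputs_dest': '/data/inputs_dest'}}}
    storage = Storage(configs)

    storage.buckets()   # configured buckets that exist on disk
    storage.missing()   # configured buckets that do not

    storage.put('/data/inputs', {'data': {'N': 500, 'noise': 0.01}}, 'input_test.json', 'json')
    body = storage.get('/data/inputs', 'input_test.json')  # open file handle in 'r' mode
    storage.count('/data/inputs', 'json')                   # -> 1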
diff --git a/doframework/flow/mock.py b/doframework/flow/mock.py
new file mode 100644
index 0000000..97c321e
--- /dev/null
+++ b/doframework/flow/mock.py
@@ -0,0 +1,294 @@
+import time
+import logging
+import re
+from typing import Optional, Tuple
+
+import numpy as np
+import pandas as pd
+from scipy.stats import norm, multivariate_normal
+
+from doframework.core.inputs import generate_id
+from doframework.core.utils import sample_standard_simplex
+from doframework.flow.solutions import _ids_from_data
+from doframework.flow.metrics import _ids_from_solution
+from doframework.core.optimizer import optimalSolution
+
+sleep_time = 5
+
+def generate_objective_mock(meta_input: dict, meta_name: str, **kwargs) -> Tuple[dict, str]:
+    '''
+    generate_objective test for end-to-end integration.
+
+        Parameters:
+            meta_input (dict): Meta data for objective target generation.
+            meta_name (str): Name of meta data file.
+
+        Returns:
+            Returns a mock objective target dictionary and the name of the objective target file.
+
+    >>> meta = {'data': {'N': 500, 'noise': 0.01}, 'input_file_name': 'input_test.json'}
+    >>> objective, generated_objective_file = generate_objective_mock(meta, meta['input_file_name'])
+    >>> objective
+    {'data': {'N': 500, 'noise': 0.01},
+     'input_file_name': 'input_test.json',
+     'objective_id': 'negzm03t',
+     'generated_file_name': 'objective_negzm03t.json'}
+    >>> generated_objective_file
+    'objective_negzm03t.json'
+    '''
+
+    output_prefix = 'objective'
+    output_suffix = 'json'
+
+    logger_name = kwargs['logger_name'] if 'logger_name' in kwargs else None
+    is_raised = kwargs['is_raised'] if 'is_raised' in kwargs else False
+
+    time.sleep(sleep_time)
+
+    objective_id = generate_id()
+
+    output = {}
+    output['data'] = {}
+
+    data = meta_input['data']
+
+    output['data']['N'] = data['N']
+    output['data']['noise'] = data['noise']
+
+    assert meta_name == meta_input['input_file_name'], \
+        'Mismatch between file name recorded in json input and given file name.'
+
+    output['input_file_name'] = meta_name
+    output['objective_id'] = objective_id
+    generated_file = ''.join(['_'.join([output_prefix,objective_id]),'.',output_suffix])
+    output['generated_file_name'] = generated_file
+
+    if logger_name:
+        log = logging.getLogger(logger_name)
+        log.info('Finished output for objective ID {}.'.format(objective_id))
+
+    return output, generated_file
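The mock flow threads ids through file names, and downstream stages recover them by parsing names (`_ids_from_data`, `_ids_from_solution`). The naming chain, taken from the docstrings:

    objective_file = 'objective_negzm03t.json'                   # objective_<objective_id>
    data_file = 'data_negzm03t_dsgwujaz.csv'                     # data_<objective_id>_<data_id>
    solution_file = 'solution_negzm03t_dsgwujaz_p9t0nm07.json'   # solution_<..>_<..>_<solution_id>
    metrics_file = 'metrics_negzm03t_dsgwujaz_p9t0nm07.json'     # metrics mirror the solution ids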
+
+def generate_dataset_mock(obj_input: dict, obj_name: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
+    '''
+    generate_dataset test for end-to-end integration.
+
+        Parameters:
+            obj_input (dict): Objective target dictionary.
+            obj_name (str): Name of objective target file.
+
+        Returns:
+            Returns a mock dataset as a pd.DataFrame and the name of the dataset file.
+
+    >>> objective = {'data': {'N': 500, 'noise': 0.01},'input_file_name': 'input_test.json','objective_id': 'negzm03t','generated_file_name': 'objective_negzm03t.json'}
+    >>> df, generated_df_file = generate_dataset_mock(objective,objective['generated_file_name'],is_raised=True)
+    >>> df
+             x0        x1         y
+    0  0.737314  0.058099  0.010967
+    1 -1.052423  0.217132 -0.045152
+    2  0.452205  0.058047 -0.005346
+    3  1.834529 -0.095925 -0.116584
+    4  0.978658 -0.086900 -0.013328
+    ...
+    >>> generated_df_file
+    'data_negzm03t_dsgwujaz.csv'
+    '''
+
+    input_prefix = 'objective'
+    input_suffix = 'json'
+    output_prefix = 'data'
+    output_suffix = 'csv'
+
+    logger_name = kwargs['logger_name'] if 'logger_name' in kwargs else None
+    is_raised = kwargs['is_raised'] if 'is_raised' in kwargs else False
+
+    time.sleep(sleep_time)
+
+    try:
+
+        objective_id = re.match(input_prefix+'_'+r'(\w+)'+r'\.'+input_suffix,obj_name).group(1)
+        assert objective_id == obj_input['objective_id'], 'Mismatch between file name recorded in json and file name.'
+
+        data_id = generate_id()
+
+        N = obj_input['data']['N']
+        noise = obj_input['data']['noise']
+
+        d = 2
+        f = lambda x, y: (x**2 + y**2)*np.sin(np.pi*x*y/16)
+        fvect = np.vectorize(f)
+
+        mu = np.zeros(d)
+        sigma = np.diag(d*sample_standard_simplex(d))
+        X = multivariate_normal(mean=mu,cov=sigma).rvs(size=N)
+        y = fvect(X[:,0],X[:,1]) + norm.rvs(loc=0,scale=noise,size=X.shape[0])
+        D = np.hstack([X,y[:,None]])
+        df = pd.DataFrame(D,columns=[f'x{i}' for i in range(D.shape[1]-1)]+['y'])
+
+        generated_file = ''.join(['_'.join([output_prefix,objective_id,data_id]),'.',output_suffix])
+
+        return df, generated_file
+
+    except AssertionError as e:
+        if logger_name:
+            log = logging.getLogger(logger_name)
+            log.error(e)
+        if is_raised: raise e
+    except Exception as e:
+        if logger_name:
+            log = logging.getLogger(logger_name)
+            log.error(e)
+        if is_raised: raise e
+
+    return None, None
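The sampler above draws X ~ N(0, diag(d·w)) with w on the standard simplex (so the variances sum to d), then adds N(0, noise) around f(x0,x1) = (x0² + x1²)·sin(π·x0·x1/16). A plain-numpy rendering of the same recipe, assuming `sample_standard_simplex` returns a uniform simplex point (approximated here with a flat Dirichlet):

    import numpy as np

    d, N, noise = 2, 500, 0.01
    w = np.random.dirichlet(np.ones(d))        # w >= 0, w.sum() == 1
    X = np.random.multivariate_normal(np.zeros(d), np.diag(d * w), size=N)
    y = (X[:, 0]**2 + X[:, 1]**2) * np.sin(np.pi * X[:, 0] * X[:, 1] / 16)
    y = y + np.random.normal(0.0, noise, size=N)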
+
+def generate_solution_mock(predict_optimize, data_input: pd.DataFrame, data_name: str, **kwargs) -> Tuple[Optional[dict], Optional[str]]:
+    '''
+    generate_solution test for end-to-end integration.
+
+        Parameters:
+            predict_optimize: Predict-then-optimize algorithm.
+            data_input (pd.DataFrame): Dataset.
+            data_name (str): Name of dataset file.
+
+        Returns:
+            Returns a mock solution output dictionary and the name of the solution file.
+
+    >>> from doframework.core.optimizer import predict_optimize
+    >>> meta = {'data': {'N': 500, 'noise': 0.01}, 'input_file_name': 'input_test.json'}
+    >>> objective, _ = generate_objective_mock(meta, meta['input_file_name'])
+    >>> extra = {'objective': objective}
+    >>> df, generated_df_file = generate_dataset_mock(objective,objective['generated_file_name'],is_raised=True)
+    >>> solution, generated_solution_file = generate_solution_mock(predict_optimize, df, generated_df_file, **extra)
+    >>> solution
+    {'omega': {'constraints': [[1.0, 0.0, -2.0],
+       [0.0, 1.0, -2.0],
+       [-1.0, -0.0, -2.0],
+       [-0.0, -1.0, -2.0]]},
+     'solution': {'min': {'arg': [-2.0, -2.0], 'pred': -0.10076280464617189}},
+     'objective_id': 'negzm03t',
+     'data_id': 'dsgwujaz',
+     'solution_id': 'p9t0nm07',
+     'generated_file_name': 'solution_negzm03t_dsgwujaz_p9t0nm07.json'}
+    >>> generated_solution_file
+    'solution_negzm03t_dsgwujaz_p9t0nm07.json'
+    '''
+
+    output_prefix = 'solution'
+    output_suffix = 'json'
+    extra_input = ['objective']
+
+    logger_name = kwargs['logger_name'] if 'logger_name' in kwargs else None
+    is_raised = kwargs['is_raised'] if 'is_raised' in kwargs else False
+
+    time.sleep(sleep_time)
+
+    if 'is_minimum' in kwargs:
+        is_minimum = kwargs['is_minimum']
+        extra = kwargs
+    else:
+        is_minimum = True
+        extra = {**kwargs,**{'is_minimum': is_minimum}}
+
+    if all([k in kwargs for k in extra_input]) and all([v is not None for k,v in kwargs.items() if k in extra_input]):
+
+        objective_id, data_id = _ids_from_data(data_name)
+
+        solution_id = generate_id()
+        objective = kwargs['objective']
+
+        D = data_input.to_numpy()
+        d = D.shape[-1]-1
+
+        output = {}
+
+        output['omega'] = {}
+        constraints = np.hstack([np.vstack([np.eye(d),-np.eye(d)]), -d*np.ones(2*d)[:,None]])
+        output['omega']['constraints'] = [list(c) for c in constraints]
+
+        arg, val, model = predict_optimize(D, constraints, **extra)
+        solution = optimalSolution(arg, val)
+
+        output['solution'] = {}
+        opt = 'min' if is_minimum else 'max'
+
+        if all([solution.arg is not None,solution.val is not None]):
+            output['solution'][opt] = {}
+            output['solution'][opt]['arg'] = list(solution.arg)
+            output['solution'][opt]['pred'] = solution.val
+        else:
+            output['solution'][opt] = 'FAILED'
+
+        output['objective_id'] = objective_id
+        output['data_id'] = data_id
+        output['solution_id'] = solution_id
+        generated_file = ''.join(['_'.join([output_prefix,objective_id,data_id,solution_id]),'.',output_suffix])
+        output['generated_file_name'] = generated_file
+
+        return output, generated_file
+
+    else:
+
+        return None, None
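The constraint block built above encodes the box [-d, d]^d as rows [A | b], read as A·x + b <= 0. For d = 2 it reproduces the matrix in the docstring:

    import numpy as np

    d = 2
    constraints = np.hstack([np.vstack([np.eye(d), -np.eye(d)]), -d * np.ones(2 * d)[:, None]])
    # [[ 1.  0. -2.]    x0 - 2 <= 0
    #  [ 0.  1. -2.]    x1 - 2 <= 0
    #  [-1. -0. -2.]   -x0 - 2 <= 0
    #  [-0. -1. -2.]]  -x1 - 2 <= 0   i.e. the box -2 <= x0, x1 <= 2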
+
+def generate_metric_mock(solution_input: dict, solution_name: str, **kwargs) -> Tuple[Optional[dict], Optional[str]]:
+    '''
+    generate_metric test for end-to-end integration.
+
+        Parameters:
+            solution_input (dict): Solution output dictionary.
+            solution_name (str): Name of solution file.
+
+        Returns:
+            Returns a mock metric output dictionary and the name of the metric file.
+
+    >>> from doframework.core.optimizer import predict_optimize
+    >>> meta = {'data': {'N': 500, 'noise': 0.01}, 'input_file_name': 'input_test.json'}
+    >>> objective, _ = generate_objective_mock(meta, meta['input_file_name'])
+    >>> df, generated_df_file = generate_dataset_mock(objective,objective['generated_file_name'],is_raised=True)
+    >>> solution, generated_solution_file = generate_solution_mock(predict_optimize, df, generated_df_file, objective=objective)
+    >>> extra = {'objective': objective, 'data': df}
+    >>> metric, generated_metric_file = generate_metric_mock(solution, generated_solution_file, **extra)
+    >>> metric
+    {'objective_id': 'negzm03t',
+     'data_id': 'dsgwujaz',
+     'solution_id': 'p9t0nm07',
+     'generated_file_name': 'metrics_negzm03t_dsgwujaz_p9t0nm07.json'}
+    >>> generated_metric_file
+    'metrics_negzm03t_dsgwujaz_p9t0nm07.json'
+    '''
+
+    output_prefix = 'metrics'
+    output_suffix = 'json'
+    extra_input = ['objective','data']
+
+    logger_name = kwargs['logger_name'] if 'logger_name' in kwargs else None
+    is_raised = kwargs['is_raised'] if 'is_raised' in kwargs else False
+    is_mcmc = kwargs['is_mcmc'] if 'is_mcmc' in kwargs else False
+
+    time.sleep(sleep_time)
+
+    if all([k in kwargs for k in extra_input]) and all([v is not None for k,v in kwargs.items() if k in extra_input]):
+
+        objective_id, data_id, solution_id = _ids_from_solution(solution_name)
+
+        objective = kwargs['objective']
+        data = kwargs['data']
+        D = data.to_numpy()
+
+        constraints = np.array(solution_input['omega']['constraints'])
+
+        output = {}
+
+        output['objective_id'] = objective_id
+        output['data_id'] = data_id
+        output['solution_id'] = solution_id
+
+        generated_file = ''.join(['_'.join([output_prefix,objective_id,data_id,solution_id]),'.',output_suffix])
+        output['generated_file_name'] = generated_file
+
+        return output, generated_file
+
+    else:
+
+        return None, None
\ No newline at end of file
diff --git a/examples/doframework_mock.py b/examples/doframework_mock.py
new file mode 100644
index 0000000..6c1845e
--- /dev/null
+++ b/examples/doframework_mock.py
@@ -0,0 +1,78 @@
+#
+# Copyright IBM Corporation 2021
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Run mock script:
+# $ python doframework_mock.py -c configs.yaml
+
+import argparse
+import numpy as np
+import doframework as dof
+from doframework.core.optimizer import predict_optimize
+from doframework.core.storage import Storage
+from doframework.core.inputs import get_configs
+
+@dof.resolve
+def predict_optimize_resolved(data: np.array, constraints: np.array, **kwargs):
+    return predict_optimize(data, constraints, **kwargs)
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-c", "--configs", type=str, help="Path to user configs.yaml relative to current working directory.")
+    parser.add_argument("-o", "--objectives", type=int, default=3, help="Number of objectives to generate per meta input (default: 3).")
+    parser.add_argument("-d", "--datasets", type=int, default=2, help="Number of datasets to generate per objective (default: 2).")
+    parser.add_argument("-r", "--feasibility_regions", type=int, default=1, help="Number of feasibility regions to generate per dataset (default: 1).")
+    args = parser.parse_args()
+
+    configs = get_configs(args.configs)
+    storage = Storage(configs)
+    buckets = storage.buckets()
+
+    num_inputs_at_start = storage.count(buckets['inputs'],'json')+storage.count(buckets['inputs_dest'],'json')
+    num_objectives_at_start = storage.count(buckets['objectives'],'json')+storage.count(buckets['objectives_dest'],'json')
+    num_datasets_at_start = storage.count(buckets['data'],'csv')+storage.count(buckets['data_dest'],'csv')
+    num_solutions_at_start = storage.count(buckets['solutions'],'json')+storage.count(buckets['solutions_dest'],'json')
+
+    expected_objectives = args.objectives
+    expected_datasets = expected_objectives*args.datasets
+    expected_solutions = expected_datasets*args.feasibility_regions
+
+    bucket = buckets['inputs']
+    name = 'input_test.json'
+    content = {"data": {"N": 500, "noise": 0.01}, "input_file_name": name}
+    response = storage.put(bucket,content,name,'json')
+    assert response, 'Failed to upload test input file to inputs bucket.'
+
+    dof.run(
+        predict_optimize_resolved,
+        args.configs,
+        objectives=args.objectives,
+        datasets=args.datasets,
+        feasibility_regions=args.feasibility_regions,
+        after_idle_for=60,
+        logger=True,
+        mock=True # mock mode
+    )
+
+    num_inputs = storage.count(buckets['inputs'],'json')+storage.count(buckets['inputs_dest'],'json')-num_inputs_at_start
+    num_objectives = storage.count(buckets['objectives'],'json')+storage.count(buckets['objectives_dest'],'json')-num_objectives_at_start
+    num_datasets = storage.count(buckets['data'],'csv')+storage.count(buckets['data_dest'],'csv')-num_datasets_at_start
+    num_solutions = storage.count(buckets['solutions'],'json')+storage.count(buckets['solutions_dest'],'json')-num_solutions_at_start
+
+    print('SANITY: compare the number of generated files to the expected:')
+    print(f'Generated {num_objectives} objectives out of expected {expected_objectives}.')
+    print(f'Generated {num_datasets} datasets out of expected {expected_datasets}.')
+    print(f'Generated {num_solutions} solutions out of expected {expected_solutions}.')
\ No newline at end of file
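To run the example against a local file-system "cloud", the configured buckets must exist as directories before `Storage` is constructed. A bootstrap sketch — the paths and the exact bucket list are assumptions inferred from the names used above and in api.py:

    import os
    import yaml

    names = ['inputs','inputs_dest','objectives','objectives_dest',
             'data','data_dest','solutions','solutions_dest','metrics_dest']
    buckets = {name: os.path.join('/tmp/dof', name) for name in names}
    for path in buckets.values():
        os.makedirs(path, exist_ok=True)

    with open('configs.yaml', 'w') as f:
        yaml.safe_dump({'local': {'buckets': buckets}}, f)
    # then: python doframework_mock.py -c configs.yaml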
diff --git a/setup.py b/setup.py
index 8aca517..e679c0d 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@
 'pandas',
 'matplotlib',
 'ray[default,serve,k8s]>=1.4.1',
-'rayvens>=0.5.0',
+'rayvens>=0.6.0',
 'ibm-cos-sdk>=2.10.0',
 'boto3>=1.17.110',
 'aiohttp>=3.7.4',
@@ -31,8 +31,8 @@
 ]
 
 setup(name='doframework',
-version='0.1.3',
-description='A testing framework for decision-optimization model learning algorithms.',
+version='0.1.4',
+description='A testing framework for optimization model learning algorithms.',
 long_description_content_type="text/markdown",
 long_description=open('README.md').read(),
 author='Orit Davidovich',