237 changes: 148 additions & 89 deletions doframework/api.py
@@ -8,80 +8,102 @@

import ray
import rayvens
import ibm_boto3
import boto3

from doframework.core.inputs import get_configs
from doframework.core.storage import Storage, CamelKeysDict
from doframework.flow.objectives import generate_objective, calculate_objectives
from doframework.flow.datasets import generate_dataset
from doframework.flow.solutions import generate_solution, files_from_data
from doframework.flow.metrics import generate_metric, files_from_solution
from doframework.flow.mock import generate_objective_mock, generate_dataset_mock, generate_solution_mock, generate_metric_mock, sleep_time

#################################################################################################################
#################################################################################################################

def _get_s3_object(configs):

s3 = configs['s3']

assert 'cloud_service_provider' in s3, 'Missing s3:cloud_service_provider in configs.'
assert s3['cloud_service_provider'] in ['ibm','aws'], 'cloud_service_provider in configs must be either `aws` or `ibm`.'

if s3['cloud_service_provider'] == 'ibm':

return ibm_boto3.resource(service_name='s3',
region_name=s3['region'],
endpoint_url=s3['endpoint_url'],
aws_access_key_id=s3['aws_access_key_id'],
aws_secret_access_key=s3['aws_secret_access_key'])
Args = namedtuple(
'Args',[
'objectives',
'datasets',
'feasibility_regions',
'run_mode',
'distribute',
'mcmc',
'logger',
'mock',
'after_idle_for',
'rayvens_logs',
'alg_num_cpus'
]
)

if s3['cloud_service_provider'] == 'aws':
GenerateFunctionsDict = {
'objective': generate_objective,
'objective_mock': generate_objective_mock,
'data' : generate_dataset,
'data_mock' : generate_dataset_mock,
'solution' : generate_solution,
'solution_mock' : generate_solution_mock,
'metric' : generate_metric,
'metric_mock' : generate_metric_mock,
}
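
A minimal sketch of how this dispatch table is used downstream: mock mode swaps in the `_mock` variant of each generator key (the `_dispatch` helper below is hypothetical, added only for illustration):

# Hypothetical helper illustrating the GenerateFunctionsDict dispatch pattern.
def _dispatch(kind, mock, *args, **kwargs):
    key = kind + '_mock' if mock else kind   # e.g. 'objective' -> 'objective_mock'
    return GenerateFunctionsDict[key](*args, **kwargs)

# _dispatch('data', True, process_input, input_name) would call generate_dataset_mock.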

return boto3.resource(service_name='s3',
region_name=s3['region'],
aws_access_key_id=s3['aws_access_key_id'],
aws_secret_access_key=s3['aws_secret_access_key'])
#################################################################################################################
#################################################################################################################

def _get_buckets(configs):
def _get_source_config(from_bucket, to_bucket, configs):

s3_buckets = _get_s3_object(configs).buckets.all()
s3_buckets = [bucket.name for bucket in s3_buckets]
d = {}

return {name: bucket for name, bucket in configs['s3']['buckets'].items() if bucket in s3_buckets}

def _get_source_config(from_bucket, to_bucket, configs):
if 's3' in configs:

s3 = configs['s3']
s3 = configs['s3']

d = dict(kind='cloud-object-storage-source',
name='source',
bucket_name=from_bucket,
access_key_id=s3['aws_access_key_id'],
secret_access_key=s3['aws_secret_access_key'],
region=s3['region'],
move_after_read=to_bucket
)
d = dict(kind='cloud-object-storage-source',
bucket_name=from_bucket,
access_key_id=s3['aws_access_key_id'],
secret_access_key=s3['aws_secret_access_key'],
region=s3['region'],
move_after_read=to_bucket
)

if 'endpoint_url' in s3:
if 'endpoint_url' in s3:

d = {**d,**dict(endpoint=s3['endpoint_url'])}
d = {**d,**dict(endpoint=s3['endpoint_url'])}

if 'local' in configs:

d = dict(kind='file-source',
path=from_bucket,
keep_file=False,
move_after_read=to_bucket
)

return d

def _get_sink_config(to_bucket, configs):

s3 = configs['s3']
d = {}

d = dict(kind='cloud-object-storage-sink',
name='sink',
bucket_name=to_bucket,
access_key_id=s3['aws_access_key_id'],
secret_access_key=s3['aws_secret_access_key'],
region=s3['region']
)
if 's3' in configs:

s3 = configs['s3']

if 'endpoint_url' in s3:
d = dict(kind='cloud-object-storage-sink',
bucket_name=to_bucket,
access_key_id=s3['aws_access_key_id'],
secret_access_key=s3['aws_secret_access_key'],
region=s3['region']
)

d = {**d,**dict(endpoint=s3['endpoint_url'])}
if 'endpoint_url' in s3:

d = {**d,**dict(endpoint=s3['endpoint_url'])}

if 'local' in configs:

d = dict(kind='file-sink',
path=to_bucket
)

return d
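
For orientation, a hypothetical `configs` dict and the configs these two helpers would produce from it (all values below are invented placeholders):

# Invented placeholder configs for a cloud run.
configs = {'s3': {'aws_access_key_id': 'KEY', 'aws_secret_access_key': 'SECRET',
                  'region': 'us-east-1', 'endpoint_url': 'https://s3.example.com'}}
# _get_source_config('inputs', 'inputs-dest', configs) returns:
#   {'kind': 'cloud-object-storage-source', 'bucket_name': 'inputs',
#    'access_key_id': 'KEY', 'secret_access_key': 'SECRET', 'region': 'us-east-1',
#    'move_after_read': 'inputs-dest', 'endpoint': 'https://s3.example.com'}
# With configs = {'local': {}} instead, the same call returns:
#   {'kind': 'file-source', 'path': 'inputs', 'keep_file': False, 'move_after_read': 'inputs-dest'}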

@@ -127,10 +149,29 @@ def _get_event_type(process_type,args):
print('({}) ERROR ... '.format(process_type) + str(e))
return None

def _get_event(process_type,process_json,event_type,args):
try:
if event_type == 'json':
process_input = json.loads(process_json['body'])
elif event_type == 'csv':
process_input = pd.read_csv(StringIO(process_json['body']))
else:
process_input = None
return process_input
except json.JSONDecodeError as e:
if args.logger:
print('({}) ERROR ... Error occurred while decoding file {} with json.loads.'.format(process_type,process_json['filename']))
print('({}) ERROR ... '.format(process_type) + str(e))
except Exception as e:
if args.logger:
print('({}) ERROR ... Error occurred when extracting event content.'.format(process_type))
print('({}) ERROR ... '.format(process_type) + str(e))
return None
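
Example payloads for `_get_event`, with invented contents:

# Invented event payloads; rayvens delivers them as dicts with 'filename' and 'body'.
json_event = {'filename': 'objective_1.json', 'body': '{"name": "objective_1"}'}
csv_event = {'filename': 'data_1.csv', 'body': 'x1,x2,y\n0.1,0.2,1.0\n'}
# _get_event('objective', json_event, 'json', args) -> dict via json.loads
# _get_event('data', csv_event, 'csv', args)        -> DataFrame via pd.read_csv(StringIO(...))
# Any other event_type yields None, which the caller asserts on.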

def _number_of_iterations(process_input, args, process_type):
try:
if process_type == 'input':
n = calculate_objectives(process_input,args)
n = args.objectives if args.mock else calculate_objectives(process_input,args)
elif process_type == 'objective':
n = args.datasets
elif process_type == 'data':
@@ -140,6 +181,10 @@ def _number_of_iterations(process_input, args, process_type):
else:
n = None
return n
except KeyError as e:
if args.logger:
print('({}) ERROR ... Error occurred when calculating n in number_of_iterations.'.format(process_type))
print('({}) ERROR ... '.format(process_type) + str(e))
except Exception as e:
if args.logger:
print('({}) ERROR ... Error occurred when calculating n in number_of_iterations.'.format(process_type))
@@ -154,14 +199,15 @@ def _get_extra_input(input_name, process_type, configs, args, buckets):
extra = {}
elif process_type == 'data':
files = files_from_data(input_name)
objective = _get_s3_object(configs).Bucket(buckets['objectives_dest']).Object(files['objective']).get()
extra = {'objective': json.load(objective['Body'])}
storage = Storage(configs)
objective = storage.get(buckets['objectives_dest'],files['objective'])
extra = {'objective': json.load(objective)}
elif process_type == 'solution':
files = files_from_solution(input_name)
s3 = _get_s3_object(configs)
objective = s3.Bucket(buckets['objectives_dest']).Object(files['objective']).get()
data = s3.Bucket(buckets['data_dest']).Object(files['data']).get()
extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective['Body']), 'data': pd.read_csv(data['Body'])}
storage = Storage(configs)
objective = storage.get(buckets['objectives_dest'],files['objective'])
data = storage.get(buckets['data_dest'],files['data'])
extra = {'is_mcmc': args.mcmc, 'objective': json.load(objective), 'data': pd.read_csv(data)}
else:
extra = None
return extra
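
The `Storage` abstraction replaces the deleted `_get_s3_object`/`_get_buckets` helpers; its interface, as far as this diff shows, is the following (anything beyond these three calls would be an assumption):

# Storage interface as used in this diff; backend (ibm/aws/local) is picked from configs.
storage = Storage(configs)
buckets = storage.buckets()               # logical-name -> bucket mapping, cf. run() below
obj = storage.get('bucket-name', 'key')   # returns a file-like object:
# both json.load(obj) and pd.read_csv(obj) are applied to it above.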
@@ -189,13 +235,7 @@ def inner(context, event):
assert ('body' in process_json) and ('filename' in process_json), 'Missing field body and/or filename in event json.'

event_type = _get_event_type(process_type,args)

if event_type == 'json':
process_input = json.loads(process_json['body'])
elif event_type == 'csv':
process_input = pd.read_csv(StringIO(process_json['body']))
else:
process_input = None
process_input = _get_event(process_type,process_json,event_type,args)
assert process_input is not None, 'Unable to extract process input. Perhaps illegal event_type={}.'.format(event_type)
if args.logger: print('({}) INFO ... Process successfully extracted event of type {}.'.format(process_type, event_type))

@@ -218,7 +258,7 @@ def inner(context, event):
print('({}) INFO ... Process working on event {} uses extra input {}.'.format(process_type,input_name,list(extra.keys())))
else:
print('({}) INFO ... Process working on event {} does not require extra input.'.format(process_type,input_name))
extra = {**extra,**kwargs}
extra = {**extra,**kwargs,**configs,**{'mock': args.mock}}

if args.distribute:
_ = [f_dist.remote(context, process_input, input_name, **extra) for _ in range(n)]
@@ -249,16 +289,15 @@ def inner(context, event):
def resolve(predict_optimize):

def inner(process_input, input_name, **kwargs):

return generate_solution(predict_optimize, process_input, input_name, **kwargs)

key = 'solution_mock' if 'mock' in kwargs and kwargs['mock'] else 'solution' # !!!
return GenerateFunctionsDict[key](predict_optimize, process_input, input_name, **kwargs)

return inner
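
A sketch of the intended user-facing flow (the signature of the user's `predict_optimize` is an assumption here, not specified in this diff):

# Hypothetical user code: wrap a predict-then-optimize routine for the pipeline.
from doframework.api import resolve, run

@resolve
def predict_optimize(data, constraints, **kwargs):   # signature assumed for illustration
    ...  # fit a model on the sampled data, optimize, return a solution

# The decorated function is then handed to run() as generate_user_solution:
# run(predict_optimize, 'configs.yaml', objectives=2, datasets=3, mock=False)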

#################################################################################################################
#################################################################################################################

Args = namedtuple('Args',['objectives','datasets','feasibility_regions','run_mode','distribute','mcmc','logger','after_idle_for','rayvens_logs','alg_num_cpus'])

def run(generate_user_solution, configs_file, **kwargs):

objectives = int(kwargs['objectives']) if 'objectives' in kwargs else 1
@@ -268,11 +307,12 @@ def run(generate_user_solution, configs_file, **kwargs):
distribute = kwargs['distribute'] if 'distribute' in kwargs else True
mcmc = kwargs['mcmc'] if 'mcmc' in kwargs else False
logger = kwargs['logger'] if 'logger' in kwargs else True
mock = kwargs['mock'] if 'mock' in kwargs else False
after_idle_for = kwargs['after_idle_for'] if 'after_idle_for' in kwargs else 200
rayvens_logs = kwargs['rayvens_logs'] if 'rayvens_logs' in kwargs else False
alg_num_cpus = int(kwargs['alg_num_cpus']) if 'alg_num_cpus' in kwargs else 1

args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, after_idle_for, rayvens_logs, alg_num_cpus)
args = Args(objectives, datasets, feasibility_regions, run_mode, distribute, mcmc, logger, mock, after_idle_for, rayvens_logs, alg_num_cpus)

if args.run_mode == 'operator':
ray.init(address='auto')
@@ -283,45 +323,64 @@
if args.logger: print('({}) INFO ... Running simulation with args objectives={o} datasets={s} feasibility_regions={r} distribute={d} run_mode={m} logger={l}'.format('root',
o=args.objectives, s=args.datasets, r=args.feasibility_regions, d=args.distribute, m=args.run_mode, l=args.logger))

with open(configs_file,'r') as file:
try:
configs = yaml.safe_load(file)
except yaml.YAMLError as e:
if args.logger:
print('({}) ERROR ... Could not load configs yaml.'.format('root'))
print(e)
raise e
if args.mock:
if args.logger: print('({}) INFO ... Running in MOCK mode.'.format('root'))

buckets = _get_buckets(configs)
configs = get_configs(configs_file)
storage = Storage(configs)
buckets = storage.buckets()

@ray.remote(num_cpus=1)
@_process('input', configs, args, buckets)
def generate_objectives(context, process_input, input_name, **kwargs):
objective, generated_file = generate_objective(process_input, input_name,**kwargs)
event = rayvens.OutputEvent(json.dumps(objective),{"CamelAwsS3Key": generated_file})
context.publish(event)
key = 'objective_mock' if 'mock' in kwargs and kwargs['mock'] else 'objective'
objective, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)

if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
event = rayvens.OutputEvent(json.dumps(objective),{CamelKeysDict[key]: generated_file})
context.publish(event)
else:
print('({}) ERROR ... generated objective {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('input',generated_file))
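
The publish guard repeated in these generators is an exclusive-or test; an equivalent, more direct form (sketch only, behavior identical):

# Equivalent guard: publish only when exactly one of 'local'/'s3' is configured.
if ('local' in kwargs) != ('s3' in kwargs):
    key = 'local' if 'local' in kwargs else 's3'
    context.publish(rayvens.OutputEvent(json.dumps(objective), {CamelKeysDict[key]: generated_file}))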

@ray.remote(num_cpus=1)
@_process('objective', configs, args, buckets)
def generate_datasets(context, process_input, input_name, **kwargs):
df, generated_file = generate_dataset(process_input, input_name, **kwargs)
event = rayvens.OutputEvent(df.to_csv(index=False),{"CamelAwsS3Key": generated_file})
context.publish(event)
key = 'data_mock' if 'mock' in kwargs and kwargs['mock'] else 'data'
df, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)

if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
event = rayvens.OutputEvent(df.to_csv(index=False),{CamelKeysDict[key]: generated_file})
context.publish(event)
else:
print('({}) ERROR ... generated dataset {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('objective',generated_file))

@ray.remote(num_cpus=args.alg_num_cpus)
@_process('data', configs, args, buckets, **kwargs)
def generate_solutions(context, process_input, input_name, **kwargs):
solution, generated_file = generate_user_solution(process_input, input_name, **kwargs)
event = rayvens.OutputEvent(json.dumps(solution),{"CamelAwsS3Key": generated_file})
context.publish(event)

if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
event = rayvens.OutputEvent(json.dumps(solution),{CamelKeysDict[key]: generated_file})
context.publish(event)
else:
print('({}) ERROR ... generated solution {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('data',generated_file))

@ray.remote(num_cpus=1)
@_process('solution', configs, args, buckets)
def generate_metrics(context, process_input, input_name, **kwargs):
metric, generated_file = generate_metric(process_input, input_name, **kwargs)
event = rayvens.OutputEvent(json.dumps(metric),{"CamelAwsS3Key": generated_file})
context.publish(event)

key = 'metric_mock' if 'mock' in kwargs and kwargs['mock'] else 'metric'
metric, generated_file = GenerateFunctionsDict[key](process_input, input_name, **kwargs)

if any(['local' in kwargs, 's3' in kwargs]) and not all(['local' in kwargs, 's3' in kwargs]):
key = 'local'*('local' in kwargs) + 's3'*('s3' in kwargs)
event = rayvens.OutputEvent(json.dumps(metric),{CamelKeysDict[key]: generated_file})
context.publish(event)
else:
print('({}) ERROR ... generated metric {} not published to context. Either `s3` or `local` is missing from configs, or both are present.'.format('solution',generated_file))

sources = ['inputs', 'objectives', 'data', 'solutions']
targets = ['objectives', 'data', 'solutions', 'metrics_dest']
operators = [generate_objectives, generate_datasets, generate_solutions, generate_metrics]
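
These triples are presumably zipped into rayvens streams; a hedged sketch of that wiring, with the stream method names and the `*_dest` bucket-key convention taken as assumptions rather than confirmed API:

# Hedged sketch of the stream wiring (method names are assumptions).
for source, target, operator in zip(sources, targets, operators):
    stream = rayvens.Stream(source)
    stream.add_source(_get_source_config(buckets[source], buckets[source + '_dest'], configs))
    stream.add_operator(operator)
    stream.add_sink(_get_sink_config(buckets[target], configs))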