Skip to content

Commit

Permalink
ENH: New SAGA python example;
Browse files Browse the repository at this point in the history
FIX: Not using broken long under Windows;
TESTS: More copy tests;
  • Loading branch information
SmokinCaterpillar committed Aug 11, 2015
1 parent 7035f32 commit 897db7b
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 8 deletions.
1 change: 1 addition & 0 deletions examples/example_22_saga_python/__init__.py
@@ -0,0 +1 @@
__author__ = 'robert'
38 changes: 38 additions & 0 deletions examples/example_22_saga_python/merge_trajs.py
@@ -0,0 +1,38 @@
__author__ = 'Robert Meyer'


import os # For path names being viable under Windows and Linux
import getopt

import sys
sys.path.append('/net/homes2/informatik/augustin/robm/python_lib/pypet-0.1b.13dev')

from pypet import merge_all_in_folder

from the_task import FunctionParameter


def get_folder():
optlist, args = getopt.getopt(sys.argv[1:], '', longopts='folder=')
folder = None
for o, a in optlist:
if o == '--folder':
folder = a
print 'Found folder %s' % folder

return folder


# And here goes our main function
def main():
folder = get_folder()
full_folder = os.path.join(os.getcwd(), folder)
print('Merging all files')
merge_all_in_folder(full_folder, delete_other_files=True,
dynamic_imports=FunctionParameter,
backup=False)
print('Done')


if __name__ == '__main__':
main()
133 changes: 133 additions & 0 deletions examples/example_22_saga_python/start_saga.py
@@ -0,0 +1,133 @@
import sys
import saga
from saga.filesystem import OVERWRITE
import os
import traceback


ADDRESS = '130.149.250.12'
USER = 'rmeyer'
PASSWORD = 'sshvice87'
WORKING_DIR = '/home/' + USER + '/python/saga-test'


def upload_file(filename, session):
""" Uploads a file """
print('Uploading file %s' % filename)
outfilesource = os.path.join(os.getcwd(), filename)
outfiletarget = 'sftp://' + ADDRESS + WORKING_DIR
out = saga.filesystem.File(outfilesource, session=session, flags=OVERWRITE)
out.copy(outfiletarget)
print('Transfer of `%s` to `%s` successful' % (filename, outfiletarget))


def download_file(filename, session):
""" Downloads a file """
print('Downloading file %s' % filename)
infilesource = os.path.join('sftp://' + ADDRESS + WORKING_DIR,
filename)
infiletarget = os.path.join(os.getcwd(), filename)
incoming = saga.filesystem.File(infilesource, session=session, flags=OVERWRITE)
incoming.copy(infiletarget)
print('Transfer of `%s` to `%s` successful' % (filename, infiletarget))


def create_session():
""" Creates and returns a new saga session """
ctx = saga.Context("UserPass")
ctx.user_id = USER
ctx.user_pass = PASSWORD

session = saga.Session()
session.add_context(ctx)

return session


def merge_trajectories(session):
""" Merges all trajectories found in the working directory """
jd = saga.job.Description()

jd.executable = 'python'
jd.arguments = ['merge_trajs.py --folder=./']
jd.output = "mysagajob_merge.stdout"
jd.error = "mysagajob_merge.stderr"
jd.working_directory = WORKING_DIR

js = saga.job.Service('ssh://' + ADDRESS, session=session)
myjob = js.create_job(jd)
print "\n...starting job...\n"

# Now we can start our job.
myjob.run()
print "Job ID : %s" % (myjob.id)
print "Job State : %s" % (myjob.state)

print "\n...waiting for job...\n"
# wait for the job to either finish or fail
myjob.wait()

print "Job State : %s" % (myjob.state)
print "Exitcode : %s" % (myjob.exit_code)


def start_jobs(session):
""" Starts all jobs and runs `the_task.py` in batches. """

js = saga.job.Service('ssh://' + ADDRESS, session=session)

batches = range(2)
jobs = []
for batch in batches:
print('Starting batch %d' % batch)

jd = saga.job.Description()

jd.executable = 'python'
jd.arguments = ['the_task.py --batch=' + str(batch)]
jd.output = "mysagajob.stdout" + str(batch)
jd.error = "mysagajob.stderr" + str(batch)
jd.working_directory = '/net/homes2/informatik/augustin/robm/working_dir'

myjob = js.create_job(jd)

print "Job ID : %s" % (myjob.id)
print "Job State : %s" % (myjob.state)

print "\n...starting job...\n"

myjob.run()
jobs.append(myjob)

for myjob in jobs:
print "Job ID : %s" % (myjob.id)
print "Job State : %s" % (myjob.state)

print "\n...waiting for job...\n"
# wait for the job to either finish or fail
myjob.wait()

print "Job State : %s" % (myjob.state)
print "Exitcode : %s" % (myjob.exit_code)


def main():
try:
session = create_session()
upload_file('the_task.py', session)
upload_file('merge_trajs.py', session)

start_jobs(session)
merge_trajectories(session)

return 0

except saga.SagaException as ex:
# Catch all saga exceptions
print "An exception occured: (%s) %s " % (ex.type, (str(ex)))
# Trace back the exception. That can be helpful for debugging.
traceback.print_exc()
return -1

if __name__ == "__main__":
sys.exit(main())
150 changes: 150 additions & 0 deletions examples/example_22_saga_python/the_task.py
@@ -0,0 +1,150 @@
__author__ = 'Robert Meyer'

import numpy as np
import inspect
import os # For path names being viable under Windows and Linux
import getopt
import sys

from pypet import Environment, Parameter, ArrayParameter, Trajectory


def euler_scheme(traj, diff_func):
"""Simulation function for Euler integration.
:param traj:
Container for parameters and results
:param diff_func:
The differential equation we want to integrate
"""

steps = traj.steps
initial_conditions = traj.initial_conditions
dimension = len(initial_conditions)

# This array will collect the results
result_array = np.zeros((steps,dimension))
# Get the function parameters stored into `traj` as a dictionary
# with the (short) names as keys :
func_params_dict = traj.func_params.f_to_dict(short_names=True, fast_access=True)
# Take initial conditions as first result
result_array[0] = initial_conditions

# Now we compute the Euler Scheme steps-1 times
for idx in range(1,steps):
result_array[idx] = diff_func(result_array[idx-1], **func_params_dict) * traj.dt + \
result_array[idx-1]
# Note the **func_params_dict unzips the dictionary, it's the reverse of **kwargs in function
# definitions!

#Finally we want to keep the results
traj.f_add_result('euler_evolution', data=result_array, comment='Our time series data!')


class FunctionParameter(Parameter):
def _convert_data(self, val):
if callable(val):
return inspect.getsource(val)
else:
return super(FunctionParameter,self)._convert_data(val)


def add_parameters(traj):
"""Adds all necessary parameters to the `traj` container"""

traj.f_add_parameter('steps', 10000, comment='Number of time steps to simulate')
traj.f_add_parameter('dt', 0.01, comment='Step size')

# Here we want to add the initial conditions as an array parameter. We will simulate
# a 3-D differential equation, the Lorenz attractor.
traj.f_add_parameter(ArrayParameter,'initial_conditions', np.array([0.1,0.2,0.3]),
comment = 'Our initial conditions, as default we will start from'
' origin!')

# We will group all parameters of the Lorenz differential equation into the group 'func_params'
traj.f_add_parameter('func_params.sigma', 10.0)
traj.f_add_parameter('func_params.beta', 8.0/3.0)
traj.f_add_parameter('func_params.rho', 28.0)

#For the fun of it we will annotate the group
traj.func_params.v_annotations.info='This group contains as default the original values chosen ' \
'by Edward Lorenz in 1963. Check it out on wikipedia ' \
'(https://en.wikipedia.org/wiki/Lorenz_attractor)!'


def diff_lorenz(value_array, sigma, beta, rho):
"""The Lorenz attractor differential equation
:param value_array: 3d array containing the x,y, and z component values.
:param sigma: Constant attractor parameter
:param beta: FConstant attractor parameter
:param rho: Constant attractor parameter
:return: 3d array of the Lorenz system evaluated at `value_array`
"""
diff_array = np.zeros(3)
diff_array[0] = sigma * (value_array[1]-value_array[0])
diff_array[1] = value_array[0] * (rho - value_array[2]) - value_array[1]
diff_array[2] = value_array[0] * value_array[1] - beta * value_array[2]

return diff_array


def get_batch():
"""Function that parses the batch id from the command line arguments"""
optlist, args = getopt.getopt(sys.argv[1:], '', longopts='batch=')
batch = 0
for o, a in optlist:
if o == '--batch':
batch = int(a)
print 'Found batch %d' % batch

return batch


def explore_batch(traj, batch):
explore_dict = {}
explore_dict['sigma'] = np.arange(0*batch, 10.0*(batch+1), 1.0).tolist()
traj.f_explore(explore_dict)


# And here goes our main function
def main():
batch = get_batch()

filename = 'saga_%s.hdf5' % str(batch)
env = Environment(trajectory='Example_22_Euler_Integration_%s' % str(batch),
filename=filename,
file_title='Example_22_Euler_Integration',
comment='Go for Euler!',
overwrite_file=True,
multiproc=True,
ncores=4)

traj = env.v_trajectory
trajectory_name = traj.v_name

# 1st a) phase parameter addition
add_parameters(traj)

# 1st b) phase preparation
# We will add the differential equation (well, its source code only) as a derived parameter
traj.f_add_derived_parameter(FunctionParameter,'diff_eq', diff_lorenz,
comment='Source code of our equation!')

# explore the trajectory
explore_batch(traj, batch)

# 2nd phase let's run the experiment
# We pass `euler_scheme` as our top-level simulation function and
# the Lorenz equation 'diff_lorenz' as an additional argument
env.f_run(euler_scheme, diff_lorenz)


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion pypet/tests/testutils/data.py
Expand Up @@ -181,7 +181,7 @@ def simple_calculations(traj, arg1, simple_kwarg):
my_dict['__FLOATaRRAy'] = np.array([1.0,2.0,41.0])
my_dict['__FLOATaRRAy_nested'] = np.array([np.array([1.0,2.0,41.0]),np.array([1.0,2.0,41.0])])
my_dict['__STRaRRAy'] = np.array(['sds','aea','sf'])
my_dict['__LONG'] = compat.long_type(4266666666666)
my_dict['__LONG'] = compat.long_type(4266)
my_dict['__UNICODE'] = u'sdfdsf'
my_dict['__BYTES'] = b'zweiundvierzig'
my_dict['__NUMPY_UNICODE'] = np.array([u'$%&ddss'])
Expand Down

0 comments on commit 897db7b

Please sign in to comment.