diff --git a/rastervision_sagemaker/rastervision/sagemaker/sagemaker_runner.py b/rastervision_sagemaker/rastervision/sagemaker/sagemaker_runner.py index c9dd97b445..210b2983df 100644 --- a/rastervision_sagemaker/rastervision/sagemaker/sagemaker_runner.py +++ b/rastervision_sagemaker/rastervision/sagemaker/sagemaker_runner.py @@ -11,8 +11,6 @@ from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.steps import ProcessingStep, TrainingStep -import tempfile - log = logging.getLogger(__name__) SAGEMAKER = 'sagemaker' @@ -25,7 +23,6 @@ def make_step( instance_type: str, use_spot_instances: bool, sagemaker_session: PipelineSession, - tempdir: tempfile.TemporaryDirectory, ): python_executable = cmd[0] @@ -105,67 +102,64 @@ def run(self, steps = [] - with tempfile.TemporaryDirectory() as tempdir: - for command in commands: - - use_gpu = command in pipeline.gpu_commands - job_name = command - cmd = [ - 'python', - '/opt/src/rastervision_pipeline/rastervision/pipeline/cli.py', # XXX - ] - if rv_config.get_verbosity() > 1: - cmd.append('-' + 'v' * (rv_config.get_verbosity() - 1)) - cmd.extend(['run_command', cfg_json_uri, command]) - - if command in pipeline.split_commands and num_splits > 1: - # If the step can be split, then split it into parts - # that do not depend on each other (can run in - # parallel). - _steps = [] - for i in range(num_splits): - cmd += [ - '--split-ind', - str(i), '--num-splits', - str(num_splits) - ] - step = make_step( - f'{job_name}_{i+1}of{num_splits}', - cmd, - exec_role, - gpu_image if use_gpu else cpu_image, - gpu_inst_type if use_gpu else cpu_inst_type, - use_spot_instances, - sagemaker_session, - tempdir, - ) - # step.add_depends_on(steps) - _steps.append(step) - steps.extend(_steps) - else: - # If the step can not be split, then submit it as-is. + for command in commands: + + use_gpu = command in pipeline.gpu_commands + job_name = command + cmd = [ + 'python', + '/opt/src/rastervision_pipeline/rastervision/pipeline/cli.py', # XXX + ] + if rv_config.get_verbosity() > 1: + cmd.append('-' + 'v' * (rv_config.get_verbosity() - 1)) + cmd.extend(['run_command', cfg_json_uri, command]) + + if command in pipeline.split_commands and num_splits > 1: + # If the step can be split, then split it into parts + # that do not depend on each other (can run in + # parallel). + _steps = [] + for i in range(num_splits): + cmd += [ + '--split-ind', + str(i), '--num-splits', + str(num_splits) + ] step = make_step( - job_name, + f'{job_name}_{i+1}of{num_splits}', cmd, exec_role, gpu_image if use_gpu else cpu_image, - cpu_inst_type if use_gpu else cpu_inst_type, + gpu_inst_type if use_gpu else cpu_inst_type, use_spot_instances, sagemaker_session, - tempdir, ) - step.add_depends_on(steps) - steps.append(step) - - # Submit the pipeline to SageMaker - iam_client = boto3.client('iam') - role_arn = iam_client.get_role(RoleName=exec_role)['Role']['Arn'] - pipeline = Pipeline( - name=pipeline_run_name, - steps=steps, - sagemaker_session=sagemaker_session, - ) - pipeline.upsert(role_arn=role_arn) - execution = pipeline.start() + # step.add_depends_on(steps) + _steps.append(step) + steps.extend(_steps) + else: + # If the step can not be split, then submit it as-is. + step = make_step( + job_name, + cmd, + exec_role, + gpu_image if use_gpu else cpu_image, + cpu_inst_type if use_gpu else cpu_inst_type, + use_spot_instances, + sagemaker_session, + ) + step.add_depends_on(steps) + steps.append(step) + + # Submit the pipeline to SageMaker + iam_client = boto3.client('iam') + role_arn = iam_client.get_role(RoleName=exec_role)['Role']['Arn'] + pipeline = Pipeline( + name=pipeline_run_name, + steps=steps, + sagemaker_session=sagemaker_session, + ) + pipeline.upsert(role_arn=role_arn) + execution = pipeline.start() print(execution.describe())