Permalink
Browse files

PEP8 and pyflakes cleanup.

  • Loading branch information...
garnaat committed Mar 28, 2012
1 parent 7f4fe40 commit 28aff58ff881ed25205cf01072559c8820ec3e07
View
@@ -23,8 +23,3 @@
This module provides an interface to the Elastic MapReduce (EMR)
service from AWS.
"""
-from .connection import EmrConnection
-from .step import Step, StreamingStep, JarStep
-from .bootstrap_action import BootstrapAction
-
-
@@ -20,6 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
+
class BootstrapAction(object):
def __init__(self, name, path, bootstrap_action_args):
self.name = name
View
@@ -27,12 +27,13 @@
import boto
import boto.utils
from boto.ec2.regioninfo import RegionInfo
-from boto.emr.emrobject import JobFlow, RunJobFlowResponse
-from boto.emr.emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsResponse
-from boto.emr.step import JarStep
+from .emrobject import JobFlow, RunJobFlowResponse
+from .emrobject import AddInstanceGroupsResponse, ModifyInstanceGroupsResponse
+from .step import JarStep
from boto.connection import AWSQueryConnection
from boto.exception import EmrResponseError
+
class EmrConnection(AWSQueryConnection):
APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
@@ -153,7 +154,7 @@ def add_instance_groups(self, jobflow_id, instance_groups):
:type jobflow_id: str
:param jobflow_id: The id of the jobflow which will take the
new instance groups
-
+
:type instance_groups: list(boto.emr.InstanceGroup)
:param instance_groups: A list of instance groups to add to the job
"""
@@ -174,7 +175,7 @@ def modify_instance_groups(self, instance_group_ids, new_sizes):
:type instance_group_ids: list(str)
    :param instance_group_ids: A list of the IDs of the instance
groups to be modified
-
+
:type new_sizes: list(int)
:param new_sizes: A list of the new sizes for each instance group
"""
@@ -190,8 +191,8 @@ def modify_instance_groups(self, instance_group_ids, new_sizes):
# could be wrong - the example amazon gives uses
# InstanceRequestCount, while the api documentation
# says InstanceCount
- params['InstanceGroups.member.%d.InstanceGroupId' % (k+1) ] = ig[0]
- params['InstanceGroups.member.%d.InstanceCount' % (k+1) ] = ig[1]
+ params['InstanceGroups.member.%d.InstanceGroupId' % (k + 1)] = ig[0]
+ params['InstanceGroups.member.%d.InstanceCount' % (k + 1)] = ig[1]
return self.get_object('ModifyInstanceGroups', params,
ModifyInstanceGroupsResponse, verb='POST')
@@ -213,59 +214,59 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None,
Runs a job flow
:type name: str
:param name: Name of the job flow
-
+
:type log_uri: str
:param log_uri: URI of the S3 bucket to place logs
-
+
:type ec2_keyname: str
:param ec2_keyname: EC2 key used for the instances
-
+
:type availability_zone: str
:param availability_zone: EC2 availability zone of the cluster
-
+
:type master_instance_type: str
:param master_instance_type: EC2 instance type of the master
-
+
:type slave_instance_type: str
:param slave_instance_type: EC2 instance type of the slave nodes
-
+
:type num_instances: int
:param num_instances: Number of instances in the Hadoop cluster
-
+
:type action_on_failure: str
:param action_on_failure: Action to take if a step terminates
-
+
:type keep_alive: bool
:param keep_alive: Denotes whether the cluster should stay
alive upon completion
-
+
:type enable_debugging: bool
:param enable_debugging: Denotes whether AWS console debugging
should be enabled.
:type hadoop_version: str
:param hadoop_version: Version of Hadoop to use. This no longer
- defaults to '0.20' and now uses the AMI default.
+ defaults to '0.20' and now uses the AMI default.
:type steps: list(boto.emr.Step)
:param steps: List of steps to add with the job
-
+
:type bootstrap_actions: list(boto.emr.BootstrapAction)
:param bootstrap_actions: List of bootstrap actions that run
before Hadoop starts.
-
+
:type instance_groups: list(boto.emr.InstanceGroup)
:param instance_groups: Optional list of instance groups to
use when creating this job.
NB: When provided, this argument supersedes num_instances
and master/slave_instance_type.
-
+
:type ami_version: str
:param ami_version: Amazon Machine Image (AMI) version to use
for instances. Values accepted by EMR are '1.0', '2.0', and
'latest'; EMR currently defaults to '1.0' if you don't set
'ami_version'.
-
+
:type additional_info: JSON str
:param additional_info: A JSON string for selecting additional features
@@ -304,7 +305,7 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None,
num_instances)
params.update(instance_params)
else:
- # Instance group args (for spot instances or a heterogenous cluster)
+            # Instance group args for spot instances or a heterogeneous cluster
list_args = self._build_instance_group_list_args(instance_groups)
instance_params = dict(
('Instances.%s' % k, v) for k, v in list_args.items()
@@ -353,7 +354,7 @@ def set_termination_protection(self, jobflow_id,
:type jobflow_ids: list or str
:param jobflow_ids: A list of job flow IDs
-
+
:type termination_protection_status: bool
:param termination_protection_status: Termination protection status
"""
@@ -365,7 +366,6 @@ def set_termination_protection(self, jobflow_id,
return self.get_status('SetTerminationProtection', params, verb='POST')
-
def _build_bootstrap_action_args(self, bootstrap_action):
bootstrap_action_params = {}
bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path
@@ -377,7 +377,8 @@ def _build_bootstrap_action_args(self, bootstrap_action):
args = bootstrap_action.args()
if args:
- self.build_list_params(bootstrap_action_params, args, 'ScriptBootstrapAction.Args.member')
+ self.build_list_params(bootstrap_action_params, args,
+ 'ScriptBootstrapAction.Args.member')
return bootstrap_action_params
@@ -392,7 +393,8 @@ def _build_step_args(self, step):
args = step.args()
if args:
- self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')
+ self.build_list_params(step_params, args,
+ 'HadoopJarStep.Args.member')
step_params['Name'] = step.name
return step_params
@@ -414,7 +416,7 @@ def _build_step_list(self, steps):
params = {}
for i, step in enumerate(steps):
for key, value in step.items():
- params['Steps.member.%s.%s' % (i+1, key)] = value
+ params['Steps.member.%s.%s' % (i + 1, key)] = value
return params
def _build_instance_common_args(self, ec2_keyname, availability_zone,
@@ -425,7 +427,7 @@ def _build_instance_common_args(self, ec2_keyname, availability_zone,
use in making a RunJobFlow request.
"""
params = {
- 'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(),
+ 'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
}
if hadoop_version:
@@ -438,16 +440,17 @@ def _build_instance_common_args(self, ec2_keyname, availability_zone,
return params
def _build_instance_count_and_type_args(self, master_instance_type,
- slave_instance_type, num_instances):
+ slave_instance_type,
+ num_instances):
"""
Takes a master instance type (string), a slave instance type
(string), and a number of instances. Returns a comparable dict
for use in making a RunJobFlow request.
"""
params = {
- 'Instances.MasterInstanceType' : master_instance_type,
- 'Instances.SlaveInstanceType' : slave_instance_type,
- 'Instances.InstanceCount' : num_instances,
+ 'Instances.MasterInstanceType': master_instance_type,
+ 'Instances.SlaveInstanceType': slave_instance_type,
+ 'Instances.InstanceCount': num_instances,
}
return params
@@ -458,11 +461,11 @@ def _build_instance_group_args(self, instance_group):
RunJobFlow or AddInstanceGroups requests.
"""
params = {
- 'InstanceCount' : instance_group.num_instances,
- 'InstanceRole' : instance_group.role,
- 'InstanceType' : instance_group.type,
- 'Name' : instance_group.name,
- 'Market' : instance_group.market
+ 'InstanceCount': instance_group.num_instances,
+ 'InstanceRole': instance_group.role,
+ 'InstanceType': instance_group.type,
+ 'Name': instance_group.name,
+ 'Market': instance_group.market
}
if instance_group.market == 'SPOT':
params['BidPrice'] = instance_group.bidprice
@@ -481,5 +484,5 @@ def _build_instance_group_list_args(self, instance_groups):
for i, instance_group in enumerate(instance_groups):
ig_dict = self._build_instance_group_args(instance_group)
for key, value in ig_dict.items():
- params['InstanceGroups.member.%d.%s' % (i+1, key)] = value
+ params['InstanceGroups.member.%d.%s' % (i + 1, key)] = value
return params
View
@@ -45,12 +45,14 @@ def endElement(self, name, value, connection):
class RunJobFlowResponse(EmrObject):
Fields = set(['JobFlowId'])
+
class AddInstanceGroupsResponse(EmrObject):
Fields = set(['InstanceGroupIds', 'JobFlowId'])
-
+
+
class ModifyInstanceGroupsResponse(EmrObject):
Fields = set(['RequestId'])
-
+
class Arg(EmrObject):
def __init__(self, connection=None):
View
@@ -20,6 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
+
class Step(object):
"""
Jobflow Step base class
@@ -105,26 +106,40 @@ def __init__(self, name, mapper, reducer=None, combiner=None,
:type name: str
:param name: The name of the step
+
:type mapper: str
:param mapper: The mapper URI
+
:type reducer: str
:param reducer: The reducer URI
+
:type combiner: str
- :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
+ :param combiner: The combiner URI. Only works for Hadoop 0.20
+ and later!
+
:type action_on_failure: str
- :param action_on_failure: An action, defined in the EMR docs to take on failure.
+ :param action_on_failure: An action, defined in the EMR docs
+ to take on failure.
+
:type cache_files: list(str)
:param cache_files: A list of cache files to be bundled with the job
+
:type cache_archives: list(str)
- :param cache_archives: A list of jar archives to be bundled with the job
+ :param cache_archives: A list of jar archives to be bundled with
+ the job.
+
:type step_args: list(str)
:param step_args: A list of arguments to pass to the step
+
:type input: str or a list of str
:param input: The input uri
+
:type output: str
:param output: The output uri
+
:type jar: str
- :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
+ :param jar: The hadoop streaming jar. This can be either a
+ local path on the master node, or an s3:// URI.
"""
self.name = name
self.mapper = mapper
@@ -180,7 +195,7 @@ def args(self):
args.extend(('-cacheFile', cache_file))
if self.cache_archives:
- for cache_archive in self.cache_archives:
+ for cache_archive in self.cache_archives:
args.extend(('-cacheArchive', cache_archive))
return args
View
@@ -14,7 +14,7 @@
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
View
@@ -27,6 +27,7 @@
from boto.file.simpleresultset import SimpleResultSet
from boto.s3.bucketlistresultset import BucketListResultSet
+
class Bucket(object):
def __init__(self, name, contained_key):
"""Instantiate an anonymous file-based Bucket around a single key.
@@ -81,7 +82,8 @@ def get_key(self, key_name, headers=None, version_id=None,
:param version_id: Unused in this subclass.
:type stream_type: integer
- :param stream_type: Type of the Key - Regular File or input/output Stream
+ :param stream_type: Type of the Key - Regular File or input/output
+ Stream
:rtype: :class:`boto.file.key.Key`
:returns: A Key object from this bucket.
View
@@ -19,10 +19,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
-# File representation of connection, for use with "file://" URIs.
+"""
+File representation of connection, for use with "file://" URIs.
+"""
from .bucket import Bucket
+
class FileConnection(object):
def __init__(self, file_storage_uri):
View
@@ -22,16 +22,19 @@
# File representation of key, for use with "file://" URIs.
-import os, shutil
+import os
+import shutil
import sys
+from boto.exception import BotoClientError
import boto.compat as compat
+
class Key(object):
KEY_STREAM_READABLE = 0x01
KEY_STREAM_WRITABLE = 0x02
- KEY_STREAM = (KEY_STREAM_READABLE | KEY_STREAM_WRITABLE)
- KEY_REGULAR_FILE = 0x00
+ KEY_STREAM = (KEY_STREAM_READABLE | KEY_STREAM_WRITABLE)
+ KEY_REGULAR_FILE = 0x00
def __init__(self, bucket, name, fp=None, key_type=KEY_REGULAR_FILE):
self.bucket = bucket
@@ -19,6 +19,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
+
class SimpleResultSet(list):
"""
ResultSet facade built from a simple list, rather than via XML parsing.
Oops, something went wrong.

0 comments on commit 28aff58

Please sign in to comment.