From 9ce624b60fe9de55ce928283e76841ed45a76ea2 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sat, 25 Apr 2020 17:41:16 -0300 Subject: [PATCH 1/6] Add support to EMR with Docker --- awswrangler/__init__.py | 1 + awswrangler/_utils.py | 13 + awswrangler/athena.py | 2 +- awswrangler/emr.py | 335 ++++++++++++++++---- awswrangler/s3.py | 65 +++- docs/source/api.rst | 2 + requirements-dev.txt | 3 +- testing/test_awswrangler/test_cloudwatch.py | 2 +- testing/test_awswrangler/test_data_lake.py | 3 + testing/test_awswrangler/test_emr.py | 33 ++ testing/test_awswrangler/test_moto.py | 27 +- tutorials/15 - EMR.ipynb | 193 +++++++++++ tutorials/16 - EMR & Docker.ipynb | 269 ++++++++++++++++ 13 files changed, 869 insertions(+), 79 deletions(-) create mode 100644 tutorials/15 - EMR.ipynb create mode 100644 tutorials/16 - EMR & Docker.ipynb diff --git a/awswrangler/__init__.py b/awswrangler/__init__.py index ce11c7ad5..4413ab5f4 100644 --- a/awswrangler/__init__.py +++ b/awswrangler/__init__.py @@ -9,5 +9,6 @@ from awswrangler import athena, catalog, cloudwatch, db, emr, exceptions, s3 # noqa from awswrangler.__metadata__ import __description__, __license__, __title__, __version__ # noqa +from awswrangler._utils import get_account_id # noqa logging.getLogger("awswrangler").addHandler(logging.NullHandler()) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 21a27d37e..df168bdb9 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -166,3 +166,16 @@ def ensure_postgresql_casts(): def get_directory(path: str) -> str: """Extract directory path.""" return path.rsplit(sep="/", maxsplit=1)[0] + "/" + + +def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str: + """Get Account ID.""" + session: boto3.Session = ensure_session(session=boto3_session) + return client(service_name="sts", session=session).get_caller_identity().get("Account") + + +def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: + """Extract region from Subnet ID.""" + session: boto3.Session = ensure_session(session=boto3_session) + client_ec2: boto3.client = client(service_name="ec2", session=session) + return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9] diff --git a/awswrangler/athena.py b/awswrangler/athena.py index 1933606ad..d73c41063 100644 --- a/awswrangler/athena.py +++ b/awswrangler/athena.py @@ -68,7 +68,7 @@ def create_athena_bucket(boto3_session: Optional[boto3.Session] = None) -> str: """ session: boto3.Session = _utils.ensure_session(session=boto3_session) - account_id: str = _utils.client(service_name="sts", session=session).get_caller_identity().get("Account") + account_id: str = _utils.get_account_id(boto3_session=session) region_name: str = str(session.region_name).lower() s3_output = f"s3://aws-athena-query-results-{account_id}-{region_name}/" s3_resource = session.resource("s3") diff --git a/awswrangler/emr.py b/awswrangler/emr.py index aee470621..106a57da3 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -7,12 +7,76 @@ import boto3 # type: ignore -from awswrangler import _utils +from awswrangler import _utils, exceptions _logger: logging.Logger = logging.getLogger(__name__) +def _get_default_logging_path( + subnet_id: Optional[str] = None, + account_id: Optional[str] = None, + region: Optional[str] = None, + boto3_session: Optional[boto3.Session] = None, +) -> str: + """Get EMR default logging path. + + E.g. 
"s3://aws-logs-{account_id}-{region}/elasticmapreduce/" + + Parameters + ---------- + subnet_id : str, optional + Subnet ID. If not provided, you must pass `account_id` and `region` explicit. + account_id: str, optional + Account ID. + region: str, optional + Region e.g. 'us-east-1' + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Default logging path. + E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/" + + Examples + -------- + >>> import awswrangler as wr + >>> state = wr.emr._get_default_logging_path("subnet-id") + 's3://aws-logs-{account_id}-{region}/elasticmapreduce/' + + """ + if account_id is None: + boto3_session = _utils.ensure_session(session=boto3_session) + _account_id: str = _utils.get_account_id(boto3_session=boto3_session) + else: + _account_id = account_id + if (region is None) and (subnet_id is not None): + boto3_session = _utils.ensure_session(session=boto3_session) + _region: str = _utils.get_region_from_subnet(subnet_id=subnet_id, boto3_session=boto3_session) + elif (region is None) and (subnet_id is None): + raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.") + else: + _region = region # type: ignore + return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/" + + +def _get_ecr_credentials_command() -> str: + return ( + "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email) && " + "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/" + ) + + def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-statements + account_id: str = _utils.get_account_id(boto3_session=pars["boto3_session"]) + region: str = _utils.get_region_from_subnet(subnet_id=pars["subnet_id"], boto3_session=pars["boto3_session"]) + + # S3 Logging path + if pars.get("logging_s3_path") is None: + pars["logging_s3_path"] = _get_default_logging_path( + subnet_id=None, account_id=account_id, region=region, boto3_session=pars["boto3_session"] + ) spark_env: Optional[Dict[str, str]] = None yarn_env: Optional[Dict[str, str]] = None @@ -20,25 +84,25 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s if pars["spark_pyarrow"] is True: if pars["spark_defaults"] is None: - pars["spark_defaults"]: Dict[str, str] = {"spark.sql.execution.arrow.enabled": "true"} + pars["spark_defaults"] = {"spark.sql.execution.arrow.enabled": "true"} else: # pragma: no cover - pars["spark_defaults"]["spark.sql.execution.arrow.enabled"]: str = "true" + pars["spark_defaults"]["spark.sql.execution.arrow.enabled"] = "true" spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} if pars["python3"] is True: if spark_env is None: - spark_env: Dict[str, str] = {"PYSPARK_PYTHON": "/usr/bin/python3"} # pragma: no cover + spark_env = {"PYSPARK_PYTHON": "/usr/bin/python3"} # pragma: no cover else: - spark_env["PYSPARK_PYTHON"]: str = "/usr/bin/python3" + spark_env["PYSPARK_PYTHON"] = "/usr/bin/python3" if pars["spark_jars_path"] is not None: paths: str = ",".join(pars["spark_jars_path"]) if pars["spark_defaults"] is None: # pragma: no cover - pars["spark_defaults"]: Dict[str, str] = {"spark.jars": paths} + pars["spark_defaults"] = {"spark.jars": paths} else: - pars["spark_defaults"]["spark.jars"]: str = paths + pars["spark_defaults"]["spark.jars"] = paths args: Dict[str, Any] = { "Name": pars["cluster_name"], @@ -72,9 
+136,52 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"] # Configurations - args["Configurations"]: List[Dict[str, Any]] = [ + args["Configurations"] = [ {"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"}} ] + if (pars["docker"] is True) or (pars["spark_docker"] is True) or (pars["hive_docker"] is True): + if pars.get("extra_registries") is None: + extra_registries: List[str] = [] + else: # pragma: no cover + extra_registries = pars["extra_registries"] + registries: str = f"local,centos,{account_id}.dkr.ecr.{region}.amazonaws.com,{','.join(extra_registries)}" + registries = registries[:-1] if registries.endswith(",") else registries + args["Configurations"].append( + { + "Classification": "container-executor", + "Properties": {}, + "Configurations": [ + { + "Classification": "docker", + "Properties": { + "docker.privileged-containers.registries": registries, + "docker.trusted.registries": registries, + }, + "Configurations": [], + } + ], + } + ) + if pars["spark_docker"] is True: + if pars.get("spark_docker_image") is None: # pragma: no cover + raise exceptions.InvalidArgumentCombination("You must pass a spark_docker_image if spark_docker is True.") + pars["spark_defaults"] = {} if pars["spark_defaults"] is None else pars["spark_defaults"] + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" + pars["spark_defaults"][ + "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" + ] = "hdfs:///user/hadoop/config.json" + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars["spark_docker_image"] + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"] = "/etc/passwd:/etc/passwd:ro" + pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" + pars["spark_defaults"][ + "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" + ] = "hdfs:///user/hadoop/config.json" + pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars[ + "spark_docker_image" + ] + pars["spark_defaults"][ + "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS" + ] = "/etc/passwd:/etc/passwd:ro" if spark_env is not None: args["Configurations"].append( { @@ -109,16 +216,21 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "Configurations": [], } ) + + hive_conf: Optional[Dict[str, Any]] = None + if (pars["hive_glue_catalog"] is True) or (pars["hive_docker"] is True): + hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []} + if pars["hive_glue_catalog"] is True: - args["Configurations"].append( - { - "Classification": "hive-site", - "Properties": { - "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" # noqa - }, - "Configurations": [], - } - ) + hive_conf["Properties"][ + "hive.metastore.client.factory.class" + ] = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + if pars["hive_docker"] is True: + hive_conf["Properties"]["hive.execution.mode"] = "container" + + if hive_conf is not None: + args["Configurations"].append(hive_conf) + if pars["presto_glue_catalog"] is True: args["Configurations"].append( { @@ -147,20 +259,21 @@ def _build_cluster_args(**pars): # pylint: 
disable=too-many-branches,too-many-s "Properties": pars["spark_defaults"], } args["Configurations"].append(spark_defaults) + if pars.get("custom_classifications") is not None: + for c in pars["custom_classifications"]: + args["Configurations"].append(c) # Applications if pars["applications"]: - args["Applications"]: List[Dict[str, str]] = [{"Name": x} for x in pars["applications"]] + args["Applications"] = [{"Name": x} for x in pars["applications"]] # Bootstraps if pars["bootstraps_paths"]: # pragma: no cover - args["BootstrapActions"]: List[Dict] = [ - {"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"] - ] + args["BootstrapActions"] = [{"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"]] # Debugging and Steps if (pars["debugging"] is True) or (pars["steps"] is not None): - args["Steps"]: List[Dict[str, Any]] = [] + args["Steps"] = [] if pars["debugging"] is True: args["Steps"].append( { @@ -169,6 +282,17 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]}, } ) + if pars["ecr_credentials_step"] is True: + args["Steps"].append( + build_step( + name="ECR Credentials Setup", + command=_get_ecr_credentials_command(), + action_on_failure="TERMINATE_CLUSTER", + script=False, + region=region, + boto3_session=pars["boto3_session"], + ) + ) if pars["steps"] is not None: args["Steps"] += pars["steps"] @@ -199,7 +323,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_master"] > 0: # pragma: no cover - fleet_master["LaunchSpecifications"]: Dict = { + fleet_master["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_master"], "TimeoutAction": timeout_action_master, @@ -236,7 +360,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_core"] > 0: - fleet_core["LaunchSpecifications"]: Dict = { + fleet_core["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"], "TimeoutAction": timeout_action_core, @@ -275,7 +399,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_task"] > 0: - fleet_task["LaunchSpecifications"]: Dict = { + fleet_task["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"], "TimeoutAction": timeout_action_task, @@ -292,30 +416,30 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused-argument - cluster_name: str, - logging_s3_path: str, - emr_release: str, subnet_id: str, - emr_ec2_role: str, - emr_role: str, - instance_type_master: str, - instance_type_core: str, - instance_type_task: str, - instance_ebs_size_master: int, - instance_ebs_size_core: int, - instance_ebs_size_task: int, - instance_num_on_demand_master: int, - instance_num_on_demand_core: int, - instance_num_on_demand_task: int, - instance_num_spot_master: int, - instance_num_spot_core: int, - instance_num_spot_task: int, - spot_bid_percentage_of_on_demand_master: int, - spot_bid_percentage_of_on_demand_core: int, - spot_bid_percentage_of_on_demand_task: int, - spot_provisioning_timeout_master: int, - spot_provisioning_timeout_core: int, - spot_provisioning_timeout_task: int, + 
cluster_name: str = "my-emr-cluster", + logging_s3_path: Optional[str] = None, + emr_release: str = "emr-6.0.0", + emr_ec2_role: str = "EMR_EC2_DefaultRole", + emr_role: str = "EMR_DefaultRole", + instance_type_master: str = "r5.xlarge", + instance_type_core: str = "r5.xlarge", + instance_type_task: str = "r5.xlarge", + instance_ebs_size_master: int = 64, + instance_ebs_size_core: int = 64, + instance_ebs_size_task: int = 64, + instance_num_on_demand_master: int = 1, + instance_num_on_demand_core: int = 0, + instance_num_on_demand_task: int = 0, + instance_num_spot_master: int = 0, + instance_num_spot_core: int = 0, + instance_num_spot_task: int = 0, + spot_bid_percentage_of_on_demand_master: int = 100, + spot_bid_percentage_of_on_demand_core: int = 100, + spot_bid_percentage_of_on_demand_task: int = 100, + spot_provisioning_timeout_master: int = 5, + spot_provisioning_timeout_core: int = 5, + spot_provisioning_timeout_task: int = 5, spot_timeout_to_on_demand_master: bool = True, spot_timeout_to_on_demand_core: bool = True, spot_timeout_to_on_demand_task: bool = True, @@ -337,10 +461,17 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused security_group_slave: Optional[str] = None, security_groups_slave_additional: Optional[List[str]] = None, security_group_service_access: Optional[str] = None, + docker: bool = False, spark_log_level: str = "WARN", spark_jars_path: Optional[List[str]] = None, spark_defaults: Optional[Dict[str, str]] = None, spark_pyarrow: bool = False, + spark_docker: bool = False, + spark_docker_image: str = None, + hive_docker: bool = False, + ecr_credentials_step: bool = False, + extra_public_registries: Optional[List[str]] = None, + custom_classifications: Optional[List[Dict[str, Any]]] = None, maximize_resource_allocation: bool = False, steps: Optional[List[Dict[str, Any]]] = None, keep_cluster_alive_when_no_steps: bool = True, @@ -354,18 +485,19 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Parameters ---------- + subnet_id : str + VPC subnet ID. cluster_name : str Cluster name. - logging_s3_path : str + logging_s3_path : str, optional Logging s3 path (e.g. s3://BUCKET_NAME/DIRECTORY_NAME/). + If None, the default is `s3://aws-logs-{AccountId}-{RegionId}/elasticmapreduce/` emr_release : str EMR release (e.g. emr-5.28.0). emr_ec2_role : str IAM role name. emr_role : str IAM role name. - subnet_id : str - VPC subnet ID. instance_type_master : str EC2 instance type. instance_type_core : str @@ -448,6 +580,7 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Debugging enabled? applications : List[str], optional List of applications (e.g ["Hadoop", "Spark", "Ganglia", "Hive"]). + If None, ["Spark"] will be considered. visible_to_all_users : bool True or False. key_pair_name : str, optional @@ -465,6 +598,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused security_group_service_access : str, optional The identifier of the Amazon EC2 security group for the Amazon EMR service to access clusters in VPC private subnets. + docker : bool + Enable Docker Hub and ECR registries access. spark_log_level : str log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE). spark_jars_path : List[str], optional @@ -475,6 +610,18 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused spark_pyarrow : bool Enable PySpark to use PyArrow behind the scenes. P.S. 
You must install pyarrow by your self via bootstrap + spark_docker : bool = False + Add necessary Spark Defaults to run on Docker + spark_docker_image : str, optional + E.g. {ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG} + hive_docker : bool + Add necessary configurations to run on Docker + ecr_credentials_step : bool + Add a extra step during the Cluster launch to retrieve ECR auth files. + extra_public_registries: List[str], optional + Additional registries. + custom_classifications: List[Dict[str, Any]], optional + Extra classifications. maximize_resource_allocation : bool Configure your executors to utilize the maximum resources possible https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation @@ -500,6 +647,21 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Examples -------- + Minimal Example + + >>> cluster_id = wr.emr.create_cluster("SUBNET_ID") + + Minimal Exmaple on Docker + + >>> cluster_id = wr.emr.create_cluster( + >>> subnet_id="SUBNET_ID", + >>> spark_docker=True, + >>> spark_docker_image="{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}", + >>> ecr_credentials_step=True + >>> ) + + Full Example + >>> import awswrangler as wr >>> cluster_id = wr.emr.create_cluster( ... cluster_name="wrangler_cluster", @@ -548,6 +710,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused ... }) """ + applications = ["Spark"] if applications is None else applications + boto3_session = _utils.ensure_session(session=boto3_session) args: Dict[str, Any] = _build_cluster_args(**locals()) client_emr: boto3.client = _utils.client(service_name="emr", session=boto3_session) response: Dict[str, Any] = client_emr.run_job_flow(**args) @@ -647,8 +811,8 @@ def submit_steps( def submit_step( cluster_id: str, - name: str, command: str, + name: str = "my-step", action_on_failure: str = "CONTINUE", script: bool = False, boto3_session: Optional[boto3.Session] = None, @@ -659,11 +823,11 @@ def submit_step( ---------- cluster_id : str Cluster ID. - name : str - Step name. command : str e.g. 'echo "Hello!"' e.g. for script 's3://.../script.sh arg1 arg2' + name : str, optional + Step name. action_on_failure : str 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' script : bool @@ -698,26 +862,29 @@ def submit_step( def build_step( - name: str, command: str, + name: str = "my-step", action_on_failure: str = "CONTINUE", script: bool = False, + region: Optional[str] = None, boto3_session: Optional[boto3.Session] = None, ) -> Dict[str, Any]: """Build the Step structure (dictionary). Parameters ---------- - name : str - Step name. command : str e.g. 'echo "Hello!"' e.g. for script 's3://.../script.sh arg1 arg2' + name : str, optional + Step name. action_on_failure : str 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' script : bool - True for raw command or False for script runner. + False for raw command or True for script runner. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html + region: str, optional + Region name to not get it from boto3.Session. (e.g. `us-east-1`) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. 
@@ -734,14 +901,17 @@ def build_step( >>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps) """ - session: boto3.Session = _utils.ensure_session(session=boto3_session) jar: str = "command-runner.jar" if script is True: - if session.region_name is not None: - region: str = session.region_name - else: # pragma: no cover - region = "us-east-1" - jar = f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar" + if region is not None: # pragma: no cover + _region: str = region + else: + session: boto3.Session = _utils.ensure_session(session=boto3_session) + if session.region_name is not None: + _region = session.region_name + else: # pragma: no cover + _region = "us-east-1" + jar = f"s3://{_region}.elasticmapreduce/libs/script-runner/script-runner.jar" step: Dict[str, Any] = { "Name": name, "ActionOnFailure": action_on_failure, @@ -780,3 +950,40 @@ def get_step_state(cluster_id: str, step_id: str, boto3_session: Optional[boto3. response: Dict[str, Any] = client_emr.describe_step(ClusterId=cluster_id, StepId=step_id) _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}") return response["Step"]["Status"]["State"] + + +def update_ecr_credentials( + cluster_id: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None +) -> str: + """Update internal ECR credentials. + + Parameters + ---------- + cluster_id : str + Cluster ID. + action_on_failure : str + 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Step ID. + + Examples + -------- + >>> import awswrangler as wr + >>> step_id = wr.emr.update_ecr_credentials("cluster_id") + + """ + name: str = "Update ECR Credentials" + command: str = _get_ecr_credentials_command() + session: boto3.Session = _utils.ensure_session(session=boto3_session) + step: Dict[str, Any] = build_step( + name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=session + ) + client_emr: boto3.client = _utils.client(service_name="emr", session=session) + response: Dict[str, Any] = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]) + _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}") + return response["StepIds"][0] diff --git a/awswrangler/s3.py b/awswrangler/s3.py index f728937db..527c1ae76 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -111,6 +111,40 @@ def does_object_exist(path: str, boto3_session: Optional[boto3.Session] = None) raise ex # pragma: no cover +def list_directories(path: str, boto3_session: Optional[boto3.Session] = None) -> List[str]: + """List Amazon S3 objects from a prefix. + + Parameters + ---------- + path : str + S3 path (e.g. s3://bucket/prefix). + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of objects paths. 
+ + Examples + -------- + Using the default boto3 session + + >>> import awswrangler as wr + >>> wr.s3.list_objects('s3://bucket/prefix/') + ['s3://bucket/prefix/dir0', 's3://bucket/prefix/dir1', 's3://bucket/prefix/dir2'] + + Using a custom boto3 session + + >>> import boto3 + >>> import awswrangler as wr + >>> wr.s3.list_objects('s3://bucket/prefix/', boto3_session=boto3.Session()) + ['s3://bucket/prefix/dir0', 's3://bucket/prefix/dir1', 's3://bucket/prefix/dir2'] + + """ + return _list_objects(path=path, delimiter="/", boto3_session=boto3_session) + + def list_objects(path: str, boto3_session: Optional[boto3.Session] = None) -> List[str]: """List Amazon S3 objects from a prefix. @@ -142,20 +176,37 @@ def list_objects(path: str, boto3_session: Optional[boto3.Session] = None) -> Li ['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2'] """ + return _list_objects(path=path, delimiter=None, boto3_session=boto3_session) + + +def _list_objects( + path: str, delimiter: Optional[str] = None, boto3_session: Optional[boto3.Session] = None +) -> List[str]: client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session) paginator = client_s3.get_paginator("list_objects_v2") bucket: str prefix: str bucket, prefix = _utils.parse_path(path=path) - response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000}) + args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "PaginationConfig": {"PageSize": 1000}} + if delimiter is not None: + args["Delimiter"] = delimiter + response_iterator = paginator.paginate(**args) paths: List[str] = [] for page in response_iterator: - contents: Optional[List] = page.get("Contents") - if contents is not None: - for content in contents: - if (content is not None) and ("Key" in content): - key: str = content["Key"] - paths.append(f"s3://{bucket}/{key}") + if delimiter is None: + contents: Optional[List[Optional[Dict[str, str]]]] = page.get("Contents") + if contents is not None: + for content in contents: + if (content is not None) and ("Key" in content): + key: str = content["Key"] + paths.append(f"s3://{bucket}/{key}") + else: + prefixes: Optional[List[Optional[Dict[str, str]]]] = page.get("CommonPrefixes") + if prefixes is not None: + for pfx in prefixes: + if (pfx is not None) and ("Prefix" in pfx): + key = pfx["Prefix"] + paths.append(f"s3://{bucket}/{key}") return paths diff --git a/docs/source/api.rst b/docs/source/api.rst index 897fc7a3e..7d2d51602 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -16,6 +16,7 @@ Amazon S3 does_object_exist get_bucket_region list_objects + list_directories read_csv read_fwf read_json @@ -115,6 +116,7 @@ EMR submit_steps build_step get_step_state + update_ecr_credentials CloudWatch Logs --------------- diff --git a/requirements-dev.txt b/requirements-dev.txt index 3fdd3cdf3..99a9b0730 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -17,4 +17,5 @@ twine~=3.1.1 wheel~=0.34.2 sphinx~=3.0.1 sphinx_bootstrap_theme~=0.7.1 -moto~=1.3.14 \ No newline at end of file +moto~=1.3.14 +jupyterlab~=2.1.1 \ No newline at end of file diff --git a/testing/test_awswrangler/test_cloudwatch.py b/testing/test_awswrangler/test_cloudwatch.py index f59b8b3dd..eced7a754 100644 --- a/testing/test_awswrangler/test_cloudwatch.py +++ b/testing/test_awswrangler/test_cloudwatch.py @@ -48,7 +48,7 @@ def loggroup(cloudformation_outputs): def test_query_cancelled(loggroup): client_logs = boto3.client("logs") query_id = wr.cloudwatch.start_query( - 
log_group_names=[loggroup], query="fields @timestamp, @message | sort @timestamp desc | limit 5" + log_group_names=[loggroup], query="fields @timestamp, @message | sort @timestamp desc" ) client_logs.stop_query(queryId=query_id) with pytest.raises(exceptions.QueryCancelled): diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index afa2a8307..bd53d4bad 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -127,6 +127,9 @@ def test_athena_ctas(bucket, database, kms_key): partition_cols=["par0", "par1"], )["paths"] wr.s3.wait_objects_exist(paths=paths) + dirs = wr.s3.list_directories(path=f"s3://{bucket}/test_athena_ctas/") + for d in dirs: + assert d.startswith(f"s3://{bucket}/test_athena_ctas/par0=") df = wr.s3.read_parquet_table(table="test_athena_ctas", database=database) assert len(df.index) == 3 ensure_data_types(df=df, has_list=True) diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index e64329b33..df2dab1cb 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -146,3 +146,36 @@ def test_cluster_single_node(bucket, cloudformation_outputs): wr.emr.submit_steps(cluster_id=cluster_id, steps=steps) wr.emr.terminate_cluster(cluster_id=cluster_id) wr.s3.delete_objects(f"s3://{bucket}/emr-logs/") + + +def test_default_logging_path(cloudformation_outputs): + path = wr.emr._get_default_logging_path(subnet_id=cloudformation_outputs["SubnetId"]) + assert path.startswith("s3://aws-logs-") + assert path.endswith("/elasticmapreduce/") + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.emr._get_default_logging_path() + + +def test_docker(cloudformation_outputs): + cluster_id = wr.emr.create_cluster( + subnet_id=cloudformation_outputs["SubnetId"], + docker=True, + spark_docker=True, + spark_docker_image="787535711150.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", + hive_docker=True, + ecr_credentials_step=True, + custom_classifications=[ + { + "Classification": "livy-conf", + "Properties": { + "livy.spark.master": "yarn", + "livy.spark.deploy-mode": "cluster", + "livy.server.session.timeout": "16h", + }, + } + ], + steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://igor-tavares/emr.py")], + ) + wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://igor-tavares/emr.py") + wr.emr.update_ecr_credentials(cluster_id=cluster_id) + wr.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index db12dbe1a..2adc7aec8 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -20,6 +20,21 @@ def emr(): yield True +@pytest.fixture(scope="module") +def sts(): + with moto.mock_sts(): + yield True + + +@pytest.fixture(scope="module") +def subnet(): + with moto.mock_ec2(): + ec2 = boto3.resource("ec2", region_name="us-west-1") + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-1a") + yield subnet.id + + def test_csv(s3): path = "s3://bucket/test.csv" wr.s3.to_csv(df=get_df_csv(), path=path, index=False) @@ -37,12 +52,13 @@ def test_parquet(s3): assert len(df.columns) == 18 -def test_emr(s3, emr): +def test_emr(s3, emr, sts, subnet): + session = boto3.Session(region_name="us-west-1") cluster_id = wr.emr.create_cluster( 
cluster_name="wrangler_cluster", logging_s3_path="s3://bucket/emr-logs/", emr_release="emr-5.29.0", - subnet_id="foo", + subnet_id=subnet, emr_ec2_role="EMR_EC2_DefaultRole", emr_role="EMR_DefaultRole", instance_type_master="m5.xlarge", @@ -87,11 +103,12 @@ def test_emr(s3, emr): termination_protected=False, spark_pyarrow=False, tags={"foo": "boo", "bar": "xoo"}, + boto3_session=session, ) - wr.emr.get_cluster_state(cluster_id=cluster_id) + wr.emr.get_cluster_state(cluster_id=cluster_id, boto3_session=session) steps = [] for cmd in ['echo "Hello"', "ls -la"]: steps.append(wr.emr.build_step(name=cmd, command=cmd)) - wr.emr.submit_steps(cluster_id=cluster_id, steps=steps) - wr.emr.terminate_cluster(cluster_id=cluster_id) + wr.emr.submit_steps(cluster_id=cluster_id, steps=steps, boto3_session=session) + wr.emr.terminate_cluster(cluster_id=cluster_id, boto3_session=session) wr.s3.delete_objects("s3://bucket/emr-logs/") diff --git a/tutorials/15 - EMR.ipynb b/tutorials/15 - EMR.ipynb new file mode 100644 index 000000000..4e1c627e6 --- /dev/null +++ b/tutorials/15 - EMR.ipynb @@ -0,0 +1,193 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 15 - EMR" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import awswrangler as wr\n", + "import boto3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your Subnet ID:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ························\n" + ] + } + ], + "source": [ + "subnet = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating EMR Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "cluster_id = wr.emr.create_cluster(subnet)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Uploading our PySpark script to Amazon S3" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "script = \"\"\"\n", + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n", + "sc = spark.sparkContext\n", + "\n", + "print(\"Spark Initialized\")\n", + "\"\"\"\n", + "\n", + "_ = boto3.client(\"s3\").put_object(\n", + " Body=script,\n", + " Bucket=bucket,\n", + " Key=\"test.py\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Submit PySpark step" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit s3://{bucket}/test.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Wait Step" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], 
+ "source": [ + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Terminate Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "wr.emr.terminate_cluster(cluster_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb new file mode 100644 index 000000000..138759d8f --- /dev/null +++ b/tutorials/16 - EMR & Docker.ipynb @@ -0,0 +1,269 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 16 - EMR & Docker" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import awswrangler as wr\n", + "import boto3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your Subnet ID:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ························\n" + ] + } + ], + "source": [ + "subnet = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Build and Upload Docker Image to ECR repository\n", + "\n", + "Replace the `{ACCOUNT_ID}` placeholder." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%writefile Dockerfile\n" + } + }, + "outputs": [], + "source": [ + "%%writefile Dockerfile\n", + "\n", + "FROM amazoncorretto:8\n", + "\n", + "RUN yum -y update\n", + "RUN yum -y install yum-utils\n", + "RUN yum -y groupinstall development\n", + "\n", + "RUN yum list python3*\n", + "RUN yum -y install python3 python3-dev python3-pip python3-virtualenv\n", + "\n", + "RUN python -V\n", + "RUN python3 -V\n", + "\n", + "ENV PYSPARK_DRIVER_PYTHON python3\n", + "ENV PYSPARK_PYTHON python3\n", + "\n", + "RUN pip3 install --upgrade pip\n", + "RUN pip3 install awswrangler\n", + "\n", + "RUN python3 -c \"import awswrangler as wr\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%bash\n", + "\n", + "docker build -t 'local/emr-wrangler' .\n", + "aws ecr create-repository --repository-name emr-wrangler\n", + "docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\n", + "eval $(aws ecr get-login --region us-east-1 --no-include-email)\n", + "docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating EMR Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", + "\n", + "cluster_id = wr.emr.create_cluster(\n", + " subnet_id=subnet,\n", + " spark_docker=True,\n", + " spark_docker_image=DOCKER_IMAGE,\n", + " ecr_credentials_step=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Uploading our PySpark script to Amazon S3" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "script = \"\"\"\n", + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n", + "sc = spark.sparkContext\n", + "\n", + "print(\"Spark Initialized\")\n", + "\n", + "import awswrangler as wr\n", + "\n", + "print(f\"Wrangler version: {wr.__version__}\")\n", + "\"\"\"\n", + "\n", + "_ = boto3.client(\"s3\").put_object(\n", + " Body=script,\n", + " Bucket=bucket,\n", + " Key=\"test_docker.py\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Submit PySpark step" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit --deploy-mode cluster s3://{bucket}/test_docker.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Wait Step" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Terminate Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "wr.emr.terminate_cluster(cluster_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": 
"Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From c2db8cd27bbd80da857b40df737385c4bd254eb7 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sat, 25 Apr 2020 17:41:16 -0300 Subject: [PATCH 2/6] Add support to EMR with Docker #193 --- awswrangler/__init__.py | 1 + awswrangler/_utils.py | 13 + awswrangler/athena.py | 2 +- awswrangler/emr.py | 335 ++++++++++++++++---- awswrangler/s3.py | 65 +++- docs/source/api.rst | 2 + requirements-dev.txt | 3 +- testing/test_awswrangler/test_cloudwatch.py | 2 +- testing/test_awswrangler/test_data_lake.py | 3 + testing/test_awswrangler/test_emr.py | 33 ++ testing/test_awswrangler/test_moto.py | 27 +- tutorials/15 - EMR.ipynb | 193 +++++++++++ tutorials/16 - EMR & Docker.ipynb | 269 ++++++++++++++++ 13 files changed, 869 insertions(+), 79 deletions(-) create mode 100644 tutorials/15 - EMR.ipynb create mode 100644 tutorials/16 - EMR & Docker.ipynb diff --git a/awswrangler/__init__.py b/awswrangler/__init__.py index ce11c7ad5..4413ab5f4 100644 --- a/awswrangler/__init__.py +++ b/awswrangler/__init__.py @@ -9,5 +9,6 @@ from awswrangler import athena, catalog, cloudwatch, db, emr, exceptions, s3 # noqa from awswrangler.__metadata__ import __description__, __license__, __title__, __version__ # noqa +from awswrangler._utils import get_account_id # noqa logging.getLogger("awswrangler").addHandler(logging.NullHandler()) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 21a27d37e..df168bdb9 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -166,3 +166,16 @@ def ensure_postgresql_casts(): def get_directory(path: str) -> str: """Extract directory path.""" return path.rsplit(sep="/", maxsplit=1)[0] + "/" + + +def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str: + """Get Account ID.""" + session: boto3.Session = ensure_session(session=boto3_session) + return client(service_name="sts", session=session).get_caller_identity().get("Account") + + +def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: + """Extract region from Subnet ID.""" + session: boto3.Session = ensure_session(session=boto3_session) + client_ec2: boto3.client = client(service_name="ec2", session=session) + return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9] diff --git a/awswrangler/athena.py b/awswrangler/athena.py index 1933606ad..d73c41063 100644 --- a/awswrangler/athena.py +++ b/awswrangler/athena.py @@ -68,7 +68,7 @@ def create_athena_bucket(boto3_session: Optional[boto3.Session] = None) -> str: """ session: boto3.Session = _utils.ensure_session(session=boto3_session) - account_id: str = _utils.client(service_name="sts", session=session).get_caller_identity().get("Account") + account_id: str = _utils.get_account_id(boto3_session=session) region_name: str = str(session.region_name).lower() s3_output = f"s3://aws-athena-query-results-{account_id}-{region_name}/" s3_resource = session.resource("s3") diff --git a/awswrangler/emr.py b/awswrangler/emr.py index aee470621..106a57da3 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -7,12 +7,76 @@ import boto3 # type: ignore -from awswrangler import _utils +from awswrangler import _utils, exceptions 
_logger: logging.Logger = logging.getLogger(__name__) +def _get_default_logging_path( + subnet_id: Optional[str] = None, + account_id: Optional[str] = None, + region: Optional[str] = None, + boto3_session: Optional[boto3.Session] = None, +) -> str: + """Get EMR default logging path. + + E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/" + + Parameters + ---------- + subnet_id : str, optional + Subnet ID. If not provided, you must pass `account_id` and `region` explicit. + account_id: str, optional + Account ID. + region: str, optional + Region e.g. 'us-east-1' + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Default logging path. + E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/" + + Examples + -------- + >>> import awswrangler as wr + >>> state = wr.emr._get_default_logging_path("subnet-id") + 's3://aws-logs-{account_id}-{region}/elasticmapreduce/' + + """ + if account_id is None: + boto3_session = _utils.ensure_session(session=boto3_session) + _account_id: str = _utils.get_account_id(boto3_session=boto3_session) + else: + _account_id = account_id + if (region is None) and (subnet_id is not None): + boto3_session = _utils.ensure_session(session=boto3_session) + _region: str = _utils.get_region_from_subnet(subnet_id=subnet_id, boto3_session=boto3_session) + elif (region is None) and (subnet_id is None): + raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.") + else: + _region = region # type: ignore + return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/" + + +def _get_ecr_credentials_command() -> str: + return ( + "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email) && " + "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/" + ) + + def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-statements + account_id: str = _utils.get_account_id(boto3_session=pars["boto3_session"]) + region: str = _utils.get_region_from_subnet(subnet_id=pars["subnet_id"], boto3_session=pars["boto3_session"]) + + # S3 Logging path + if pars.get("logging_s3_path") is None: + pars["logging_s3_path"] = _get_default_logging_path( + subnet_id=None, account_id=account_id, region=region, boto3_session=pars["boto3_session"] + ) spark_env: Optional[Dict[str, str]] = None yarn_env: Optional[Dict[str, str]] = None @@ -20,25 +84,25 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s if pars["spark_pyarrow"] is True: if pars["spark_defaults"] is None: - pars["spark_defaults"]: Dict[str, str] = {"spark.sql.execution.arrow.enabled": "true"} + pars["spark_defaults"] = {"spark.sql.execution.arrow.enabled": "true"} else: # pragma: no cover - pars["spark_defaults"]["spark.sql.execution.arrow.enabled"]: str = "true" + pars["spark_defaults"]["spark.sql.execution.arrow.enabled"] = "true" spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"} if pars["python3"] is True: if spark_env is None: - spark_env: Dict[str, str] = {"PYSPARK_PYTHON": "/usr/bin/python3"} # pragma: no cover + spark_env = {"PYSPARK_PYTHON": "/usr/bin/python3"} # pragma: no cover else: - spark_env["PYSPARK_PYTHON"]: str = "/usr/bin/python3" + spark_env["PYSPARK_PYTHON"] = "/usr/bin/python3" if pars["spark_jars_path"] is not None: paths: str = ",".join(pars["spark_jars_path"]) if pars["spark_defaults"] is None: # pragma: no 
cover - pars["spark_defaults"]: Dict[str, str] = {"spark.jars": paths} + pars["spark_defaults"] = {"spark.jars": paths} else: - pars["spark_defaults"]["spark.jars"]: str = paths + pars["spark_defaults"]["spark.jars"] = paths args: Dict[str, Any] = { "Name": pars["cluster_name"], @@ -72,9 +136,52 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"] # Configurations - args["Configurations"]: List[Dict[str, Any]] = [ + args["Configurations"] = [ {"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"}} ] + if (pars["docker"] is True) or (pars["spark_docker"] is True) or (pars["hive_docker"] is True): + if pars.get("extra_registries") is None: + extra_registries: List[str] = [] + else: # pragma: no cover + extra_registries = pars["extra_registries"] + registries: str = f"local,centos,{account_id}.dkr.ecr.{region}.amazonaws.com,{','.join(extra_registries)}" + registries = registries[:-1] if registries.endswith(",") else registries + args["Configurations"].append( + { + "Classification": "container-executor", + "Properties": {}, + "Configurations": [ + { + "Classification": "docker", + "Properties": { + "docker.privileged-containers.registries": registries, + "docker.trusted.registries": registries, + }, + "Configurations": [], + } + ], + } + ) + if pars["spark_docker"] is True: + if pars.get("spark_docker_image") is None: # pragma: no cover + raise exceptions.InvalidArgumentCombination("You must pass a spark_docker_image if spark_docker is True.") + pars["spark_defaults"] = {} if pars["spark_defaults"] is None else pars["spark_defaults"] + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" + pars["spark_defaults"][ + "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" + ] = "hdfs:///user/hadoop/config.json" + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars["spark_docker_image"] + pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"] = "/etc/passwd:/etc/passwd:ro" + pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" + pars["spark_defaults"][ + "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" + ] = "hdfs:///user/hadoop/config.json" + pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars[ + "spark_docker_image" + ] + pars["spark_defaults"][ + "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS" + ] = "/etc/passwd:/etc/passwd:ro" if spark_env is not None: args["Configurations"].append( { @@ -109,16 +216,21 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "Configurations": [], } ) + + hive_conf: Optional[Dict[str, Any]] = None + if (pars["hive_glue_catalog"] is True) or (pars["hive_docker"] is True): + hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []} + if pars["hive_glue_catalog"] is True: - args["Configurations"].append( - { - "Classification": "hive-site", - "Properties": { - "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" # noqa - }, - "Configurations": [], - } - ) + hive_conf["Properties"][ + "hive.metastore.client.factory.class" + ] = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + if pars["hive_docker"] is True: + 
hive_conf["Properties"]["hive.execution.mode"] = "container" + + if hive_conf is not None: + args["Configurations"].append(hive_conf) + if pars["presto_glue_catalog"] is True: args["Configurations"].append( { @@ -147,20 +259,21 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "Properties": pars["spark_defaults"], } args["Configurations"].append(spark_defaults) + if pars.get("custom_classifications") is not None: + for c in pars["custom_classifications"]: + args["Configurations"].append(c) # Applications if pars["applications"]: - args["Applications"]: List[Dict[str, str]] = [{"Name": x} for x in pars["applications"]] + args["Applications"] = [{"Name": x} for x in pars["applications"]] # Bootstraps if pars["bootstraps_paths"]: # pragma: no cover - args["BootstrapActions"]: List[Dict] = [ - {"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"] - ] + args["BootstrapActions"] = [{"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"]] # Debugging and Steps if (pars["debugging"] is True) or (pars["steps"] is not None): - args["Steps"]: List[Dict[str, Any]] = [] + args["Steps"] = [] if pars["debugging"] is True: args["Steps"].append( { @@ -169,6 +282,17 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]}, } ) + if pars["ecr_credentials_step"] is True: + args["Steps"].append( + build_step( + name="ECR Credentials Setup", + command=_get_ecr_credentials_command(), + action_on_failure="TERMINATE_CLUSTER", + script=False, + region=region, + boto3_session=pars["boto3_session"], + ) + ) if pars["steps"] is not None: args["Steps"] += pars["steps"] @@ -199,7 +323,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_master"] > 0: # pragma: no cover - fleet_master["LaunchSpecifications"]: Dict = { + fleet_master["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_master"], "TimeoutAction": timeout_action_master, @@ -236,7 +360,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_core"] > 0: - fleet_core["LaunchSpecifications"]: Dict = { + fleet_core["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"], "TimeoutAction": timeout_action_core, @@ -275,7 +399,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } if pars["instance_num_spot_task"] > 0: - fleet_task["LaunchSpecifications"]: Dict = { + fleet_task["LaunchSpecifications"] = { "SpotSpecification": { "TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"], "TimeoutAction": timeout_action_task, @@ -292,30 +416,30 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused-argument - cluster_name: str, - logging_s3_path: str, - emr_release: str, subnet_id: str, - emr_ec2_role: str, - emr_role: str, - instance_type_master: str, - instance_type_core: str, - instance_type_task: str, - instance_ebs_size_master: int, - instance_ebs_size_core: int, - instance_ebs_size_task: int, - instance_num_on_demand_master: int, - instance_num_on_demand_core: int, - instance_num_on_demand_task: int, - instance_num_spot_master: int, - instance_num_spot_core: int, - 
instance_num_spot_task: int, - spot_bid_percentage_of_on_demand_master: int, - spot_bid_percentage_of_on_demand_core: int, - spot_bid_percentage_of_on_demand_task: int, - spot_provisioning_timeout_master: int, - spot_provisioning_timeout_core: int, - spot_provisioning_timeout_task: int, + cluster_name: str = "my-emr-cluster", + logging_s3_path: Optional[str] = None, + emr_release: str = "emr-6.0.0", + emr_ec2_role: str = "EMR_EC2_DefaultRole", + emr_role: str = "EMR_DefaultRole", + instance_type_master: str = "r5.xlarge", + instance_type_core: str = "r5.xlarge", + instance_type_task: str = "r5.xlarge", + instance_ebs_size_master: int = 64, + instance_ebs_size_core: int = 64, + instance_ebs_size_task: int = 64, + instance_num_on_demand_master: int = 1, + instance_num_on_demand_core: int = 0, + instance_num_on_demand_task: int = 0, + instance_num_spot_master: int = 0, + instance_num_spot_core: int = 0, + instance_num_spot_task: int = 0, + spot_bid_percentage_of_on_demand_master: int = 100, + spot_bid_percentage_of_on_demand_core: int = 100, + spot_bid_percentage_of_on_demand_task: int = 100, + spot_provisioning_timeout_master: int = 5, + spot_provisioning_timeout_core: int = 5, + spot_provisioning_timeout_task: int = 5, spot_timeout_to_on_demand_master: bool = True, spot_timeout_to_on_demand_core: bool = True, spot_timeout_to_on_demand_task: bool = True, @@ -337,10 +461,17 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused security_group_slave: Optional[str] = None, security_groups_slave_additional: Optional[List[str]] = None, security_group_service_access: Optional[str] = None, + docker: bool = False, spark_log_level: str = "WARN", spark_jars_path: Optional[List[str]] = None, spark_defaults: Optional[Dict[str, str]] = None, spark_pyarrow: bool = False, + spark_docker: bool = False, + spark_docker_image: str = None, + hive_docker: bool = False, + ecr_credentials_step: bool = False, + extra_public_registries: Optional[List[str]] = None, + custom_classifications: Optional[List[Dict[str, Any]]] = None, maximize_resource_allocation: bool = False, steps: Optional[List[Dict[str, Any]]] = None, keep_cluster_alive_when_no_steps: bool = True, @@ -354,18 +485,19 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Parameters ---------- + subnet_id : str + VPC subnet ID. cluster_name : str Cluster name. - logging_s3_path : str + logging_s3_path : str, optional Logging s3 path (e.g. s3://BUCKET_NAME/DIRECTORY_NAME/). + If None, the default is `s3://aws-logs-{AccountId}-{RegionId}/elasticmapreduce/` emr_release : str EMR release (e.g. emr-5.28.0). emr_ec2_role : str IAM role name. emr_role : str IAM role name. - subnet_id : str - VPC subnet ID. instance_type_master : str EC2 instance type. instance_type_core : str @@ -448,6 +580,7 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Debugging enabled? applications : List[str], optional List of applications (e.g ["Hadoop", "Spark", "Ganglia", "Hive"]). + If None, ["Spark"] will be considered. visible_to_all_users : bool True or False. key_pair_name : str, optional @@ -465,6 +598,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused security_group_service_access : str, optional The identifier of the Amazon EC2 security group for the Amazon EMR service to access clusters in VPC private subnets. + docker : bool + Enable Docker Hub and ECR registries access. 
spark_log_level : str log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE). spark_jars_path : List[str], optional @@ -475,6 +610,18 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused spark_pyarrow : bool Enable PySpark to use PyArrow behind the scenes. P.S. You must install pyarrow by your self via bootstrap + spark_docker : bool = False + Add necessary Spark Defaults to run on Docker + spark_docker_image : str, optional + E.g. {ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG} + hive_docker : bool + Add necessary configurations to run on Docker + ecr_credentials_step : bool + Add a extra step during the Cluster launch to retrieve ECR auth files. + extra_public_registries: List[str], optional + Additional registries. + custom_classifications: List[Dict[str, Any]], optional + Extra classifications. maximize_resource_allocation : bool Configure your executors to utilize the maximum resources possible https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation @@ -500,6 +647,21 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused Examples -------- + Minimal Example + + >>> cluster_id = wr.emr.create_cluster("SUBNET_ID") + + Minimal Exmaple on Docker + + >>> cluster_id = wr.emr.create_cluster( + >>> subnet_id="SUBNET_ID", + >>> spark_docker=True, + >>> spark_docker_image="{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}", + >>> ecr_credentials_step=True + >>> ) + + Full Example + >>> import awswrangler as wr >>> cluster_id = wr.emr.create_cluster( ... cluster_name="wrangler_cluster", @@ -548,6 +710,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused ... }) """ + applications = ["Spark"] if applications is None else applications + boto3_session = _utils.ensure_session(session=boto3_session) args: Dict[str, Any] = _build_cluster_args(**locals()) client_emr: boto3.client = _utils.client(service_name="emr", session=boto3_session) response: Dict[str, Any] = client_emr.run_job_flow(**args) @@ -647,8 +811,8 @@ def submit_steps( def submit_step( cluster_id: str, - name: str, command: str, + name: str = "my-step", action_on_failure: str = "CONTINUE", script: bool = False, boto3_session: Optional[boto3.Session] = None, @@ -659,11 +823,11 @@ def submit_step( ---------- cluster_id : str Cluster ID. - name : str - Step name. command : str e.g. 'echo "Hello!"' e.g. for script 's3://.../script.sh arg1 arg2' + name : str, optional + Step name. action_on_failure : str 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' script : bool @@ -698,26 +862,29 @@ def submit_step( def build_step( - name: str, command: str, + name: str = "my-step", action_on_failure: str = "CONTINUE", script: bool = False, + region: Optional[str] = None, boto3_session: Optional[boto3.Session] = None, ) -> Dict[str, Any]: """Build the Step structure (dictionary). Parameters ---------- - name : str - Step name. command : str e.g. 'echo "Hello!"' e.g. for script 's3://.../script.sh arg1 arg2' + name : str, optional + Step name. action_on_failure : str 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' script : bool - True for raw command or False for script runner. + False for raw command or True for script runner. https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html + region: str, optional + Region name to not get it from boto3.Session. (e.g. 
`us-east-1`) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -734,14 +901,17 @@ def build_step( >>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps) """ - session: boto3.Session = _utils.ensure_session(session=boto3_session) jar: str = "command-runner.jar" if script is True: - if session.region_name is not None: - region: str = session.region_name - else: # pragma: no cover - region = "us-east-1" - jar = f"s3://{region}.elasticmapreduce/libs/script-runner/script-runner.jar" + if region is not None: # pragma: no cover + _region: str = region + else: + session: boto3.Session = _utils.ensure_session(session=boto3_session) + if session.region_name is not None: + _region = session.region_name + else: # pragma: no cover + _region = "us-east-1" + jar = f"s3://{_region}.elasticmapreduce/libs/script-runner/script-runner.jar" step: Dict[str, Any] = { "Name": name, "ActionOnFailure": action_on_failure, @@ -780,3 +950,40 @@ def get_step_state(cluster_id: str, step_id: str, boto3_session: Optional[boto3. response: Dict[str, Any] = client_emr.describe_step(ClusterId=cluster_id, StepId=step_id) _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}") return response["Step"]["Status"]["State"] + + +def update_ecr_credentials( + cluster_id: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None +) -> str: + """Update internal ECR credentials. + + Parameters + ---------- + cluster_id : str + Cluster ID. + action_on_failure : str + 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Step ID. + + Examples + -------- + >>> import awswrangler as wr + >>> step_id = wr.emr.update_ecr_credentials("cluster_id") + + """ + name: str = "Update ECR Credentials" + command: str = _get_ecr_credentials_command() + session: boto3.Session = _utils.ensure_session(session=boto3_session) + step: Dict[str, Any] = build_step( + name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=session + ) + client_emr: boto3.client = _utils.client(service_name="emr", session=session) + response: Dict[str, Any] = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]) + _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}") + return response["StepIds"][0] diff --git a/awswrangler/s3.py b/awswrangler/s3.py index f728937db..527c1ae76 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -111,6 +111,40 @@ def does_object_exist(path: str, boto3_session: Optional[boto3.Session] = None) raise ex # pragma: no cover +def list_directories(path: str, boto3_session: Optional[boto3.Session] = None) -> List[str]: + """List Amazon S3 objects from a prefix. + + Parameters + ---------- + path : str + S3 path (e.g. s3://bucket/prefix). + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of objects paths. 
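The new `list_directories` relies on S3's `Delimiter`/`CommonPrefixes` grouping (see `_list_objects` further down in this patch). A minimal sketch of the underlying boto3 call, assuming a placeholder bucket and prefix, looks roughly like this:

    import boto3

    # Sketch only: "my-bucket" and "prefix/" are placeholders, not values from this patch.
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    dirs = []
    for page in paginator.paginate(Bucket="my-bucket", Prefix="prefix/", Delimiter="/"):
        # With a delimiter, S3 reports the grouped keys under "CommonPrefixes"
        # instead of listing every object under "Contents".
        for cp in page.get("CommonPrefixes", []):
            dirs.append(f"s3://my-bucket/{cp['Prefix']}")
    print(dirs)  # e.g. ['s3://my-bucket/prefix/dir0/', 's3://my-bucket/prefix/dir1/']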
+ + Examples + -------- + Using the default boto3 session + + >>> import awswrangler as wr + >>> wr.s3.list_objects('s3://bucket/prefix/') + ['s3://bucket/prefix/dir0', 's3://bucket/prefix/dir1', 's3://bucket/prefix/dir2'] + + Using a custom boto3 session + + >>> import boto3 + >>> import awswrangler as wr + >>> wr.s3.list_objects('s3://bucket/prefix/', boto3_session=boto3.Session()) + ['s3://bucket/prefix/dir0', 's3://bucket/prefix/dir1', 's3://bucket/prefix/dir2'] + + """ + return _list_objects(path=path, delimiter="/", boto3_session=boto3_session) + + def list_objects(path: str, boto3_session: Optional[boto3.Session] = None) -> List[str]: """List Amazon S3 objects from a prefix. @@ -142,20 +176,37 @@ def list_objects(path: str, boto3_session: Optional[boto3.Session] = None) -> Li ['s3://bucket/prefix0', 's3://bucket/prefix1', 's3://bucket/prefix2'] """ + return _list_objects(path=path, delimiter=None, boto3_session=boto3_session) + + +def _list_objects( + path: str, delimiter: Optional[str] = None, boto3_session: Optional[boto3.Session] = None +) -> List[str]: client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session) paginator = client_s3.get_paginator("list_objects_v2") bucket: str prefix: str bucket, prefix = _utils.parse_path(path=path) - response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000}) + args: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "PaginationConfig": {"PageSize": 1000}} + if delimiter is not None: + args["Delimiter"] = delimiter + response_iterator = paginator.paginate(**args) paths: List[str] = [] for page in response_iterator: - contents: Optional[List] = page.get("Contents") - if contents is not None: - for content in contents: - if (content is not None) and ("Key" in content): - key: str = content["Key"] - paths.append(f"s3://{bucket}/{key}") + if delimiter is None: + contents: Optional[List[Optional[Dict[str, str]]]] = page.get("Contents") + if contents is not None: + for content in contents: + if (content is not None) and ("Key" in content): + key: str = content["Key"] + paths.append(f"s3://{bucket}/{key}") + else: + prefixes: Optional[List[Optional[Dict[str, str]]]] = page.get("CommonPrefixes") + if prefixes is not None: + for pfx in prefixes: + if (pfx is not None) and ("Prefix" in pfx): + key = pfx["Prefix"] + paths.append(f"s3://{bucket}/{key}") return paths diff --git a/docs/source/api.rst b/docs/source/api.rst index 897fc7a3e..7d2d51602 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -16,6 +16,7 @@ Amazon S3 does_object_exist get_bucket_region list_objects + list_directories read_csv read_fwf read_json @@ -115,6 +116,7 @@ EMR submit_steps build_step get_step_state + update_ecr_credentials CloudWatch Logs --------------- diff --git a/requirements-dev.txt b/requirements-dev.txt index 3fdd3cdf3..99a9b0730 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -17,4 +17,5 @@ twine~=3.1.1 wheel~=0.34.2 sphinx~=3.0.1 sphinx_bootstrap_theme~=0.7.1 -moto~=1.3.14 \ No newline at end of file +moto~=1.3.14 +jupyterlab~=2.1.1 \ No newline at end of file diff --git a/testing/test_awswrangler/test_cloudwatch.py b/testing/test_awswrangler/test_cloudwatch.py index f59b8b3dd..eced7a754 100644 --- a/testing/test_awswrangler/test_cloudwatch.py +++ b/testing/test_awswrangler/test_cloudwatch.py @@ -48,7 +48,7 @@ def loggroup(cloudformation_outputs): def test_query_cancelled(loggroup): client_logs = boto3.client("logs") query_id = wr.cloudwatch.start_query( - 
log_group_names=[loggroup], query="fields @timestamp, @message | sort @timestamp desc | limit 5" + log_group_names=[loggroup], query="fields @timestamp, @message | sort @timestamp desc" ) client_logs.stop_query(queryId=query_id) with pytest.raises(exceptions.QueryCancelled): diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index afa2a8307..bd53d4bad 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -127,6 +127,9 @@ def test_athena_ctas(bucket, database, kms_key): partition_cols=["par0", "par1"], )["paths"] wr.s3.wait_objects_exist(paths=paths) + dirs = wr.s3.list_directories(path=f"s3://{bucket}/test_athena_ctas/") + for d in dirs: + assert d.startswith(f"s3://{bucket}/test_athena_ctas/par0=") df = wr.s3.read_parquet_table(table="test_athena_ctas", database=database) assert len(df.index) == 3 ensure_data_types(df=df, has_list=True) diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index e64329b33..df2dab1cb 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -146,3 +146,36 @@ def test_cluster_single_node(bucket, cloudformation_outputs): wr.emr.submit_steps(cluster_id=cluster_id, steps=steps) wr.emr.terminate_cluster(cluster_id=cluster_id) wr.s3.delete_objects(f"s3://{bucket}/emr-logs/") + + +def test_default_logging_path(cloudformation_outputs): + path = wr.emr._get_default_logging_path(subnet_id=cloudformation_outputs["SubnetId"]) + assert path.startswith("s3://aws-logs-") + assert path.endswith("/elasticmapreduce/") + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.emr._get_default_logging_path() + + +def test_docker(cloudformation_outputs): + cluster_id = wr.emr.create_cluster( + subnet_id=cloudformation_outputs["SubnetId"], + docker=True, + spark_docker=True, + spark_docker_image="787535711150.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", + hive_docker=True, + ecr_credentials_step=True, + custom_classifications=[ + { + "Classification": "livy-conf", + "Properties": { + "livy.spark.master": "yarn", + "livy.spark.deploy-mode": "cluster", + "livy.server.session.timeout": "16h", + }, + } + ], + steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://igor-tavares/emr.py")], + ) + wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://igor-tavares/emr.py") + wr.emr.update_ecr_credentials(cluster_id=cluster_id) + wr.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index db12dbe1a..2adc7aec8 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -20,6 +20,21 @@ def emr(): yield True +@pytest.fixture(scope="module") +def sts(): + with moto.mock_sts(): + yield True + + +@pytest.fixture(scope="module") +def subnet(): + with moto.mock_ec2(): + ec2 = boto3.resource("ec2", region_name="us-west-1") + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-1a") + yield subnet.id + + def test_csv(s3): path = "s3://bucket/test.csv" wr.s3.to_csv(df=get_df_csv(), path=path, index=False) @@ -37,12 +52,13 @@ def test_parquet(s3): assert len(df.columns) == 18 -def test_emr(s3, emr): +def test_emr(s3, emr, sts, subnet): + session = boto3.Session(region_name="us-west-1") cluster_id = wr.emr.create_cluster( 
cluster_name="wrangler_cluster", logging_s3_path="s3://bucket/emr-logs/", emr_release="emr-5.29.0", - subnet_id="foo", + subnet_id=subnet, emr_ec2_role="EMR_EC2_DefaultRole", emr_role="EMR_DefaultRole", instance_type_master="m5.xlarge", @@ -87,11 +103,12 @@ def test_emr(s3, emr): termination_protected=False, spark_pyarrow=False, tags={"foo": "boo", "bar": "xoo"}, + boto3_session=session, ) - wr.emr.get_cluster_state(cluster_id=cluster_id) + wr.emr.get_cluster_state(cluster_id=cluster_id, boto3_session=session) steps = [] for cmd in ['echo "Hello"', "ls -la"]: steps.append(wr.emr.build_step(name=cmd, command=cmd)) - wr.emr.submit_steps(cluster_id=cluster_id, steps=steps) - wr.emr.terminate_cluster(cluster_id=cluster_id) + wr.emr.submit_steps(cluster_id=cluster_id, steps=steps, boto3_session=session) + wr.emr.terminate_cluster(cluster_id=cluster_id, boto3_session=session) wr.s3.delete_objects("s3://bucket/emr-logs/") diff --git a/tutorials/15 - EMR.ipynb b/tutorials/15 - EMR.ipynb new file mode 100644 index 000000000..4e1c627e6 --- /dev/null +++ b/tutorials/15 - EMR.ipynb @@ -0,0 +1,193 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 15 - EMR" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import awswrangler as wr\n", + "import boto3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your Subnet ID:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ························\n" + ] + } + ], + "source": [ + "subnet = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating EMR Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "cluster_id = wr.emr.create_cluster(subnet)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Uploading our PySpark script to Amazon S3" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "script = \"\"\"\n", + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n", + "sc = spark.sparkContext\n", + "\n", + "print(\"Spark Initialized\")\n", + "\"\"\"\n", + "\n", + "_ = boto3.client(\"s3\").put_object(\n", + " Body=script,\n", + " Bucket=bucket,\n", + " Key=\"test.py\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Submit PySpark step" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit s3://{bucket}/test.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Wait Step" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], 
+ "source": [ + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Terminate Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "wr.emr.terminate_cluster(cluster_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb new file mode 100644 index 000000000..138759d8f --- /dev/null +++ b/tutorials/16 - EMR & Docker.ipynb @@ -0,0 +1,269 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 16 - EMR & Docker" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import awswrangler as wr\n", + "import boto3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your Subnet ID:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ························\n" + ] + } + ], + "source": [ + "subnet = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Build and Upload Docker Image to ECR repository\n", + "\n", + "Replace the `{ACCOUNT_ID}` placeholder." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%writefile Dockerfile\n" + } + }, + "outputs": [], + "source": [ + "%%writefile Dockerfile\n", + "\n", + "FROM amazoncorretto:8\n", + "\n", + "RUN yum -y update\n", + "RUN yum -y install yum-utils\n", + "RUN yum -y groupinstall development\n", + "\n", + "RUN yum list python3*\n", + "RUN yum -y install python3 python3-dev python3-pip python3-virtualenv\n", + "\n", + "RUN python -V\n", + "RUN python3 -V\n", + "\n", + "ENV PYSPARK_DRIVER_PYTHON python3\n", + "ENV PYSPARK_PYTHON python3\n", + "\n", + "RUN pip3 install --upgrade pip\n", + "RUN pip3 install awswrangler\n", + "\n", + "RUN python3 -c \"import awswrangler as wr\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%bash\n", + "\n", + "docker build -t 'local/emr-wrangler' .\n", + "aws ecr create-repository --repository-name emr-wrangler\n", + "docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\n", + "eval $(aws ecr get-login --region us-east-1 --no-include-email)\n", + "docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating EMR Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", + "\n", + "cluster_id = wr.emr.create_cluster(\n", + " subnet_id=subnet,\n", + " spark_docker=True,\n", + " spark_docker_image=DOCKER_IMAGE,\n", + " ecr_credentials_step=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Uploading our PySpark script to Amazon S3" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "script = \"\"\"\n", + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n", + "sc = spark.sparkContext\n", + "\n", + "print(\"Spark Initialized\")\n", + "\n", + "import awswrangler as wr\n", + "\n", + "print(f\"Wrangler version: {wr.__version__}\")\n", + "\"\"\"\n", + "\n", + "_ = boto3.client(\"s3\").put_object(\n", + " Body=script,\n", + " Bucket=bucket,\n", + " Key=\"test_docker.py\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Submit PySpark step" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit --deploy-mode cluster s3://{bucket}/test_docker.py\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Wait Step" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Terminate Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "wr.emr.terminate_cluster(cluster_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": 
"Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From 9611a0ae88ebb2b44853ebd460916013383cae26 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sat, 25 Apr 2020 18:17:36 -0300 Subject: [PATCH 3/6] Improve EMR tutorials #193 --- awswrangler/emr.py | 21 ++++++++++++++++++++- testing/test_awswrangler/test_emr.py | 6 +++--- tutorials/16 - EMR & Docker.ipynb | 7 +++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/awswrangler/emr.py b/awswrangler/emr.py index 106a57da3..7490b29c9 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -649,10 +649,29 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused -------- Minimal Example + >>> import awswrangler as wr >>> cluster_id = wr.emr.create_cluster("SUBNET_ID") - Minimal Exmaple on Docker + Minimal Example With Custom Classification + >>> import awswrangler as wr + >>> cluster_id = wr.emr.create_cluster( + >>> subnet_id="SUBNET_ID", + >>> custom_classifications=[ + >>> { + >>> "Classification": "livy-conf", + >>> "Properties": { + >>> "livy.spark.master": "yarn", + >>> "livy.spark.deploy-mode": "cluster", + >>> "livy.server.session.timeout": "16h", + >>> }, + >>> } + >>> ], + >>> ) + + Minimal Example on Docker + + >>> import awswrangler as wr >>> cluster_id = wr.emr.create_cluster( >>> subnet_id="SUBNET_ID", >>> spark_docker=True, diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index df2dab1cb..66f8e139f 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -161,7 +161,7 @@ def test_docker(cloudformation_outputs): subnet_id=cloudformation_outputs["SubnetId"], docker=True, spark_docker=True, - spark_docker_image="787535711150.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", + spark_docker_image="123456789123.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", hive_docker=True, ecr_credentials_step=True, custom_classifications=[ @@ -174,8 +174,8 @@ def test_docker(cloudformation_outputs): }, } ], - steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://igor-tavares/emr.py")], + steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://bucket/emr.py")], ) - wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://igor-tavares/emr.py") + wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://bucket/emr.py") wr.emr.update_ecr_credentials(cluster_id=cluster_id) wr.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb index 138759d8f..440d72066 100644 --- a/tutorials/16 - EMR & Docker.ipynb +++ b/tutorials/16 - EMR & Docker.ipynb @@ -201,7 +201,10 @@ "metadata": {}, "outputs": [], "source": [ - "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit --deploy-mode cluster s3://{bucket}/test_docker.py\")" + "step_id = wr.emr.submit_step(\n", + " cluster_id=cluster_id,\n", + " command=f\"spark-submit --deploy-mode cluster s3://{bucket}/test_docker.py\"\n", + ")" ] }, { @@ -266,4 +269,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} +} \ No newline at end of file From 3c3ca645718aedbae47eb8b4134118d178ef90a4 Mon Sep 17 00:00:00 2001 From: 
igorborgest Date: Sun, 26 Apr 2020 16:30:02 -0300 Subject: [PATCH 4/6] Splitting up the ecr_credentials to a individual function #193 --- awswrangler/emr.py | 184 +++++++++++++++------------ testing/test_awswrangler/test_emr.py | 18 ++- tutorials/16 - EMR & Docker.ipynb | 132 ++++++++++++++++--- 3 files changed, 230 insertions(+), 104 deletions(-) diff --git a/awswrangler/emr.py b/awswrangler/emr.py index 7490b29c9..3658d4573 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -61,13 +61,6 @@ def _get_default_logging_path( return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/" -def _get_ecr_credentials_command() -> str: - return ( - "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email) && " - "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/" - ) - - def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-statements account_id: str = _utils.get_account_id(boto3_session=pars["boto3_session"]) region: str = _utils.get_region_from_subnet(subnet_id=pars["subnet_id"], boto3_session=pars["boto3_session"]) @@ -139,7 +132,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s args["Configurations"] = [ {"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"}} ] - if (pars["docker"] is True) or (pars["spark_docker"] is True) or (pars["hive_docker"] is True): + if pars["docker"] is True: if pars.get("extra_registries") is None: extra_registries: List[str] = [] else: # pragma: no cover @@ -162,26 +155,6 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } ) - if pars["spark_docker"] is True: - if pars.get("spark_docker_image") is None: # pragma: no cover - raise exceptions.InvalidArgumentCombination("You must pass a spark_docker_image if spark_docker is True.") - pars["spark_defaults"] = {} if pars["spark_defaults"] is None else pars["spark_defaults"] - pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" - pars["spark_defaults"][ - "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" - ] = "hdfs:///user/hadoop/config.json" - pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars["spark_docker_image"] - pars["spark_defaults"]["spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"] = "/etc/passwd:/etc/passwd:ro" - pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE"] = "docker" - pars["spark_defaults"][ - "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG" - ] = "hdfs:///user/hadoop/config.json" - pars["spark_defaults"]["spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = pars[ - "spark_docker_image" - ] - pars["spark_defaults"][ - "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS" - ] = "/etc/passwd:/etc/passwd:ro" if spark_env is not None: args["Configurations"].append( { @@ -216,21 +189,12 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "Configurations": [], } ) - - hive_conf: Optional[Dict[str, Any]] = None - if (pars["hive_glue_catalog"] is True) or (pars["hive_docker"] is True): - hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []} - if pars["hive_glue_catalog"] is True: + hive_conf: Optional[Dict[str, Any]] = {"Classification": "hive-site", "Properties": {}, "Configurations": []} hive_conf["Properties"][ "hive.metastore.client.factory.class" ] = 
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" - if pars["hive_docker"] is True: - hive_conf["Properties"]["hive.execution.mode"] = "container" - - if hive_conf is not None: args["Configurations"].append(hive_conf) - if pars["presto_glue_catalog"] is True: args["Configurations"].append( { @@ -282,17 +246,6 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]}, } ) - if pars["ecr_credentials_step"] is True: - args["Steps"].append( - build_step( - name="ECR Credentials Setup", - command=_get_ecr_credentials_command(), - action_on_failure="TERMINATE_CLUSTER", - script=False, - region=region, - boto3_session=pars["boto3_session"], - ) - ) if pars["steps"] is not None: args["Steps"] += pars["steps"] @@ -462,15 +415,11 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused security_groups_slave_additional: Optional[List[str]] = None, security_group_service_access: Optional[str] = None, docker: bool = False, + extra_public_registries: Optional[List[str]] = None, spark_log_level: str = "WARN", spark_jars_path: Optional[List[str]] = None, spark_defaults: Optional[Dict[str, str]] = None, spark_pyarrow: bool = False, - spark_docker: bool = False, - spark_docker_image: str = None, - hive_docker: bool = False, - ecr_credentials_step: bool = False, - extra_public_registries: Optional[List[str]] = None, custom_classifications: Optional[List[Dict[str, Any]]] = None, maximize_resource_allocation: bool = False, steps: Optional[List[Dict[str, Any]]] = None, @@ -600,6 +549,8 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused service to access clusters in VPC private subnets. docker : bool Enable Docker Hub and ECR registries access. + extra_public_registries: List[str], optional + Additional docker registries. spark_log_level : str log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE). spark_jars_path : List[str], optional @@ -610,16 +561,6 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused spark_pyarrow : bool Enable PySpark to use PyArrow behind the scenes. P.S. You must install pyarrow by your self via bootstrap - spark_docker : bool = False - Add necessary Spark Defaults to run on Docker - spark_docker_image : str, optional - E.g. {ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG} - hive_docker : bool - Add necessary configurations to run on Docker - ecr_credentials_step : bool - Add a extra step during the Cluster launch to retrieve ECR auth files. - extra_public_registries: List[str], optional - Additional registries. custom_classifications: List[Dict[str, Any]], optional Extra classifications. maximize_resource_allocation : bool @@ -669,16 +610,6 @@ def create_cluster( # pylint: disable=too-many-arguments,too-many-locals,unused >>> ], >>> ) - Minimal Example on Docker - - >>> import awswrangler as wr - >>> cluster_id = wr.emr.create_cluster( - >>> subnet_id="SUBNET_ID", - >>> spark_docker=True, - >>> spark_docker_image="{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}", - >>> ecr_credentials_step=True - >>> ) - Full Example >>> import awswrangler as wr @@ -971,8 +902,8 @@ def get_step_state(cluster_id: str, step_id: str, boto3_session: Optional[boto3. 
return response["Step"]["Status"]["State"] -def update_ecr_credentials( - cluster_id: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None +def submit_ecr_credentials_refresh( + cluster_id: str, path: str, action_on_failure: str = "CONTINUE", boto3_session: Optional[boto3.Session] = None ) -> str: """Update internal ECR credentials. @@ -980,6 +911,8 @@ def update_ecr_credentials( ---------- cluster_id : str Cluster ID. + path : str + Amazon S3 path where Wrangler will stage the script ecr_credentials_refresh.py (e.g. s3://bucket/emr/) action_on_failure : str 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' boto3_session : boto3.Session(), optional @@ -993,12 +926,17 @@ def update_ecr_credentials( Examples -------- >>> import awswrangler as wr - >>> step_id = wr.emr.update_ecr_credentials("cluster_id") + >>> step_id = wr.emr.submit_ecr_credentials_refresh("cluster_id", "s3://bucket/emr/") """ - name: str = "Update ECR Credentials" - command: str = _get_ecr_credentials_command() + path = path[:-1] if path.endswith("/") else path + path_script: str = f"{path}/ecr_credentials_refresh.py" session: boto3.Session = _utils.ensure_session(session=boto3_session) + client_s3: boto3.client = _utils.client(service_name="s3", session=session) + bucket, key = _utils.parse_path(path=path_script) + client_s3.put_object(Body=_get_ecr_credentials_refresh_content().encode(encoding="utf-8"), Bucket=bucket, Key=key) + command: str = f"spark-submit --deploy-mode cluster {path_script}" + name: str = "ECR Credentials Refresh" step: Dict[str, Any] = build_step( name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=session ) @@ -1006,3 +944,91 @@ def update_ecr_credentials( response: Dict[str, Any] = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]) _logger.debug(f"response: \n{json.dumps(response, default=str, indent=4)}") return response["StepIds"][0] + + +def _get_ecr_credentials_refresh_content() -> str: + return """ +import subprocess +from pyspark.sql import SparkSession +spark = SparkSession.builder.appName("ECR Setup Job").getOrCreate() + +COMMANDS = [ + "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email)", + "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/" +] + +for command in COMMANDS: + subprocess.run(command.split(" "), timeout=6.0, check=True) + +print("done!") + """ + + +def build_spark_step( + path: str, + deploy_mode: str = "cluster", + docker_image: Optional[str] = None, + name: str = "my-step", + action_on_failure: str = "CONTINUE", + region: Optional[str] = None, + boto3_session: Optional[boto3.Session] = None, +) -> Dict[str, Any]: + """Build the Step structure (dictionary). + + Parameters + ---------- + path : str + Script path. (e.g. s3://bucket/app.py) + deploy_mode : str + "cluster" | "client" + docker_image : str, optional + e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}" + name : str, optional + Step name. + action_on_failure : str + 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' + region: str, optional + Region name to not get it from boto3.Session. (e.g. `us-east-1`) + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + Dict[str, Any] + Step structure. 
+ + Examples + -------- + >>> import awswrangler as wr + >>> step_id = wr.emr.submit_steps( + >>> cluster_id="cluster-id", + >>> steps=[ + >>> wr.emr.build_spark_step(path="s3://bucket/app.py") + >>> ] + >>> ) + + """ + if docker_image is None: # pragma: no cover + cmd: str = f"spark-submit --deploy-mode {deploy_mode} {path}" + else: + config: str = "hdfs:///user/hadoop/config.json" + cmd = ( + f"spark-submit --deploy-mode cluster " + f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker " + f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} " + f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} " + f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro " + f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker " + f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} " + f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} " + f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro " + f"{path}" + ) + return build_step( + command=cmd, + name=name, + action_on_failure=action_on_failure, + script=False, + region=region, + boto3_session=boto3_session, + ) diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index 66f8e139f..fdda2fa25 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -156,14 +156,10 @@ def test_default_logging_path(cloudformation_outputs): wr.emr._get_default_logging_path() -def test_docker(cloudformation_outputs): +def test_docker(bucket, cloudformation_outputs): cluster_id = wr.emr.create_cluster( subnet_id=cloudformation_outputs["SubnetId"], docker=True, - spark_docker=True, - spark_docker_image="123456789123.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", - hive_docker=True, - ecr_credentials_step=True, custom_classifications=[ { "Classification": "livy-conf", @@ -176,6 +172,14 @@ def test_docker(cloudformation_outputs): ], steps=[wr.emr.build_step("spark-submit --deploy-mode cluster s3://bucket/emr.py")], ) - wr.emr.submit_step(cluster_id=cluster_id, command="spark-submit --deploy-mode cluster s3://bucket/emr.py") - wr.emr.update_ecr_credentials(cluster_id=cluster_id) + wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/") + wr.emr.submit_steps( + cluster_id=cluster_id, + steps=[ + wr.emr.build_spark_step( + path=f"s3://{bucket}/emr/test_docker.py", + docker_image="123456789123.dkr.ecr.us-east-1.amazonaws.com/docker-emr:docker-emr", + ) + ], + ) wr.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb index 440d72066..8a637af86 100644 --- a/tutorials/16 - EMR & Docker.ipynb +++ b/tutorials/16 - EMR & Docker.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -28,7 +28,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 2, "metadata": {}, "outputs": [ { @@ -53,7 +53,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 3, "metadata": {}, "outputs": [ { @@ -142,25 +142,45 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ - "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", - "\n", - "cluster_id = 
wr.emr.create_cluster(\n", - " subnet_id=subnet,\n", - " spark_docker=True,\n", - " spark_docker_image=DOCKER_IMAGE,\n", - " ecr_credentials_step=True\n", - ")" + "cluster_id = wr.emr.create_cluster(subnet, docker=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Refresh ECR credentials in the cluster (expiration time: 12h )" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'s-3OPMPDCYGEGOT'" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Uploading our PySpark script to Amazon S3" + "## Uploading application script to Amazon S3 (PySpark)" ] }, { @@ -184,7 +204,7 @@ "_ = boto3.client(\"s3\").put_object(\n", " Body=script,\n", " Bucket=bucket,\n", - " Key=\"test_docker.py\"\n", + " Key=\"emr/test_docker.py\"\n", ")" ] }, @@ -201,9 +221,13 @@ "metadata": {}, "outputs": [], "source": [ - "step_id = wr.emr.submit_step(\n", + "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", + "\n", + "step = wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n", + "\n", + "steps_ids = wr.emr.submit_steps(\n", " cluster_id=cluster_id,\n", - " command=f\"spark-submit --deploy-mode cluster s3://{bucket}/test_docker.py\"\n", + " steps=[step]\n", ")" ] }, @@ -220,7 +244,7 @@ "metadata": {}, "outputs": [], "source": [ - "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", + "while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n", " pass" ] }, @@ -240,6 +264,78 @@ "wr.emr.terminate_cluster(cluster_id)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Another example with custom configurations" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "cluster_id = wr.emr.create_cluster(\n", + " cluster_name=\"my-demo-cluster-v2\",\n", + " logging_s3_path=f\"s3://{bucket}/emr-logs/\",\n", + " emr_release=\"emr-6.0.0\",\n", + " subnet_id=subnet,\n", + " emr_ec2_role=\"EMR_EC2_DefaultRole\",\n", + " emr_role=\"EMR_DefaultRole\",\n", + " instance_type_master=\"m5.2xlarge\",\n", + " instance_type_core=\"m5.2xlarge\",\n", + " instance_ebs_size_master=50,\n", + " instance_ebs_size_core=50,\n", + " instance_num_on_demand_master=0,\n", + " instance_num_on_demand_core=0,\n", + " instance_num_spot_master=1,\n", + " instance_num_spot_core=2,\n", + " spot_bid_percentage_of_on_demand_master=100,\n", + " spot_bid_percentage_of_on_demand_core=100,\n", + " spot_provisioning_timeout_master=5,\n", + " spot_provisioning_timeout_core=5,\n", + " spot_timeout_to_on_demand_master=False,\n", + " spot_timeout_to_on_demand_core=False,\n", + " python3=True,\n", + " docker=True,\n", + " spark_glue_catalog=True,\n", + " hive_glue_catalog=True,\n", + " presto_glue_catalog=True,\n", + " debugging=True,\n", + " applications=[\"Hadoop\", \"Spark\", \"Hive\", \"Zeppelin\", \"Livy\"],\n", + " visible_to_all_users=True,\n", + " maximize_resource_allocation=True,\n", + " keep_cluster_alive_when_no_steps=True,\n", + " termination_protected=False,\n", + " spark_pyarrow=True\n", + ")\n", + "\n", + "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")\n", + "\n", + "DOCKER_IMAGE = 
f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", + "\n", + "steps_ids = wr.emr.submit_steps(\n", + " cluster_id=cluster_id,\n", + " steps=[\n", + " wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n", + " pass\n", + "\n", + "wr.emr.terminate_cluster(cluster_id)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -269,4 +365,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} From 2eefb3aa4ff8694fcdf58bc6e5bb943633c0de5d Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sun, 26 Apr 2020 16:42:46 -0300 Subject: [PATCH 5/6] Small update in the EMR tutorial --- tutorials/16 - EMR & Docker.ipynb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb index 8a637af86..9bfa182fc 100644 --- a/tutorials/16 - EMR & Docker.ipynb +++ b/tutorials/16 - EMR & Docker.ipynb @@ -225,10 +225,7 @@ "\n", "step = wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n", "\n", - "steps_ids = wr.emr.submit_steps(\n", - " cluster_id=cluster_id,\n", - " steps=[step]\n", - ")" + "steps_ids = wr.emr.submit_steps(cluster_id, steps=[step])" ] }, { From f0f154bd42807066dbfa24204fcd28a66c1981ab Mon Sep 17 00:00:00 2001 From: igorborgest Date: Sun, 26 Apr 2020 22:37:45 -0300 Subject: [PATCH 6/6] Add wr.emr.submit_spark_step --- awswrangler/emr.py | 58 ++++++++++++++++++++++++++++ docs/source/api.rst | 4 +- testing/test_awswrangler/test_emr.py | 1 + tutorials/16 - EMR & Docker.ipynb | 47 +++++++++++----------- 4 files changed, 84 insertions(+), 26 deletions(-) diff --git a/awswrangler/emr.py b/awswrangler/emr.py index 3658d4573..3801d340e 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -1032,3 +1032,61 @@ def build_spark_step( region=region, boto3_session=boto3_session, ) + + +def submit_spark_step( + cluster_id: str, + path: str, + deploy_mode: str = "cluster", + docker_image: Optional[str] = None, + name: str = "my-step", + action_on_failure: str = "CONTINUE", + region: Optional[str] = None, + boto3_session: Optional[boto3.Session] = None, +) -> str: + """Submit Spark Step. + + Parameters + ---------- + cluster_id : str + Cluster ID. + path : str + Script path. (e.g. s3://bucket/app.py) + deploy_mode : str + "cluster" | "client" + docker_image : str, optional + e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}" + name : str, optional + Step name. + action_on_failure : str + 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE' + region: str, optional + Region name to not get it from boto3.Session. (e.g. `us-east-1`) + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Step ID. 
+ + Examples + -------- + >>> import awswrangler as wr + >>> step_id = wr.emr.submit_spark_step( + >>> cluster_id="cluster-id", + >>> path="s3://bucket/emr/app.py" + >>> ) + + """ + session: boto3.Session = _utils.ensure_session(session=boto3_session) + step = build_spark_step( + path=path, + deploy_mode=deploy_mode, + docker_image=docker_image, + name=name, + action_on_failure=action_on_failure, + region=region, + boto3_session=session, + ) + return submit_steps(cluster_id=cluster_id, steps=[step], boto3_session=session)[0] diff --git a/docs/source/api.rst b/docs/source/api.rst index 7d2d51602..6b841705e 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -113,10 +113,12 @@ EMR get_cluster_state terminate_cluster submit_step + submit_spark_step + submit_ecr_credentials_refresh submit_steps build_step + build_spark_step get_step_state - update_ecr_credentials CloudWatch Logs --------------- diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index fdda2fa25..0c0112bf8 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -182,4 +182,5 @@ def test_docker(bucket, cloudformation_outputs): ) ], ) + wr.emr.submit_spark_step(cluster_id=cluster_id, path=f"s3://{bucket}/emr/test_docker.py") wr.emr.terminate_cluster(cluster_id=cluster_id) diff --git a/tutorials/16 - EMR & Docker.ipynb b/tutorials/16 - EMR & Docker.ipynb index 9bfa182fc..4ffb2be2b 100644 --- a/tutorials/16 - EMR & Docker.ipynb +++ b/tutorials/16 - EMR & Docker.ipynb @@ -11,12 +11,13 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr\n", - "import boto3" + "import boto3\n", + "import getpass" ] }, { @@ -40,7 +41,6 @@ } ], "source": [ - "import getpass\n", "bucket = getpass.getpass()" ] }, @@ -164,7 +164,7 @@ { "data": { "text/plain": [ - "'s-3OPMPDCYGEGOT'" + "'s-1B0O45RWJL8CL'" ] }, "execution_count": 5, @@ -173,7 +173,7 @@ } ], "source": [ - "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")" + "wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/\")" ] }, { @@ -185,7 +185,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 7, "metadata": {}, "outputs": [], "source": [ @@ -201,11 +201,7 @@ "print(f\"Wrangler version: {wr.__version__}\")\n", "\"\"\"\n", "\n", - "_ = boto3.client(\"s3\").put_object(\n", - " Body=script,\n", - " Bucket=bucket,\n", - " Key=\"emr/test_docker.py\"\n", - ")" + "boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\");" ] }, { @@ -217,15 +213,17 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", "\n", - "step = wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n", - "\n", - "steps_ids = wr.emr.submit_steps(cluster_id, steps=[step])" + "step_id = wr.emr.submit_spark_step(\n", + " cluster_id,\n", + " f\"s3://{bucket}/test_docker.py\",\n", + " docker_image=DOCKER_IMAGE\n", + ")" ] }, { @@ -237,11 +235,11 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n", + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", " pass" ] }, @@ -254,7 +252,7 @@ }, { 
"cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -270,7 +268,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 9, "metadata": {}, "outputs": [], "source": [ @@ -313,11 +311,10 @@ "\n", "DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n", "\n", - "steps_ids = wr.emr.submit_steps(\n", - " cluster_id=cluster_id,\n", - " steps=[\n", - " wr.emr.build_spark_step(f\"s3://{bucket}/emr/test_docker.py\", docker_image=DOCKER_IMAGE)\n", - " ]\n", + "step_id = wr.emr.submit_spark_step(\n", + " cluster_id,\n", + " f\"s3://{bucket}/test_docker.py\",\n", + " docker_image=DOCKER_IMAGE\n", ")" ] }, @@ -327,7 +324,7 @@ "metadata": {}, "outputs": [], "source": [ - "while wr.emr.get_step_state(cluster_id, steps_ids[0]) != \"COMPLETED\":\n", + "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n", " pass\n", "\n", "wr.emr.terminate_cluster(cluster_id)"