diff --git a/Makefile b/Makefile
index 7a65bc7..f7e5b5a 100644
--- a/Makefile
+++ b/Makefile
@@ -7,10 +7,10 @@ SHELL := /bin/sh
 
 # Set variables if testing locally
 ifeq ($(IS_RELEASE_BUILD),)
-    SPARK_VERSION := 2.4
+    SPARK_VERSION := 3.0
     PROCESSOR := cpu
     FRAMEWORK_VERSION := py37
-    SM_VERSION := 0.1
+    SM_VERSION := 1.0
     USE_CASE := processing
     BUILD_CONTEXT := ./spark/${USE_CASE}/${SPARK_VERSION}/py3
     AWS_PARTITION := aws
@@ -84,8 +84,8 @@ test-sagemaker: install-sdk build-tests
     # History server tests can't run in parallel since they use the same container name.
     pytest -s -vv test/integration/history \
     --repo=$(DEST_REPO) --tag=$(VERSION) --durations=0 \
-    --spark-version=$(SPARK_VERSION)
-    --framework_version=$(FRAMEWORK_VERSION) \
+    --spark-version=$(SPARK_VERSION) \
+    --framework-version=$(FRAMEWORK_VERSION) \
     --role $(ROLE) \
     --image_uri $(IMAGE_URI) \
     --region ${REGION} \
@@ -93,9 +93,10 @@ test-sagemaker: install-sdk build-tests
     # OBJC_DISABLE_INITIALIZE_FORK_SAFETY: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864
     OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES pytest --workers auto -s -vv test/integration/sagemaker \
     --repo=$(DEST_REPO) --tag=$(VERSION) --durations=0 \
-    --spark-version=$(SPARK_VERSION)
-    --framework_version=$(FRAMEWORK_VERSION) \
+    --spark-version=$(SPARK_VERSION) \
+    --framework-version=$(FRAMEWORK_VERSION) \
     --role $(ROLE) \
+    --account-id ${INTEG_TEST_ACCOUNT} \
     --image_uri $(IMAGE_URI) \
     --region ${REGION} \
     --domain ${AWS_DOMAIN}
@@ -104,8 +105,8 @@ test-sagemaker: install-sdk build-tests
 test-prod:
     pytest -s -vv test/integration/tag \
     --repo=$(DEST_REPO) --tag=$(VERSION) --durations=0 \
-    --spark-version=$(SPARK_VERSION)
-    --framework_version=$(FRAMEWORK_VERSION) \
+    --spark-version=$(SPARK_VERSION) \
+    --framework-version=$(FRAMEWORK_VERSION) \
     --role $(ROLE) \
     --image_uri $(IMAGE_URI) \
     --region ${REGION} \
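These pytest flags only resolve if they match what the test suite registers. For reference, a minimal `conftest.py` sketch of how hyphenated options such as `--framework-version` and `--account-id` are typically declared; the dest/fixture names here are assumptions, not the repository's actual conftest:

```python
# Illustrative conftest.py sketch only -- option names mirror the Makefile above,
# but the dest/fixture names are assumptions.
import pytest


def pytest_addoption(parser):
    parser.addoption("--spark-version", dest="spark_version", default="3.0")
    parser.addoption("--framework-version", dest="framework_version", default="py37")
    parser.addoption("--account-id", dest="account_id", default=None)


@pytest.fixture
def account_id(request):
    # Exposes the CLI option to tests such as test_sagemaker_pyspark_sse_kms_s3.
    return request.config.getoption("account_id")
```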
diff --git a/new_images.yml b/new_images.yml
index cdf8822..c5dcdda 100644
--- a/new_images.yml
+++ b/new_images.yml
@@ -1,6 +1,6 @@
 ---
 new_images:
-  - spark: "2.4.4"
+  - spark: "3.0.0"
     use-case: "processing"
     processors: ["cpu"]
     python: ["py37"]
diff --git a/setup.py b/setup.py
index 5ab6b7a..24cb4ee 100644
--- a/setup.py
+++ b/setup.py
@@ -44,7 +44,7 @@
     ],
     setup_requires=["setuptools", "wheel"],
     # Be frugal when adding dependencies. Prefer Python's standard library.
-    install_requires = install_reqs,
+    install_requires=install_reqs,
     extras_require={
         "test": test_install_reqs,
diff --git a/spark/processing/3.0/py3/container-bootstrap-config/bootstrap.sh b/spark/processing/3.0/py3/container-bootstrap-config/bootstrap.sh
new file mode 100644
index 0000000..e4e23bb
--- /dev/null
+++ b/spark/processing/3.0/py3/container-bootstrap-config/bootstrap.sh
@@ -0,0 +1 @@
+echo "Not implemented"
\ No newline at end of file
diff --git a/spark/processing/3.0/py3/docker/Dockerfile.cpu b/spark/processing/3.0/py3/docker/Dockerfile.cpu
new file mode 100644
index 0000000..a61b4f4
--- /dev/null
+++ b/spark/processing/3.0/py3/docker/Dockerfile.cpu
@@ -0,0 +1,100 @@
+FROM amazonlinux:2
+ARG REGION
+ENV AWS_REGION ${REGION}
+RUN yum clean all
+RUN yum update -y
+RUN yum install -y awscli bigtop-utils curl gcc gzip unzip python3 python3-setuptools python3-pip python-devel python3-devel python-psutil gunzip tar wget liblapack* libblas* libopencv* libopenblas*
+
+# Install nginx. amazonlinux:2.0.20200304.0 does not have nginx, so install epel-release first.
+RUN wget https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
+RUN yum install -y epel-release-latest-7.noarch.rpm
+RUN yum install -y nginx
+
+RUN rm -rf /var/cache/yum
+
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+# http://blog.stuart.axelbrooke.com/python-3-on-spark-return-of-the-pythonhashseed
+ENV PYTHONHASHSEED 0
+ENV PYTHONIOENCODING UTF-8
+ENV PIP_DISABLE_PIP_VERSION_CHECK 1
+
+# Install EMR Spark/Hadoop
+ENV HADOOP_HOME /usr/lib/hadoop
+ENV HADOOP_CONF_DIR /usr/lib/hadoop/etc/hadoop
+ENV SPARK_HOME /usr/lib/spark
+
+COPY yum/emr-apps.repo /etc/yum.repos.d/emr-apps.repo
+
+# Install Hadoop/Spark dependencies from EMR's yum repository for Spark optimizations,
+# replacing the REGION placeholder in the repository URL first.
+RUN sed -i "s/REGION/${AWS_REGION}/g" /etc/yum.repos.d/emr-apps.repo
+
+# These packages are a subset of what EMR installs in a cluster with the
+# "hadoop", "spark", and "hive" applications.
+# They include EMR-optimized libraries and extras.
+RUN yum install -y aws-hm-client \
+    aws-java-sdk \
+    aws-sagemaker-spark-sdk \
+    emr-goodies \
+    emr-scripts \
+    emr-s3-select \
+    emrfs \
+    hadoop \
+    hadoop-client \
+    hadoop-hdfs \
+    hadoop-hdfs-datanode \
+    hadoop-hdfs-namenode \
+    hadoop-httpfs \
+    hadoop-kms \
+    hadoop-lzo \
+    hadoop-yarn \
+    hadoop-yarn-nodemanager \
+    hadoop-yarn-proxyserver \
+    hadoop-yarn-resourcemanager \
+    hadoop-yarn-timelineserver \
+    hive \
+    hive-hcatalog \
+    hive-hcatalog-server \
+    hive-jdbc \
+    hive-server2 \
+    python37-numpy \
+    python37-sagemaker_pyspark \
+    s3-dist-cp \
+    spark-core \
+    spark-datanucleus \
+    spark-external \
+    spark-history-server \
+    spark-python
+
+
+# Point Spark at the proper python binary
+ENV PYSPARK_PYTHON=/usr/bin/python3
+
+# Set up the Spark/YARN/HDFS user as root
+ENV PATH="/usr/bin:/opt/program:${PATH}"
+ENV YARN_RESOURCEMANAGER_USER="root"
+ENV YARN_NODEMANAGER_USER="root"
+ENV HDFS_NAMENODE_USER="root"
+ENV HDFS_DATANODE_USER="root"
+ENV HDFS_SECONDARYNAMENODE_USER="root"
+
+# Set up the bootstrapping program and Spark configuration
+COPY *.whl /opt/program/
+RUN /usr/bin/python3 -m pip install /opt/program/*.whl
+COPY hadoop-config /opt/hadoop-config
+COPY nginx-config /opt/nginx-config
+COPY aws-config /opt/aws-config
+
+# Set up the container bootstrapper
+COPY container-bootstrap-config /opt/container-bootstrap-config
+RUN chmod +x /opt/container-bootstrap-config/bootstrap.sh
+RUN /opt/container-bootstrap-config/bootstrap.sh
+
+# With this config, the Spark history server will not run as a daemon; otherwise there
+# would be no server running and the container would terminate immediately.
+ENV SPARK_NO_DAEMONIZE TRUE
+
+WORKDIR $SPARK_HOME
+
+ENTRYPOINT ["smspark-submit"]
diff --git a/spark/processing/3.0/py3/hadoop-config/core-site.xml b/spark/processing/3.0/py3/hadoop-config/core-site.xml
new file mode 100644
index 0000000..52db7b2
--- /dev/null
+++ b/spark/processing/3.0/py3/hadoop-config/core-site.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://nn_uri/</value>
+        <description>NameNode URI</description>
+    </property>
+    <property>
+        <name>fs.s3a.aws.credentials.provider</name>
+        <value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
+        <description>AWS S3 credential provider</description>
+    </property>
+    <property>
+        <name>fs.s3.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+        <description>s3a filesystem implementation</description>
+    </property>
+    <property>
+        <name>fs.AbstractFileSystem.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3A</value>
+        <description>s3a filesystem implementation</description>
+    </property>
+</configuration>
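For quick local verification, the same s3a settings can also be passed to a SparkSession through the `spark.hadoop.*` prefix instead of core-site.xml. A minimal sketch for experimentation only; the bucket and key are placeholders, and the hadoop-aws/AWS SDK jars installed above are assumed to be on the classpath:

```python
from pyspark.sql import SparkSession

# Mirrors the core-site.xml properties above via the spark.hadoop.* prefix.
spark = (
    SparkSession.builder.appName("s3a-smoke-test")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)

# Placeholder S3 location; replace with real input data.
df = spark.read.json("s3a://my-bucket/spark/input/sales/data.jsonl")
df.show()
```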
diff --git a/spark/processing/3.0/py3/hadoop-config/hdfs-site.xml b/spark/processing/3.0/py3/hadoop-config/hdfs-site.xml
new file mode 100644
index 0000000..6ccfb8f
--- /dev/null
+++ b/spark/processing/3.0/py3/hadoop-config/hdfs-site.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>dfs.datanode.data.dir</name>
+        <value>file:///opt/amazon/hadoop/hdfs/datanode</value>
+        <description>Comma separated list of paths on the local filesystem of a DataNode where it should store its blocks.</description>
+    </property>
+    <property>
+        <name>dfs.namenode.name.dir</name>
+        <value>file:///opt/amazon/hadoop/hdfs/namenode</value>
+        <description>Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently.</description>
+    </property>
+</configuration>
diff --git a/spark/processing/3.0/py3/hadoop-config/spark-defaults.conf b/spark/processing/3.0/py3/hadoop-config/spark-defaults.conf
new file mode 100644
index 0000000..c1f1c17
--- /dev/null
+++ b/spark/processing/3.0/py3/hadoop-config/spark-defaults.conf
@@ -0,0 +1,6 @@
+spark.driver.extraClassPath /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar
+spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
+spark.executor.extraClassPath /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar
+spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
+spark.driver.host=sd_host
+spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
diff --git a/spark/processing/3.0/py3/hadoop-config/spark-env.sh b/spark/processing/3.0/py3/hadoop-config/spark-env.sh
new file mode 100644
index 0000000..1b58aa1
--- /dev/null
+++ b/spark/processing/3.0/py3/hadoop-config/spark-env.sh
@@ -0,0 +1,3 @@
+# EMPTY FILE TO AVOID OVERRIDING ENV VARS
+# Specifically, without copying this empty file, SPARK_HISTORY_OPTS would be overridden,
+# spark.history.ui.port would default to 18082, and spark.eventLog.dir would default to the local fs
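`sd_host` above, like `nn_uri`, `rm_hostname`, and `nm_hostname` in the Hadoop XML files, is a placeholder that is meant to be filled in with real host addresses when the container starts. A rough illustration of that kind of substitution, not the container's actual bootstrap code:

```python
import socket

# Illustrative placeholder substitution; the real bootstrapper logic may differ.
def render_spark_defaults(path="/usr/lib/spark/conf/spark-defaults.conf"):
    host_ip = socket.gethostbyname(socket.gethostname())
    with open(path) as f:
        content = f.read()
    with open(path, "w") as f:
        f.write(content.replace("sd_host", host_ip))
```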
diff --git a/spark/processing/3.0/py3/hadoop-config/yarn-site.xml b/spark/processing/3.0/py3/hadoop-config/yarn-site.xml
new file mode 100644
index 0000000..3790582
--- /dev/null
+++ b/spark/processing/3.0/py3/hadoop-config/yarn-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>yarn.resourcemanager.hostname</name>
+        <value>rm_hostname</value>
+        <description>The hostname of the RM.</description>
+    </property>
+    <property>
+        <name>yarn.nodemanager.hostname</name>
+        <value>nm_hostname</value>
+        <description>The hostname of the NM.</description>
+    </property>
+    <property>
+        <name>yarn.nodemanager.webapp.address</name>
+        <value>nm_webapp_address</value>
+    </property>
+    <property>
+        <name>yarn.nodemanager.vmem-pmem-ratio</name>
+        <value>5</value>
+        <description>Ratio between virtual memory to physical memory.</description>
+    </property>
+    <property>
+        <name>yarn.resourcemanager.am.max-attempts</name>
+        <value>1</value>
+        <description>The maximum number of application attempts.</description>
+    </property>
+    <property>
+        <name>yarn.nodemanager.env-whitelist</name>
+        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,YARN_HOME,AWS_CONTAINER_CREDENTIALS_RELATIVE_URI</value>
+        <description>Environment variable whitelist</description>
+    </property>
+</configuration>
diff --git a/spark/processing/3.0/py3/nginx-config/default.conf b/spark/processing/3.0/py3/nginx-config/default.conf
new file mode 100644
index 0000000..a8a50a5
--- /dev/null
+++ b/spark/processing/3.0/py3/nginx-config/default.conf
@@ -0,0 +1,17 @@
+server {
+    listen 15050;
+    server_name localhost;
+    client_header_buffer_size 128k;
+    large_client_header_buffers 4 128k;
+
+    location ~ ^/history/(.*)/(.*)/jobs/$ {
+        proxy_pass http://localhost:18080/history/$1/jobs/;
+        proxy_redirect http://localhost:18080/history/$1/jobs/ $domain_name/proxy/15050/history/$1/jobs/;
+        expires off;
+    }
+
+    location / {
+        proxy_pass http://localhost:18080;
+        expires off;
+    }
+}
\ No newline at end of file
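The proxy above fronts the Spark history server (listening on 18080) on port 15050. A small sketch of how a test could check that the history server behind it is reachable, using the standard Spark monitoring REST API; the base URL and timeout are arbitrary:

```python
import requests

# The history server listens on 18080 inside the container; nginx proxies it on 15050.
def history_server_is_up(base_url="http://localhost:18080"):
    try:
        resp = requests.get(f"{base_url}/api/v1/applications", timeout=5)
        return resp.status_code == 200
    except requests.ConnectionError:
        return False
```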
diff --git a/spark/processing/3.0/py3/nginx-config/nginx.conf b/spark/processing/3.0/py3/nginx-config/nginx.conf
new file mode 100644
index 0000000..1e3a51c
--- /dev/null
+++ b/spark/processing/3.0/py3/nginx-config/nginx.conf
@@ -0,0 +1,66 @@
+# For more information on configuration, see:
+#   * Official English Documentation: http://nginx.org/en/docs/
+#   * Official Russian Documentation: http://nginx.org/ru/docs/
+
+user nginx;
+worker_processes auto;
+error_log /var/log/nginx/error.log;
+pid /run/nginx.pid;
+
+# Load dynamic modules. See /usr/share/doc/nginx/README.dynamic.
+include /usr/share/nginx/modules/*.conf;
+
+events {
+    worker_connections 1024;
+}
+
+http {
+    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
+                    '$status $body_bytes_sent "$http_referer" '
+                    '"$http_user_agent" "$http_x_forwarded_for"';
+
+    access_log /var/log/nginx/access.log main;
+
+    sendfile on;
+    tcp_nopush on;
+    tcp_nodelay on;
+    keepalive_timeout 65;
+    types_hash_max_size 2048;
+
+    include /etc/nginx/mime.types;
+    default_type application/octet-stream;
+
+    # Load modular configuration files from the /etc/nginx/conf.d directory.
+    # See http://nginx.org/en/docs/ngx_core_module.html#include
+    # for more information.
+    include /etc/nginx/conf.d/*.conf;
+
+    server {
+        listen 80 default_server;
+        listen [::]:80 default_server;
+        server_name _;
+        root /usr/share/nginx/html;
+
+        # Load configuration files for the default server block.
+        include /etc/nginx/default.d/*.conf;
+
+        location /proxy/15050 {
+            proxy_pass http://localhost:15050/;
+        }
+
+        location ~ ^/proxy/15050/(.*) {
+            proxy_pass http://localhost:15050/$1;
+        }
+
+        location / {
+        }
+
+        error_page 404 /404.html;
+        location = /40x.html {
+        }
+
+        error_page 500 502 503 504 /50x.html;
+        location = /50x.html {
+        }
+    }
+}
\ No newline at end of file
diff --git a/spark/processing/3.0/py3/smspark-0.1-py3-none-any.whl b/spark/processing/3.0/py3/smspark-0.1-py3-none-any.whl
new file mode 100644
index 0000000..3d4c3d6
Binary files /dev/null and b/spark/processing/3.0/py3/smspark-0.1-py3-none-any.whl differ
diff --git a/spark/processing/3.0/py3/yum/emr-apps.repo b/spark/processing/3.0/py3/yum/emr-apps.repo
new file mode 100644
index 0000000..6466eba
--- /dev/null
+++ b/spark/processing/3.0/py3/yum/emr-apps.repo
@@ -0,0 +1,7 @@
+[emr-apps]
+name = EMR Application Repository
+gpgkey = https://s3-REGION.amazonaws.com/repo.REGION.emr.amazonaws.com/apps-repository/emr-6.1.0/72a9ec2e-9bf6-4d7d-9244-86a0ab1e50d6/repoPublicKey.txt
+enabled = 1
+baseurl = https://s3-REGION.amazonaws.com/repo.REGION.emr.amazonaws.com/apps-repository/emr-6.1.0/72a9ec2e-9bf6-4d7d-9244-86a0ab1e50d6
+priority = 5
+gpgcheck = 0
diff --git a/src/smspark/bootstrapper.py b/src/smspark/bootstrapper.py
index ff1ff65..0369662 100644
--- a/src/smspark/bootstrapper.py
+++ b/src/smspark/bootstrapper.py
@@ -19,12 +19,13 @@
 import shutil
 import socket
 import subprocess
-from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
+from typing import Any, Dict, List, Sequence, Tuple, Union
 
 import psutil
 import requests
 from smspark.config import Configuration
 from smspark.defaults import default_resource_config
+from smspark.errors import AlgorithmError
 from smspark.waiter import Waiter
 
 
@@ -36,11 +37,18 @@ class Bootstrapper:
     HADOOP_CONFIG_PATH = "/opt/hadoop-config/"
     HADOOP_PATH = "/usr/lib/hadoop"
     SPARK_PATH = "/usr/lib/spark"
+    HIVE_PATH = "/usr/lib/hive"
     PROCESSING_CONF_INPUT_PATH = "/opt/ml/processing/input/conf/configuration.json"
     PROCESSING_JOB_CONFIG_PATH = "/opt/ml/config/processingjobconfig.json"
     INSTANCE_TYPE_INFO_PATH = "/opt/aws-config/ec2-instance-type-info.json"
     EMR_CONFIGURE_APPS_URL = "https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html"
+    JAR_DEST = SPARK_PATH + "/jars"
+    # jets3t-0.9.0.jar is used by hadoop 2.8.5 (https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common)
+    # and 2.10.0 (https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/2.10.0). However, it's not
+    # needed in 3.2.1 (https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common/3.2.1)
+    # TODO: use a map with spark version as the key to maintain the optional jars
+    OPTIONAL_JARS = {"jets3t-0.9.0.jar": HADOOP_PATH + "/lib"}
 
     def __init__(self, resource_config: Dict[str, Any] = default_resource_config):
         logging.basicConfig(level=logging.INFO)
@@ -63,28 +71,43 @@ def bootstrap_history_server(self) -> None:
 
     def copy_aws_jars(self) -> None:
         self.logger.info("copying aws jars")
-        jar_dest = Bootstrapper.SPARK_PATH + "/jars"
         for f in glob.glob("/usr/share/aws/aws-java-sdk/*.jar"):
-            shutil.copyfile(f, os.path.join(jar_dest, os.path.basename(f)))
-        hadoop_aws_jar = "hadoop-aws-2.8.5-amzn-6.jar"
-        jets3t_jar = "jets3t-0.9.0.jar"
-        shutil.copyfile(
-            os.path.join(Bootstrapper.HADOOP_PATH, hadoop_aws_jar), os.path.join(jar_dest, hadoop_aws_jar),
-        )
-        # this jar required for using s3a client
+            shutil.copyfile(f, os.path.join(self.JAR_DEST, os.path.basename(f)))
+        hadoop_aws_jar = self._get_hadoop_jar()
         shutil.copyfile(
-            os.path.join(Bootstrapper.HADOOP_PATH + "/lib", jets3t_jar), os.path.join(jar_dest, jets3t_jar),
+            os.path.join(Bootstrapper.HADOOP_PATH, hadoop_aws_jar), os.path.join(self.JAR_DEST, hadoop_aws_jar)
         )
+
+        self._copy_optional_jars()
         # copy hmclient (glue data catalog hive metastore client) jars to classpath:
         # https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore
         for f in glob.glob("/usr/share/aws/hmclient/lib/*.jar"):
-            shutil.copyfile(f, os.path.join(jar_dest, os.path.basename(f)))
+            shutil.copyfile(f, os.path.join(self.JAR_DEST, os.path.basename(f)))
+
+    # TODO: use glob.glob
+    def _get_hadoop_jar(self) -> str:
+        for file_name in os.listdir(Bootstrapper.HADOOP_PATH):
+            if file_name.startswith("hadoop-aws") and file_name.endswith(".jar"):
+                self.logger.info(f"Found hadoop jar {file_name}")
+                return file_name
+
+        raise AlgorithmError("Error finding hadoop jar", caused_by=FileNotFoundError())
+
+    def _copy_optional_jars(self) -> None:
+        for jar, jar_path in self.OPTIONAL_JARS.items():
+            if os.path.isfile(os.path.join(jar_path, jar)):
+                self.logger.info(f"Copying optional jar {jar} from {jar_path} to {self.JAR_DEST}")
+                shutil.copyfile(
+                    os.path.join(jar_path, jar), os.path.join(self.JAR_DEST, jar),
+                )
+            else:
+                self.logger.info(f"Optional jar {jar} in {jar_path} does not exist")
 
     def copy_cluster_config(self) -> None:
         self.logger.info("copying cluster config")
 
         def copy_config(src: str, dst: str) -> None:
-            self.logger.info("copying {} to {}".format(src, dst))
+            self.logger.info(f"copying {src} to {dst}")
             shutil.copyfile(src, dst)
 
         copy_config(
@@ -395,7 +418,7 @@ def get_yarn_spark_resource_config(
             {
                 "spark.driver.memory": f"{driver_mem_mb}m",
                 "spark.driver.memoryOverhead": f"{driver_mem_ovr_mb}m",
-                "spark.driver.defaultJavaOptions": f"{driver_java_opts}m",
+                "spark.driver.defaultJavaOptions": f"{driver_java_opts}",
                 "spark.executor.memory": f"{executor_mem_mb}m",
                 "spark.executor.memoryOverhead": f"{executor_mem_ovr_mb}m",
                 "spark.executor.cores": f"{executor_cores}",
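The `# TODO: use glob.glob` left on `_get_hadoop_jar` could be addressed roughly as follows; a sketch only, keeping the same AlgorithmError behavior:

```python
import glob
import os

# Sketch of _get_hadoop_jar based on glob rather than os.listdir (per the TODO above).
# Method body only; assumes the Bootstrapper class context shown in the diff.
def _get_hadoop_jar(self) -> str:
    matches = glob.glob(os.path.join(Bootstrapper.HADOOP_PATH, "hadoop-aws*.jar"))
    if not matches:
        raise AlgorithmError("Error finding hadoop jar", caused_by=FileNotFoundError())
    jar_name = os.path.basename(matches[0])
    self.logger.info(f"Found hadoop jar {jar_name}")
    return jar_name
```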
"--input {}".format(input_data) output = "--output {}".format(output_data) - host_jars_dir = "./test/resources/code/scala/hello-scala-spark/lib_managed/jars/org.json4s/json4s-native_2.11" + host_jars_dir = "./test/resources/code/scala/hello-scala-spark/lib_managed/jars/org.json4s/json4s-native_2.12" container_jars_dir = "/opt/ml/processing/input/jars" jars_mount = f"{host_jars_dir}:{container_jars_dir}" jars_arg = f"--jars {container_jars_dir}" class_arg = "--class com.amazonaws.sagemaker.spark.test.HelloScalaSparkApp" - app_jar = "/opt/ml/processing/input/code/scala/hello-scala-spark/target/scala-2.11/hello-scala-spark_2.11-1.0.jar" + app_jar = "/opt/ml/processing/input/code/scala/hello-scala-spark/target/scala-2.12/hello-scala-spark_2.12-1.0.jar" docker_compose_cmd = ( f"JARS_MOUNT={jars_mount} " f"CMD='{jars_arg} {class_arg} {verbose_opt} {app_jar} {input} {output}' " diff --git a/test/integration/sagemaker/test_spark.py b/test/integration/sagemaker/test_spark.py index 5f1c772..be25bd9 100644 --- a/test/integration/sagemaker/test_spark.py +++ b/test/integration/sagemaker/test_spark.py @@ -210,6 +210,69 @@ def test_sagemaker_pyspark_sse_s3(role, image_uri, sagemaker_session, region, sa assert len(output_contents) != 0 +def test_sagemaker_pyspark_sse_kms_s3(role, image_uri, sagemaker_session, region, sagemaker_client, account_id): + spark = PySparkProcessor( + base_job_name="sm-spark-py", + image_uri=image_uri, + role=role, + instance_count=2, + instance_type="ml.c5.xlarge", + max_runtime_in_seconds=1200, + sagemaker_session=sagemaker_session, + ) + + # This test expected AWS managed s3 kms key to be present. The key will be in + # KMS > AWS managed keys > aws/s3 + kms_key_id = None + kms_client = sagemaker_session.boto_session.client("kms", region_name=region) + for alias in kms_client.list_aliases()["Aliases"]: + if "s3" in alias["AliasName"]: + kms_key_id = alias["TargetKeyId"] + + if not kms_key_id: + raise ValueError("AWS managed s3 kms key(alias: aws/s3) does not exist") + + bucket = sagemaker_session.default_bucket() + timestamp = datetime.now().isoformat() + input_data_key = f"spark/input/sales/{timestamp}/data.jsonl" + input_data_uri = f"s3://{bucket}/{input_data_key}" + output_data_uri_prefix = f"spark/output/sales/{timestamp}" + output_data_uri = f"s3://{bucket}/{output_data_uri_prefix}" + s3_client = sagemaker_session.boto_session.client("s3", region_name=region) + with open("test/resources/data/files/data.jsonl") as data: + body = data.read() + s3_client.put_object( + Body=body, Bucket=bucket, Key=input_data_key, ServerSideEncryption="aws:kms", SSEKMSKeyId=kms_key_id + ) + + spark.run( + submit_app="test/resources/code/python/hello_py_spark/hello_py_spark_app.py", + submit_py_files=["test/resources/code/python/hello_py_spark/hello_py_spark_udfs.py"], + arguments=["--input", input_data_uri, "--output", output_data_uri], + configuration={ + "Classification": "core-site", + "Properties": { + "fs.s3a.server-side-encryption-algorithm": "SSE-KMS", + "fs.s3a.server-side-encryption.key": f"arn:aws:kms:{region}:{account_id}:key/{kms_key_id}", + }, + }, + ) + processing_job = spark.latest_job + waiter = sagemaker_client.get_waiter("processing_job_completed_or_stopped") + waiter.wait( + ProcessingJobName=processing_job.job_name, + # poll every 15 seconds. timeout after 15 minutes. 
+        WaiterConfig={"Delay": 15, "MaxAttempts": 60},
+    )
+
+    s3_objects = s3_client.list_objects(Bucket=bucket, Prefix=output_data_uri_prefix)["Contents"]
+    assert len(s3_objects) != 0
+    for s3_object in s3_objects:
+        object_metadata = s3_client.get_object(Bucket=bucket, Key=s3_object["Key"])
+        assert object_metadata["ServerSideEncryption"] == "aws:kms"
+        assert object_metadata["SSEKMSKeyId"] == f"arn:aws:kms:{region}:{account_id}:key/{kms_key_id}"
+
+
 def test_sagemaker_scala_jar_multinode(role, image_uri, configuration, sagemaker_session, sagemaker_client):
     """Test SparkJarProcessor using Scala application jar with external runtime dependency jars staged by SDK"""
     spark = SparkJarProcessor(
@@ -233,10 +296,10 @@ def test_sagemaker_scala_jar_multinode(role, image_uri, configuration, sagemaker
     scala_project_dir = "test/resources/code/scala/hello-scala-spark"
 
     spark.run(
-        submit_app="{}/target/scala-2.11/hello-scala-spark_2.11-1.0.jar".format(scala_project_dir),
+        submit_app="{}/target/scala-2.12/hello-scala-spark_2.12-1.0.jar".format(scala_project_dir),
         submit_class="com.amazonaws.sagemaker.spark.test.HelloScalaSparkApp",
         submit_jars=[
-            "{}/lib_managed/jars/org.json4s/json4s-native_2.11/json4s-native_2.11-3.6.9.jar".format(scala_project_dir)
+            "{}/lib_managed/jars/org.json4s/json4s-native_2.12/json4s-native_2.12-3.6.9.jar".format(scala_project_dir)
         ],
         arguments=["--input", input_data_uri, "--output", output_data_uri],
         configuration=configuration,
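The alias scan in `test_sagemaker_pyspark_sse_kms_s3` matches any alias containing "s3". A slightly stricter standalone sketch that targets the AWS managed `aws/s3` alias directly; illustrative only, with no pagination handling:

```python
import boto3

# Resolve the AWS managed S3 key (alias/aws/s3) to its key ARN; illustrative only.
def get_s3_managed_kms_key_arn(region):
    kms = boto3.client("kms", region_name=region)
    key_id = None
    for alias in kms.list_aliases()["Aliases"]:
        if alias["AliasName"] == "alias/aws/s3":
            key_id = alias.get("TargetKeyId")
            break
    if not key_id:
        raise ValueError("AWS managed s3 kms key (alias: aws/s3) does not exist")
    return kms.describe_key(KeyId=key_id)["KeyMetadata"]["Arn"]
```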
f"{exp_executor_cores}",