# ---- file: .test-infra/dataproc/create_flink_cluster.sh (new file, mode 100755) ----
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Runs init actions for Docker, Portability framework (Beam) and Flink cluster
# and opens an SSH tunnel to connect with Flink easily and run Beam jobs.
#
# Provide the following environment to run this script:
#
#   CLUSTER_NAME:            Cluster name
#   GCS_BUCKET:              GCS bucket url for Dataproc resources (init actions)
#   HARNESS_IMAGES_TO_PULL:  Urls to SDK Harness' images to pull on dataproc
#                            workers (accepts 1 or more urls)
#   FLINK_DOWNLOAD_URL:      Url to Flink .tar archive to be installed on the cluster
#   FLINK_NUM_WORKERS:       Number of Flink workers
#   TASK_MANAGER_SLOTS:      Number of Flink slots per worker
#   DETACHED_MODE:           Should the SSH tunnel run in detached mode?
#
# Example usage:
#
#   CLUSTER_NAME=flink \
#   GCS_BUCKET=gs://<bucket-name>/flink \
#   HARNESS_IMAGES_TO_PULL='gcr.io/<project>/python:latest gcr.io/<project>/java:latest' \
#   FLINK_DOWNLOAD_URL=http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz \
#   FLINK_NUM_WORKERS=2 \
#   TASK_MANAGER_SLOTS=1 \
#   DETACHED_MODE=false \
#   ./create_flink_cluster.sh
#
set -Eeuxo pipefail

DATAPROC_VERSION=1.2

MASTER_NAME="$CLUSTER_NAME-m"

# GCS properties.
INIT_ACTIONS_FOLDER_NAME="init-actions"
FLINK_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/flink.sh"
BEAM_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/beam.sh"
DOCKER_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/docker.sh"

# Flink properties.
FLINK_LOCAL_PORT=8081
TASK_MANAGER_MEM=10240
# Filled in by get_leader(): "host:port" of the Yarn application master.
YARN_APPLICATION_MASTER=""

# Copies the local init-actions scripts into the GCS bucket so that Dataproc
# nodes can fetch them at cluster-creation time.
function upload_init_actions() {
  echo "Uploading initialization actions to GCS bucket: $GCS_BUCKET"
  gsutil cp -r "$INIT_ACTIONS_FOLDER_NAME"/* "$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME"
}

# Finds the single Yarn application running on the cluster and stores its
# application master ("host:port") in YARN_APPLICATION_MASTER.
# Exits with an error if more than one application is running.
function get_leader() {
  local i=0
  local application_ids
  local application_masters
  local line

  echo "Yarn Applications"
  while read -r line; do
    echo "$line"
    # First whitespace-delimited field is the application id.
    # ([[:space:]] matches the tabs in `yarn application -list` output; the
    # original `s/ .*//` only worked because unquoted echo squashed tabs.)
    application_ids[$i]=$(printf '%s\n' "$line" | sed 's/[[:space:]].*//')
    # The tracking URL host starts with the cluster name; strip everything
    # before it and everything after the following whitespace.
    application_masters[$i]=$(printf '%s\n' "$line" \
      | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" \
      | sed 's/[[:space:]].*//')
    i=$((i + 1))
  done <<< "$(gcloud compute ssh "yarn@$MASTER_NAME" --command="yarn application -list" | grep "$CLUSTER_NAME")"

  if [[ $i -ne 1 ]]; then
    echo "Multiple applications found. Make sure that only 1 application is running on the cluster."
    for app in "${application_ids[@]}"; do
      echo "$app"
    done

    echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill <application_id>\"' to kill the yarn application."
    exit 1
  fi

  YARN_APPLICATION_MASTER=${application_masters[0]}
  echo "Using Yarn Application master: $YARN_APPLICATION_MASTER"
}

# Opens an SSH tunnel to the Flink master: forwards the Flink REST port and
# the jobmanager RPC port locally, plus a SOCKS proxy on port 1080.
function start_tunnel() {
  local job_server_config
  job_server_config=$(gcloud compute ssh "yarn@$MASTER_NAME" --command="curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\"")
  local key="jobmanager.rpc.port"
  local yarn_application_master_host
  yarn_application_master_host=$(echo "$YARN_APPLICATION_MASTER" | cut -d ":" -f1)
  # NOTE(review): the inline snippet uses the Python 2 print statement —
  # presumably the Dataproc 1.2 image ships Python 2; confirm before upgrading.
  local jobmanager_rpc_port
  jobmanager_rpc_port=$(echo "$job_server_config" | python -c "import sys, json; print [ e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0]")

  local detached_mode_params
  detached_mode_params=$([[ $DETACHED_MODE == "true" ]] && echo " -Nf >& /dev/null" || echo "")
  local tunnel_command="gcloud compute ssh yarn@${MASTER_NAME} -- -L ${FLINK_LOCAL_PORT}:${YARN_APPLICATION_MASTER} -L ${jobmanager_rpc_port}:${yarn_application_master_host}:${jobmanager_rpc_port} -D 1080 ${detached_mode_params}"

  # eval is required so the optional redirection in detached_mode_params is
  # interpreted by the shell rather than passed to gcloud as an argument.
  eval "$tunnel_command"
}

# Creates the Dataproc cluster with the Docker, Beam and Flink init actions.
function create_cluster() {
  local metadata="beam-images-to-pull=${HARNESS_IMAGES_TO_PULL},"
  metadata+="flink-snapshot-url=${FLINK_DOWNLOAD_URL},"
  metadata+="flink-start-yarn-session=true"

  local image_version=${DATAPROC_VERSION}

  echo "Starting dataproc cluster. Dataproc version: $image_version"

  # Docker init action restarts yarn so we need to start yarn session after
  # this restart happens. This is why flink init action is invoked last.
  # (Fixed: a stray `,` after "${metadata}" used to be appended to the
  # metadata argument.)
  gcloud dataproc clusters create "$CLUSTER_NAME" \
    --num-workers="$FLINK_NUM_WORKERS" \
    --initialization-actions "$DOCKER_INIT,$BEAM_INIT,$FLINK_INIT" \
    --metadata "${metadata}" \
    --image-version="$image_version"
}

function main() {
  upload_init_actions
  create_cluster  # Comment this line to use an existing cluster.
  get_leader
  start_tunnel
}

main "$@"

# ---- file: .test-infra/dataproc/init-actions/beam.sh (new file, mode 100644) ----
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Pulls SDK Harness' images as specified in init action's metadata (see below).
#
# NOTE: In order to be able to pull particular images, their urls must be
# passed in metadata in the form of a space-separated list, like this:
#
#   beam-images-to-pull='gcr.io/<project>/<image>:<tag> gcr.io/<project>/<image>:<tag>'
#
set -euxo pipefail

readonly BEAM_IMAGES_TO_PULL_METADATA_KEY="beam-images-to-pull"
readonly BEAM_IMAGES_TO_PULL_DEFAULT="apache.bintray.io/beam/python:master"

# Pulls every image listed in the instance metadata, falling back to the
# default image when the metadata key is absent.
function pull_images() {
  # Declaration split from assignment so the metadata lookup's exit status is
  # not masked by `local`; the `|| echo` fallback supplies the default.
  local beam_images_to_pull
  beam_images_to_pull="$(/usr/share/google/get_metadata_value \
    "attributes/${BEAM_IMAGES_TO_PULL_METADATA_KEY}" \
    || echo "${BEAM_IMAGES_TO_PULL_DEFAULT}")"

  # Intentionally unquoted: the metadata value is a space-separated list.
  for image in $beam_images_to_pull; do
    echo "Pulling image: ${image}"

    # Pull beam images with `sudo -i` since if pulling from GCR, yarn will be
    # configured with GCR authorization.
    sudo -u yarn -i docker pull "${image}"
  done
}

function main() {
  pull_images
}

main "$@"

# ---- file: .test-infra/dataproc/init-actions/docker.sh (new file, mode 100644) ----
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script installs and configures Docker on Google Cloud Dataproc Cluster.
# For information about which software components (and their version) are included
# in Cloud Dataproc clusters, see the Cloud Dataproc Image Version information:
# https://cloud.google.com/dataproc/concepts/dataproc-versions
#
# This file originated from:
# https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/blob/master/docker/docker.sh
# (last commit: 6477e6067cc7a08de165117778a251ac2ed6a62f)
#
set -euxo pipefail

readonly OS_ID=$(lsb_release -is | tr '[:upper:]' '[:lower:]')
readonly OS_CODE=$(lsb_release -cs)
# TODO: Allow this to be configured by metadata.
readonly DOCKER_VERSION="18.06.0~ce~3-0~${OS_ID}"
readonly CREDENTIAL_HELPER_VERSION='1.5.0'

# Succeeds (returns 0) iff this node is the Dataproc master.
function is_master() {
  local role
  role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
  # The [[ ]] status is the function's return value; the original
  # if/true/else/false was redundant.
  [[ "$role" == 'Master' ]]
}

# Prints Docker's apt repository GPG key on stdout.
function get_docker_gpg() {
  curl -fsSL "https://download.docker.com/linux/${OS_ID}/gpg"
}

# Retries `apt-get update` up to 10 times (5s apart) to ride out transient
# repository/network failures.
function update_apt_get() {
  local i
  for ((i = 0; i < 10; i++)); do
    if apt-get update; then
      return 0
    fi
    sleep 5
  done
  return 1
}

# Adds Docker's apt repository and installs the pinned docker-ce version.
function install_docker() {
  update_apt_get
  apt-get install -y apt-transport-https ca-certificates curl gnupg2
  get_docker_gpg | apt-key add -
  echo "deb [arch=amd64] https://download.docker.com/linux/${OS_ID} ${OS_CODE} stable" >/etc/apt/sources.list.d/docker.list
  update_apt_get
  apt-get install -y docker-ce="${DOCKER_VERSION}"
}

# Installs the standalone GCR credential helper and configures docker access
# to Google Container Registry for the root and yarn users.
function configure_gcr() {
  # This standalone method is recommended here:
  # https://cloud.google.com/container-registry/docs/advanced-authentication#standalone_docker_credential_helper
  curl -fsSL "https://github.com/GoogleCloudPlatform/docker-credential-gcr/releases/download/v${CREDENTIAL_HELPER_VERSION}/docker-credential-gcr_linux_amd64-${CREDENTIAL_HELPER_VERSION}.tar.gz" \
    | tar xz --to-stdout ./docker-credential-gcr \
    > /usr/local/bin/docker-credential-gcr && chmod +x /usr/local/bin/docker-credential-gcr

  # This command configures docker on a per-user basis. Therefore we configure
  # the root user, as well as the yarn user which is part of the docker group.
  # If additional users are added to the docker group later, this command will
  # need to be run for them as well.
  docker-credential-gcr configure-docker
  su yarn --command "docker-credential-gcr configure-docker"
}

# Grants yarn access to docker and restarts YARN so the new group membership
# is picked up.
function configure_docker() {
  # The installation package should create the `docker` group.
  usermod -aG docker yarn
  # Configure docker to use Google Container Registry.
  configure_gcr

  systemctl enable docker
  # Restart YARN daemons to pick up new group without restarting nodes.
  if is_master; then
    systemctl restart hadoop-yarn-resourcemanager
  else
    systemctl restart hadoop-yarn-nodemanager
  fi
}

function main() {
  install_docker
  configure_docker
}

main "$@"

# ---- file: .test-infra/dataproc/init-actions/flink.sh (new file, mode 100644) ----
#!/bin/bash
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script installs Apache Flink (http://flink.apache.org) on a Google Cloud
# Dataproc cluster. This script is based on previous scripts:
# https://github.com/GoogleCloudPlatform/bdutil/tree/master/extensions/flink
#
# To use this script, you will need to configure the following variables to
For information about which software components +# (and their version) are included in Cloud Dataproc clusters, see the +# Cloud Dataproc Image Version information: +# https://cloud.google.com/dataproc/concepts/dataproc-versions +# +# This file originated from: +# https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/tree/master/flink/flink.sh +# (last commit: 6477e6067cc7a08de165117778a251ac2ed6a62f) +set -euxo pipefail + +# Use Python from /usr/bin instead of /opt/conda. +export PATH=/usr/bin:$PATH + +# Install directories for Flink and Hadoop. +readonly FLINK_INSTALL_DIR='/usr/lib/flink' +readonly FLINK_WORKING_DIR='/var/lib/flink' +readonly FLINK_YARN_SCRIPT='/usr/bin/flink-yarn-daemon' +readonly FLINK_WORKING_USER='yarn' +readonly HADOOP_CONF_DIR='/etc/hadoop/conf' + +# The number of buffers for the network stack. +# Flink config entry: taskmanager.network.numberOfBuffers. +readonly FLINK_NETWORK_NUM_BUFFERS=2048 + +# Heap memory used by the job manager (master) determined by the physical (free) memory of the server. +# Flink config entry: jobmanager.heap.mb. +readonly FLINK_JOBMANAGER_MEMORY_FRACTION='1.0' + +# Heap memory used by the task managers (slaves) determined by the physical (free) memory of the servers. +# Flink config entry: taskmanager.heap.mb. +readonly FLINK_TASKMANAGER_MEMORY_FRACTION='1.0' + +readonly START_FLINK_YARN_SESSION_METADATA_KEY='flink-start-yarn-session' +# Set this to true to start a flink yarn session at initialization time. 
+readonly START_FLINK_YARN_SESSION_DEFAULT=true + +# Set this to install flink from a snapshot URL instead of apt +readonly FLINK_SNAPSHOT_URL_METADATA_KEY='flink-snapshot-url' + + + +function err() { + echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2 + return 1 +} + +function retry_apt_command() { + cmd="$1" + for ((i = 0; i < 10; i++)); do + if eval "$cmd"; then + return 0 + fi + sleep 5 + done + return 1 +} + +function update_apt_get() { + retry_apt_command "apt-get update" +} + +function install_apt_get() { + pkgs="$@" + retry_apt_command "apt-get install -y $pkgs" +} + +function install_flink_snapshot() { + local work_dir="$(mktemp -d)" + local flink_url="$(/usr/share/google/get_metadata_value "attributes/${FLINK_SNAPSHOT_URL_METADATA_KEY}")" + local flink_local="${work_dir}/flink.tgz" + local flink_toplevel_pattern="${work_dir}/flink-*" + + pushd "${work_dir}" + + curl -o "${flink_local}" "${flink_url}" + tar -xzvf "${flink_local}" + rm "${flink_local}" + # only the first match of the flink toplevel pattern is used + local files=( ${flink_toplevel_pattern} ) + local flink_toplevel="${files[0]}" + mv "${flink_toplevel}" "${FLINK_INSTALL_DIR}" + + popd # work_dir + +} + +function configure_flink() { + # Number of worker nodes in your cluster + local num_workers=$(/usr/share/google/get_metadata_value attributes/dataproc-worker-count) + + # Number of Flink TaskManagers to use. Reserve 1 node for the JobManager. + # NB: This assumes > 1 worker node. + local num_taskmanagers="$(($num_workers - 1))" + + # Determine the number of task slots per worker. + # TODO: Dataproc does not currently set the number of worker cores on the + # master node. However, the spark configuration sets the number of executors + # to be half the number of CPU cores per worker. We use this value to + # determine the number of worker cores. Fix this hack when + # yarn.nodemanager.resource.cpu-vcores is correctly populated. 
+ local spark_executor_cores=$(\ + grep 'spark\.executor\.cores' /etc/spark/conf/spark-defaults.conf \ + | tail -n1 \ + | cut -d'=' -f2) + local flink_taskmanager_slots="$(($spark_executor_cores * 2))" + + # Determine the default parallelism. + local flink_parallelism=$(python -c \ + "print ${num_taskmanagers} * ${flink_taskmanager_slots}") + + # Get worker memory from yarn config. + local worker_total_mem="$(hdfs getconf \ + -confKey yarn.nodemanager.resource.memory-mb)" + local flink_jobmanager_memory=$(python -c \ + "print int(${worker_total_mem} * ${FLINK_JOBMANAGER_MEMORY_FRACTION})") + local flink_taskmanager_memory=$(python -c \ + "print int(${worker_total_mem} * ${FLINK_TASKMANAGER_MEMORY_FRACTION})") + + # Fetch the primary master name from metadata. + local master_hostname="$(/usr/share/google/get_metadata_value attributes/dataproc-master)" + + # create working directory + mkdir -p "${FLINK_WORKING_DIR}" + + # Apply Flink settings by appending them to the default config. + cat << EOF >> ${FLINK_INSTALL_DIR}/conf/flink-conf.yaml +# Settings applied by Cloud Dataproc initialization action +jobmanager.rpc.address: ${master_hostname} +jobmanager.heap.mb: ${flink_jobmanager_memory} +taskmanager.heap.mb: ${flink_taskmanager_memory} +taskmanager.numberOfTaskSlots: ${flink_taskmanager_slots} +parallelism.default: ${flink_parallelism} +taskmanager.network.numberOfBuffers: ${FLINK_NETWORK_NUM_BUFFERS} +fs.hdfs.hadoopconf: ${HADOOP_CONF_DIR} +EOF + +cat > "${FLINK_YARN_SCRIPT}" << EOF +#!/bin/bash +set -exuo pipefail +sudo -u yarn -i \ +HADOOP_CONF_DIR=${HADOOP_CONF_DIR} \ + ${FLINK_INSTALL_DIR}/bin/yarn-session.sh \ + -n "${num_taskmanagers}" \ + -s "${flink_taskmanager_slots}" \ + -jm "${flink_jobmanager_memory}" \ + -tm "${flink_taskmanager_memory}" \ + -nm flink-dataproc \ + --detached +EOF +chmod +x "${FLINK_YARN_SCRIPT}" + +} + +function start_flink_master() { + local start_yarn_session="$(/usr/share/google/get_metadata_value \ + 
"attributes/${START_FLINK_YARN_SESSION_METADATA_KEY}" \ + || echo "${START_FLINK_YARN_SESSION_DEFAULT}")" + + if ${start_yarn_session} ; then + "${FLINK_YARN_SCRIPT}" + else + echo "Doing nothing" + fi +} + +function main() { + local role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)" + local snapshot_url="$(/usr/share/google/get_metadata_value \ + "attributes/${FLINK_INSTALL_SNAPSHOT_METADATA_KEY}" \ + || echo "${FLINK_INSTALL_SNAPSHOT_METADATA_DEFAULT}")" + + # check if a flink snapshot URL is specified + if /usr/share/google/get_metadata_value \ + "attributes/${FLINK_SNAPSHOT_URL_METADATA_KEY}" ; then + install_flink_snapshot || err "Unable to install Flink" + else + update_apt_get || err "Unable to update apt-get" + install_apt_get flink || err "Unable to install flink" + fi + configure_flink || err "Flink configuration failed" + if [[ "${role}" == 'Master' ]] ; then + start_flink_master || err "Unable to start Flink master" + fi +} + +main \ No newline at end of file