-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[Beam 6677] Flink portability cluster #7848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ef48cd1
46e4522
8d066cb
6b8a5f9
a4777a9
6f9a635
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| #!/usr/bin/env bash | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| # Runs init actions for Docker, Portability framework (Beam) and Flink cluster | ||
| # and opens an SSH tunnel to connect with Flink easily and run Beam jobs. | ||
| # | ||
| # Provide the following environment to run this script: | ||
| # | ||
| # CLUSTER_NAME: Cluster name | ||
| # GCS_BUCKET: GCS bucket url for Dataproc resources (init actions) | ||
| # HARNESS_IMAGES_TO_PULL: Urls to SDK Harness' images to pull on dataproc workers (accepts 1 or more urls) | ||
| # FLINK_DOWNLOAD_URL: Url to Flink .tar archive to be installed on the cluster | ||
| # FLINK_NUM_WORKERS: Number of Flink workers | ||
| # TASK_MANAGER_SLOTS: Number of Flink slots per worker | ||
| # DETACHED_MODE: Whether the SSH tunnel should run in detached mode (true/false) | ||
| # | ||
| # Example usage: | ||
| # | ||
| # CLUSTER_NAME=flink \ | ||
| # GCS_BUCKET=gs://<GCS_BUCKET>/flink \ | ||
| # HARNESS_IMAGES_TO_PULL='gcr.io/<IMAGE_REPOSITORY>/python:latest gcr.io/<IMAGE_REPOSITORY>/java:latest' \ | ||
| # FLINK_DOWNLOAD_URL=http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz \ | ||
| # FLINK_NUM_WORKERS=2 \ | ||
| # TASK_MANAGER_SLOTS=1 \ | ||
| # DETACHED_MODE=false \ | ||
| # ./create_flink_cluster.sh | ||
| # | ||
set -Eeuxo pipefail

# Dataproc image version; read (with a default) in create_cluster().
DATAPROC_VERSION=1.2

# Dataproc names the master node "<cluster>-m"; used by all SSH commands below.
MASTER_NAME="$CLUSTER_NAME-m"

# GCS properties: where the init-action scripts are uploaded by
# upload_init_actions() and read from by `gcloud dataproc clusters create`.
INIT_ACTIONS_FOLDER_NAME="init-actions"
FLINK_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/flink.sh"
BEAM_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/beam.sh"
DOCKER_INIT="$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME/docker.sh"

# Flink properties
FLINK_LOCAL_PORT=8081      # local port the SSH tunnel forwards to the Flink REST UI
TASK_MANAGER_MEM=10240     # NOTE(review): not referenced elsewhere in this script — confirm it is still needed
YARN_APPLICATION_MASTER="" # host:port of the YARN application master; set by get_leader()
|
|
||
# Uploads the init-action scripts to the GCS bucket so Dataproc can execute
# them at cluster-creation time.
# Globals: INIT_ACTIONS_FOLDER_NAME (read), GCS_BUCKET (read)
function upload_init_actions() {
  echo "Uploading initialization actions to GCS bucket: $GCS_BUCKET"
  # The glob must remain unquoted so it expands; the destination is quoted to
  # be safe against special characters in the bucket path.
  gsutil cp -r "$INIT_ACTIONS_FOLDER_NAME"/* "$GCS_BUCKET/$INIT_ACTIONS_FOLDER_NAME"
}
|
|
||
# Locates the single running YARN application on the cluster and stores its
# application master's host:port in the global YARN_APPLICATION_MASTER.
# Exits 1 if zero or multiple applications are running.
# Globals: MASTER_NAME, CLUSTER_NAME (read); YARN_APPLICATION_MASTER (written)
function get_leader() {
  local i=0
  local application_ids
  local application_masters
  local line

  echo "Yarn Applications"
  # The here-string is quoted so the multi-line yarn output is not word-split
  # and re-joined into a single line; `read -r` keeps backslashes literal.
  while read -r line; do
    echo "$line"
    # First whitespace-separated field of a yarn listing is the application id.
    application_ids[$i]=$(echo "$line" | sed "s/ .*//")
    # The tracking URL begins with the cluster name; strip everything before
    # it, then everything after the first space, leaving "host:port".
    application_masters[$i]=$(echo "$line" | sed "s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//")
    i=$((i+1))
  done <<< "$(gcloud compute ssh yarn@"$MASTER_NAME" --command="yarn application -list" | grep "$CLUSTER_NAME")"

  if [[ $i != 1 ]]; then
    echo "Multiple applications found. Make sure that only 1 application is running on the cluster."
    for app in "${application_ids[@]}"; do
      echo "$app"
    done

    echo "Execute 'gcloud compute ssh yarn@$MASTER_NAME --command=\"yarn application -kill <APP_NAME>\"' to kill the yarn application."
    exit 1
  fi

  YARN_APPLICATION_MASTER=${application_masters[0]}
  echo "Using Yarn Application master: $YARN_APPLICATION_MASTER"
}
|
|
||
# Opens an SSH tunnel to the Flink master: forwards the Flink REST port and
# the jobmanager RPC port locally, plus a SOCKS proxy on local port 1080.
# Globals: MASTER_NAME, YARN_APPLICATION_MASTER, FLINK_LOCAL_PORT,
#          DETACHED_MODE (read)
function start_tunnel() {
  local key="jobmanager.rpc.port"
  local job_server_config
  local yarn_application_master_host
  local jobmanager_rpc_port

  # Fetch the jobmanager config (a JSON list of {key, value} objects) through
  # the YARN proxy running on the master node.
  job_server_config=$(gcloud compute ssh yarn@"$MASTER_NAME" --command="curl -s \"http://$YARN_APPLICATION_MASTER/jobmanager/config\"")
  yarn_application_master_host=$(echo "$YARN_APPLICATION_MASTER" | cut -d ":" -f1)
  # Parenthesized print() works on both Python 2 and Python 3; the original
  # bare "print [...]" statement is Python-2 only and breaks under python3.
  jobmanager_rpc_port=$(echo "$job_server_config" | python -c "import sys, json; print([e['value'] for e in json.load(sys.stdin) if e['key'] == u'$key'][0])")

  # In detached mode the tunnel is backgrounded (-Nf) with its output discarded.
  local detached_mode_params
  detached_mode_params=$([[ $DETACHED_MODE == "true" ]] && echo " -Nf >& /dev/null" || echo "")
  local tunnel_command="gcloud compute ssh yarn@${MASTER_NAME} -- -L ${FLINK_LOCAL_PORT}:${YARN_APPLICATION_MASTER} -L ${jobmanager_rpc_port}:${yarn_application_master_host}:${jobmanager_rpc_port} -D 1080 ${detached_mode_params}"

  # eval is required here: detached_mode_params may contain a redirection that
  # must be interpreted by the shell rather than passed as an SSH argument.
  eval "$tunnel_command"
}
|
|
||
# Creates the Dataproc cluster with the Docker, Beam and Flink init actions.
# Globals: CLUSTER_NAME, FLINK_NUM_WORKERS, HARNESS_IMAGES_TO_PULL,
#          FLINK_DOWNLOAD_URL, DOCKER_INIT, BEAM_INIT, FLINK_INIT,
#          DATAPROC_VERSION (read)
function create_cluster() {
  # Metadata consumed by the init actions (beam.sh and flink.sh).
  local metadata="beam-images-to-pull=${HARNESS_IMAGES_TO_PULL},"
  metadata+="flink-snapshot-url=${FLINK_DOWNLOAD_URL},"
  metadata+="flink-start-yarn-session=true"

  # Use :- (read a default) rather than := which would also mutate the global.
  local image_version=${DATAPROC_VERSION:-1.2}

  echo "Starting dataproc cluster. Dataproc version: $image_version"

  # Docker init action restarts yarn so we need to start yarn session after this restart happens.
  # This is why flink init action is invoked last.
  # NOTE: the stray comma that followed "${metadata}" in the original command
  # was removed; it appended an empty metadata entry.
  gcloud dataproc clusters create "$CLUSTER_NAME" \
    --num-workers="$FLINK_NUM_WORKERS" \
    --initialization-actions "$DOCKER_INIT,$BEAM_INIT,$FLINK_INIT" \
    --metadata "${metadata}" \
    --image-version="$image_version"
}
|
|
||
# Entry point: upload the init actions, create the cluster, locate the YARN
# application master, then open the SSH tunnel to Flink. Order matters:
# get_leader must run after the cluster exists, and start_tunnel reads the
# YARN_APPLICATION_MASTER global that get_leader sets.
function main() {
  upload_init_actions
  create_cluster # Comment this line to use existing cluster.
  get_leader
  start_tunnel
}

main "$@"
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| #!/usr/bin/env bash | ||
|
|
||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know that Beam init action exists in Dataproc repo too, but IMO it didn't fit our needs. I provided a version that is very different. It's got a Beam license header. Is this ok?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems fine to me. |
||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| # Pulls SDK Harness' images as specified in init action's metadata (see below). | ||
| # | ||
| # NOTE: In order to be able to pull particular images, their urls must be passed in metadata | ||
| # in the form of list, like this: | ||
| # | ||
| # beam-images-to-pull=gcr.io/<IMAGE_REPOSITORY>/<IMAGE_NAME>:<IMAGE_REVISION> gcr.io/<IMAGE_REPOSITORY>/<IMAGE_NAME>:<IMAGE_REVISION> | ||
| # | ||
set -euxo pipefail

# Metadata key under which the cluster-creation script passes the list of
# image urls (see create_flink_cluster.sh's create_cluster()).
readonly BEAM_IMAGES_TO_PULL_METADATA_KEY="beam-images-to-pull"
# Image pulled when the metadata key is absent.
readonly BEAM_IMAGES_TO_PULL_DEFAULT="apache.bintray.io/beam/python:master"
|
|
||
|
|
||
# Pulls every SDK harness image listed in the instance metadata, falling back
# to the default image when the metadata key is absent.
# Globals: BEAM_IMAGES_TO_PULL_METADATA_KEY, BEAM_IMAGES_TO_PULL_DEFAULT (read)
function pull_images() {
  # Declaration and assignment are split so `local` does not mask the command
  # substitution's exit status (the `|| echo` fallback still applies inside).
  local beam_images_to_pull
  beam_images_to_pull="$(/usr/share/google/get_metadata_value \
    "attributes/${BEAM_IMAGES_TO_PULL_METADATA_KEY}" \
    || echo "${BEAM_IMAGES_TO_PULL_DEFAULT}")"

  local image
  # Intentionally unquoted: the metadata value is a space-separated url list.
  for image in $beam_images_to_pull
  do
    echo "Pulling image: ${image}"

    # Pull beam images with `sudo -i` since if pulling from GCR, yarn will be
    # configured with GCR authorization
    sudo -u yarn -i docker pull "${image}"
  done
}
|
|
||
# Entry point: pull all configured SDK harness images onto this worker.
function main() {
  pull_images
}

main "$@"
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| #!/usr/bin/env bash | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file was copied from: https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/blob/master/docker/docker.sh and was unchanged. Is it ok to copy this like that? What license should I put there?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this project is licensed under the Apache license, so it should be fine to just add it. I'll add it for you now to get the precommits running (even though I guess they won't exercise this code..)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Thanks! |
||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you specify in the header what is the origin of this file, and a small summary of what it does please?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
| # This script installs and configures Docker on Google Cloud Dataproc Cluster. | ||
| # For information about which software components (and their version) are included | ||
| # in Cloud Dataproc clusters, see the Cloud Dataproc Image Version information: | ||
| # https://cloud.google.com/dataproc/concepts/dataproc-versions | ||
| # | ||
| # This file originated from: | ||
| # https://github.com/GoogleCloudPlatform/dataproc-initialization-actions/blob/master/docker/docker.sh | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets mention the git version from which we copied this file.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto for flink init action.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, mentioned in both files. The reason I copied the files is that I think it's better to be immune to any hypothetical changes in case scripts from the official buckets change and have the possibility to change the script easily (eg. configure flink/docker/beam as we need). |
||
| # (last commit: 6477e6067cc7a08de165117778a251ac2ed6a62f) | ||
| # | ||
set -euxo pipefail

# Distro id (e.g. "debian") and release codename, lowercased; used to build
# the Docker apt repository url and the pinned package version string.
readonly OS_ID=$(lsb_release -is | tr '[:upper:]' '[:lower:]')
readonly OS_CODE=$(lsb_release -cs)
# TODO: Allow this to be configured by metadata.
readonly DOCKER_VERSION="18.06.0~ce~3-0~${OS_ID}"
readonly CREDENTIAL_HELPER_VERSION='1.5.0'
|
|
||
|
|
||
# Succeeds (exit 0) when this node's Dataproc role metadata is 'Master',
# fails otherwise — usable directly in an `if` condition.
function is_master() {
  local node_role
  node_role="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
  [[ "${node_role}" == 'Master' ]]
}
|
|
||
# Writes the Docker apt repository's GPG key for this distro to stdout
# (piped into `apt-key add -` by install_docker).
function get_docker_gpg() {
  curl -fsSL https://download.docker.com/linux/${OS_ID}/gpg
}
|
|
||
# Refreshes the apt package index, retrying up to 10 times with a 5-second
# pause between attempts to ride out transient mirror/network failures.
# Returns 0 on the first success, 1 if every attempt fails.
function update_apt_get() {
  local attempt
  for attempt in {1..10}; do
    if apt-get update; then
      return 0
    fi
    sleep 5
  done
  return 1
}
|
|
||
# Installs the pinned docker-ce package from Docker's official apt repository.
# Order matters: prerequisites first, then the repo key and source list, then
# a second index refresh so the new repository is visible before installing.
function install_docker() {
  update_apt_get
  apt-get install -y apt-transport-https ca-certificates curl gnupg2
  get_docker_gpg | apt-key add -
  echo "deb [arch=amd64] https://download.docker.com/linux/${OS_ID} ${OS_CODE} stable" >/etc/apt/sources.list.d/docker.list
  update_apt_get
  apt-get install -y docker-ce="${DOCKER_VERSION}"
}
|
|
||
# Installs the standalone GCR Docker credential helper into /usr/local/bin and
# configures Docker (for root and the yarn user) to authenticate against GCR.
function configure_gcr() {
  # this standalone method is recommended here:
  # https://cloud.google.com/container-registry/docs/advanced-authentication#standalone_docker_credential_helper
  # Extracts just the helper binary from the release tarball straight to disk.
  curl -fsSL "https://github.com/GoogleCloudPlatform/docker-credential-gcr/releases/download/v${CREDENTIAL_HELPER_VERSION}/docker-credential-gcr_linux_amd64-${CREDENTIAL_HELPER_VERSION}.tar.gz" \
    | tar xz --to-stdout ./docker-credential-gcr \
    > /usr/local/bin/docker-credential-gcr && chmod +x /usr/local/bin/docker-credential-gcr

  # this command configures docker on a per-user basis. Therefore we configure
  # the root user, as well as the yarn user which is part of the docker group.
  # If additional users are added to the docker group later, this command will
  # need to be run for them as well.
  docker-credential-gcr configure-docker
  su yarn --command "docker-credential-gcr configure-docker"
}
|
|
||
# Grants the yarn user access to Docker, points Docker at GCR, enables the
# Docker service, and restarts the local YARN daemon so the new group
# membership takes effect without a node restart.
function configure_docker() {
  # The installation package should create `docker` group.
  usermod -aG docker yarn
  # configure docker to use Google Cloud Registry
  configure_gcr

  systemctl enable docker
  # Restart YARN daemons to pick up new group without restarting nodes.
  # Masters run the resourcemanager; workers run the nodemanager.
  if is_master ; then
    systemctl restart hadoop-yarn-resourcemanager
  else
    systemctl restart hadoop-yarn-nodemanager
  fi
}
|
|
||
# Entry point: install Docker first, then configure it for YARN and GCR
# (configure_docker restarts YARN, so it must run after installation).
function main() {
  install_docker
  configure_docker
}

main "$@"
Uh oh!
There was an error while loading. Please reload this page.