-
Notifications
You must be signed in to change notification settings - Fork 514
/
kafka.sh
executable file
·242 lines (211 loc) · 8.13 KB
/
kafka.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/bin/bash
# Copyright 2015 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
# Dataproc cluster.
# Strict mode: stop on command failure (errexit), on use of an unset variable
# (nounset) and on a failing pipeline stage (pipefail); xtrace echoes every
# command as it runs.
set -o errexit -o nounset -o xtrace -o pipefail

# Directories whose bin/ scripts are invoked below, and the broker config file.
declare -r ZOOKEEPER_HOME=/usr/lib/zookeeper
declare -r KAFKA_HOME=/usr/lib/kafka
declare -r KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'

# Values read from instance metadata. Every optional attribute falls back to
# the default echoed after `||`; dataproc-role has no fallback.
declare -r ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
declare -r RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
declare -r KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
declare -r KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
declare -r INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"

# Mutable state filled in by install_and_configure_kafka_server:
# the first ZooKeeper server address, e.g., "cluster1-m-0:2181" ...
ZOOKEEPER_ADDRESS=''
# ... and the integer broker ID of this node, e.g., 0.
BROKER_ID=''
#######################################
# Runs a shell command line, retrying until it succeeds.
# Arguments:
#   $1 - command line; evaluated with `eval` in the current shell.
# Returns:
#   0 as soon as the command succeeds, 1 after 10 failed attempts. The
#   function sleeps 5 seconds after every failed attempt.
#######################################
function retry_apt_command() {
  # Both variables are local so that callers' globals are never clobbered.
  local cmd="$1"
  local attempt
  for ((attempt = 0; attempt < 10; attempt++)); do
    # eval is needed because callers pass compound commands ("a && b").
    if eval "${cmd}"; then
      return 0
    fi
    sleep 5
  done
  return 1
}
# Installs gnupg2 and then fetches apt key B7B3B788A8D3785C from
# keyserver.ubuntu.com, handing both steps to retry_apt_command as a single
# command line.
function recv_keys() {
  local -r install_gnupg='apt-get install -y gnupg2'
  local -r fetch_key='apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C'
  retry_apt_command "${install_gnupg} &&${fetch_key}"
}
# Refreshes the apt package lists through retry_apt_command and returns that
# helper's status.
function update_apt_get() {
  local -r refresh_cmd='apt-get update'
  retry_apt_command "${refresh_cmd}"
}
#######################################
# Installs apt packages through retry_apt_command.
# Arguments:
#   $@ - package names.
# Returns:
#   The status of retry_apt_command.
#######################################
function install_apt_get() {
  # "$*" (not "$@") is the correct way to flatten the arguments into one
  # space-separated string; the variable is local so it cannot leak.
  local pkgs="$*"
  retry_apt_command "apt-get install -y ${pkgs}"
}
#######################################
# Writes a timestamped error message to stderr.
# Arguments:
#   $@ - message words; joined with spaces into a single message.
# Returns:
#   Always 1, so `cmd || err '...'` fails the enclosing list.
#######################################
function err() {
  # "$*" joins the arguments into one string; printf keeps the message
  # literal regardless of its content.
  printf '[%s]: %s\n' "$(date +'%Y-%m-%dT%H:%M:%S%z')" "$*" >&2
  return 1
}
# Prints the broker IDs listed under /brokers/ids in ZooKeeper. Only output
# lines containing a bracketed list are kept; on each of them the first "["
# becomes a space and the first "]" a comma, so "[0, 2, 1]" is printed as
# " 0, 2, 1,".
function get_broker_list() {
  local -r zk_shell="${KAFKA_HOME}/bin/zookeeper-shell.sh"
  "${zk_shell}" "${ZOOKEEPER_ADDRESS}" <<<'ls /brokers/ids' |
    grep '\[.*\]' |
    sed -e 's/\[/ /' -e 's/\]/,/'
}
#######################################
# Waits for ZooKeeper to answer `ls /`, or gives up.
# Globals:
#   ZOOKEEPER_HOME, ZOOKEEPER_ADDRESS (read)
# Returns:
#   0 once zkCli.sh succeeds. After 20 failed attempts (5 seconds apart) it
#   prints an error to stderr and EXITS the shell with status 1.
#######################################
function wait_for_zookeeper() {
  # Local loop counter: must not overwrite a caller's variable.
  local attempt
  for attempt in {1..20}; do
    if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
      return 0
    fi
    echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${attempt}..."
    sleep 5
  done
  echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
  exit 1
}
#######################################
# Waits until this node's broker ID shows up in get_broker_list, or gives up.
# Globals:
#   BROKER_ID (read)
# Returns:
#   0 once " ${BROKER_ID}," appears in the broker list. After 20 failed
#   checks (5 seconds apart) it prints an error to stderr and EXITS the shell
#   with status 1.
#######################################
function wait_for_kafka() {
  # Declared separately from the assignment, and local, so neither the loop
  # counter nor the list leaks into (or clobbers) global scope.
  local attempt
  local broker_list
  for attempt in {1..20}; do
    # A failed lookup is deliberately tolerated: it simply counts as
    # "not registered yet" and is retried.
    broker_list=$(get_broker_list || true)
    # The leading space and trailing comma make the match exact, so ID 1
    # does not match inside " 12,".
    if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
      return 0
    fi
    echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${attempt}..."
    sleep 5
  done
  echo "Failed to start Kafka broker ${BROKER_ID}." >&2
  exit 1
}
#######################################
# Installs kafka-server, points it at the ZooKeeper quorum, assigns a broker
# ID derived from the hostname, restarts the broker and waits until it has
# registered itself.
# Globals:
#   ROLE, KAFKA_HOME, KAFKA_PROP_FILE, KAFKA_ENABLE_JMX, KAFKA_JMX_PORT (read)
#   ZOOKEEPER_ADDRESS, BROKER_ID (written)
# Returns:
#   Non-zero when no ZooKeeper list can be found or kafka-server cannot be
#   installed. wait_for_zookeeper / wait_for_kafka exit the script on timeout.
#######################################
function install_and_configure_kafka_server() {
  local -r zoo_cfg=/etc/zookeeper/conf/zoo.cfg
  local -r kafka_start_script="${KAFKA_HOME}/bin/kafka-server-start.sh"

  # Find zookeeper list first, before attempting any installation.
  # Each lookup ends in `|| true`: with `set -e -o pipefail` a grep that
  # matches nothing would otherwise abort the script right here, before the
  # yarn-site fallback and the explanatory error below get a chance to run.
  local zookeeper_client_port
  zookeeper_client_port=$(grep 'clientPort' "${zoo_cfg}" |
    tail -n 1 |
    cut -d '=' -f 2) || true
  # Turn the "server.N=host:port:port" lines into "host:clientPort,host:...".
  local zookeeper_list
  zookeeper_list=$(grep '^server\.' "${zoo_cfg}" |
    cut -d '=' -f 2 |
    cut -d ':' -f 1 |
    sort |
    uniq |
    sed "s/$/:${zookeeper_client_port}/" |
    xargs echo |
    sed "s/ /,/g") || true

  if [[ -z "${zookeeper_list}" ]]; then
    # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
    # bother to populate it. Check if YARN HA is configured.
    zookeeper_list=$(bdconfig get_property_value --configuration_file \
      /etc/hadoop/conf/yarn-site.xml \
      --name yarn.resourcemanager.zk-address 2>/dev/null) || true
  fi

  # If all attempts failed, error out.
  if [[ -z "${zookeeper_list}" ]]; then
    err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA' ||
      return 1
  fi

  # Only the first server of the list is used for the readiness checks.
  ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"

  # Install Kafka from Dataproc distro; an already-installed package counts
  # as success.
  install_apt_get kafka-server || dpkg -l kafka-server ||
    err 'Unable to install and find kafka-server.'

  mkdir -p /var/lib/kafka-logs
  chown kafka:kafka -R /var/lib/kafka-logs

  local node_name
  node_name=$(hostname)
  if [[ "${ROLE}" == "Master" ]]; then
    # For master nodes, broker ID starts from 10,000.
    if [[ "${node_name}" == *-m ]]; then
      # non-HA
      BROKER_ID=10000
    else
      # HA: hostnames end in "-m-<index>".
      BROKER_ID=$((10000 + $(sed 's/.*-m-\([0-9]*\)$/\1/g' <<<"${node_name}")))
    fi
  else
    # For worker nodes, broker ID is the worker ID ("-w-<index>" suffix).
    BROKER_ID=$(sed 's/.*-w-\([0-9]*\)$/\1/g' <<<"${node_name}")
  fi

  # Rewrite the packaged defaults. The expansions sit inside the quoted sed
  # expressions so they can never be word-split or globbed by the shell.
  sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
    "${KAFKA_PROP_FILE}"
  sed -i "s|^\(zookeeper\.connect=\).*|\1${zookeeper_list}|" \
    "${KAFKA_PROP_FILE}"
  sed -i "s,^\(broker\.id=\).*,\1${BROKER_ID}," \
    "${KAFKA_PROP_FILE}"
  # Master broker IDs (10000+) need a raised reserved.broker.max.id.
  echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
  echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"

  if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
    # Insert the JMX exports just before the line that launches
    # kafka-run-class.sh in the server start script.
    local -r jmx_opts='-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true'
    sed -i "/kafka-run-class.sh/i export KAFKA_JMX_OPTS=\"${jmx_opts}\"" \
      "${kafka_start_script}"
    sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" \
      "${kafka_start_script}"
  fi

  wait_for_zookeeper

  # Start Kafka.
  service kafka-server restart

  wait_for_kafka
}
#######################################
# Installs the kafka-python pip package when INSTALL_KAFKA_PYTHON is "true";
# otherwise does nothing.
# Globals:
#   INSTALL_KAFKA_PYTHON, DATAPROC_IMAGE_VERSION (read)
# Returns:
#   0 when installation is disabled; otherwise the status of the last pip
#   attempt.
#######################################
function install_kafka_python_package() {
  if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
    return 0
  fi

  # Locals, so that neither the package pin nor the OS id leaks into (or
  # overwrites) global variables of the same purpose.
  local -r kafka_python_package='kafka-python==2.0.2'
  local -r conda_pip=/opt/conda/default/bin/pip
  local os_id

  # bc prints 1 when the comparison holds (image version newer than 2.0).
  if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
    # One retry after a short pause.
    "${conda_pip}" install "${kafka_python_package}" || {
      sleep 10
      "${conda_pip}" install "${kafka_python_package}"
    }
  else
    os_id=$(. /etc/os-release && echo "${ID}")
    if [[ "${os_id}" == "rocky" ]]; then
      yum install -y python2-pip
    else
      apt-get install -y python-pip
    fi
    # Try pip2 twice, then fall back to whatever `pip` is on the PATH.
    pip2 install "${kafka_python_package}" || {
      sleep 10
      pip2 install "${kafka_python_package}"
    } || {
      sleep 10
      pip install "${kafka_python_package}"
    }
  fi
}
#######################################
# Blanks out references to archived Debian backports repositories.
#
# This script uses 'apt-get update' and is therefore potentially dependent on
# backports repositories which have been archived. In order to mitigate this
# problem, we will remove any reference to backports repos older than oldstable
# https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
#
# Every file under /etc/apt/sources.list* that mentions "-backports" is
# examined: a file that references the current stable or oldstable backports
# is left alone; in any other file each line containing "-backports" is
# replaced by an empty line.
#######################################
function remove_old_backports {
  local oldstable stable filename
  oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release |
    awk '/^Codename/ {print $2}')
  stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release |
    awk '/^Codename/ {print $2}')

  # Read one file name per line, so that several matching files are handled
  # one by one. `|| true` keeps "no file mentions backports" (grep status 1)
  # from being treated as a failure.
  while IFS= read -r filename; do
    [[ -n "${filename}" ]] || continue
    grep -e "${oldstable}-backports" -e "${stable}-backports" "${filename}" ||
      sed -i -e 's/^.*-backports.*$//' "${filename}"
  done < <(grep -rsil '\-backports' /etc/apt/sources.list* || true)
}
#######################################
# Drives the whole installation.
# Steps: drop stale backports entries (Debian images up to 2.1 only), fetch
# the apt key, refresh the package lists, optionally install kafka-python,
# then set up Kafka according to the node's role.
# Globals:
#   DATAPROC_IMAGE_VERSION, ROLE, RUN_ON_MASTER (read); OS (written)
#######################################
function main() {
  OS=$(. /etc/os-release && echo "${ID}")
  # bc -l prints 1 when the version comparison holds.
  if [[ "${OS}" == debian && "$(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l)" == 1 ]]; then
    remove_old_backports
  fi

  recv_keys || err 'Unable to receive keys.'
  update_apt_get || err 'Unable to update packages lists.'
  install_kafka_python_package

  # Workers (every non-master role) always get a full broker installation.
  if [[ "${ROLE}" != 'Master' ]]; then
    install_and_configure_kafka_server
    return
  fi

  # Masters: ZooKeeper has to be running before anything Kafka-related.
  service zookeeper-server status ||
    err 'Required zookeeper-server not running on master!'

  case "${RUN_ON_MASTER}" in
    true)
      # Masters were explicitly asked to run a broker as well.
      install_and_configure_kafka_server
      ;;
    *)
      # Otherwise only the kafka command-line tools and libs are installed,
      # not kafka-server.
      install_apt_get kafka ||
        err 'Unable to install kafka libraries on master!'
      ;;
  esac
}
# Script entry point: run the installation. main takes no arguments.
main