Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ under the License.
<artifactId>flink-test-utils-junit</artifactId>
</dependency>

<dependency>
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
60 changes: 2 additions & 58 deletions flink-python/pyflink/pyflink_gateway_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
import glob
import os
import platform
import re
import signal
import socket
import sys
import time
from collections import namedtuple
from string import Template
from subprocess import Popen, PIPE, check_output, CalledProcessError
from subprocess import Popen, PIPE

from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root

Expand Down Expand Up @@ -213,47 +211,12 @@ def construct_hadoop_classpath(env):
read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))])


def download_apache_avro():
"""
Currently we need to download the Apache Avro manually to avoid test failure caused by the avro
format sql jar. See https://issues.apache.org/jira/browse/FLINK-17417. If the issue is fixed,
this method could be removed. Using maven command copy the jars in repository to avoid accessing
external network.
"""
flink_source_root = _find_flink_source_root()
avro_jar_pattern = os.path.join(
flink_source_root, "flink-formats", "flink-avro", "target", "avro*.jar")
if len(glob.glob(avro_jar_pattern)) > 0:
# the avro jar already existed, just return.
return
mvn = "mvn.cmd" if on_windows() else "mvn"
avro_version_output = check_output(
[mvn, "help:evaluate", "-Dexpression=avro.version"],
cwd=flink_source_root).decode("utf-8")
lines = avro_version_output.replace("\r", "").split("\n")
avro_version = None
for line in lines:
if line.strip() != "" and re.match(r'^[0-9]+\.[0-9]+(\.[0-9]+)?$', line.strip()):
avro_version = line
break
if avro_version is None:
raise Exception("The Apache Avro version is not found in the maven command output:\n %s" %
avro_version_output)
check_output(
[mvn,
"org.apache.maven.plugins:maven-dependency-plugin:3.2.0:copy",
"-Dartifact=org.apache.avro:avro:%s:jar" % avro_version,
"-DoutputDirectory=%s/flink-formats/flink-avro/target" % flink_source_root],
cwd=flink_source_root)


def construct_test_classpath():
test_jar_patterns = [
"flink-runtime/target/flink-runtime*tests.jar",
"flink-streaming-java/target/flink-streaming-java*tests.jar",
"flink-formats/flink-csv/target/flink-csv*.jar",
"flink-formats/flink-avro/target/flink-avro*.jar",
"flink-formats/flink-avro/target/avro*.jar",
"flink-formats/flink-sql-avro/target/flink-sql-avro*.jar",
"flink-formats/flink-json/target/flink-json*.jar",
"flink-python/target/artifacts/testDataStream.jar",
"flink-python/target/flink-python*-tests.jar",
Expand Down Expand Up @@ -291,25 +254,6 @@ def launch_gateway_server_process(env, args):
classpath = os.pathsep.join(
[construct_flink_classpath(env), construct_hadoop_classpath(env)])
if "FLINK_TESTING" in env:
total_retry_times = 3
retry_times = 0
status = 0
error = None
while retry_times < total_retry_times and not status:
retry_times += 1
try:
download_apache_avro()
status = 1
except CalledProcessError as e:
status = 0
error = e
print("{0} retry download, {1} retries remaining".format(
retry_times, total_retry_times - retry_times))
# sleep 3 seconds and then re-download.
time.sleep(3)
if retry_times == total_retry_times and not status:
raise error

classpath = os.pathsep.join([classpath, construct_test_classpath()])
command = [java_executable, jvm_args, jvm_opts] + log_settings \
+ ["-cp", classpath, program_args.main_class] + program_args.other_args
Expand Down