[SPARK-36092][INFRA][BUILD][PYTHON] Migrate to GitHub Actions with Codecov from Jenkins

### What changes were proposed in this pull request?

This PR proposes to migrate the PySpark coverage report from Jenkins to GitHub Actions by setting up a daily cron job.
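
For reference, the scheduling piece boils down to roughly the following sketch (a simplified excerpt of the `.github/workflows/build_and_test.yml` change in the diff below; the step name and id here are illustrative, not the exact ones in the workflow):

```yaml
on:
  schedule:
    # PySpark coverage for master branch
    - cron: '0 10 * * *'

jobs:
  configure-jobs:
    runs-on: ubuntu-20.04
    steps:
      # Illustrative step; the real workflow dispatches several schedules and outputs.
      - name: Decide the job type based on the triggering schedule
        id: set-outputs
        run: |
          if [ "${{ github.event.schedule }}" = "0 10 * * *" ]; then
            echo '::set-output name=branch::master'
            echo '::set-output name=type::pyspark-coverage-scheduled'
            echo '::set-output name=envs::{"PYSPARK_CODECOV": "true"}'
          fi
```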

### Why are the changes needed?

For some background, PySpark code coverage is currently reported in this specific Jenkins job: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/

Because of a security issue between the [Codecov service](https://app.codecov.io/gh/) and the Jenkins machines, we had to work around it by manually hosting a coverage site via GitHub Pages (https://spark-test.github.io/pyspark-coverage-site/) from the spark-test account, which is shared with only a subset of PMC members.

Since we now run the builds via GitHub Actions, we can leverage the [Codecov plugin](https://github.com/codecov/codecov-action) and remove the workaround above.
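
The upload itself is a single extra step in the `pyspark` test job, gated on the new coverage job type (taken from the workflow change in the diff below):

```yaml
- name: Upload coverage to Codecov
  if: needs.configure-jobs.outputs.type == 'pyspark-coverage-scheduled'
  uses: codecov/codecov-action@v2
  with:
    files: ./python/coverage.xml
    flags: unittests
    name: PySpark
```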

### Does this PR introduce _any_ user-facing change?

Virtually no. The coverage site (UI) changes, but the information it holds should be virtually the same.

### How was this patch tested?

I manually tested:
- Scheduled run: https://github.com/HyukjinKwon/spark/actions/runs/1082261484
- Coverage report: https://codecov.io/gh/HyukjinKwon/spark/tree/73f0291a7df1eda98045cd759303aac1c2a9c929/python/pyspark
- Run against a PR: https://github.com/HyukjinKwon/spark/actions/runs/1082367175

Closes #33591 from HyukjinKwon/SPARK-36092.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Aug 1, 2021
1 parent 72615bc commit c0d1860
Showing 12 changed files with 89 additions and 85 deletions.
32 changes: 28 additions & 4 deletions .github/workflows/build_and_test.yml
@@ -10,6 +10,8 @@ on:
- cron: '0 4 * * *'
# branch-3.2
- cron: '0 7 * * *'
# PySpark coverage for master branch
- cron: '0 10 * * *'

jobs:
configure-jobs:
@@ -31,6 +33,10 @@ jobs:
echo '::set-output name=branch::branch-3.2'
echo '::set-output name=type::scheduled'
echo '::set-output name=envs::{"SCALA_PROFILE": "scala2.13"}'
elif [ "${{ github.event.schedule }}" = "0 10 * * *" ]; then
echo '::set-output name=branch::master'
echo '::set-output name=type::pyspark-coverage-scheduled'
echo '::set-output name=envs::{"PYSPARK_CODECOV": "true"}'
else
echo '::set-output name=branch::master' # Default branch to run on. CHANGE here when a branch is cut out.
echo '::set-output name=type::regular'
@@ -41,8 +47,11 @@
build:
name: "Build modules (${{ format('{0}, {1} job', needs.configure-jobs.outputs.branch, needs.configure-jobs.outputs.type) }}): ${{ matrix.modules }} ${{ matrix.comment }} (JDK ${{ matrix.java }}, ${{ matrix.hadoop }}, ${{ matrix.hive }})"
needs: configure-jobs
# Do not run as scheduled jobs in forked repos
if: github.repository == 'apache/spark' || needs.configure-jobs.outputs.type == 'regular'
# Run scheduled jobs for Apache Spark only
# Run regular jobs for commit in both Apache Spark and forked repository
if: >-
(github.repository == 'apache/spark' && needs.configure-jobs.outputs.type == 'scheduled')
|| needs.configure-jobs.outputs.type == 'regular'
# Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
runs-on: ubuntu-20.04
strategy:
@@ -182,8 +191,12 @@ jobs:

pyspark:
needs: configure-jobs
if: needs.configure-jobs.outputs.type == 'regular'
name: "Build modules: ${{ matrix.modules }}"
# Run PySpark coverage scheduled jobs for Apache Spark only
# Run regular jobs for commit in both Apache Spark and forked repository
if: >-
(github.repository == 'apache/spark' && needs.configure-jobs.outputs.type == 'pyspark-coverage-scheduled')
|| needs.configure-jobs.outputs.type == 'regular'
name: "Build modules (${{ format('{0}, {1} job', needs.configure-jobs.outputs.branch, needs.configure-jobs.outputs.type) }}): ${{ matrix.modules }}"
runs-on: ubuntu-20.04
container:
image: dongjoon/apache-spark-github-action-image:20210602
@@ -251,11 +264,22 @@ jobs:
bash miniconda.sh -b -p $HOME/miniconda
# Run the tests.
- name: Run tests
env: ${{ fromJSON(needs.configure-jobs.outputs.envs) }}
run: |
# TODO(SPARK-36345): Install mlflow>=1.0 and sklearn in Python 3.9 of the base image
python3.9 -m pip install 'mlflow>=1.0' sklearn
# TODO(SPARK-36361): Install coverage in Python 3.9 and PyPy 3 in the base image
python3.9 -m pip install coverage
pypy3 -m pip install coverage
export PATH=$PATH:$HOME/miniconda/bin
./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST"
- name: Upload coverage to Codecov
if: needs.configure-jobs.outputs.type == 'pyspark-coverage-scheduled'
uses: codecov/codecov-action@v2
with:
files: ./python/coverage.xml
flags: unittests
name: PySpark
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v2
1 change: 1 addition & 0 deletions .gitignore
@@ -67,6 +67,7 @@ project/plugins/src_managed/
project/plugins/target/
python/lib/pyspark.zip
python/.eggs/
python/coverage.xml
python/deps
python/docs/_site/
python/docs/source/reference/**/api/
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ and Structured Streaming for stream processing.
[![GitHub Action Build](https://github.com/apache/spark/actions/workflows/build_and_test.yml/badge.svg?branch=master)](https://github.com/apache/spark/actions/workflows/build_and_test.yml?query=branch%3Amaster)
[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-3.2/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-3.2)
[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage&url=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site&query=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan&colorB=brightgreen&style=plastic)](https://spark-test.github.io/pyspark-coverage-site)
[![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/spark)


## Online Documentation
3 changes: 3 additions & 0 deletions dev/requirements.txt
@@ -14,6 +14,9 @@ matplotlib<3.3.0
# PySpark test dependencies
xmlrunner

# PySpark test dependencies (optional)
coverage

# Linter
mypy
flake8
57 changes: 3 additions & 54 deletions dev/run-tests.py
@@ -23,8 +23,6 @@
import re
import sys
import subprocess
import glob
import shutil

from sparktestsupport import SPARK_HOME, USER_HOME, ERROR_CODES
from sparktestsupport.shellutils import exit_from_command_with_retcode, run_cmd, rm_r, which
@@ -522,54 +520,6 @@ def run_python_tests(test_modules, parallelism, with_coverage=False):
x for x in ["python3.9", "pypy3"] if which(x)))
run_cmd(command)

if with_coverage:
post_python_tests_results()


def post_python_tests_results():
if "SPARK_TEST_KEY" not in os.environ:
print("[error] 'SPARK_TEST_KEY' environment variable was not set. Unable to post "
"PySpark coverage results.")
sys.exit(1)
spark_test_key = os.environ.get("SPARK_TEST_KEY")
# The steps below upload HTMLs to 'github.com/spark-test/pyspark-coverage-site'.
# 1. Clone PySpark coverage site.
run_cmd([
"git",
"clone",
"https://spark-test:%s@github.com/spark-test/pyspark-coverage-site.git" % spark_test_key])
# 2. Remove existing HTMLs.
run_cmd(["rm", "-fr"] + glob.glob("pyspark-coverage-site/*"))
# 3. Copy generated coverage HTMLs.
for f in glob.glob("%s/python/test_coverage/htmlcov/*" % SPARK_HOME):
shutil.copy(f, "pyspark-coverage-site/")
os.chdir("pyspark-coverage-site")
try:
# 4. Check out to a temporary branch.
run_cmd(["git", "symbolic-ref", "HEAD", "refs/heads/latest_branch"])
# 5. Add all the files.
run_cmd(["git", "add", "-A"])
# 6. Commit current HTMLs.
run_cmd([
"git",
"-c",
"user.name='Apache Spark Test Account'",
"-c",
"user.email='sparktestacc@gmail.com'",
"commit",
"-am",
"Coverage report at latest commit in Apache Spark"])
# 7. Delete the old branch.
run_cmd(["git", "branch", "-D", "gh-pages"])
# 8. Rename the temporary branch to master.
run_cmd(["git", "branch", "-m", "gh-pages"])
# 9. Finally, force update to our repository.
run_cmd(["git", "push", "-f", "origin", "gh-pages"])
finally:
os.chdir("..")
# 10. Remove the cloned repository.
shutil.rmtree("pyspark-coverage-site")


def run_python_packaging_tests():
set_title_and_block("Running PySpark packaging tests", "BLOCK_PYSPARK_PIP_TESTS")
@@ -815,11 +765,10 @@ def main():

modules_with_python_tests = [m for m in test_modules if m.python_test_goals]
if modules_with_python_tests:
# We only run PySpark tests with coverage report in one specific job with
# Spark master with SBT in Jenkins.
is_sbt_master_job = "SPARK_MASTER_SBT_HADOOP_2_7" in os.environ
run_python_tests(
modules_with_python_tests, opts.parallelism, with_coverage=is_sbt_master_job)
modules_with_python_tests,
opts.parallelism,
with_coverage=os.environ.get("PYSPARK_CODECOV", "false") == "true")
run_python_packaging_tests()
if any(m.should_run_r_tests for m in test_modules):
run_sparkr_tests()
13 changes: 11 additions & 2 deletions python/pyspark/mllib/tests/test_streaming_algorithms.py
@@ -15,6 +15,7 @@
# limitations under the License.
#

import os
import unittest

from numpy import array, random, exp, dot, all, mean, abs
@@ -117,7 +118,7 @@ def condition():
self.assertTrue(all(finalModel.centers == array(initCenters)))
self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
return True
eventually(condition, catch_assertions=True)
eventually(condition, 90, catch_assertions=True)

def test_predictOn_model(self):
"""Test that the model predicts correctly on toy data."""
@@ -251,7 +252,7 @@ def condition():
return True

# We want all batches to finish for this test.
eventually(condition, 60.0, catch_assertions=True)
eventually(condition, 120, catch_assertions=True)

t_models = array(models)
diff = t_models[1:] - t_models[:-1]
@@ -292,6 +293,10 @@ def condition():
self.assertTrue(
self.calculate_accuracy_error(true, predicted) < 0.4)

@unittest.skipIf(
"COVERAGE_PROCESS_START" in os.environ,
"Flaky with coverage enabled, skipping for now."
)
def test_training_and_prediction(self):
"""Test that the model improves on toy data with no. of batches"""
input_batches = [
@@ -428,6 +433,10 @@ def condition():
true, predicted = zip(*batch)
self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)

@unittest.skipIf(
"COVERAGE_PROCESS_START" in os.environ,
"Flaky with coverage enabled, skipping for now."
)
def test_train_prediction(self):
"""Test that error on test data improves as model is trained."""
slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
9 changes: 3 additions & 6 deletions python/pyspark/streaming/tests/test_dstream.py
@@ -31,8 +31,7 @@

@unittest.skipIf(
"pypy" in platform.python_implementation().lower(),
"The tests fail in PyPy3 implementation for an unknown reason. "
"With PyPy, it causes to hang DStream tests forever when Coverage report is used.")
"The tests fail in PyPy3 implementation for an unknown reason.")
class BasicOperationTests(PySparkStreamingTestCase):

def test_map(self):
@@ -396,8 +395,7 @@ def failed_func(i):

@unittest.skipIf(
"pypy" in platform.python_implementation().lower(),
"The tests fail in PyPy3 implementation for an unknown reason. "
"With PyPy, it causes to hang DStream tests forever when Coverage report is used.")
"The tests fail in PyPy3 implementation for an unknown reason.")
class WindowFunctionTests(PySparkStreamingTestCase):

timeout = 15
@@ -477,8 +475,7 @@ def func(dstream):

@unittest.skipIf(
"pypy" in platform.python_implementation().lower(),
"The tests fail in PyPy3 implementation for an unknown reason. "
"With PyPy, it causes to hang DStream tests forever when Coverage report is used.")
"The tests fail in PyPy3 implementation for an unknown reason.")
class CheckpointTests(unittest.TestCase):

setupCalled = False
2 changes: 1 addition & 1 deletion python/pyspark/tests/test_context.py
@@ -230,7 +230,7 @@ def run():
t.daemon = True
t.start()
# wait for scheduler to start
time.sleep(1)
time.sleep(3)

tracker = sc.statusTracker()
jobIds = tracker.getJobIdsForGroup('test_progress_api')
19 changes: 16 additions & 3 deletions python/pyspark/tests/test_worker.py
@@ -57,13 +57,22 @@ def run():
t.start()

daemon_pid, worker_pid = 0, 0
cnt = 0
while True:
if os.path.exists(path):
with open(path) as f:
data = f.read().split(' ')
daemon_pid, worker_pid = map(int, data)
break
time.sleep(0.1)
try:
daemon_pid, worker_pid = map(int, data)
except ValueError:
pass
# In case the value is not written yet.
cnt += 1
if cnt == 10:
raise
else:
break
time.sleep(1)

# cancel jobs
self.sc.cancelAllJobs()
@@ -226,6 +235,10 @@ def f():
self.assertRegex(str(e), "Segmentation fault")


@unittest.skipIf(
"COVERAGE_PROCESS_START" in os.environ,
"Flaky with coverage enabled, skipping for now."
)
class WorkerSegfaultNonDaemonTest(WorkerSegfaultTest):

@classmethod
2 changes: 2 additions & 0 deletions python/run-tests-with-coverage
@@ -59,6 +59,8 @@ unset COVERAGE_PROCESS_START
find $COVERAGE_DIR/coverage_data -size 0 -print0 | xargs -0 rm -fr
echo "Combining collected coverage data under $COVERAGE_DIR/coverage_data"
$COV_EXEC combine
echo "Creating XML report file at python/coverage.xml"
$COV_EXEC xml --ignore-errors --include "pyspark/*"
echo "Reporting the coverage data at $COVERAGE_DIR/coverage_data/coverage"
$COV_EXEC report --include "pyspark/*"
echo "Generating HTML files for PySpark coverage under $COVERAGE_DIR/htmlcov"
27 changes: 15 additions & 12 deletions python/test_coverage/coverage_daemon.py
@@ -17,26 +17,29 @@

import os
import imp
import platform


# This is a hack to always refer the main code rather than built zip.
main_code_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
daemon = imp.load_source("daemon", "%s/pyspark/daemon.py" % main_code_dir)

if "COVERAGE_PROCESS_START" in os.environ:
worker = imp.load_source("worker", "%s/pyspark/worker.py" % main_code_dir)
# PyPy with coverage makes the tests flaky, and CPython is enough for coverage report.
if "pypy" not in platform.python_implementation().lower():
worker = imp.load_source("worker", "%s/pyspark/worker.py" % main_code_dir)

def _cov_wrapped(*args, **kwargs):
import coverage
cov = coverage.coverage(
config_file=os.environ["COVERAGE_PROCESS_START"])
cov.start()
try:
worker.main(*args, **kwargs)
finally:
cov.stop()
cov.save()
daemon.worker_main = _cov_wrapped
def _cov_wrapped(*args, **kwargs):
import coverage
cov = coverage.coverage(
config_file=os.environ["COVERAGE_PROCESS_START"])
cov.start()
try:
worker.main(*args, **kwargs)
finally:
cov.stop()
cov.save()
daemon.worker_main = _cov_wrapped
else:
raise RuntimeError("COVERAGE_PROCESS_START environment variable is not set, exiting.")

7 changes: 5 additions & 2 deletions python/test_coverage/sitecustomize.py
@@ -19,5 +19,8 @@
# If this module is defined, it's executed when the Python session begins.
# `coverage.process_startup()` seeks if COVERAGE_PROCESS_START environment
# variable is set or not. If set, it starts to run the coverage.
import coverage
coverage.process_startup()
try:
import coverage
coverage.process_startup()
except ImportError:
pass
