[BEAM-10675] Add Python GBK Load Tests for streaming on Dataflow #12612

Merged: 5 commits, Aug 26, 2020
101 changes: 73 additions & 28 deletions .test-infra/jenkins/job_LoadTests_GBK_Python.groovy
@@ -22,119 +22,148 @@ import InfluxDBCredentialsHelper

def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))

-def loadTestConfigurations = { datasetName ->
+// TODO(BEAM-10774): Skipping some cases because they are too slow.
+def TESTS_TO_SKIP = [
+'load-tests-python-dataflow-streaming-gbk-1',
+'load-tests-python-dataflow-streaming-gbk-2',
+'load-tests-python-dataflow-streaming-gbk-4',
+'load-tests-python-dataflow-streaming-gbk-5',
+]

+def loadTestConfigurations = { mode, datasetName ->
[
[
title : 'GroupByKey Python Load test: 2GB of 10B records',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
-job_name : 'load-tests-python-dataflow-batch-gbk-1-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-1-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_1',
-influx_measurement : 'python_batch_gbk_1',
+metrics_table : "python_dataflow_${mode}_gbk_1",
+influx_measurement : "python_${mode}_gbk_1",
input_options : '\'{"num_records": 200000000,' +
'"key_size": 1,' +
'"value_size": 9}\'',
iterations : 1,
fanout : 1,
num_workers : 5,
autoscaling_algorithm: "NONE"
autoscaling_algorithm: 'NONE',
]
],
[
title : 'GroupByKey Python Load test: 2GB of 100B records',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
-job_name : 'load-tests-python-dataflow-batch-gbk-2-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-2-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_2',
-influx_measurement : 'python_batch_gbk_2',
+metrics_table : "python_dataflow_${mode}_gbk_2",
+influx_measurement : "python_${mode}_gbk_2",
input_options : '\'{"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
iterations : 1,
fanout : 1,
num_workers : 5,
autoscaling_algorithm: "NONE"
autoscaling_algorithm: 'NONE',
]
],
[
title : 'GroupByKey Python Load test: 2GB of 100kB records',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
-job_name : 'load-tests-python-dataflow-batch-gbk-3-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-3-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_3',
-influx_measurement : 'python_batch_gbk_3',
+metrics_table : "python_dataflow_${mode}_gbk_3",
+influx_measurement : "python_${mode}_gbk_3",
input_options : '\'{"num_records": 20000,' +
'"key_size": 10000,' +
'"value_size": 90000}\'',
iterations : 1,
fanout : 1,
num_workers : 5,
autoscaling_algorithm: "NONE"
autoscaling_algorithm: 'NONE',
]
],
[
title : 'GroupByKey Python Load test: fanout 4 times with 2GB 10-byte records total',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
-job_name : 'load-tests-python-dataflow-batch-gbk-4-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-4-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_4',
-influx_measurement : 'python_batch_gbk_4',
+metrics_table : "python_dataflow_${mode}_gbk_4",
+influx_measurement : "python_${mode}_gbk_4",
input_options : '\'{"num_records": 5000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
iterations : 1,
fanout : 4,
-num_workers : 5,
-autoscaling_algorithm: "NONE"
+num_workers : 16,
+autoscaling_algorithm: 'NONE',
]
],
[
title : 'GroupByKey Python Load test: fanout 8 times with 2GB 10-byte records total',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
-job_name : 'load-tests-python-dataflow-batch-gbk-5-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-5-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_5',
-influx_measurement : 'python_batch_gbk_5',
+metrics_table : "python_dataflow_${mode}_gbk_5",
+influx_measurement : "python_${mode}_gbk_5",
input_options : '\'{"num_records": 2500000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
iterations : 1,
fanout : 8,
-num_workers : 5,
-autoscaling_algorithm: "NONE"
+num_workers : 16,
+autoscaling_algorithm: 'NONE',
]
],
-].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+]
+.each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+.each { test -> (mode != 'streaming') ?: addStreamingOptions(test) }
+.collectMany { test ->
+TESTS_TO_SKIP.any { element -> test.pipelineOptions.job_name.startsWith(element) } ? [] : [test]
+}
}
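In Python terms, the collectMany call above is a prefix filter: a test survives only if its generated job_name starts with no entry in TESTS_TO_SKIP. A rough sketch (the data here is hypothetical, not part of the change):

# Keep only tests whose job_name matches no TESTS_TO_SKIP prefix.
TESTS_TO_SKIP = ['load-tests-python-dataflow-streaming-gbk-1']
tests = [
    {'pipelineOptions': {'job_name': 'load-tests-python-dataflow-streaming-gbk-1-0826'}},
    {'pipelineOptions': {'job_name': 'load-tests-python-dataflow-streaming-gbk-3-0826'}},
]
kept = [t for t in tests
        if not any(t['pipelineOptions']['job_name'].startswith(p)
                   for p in TESTS_TO_SKIP)]
assert len(kept) == 1  # only the gbk-3 job survives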

+def addStreamingOptions(test) {
+test.pipelineOptions << [
+streaming: null,
+// Use the new Dataflow runner, which offers improved efficiency of Dataflow jobs.
+// See https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
+// for more details.
+experiments: 'use_runner_v2',
+]
+}
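A map value of null renders as a bare command-line flag in this DSL, so addStreamingOptions effectively appends --streaming and --experiments=use_runner_v2 to the pipeline arguments. In the Python SDK those flags parse roughly as follows (a sketch, assuming standard PipelineOptions handling):

from apache_beam.options.pipeline_options import PipelineOptions

# --streaming comes from the streaming: null entry (a bare flag);
# the experiment opts the job into Dataflow Runner v2.
options = PipelineOptions([
    '--streaming',
    '--experiments=use_runner_v2',
])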

+def loadTestJob = { scope, triggeringContext, mode ->
+def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
+loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON_37,
+loadTestConfigurations(mode, datasetName), 'GBK', mode)
+}
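As a quick sanity check of the sizes advertised in the test titles (a sketch; it assumes the synthetic source emits num_records records of key_size + value_size bytes each, and that fanout multiplies the grouped data):

configs = {
    'gbk-1': (200_000_000, 1, 9, 1),       # 2GB of 10B records
    'gbk-2': (20_000_000, 10, 90, 1),      # 2GB of 100B records
    'gbk-3': (20_000, 10_000, 90_000, 1),  # 2GB of 100kB records
    'gbk-4': (5_000_000, 10, 90, 4),       # 0.5GB fanned out 4x -> 2GB
    'gbk-5': (2_500_000, 10, 90, 8),       # 0.25GB fanned out 8x -> 2GB
}
for name, (records, key, value, fanout) in configs.items():
    print(name, records * (key + value) * fanout / 1e9, 'GB grouped')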

PhraseTriggeringPostCommitBuilder.postCommitJob(
@@ -144,16 +173,32 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
this
) {
additionalPipelineArgs = [:]
-def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', CommonTestProperties.TriggeringContext.PR)
-loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON_37, loadTestConfigurations(datasetName), "GBK", "batch")
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
}

CronJobBuilder.cronJob('beam_LoadTests_Python_GBK_Dataflow_Batch', 'H 12 * * *', this) {
additionalPipelineArgs = [
influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
]
-def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', CommonTestProperties.TriggeringContext.POST_COMMIT)
-loadTestsBuilder.loadTests(delegate, CommonTestProperties.SDK.PYTHON_37, loadTestConfigurations(datasetName), "GBK", "batch")
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
}

+PhraseTriggeringPostCommitBuilder.postCommitJob(
+'beam_LoadTests_Python_GBK_Dataflow_Streaming',
+'Run Load Tests Python GBK Dataflow Streaming',
+'Load Tests Python GBK Dataflow Streaming suite',
+this
+) {
+additionalPipelineArgs = [:]
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'streaming')
+}

+CronJobBuilder.cronJob('beam_LoadTests_Python_GBK_Dataflow_Streaming', 'H 12 * * *', this) {
+additionalPipelineArgs = [
+influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
+]
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'streaming')
+}

80 changes: 54 additions & 26 deletions .test-infra/jenkins/job_LoadTests_GBK_Python_reiterate.groovy
@@ -25,7 +25,7 @@ import InfluxDBCredentialsHelper

def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))

-def loadTestConfigurations = { datasetName ->
+def loadTestConfigurations = { mode, datasetName ->
[
[
title : 'GroupByKey Python Load test: reiterate 4 times 10kB values',
@@ -34,21 +34,21 @@ def loadTestConfigurations = { datasetName ->
pipelineOptions: [
project : 'apache-beam-testing',
region : 'us-central1',
-job_name : 'load-tests-python-dataflow-batch-gbk-6-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-6-${now}",
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
metrics_table : "python_dataflow_batch_gbk_6",
influx_measurement : 'python_batch_gbk_6',
metrics_table : "python_dataflow_${mode}_gbk_6",
influx_measurement : "python_${mode}_gbk_6",
input_options : '\'{"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90,' +
'"num_hot_keys": 200,' +
'"hot_key_fraction": 1}\'',
-fanout : 1,
iterations : 4,
+fanout : 1,
num_workers : 5,
autoscaling_algorithm: "NONE"
autoscaling_algorithm: 'NONE',
]
],
[
@@ -58,43 +58,52 @@ def loadTestConfigurations = { datasetName ->
pipelineOptions: [
project : 'apache-beam-testing',
region : 'us-central1',
-job_name : 'load-tests-python-dataflow-batch-gbk-7-' + now,
+job_name : "load-tests-python-dataflow-${mode}-gbk-7-${now}",
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
-metrics_table : 'python_dataflow_batch_gbk_7',
-influx_measurement : 'python_batch_gbk_7',
+metrics_table : "python_dataflow_${mode}_gbk_7",
+influx_measurement : "python_${mode}_gbk_7",
input_options : '\'{"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90,' +
'"num_hot_keys": 10,' +
'"hot_key_fraction": 1}\'',
-fanout : 1,
iterations : 4,
+fanout : 1,
num_workers : 5,
-autoscaling_algorithm: 'NONE'
+autoscaling_algorithm: 'NONE',
]
]
-].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+]
+.each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+.each { test -> (mode != 'streaming') ?: addStreamingOptions(test) }
}
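Both reiterate tests route every record onto a small set of hot keys (hot_key_fraction of 1), so each grouped iterable is large and is then walked iterations (4) times. Roughly (a sketch):

num_records = 20_000_000
print(num_records // 200)  # gbk-6: num_hot_keys=200 -> ~100,000 values per key
print(num_records // 10)   # gbk-7: num_hot_keys=10 -> ~2,000,000 values per key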

-def batchLoadTestJob = { scope, triggeringContext ->
-scope.description('Runs Python GBK reiterate load tests on Dataflow runner in batch mode')
-commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)
+def addStreamingOptions(test) {
Review comment (Contributor): Same here, please add a comment.

+test.pipelineOptions << [
+streaming: null,
+// Use the new Dataflow runner, which offers improved efficiency of Dataflow jobs.
+// See https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
+// for more details.
+experiments: 'use_runner_v2',
+]
+}

+def loadTestJob = { scope, triggeringContext, mode ->
def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
-for (testConfiguration in loadTestConfigurations(datasetName)) {
-loadTestsBuilder.loadTest(scope, testConfiguration.title, testConfiguration.runner, CommonTestProperties.SDK.PYTHON_37, testConfiguration.pipelineOptions, testConfiguration.test)
-}
+loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON_37,
+loadTestConfigurations(mode, datasetName), 'GBK reiterate', mode)
}

-CronJobBuilder.cronJob('beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch', 'H 14 * * *', this) {
-additionalPipelineArgs = [
-influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
-influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
-]
-batchLoadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT)
-}
+CronJobBuilder.cronJob('beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch',
+'H 14 * * *', this) {
Review comment (Contributor): What's the methodology for picking the time to trigger these? Is it documented anywhere?

Reply (kamilwu, Contributor Author, Aug 24, 2020): Ideally, each test suite (GBK, ParDo, IO tests, etc.) should have its own unique time slot, so as not to flood Jenkins with many tests triggered at the same time. When adding a new test suite, a contributor has to check which time slots are already occupied and avoid using them.

I think this is not documented. I'll add some information here: https://cwiki.apache.org/confluence/display/BEAM/Contribution+Testing+Guide#ContributionTestingGuide

+additionalPipelineArgs = [
+influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
+]
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
+}

PhraseTriggeringPostCommitBuilder.postCommitJob(
'beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch',
@@ -103,5 +112,24 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
this
) {
additionalPipelineArgs = [:]
-batchLoadTestJob(delegate, CommonTestProperties.TriggeringContext.PR)
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
}

+CronJobBuilder.cronJob('beam_LoadTests_Python_GBK_reiterate_Dataflow_Streaming',
+'H 14 * * *', this) {
+additionalPipelineArgs = [
+influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
+]
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'streaming')
+}

+PhraseTriggeringPostCommitBuilder.postCommitJob(
+'beam_LoadTests_Python_GBK_reiterate_Dataflow_Streaming',
+'Run Load Tests Python GBK reiterate Dataflow Streaming',
+'Load Tests Python GBK reiterate Dataflow Streaming suite',
+this
+) {
+additionalPipelineArgs = [:]
+loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'streaming')
+}
20 changes: 10 additions & 10 deletions sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -87,13 +87,13 @@ def __init__(self):
    self.fanout = self.get_option_or_default('fanout', 1)
    self.iterations = self.get_option_or_default('iterations', 1)

-  class _UngroupAndReiterate(beam.DoFn):
-    def process(self, element, iterations):
-      key, value = element
-      for i in range(iterations):
-        for v in value:
-          if i == iterations - 1:
-            return key, v
+  @staticmethod
+  def ungroup_and_reiterate(element, iterations):
+    key, value = element
+    for i in range(iterations):
+      for v in value:
+        if i == iterations - 1:
+          return key, v
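For reference, a usage sketch of the new static method (the element and values here are hypothetical, and the enclosing class is assumed to be GroupByKeyTest): every pass except the last walks the full grouped iterable, forcing reiteration, and the final pass returns immediately with the first value.

element = ('key-0', ['a', 'b', 'c'])
# Passes 0..2 exhaust the iterable; pass 3 returns on its first value.
result = GroupByKeyTest.ungroup_and_reiterate(element, iterations=4)
assert result == ('key-0', 'a')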

  def test(self):
    pc = (
@@ -107,9 +107,9 @@ def test(self):
      (  # pylint: disable=expression-not-assigned
          pc
          | 'GroupByKey %i' % branch >> beam.GroupByKey()
-          | 'Ungroup %i' % branch >> beam.ParDo(
-              self._UngroupAndReiterate(), self.iterations)
-          | 'Measure latency' >> beam.ParDo(
+          | 'Ungroup %i' % branch >> beam.Map(self.ungroup_and_reiterate,
+                                              self.iterations)
+          | 'Measure latency %i' % branch >> beam.ParDo(
              MeasureLatency(self.metrics_namespace))
          | 'Measure time: End %i' % branch >> beam.ParDo(
              MeasureTime(self.metrics_namespace)))