forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
build.gradle
176 lines (161 loc) · 6.87 KB
/
build.gradle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.stream.Collectors
// Build configuration for the Beam KafkaIO module (Gradle, Groovy DSL).
plugins { id 'org.apache.beam.module' }
// Beam convention plugin hook: configures this project as a Java module with the
// given JPMS automatic module name. The extra Confluent repository supplies the
// kafka-avro-serializer / schema-registry artifacts declared in `dependencies`.
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.kafka',
mavenRepositories: [
[id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
],
)
// Beam convention helpers: wire in integration-test and performance-test support.
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Kafka"
ext.summary = "Library to read Kafka topics."
// Kafka client versions covered by the compatibility test matrix,
// keyed by a digits-only label embedded in configuration and task names
// (e.g. label '201' -> configuration 'kafkaVersion201', task 'kafkaVersion201Test').
def kafkaVersions = [
'01103': "0.11.0.3",
'100': "1.0.0",
'111': "1.1.1",
'201': "2.0.1",
'211': "2.1.1",
'222': "2.2.2",
'231': "2.3.1",
'241': "2.4.1",
'251': "2.5.1",
]
// Labels of the client versions (>= 2.0.1) on which the SDF-based read tests
// are expected to work; older versions have those tests filtered out below.
def sdfKafkaVersions = [
'201',
'211',
'222',
'231',
'241',
'251'
]
// One configuration per version, used to pin kafka-clients on the test classpath.
kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}
dependencies {
implementation library.java.vendored_guava_26_0_jre
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":runners:core-construction-java")
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
implementation library.java.avro
// Get back to "provided" since 2.14
provided library.java.kafka_clients
testImplementation library.java.kafka_clients
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
// NOTE(review): pinned directly rather than via the `library` catalog — presumably
// to control the spring-expression version; confirm it tracks Beam's BOM.
implementation "org.springframework:spring-expression:5.3.25"
implementation ("io.confluent:kafka-avro-serializer:5.3.2") {
// zookeeper depends on "spotbugs-annotations:3.1.9" which clashes with current
// "spotbugs-annotations:3.1.12" used in Beam. Not required.
exclude group: "org.apache.zookeeper", module: "zookeeper"
// "kafka-clients" has to be provided since user can use its own version.
exclude group: "org.apache.kafka", module: "kafka-clients"
}
implementation ("io.confluent:kafka-schema-registry-client:5.3.2") {
// It depends on "spotbugs-annotations:3.1.9" which clashes with current
// "spotbugs-annotations:3.1.12" used in Beam. Not required.
exclude group: "org.apache.zookeeper", module: "zookeeper"
// "kafka-clients" has to be provided since user can use its own version.
exclude group: "org.apache.kafka", module: "kafka-clients"
}
// everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
permitUnusedDeclared library.java.everit_json_schema
provided library.java.everit_json_schema
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(":sdks:java:io:synthetic")
testImplementation project(":sdks:java:extensions:avro")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
// For testing Cross-language transforms
testImplementation project(":runners:core-construction-java")
testImplementation library.java.avro
testImplementation library.java.junit
testImplementation library.java.powermock
testImplementation library.java.powermock_mockito
testImplementation library.java.testcontainers_kafka
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
// Populate each per-version configuration with its pinned kafka-clients artifact.
kafkaVersions.each {"kafkaVersion$it.key" "org.apache.kafka:kafka-clients:$it.value"}
}
// For every per-version configuration, force dependency resolution to the exact
// kafka-clients version under test, overriding anything pulled in transitively.
kafkaVersions.each { versionEntry ->
  configurations."kafkaVersion${versionEntry.key}" {
    resolutionStrategy {
      force "org.apache.kafka:kafka-clients:${versionEntry.value}"
    }
  }
}
// Register one unit-test task per Kafka client version; each runs only
// KafkaIOTest against that version's pinned kafka-clients jar.
kafkaVersions.each {kv ->
task "kafkaVersion${kv.key}Test"(type: Test) {
group = "Verification"
description = "Runs KafkaIO tests with Kafka clients API $kv.value"
// Always rerun: the classpath swap below is invisible to Gradle's up-to-date checks.
outputs.upToDateWhen { false }
testClassesDirs = sourceSets.test.output.classesDirs
// Prepend the forced kafka-clients configuration so it wins over the
// default test runtime classpath.
classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
include '**/KafkaIOTest.class'
}
}
// Register one batch integration-test task per Kafka client version; each runs
// KafkaIOIT against a Testcontainers-backed Kafka broker with a synthetic source.
kafkaVersions.each {kv ->
task "kafkaVersion${kv.key}BatchIT"(type: Test) {
group = "Verification"
description = "Runs KafkaIO IT tests with Kafka clients API $kv.value"
// Always rerun: the classpath swap below is invisible to Gradle's up-to-date checks.
outputs.upToDateWhen { false }
testClassesDirs = sourceSets.test.output.classesDirs
classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
// Pipeline options consumed by KafkaIOIT: a small synthetic workload
// (1000 records of 10-byte keys / 90-byte values) against an ephemeral container.
def pipelineOptions = [
'--sourceOptions={' +
'"numRecords": "1000",' +
'"keySizeBytes": "10",' +
'"valueSizeBytes": "90"' +
'}',
"--readTimeout=120",
"--kafkaTopic=beam",
"--withTestcontainers=true",
"--kafkaContainerVersion=5.5.2",
]
systemProperty "beamTestPipelineOptions", groovy.json.JsonOutput.toJson(pipelineOptions)
include '**/KafkaIOIT.class'
// Batch-only run: drop streaming tests, and drop SDF-dependent tests on
// client versions where the SDF read path is not supported.
filter {
excludeTestsMatching "*InStreaming"
if (!(kv.key in sdfKafkaVersions)) {
excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1
}
}
}
}
// Aggregate task: depends on every per-version unit test and batch IT task,
// so one invocation runs the whole Kafka client compatibility matrix.
task kafkaVersionsCompatibilityTest {
  group = "Verification"
  description = 'Runs KafkaIO with different Kafka client APIs'
  dependsOn createTestList(kafkaVersions, "Test")
  dependsOn createTestList(kafkaVersions, "BatchIT")
}
/**
 * Builds the list of per-version task names ("kafkaVersion" + label + suffix)
 * for every version label in {@code prefixMap}.
 *
 * @param prefixMap map whose keys are version labels (e.g. '201'); values unused
 * @param suffix task-name suffix, e.g. "Test" or "BatchIT"
 * @return list of plain String task names, in the map's iteration order
 */
static def createTestList(Map<String, String> prefixMap, String suffix) {
  // Groovy's GDK collect is the idiomatic equivalent of the previous
  // Java stream().map().collect(Collectors.toList()) pipeline; toString()
  // materializes each GString as a plain String so task-name lookups and
  // comparisons behave predictably.
  return prefixMap.keySet().collect { version ->
    "kafkaVersion${version}${suffix}".toString()
  }
}