Skip to content
Permalink
Browse files
Fix #3592 add some ReflectiveClassBuildItem for camel-kafka (#3594)
  • Loading branch information
zhfeng authored and jamesnetherton committed Mar 24, 2022
1 parent 43641cd commit fa1642a0af71382244279a314ca6fcbb6084fc07
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 1 deletion.
@@ -31,7 +31,7 @@
**Merged pull requests:**

- Avoid compiling regular expressions in loops [\#3649](https://github.com/apache/camel-quarkus/pull/3649) ([ppalaga](https://github.com/ppalaga))
- Switch from `NativeImageTest` to `QuarkusIntegrationTest` [\#3648](https://github.com/apache/camel-quarkus/pull/3648) ([jamesnetherton](https://github.com/jamesnetherton))
- Switch from `QuarkusIntegrationTest` to `QuarkusIntegrationTest` [\#3648](https://github.com/apache/camel-quarkus/pull/3648) ([jamesnetherton](https://github.com/jamesnetherton))
- file: Ensure FileTest.charset is fixed under Windows \#3530 [\#3647](https://github.com/apache/camel-quarkus/pull/3647) ([aldettinger](https://github.com/aldettinger))
- Tidy geronimo-jms\_2.0\_spec exclusions [\#3646](https://github.com/apache/camel-quarkus/pull/3646) ([ppalaga](https://github.com/ppalaga))
- file: Rewrite the charset related test \#3627 [\#3645](https://github.com/apache/camel-quarkus/pull/3645) ([aldettinger](https://github.com/aldettinger))
@@ -16,27 +16,38 @@
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.DevServicesLauncherConfigResultBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;

class KafkaProcessor {
private static final String FEATURE = "camel-kafka";
private static final String CAMEL_KAFKA_BROKERS = "camel.component.kafka.brokers";
private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
private static final DotName[] KAFKA_CLIENTS_TYPES = {
DotName.createSimple("org.apache.kafka.clients.producer.Producer"),
DotName.createSimple("org.apache.kafka.clients.consumer.Consumer")
};

@BuildStep
FeatureBuildItem feature() {
@@ -68,4 +79,19 @@ public void configureKafkaComponentForDevServices(
}
}
}

@BuildStep
public void reflectiveClasses(CombinedIndexBuildItem combinedIndex,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
IndexView index = combinedIndex.getIndex();

Stream.of(KAFKA_CLIENTS_TYPES)
.map(index::getAllKnownImplementors)
.flatMap(Collection::stream)
.map(ClassInfo::toString)
.forEach(name -> reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, name)));

reflectiveClass
.produce(new ReflectiveClassBuildItem(false, true, "org.apache.kafka.clients.producer.internals.Sender"));
}
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.test.support.kafka;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface InjectKafka {
}
@@ -63,4 +63,11 @@ public void stop() {
}
}
}

@Override
public void inject(TestInjector testInjector) {
testInjector.injectIntoFields(container,
new TestInjector.AnnotatedAndMatchesType(InjectKafka.class, KafkaContainer.class));
}

}
@@ -55,6 +55,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-seda</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-microprofile-health</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.it;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class CamelKafkaHealthCheckIT extends CamelKafkaHealthCheckTest {
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.it;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.kafka.InjectKafka;
import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@QuarkusTest
@QuarkusTestResource(KafkaTestResource.class)
@TestProfile(KafkaHealthCheckProfile.class)
public class CamelKafkaHealthCheckTest {

@InjectKafka
KafkaContainer container;

@Test
void testHealthCheck() {
RestAssured.when().get("/q/health").then()
.contentType(ContentType.JSON)
.header("Content-Type", containsString("charset=UTF-8"))
.body("status", is("UP"));

// stop the kafka container to test health-check DOWN
container.stop();

RestAssured.when().get("/q/health").then()
.contentType(ContentType.JSON)
.header("Content-Type", containsString("charset=UTF-8"))
.body("status", is("DOWN"),
"checks.find { it.name == 'camel-kafka' }.status", is("DOWN"),
"checks.find { it.name == 'camel-kafka' }.data.topic", notNullValue(),
"checks.find { it.name == 'camel-kafka' }.data.'client.id'", notNullValue());
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.it;

import java.util.Map;

import io.quarkus.test.junit.QuarkusTestProfile;

public class KafkaHealthCheckProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
// force shutdown
return Map.of("camel.main.shutdownTimeout", "10");
}
}

0 comments on commit fa1642a

Please sign in to comment.