Skip to content

Commit

Permalink
- Add ruleunit-bean-reactive-quarkus where AlertingService is @Applic…
Browse files Browse the repository at this point in the history
…ationScoped
  • Loading branch information
tkobayas committed Oct 14, 2021
1 parent 834f4b6 commit 09aaadf
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 0 deletions.
38 changes: 38 additions & 0 deletions ruleunit-bean-reactive-quarkus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Kogito Example :: RuleUnit bean with Reactive DataSource - Quarkus

See that AlertingService is @ApplicationScoped

### Start test Kafka instance via Docker Compose

There's a useful [docker-compose.yml](docker-compose.yml) in the root that starts a dedicated Kafka instance for quick tests.

Simply start it with this command from the root of the repo:

```
docker-compose up -d
```

### Package and Run in JVM mode

```
mvn clean package
java -jar target/quarkus-app/quarkus-run.jar
```


## Test with Kafka

Send Event with JSON format to "events" topic.

```sh
echo '{"type":"temperature","value":35}' | kafka-console-producer.sh --broker-list localhost:9092 --topic events
```

You will see the result in "alerts" topic via Kafdrop (http://localhost:9000).

```json
{"severity":"warning","message":"Event [type=temperature, value=35]"}
```


Note that this example doesn't expose REST endpoints.
37 changes: 37 additions & 0 deletions ruleunit-bean-reactive-quarkus/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '2'

services:

zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
environment:
LOG_DIR: "/tmp/logs"

kafka:
image: wurstmeister/kafka:2.12-2.2.1
depends_on:
- zookeeper
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
LOG_DIR: "/tmp/logs"

kafdrop:
image: obsidiandynamics/kafdrop
depends_on:
- kafka
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:9093"
JVM_OPTS: "-Xms32M -Xmx64M"
SERVER_SERVLET_CONTEXTPATH: "/"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: app.kiegroup.org/v1beta1
kind: KogitoBuild
metadata:
name: ruleunit-bean-reactive-quarkus
spec:
type: RemoteSource
#env:
# env can be used to set variables during build
#- name: MY_CUSTOM_ENV
# value: "my value"
gitSource:
contextDir: ruleunit-bean-reactive-quarkus
uri: 'https://github.com/kiegroup/kogito-examples'
# set your maven nexus repository to speed up the build time
#mavenMirrorURL:
---
apiVersion: app.kiegroup.org/v1beta1
kind: KogitoRuntime
metadata:
name: ruleunit-bean-reactive-quarkus
100 changes: 100 additions & 0 deletions ruleunit-bean-reactive-quarkus/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.kie.kogito.examples</groupId>
<artifactId>kogito-examples</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>ruleunit-bean-reactive-quarkus</artifactId>
<name>Kogito Example :: RuleUnit bean with Reactive DataSource - Quarkus</name>
<properties>
<quarkus-plugin.version>2.3.0.Final</quarkus-plugin.version>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>2.3.0.Final</quarkus.platform.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-quarkus-rules</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-messaging</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-drools</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.alerts;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.kie.kogito.rules.RuleUnit;
import org.kie.kogito.rules.RuleUnitInstance;

import io.quarkus.runtime.Startup;
import io.quarkus.scheduler.Scheduled;

@Startup
@ApplicationScoped
public class Adaptor {

@Inject
RuleUnit<AlertingService> ruleUnit;
@Inject
AlertingService alertingService;

RuleUnitInstance<AlertingService> ruleUnitInstance;

@PostConstruct
void init() {
this.ruleUnitInstance = ruleUnit.createInstance(alertingService);
// I am not 100% sure of this because I don't know if this subscription will come before or after the other
// I'd assume _after_ because AlertingService must be initialized first
// this.alertingService.getEventData().subscribe(DataObserver.of(incoming -> ruleUnitInstance.fire()));
}

// alternatively, we can set a timer!
@Scheduled(every = "10s")
void fire() {
ruleUnitInstance.fire();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.alerts;

public class Alert {

private String severity;
private String message;

public Alert() {
}

public Alert(String severity, String message) {
super();
this.severity = severity;
this.message = message;
}

public String getSeverity() {
return severity;
}

public void setSeverity(String severity) {
this.severity = severity;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

@Override
public String toString() {
return "Alert [severity=" + severity + ", message=" + message + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.alerts;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.rules.DataObserver;
import org.kie.kogito.rules.DataSource;
import org.kie.kogito.rules.DataStream;
import org.kie.kogito.rules.RuleUnitData;

@ApplicationScoped
public class AlertingService implements RuleUnitData {

private DataStream<Event> eventData = DataSource.createStream();
private DataStream<Alert> alertData = DataSource.createStream();

@Inject
@Channel("alerts")
Emitter<Alert> emitter;

@PostConstruct
void init() {
alertData.subscribe(DataObserver.of(emitter::send));
}

@Incoming("events")
void receive(Event event) throws InterruptedException {
eventData.append(event);
}

public DataStream<Event> getEventData() {
return eventData;
}

public void setEventData(DataStream<Event> eventData) {
this.eventData = eventData;
}

public DataStream<Alert> getAlertData() {
return alertData;
}

public void setAlertData(DataStream<Alert> alertData) {
this.alertData = alertData;
}

}
Loading

0 comments on commit 09aaadf

Please sign in to comment.