Merge pull request #10 from SnuK87/producer-config
Added kafka producer config
SnuK87 committed Jul 14, 2021
2 parents e48ffca + 9ee73a5 commit e0f5db2
Showing 10 changed files with 196 additions and 45 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
@@ -1,8 +1,8 @@
FROM jboss/keycloak:10.0.1
FROM jboss/keycloak:14.0.0

ADD ./keycloak-kafka-1.0.0-jar-with-dependencies.jar /opt/jboss/keycloak/standalone/deployments/
ADD ./keycloak-kafka-1.1.0-jar-with-dependencies.jar /opt/jboss/keycloak/standalone/deployments/

ADD kafka-module.cli /opt/jboss/startup-scripts/
ADD add-kafka-config.cli /opt/jboss/startup-scripts/

#ADD realm-export.json /init/

@@ -17,4 +17,4 @@ EXPOSE 8443

ENTRYPOINT [ "/opt/jboss/tools/docker-entrypoint.sh" ]

CMD ["-b", "0.0.0.0", "-Dkeycloak.import=/init/realm-export.json"]
CMD ["-b", "0.0.0.0"]
77 changes: 55 additions & 22 deletions README.md
@@ -4,38 +4,45 @@ Simple module for [Keycloak](https://www.keycloak.org/) to produce keycloak even
- [Keycloak Kafka Module](#keycloak-kafka-module)
* [Build](#build)
* [Installation](#installation)
* [Configuration](#configuration)
+ [Enable Events in keycloak](#enable-events-in-keycloak)
+ [Kafka module](#kafka-module)
* [Docker Container](#configuration)
* [Module Configuration](#module-configuration)
+ [Kafka client configuration](#kafka-client-configuration)
+ [Kafka client using secure connection](#kafka-client-using-secure-connection)
* [Module Deployment](#module-deployment)
* [Keycloak Configuration](#keycloak-configuration)
+ [Enable Events in keycloak](#enable-events-in-keycloak)
* [Docker Container](#docker-container)
* [Sample Client](#sample-client)

**Tested with**

Kafka version: `2.12-2.1.x`, `2.12-2.4.x`, `2.12-2.5.x`
Kafka version: `2.12-2.1.x`, `2.12-2.4.x`, `2.12-2.5.x`, `2.13-2.8`

Keycloak version: `4.8.3`, `6.0.x`, `7.0.0`, `9.0.x`, `10.0.x`
Keycloak version: `4.8.3`, `6.0.x`, `7.0.0`, `9.0.x`, `10.0.x`, `13.0.x`, `14.0.x`

Java version: `11`, `13`


## Build
You can simply use Maven to build the jar file. Thanks to the assembly plugin, the build process creates a fat jar that includes all dependencies, which makes deployment quite easy.
Run the following command to build the jar file.

`mvn clean package`
```bash
mvn clean package
```

## Installation
First you have to build or [download](https://github.com/SnuK87/keycloak-kafka/releases) the keycloak-kafka module.
First you need to build or [download](https://github.com/SnuK87/keycloak-kafka/releases) the keycloak-kafka module.

To install the module to your keycloak server first you have to configure the module and then deploy the it.
If you deploy the module without configuration your keycloak server will fail to start up with a NullPointerException.
To install the module to your keycloak server you have to configure the module and then deploy it.
If you deploy the module without configuration, your keycloak server will fail to start, throwing a `NullPointerException`.

If you want to install the module manually as described in the initial version you can follow this [guide](https://github.com/SnuK87/keycloak-kafka/wiki/Manual-Installation).

### Module configuration
Download the [CLI script](kafka-module.cli) from this repository and edit the properties to fit your environment. Also make sure that you use the right
server config (line 1). As a default the script will change the `standalone.xml`.
## Module Configuration
Download the [CLI script](add-kafka-config.cli) from this repository and edit the properties to fit your environment. Also make sure to use the right
server config (line 1). By default the script will configure the module in the `standalone.xml`. (Be aware that the docker image uses `standalone-ha.xml` by default.)

Currently the following properties are available and should be changed to fit your environemnt:
The following properties can be set via environment variables (e.g. `${env.KAFKA_TOPIC}`); some are mandatory, others optional:

`topicEvents`: The name of the kafka topic to which the events will be produced.

@@ -45,16 +52,41 @@ Currently the following properties are available and should be changed to fit y

`events`: (Optional; default=REGISTER) The events that will be sent to kafka.

`topicAdminEvents`: (Optional) The name of the kafka topic to where the admin events will be produced to.
`topicAdminEvents`: (Optional) The name of the kafka topic to which the admin events will be produced. No admin events will be produced when this property isn't set.

A list of available events can be found [here](https://www.keycloak.org/docs/latest/server_admin/#event-types).
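The provider changed in this commit parses the `events` list into Keycloak `EventType` values and silently skips names that don't exist. A stand-alone sketch of that filtering, with a hypothetical mini enum standing in for Keycloak's `EventType`:

```java
import java.util.ArrayList;
import java.util.List;

public class EventFilterSketch {

    // Hypothetical stand-in for Keycloak's EventType enum (subset only).
    enum EventType { REGISTER, LOGIN, LOGOUT }

    // Mirrors the provider's constructor loop: unknown event names are
    // reported and skipped instead of failing server startup.
    static List<EventType> parse(String[] names) {
        List<EventType> result = new ArrayList<>();
        for (String name : names) {
            try {
                result.add(EventType.valueOf(name.toUpperCase()));
            } catch (IllegalArgumentException e) {
                System.out.println("Ignoring event >" + name + "<. Event does not exist.");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "register" and "LOGIN" are kept; "FOO" is ignored.
        System.out.println(parse(new String[] { "register", "FOO", "LOGIN" }));
    }
}
```

This is why a typo in the `events` property doesn't break Keycloak; the misspelled event is simply never forwarded.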

Run the CLI script using the following command and check the output on the console. You should see some server logs and lines of `{"outcome" => "success"}`.

Run the CLI script using the following command and check the output on the console. You should see some server logs and 6 lines of `{"outcome" => "success"}`.
```bash
$KEYCLOAK_HOME/bin/jboss-cli.sh --file=/path/to/kafka-module.cli
$KEYCLOAK_HOME/bin/jboss-cli.sh --file=/path/to/add-kafka-config.cli
```

If you want to remove the configuration of the keycloak-kafka module from your server, you can run [this script](remove-kafka-config.cli).

### Kafka client configuration
It's also possible to configure the kafka client by adding parameters to the cli script. This makes it possible to connect this module to a kafka broker that requires SSL/TLS connections.
For example, to change how long the producer will block the thread to a timeout of 10 seconds, add the following line to the cli script.

```
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=max.block.ms,value=10000)
```

Note that kafka client parameters use `kafka:map-put`, whereas module parameters use `kafka:write-attribute`.
A full list of available configurations can be found in the [official kafka docs](https://kafka.apache.org/documentation/#producerconfigs).
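These extra `map-put` entries are collected into a plain property map and handed to the producer factory (see the new `KafkaProducerConfig` class in this commit). A stand-alone sketch of that filtering step, using a `Map` in place of Keycloak's `Config.Scope` and only a few sample keys:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of how the module's KafkaProducerConfig narrows the
// SPI configuration down to recognized producer properties; the real class
// reads from Keycloak's Config.Scope and knows the full key list.
public class ProducerConfigSketch {

    private static final Set<String> KNOWN_KEYS =
            Set.of("max.block.ms", "security.protocol", "ssl.truststore.location");

    public static Map<String, Object> init(Map<String, String> scope) {
        Map<String, Object> propertyMap = new HashMap<>();
        for (String key : KNOWN_KEYS) {
            // Only keys that are both known and actually configured survive.
            if (scope.get(key) != null) {
                propertyMap.put(key, scope.get(key));
            }
        }
        return propertyMap;
    }

    public static void main(String[] args) {
        Map<String, String> scope = Map.of("max.block.ms", "10000", "unknown.key", "x");
        // "unknown.key" is dropped; only "max.block.ms" is passed on.
        System.out.println(init(scope));
    }
}
```

The design means typoed or unsupported keys are silently ignored rather than forwarded to the Kafka producer, where they would trigger warnings or errors.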

### Kafka client using secure connection
As mentioned above, the kafka client can be configured through the cli script. To make the kafka client open an SSL/TLS-secured connection, add the following lines to the script:

```
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=security.protocol,value=SSL)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=ssl.truststore.location,value=kafka.client.truststore.jks)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=ssl.truststore.password,value=test1234)
```
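For reference, those three CLI lines amount to setting plain producer properties. A minimal sketch (the truststore path and password are the sample values from the script, not real credentials):

```java
import java.util.Properties;

// Illustrative only: the three map-put lines above end up as these
// plain Kafka producer properties.
public class SslConfigSketch {

    public static Properties sslProperties() {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", "kafka.client.truststore.jks");
        props.setProperty("ssl.truststore.password", "test1234");
        return props;
    }

    public static void main(String[] args) {
        sslProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```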

### Module deployment
Copy the `keycloak-kafka-<version>-jar-with-dependencies.jar` into the $KEYCLOAK_HOME/standalone/deployments folder. Keycloak will automatically
install the module with all dependencies on start up. To verify that the deployment of the module was successful you can check if a new file
## Module Deployment
Copy the `keycloak-kafka-<version>-jar-with-dependencies.jar` into the `$KEYCLOAK_HOME/standalone/deployments` folder. Keycloak will automatically
install the module with all its dependencies on start up. To verify that the deployment of the module was successful you can check if a new file
with the name `keycloak-kafka-<version>-jar-with-dependencies.jar.deployed` was created in the same folder.


Expand All @@ -66,16 +98,17 @@ with the name `keycloak-kafka-<version>-jar-with-dependencies.jar.deployed` was
3. Go to Events
4. Open `Config` tab and add `kafka` to Event Listeners.

![Admin console config](images/event_config.png)

## Docker Container
The simplest way to enable the kafka module in a docker container is to create a custom docker image from the [keycloak base image](https://hub.docker.com/r/jboss/keycloak/).
The `keycloak-kafka-<version>-jar-with-dependencies.jar` must be added to the `/standalone/deployments` folder and the CLI script must be added to the `/opt/jboss/startup-scripts/` folder
as explained in [Installation](#installation). The only difference is that the CLI script will be executed automatically in start up and doesn't have to be executed manually.
as explained in [Installation](#installation). The only difference is that the CLI script will be executed automatically on start up and doesn't have to be executed manually.
An example can be found in this [Dockerfile](Dockerfile).

## Sample Client

The following snippet shows a minimal Spring Boot Kafka client to consume keycloak events. Additional properties can be added to `KeycloakEvent`.
The following snippet shows a minimal Spring Boot Kafka client to consume keycloak events. Additional properties can be added to the `KeycloakEvent` class.

```java
@SpringBootApplication
// ... remainder of the sample client is collapsed in this view
```
9 changes: 5 additions & 4 deletions kafka-module.cli → add-kafka-config.cli
@@ -3,9 +3,10 @@ embed-server --server-config=standalone.xml --std-out=echo
if (outcome != success) of /subsystem=keycloak-server/spi=eventsListener:read-resource()
/subsystem=keycloak-server/spi=eventsListener:add()
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:add(enabled=true)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.topicEvents,value=keycloak-events)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.clientId,value=keycloak)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.bootstrapServers,value=192.168.0.1:9092)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.events,value=REGISTER)
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.topicEvents,value=${env.KAFKA_TOPIC})
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.clientId,value=${env.KAFKA_CLIENT_ID})
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.bootstrapServers,value=${env.KAFKA_BOOTSTRAP_SERVERS})
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.events,value=${env.KAFKA_EVENTS})
/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=max.block.ms,value=10000)
end-if
stop-embedded-server
Binary file added images/event_config.png
6 changes: 3 additions & 3 deletions pom.xml
@@ -4,13 +4,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.snuk87.keycloak</groupId>
<artifactId>keycloak-kafka</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<keycloak.version>10.0.1</keycloak.version>
<kafka.version>2.5.0</kafka.version>
<keycloak.version>14.0.0</keycloak.version>
<kafka.version>2.8.0</kafka.version>
</properties>

<dependencies>
6 changes: 6 additions & 0 deletions remove-kafka-config.cli
@@ -0,0 +1,6 @@
embed-server --server-config=standalone.xml --std-out=echo

if (outcome == success) of /subsystem=keycloak-server/spi=eventsListener:read-resource()
/subsystem=keycloak-server/spi=eventsListener:remove()
end-if
stop-embedded-server
@@ -2,8 +2,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -32,29 +35,30 @@ public class KafkaEventListenerProvider implements EventListenerProvider {
private ObjectMapper mapper;

public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events,
String topicAdminEvents) {
String topicAdminEvents, Map<String, Object> kafkaProducerProperties) {
this.topicEvents = topicEvents;
this.events = new ArrayList<>();
this.topicAdminEvents = topicAdminEvents;

for (int i = 0; i < events.length; i++) {
for (String event : events) {
try {
EventType eventType = EventType.valueOf(events[i].toUpperCase());
EventType eventType = EventType.valueOf(event.toUpperCase());
this.events.add(eventType);
} catch (IllegalArgumentException e) {
LOG.debug("Ignoring event >" + events[i] + "<. Event does not exist.");
LOG.debug("Ignoring event >" + event + "<. Event does not exist.");
}
}

producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers);
producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers, kafkaProducerProperties);
mapper = new ObjectMapper();
}

private void produceEvent(String eventAsString, String topic) throws InterruptedException, ExecutionException {
private void produceEvent(String eventAsString, String topic)
throws InterruptedException, ExecutionException, TimeoutException {
LOG.debug("Produce to topic: " + topicEvents + " ...");
ProducerRecord<String, String> record = new ProducerRecord<>(topic, eventAsString);
Future<RecordMetadata> metaData = producer.send(record);
RecordMetadata recordMetadata = metaData.get();
RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS);
LOG.debug("Produced to topic: " + recordMetadata.topic());
}

@@ -63,7 +67,7 @@ public void onEvent(Event event) {
if (events.contains(event.getType())) {
try {
produceEvent(mapper.writeValueAsString(event), topicEvents);
} catch (JsonProcessingException | ExecutionException e) {
} catch (JsonProcessingException | ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
@@ -77,7 +81,7 @@ public void onEvent(AdminEvent event, boolean includeRepresentation) {
if (topicAdminEvents != null) {
try {
produceEvent(mapper.writeValueAsString(event), topicAdminEvents);
} catch (JsonProcessingException | ExecutionException e) {
} catch (JsonProcessingException | ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
@@ -1,5 +1,7 @@
package com.github.snuk87.keycloak.kafka;

import java.util.Map;

import org.jboss.logging.Logger;
import org.keycloak.Config.Scope;
import org.keycloak.events.EventListenerProvider;
@@ -19,12 +21,13 @@ public class KafkaEventListenerProviderFactory implements EventListenerProviderF
private String topicAdminEvents;
private String clientId;
private String[] events;
private Map<String, Object> kafkaProducerProperties;

@Override
public EventListenerProvider create(KeycloakSession session) {
if (instance == null) {
instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events,
topicAdminEvents);
instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events, topicAdminEvents,
kafkaProducerProperties);
}

return instance;
@@ -65,6 +68,8 @@ public void init(Scope config) {
events = new String[1];
events[0] = "REGISTER";
}

kafkaProducerProperties = KafkaProducerConfig.init(config);
}

@Override
@@ -0,0 +1,98 @@
package com.github.snuk87.keycloak.kafka;

import java.util.HashMap;
import java.util.Map;

import org.keycloak.Config.Scope;

public class KafkaProducerConfig {

// https://kafka.apache.org/documentation/#producerconfigs

public static Map<String, Object> init(Scope scope) {
Map<String, Object> propertyMap = new HashMap<>();
KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();

for (KafkaProducerProperty property : producerProperties) {
if (property.getName() != null && scope.get(property.getName()) != null) {
propertyMap.put(property.getName(), scope.get(property.getName()));
}
}

return propertyMap;
}

enum KafkaProducerProperty {
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
RETRIES("retries"), //
SSL_KEY_PASSWORD("ssl.key.password"), //
SSL_KEYSTORE_LOCATION("ssl.keystore.location"), //
SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), //
SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), //
SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), //
BATCH_SIZE("batch.size"), //
CLIENT_DNS_LOOKUP("client.dns.lookup"), //
CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), //
DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), //
LINGER_MS("linger.ms"), //
MAX_BLOCK_MS("max.block.ms"), //
MAX_REQUEST_SIZE("max.request.size"), //
PARTITIONER_CLASS("partitioner.class"), //
RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), //
REQUEST_TIMEOUT_MS("request.timeout.ms"), //
SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), //
SASL_JAAS_CONFIG("sasl.jaas.config"), //
SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), //
SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), //
SASL_LOGIN_CLASS("sasl.login.class"), //
SASL_MECHANISM("sasl.mechanism"), //
SECURITY_PROTOCOL("security.protocol"), //
SEND_BUFFER_BYTES("send.buffer.bytes"), //
SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), //
SSL_KEYSTORE_TYPE("ssl.keystore.type"), //
SSL_PROTOCOL("ssl.protocol"), //
SSL_PROVIDER("ssl.provider"), //
SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), //
ENABLE_IDEMPOTENCE("enable.idempotence"), //
INTERCEPTOR_CLASS("interceptor.classes"), //
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), //
METADATA_MAX_AGE_MS("metadata.max.age.ms"), //
METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), //
METRIC_REPORTERS("metric.reporters"), //
METRIC_NUM_SAMPLES("metrics.num.samples"), //
METRICS_RECORDING_LEVEL("metrics.recording.level"), //
METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), //
RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), //
RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), //
RETRY_BACKOFF_MS("retry.backoff.ms"), //
SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), //
SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), //
SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), //
SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), //
SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), //
SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), //
SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), //
SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), //
SECURITY_PROVIDERS("security.providers"), //
SSL_CIPHER_SUITES("ssl.cipher.suites"), //
SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), //
SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), //
SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), //
SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), //
TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), //
TRANSACTION_ID("transactional.id");

private String name;

private KafkaProducerProperty(String name) {
this.name = name;
}

public String getName() {
return name;
}
}

}
