Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Add Kafka as an event source #1911

Closed
wants to merge 64 commits into from
Closed
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
15b0d46
added the code, now for test cases and documentation
Jul 30, 2020
ad5e811
prepare for dev merge
Jul 30, 2020
28dd27c
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jul 30, 2020
7e3daa0
merged latest dev branch
Jul 30, 2020
3fd754f
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Aug 4, 2020
283022b
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Aug 5, 2020
098f9db
add event source documentation
Aug 6, 2020
dac93af
Update kafkaeventsource.md
rickfish Aug 6, 2020
9fe7ad9
Update kafkaeventsource.md
rickfish Aug 6, 2020
ebffac2
Update kafkaeventsource.md
rickfish Aug 6, 2020
c3048d9
Update kafkaeventsource.md
rickfish Aug 6, 2020
ec4e30e
Update kafkaeventsource.md
rickfish Aug 6, 2020
d2f9783
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Aug 11, 2020
5b29264
add revKafkaClient
rickfish Aug 11, 2020
0727be4
add revJavaMail
rickfish Aug 11, 2020
20c13ae
prepare for merge
Oct 5, 2020
517ff60
prepare for merge
Oct 5, 2020
04073b8
prepare for merge
Oct 5, 2020
dd44973
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 5, 2020
f8c92c5
add back changes after merge
Oct 5, 2020
30e7e8a
make executeEvent() protected to conform to latest change in dev branch
Oct 5, 2020
8e8c47e
make private methods protected to conform to latest change in dev branch
Oct 5, 2020
ad21322
prepare for latest merge
Oct 5, 2020
ca84f2a
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 5, 2020
3e7a434
add back code to simpleevent processor for kafka event source
Oct 5, 2020
059af8f
add in the changes for latest PR
Oct 5, 2020
d0513ee
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 9, 2020
bdcd53c
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 13, 2020
61125a4
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 14, 2020
0f49118
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 26, 2020
0ca3520
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 27, 2020
1d8ff27
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Oct 29, 2020
34f952f
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Nov 5, 2020
fba59a3
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Nov 11, 2020
2eb2438
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Nov 17, 2020
e5da585
use the appropriate kafka username/pw properties
Nov 20, 2020
e95ea7d
get rid of email functionality for processing Kafka event failures
Nov 20, 2020
5d03c62
prepare for merge
Dec 1, 2020
95468d0
prepare for merge
Dec 1, 2020
e27159b
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 1, 2020
0bc7c6b
add back kafka client version
Dec 1, 2020
4ca51a8
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 2, 2020
2636d3c
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 3, 2020
63a7581
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 8, 2020
4264017
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 9, 2020
f404854
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 10, 2020
283d8f6
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 15, 2020
73760ec
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Dec 22, 2020
4ec3902
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 4, 2021
81c05ed
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 5, 2021
46913da
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 13, 2021
95aaab4
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 14, 2021
ce81eda
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 19, 2021
b4b245a
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 21, 2021
258c084
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Jan 26, 2021
3e116dc
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Feb 11, 2021
edc473f
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Feb 16, 2021
d9cd318
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Feb 24, 2021
4f3c776
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Mar 2, 2021
020ce5e
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Mar 9, 2021
17c8e17
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Mar 16, 2021
a5dd048
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Mar 19, 2021
5ba373e
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Mar 30, 2021
aa16061
Merge branch 'dev' of https://github.com/Netflix/conductor into featu…
Apr 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contribs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
compileOnly "javax.ws.rs:jsr311-api:${revJsr311Api}"
compile "io.swagger:swagger-jaxrs:${revSwagger}"

compile "org.apache.kafka:kafka-clients:2.2.0"
compile "org.apache.kafka:kafka-clients:${revKafkaClient}"
testCompile "org.eclipse.jetty:jetty-server:${revJetteyServer}"
testCompile "org.eclipse.jetty:jetty-servlet:${revJettyServlet}"
testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.netflix.conductor.contribs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.google.inject.multibindings.ProvidesIntoMap;
import com.google.inject.multibindings.StringMapKey;
import com.google.inject.name.Named;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.kafka.KafkaEventQueueProvider;

/**
* @author preeth, rickfish
*
*/
public class KafkaModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(KafkaModule.class);

@Override
protected void configure() {
logger.info("Kafka module configured.");
}

@ProvidesIntoMap
@StringMapKey("kafka")
@Singleton
@Named("EventQueueProviders")
public EventQueueProvider getKafkaEventQueueProvider(Configuration configuration) {
return new KafkaEventQueueProvider(configuration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.netflix.conductor.contribs.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;

import org.apache.commons.lang3.StringUtils;

/**
* Java security module used to provide JAAS username and password for access to a Kafka topic
* @author rickfish
*
*/
public class KafkaLoginModule implements LoginModule {
rickfish marked this conversation as resolved.
Show resolved Hide resolved

String KAFKA_EVENTS_JAAS_USERNAME_PROPERTY_NAME = "kafka.events.jaas.username";
String KAFKA_DEFAULT_JAAS_USERNAME_PROPERTY_NAME = "kafka.default.jaas.username";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant is unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to use the constants. Fixed.

String KAFKA_DEFAULT_JAAS_USERNAME_DEFAULT_VALUE = "";

String KAFKA_EVENTS_JAAS_PASSWORD_PROPERTY_NAME = "kafka.events.jaas.password";
String KAFKA_DEFAULT_JAAS_PASSWORD_PROPERTY_NAME = "kafka.default.jaas.password";
String KAFKA_DEFAULT_JAAS_PASSWORD_DEFAULT_VALUE = "";

public KafkaLoginModule() {
super();
}
protected Map<String, Object> sharedState;
protected Map<String, Object> options;
@SuppressWarnings("unchecked")
@Override
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
this.sharedState = (Map<String, Object>) sharedState;
this.options = (Map<String, Object>) options;
}

@Override
public boolean login() throws LoginException {
String userName = getProperty(getKafkaEventsJaasUsername(), null);
String password = getProperty(getKafkaEventsJaasPassword(), null);
List<String> messages = new ArrayList<String>();

if (StringUtils.isBlank(userName)) {
messages.add("The user name is required.");
}

if (StringUtils.isBlank(password)) {
messages.add("The password is required.");
}

if (messages.size() > 0) {
throw new LoginException(String.join(" ", messages));
}

sharedState.put("javax.security.auth.login.name", userName);
sharedState.put("javax.security.auth.login.password", password.toCharArray());

return true;
}

@Override
public boolean logout() throws LoginException {
return true;
}

@Override
public boolean abort() throws LoginException {
return true;
}

@Override
public boolean commit() throws LoginException {
return true;
}


/**
* Get the JAAS username for the 'events' topic
* @return
*/
String getKafkaDefaultJaasUsername() {
return getProperty(KAFKA_EVENTS_JAAS_USERNAME_PROPERTY_NAME,
KAFKA_DEFAULT_JAAS_USERNAME_DEFAULT_VALUE);
}

/**
* Get the JAAS username for the 'events' topic
* @return
*/
String getKafkaEventsJaasUsername() {
return getProperty(KAFKA_EVENTS_JAAS_USERNAME_PROPERTY_NAME,
getKafkaDefaultJaasUsername());
}

/**
* There are several uses for Kafka topics. If all the topics are in the same Kafka cluster, this property can be used.
* Otherwise, the JAAS password for each specific use should be used.
*/
String getKafkaDefaultJaasPassword() {
return getProperty(KAFKA_DEFAULT_JAAS_PASSWORD_PROPERTY_NAME,
KAFKA_DEFAULT_JAAS_PASSWORD_DEFAULT_VALUE);
}

/**
* Get the JAAS password for the 'events' topic
* @return
*/
String getKafkaEventsJaasPassword() {
return getProperty(KAFKA_EVENTS_JAAS_PASSWORD_PROPERTY_NAME,
getKafkaDefaultJaasPassword());
}

public String getProperty(String key, String defaultValue) {

String val = null;
try{
val = System.getenv(key.replace('.','_'));
if (val == null || val.isEmpty()) {
val = Optional.ofNullable(System.getProperty(key)).orElse(defaultValue);
}
}catch(Exception e){
e.printStackTrace();
}
return val;
}
}
Loading