Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka auth #28

Merged
merged 34 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ccdf2ae
Add missing license headers to a few files
dominikriemer Apr 24, 2020
f5587f8
Add UI config for release 0.66.0
dominikriemer Apr 24, 2020
d6e81e0
Update settings to configure release validation
dominikriemer Apr 24, 2020
737029e
add license header
Apr 24, 2020
ffd8b65
Fix bug in prebuild script
dominikriemer Apr 24, 2020
3bbe273
Explicitly add dist subdirectories to gitignore to avoid problems wit…
dominikriemer Apr 24, 2020
3d5e33e
[STREAMPIPES-110] Temporarily disable dependency-check maven plugin
dominikriemer Apr 24, 2020
8cf6f46
Modify docker-compose version
dominikriemer Apr 25, 2020
cef2d67
Fix bug in prebuild script
dominikriemer Apr 26, 2020
7617b2b
Use 0.66.0-SNAPSHOT as sp.version in archetypes
dominikriemer Apr 29, 2020
edb43f6
Set sp.version in archetypes to 0.66.0
dominikriemer May 1, 2020
73a3272
Set streampipes version to 0.66.0
dominikriemer May 1, 2020
d606f2c
Perform some cleanup
dominikriemer May 1, 2020
72f394a
Modify release settings in pom
dominikriemer May 1, 2020
5a989fc
[maven-release-plugin] prepare release release/0.66.0
dominikriemer May 1, 2020
5444d20
[maven-release-plugin] prepare for next development iteration
dominikriemer May 1, 2020
81e8693
Revert to development version 0.66.0-SNAPSHOT
dominikriemer May 3, 2020
7aada08
Cleanup properties files
dominikriemer May 3, 2020
09775ad
Update package.json
dominikriemer May 3, 2020
ebac9f4
Fix bug in app overview module
dominikriemer May 3, 2020
71cc882
Add NotificationCount view to installation process
dominikriemer May 3, 2020
60c697e
Import Roboto font in angular.json
dominikriemer May 3, 2020
47e22d7
Add assembly profile
dominikriemer May 5, 2020
b8c23e6
[maven-release-plugin] prepare release release/0.66.0
dominikriemer May 5, 2020
cf18d51
Kafka producer and consumer
heymarco Nov 17, 2020
8b69d44
Merge branch 'dev' of github.com:apache/incubator-streampipes into ka…
heymarco Nov 17, 2020
ab864f4
Merge branch 'dev' into kafka-auth
heymarco Nov 17, 2020
99fcb8e
Removed legacy ui code
heymarco Nov 23, 2020
999cc60
Removed legacy ui code
heymarco Nov 23, 2020
499ef66
Merge branch 'dev' into kafka-auth
heymarco Nov 25, 2020
1324b37
Fix v1-api route
heymarco Nov 26, 2020
fe213d1
Deleted streampipes-app-file-export/
heymarco Nov 30, 2020
82b79fd
Merge remote-tracking branch 'main/dev' into kafka-auth
heymarco Nov 30, 2020
df9e394
Removed DISCLAIMER-WIP
heymarco Nov 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions streampipes-backend/src/main/resources/shiro.ini
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,6 @@ securityManager.rememberMeManager.cookie.maxAge = 1000000000
/api/v2/connect/*/master/sources/* = anon
/api/v2/connect/*/master/sources/*/streams = anon
/api/v2/connect/*/master/sources/*/streams/* = anon
/api/v2/connect/*/master/resolvable/*/configurations = anon
/api/** = customFilter
/** = customFilter
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

package org.apache.streampipes.messaging.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +42,8 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
private String topic;
private InternalEventProcessor<byte[]> eventProcessor;
private KafkaTransportProtocol protocol;
private String username;
private String password;
private volatile boolean isRunning;
private Boolean patternTopic = false;

Expand All @@ -56,12 +55,25 @@ public SpKafkaConsumer() {

/**
 * Creates a consumer for an unauthenticated Kafka broker.
 * Delegates to the credential-aware constructor with {@code null} credentials
 * so that field initialization lives in exactly one place.
 *
 * @param protocol       transport settings (broker host/port, topic definition, …)
 * @param topic          topic (or wildcard pattern) to subscribe to
 * @param eventProcessor callback invoked with the raw bytes of each consumed record
 */
public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor) {
  this(protocol, topic, eventProcessor, null, null);
}


/**
 * Creates a consumer that authenticates against the broker via SASL/PLAIN.
 * When both {@code username} and {@code password} are non-null, the run loop
 * builds SASL-enabled consumer properties instead of the plain ones.
 *
 * @param protocol       transport settings (broker host/port, topic definition, …)
 * @param topic          topic (or wildcard pattern) to subscribe to
 * @param eventProcessor callback invoked with the raw bytes of each consumed record
 * @param username       SASL/PLAIN username; {@code null} disables authentication
 * @param password       SASL/PLAIN password; {@code null} disables authentication
 */
public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor,
String username, String password) {
this.protocol = protocol;
this.topic = topic;
this.eventProcessor = eventProcessor;
this.isRunning = true;
this.username = username;
this.password = password;
}


// TODO backwards compatibility, remove later
public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback) {
KafkaTransportProtocol protocol = new KafkaTransportProtocol();
Expand All @@ -78,12 +90,19 @@ public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byt

@Override
public void run() {
KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(getProperties());
Properties props;
if (username != null && password != null) {
props = makePropertiesSaslPlain(protocol, username, password);
}
else {
props = makeProperties(protocol);
}
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
if (!patternTopic) {
kafkaConsumer.subscribe(Collections.singletonList(topic));
consumer.subscribe(Collections.singletonList(topic));
} else {
topic = replaceWildcardWithPatternFormat(topic);
kafkaConsumer.subscribe(Pattern.compile(topic), new ConsumerRebalanceListener() {
consumer.subscribe(Pattern.compile(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO
Expand All @@ -96,24 +115,28 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
});
}
while (isRunning) {
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records) {
eventProcessor.onEvent(record.value());
}
}
LOG.info("Closing Kafka Consumer.");
kafkaConsumer.close();
consumer.close();
}

/**
 * Converts a wildcard topic such as {@code "org.example.*"} into a regex
 * string: literal dots are escaped, and each {@code *} becomes {@code .*}.
 */
private String replaceWildcardWithPatternFormat(String topic) {
  // Plain string replacement is equivalent to the regex-based replaceAll here
  // and avoids the double-escaping of regex metacharacters.
  String dotsEscaped = topic.replace(".", "\\.");
  return dotsEscaped.replace("*", ".*");
}

private Properties getProperties() {
/**
 * Builds the plain (unauthenticated) consumer configuration for the given protocol.
 */
private Properties makeProperties(KafkaTransportProtocol protocol) {
  ConsumerConfigFactory configFactory = new ConsumerConfigFactory(protocol);
  return configFactory.makeProperties();
}

/**
 * Builds consumer configuration including SASL/PLAIN credentials for the given protocol.
 */
private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
  ConsumerConfigFactory configFactory = new ConsumerConfigFactory(protocol);
  return configFactory.makePropertiesSaslPlain(username, password);
}

@Override
public void connect(KafkaTransportProtocol protocol, InternalEventProcessor<byte[]>
eventProcessor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S

private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

public SpKafkaProducer() {

}
public SpKafkaProducer() { }

// TODO backwards compatibility, remove later
public SpKafkaProducer(String url, String topic) {
Expand All @@ -66,6 +64,16 @@ public SpKafkaProducer(String url, String topic) {
this.producer = new KafkaProducer<>(makeProperties(protocol));
}

// TODO backwards compatibility, remove later
/**
 * Creates a producer for a SASL/PLAIN-secured broker from a plain "host:port" URL.
 *
 * @param url      broker address in the form {@code host:port}
 * @param topic    topic to publish to
 * @param username SASL/PLAIN username
 * @param password SASL/PLAIN password
 * @throws IllegalArgumentException if {@code url} does not contain a {@code host:port} pair
 * @throws NumberFormatException    if the port component is not a valid integer
 */
public SpKafkaProducer(String url, String topic, String username, String password) {
  String[] urlParts = url.split(COLON);
  // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException
  if (urlParts.length < 2) {
    throw new IllegalArgumentException("Kafka broker URL must have the form host:port, but was: " + url);
  }
  KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
          Integer.parseInt(urlParts[1]), topic);
  this.brokerUrl = url;
  this.topic = topic;
  this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
}

/**
 * Publishes a string message to the configured topic.
 * Encodes with an explicit UTF-8 charset: the no-arg {@code getBytes()} uses the
 * platform default charset (pre-Java-18), which makes output machine-dependent.
 */
public void publish(String message) {
  publish(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
Expand All @@ -78,6 +86,10 @@ private Properties makeProperties(KafkaTransportProtocol protocol) {
return new ProducerConfigFactory(protocol).makeProperties();
}

/**
 * Builds producer configuration including SASL/PLAIN credentials for the given protocol.
 */
private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
  ProducerConfigFactory configFactory = new ProducerConfigFactory(protocol);
  return configFactory.makePropertiesSaslPlain(username, password);
}

@Override
public void connect(KafkaTransportProtocol protocol) {
LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public AbstractConfigFactory(KafkaTransportProtocol protocol) {

/** Builds the base (unauthenticated) client configuration for this protocol. */
public abstract Properties makeProperties();

/**
 * Builds the client configuration with SASL/PLAIN authentication applied
 * on top of the base configuration.
 *
 * @param username SASL/PLAIN username
 * @param password SASL/PLAIN password
 */
public abstract Properties makePropertiesSaslPlain(String username, String password);

protected <T> T getConfigOrDefault(Supplier<T> function,
T defaultValue) {
return function.get() != null ? function.get() : defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.streampipes.messaging.kafka.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import java.util.Properties;
Expand All @@ -33,6 +37,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
".serialization.StringDeserializer";
private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
".serialization.ByteArrayDeserializer";
private static final String SASL_MECHANISM = "PLAIN";

public ConsumerConfigFactory(KafkaTransportProtocol protocol) {
super(protocol);
Expand All @@ -41,7 +46,6 @@ public ConsumerConfigFactory(KafkaTransportProtocol protocol) {
@Override
public Properties makeProperties() {
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
UUID.randomUUID().toString()));
Expand All @@ -54,6 +58,17 @@ public Properties makeProperties() {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());

return props;
}

/**
 * Builds consumer properties for a broker secured with SASL/PLAIN over a
 * plaintext channel: starts from the regular consumer settings and layers
 * the authentication configuration on top.
 * NOTE(review): SASL_PLAINTEXT transmits credentials unencrypted — confirm
 * this is only used on trusted networks.
 */
@Override
public Properties makePropertiesSaslPlain(String username, String password) {
  Properties props = makeProperties();
  // Reuse the class constant instead of repeating the "PLAIN" literal.
  props.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
  props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
  // NOTE(review): quotes or backslashes inside the credentials would corrupt
  // this JAAS entry — consider escaping if such values are possible.
  String jaasConfig = String.format(
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
      username, password);
  props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
  return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
*/
package org.apache.streampipes.messaging.kafka.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

import javax.management.monitor.CounterMonitor;
import java.util.Properties;

public class ProducerConfigFactory extends AbstractConfigFactory {
Expand Down Expand Up @@ -59,4 +66,14 @@ public Properties makeProperties() {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
return props;
}

/**
 * Builds producer properties for a broker secured with SASL/PLAIN over a
 * plaintext channel: starts from the regular producer settings and layers
 * the authentication configuration on top.
 * NOTE(review): SASL_PLAINTEXT transmits credentials unencrypted — confirm
 * this is only used on trusted networks.
 */
@Override
public Properties makePropertiesSaslPlain(String username, String password) {
  Properties props = makeProperties();
  props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
  props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
  // NOTE(review): quotes or backslashes inside the credentials would corrupt
  // this JAAS entry — consider escaping if such values are possible.
  String jaasConfig = String.format(
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
      username, password);
  props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
  return props;
}
}
2 changes: 1 addition & 1 deletion streampipes-wrapper-python/streampipes/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
2 changes: 1 addition & 1 deletion streampipes-wrapper-python/streampipes/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class RuntimeResolvableService {
}

fetchRemoteOptionsForAdapter(resolvableOptionsParameterRequest: RuntimeOptionsRequest, adapterId: string): Observable<RuntimeOptionsResponse> {
let url: string = "/streampipes-connect/api/v1/"
let url: string = "/streampipes-backend/api/v2/connect/"
+ this.authStatusService.email
+ "/master/resolvable/"
+ encodeURIComponent(adapterId)
Expand Down