Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hadoop-ozone/ozone-manager-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
</properties>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.eventlistener;

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;

/**
* A no-op implementation of {@link OMEventListenerNotificationStrategy}
* which produces no events.
*/
public class NoOpOMEventListenerNotificationStrategy implements OMEventListenerNotificationStrategy {
@Override
public List<String> determineEventsForOperation(OmCompletedRequestInfo completedRequestInfo) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.hadoop.ozone.om.eventlistener;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotificationStrategy;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
Expand All @@ -42,10 +45,16 @@ public class OMEventListenerKafkaPublisher implements OMEventListener {
public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);

private static final String KAFKA_CONFIG_PREFIX = "ozone.om.plugin.kafka.";
private static final String NOTIFICATION_STRATEGY_CONFIG = KAFKA_CONFIG_PREFIX + "notification.strategy";
private static final Class<? extends OMEventListenerNotificationStrategy>
DEFAULT_NOTIFICATION_STRATEGY = S3EventNotificationStrategy.class;
private static final String KAFKA_SERVICE_INTERVAL_CONFIG = KAFKA_CONFIG_PREFIX + "service.interval";
private static final String KAFKA_SERVICE_TIMEOUT_CONFIG = KAFKA_CONFIG_PREFIX + "service.timeout";
private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;

private OMEventListenerLedgerPoller ledgerPoller;
private KafkaClientWrapper kafkaClient;
private OMEventListenerNotificationStrategy notificationStrategy;
private OMEventListenerLedgerPollerSeekPosition seekPosition;

@Override
Expand All @@ -56,10 +65,22 @@ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext plu

this.kafkaClient = new KafkaClientWrapper(kafkaProps);

// TODO: these constants should be read from config
long kafkaServiceInterval = 2 * 1000;
long kafkaServiceTimeout = 300 * 1000;
long kafkaServiceInterval = conf.getTimeDuration(
KAFKA_SERVICE_INTERVAL_CONFIG, "2s", TimeUnit.MILLISECONDS);
long kafkaServiceTimeout = conf.getTimeDuration(
KAFKA_SERVICE_TIMEOUT_CONFIG, "5m", TimeUnit.MILLISECONDS);

Class<? extends OMEventListenerNotificationStrategy> strategyClass = conf.getClass(
NOTIFICATION_STRATEGY_CONFIG,
DEFAULT_NOTIFICATION_STRATEGY,
OMEventListenerNotificationStrategy.class);
try {
this.notificationStrategy = strategyClass.getDeclaredConstructor().newInstance();
} catch (Exception ex) {
LOG.error("Failed to instantiate notification strategy: {}. " +
"Falling back to NoOp strategy.", strategyClass, ex);
this.notificationStrategy = new NoOpOMEventListenerNotificationStrategy();
}
this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();

LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
Expand Down Expand Up @@ -102,24 +123,23 @@ public void stop() {
public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) {
LOG.debug("Processing {}", completedRequestInfo);

// stub event until we implement a strategy to convert the events to
// a user facing schema (e.g. S3)
String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}",
completedRequestInfo.getVolumeName(),
completedRequestInfo.getBucketName(),
completedRequestInfo.getKeyName(),
String.valueOf(completedRequestInfo.getCmdType()));
List<String> eventsToSend = notificationStrategy.determineEventsForOperation(completedRequestInfo);

LOG.debug("Sending {}", event);

try {
kafkaClient.send(event);
} catch (IOException ex) {
LOG.error("Failure to send event {}", event, ex);
return;
// loop over events and send them to our kafka sink
for (String event : eventsToSend) {
if (event == null) {
LOG.warn("Skipping null event for transaction {}", completedRequestInfo.getTrxLogIndex());
continue;
}
try {
kafkaClient.send(event);
} catch (IOException ex) {
LOG.error("Failure to send event {}", event, ex);
return;
}
Comment on lines +126 to +139
}

// we can update the seek position
// no errors so we can update the seek position
seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex()));
}

Expand All @@ -145,7 +165,8 @@ public void initialize() throws IOException {

public void shutdown() throws IOException {
if (producer != null) {
producer.close();
LOG.info("Closing kafka producer for topic {}", topic);
producer.close(Duration.ofSeconds(10));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.eventlistener;

import java.util.List;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;

/**
* Interface for strategies which turn completed events into
* notifications.
*/
public interface OMEventListenerNotificationStrategy {
Comment thread
peterxcli marked this conversation as resolved.

List<String> determineEventsForOperation(OmCompletedRequestInfo completedRequestInfo);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.eventlistener.s3;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

/**
* A simple replacement for com.amazonaws.internal.DateTimeJsonSerializer.
*/
public class DateTimeJsonSerializer extends JsonSerializer<OffsetDateTime> {
@Override
public void serialize(OffsetDateTime value, JsonGenerator gen, SerializerProvider provider) throws IOException {
// AWS SDK typically uses ISO8601 format for S3 events
gen.writeString(value.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
}
Loading
Loading