fix(system-update): fixes system-update with more than 1 partition (#…

david-leifker committed Feb 10, 2023
1 parent fc12fad commit b4b3a39

Showing 9 changed files with 32 additions and 162 deletions.

docker/kafka-setup/kafka-setup.sh (20 changes: 10 additions & 10 deletions)

@@ -104,22 +104,22 @@ send() {
 }

 ## Produce the jobs to run.
-send "$METADATA_AUDIT_EVENT_NAME" "--topic $METADATA_AUDIT_EVENT_NAME"
-send "$METADATA_CHANGE_EVENT_NAME" "--topic $METADATA_CHANGE_EVENT_NAME"
-send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--topic $FAILED_METADATA_CHANGE_EVENT_NAME"
-send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"
+send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME"
+send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME"
+send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME"
+send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME"

 # Set retention to 90 days
-send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
-send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
-send "$PLATFORM_EVENT_TOPIC_NAME" "--topic $PLATFORM_EVENT_TOPIC_NAME"
+send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME"
+send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME"
+send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME"

 # Infinite retention upgrade topic
-send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
+send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME"
 # Create topic for datahub usage event
 if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
-  send "$DATAHUB_USAGE_EVENT_NAME" "--topic $DATAHUB_USAGE_EVENT_NAME"
+  send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
 fi

 ## close the filo
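
Every topic-creation call now passes an explicit --partitions: the regular event topics take the configured $PARTITIONS, while the upgrade-history topic is pinned to a single partition with infinite retention, so the version check has exactly one end offset to read. Note the removed upgrade-history line was also missing the leading dashes on --config, which the new line repairs. For reference, the same per-topic layout expressed against the Kafka AdminClient API, as a minimal sketch; the bootstrap server, partition count, replication factor, and class name are illustrative, not values from this script:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class TopicLayoutSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // stands in for $KAFKA_BOOTSTRAP_SERVER
            try (AdminClient admin = AdminClient.create(props)) {
                int partitions = 3;     // stands in for $PARTITIONS
                short replication = 1;  // stands in for $REPLICATION_FACTOR
                // Regular event topics now carry their own partition count.
                NewTopic changeLog = new NewTopic("MetadataChangeLog_Versioned_v1", partitions, replication);
                // The upgrade-history topic is deliberately one partition, retained forever.
                NewTopic upgradeHistory = new NewTopic("DataHubUpgradeHistory_v1", 1, replication)
                        .configs(Map.of("retention.ms", "-1"));
                admin.createTopics(List.of(changeLog, upgradeHistory)).all().get();
            }
        }
    }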

docker/kafka-setup/kafka-topic-workers.sh (2 changes: 1 addition & 1 deletion)

@@ -13,7 +13,7 @@ job() {
   i=$1
   topic_args=$2
   kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
-    --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR \
+    --replication-factor $REPLICATION_FACTOR \
     $topic_args
 }
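
Dropping --partitions from this shared invocation is the other half of the change: previously the worker forced the same global partition count onto every topic it created, including the upgrade-history topic. Each job's $topic_args now decides, which is what lets $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME stay at one partition while the event topics scale out.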

MaeConsumerApplication.java

@@ -13,7 +13,10 @@
 @SuppressWarnings("checkstyle:HideUtilityClassConstructor")
 @SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class, CassandraAutoConfiguration.class,
     SolrHealthContributorAutoConfiguration.class})
-@ComponentScan(excludeFilters = {
+@ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
+    "com.linkedin.metadata.kafka"
+}, excludeFilters = {
     @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)})
 public class MaeConsumerApplication {
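
The added basePackages entries point the MAE consumer's component scan at com.linkedin.metadata.boot.kafka, the new home of DataHubUpgradeKafkaListener after the package move at the bottom of this diff; the import and scan updates in the next several files exist for the same reason.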

@@ -2,6 +2,7 @@

 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;

@@ -23,7 +23,7 @@
 import com.linkedin.metadata.config.SystemUpdateConfiguration;
 import com.linkedin.metadata.graph.Edge;
 import com.linkedin.metadata.graph.GraphService;
-import com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.key.ChartKey;
 import com.linkedin.metadata.models.AspectSpec;
 import com.linkedin.metadata.models.EntitySpec;

@@ -20,6 +20,7 @@
     SolrHealthContributorAutoConfiguration.class
 })
 @ComponentScan(basePackages = {
+    "com.linkedin.metadata.boot.kafka",
     "com.linkedin.gms.factory.auth",
     "com.linkedin.gms.factory.common",
     "com.linkedin.gms.factory.config",

@@ -2,6 +2,7 @@

 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.boot.BootstrapManager;
+import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
 import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Qualifier;

This file was deleted.

DataHubUpgradeKafkaListener.java (renamed from package com.linkedin.metadata.kafka.boot to com.linkedin.metadata.boot.kafka)

@@ -1,4 +1,4 @@
-package com.linkedin.metadata.kafka.boot;
+package com.linkedin.metadata.boot.kafka;

 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.metadata.EventUtils;

@@ -18,7 +18,6 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;

@@ -35,8 +34,8 @@
 @Slf4j
 @EnableKafka
 public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
-  @Autowired
-  private KafkaListenerEndpointRegistry registry;
+
+  private final KafkaListenerEndpointRegistry registry;

   private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
   private static final String SUFFIX = "temp";

@@ -55,7 +54,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency {
   @Value(TOPIC_NAME)
   private String topicName;

-  private AtomicBoolean isUpdated = new AtomicBoolean(false);
+  private final static AtomicBoolean IS_UPDATED = new AtomicBoolean(false);


   // Constructs a consumer to read determine final offset to assign, prevents re-reading whole topic to get the latest version
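
Two small refactors accompany the concurrency fix below: the endpoint registry becomes a final field (presumably constructor-injected in the collapsed lines) instead of field-level @Autowired, and the update flag becomes a static AtomicBoolean, renamed IS_UPDATED to match constant naming conventions.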

@@ -66,12 +65,15 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
     final Map<TopicPartition, Long> offsetMap = kafkaConsumer.endOffsets(assignments.keySet());
     assignments.entrySet().stream()
             .filter(entry -> topicName.equals(entry.getKey().topic()))
-            .forEach(entry ->
-                    callback.seek(entry.getKey().topic(), entry.getKey().partition(), offsetMap.get(entry.getKey()) - 1));
+            .forEach(entry -> {
+              log.info("Partition: {} Current Offset: {}", entry.getKey(), offsetMap.get(entry.getKey()));
+              long newOffset = offsetMap.get(entry.getKey()) - 1;
+              callback.seek(entry.getKey().topic(), entry.getKey().partition(), Math.max(0, newOffset));
+            });
     }
   }

-  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer")
+  @KafkaListener(id = CONSUMER_GROUP, topics = {TOPIC_NAME}, containerFactory = "kafkaEventConsumer", concurrency = "1")
   public void checkSystemVersion(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     final GenericRecord record = consumerRecord.value();
     final String expectedVersion = String.format("%s-%s", _gitVersion.getVersion(), revision);
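
This hunk is the heart of the fix. Seeking to endOffset - 1 yields -1 on an empty partition, and once the topic had more than one partition, all but one of them were empty, so startup broke; the Math.max clamp and per-partition logging address that, and concurrency = "1" keeps a single consumer thread on the topic. In isolation the pattern looks like the following minimal sketch, where the class name, constructor wiring, and generic types are illustrative rather than the commit's actual code:

    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.listener.ConsumerSeekAware;

    // Sketch of the "re-read only the latest record" pattern used above.
    public class SeekToLatestSketch implements ConsumerSeekAware {

        // A separate consumer used only to look up end offsets, mirroring the
        // listener's dedicated offset consumer.
        private final Consumer<String, GenericRecord> offsetConsumer;

        public SeekToLatestSketch(Consumer<String, GenericRecord> offsetConsumer) {
            this.offsetConsumer = offsetConsumer;
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                         ConsumerSeekCallback callback) {
            Map<TopicPartition, Long> endOffsets = offsetConsumer.endOffsets(assignments.keySet());
            for (TopicPartition tp : assignments.keySet()) {
                long lastRecord = endOffsets.get(tp) - 1; // -1 when the partition is empty
                // Clamp so an empty partition seeks to its beginning instead of a negative offset.
                callback.seek(tp.topic(), tp.partition(), Math.max(0, lastRecord));
            }
        }
    }

With the topic itself pinned to one partition in kafka-setup.sh, the clamp acts as a belt-and-braces guard for topics that were created with more partitions before this fix.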

@@ -81,7 +83,7 @@ public void checkSystemVersion(final ConsumerRecord<String, GenericRecord> consumerRecord) {
       event = EventUtils.avroToPegasusDUHE(record);
       log.info("Latest system update version: {}", event.getVersion());
       if (expectedVersion.equals(event.getVersion())) {
-        isUpdated.getAndSet(true);
+        IS_UPDATED.getAndSet(true);
       } else {
         log.debug("System version is not up to date: {}", expectedVersion);
       }

@@ -101,7 +103,7 @@ public void waitForUpdate() {

     long backOffMs = initialBackOffMs;
     for (int i = 0; i < maxBackOffs; i++) {
-      if (isUpdated.get()) {
+      if (IS_UPDATED.get()) {
         log.debug("Finished waiting for updated indices.");
         try {
           log.info("Containers: {}", registry.getListenerContainers().stream()

@@ -123,7 +125,7 @@ public void waitForUpdate() {
       backOffMs = backOffMs * backOffFactor;
     }

-    if (!isUpdated.get()) {
+    if (!IS_UPDATED.get()) {

       throw new IllegalStateException("Indices are not updated after exponential backoff."
           + " Please try restarting and consider increasing back off settings.");
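
For context, waitForUpdate() is what the consumer applications block on at startup: it polls the flag with capped exponential backoff and fails hard if the expected system-update event never arrives. Condensed to its skeleton, with illustrative settings standing in for the SystemUpdateConfiguration values:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class WaitForUpdateSketch {
        private static final AtomicBoolean IS_UPDATED = new AtomicBoolean(false);

        // Block until the system-update version event is seen, doubling the wait
        // between checks and giving up after a fixed number of attempts.
        public static void waitForUpdate() throws InterruptedException {
            long backOffMs = 1000;        // stands in for initialBackOffMs
            final int maxBackOffs = 8;    // illustrative attempt cap
            final int backOffFactor = 2;

            for (int i = 0; i < maxBackOffs; i++) {
                if (IS_UPDATED.get()) {
                    return; // versions matched; safe to start the Kafka listener containers
                }
                Thread.sleep(backOffMs);
                backOffMs = backOffMs * backOffFactor;
            }
            throw new IllegalStateException("Indices are not updated after exponential backoff."
                    + " Please try restarting and consider increasing back off settings.");
        }
    }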
