feat(eso): rename topic (#7029)
RyanHolstien committed Jan 24, 2023
1 parent 7283080 commit f3a5ad2
Showing 31 changed files with 145 additions and 92 deletions.
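Both the Kafka topic (BuildIndicesHistory_v1 -> DataHubUpgradeHistory_v1) and the consumer-group environment variables (BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID -> DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID) are renamed, so deployments that pin the old names need updating. A quick check for stale pre-rename names, as a sketch (the search path is illustrative):

grep -rn "BUILD_INDICES_HISTORY\|BuildIndicesHistory_v1\|generic-bihe" docker/ \
  && echo "stale pre-rename names found"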
@@ -10,7 +10,7 @@
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.metadata.version.GitVersion;
import com.linkedin.mxe.BuildIndicesHistoryEvent;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -69,9 +69,9 @@ public Function&lt;UpgradeContext, UpgradeStepResult&gt; executable() {
}
}

BuildIndicesHistoryEvent buildIndicesHistoryEvent = new BuildIndicesHistoryEvent()
DataHubUpgradeHistoryEvent dataHubUpgradeHistoryEvent = new DataHubUpgradeHistoryEvent()
.setVersion(_gitVersion.getVersion());
_kafkaEventProducer.produceBuildIndicesHistoryEvent(buildIndicesHistoryEvent);
_kafkaEventProducer.produceDataHubUpgradeHistoryEvent(dataHubUpgradeHistoryEvent);
} catch (Exception e) {
log.error("PostBuildIndicesStep failed.", e);
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
2 changes: 1 addition & 1 deletion docker/datahub-gms/env/docker-without-neo4j.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=mysql:3306
2 changes: 1 addition & 1 deletion docker/datahub-gms/env/docker.cassandra.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
2 changes: 1 addition & 1 deletion docker/datahub-gms/env/docker.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=mysql:3306
2 changes: 1 addition & 1 deletion docker/datahub-gms/env/docker.mariadb.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=mariadb:3306
2 changes: 1 addition & 1 deletion docker/datahub-gms/env/docker.postgres.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
EBEAN_DATASOURCE_USERNAME=datahub
EBEAN_DATASOURCE_PASSWORD=datahub
EBEAN_DATASOURCE_HOST=postgres:5432
2 changes: 1 addition & 1 deletion docker/datahub-mae-consumer/env/docker-without-neo4j.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

2 changes: 1 addition & 1 deletion docker/datahub-mae-consumer/env/docker.env
@@ -1,4 +1,4 @@
BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl
DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
DATAHUB_GMS_HOST=datahub-gms
DATAHUB_GMS_PORT=8080

2 changes: 1 addition & 1 deletion docker/kafka-setup/Dockerfile
@@ -56,7 +56,7 @@ ENV METADATA_CHANGE_LOG_TIMESERIES_TOPIC="MetadataChangeLog_Timeseries_v1"
ENV METADATA_CHANGE_PROPOSAL_TOPIC="MetadataChangeProposal_v1"
ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC="FailedMetadataChangeProposal_v1"
ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1"
ENV BUILD_INDICES_HISTORY_TOPIC="BuildIndicesHistory_v1"
ENV DATAHUB_UPGRADE_HISTORY_TOPIC="DataHubUpgradeHistory_v1"

COPY docker/kafka-setup/kafka-setup.sh ./kafka-setup.sh
COPY docker/kafka-setup/kafka-config.sh ./kafka-config.sh
2 changes: 1 addition & 1 deletion docker/kafka-setup/env/docker.env
@@ -8,7 +8,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
# FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4
# PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1
# DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1
# BUILD_INDICES_HISTORY_TOPIC=BuildIndicesHistory_v1
# DATAHUB_UPGRADE_HISTORY_TOPIC=DataHubUpgradeHistory_v1
# PARTITIONS=1
# REPLICATION_FACTOR=1

2 changes: 2 additions & 0 deletions docker/kafka-setup/kafka-setup.sh
@@ -108,6 +108,8 @@ send "$METADATA_CHANGE_PROPOSAL_TOPIC" "--topic $METADATA_CHANGE_PROPOSAL_TOPIC"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC" "--topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC"
send "$PLATFORM_EVENT_TOPIC_NAME" "--topic $PLATFORM_EVENT_TOPIC_NAME"

# Infinite retention upgrade topic
send "$DATAHUB_UPGRADE_HISTORY_TOPIC" "config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC"
# Create topic for datahub usage event
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
send "$DATAHUB_USAGE_EVENT_NAME" "--topic $DATAHUB_USAGE_EVENT_NAME"
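The upgrade topic is created with retention.ms=-1, meaning its records are never expired, so the upgrade history survives indefinitely. One way to verify the setting after kafka-setup.sh runs is Kafka's stock kafka-configs tool (the broker address is taken from the env files above and may differ per deployment):

kafka-configs.sh --bootstrap-server broker:29092 \
  --entity-type topics --entity-name DataHubUpgradeHistory_v1 --describe
# expected to report retention.ms=-1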
@@ -74,7 +74,7 @@ services:
depends_on:
- mysql
environment:
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
- DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
- EBEAN_DATASOURCE_USERNAME=datahub
- EBEAN_DATASOURCE_PASSWORD=datahub
- EBEAN_DATASOURCE_HOST=mysql:3306
@@ -76,7 +76,7 @@ services:
environment:
- DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart}
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true}
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
- DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
- EBEAN_DATASOURCE_USERNAME=datahub
- EBEAN_DATASOURCE_PASSWORD=datahub
- EBEAN_DATASOURCE_HOST=mysql:3306
@@ -6,7 +6,7 @@ services:
datahub-mae-consumer:
container_name: datahub-mae-consumer
environment:
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl
- DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
2 changes: 1 addition & 1 deletion docker/quickstart/docker-compose.consumers.quickstart.yml
@@ -8,7 +8,7 @@ services:
depends_on:
- neo4j
environment:
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl
- DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl
- DATAHUB_GMS_HOST=datahub-gms
- DATAHUB_GMS_PORT=8080
- MAE_CONSUMER_ENABLED=true
2 changes: 1 addition & 1 deletion docker/quickstart/docker-compose.quickstart.yml
@@ -78,7 +78,7 @@ services:
environment:
- DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart}
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true}
- BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms
- DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms
- EBEAN_DATASOURCE_USERNAME=datahub
- EBEAN_DATASOURCE_PASSWORD=datahub
- EBEAN_DATASOURCE_HOST=mysql:3306
@@ -7,7 +7,7 @@
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.BuildIndicesHistoryEvent;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import com.linkedin.mxe.MetadataAuditEvent;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
@@ -159,17 +159,17 @@ record = EventUtils.pegasusToAvroPE(event);
}

@Override
public void produceBuildIndicesHistoryEvent(@Nonnull BuildIndicesHistoryEvent event) {
public void produceDataHubUpgradeHistoryEvent(@Nonnull DataHubUpgradeHistoryEvent event) {
GenericRecord record;
try {
log.debug(String.format("Converting Pegasus Event to Avro Event\nEvent: %s", event));
record = EventUtils.pegasusToAvroBIHE(event);
record = EventUtils.pegasusToAvroDUHE(event);
} catch (IOException e) {
log.error(String.format("Failed to convert Pegasus Build Indices History Event to Avro: %s", event), e);
log.error(String.format("Failed to convert Pegasus DataHub Upgrade History Event to Avro: %s", event), e);
throw new ModelConversionException("Failed to convert Pegasus DataHub Upgrade History Event to Avro", e);
}

final String topic = _topicConvention.getBuildIndicesHistoryTopicName();
final String topic = _topicConvention.getDataHubUpgradeHistoryTopicName();
_producer.send(new ProducerRecord(topic, event.getVersion(), record), _kafkaHealthChecker
.getKafkaCallBack("History Event", "Event Version: " + event.getVersion()));
}
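For orientation, a minimal caller sketch of the renamed producer method, mirroring the PostBuildIndicesStep change above; kafkaEventProducer and gitVersion stand in for the injected beans and are assumptions, not part of this diff:

// Record that this server version has finished rebuilding indices.
DataHubUpgradeHistoryEvent event = new DataHubUpgradeHistoryEvent()
    .setVersion(gitVersion.getVersion());
kafkaEventProducer.produceDataHubUpgradeHistoryEvent(event);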
@@ -78,9 +78,9 @@ public interface TopicConvention {
String getMetadataChangeEventTopicName(@Nonnull Urn urn, @Nonnull RecordTemplate aspect);

/**
* The name of the build indices history topic.
* The name of the DataHub Upgrade history topic.
*/
String getBuildIndicesHistoryTopicName();
String getDataHubUpgradeHistoryTopicName();

/**
* Returns the avro class that defines the given MCE v5 topic.
@@ -40,7 +40,7 @@ public final class TopicConventionImpl implements TopicConvention {
private final String _metadataChangeLogTimeseriesTopicName;
private final String _failedMetadataChangeProposalTopicName;
private final String _platformEventTopicName;
private final String _buildIndicesHistoryTopicName;
private final String _dataHubUpgradeHistoryTopicName;

// v5 patterns
private final String _eventPattern;
@@ -49,7 +49,7 @@ public TopicConventionImpl(@Nonnull String metadataChangeEventTopicName, @Nonnul
@Nonnull String failedMetadataChangeEventTopicName, @Nonnull String metadataChangeProposalTopicName,
@Nonnull String metadataChangeLogVersionedTopicName, @Nonnull String metadataChangeLogTimeseriesTopicName,
@Nonnull String failedMetadataChangeProposalTopicName, @Nonnull String platformEventTopicName,
@Nonnull String eventPattern, @Nonnull String buildIndicesHistoryTopicName) {
@Nonnull String eventPattern, @Nonnull String dataHubUpgradeHistoryTopicName) {
_metadataChangeEventTopicName = metadataChangeEventTopicName;
_metadataAuditEventTopicName = metadataAuditEventTopicName;
_failedMetadataChangeEventTopicName = failedMetadataChangeEventTopicName;
@@ -59,13 +59,13 @@ public TopicConventionImpl(@Nonnull String metadataChangeEventTopicName, @Nonnul
_failedMetadataChangeProposalTopicName = failedMetadataChangeProposalTopicName;
_platformEventTopicName = platformEventTopicName;
_eventPattern = eventPattern;
_buildIndicesHistoryTopicName = buildIndicesHistoryTopicName;
_dataHubUpgradeHistoryTopicName = dataHubUpgradeHistoryTopicName;
}

public TopicConventionImpl() {
this(Topics.METADATA_CHANGE_EVENT, Topics.METADATA_AUDIT_EVENT, Topics.FAILED_METADATA_CHANGE_EVENT,
Topics.METADATA_CHANGE_PROPOSAL, Topics.METADATA_CHANGE_LOG_VERSIONED, Topics.METADATA_CHANGE_LOG_TIMESERIES,
Topics.FAILED_METADATA_CHANGE_PROPOSAL, Topics.PLATFORM_EVENT, DEFAULT_EVENT_PATTERN, Topics.BUILD_INDICES_HISTORY_TOPIC_NAME);
Topics.FAILED_METADATA_CHANGE_PROPOSAL, Topics.PLATFORM_EVENT, DEFAULT_EVENT_PATTERN, Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME);
}

@Nonnull
@@ -142,8 +142,8 @@ public String getMetadataChangeEventTopicName(@Nonnull Urn urn, @Nonnull RecordT
}

@Override
public String getBuildIndicesHistoryTopicName() {
return _buildIndicesHistoryTopicName;
public String getDataHubUpgradeHistoryTopicName() {
return _dataHubUpgradeHistoryTopicName;
}

@Override
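A hedged sanity check of the default wiring, using only the classes shown in this diff (assert syntax for brevity):

TopicConvention convention = new TopicConventionImpl();
// The no-arg constructor now falls through to Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME.
assert "DataHubUpgradeHistory_v1".equals(convention.getDataHubUpgradeHistoryTopicName());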
@@ -13,7 +13,7 @@ public class Topics {
public static final String METADATA_CHANGE_PROPOSAL = "MetadataChangeProposal_v1";
public static final String FAILED_METADATA_CHANGE_PROPOSAL = "FailedMetadataChangeProposal_v1";
public static final String PLATFORM_EVENT = "PlatformEvent_v1";
public static final String BUILD_INDICES_HISTORY_TOPIC_NAME = "BuildIndicesHistory_v1";
public static final String DATAHUB_UPGRADE_HISTORY_TOPIC_NAME = "DataHubUpgradeHistory_v1";

public static final String DEV_METADATA_AUDIT_EVENT = "MetadataAuditEvent_v4_dev";
public static final String DEV_METADATA_CHANGE_EVENT = "MetadataChangeEvent_v4_dev";
@@ -5,7 +5,7 @@
import com.linkedin.data.avro.DataTranslator;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.BuildIndicesHistoryEvent;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import com.linkedin.mxe.FailedMetadataChangeEvent;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeLog;
@@ -43,7 +43,7 @@ public class EventUtils {

private static final RecordDataSchema PE_PEGASUS_SCHEMA = new PlatformEvent().schema();

private static final RecordDataSchema BIHE_PEGASUS_SCHEMA = new BuildIndicesHistoryEvent().schema();
private static final RecordDataSchema DUHE_PEGASUS_SCHEMA = new DataHubUpgradeHistoryEvent().schema();

private static final Schema ORIGINAL_MCE_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc");
@@ -66,8 +66,8 @@ public class EventUtils {
private static final Schema ORIGINAL_PE_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/PlatformEvent.avsc");

private static final Schema ORIGINAL_BIHE_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/BuildIndicesHistoryEvent.avsc");
private static final Schema ORIGINAL_DUHE_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/DataHubUpgradeHistoryEvent.avsc");

private static final Schema RENAMED_MCE_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$;

@@ -87,8 +87,8 @@ public class EventUtils {
private static final Schema RENAMED_FMCP_AVRO_SCHEMA =
com.linkedin.pegasus2avro.mxe.FailedMetadataChangeProposal.SCHEMA$;

private static final Schema RENAMED_BIHE_AVRO_SCHEMA =
com.linkedin.pegasus2avro.mxe.BuildIndicesHistoryEvent.SCHEMA$;
private static final Schema RENAMED_DUHE_AVRO_SCHEMA =
com.linkedin.pegasus2avro.mxe.DataHubUpgradeHistoryEvent.SCHEMA$;

private EventUtils() {
// Util class
@@ -176,10 +176,10 @@ public static PlatformEvent avroToPegasusPE(@Nonnull GenericRecord record) throw
* @return the Pegasus {@link DataHubUpgradeHistoryEvent} model
*/
@Nonnull
public static BuildIndicesHistoryEvent avroToPegasusBIHE(@Nonnull GenericRecord record) throws IOException {
return new BuildIndicesHistoryEvent(DataTranslator.genericRecordToDataMap(
renameSchemaNamespace(record, RENAMED_BIHE_AVRO_SCHEMA, ORIGINAL_BIHE_AVRO_SCHEMA),
BIHE_PEGASUS_SCHEMA, ORIGINAL_BIHE_AVRO_SCHEMA));
public static DataHubUpgradeHistoryEvent avroToPegasusDUHE(@Nonnull GenericRecord record) throws IOException {
return new DataHubUpgradeHistoryEvent(DataTranslator.genericRecordToDataMap(
renameSchemaNamespace(record, RENAMED_DUHE_AVRO_SCHEMA, ORIGINAL_DUHE_AVRO_SCHEMA),
DUHE_PEGASUS_SCHEMA, ORIGINAL_DUHE_AVRO_SCHEMA));
}

/**
@@ -302,17 +302,17 @@ public static GenericRecord pegasusToAvroPE(@Nonnull PlatformEvent event) throws
}

/**
* Converts a Pegasus Build Indices History Event into the equivalent Avro model as a {@link GenericRecord}.
* Converts a Pegasus DataHub Upgrade History Event into the equivalent Avro model as a {@link GenericRecord}.
*
* @param event the Pegasus {@link com.linkedin.mxe.BuildIndicesHistoryEvent} model
* @param event the Pegasus {@link com.linkedin.mxe.DataHubUpgradeHistoryEvent} model
* @return the Avro model with com.linkedin.pegasus2avro.event namespace
* @throws IOException if the conversion fails
*/
@Nonnull
public static GenericRecord pegasusToAvroBIHE(@Nonnull BuildIndicesHistoryEvent event) throws IOException {
public static GenericRecord pegasusToAvroDUHE(@Nonnull DataHubUpgradeHistoryEvent event) throws IOException {
GenericRecord original =
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_BIHE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_BIHE_AVRO_SCHEMA);
DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_DUHE_AVRO_SCHEMA);
return renameSchemaNamespace(original, RENAMED_DUHE_AVRO_SCHEMA);
}

/**
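The renamed converters are inverses, so a Pegasus -> Avro -> Pegasus round trip should preserve the event. A minimal sketch (getVersion is assumed to be the generated getter paired with setVersion; both converters throw IOException):

DataHubUpgradeHistoryEvent original = new DataHubUpgradeHistoryEvent().setVersion("v0.9.6");
GenericRecord avro = EventUtils.pegasusToAvroDUHE(original);
DataHubUpgradeHistoryEvent roundTrip = EventUtils.avroToPegasusDUHE(avro);
assert original.getVersion().equals(roundTrip.getVersion());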
@@ -5,7 +5,7 @@
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.BuildIndicesHistoryEvent;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeProposal;
@@ -81,9 +81,9 @@ void producePlatformEvent(
/**
* Creates an entry on the history log of when the indices were last rebuilt with the latest configuration.
*
* @param event the history event to send to the build indices history topic
* @param event the history event to send to the DataHub Upgrade history topic
*/
void produceBuildIndicesHistoryEvent(
@Nonnull BuildIndicesHistoryEvent event
void produceDataHubUpgradeHistoryEvent(
@Nonnull DataHubUpgradeHistoryEvent event
);
}
@@ -1,8 +1,9 @@
package com.linkedin.metadata.kafka.boot;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.boot.BootstrapManager;
import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
import com.linkedin.metadata.kafka.elasticsearch.indices.BuildIndicesKafkaListener;
import com.linkedin.metadata.kafka.elasticsearch.indices.DataHubUpgradeKafkaListener;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -23,21 +23,24 @@ public class ApplicationStartupListener implements ApplicationListener&lt;ContextRe

private static final String ROOT_WEB_APPLICATION_CONTEXT_ID = String.format("%s:", WebApplicationContext.class.getName());

private final BuildIndicesKafkaListener _buildIndicesKafkaListener;
private final DataHubUpgradeKafkaListener _dataHubUpgradeKafkaListener;
private final ConfigurationProvider _configurationProvider;
private final BootstrapManager _mclBootstrapManager;

public ApplicationStartupListener(
@Qualifier("buildIndicesKafkaListener") BuildIndicesKafkaListener buildIndicesKafkaListener,
ConfigurationProvider configurationProvider) {
_buildIndicesKafkaListener = buildIndicesKafkaListener;
@Qualifier("dataHubUpgradeKafkaListener") DataHubUpgradeKafkaListener dataHubUpgradeKafkaListener,
ConfigurationProvider configurationProvider,
@Qualifier("mclBootstrapManager") BootstrapManager bootstrapManager) {
_dataHubUpgradeKafkaListener = dataHubUpgradeKafkaListener;
_configurationProvider = configurationProvider;
_mclBootstrapManager = bootstrapManager;
}

@Override
public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) {
if (ROOT_WEB_APPLICATION_CONTEXT_ID.equals(event.getApplicationContext().getId())
&& _configurationProvider.getElasticSearch().getBuildIndices().isWaitForBuildIndices()) {
_buildIndicesKafkaListener.waitForBootstrap();
_mclBootstrapManager.start();
}
}
}
