From f3a5ad227dbee6912b1b627c1ab0fb3ad090130e Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 23 Jan 2023 20:47:37 -0600 Subject: [PATCH] feast(eso): rename topic (#7029) --- .../buildindices/PostBuildIndicesStep.java | 6 +-- .../datahub-gms/env/docker-without-neo4j.env | 2 +- docker/datahub-gms/env/docker.cassandra.env | 2 +- docker/datahub-gms/env/docker.env | 2 +- docker/datahub-gms/env/docker.mariadb.env | 2 +- docker/datahub-gms/env/docker.postgres.env | 2 +- .../env/docker-without-neo4j.env | 2 +- docker/datahub-mae-consumer/env/docker.env | 2 +- docker/kafka-setup/Dockerfile | 2 +- docker/kafka-setup/env/docker.env | 2 +- docker/kafka-setup/kafka-setup.sh | 2 + ...er-compose-without-neo4j-m1.quickstart.yml | 2 +- ...ocker-compose-without-neo4j.quickstart.yml | 2 +- ...ose.consumers-without-neo4j.quickstart.yml | 2 +- .../docker-compose.consumers.quickstart.yml | 2 +- .../quickstart/docker-compose.quickstart.yml | 2 +- .../dao/producer/KafkaEventProducer.java | 10 ++-- .../com/linkedin/mxe/TopicConvention.java | 4 +- .../com/linkedin/mxe/TopicConventionImpl.java | 12 ++--- .../main/java/com/linkedin/mxe/Topics.java | 2 +- .../com/linkedin/metadata/EventUtils.java | 30 ++++++------ .../metadata/event/EventProducer.java | 8 ++-- .../boot/ApplicationStartupListener.java | 16 ++++--- .../boot/MCLBootstrapManagerFactory.java | 46 +++++++++++++++++++ ....java => DataHubUpgradeKafkaListener.java} | 14 +++--- .../kafka/hook/UpdateIndicesHookTest.java | 6 +-- .../linkedin/mxe/BuildIndicesHistoryEvent.pdl | 18 -------- .../mxe/DataHubUpgradeHistoryEvent.pdl | 18 ++++++++ .../common/TopicConventionFactory.java | 6 +-- .../factories/BootstrapManagerFactory.java | 7 +-- .../boot/steps/WaitForBuildIndicesStep.java | 4 +- 31 files changed, 145 insertions(+), 92 deletions(-) create mode 100644 metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCLBootstrapManagerFactory.java rename metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/{BuildIndicesKafkaListener.java => DataHubUpgradeKafkaListener.java} (90%) delete mode 100644 metadata-models/src/main/pegasus/com/linkedin/mxe/BuildIndicesHistoryEvent.pdl create mode 100644 metadata-models/src/main/pegasus/com/linkedin/mxe/DataHubUpgradeHistoryEvent.pdl diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/buildindices/PostBuildIndicesStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/buildindices/PostBuildIndicesStep.java index cc2caa10bc38a..8cdfe0f9e4e41 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/buildindices/PostBuildIndicesStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/buildindices/PostBuildIndicesStep.java @@ -10,7 +10,7 @@ import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; import com.linkedin.metadata.shared.ElasticSearchIndexed; import com.linkedin.metadata.version.GitVersion; -import com.linkedin.mxe.BuildIndicesHistoryEvent; +import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -69,9 +69,9 @@ public Function executable() { } } - BuildIndicesHistoryEvent buildIndicesHistoryEvent = new BuildIndicesHistoryEvent() + DataHubUpgradeHistoryEvent dataHubUpgradeHistoryEvent = new DataHubUpgradeHistoryEvent() .setVersion(_gitVersion.getVersion()); - _kafkaEventProducer.produceBuildIndicesHistoryEvent(buildIndicesHistoryEvent); + _kafkaEventProducer.produceDataHubUpgradeHistoryEvent(dataHubUpgradeHistoryEvent); } catch (Exception e) { log.error("PostBuildIndicesStep failed.", e); return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); diff --git a/docker/datahub-gms/env/docker-without-neo4j.env b/docker/datahub-gms/env/docker-without-neo4j.env index a6247344e6994..dcc1562ce90c8 100644 --- a/docker/datahub-gms/env/docker-without-neo4j.env +++ b/docker/datahub-gms/env/docker-without-neo4j.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms EBEAN_DATASOURCE_USERNAME=datahub EBEAN_DATASOURCE_PASSWORD=datahub EBEAN_DATASOURCE_HOST=mysql:3306 diff --git a/docker/datahub-gms/env/docker.cassandra.env b/docker/datahub-gms/env/docker.cassandra.env index eae6741d14dff..5fc981d77b6f3 100644 --- a/docker/datahub-gms/env/docker.cassandra.env +++ b/docker/datahub-gms/env/docker.cassandra.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms KAFKA_BOOTSTRAP_SERVER=broker:29092 KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 ELASTICSEARCH_HOST=elasticsearch diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index a8a46a3054a80..34b2a82d2a210 100644 --- a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms EBEAN_DATASOURCE_USERNAME=datahub EBEAN_DATASOURCE_PASSWORD=datahub EBEAN_DATASOURCE_HOST=mysql:3306 diff --git a/docker/datahub-gms/env/docker.mariadb.env b/docker/datahub-gms/env/docker.mariadb.env index e6691ab3a5f9c..e7032b069b3fe 100644 --- a/docker/datahub-gms/env/docker.mariadb.env +++ b/docker/datahub-gms/env/docker.mariadb.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms EBEAN_DATASOURCE_USERNAME=datahub EBEAN_DATASOURCE_PASSWORD=datahub EBEAN_DATASOURCE_HOST=mariadb:3306 diff --git a/docker/datahub-gms/env/docker.postgres.env b/docker/datahub-gms/env/docker.postgres.env index c5d83f809247c..d907b44b3814a 100644 --- a/docker/datahub-gms/env/docker.postgres.env +++ b/docker/datahub-gms/env/docker.postgres.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms EBEAN_DATASOURCE_USERNAME=datahub EBEAN_DATASOURCE_PASSWORD=datahub EBEAN_DATASOURCE_HOST=postgres:5432 diff --git a/docker/datahub-mae-consumer/env/docker-without-neo4j.env b/docker/datahub-mae-consumer/env/docker-without-neo4j.env index 9c6d3e88aea8a..4e853b6756407 100644 --- a/docker/datahub-mae-consumer/env/docker-without-neo4j.env +++ b/docker/datahub-mae-consumer/env/docker-without-neo4j.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl DATAHUB_GMS_HOST=datahub-gms DATAHUB_GMS_PORT=8080 diff --git a/docker/datahub-mae-consumer/env/docker.env b/docker/datahub-mae-consumer/env/docker.env index 4dc932b5320b3..a6883074e2a39 100644 --- a/docker/datahub-mae-consumer/env/docker.env +++ b/docker/datahub-mae-consumer/env/docker.env @@ -1,4 +1,4 @@ -BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl +DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl DATAHUB_GMS_HOST=datahub-gms DATAHUB_GMS_PORT=8080 diff --git a/docker/kafka-setup/Dockerfile b/docker/kafka-setup/Dockerfile index 5448af50eb037..7af6ead6283e3 100644 --- a/docker/kafka-setup/Dockerfile +++ b/docker/kafka-setup/Dockerfile @@ -56,7 +56,7 @@ ENV METADATA_CHANGE_LOG_TIMESERIES_TOPIC="MetadataChangeLog_Timeseries_v1" ENV METADATA_CHANGE_PROPOSAL_TOPIC="MetadataChangeProposal_v1" ENV FAILED_METADATA_CHANGE_PROPOSAL_TOPIC="FailedMetadataChangeProposal_v1" ENV PLATFORM_EVENT_TOPIC_NAME="PlatformEvent_v1" -ENV BUILD_INDICES_HISTORY_TOPIC="BuildIndicesHistory_v1" +ENV DATAHUB_UPGRADE_HISTORY_TOPIC="DataHubUpgradeHistory_v1" COPY docker/kafka-setup/kafka-setup.sh ./kafka-setup.sh COPY docker/kafka-setup/kafka-config.sh ./kafka-config.sh diff --git a/docker/kafka-setup/env/docker.env b/docker/kafka-setup/env/docker.env index 4e7f3214481a2..6f70b6aba633c 100644 --- a/docker/kafka-setup/env/docker.env +++ b/docker/kafka-setup/env/docker.env @@ -8,7 +8,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092 # FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4 # PLATFORM_EVENT_TOPIC_NAME=PlatformEvent_v1 # DATAHUB_USAGE_EVENT_NAME=DataHubUsageEvent_v1 -# BUILD_INDICES_HISTORY_TOPIC=BuildIndicesHistory_v1 +# DATAHUB_UPGRADE_HISTORY_TOPIC=DataHubUpgradeHistory_v1 # PARTITIONS=1 # REPLICATION_FACTOR=1 diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh index f7e99db3d857c..20490889377a4 100644 --- a/docker/kafka-setup/kafka-setup.sh +++ b/docker/kafka-setup/kafka-setup.sh @@ -108,6 +108,8 @@ send "$METADATA_CHANGE_PROPOSAL_TOPIC" "--topic $METADATA_CHANGE_PROPOSAL_TOPIC" send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC" "--topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC" send "$PLATFORM_EVENT_TOPIC_NAME" "--topic $PLATFORM_EVENT_TOPIC_NAME" +# Infinite retention upgrade topic +send "$DATAHUB_UPGRADE_HISTORY_TOPIC" "config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC" # Create topic for datahub usage event if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then send "$DATAHUB_USAGE_EVENT_NAME" "--topic $DATAHUB_USAGE_EVENT_NAME" diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml index 306ed34221c6c..873d64ead1c05 100644 --- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml @@ -74,7 +74,7 @@ services: depends_on: - mysql environment: - - BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms + - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms - EBEAN_DATASOURCE_USERNAME=datahub - EBEAN_DATASOURCE_PASSWORD=datahub - EBEAN_DATASOURCE_HOST=mysql:3306 diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml index e7f88a7110589..7f3592d3df840 100644 --- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -76,7 +76,7 @@ services: environment: - DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart} - DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true} - - BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms + - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms - EBEAN_DATASOURCE_USERNAME=datahub - EBEAN_DATASOURCE_PASSWORD=datahub - EBEAN_DATASOURCE_HOST=mysql:3306 diff --git a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml index 48b0cdef426c9..896b7b5676576 100644 --- a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml @@ -6,7 +6,7 @@ services: datahub-mae-consumer: container_name: datahub-mae-consumer environment: - - BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl + - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl - DATAHUB_GMS_HOST=datahub-gms - DATAHUB_GMS_PORT=8080 - MAE_CONSUMER_ENABLED=true diff --git a/docker/quickstart/docker-compose.consumers.quickstart.yml b/docker/quickstart/docker-compose.consumers.quickstart.yml index 4cd1f79bf20cd..578060299c76f 100644 --- a/docker/quickstart/docker-compose.consumers.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers.quickstart.yml @@ -8,7 +8,7 @@ services: depends_on: - neo4j environment: - - BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-mcl + - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-mcl - DATAHUB_GMS_HOST=datahub-gms - DATAHUB_GMS_PORT=8080 - MAE_CONSUMER_ENABLED=true diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index 5572423b71a5d..8dcfbb369a848 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -78,7 +78,7 @@ services: environment: - DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart} - DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true} - - BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-bihe-consumer-job-client-gms + - DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID=generic-duhe-consumer-job-client-gms - EBEAN_DATASOURCE_USERNAME=datahub - EBEAN_DATASOURCE_PASSWORD=datahub - EBEAN_DATASOURCE_HOST=mysql:3306 diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java index 12d314c3717ed..40db7100cc8de 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java @@ -7,7 +7,7 @@ import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.mxe.BuildIndicesHistoryEvent; +import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import com.linkedin.mxe.MetadataAuditEvent; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; @@ -159,17 +159,17 @@ record = EventUtils.pegasusToAvroPE(event); } @Override - public void produceBuildIndicesHistoryEvent(@Nonnull BuildIndicesHistoryEvent event) { + public void produceDataHubUpgradeHistoryEvent(@Nonnull DataHubUpgradeHistoryEvent event) { GenericRecord record; try { log.debug(String.format("Converting Pegasus Event to Avro Event\nEvent: %s", event)); - record = EventUtils.pegasusToAvroBIHE(event); + record = EventUtils.pegasusToAvroDUHE(event); } catch (IOException e) { - log.error(String.format("Failed to convert Pegasus Build Indices History Event to Avro: %s", event), e); + log.error(String.format("Failed to convert Pegasus DataHub Upgrade History Event to Avro: %s", event), e); throw new ModelConversionException("Failed to convert Pegasus Platform Event to Avro", e); } - final String topic = _topicConvention.getBuildIndicesHistoryTopicName(); + final String topic = _topicConvention.getDataHubUpgradeHistoryTopicName(); _producer.send(new ProducerRecord(topic, event.getVersion(), record), _kafkaHealthChecker .getKafkaCallBack("History Event", "Event Version: " + event.getVersion())); } diff --git a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConvention.java b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConvention.java index ff54a4d3c15e1..463abfdeca845 100644 --- a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConvention.java +++ b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConvention.java @@ -78,9 +78,9 @@ public interface TopicConvention { String getMetadataChangeEventTopicName(@Nonnull Urn urn, @Nonnull RecordTemplate aspect); /** - * The name of the build indices history topic. + * The name of the DataHub Upgrade history topic. */ - String getBuildIndicesHistoryTopicName(); + String getDataHubUpgradeHistoryTopicName(); /** * Returns the avro class that defines the given MCE v5 topic. diff --git a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConventionImpl.java b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConventionImpl.java index 5270fbd48b36d..3143584bbdcaf 100644 --- a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConventionImpl.java +++ b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/TopicConventionImpl.java @@ -40,7 +40,7 @@ public final class TopicConventionImpl implements TopicConvention { private final String _metadataChangeLogTimeseriesTopicName; private final String _failedMetadataChangeProposalTopicName; private final String _platformEventTopicName; - private final String _buildIndicesHistoryTopicName; + private final String _dataHubUpgradeHistoryTopicName; // v5 patterns private final String _eventPattern; @@ -49,7 +49,7 @@ public TopicConventionImpl(@Nonnull String metadataChangeEventTopicName, @Nonnul @Nonnull String failedMetadataChangeEventTopicName, @Nonnull String metadataChangeProposalTopicName, @Nonnull String metadataChangeLogVersionedTopicName, @Nonnull String metadataChangeLogTimeseriesTopicName, @Nonnull String failedMetadataChangeProposalTopicName, @Nonnull String platformEventTopicName, - @Nonnull String eventPattern, @Nonnull String buildIndicesHistoryTopicName) { + @Nonnull String eventPattern, @Nonnull String dataHubUpgradeHistoryTopicName) { _metadataChangeEventTopicName = metadataChangeEventTopicName; _metadataAuditEventTopicName = metadataAuditEventTopicName; _failedMetadataChangeEventTopicName = failedMetadataChangeEventTopicName; @@ -59,13 +59,13 @@ public TopicConventionImpl(@Nonnull String metadataChangeEventTopicName, @Nonnul _failedMetadataChangeProposalTopicName = failedMetadataChangeProposalTopicName; _platformEventTopicName = platformEventTopicName; _eventPattern = eventPattern; - _buildIndicesHistoryTopicName = buildIndicesHistoryTopicName; + _dataHubUpgradeHistoryTopicName = dataHubUpgradeHistoryTopicName; } public TopicConventionImpl() { this(Topics.METADATA_CHANGE_EVENT, Topics.METADATA_AUDIT_EVENT, Topics.FAILED_METADATA_CHANGE_EVENT, Topics.METADATA_CHANGE_PROPOSAL, Topics.METADATA_CHANGE_LOG_VERSIONED, Topics.METADATA_CHANGE_LOG_TIMESERIES, - Topics.FAILED_METADATA_CHANGE_PROPOSAL, Topics.PLATFORM_EVENT, DEFAULT_EVENT_PATTERN, Topics.BUILD_INDICES_HISTORY_TOPIC_NAME); + Topics.FAILED_METADATA_CHANGE_PROPOSAL, Topics.PLATFORM_EVENT, DEFAULT_EVENT_PATTERN, Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME); } @Nonnull @@ -142,8 +142,8 @@ public String getMetadataChangeEventTopicName(@Nonnull Urn urn, @Nonnull RecordT } @Override - public String getBuildIndicesHistoryTopicName() { - return _buildIndicesHistoryTopicName; + public String getDataHubUpgradeHistoryTopicName() { + return _dataHubUpgradeHistoryTopicName; } @Override diff --git a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java index 713c243f926f7..3a9a0812e1031 100644 --- a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java +++ b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java @@ -13,7 +13,7 @@ public class Topics { public static final String METADATA_CHANGE_PROPOSAL = "MetadataChangeProposal_v1"; public static final String FAILED_METADATA_CHANGE_PROPOSAL = "FailedMetadataChangeProposal_v1"; public static final String PLATFORM_EVENT = "PlatformEvent_v1"; - public static final String BUILD_INDICES_HISTORY_TOPIC_NAME = "BuildIndicesHistory_v1"; + public static final String DATAHUB_UPGRADE_HISTORY_TOPIC_NAME = "DataHubUpgradeHistory_v1"; public static final String DEV_METADATA_AUDIT_EVENT = "MetadataAuditEvent_v4_dev"; public static final String DEV_METADATA_CHANGE_EVENT = "MetadataChangeEvent_v4_dev"; diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java index d02c163e5a98d..be0302061a3c2 100644 --- a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java @@ -5,7 +5,7 @@ import com.linkedin.data.avro.DataTranslator; import com.linkedin.data.schema.RecordDataSchema; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.mxe.BuildIndicesHistoryEvent; +import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import com.linkedin.mxe.FailedMetadataChangeEvent; import com.linkedin.mxe.FailedMetadataChangeProposal; import com.linkedin.mxe.MetadataChangeLog; @@ -43,7 +43,7 @@ public class EventUtils { private static final RecordDataSchema PE_PEGASUS_SCHEMA = new PlatformEvent().schema(); - private static final RecordDataSchema BIHE_PEGASUS_SCHEMA = new BuildIndicesHistoryEvent().schema(); + private static final RecordDataSchema DUHE_PEGASUS_SCHEMA = new DataHubUpgradeHistoryEvent().schema(); private static final Schema ORIGINAL_MCE_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc"); @@ -66,8 +66,8 @@ public class EventUtils { private static final Schema ORIGINAL_PE_AVRO_SCHEMA = getAvroSchemaFromResource("avro/com/linkedin/mxe/PlatformEvent.avsc"); - private static final Schema ORIGINAL_BIHE_AVRO_SCHEMA = - getAvroSchemaFromResource("avro/com/linkedin/mxe/BuildIndicesHistoryEvent.avsc"); + private static final Schema ORIGINAL_DUHE_AVRO_SCHEMA = + getAvroSchemaFromResource("avro/com/linkedin/mxe/DataHubUpgradeHistoryEvent.avsc"); private static final Schema RENAMED_MCE_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$; @@ -87,8 +87,8 @@ public class EventUtils { private static final Schema RENAMED_FMCP_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.FailedMetadataChangeProposal.SCHEMA$; - private static final Schema RENAMED_BIHE_AVRO_SCHEMA = - com.linkedin.pegasus2avro.mxe.BuildIndicesHistoryEvent.SCHEMA$; + private static final Schema RENAMED_DUHE_AVRO_SCHEMA = + com.linkedin.pegasus2avro.mxe.DataHubUpgradeHistoryEvent.SCHEMA$; private EventUtils() { // Util class @@ -176,10 +176,10 @@ public static PlatformEvent avroToPegasusPE(@Nonnull GenericRecord record) throw * @return the Pegasus {@link PlatformEvent} model */ @Nonnull - public static BuildIndicesHistoryEvent avroToPegasusBIHE(@Nonnull GenericRecord record) throws IOException { - return new BuildIndicesHistoryEvent(DataTranslator.genericRecordToDataMap( - renameSchemaNamespace(record, RENAMED_BIHE_AVRO_SCHEMA, ORIGINAL_BIHE_AVRO_SCHEMA), - BIHE_PEGASUS_SCHEMA, ORIGINAL_BIHE_AVRO_SCHEMA)); + public static DataHubUpgradeHistoryEvent avroToPegasusDUHE(@Nonnull GenericRecord record) throws IOException { + return new DataHubUpgradeHistoryEvent(DataTranslator.genericRecordToDataMap( + renameSchemaNamespace(record, RENAMED_DUHE_AVRO_SCHEMA, ORIGINAL_DUHE_AVRO_SCHEMA), + DUHE_PEGASUS_SCHEMA, ORIGINAL_DUHE_AVRO_SCHEMA)); } /** @@ -302,17 +302,17 @@ public static GenericRecord pegasusToAvroPE(@Nonnull PlatformEvent event) throws } /** - * Converts a Pegasus Build Indices History Event into the equivalent Avro model as a {@link GenericRecord}. + * Converts a Pegasus DataHub Upgrade History Event into the equivalent Avro model as a {@link GenericRecord}. * - * @param event the Pegasus {@link com.linkedin.mxe.BuildIndicesHistoryEvent} model + * @param event the Pegasus {@link com.linkedin.mxe.DataHubUpgradeHistoryEvent} model * @return the Avro model with com.linkedin.pegasus2avro.event namespace * @throws IOException if the conversion fails */ @Nonnull - public static GenericRecord pegasusToAvroBIHE(@Nonnull BuildIndicesHistoryEvent event) throws IOException { + public static GenericRecord pegasusToAvroDUHE(@Nonnull DataHubUpgradeHistoryEvent event) throws IOException { GenericRecord original = - DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_BIHE_AVRO_SCHEMA); - return renameSchemaNamespace(original, RENAMED_BIHE_AVRO_SCHEMA); + DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), ORIGINAL_DUHE_AVRO_SCHEMA); + return renameSchemaNamespace(original, RENAMED_DUHE_AVRO_SCHEMA); } /** diff --git a/metadata-io/src/main/java/com/linkedin/metadata/event/EventProducer.java b/metadata-io/src/main/java/com/linkedin/metadata/event/EventProducer.java index f5d4ad4f1f8d8..fafaaefa5ecb0 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/event/EventProducer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/event/EventProducer.java @@ -5,7 +5,7 @@ import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.mxe.BuildIndicesHistoryEvent; +import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeProposal; @@ -81,9 +81,9 @@ void producePlatformEvent( /** * Creates an entry on the history log of when the indices were last rebuilt with the latest configuration. * - * @param event the history event to send to the build indices history topic + * @param event the history event to send to the DataHub Upgrade history topic */ - void produceBuildIndicesHistoryEvent( - @Nonnull BuildIndicesHistoryEvent event + void produceDataHubUpgradeHistoryEvent( + @Nonnull DataHubUpgradeHistoryEvent event ); } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java index 38e4be56c6278..016659680f966 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/ApplicationStartupListener.java @@ -1,8 +1,9 @@ package com.linkedin.metadata.kafka.boot; import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.boot.BootstrapManager; import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; -import com.linkedin.metadata.kafka.elasticsearch.indices.BuildIndicesKafkaListener; +import com.linkedin.metadata.kafka.elasticsearch.indices.DataHubUpgradeKafkaListener; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; @@ -23,21 +24,24 @@ public class ApplicationStartupListener implements ApplicationListener finalSteps = ImmutableList.of(waitForBuildIndicesStep); + + return new BootstrapManager(finalSteps); + } +} diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/BuildIndicesKafkaListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/DataHubUpgradeKafkaListener.java similarity index 90% rename from metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/BuildIndicesKafkaListener.java rename to metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/DataHubUpgradeKafkaListener.java index 21ccae78615fc..6ffb43bc17e09 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/BuildIndicesKafkaListener.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/indices/DataHubUpgradeKafkaListener.java @@ -5,7 +5,7 @@ import com.linkedin.metadata.boot.dependencies.BootstrapDependency; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.metadata.version.GitVersion; -import com.linkedin.mxe.BuildIndicesHistoryEvent; +import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import com.linkedin.mxe.Topics; import java.util.Map; @@ -30,17 +30,17 @@ // We don't disable this on GMS since we want GMS to also wait until the indices are ready to read in case of // backwards incompatible query logic dependent on index updates. -@Component("buildIndicesKafkaListener") +@Component("dataHubUpgradeKafkaListener") @RequiredArgsConstructor @Slf4j @EnableKafka -public class BuildIndicesKafkaListener implements ConsumerSeekAware, BootstrapDependency { +public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, BootstrapDependency { @Autowired private KafkaListenerEndpointRegistry registry; - private static final String CONSUMER_GROUP = "${BUILD_INDICES_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-bihe-consumer-job-client}"; + private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}"; private static final String SUFFIX = "temp"; - private static final String TOPIC_NAME = "${BUILD_INDICES_HISTORY_TOPIC_NAME:" + Topics.BUILD_INDICES_HISTORY_TOPIC_NAME + "}"; + private static final String TOPIC_NAME = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}"; private final DefaultKafkaConsumerFactory _defaultKafkaConsumerFactory; private final GitVersion _gitVersion; @@ -72,9 +72,9 @@ public void onPartitionsAssigned(Map assignments, Consumer public void checkIndexVersion(final ConsumerRecord consumerRecord) { final GenericRecord record = consumerRecord.value(); - BuildIndicesHistoryEvent event; + DataHubUpgradeHistoryEvent event; try { - event = EventUtils.avroToPegasusBIHE(record); + event = EventUtils.avroToPegasusDUHE(record); log.info("Latest index update version: {}", event.getVersion()); if (_gitVersion.getVersion().equals(event.getVersion())) { isUpdated.getAndSet(true); diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index 983794db7f87a..623ddeddf6a19 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -23,7 +23,7 @@ import com.linkedin.metadata.config.ElasticSearchConfiguration; import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.GraphService; -import com.linkedin.metadata.kafka.elasticsearch.indices.BuildIndicesKafkaListener; +import com.linkedin.metadata.kafka.elasticsearch.indices.DataHubUpgradeKafkaListener; import com.linkedin.metadata.key.ChartKey; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -64,7 +64,7 @@ public class UpdateIndicesHookTest { private TimeseriesAspectService _mockTimeseriesAspectService; private SystemMetadataService _mockSystemMetadataService; private SearchDocumentTransformer _mockSearchDocumentTransformer; - private BuildIndicesKafkaListener _mockBuildIndicesKafkaListener; + private DataHubUpgradeKafkaListener _mockDataHubUpgradeKafkaListener; private ConfigurationProvider _mockConfigurationProvider; private Urn _actorUrn; @@ -78,7 +78,7 @@ public void setupTest() { _mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class); _mockSystemMetadataService = Mockito.mock(SystemMetadataService.class); _mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class); - _mockBuildIndicesKafkaListener = Mockito.mock(BuildIndicesKafkaListener.class); + _mockDataHubUpgradeKafkaListener = Mockito.mock(DataHubUpgradeKafkaListener.class); _mockConfigurationProvider = Mockito.mock(ConfigurationProvider.class); ElasticSearchConfiguration elasticSearchConfiguration = new ElasticSearchConfiguration(); BuildIndicesConfiguration buildIndicesConfiguration = new BuildIndicesConfiguration(); diff --git a/metadata-models/src/main/pegasus/com/linkedin/mxe/BuildIndicesHistoryEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/mxe/BuildIndicesHistoryEvent.pdl deleted file mode 100644 index 1ec84253fcbed..0000000000000 --- a/metadata-models/src/main/pegasus/com/linkedin/mxe/BuildIndicesHistoryEvent.pdl +++ /dev/null @@ -1,18 +0,0 @@ -namespace com.linkedin.mxe - -/** - * Kafka event for recording a historical index rebuild change. Used for backwards incompatible changes to mappings and settings that require full reindexing. - */ -record BuildIndicesHistoryEvent { - - /** - * Version of the build - */ - version: string - - /** - * A string->string map of custom properties that one might want to attach to an event - **/ - systemMetadata: optional SystemMetadata - -} diff --git a/metadata-models/src/main/pegasus/com/linkedin/mxe/DataHubUpgradeHistoryEvent.pdl b/metadata-models/src/main/pegasus/com/linkedin/mxe/DataHubUpgradeHistoryEvent.pdl new file mode 100644 index 0000000000000..7c0bef691acdc --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/mxe/DataHubUpgradeHistoryEvent.pdl @@ -0,0 +1,18 @@ +namespace com.linkedin.mxe + +/** + * Kafka event for recording a historical version upgrade. Used for backwards incompatible changes to infrastructure that requires infrastructure level blocking changes. + */ +record DataHubUpgradeHistoryEvent { + + /** + * Version of the upgrade + */ + version: string + + /** + * A string->string map of custom properties that one might want to attach to an event + **/ + systemMetadata: optional SystemMetadata + +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/TopicConventionFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/TopicConventionFactory.java index 9dbde35bc0c1d..c7df8b1cde6ec 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/TopicConventionFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/TopicConventionFactory.java @@ -41,8 +41,8 @@ public class TopicConventionFactory { @Value("${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}") private String platformEventTopicName; - @Value("${BUILD_INDICES_HISTORY_TOPIC_NAME:" + Topics.BUILD_INDICES_HISTORY_TOPIC_NAME + "}") - private String buildIndicesHistoryTopicName; + @Value("${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}") + private String dataHubUpgradeHistoryTopicName; @Bean(name = TOPIC_CONVENTION_BEAN) protected TopicConvention createInstance() { @@ -50,6 +50,6 @@ protected TopicConvention createInstance() { metadataChangeProposalName, metadataChangeLogVersionedTopicName, metadataChangeLogTimeseriesTopicName, failedMetadataChangeProposalName, platformEventTopicName, // TODO once we start rolling out v5 add support for changing the new event names. - TopicConventionImpl.DEFAULT_EVENT_PATTERN, buildIndicesHistoryTopicName); + TopicConventionImpl.DEFAULT_EVENT_PATTERN, dataHubUpgradeHistoryTopicName); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 320236d1298a3..1f1101da5d4a2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -70,8 +70,8 @@ public class BootstrapManagerFactory { private IngestRetentionPoliciesStep _ingestRetentionPoliciesStep; @Autowired - @Qualifier("buildIndicesKafkaListener") - private BootstrapDependency _buildIndicesKafkaListener; + @Qualifier("dataHubUpgradeKafkaListener") + private BootstrapDependency _dataHubUpgradeKafkaListener; @Autowired private ConfigurationProvider _configurationProvider; @@ -99,7 +99,8 @@ protected BootstrapManager createInstance() { final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); final RestoreColumnLineageIndices restoreColumnLineageIndices = new RestoreColumnLineageIndices(_entityService, _entityRegistry); final IngestDefaultGlobalSettingsStep ingestSettingsStep = new IngestDefaultGlobalSettingsStep(_entityService); - final WaitForBuildIndicesStep waitForBuildIndicesStep = new WaitForBuildIndicesStep(_buildIndicesKafkaListener, _configurationProvider); + final WaitForBuildIndicesStep waitForBuildIndicesStep = new WaitForBuildIndicesStep(_dataHubUpgradeKafkaListener, + _configurationProvider); final List finalSteps = new ArrayList<>(ImmutableList.of( waitForBuildIndicesStep, diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/WaitForBuildIndicesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/WaitForBuildIndicesStep.java index aa0f4ebc90662..4673b807ccb32 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/WaitForBuildIndicesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/WaitForBuildIndicesStep.java @@ -8,7 +8,7 @@ @RequiredArgsConstructor public class WaitForBuildIndicesStep implements BootstrapStep { - private final BootstrapDependency _buildIndicesKafkaListener; + private final BootstrapDependency _dataHubUpgradeKafkaListener; private final ConfigurationProvider _enableWaitForBuildIndices; @Override @@ -18,7 +18,7 @@ public String name() { @Override public void execute() throws Exception { - if (!_buildIndicesKafkaListener.waitForBootstrap()) { + if (!_dataHubUpgradeKafkaListener.waitForBootstrap()) { throw new IllegalStateException("Build indices was unsuccessful, stopping bootstrap process."); } }