Skip to content

Commit

Permalink
Revert "[fix][broker] Fix get topic policies as null during clean cac…
Browse files Browse the repository at this point in the history
…he (#20763)"

This reverts commit 644c11a.
  • Loading branch information
coderzc committed Aug 29, 2023
1 parent 644c11a commit 42e2ccd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
promise.complete(null);
}
});
return;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,36 @@
package org.apache.pulsar.compaction;

import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.buffer.ByteBuf;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Cleanup;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand All @@ -64,6 +59,7 @@
public class CompactorTest extends MockedPulsarServiceBaseTest {

private ScheduledExecutorService compactionScheduler;

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -86,19 +82,18 @@ public void cleanup() throws Exception {
compactionScheduler.shutdownNow();
}

private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics)
throws Exception {
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
long compactedLedgerId = compactor.compact(topic).get();

LedgerHandle ledger = bk.openLedger(compactedLedgerId,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
Assert.assertEquals(ledger.getLastAddConfirmed() + 1, // 0..lac
expected.size(),
"Should have as many entries as there is keys");
expected.size(),
"Should have as many entries as there is keys");

List<String> keys = new ArrayList<>();
Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());
Expand All @@ -112,7 +107,7 @@ private List<String> compactAndVerify(String topic, Map<String, byte[]> expected
byte[] bytes = new byte[payload.readableBytes()];
payload.readBytes(bytes);
Assert.assertEquals(bytes, expected.remove(key),
"Compacted version should match expected version");
"Compacted version should match expected version");
m.close();
}
if (checkMetrics) {
Expand All @@ -136,18 +131,17 @@ public void testCompaction() throws Exception {
final int numMessages = 1000;
final int maxKeys = 10;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

Map<String, byte[]> expected = new HashMap<>();
Random r = new Random(0);

for (int j = 0; j < numMessages; j++) {
int keyIndex = r.nextInt(maxKeys);
String key = "key" + keyIndex;
String key = "key"+keyIndex;
byte[] data = ("my-message-" + key + "-" + j).getBytes();
producer.newMessage()
.key(key)
Expand All @@ -162,11 +156,10 @@ public void testCompaction() throws Exception {
public void testCompactAddCompact() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

Map<String, byte[]> expected = new HashMap<>();

Expand Down Expand Up @@ -200,11 +193,10 @@ public void testCompactAddCompact() throws Exception {
public void testCompactedInOrder() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

producer.newMessage()
.key("c")
Expand Down Expand Up @@ -251,50 +243,6 @@ public void testPhaseOneLoopTimeConfiguration() {

}

@Test
public void testCompactedWithConcurrentSend() throws Exception {
String topic = "persistent://my-property/use/my-ns/testCompactedWithConcurrentSend";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
for (int i = 0; i < 100; i++) {
try {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i).getBytes()).send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
});

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
CompactedTopic compactedTopic = persistentTopic.getCompactedTopic();

Awaitility.await().untilAsserted(() -> {
long compactedLedgerId = compactor.compact(topic).join();
Thread.sleep(300);
Optional<CompactedTopicContext> compactedTopicContext = persistentTopic.getCompactedTopic()
Assert.assertTrue(compactedTopicContext.isPresent());
Assert.assertEquals(compactedTopicContext.get().ledger.getId(), compactedLedgerId);
});

Position lastCompactedPosition = compactedTopic.getCompactionHorizon().get();
Entry lastCompactedEntry = compactedTopic.readLastEntryOfCompactedLedger().get();

Assert.assertTrue(PositionImpl.get(lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId())
.compareTo(lastCompactedEntry.getLedgerId(), lastCompactedEntry.getEntryId()) >= 0);

future.join();
}

public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);
Expand Down

0 comments on commit 42e2ccd

Please sign in to comment.