Skip to content
This repository was archived by the owner on Oct 16, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,10 @@ private void recoverInternal(String streamIdentifier) throws IOException {
*/
@Override
public void delete() throws IOException {
// delete the actual log stream and log segments
BKLogWriteHandler ledgerHandler = createWriteHandler(true);
ledgerHandler.deleteLog();
// delete the log stream metadata
Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
.deleteLog(uri, getStreamName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,15 @@ public void onAbort(Throwable t) {
});
}

/**
* Delete the whole log and all log segments under the log
*/
void deleteLog() throws IOException {
lock.checkOwnershipAndReacquire();
Utils.ioResult(purgeLogSegmentsOlderThanTxnId(-1));
Utils.closeQuietly(lock);
}

/**
* The caller could call this before any actions, which to hold the lock for
* the write handler of its whole lifecycle. The lock will only be released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.distributedlog;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
Expand All @@ -41,6 +43,7 @@
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.LogReadException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
Expand All @@ -65,6 +68,7 @@
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;

import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -1202,4 +1206,43 @@ public void testTruncationValidation() throws Exception {

zookeeperClient.close();
}

@Test(timeout = 60000)
public void testDeleteLog() throws Exception {
String name = "delete-log-should-delete-ledgers";
DistributedLogManager dlm = createNewDLM(conf, name);
long txid = 1;
// Create the log and write some records
BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
writer.write(DLMTestUtil.getLogRecordInstance(txid++));
}
BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();
writer.closeAndComplete();
BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(txid, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
Utils.ioResult(blplm.asyncClose());

// Should be able to open the underline ledger using BK client
long ledgerId = perStreamLogWriter.getLogSegmentId();
BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
// Delete the log and we shouldn't be able the open the ledger
dlm.delete();
try {
driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
fail("Should fail to open ledger after we delete the log");
} catch (BKException.BKNoSuchLedgerExistsException e) {
// ignore
}
// delete again should not throw any exception
try {
dlm.delete();
} catch (IOException ioe) {
fail("Delete log twice should not throw any exception");
}
}
}