Skip to content

Commit

Permalink
GarbageCollector
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Apr 4, 2017
1 parent 389cc71 commit 1129e35
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 175 deletions.
10 changes: 10 additions & 0 deletions core/common/src/main/java/alluxio/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ public enum PropertyKey {
Name.MASTER_JOURNAL_TAILER_SHUTDOWN_QUIET_WAIT_TIME_MS, 5000),
MASTER_JOURNAL_TAILER_SLEEP_TIME_MS(Name.MASTER_JOURNAL_TAILER_SLEEP_TIME_MS, 1000),
MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES(Name.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES, 160000),
MASTER_JOURNAL_GC_PERIOD_MS(Name.MASTER_JOURNAL_GC_PERIOD_MS, 600000),
MASTER_JOURNAL_GC_THRESHOLD_MS(Name.MASTER_JOURNAL_GC_THRESHOLD_MS, 600000),
MASTER_JOURNAL_TEMPORARY_FILE_GC_THRESHOLD_MS(Name.MASTER_JOURNAL_TEMPORARY_FILE_GC_THRESHOLD_MS,
1800000),
MASTER_KEYTAB_KEY_FILE(Name.MASTER_KEYTAB_KEY_FILE, null),
MASTER_LINEAGE_CHECKPOINT_CLASS(Name.MASTER_LINEAGE_CHECKPOINT_CLASS,
"alluxio.master.lineage.checkpoint.CheckpointLatestPlanner"),
Expand Down Expand Up @@ -575,6 +579,12 @@ public static final class Name {
public static final String MASTER_WORKER_TIMEOUT_MS = "alluxio.master.worker.timeout.ms";
public static final String MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES =
"alluxio.master.journal.checkpoint.period.entries";
public static final String MASTER_JOURNAL_GC_PERIOD_MS = "alluxio.master.journal.gc.period.ms";
public static final String MASTER_JOURNAL_GC_THRESHOLD_MS =
"alluxio.master.journal.gc.threshold.ms";
public static final String MASTER_JOURNAL_TEMPORARY_FILE_GC_THRESHOLD_MS =
"alluxio.master.journal.temporary.file.gc.threshold.ms";

//
// Worker related properties
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public void start(boolean isPrimary) throws IOException {
* The sequence for dealing with the journal before starting as the primary:
*
* 1. Create a journal reader to replay the logs from the next sequence number.
* 2. Start the journal garbage collector.
* 3. Start the journal writer.
* 2. Start the journal writer.
*
* Since this method is called before the master RPC server starts serving, there is no
* concurrent access to the master during these phases.
Expand Down Expand Up @@ -132,7 +131,7 @@ public void start(boolean isPrimary) throws IOException {
}
}

// Step 3: Start the journal writer.
// Step 2: Start the journal writer.
mJournalWriter = mJournal.getWriter(JournalWriterCreateOptions.defaults()
.setNextSequenceNumber(journalReader.getNextSequenceNumber()).setPrimary(true));
mAsyncJournalWriter = new AsyncJournalWriter(mJournalWriter);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal.ufs;

import alluxio.Configuration;
import alluxio.Constants;
import alluxio.PropertyKey;
import alluxio.master.journal.JournalWriter;
import alluxio.underfs.UnderFileSystem;
import alluxio.util.ThreadFactoryUtils;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.ThreadSafe;

/**
* Implementation of {@link JournalWriter} based on UFS.
*/
@ThreadSafe
final class UfsJournalGarbageCollector implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(UfsJournalGarbageCollector.class);

private ScheduledExecutorService mExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryUtils.build("UfsJournalGarbageCollector-%d", true));

private UfsJournal mJournal;

private ScheduledFuture<?> mGc;

UfsJournalGarbageCollector(UfsJournal journal) {
mJournal = journal;

mGc = mExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
gc();
}
}, Constants.SECOND_MS, Configuration.getLong(PropertyKey.MASTER_JOURNAL_GC_PERIOD_MS),
TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (mGc != null) {
mGc.cancel(true);
mGc = null;
}
}

private void gc() {
UfsJournal.Snapshot snapshot;
try {
snapshot = mJournal.getSnapshot();
} catch (IOException e) {
LOG.warn("Failed to get journal snapshot with error {}.", e.getMessage());
return;
}
long nextCheckpointSequenceNumber = 0;

// Checkpoint.
List<UfsJournalFile> checkpoints = snapshot.mCheckpoints;
if (!checkpoints.isEmpty()) {
nextCheckpointSequenceNumber = checkpoints.get(checkpoints.size() - 1).getEnd();
}
for (int i = 0; i < checkpoints.size() - 1; ++i) {
// Only keep at most 2 checkpoints.
if (i != checkpoints.size() - 2) {
deleteNoException(checkpoints.get(i).getLocation());
}
// For the the second last checkpoint. Check whether it has been there for a long time.
maybeGc(checkpoints.get(i), nextCheckpointSequenceNumber);
}

for (UfsJournalFile log : snapshot.mLogs) {
maybeGc(log, nextCheckpointSequenceNumber);
}

for (UfsJournalFile tmpCheckpoint : snapshot.mTemporaryCheckpoints) {
maybeGc(tmpCheckpoint, nextCheckpointSequenceNumber);
}
}

private void maybeGc(UfsJournalFile file, long nextCheckpointSequenceNumber) {
if (file.getEnd() > nextCheckpointSequenceNumber && !file.isTmpCheckpoint()) {
return;
}

long lastModifiedTimeMs;
try {
lastModifiedTimeMs = mJournal.getUfs().getModificationTimeMs(file.getLocation().toString());
} catch (IOException e) {
LOG.warn("Failed to get the last modified time for {}.", file.getLocation());
return;
}

long thresholdMs = file.isTmpCheckpoint() ?
Configuration.getLong(PropertyKey.MASTER_JOURNAL_TEMPORARY_FILE_GC_THRESHOLD_MS) :
Configuration.getLong(PropertyKey.MASTER_JOURNAL_GC_THRESHOLD_MS);

if (System.currentTimeMillis() - lastModifiedTimeMs > thresholdMs) {
deleteNoException(file.getLocation());
}
}

void deleteNoException(URI location) {
try {
mJournal.getUfs().deleteFile(location.toString());
} catch (IOException e) {
LOG.warn("Failed to delete journal file {}.", location);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import alluxio.underfs.options.CreateOptions;

import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,6 +50,7 @@ public final class UfsJournalLogWriter implements JournalWriter {
*/
private boolean mRotateLogForNextWrite;
private JournalOutputStream mJournalOutputStream;
private UfsJournalGarbageCollector mGarbageCollector;

private class JournalOutputStream implements Closeable {
final DataOutputStream mOutputStream;
Expand Down Expand Up @@ -126,6 +128,7 @@ public void close() throws IOException {
mRotateLogForNextWrite = true;
mJournalOutputStream = new JournalOutputStream(currentLog, null);
}
mGarbageCollector = new UfsJournalGarbageCollector(mJournal);
}

@Override
Expand Down Expand Up @@ -205,9 +208,12 @@ public synchronized void flush() throws IOException {

@Override
public synchronized void close() throws IOException {
Closer closer = Closer.create();
if (mJournalOutputStream != null) {
mJournalOutputStream.close();
closer.register(mJournalOutputStream);
}
closer.register(mGarbageCollector);
closer.close();
mClosed = true;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected URI getJournalLocation() {
/**
* @param journalFactory the factory to use for creating journals
*/
protected void createMasters(final JournalFactory journalFactory) {
private void createMasters(final JournalFactory journalFactory) {
mRegistry = new MasterRegistry();
List<Callable<Void>> callables = new ArrayList<>();
for (final MasterFactory factory : ServerUtils.getMasterServiceLoader()) {
Expand Down

0 comments on commit 1129e35

Please sign in to comment.