Skip to content

Commit

Permalink
Updated some concurrency code to properly implement lazy initializati…
Browse files Browse the repository at this point in the history
…on according to Effective Java, Third Edition
  • Loading branch information
Ian Emmons committed Dec 11, 2019
1 parent 0499f10 commit c086c91
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 307 deletions.
198 changes: 80 additions & 118 deletions Parliament/java/com/bbn/parliament/recovery/RecoveryManager.java
Expand Up @@ -9,173 +9,135 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @author dkolas */
public class RecoveryManager
{
public static final String JOURNAL_FILE_NAME = "recovery.journal";
public static final long AUTO_FLUSH_INTERVAL = 5000;

private static Logger _logger = LoggerFactory.getLogger(RecoveryManager.class);

private String _directoryPath;
private boolean _needsRecovery;
BufferedWriter _writer;
private File _journalFile;
private AutoFlush _autoFlush;

public RecoveryManager(String directoryPath, Recoverable recoverable)
{
_directoryPath = directoryPath;
_journalFile = new File(_directoryPath, JOURNAL_FILE_NAME);

_needsRecovery = _journalFile.exists() && _journalFile.length() > 0;
if (_needsRecovery)
{
public class RecoveryManager {
public static final String JOURNAL_FILE_NAME = "recovery.journal";
public static final long AUTO_FLUSH_INTERVAL = 5000;
private static final Logger LOG = LoggerFactory.getLogger(RecoveryManager.class);

private final File _journalFile;
private volatile BufferedWriter _writer;

public RecoveryManager(String directoryPath, Recoverable recoverable) {
_journalFile = new File(directoryPath, JOURNAL_FILE_NAME);

if (_journalFile.exists() && _journalFile.length() > 0) {
recover(recoverable);
}
_autoFlush = new AutoFlush();
new Thread(_autoFlush, "RecoveryManagerAutoFlusher").start();

Thread autoFlusher = new Thread(() -> {
while (true) {
try {
Thread.sleep(AUTO_FLUSH_INTERVAL);
} catch (InterruptedException e) {
// Who cares?
}
try {
getWriter().flush();
} catch (IOException ex) {
throw new RuntimeException("Can't flush journal!", ex);
}
}
});
autoFlusher.setName("RecoveryManagerAutoFlusher");
autoFlusher.setDaemon(true);
autoFlusher.start();
}

public synchronized void recordAdd(String subject, String predicate,
String object) throws IOException
{
StringBuffer buffer = new StringBuffer();
buffer.append('+');
buffer.append(triple(subject, predicate, object));
buffer.append('\n');
getWriter().write(buffer.toString());
public void startBlock() throws IOException {
getWriter().write("~start~");
}

public synchronized void recordDelete(String subject, String predicate,
String object) throws IOException
{
StringBuilder buffer = new StringBuilder();
buffer.append('-');
buffer.append(triple(subject, predicate, object));
buffer.append('\n');
getWriter().write(buffer.toString());
public void endBlock() throws IOException {
getWriter().write("~end~");
}

private static String triple(String subject, String predicate, String object)
{
StringBuilder result = new StringBuilder();
result.append(subject);
result.append(' ');
result.append(predicate);
result.append(' ');
result.append(object);
return result.toString();
public void recordAdd(String subject, String predicate, String object) throws IOException {
getWriter().write(appendTriple('+', subject, predicate, object));
}

public void startBlock() throws IOException
{
getWriter().write("~start~");
public void recordDelete(String subject, String predicate, String object) throws IOException {
getWriter().write(appendTriple('-', subject, predicate, object));
}

public void endBlock() throws IOException
{
getWriter().write("~end~");
private static String appendTriple(char operation, String subject, String predicate, String object) {
StringBuilder buffer = new StringBuilder();
buffer.append(operation);
buffer.append(subject);
buffer.append(' ');
buffer.append(predicate);
buffer.append(' ');
buffer.append(object);
buffer.append('\n');
return buffer.toString();
}

private BufferedWriter getWriter() throws IOException
{
if (_writer == null)
{
_writer = new BufferedWriter(new FileWriter(_journalFile));
/**
* Get the writer, with lazy initialization. This follows the double-check idiom
* for lazy initialization of an instance field. See Item 83 of Effective Java,
* Third Edition, by Joshua Bloch for details.
*
* @return the writer
*/
private BufferedWriter getWriter() throws IOException {
BufferedWriter result = _writer;
if (result == null) {
synchronized (this) {
if (_writer == null) {
_writer = result = new BufferedWriter(
new OutputStreamWriter(
new FileOutputStream(_journalFile), StandardCharsets.UTF_8));
}
}
}
return _writer;
return result;
}

public void instanceFlushed() throws IOException
{
if (_writer != null)
{
public void instanceFlushed() throws IOException {
if (_writer != null) {
_writer.close();
}
_writer = null;
_journalFile.delete();
}

private void recover(Recoverable recoverable)
{
_logger.warn("Needed to recover unflushed changes!");
private void recover(Recoverable recoverable) {
LOG.warn("Needed to recover unflushed changes!");
boolean finishedRecovery = false;
try (BufferedReader rdr = Files.newBufferedReader(_journalFile.toPath(), StandardCharsets.UTF_8)) {
String line = null;
while ((line = rdr.readLine()) != null)
{
while ((line = rdr.readLine()) != null) {
int firstSpace = line.indexOf(' ');
int secondSpace = line.indexOf(' ', firstSpace + 1);
String subject = line.substring(1, firstSpace);
String predicate = line.substring(firstSpace + 1, secondSpace);
String object = line.substring(secondSpace + 1);

if (line.charAt(0) == '+')
{
_logger.debug("Recovery: adding statement: {} {} {}",
new Object[] { subject, predicate, object });
if (line.charAt(0) == '+') {
LOG.debug("Recovery: adding statement: {} {} {}", subject, predicate, object);
recoverable.recoverAdd(subject, predicate, object);
}
else if (line.charAt(0) == '-')
{
_logger.debug("Recovery: deleting statement: {} {} {}",
new Object[] { subject, predicate, object });
} else if (line.charAt(0) == '-') {
LOG.debug("Recovery: deleting statement: {} {} {}", subject, predicate, object);
recoverable.recoverDelete(subject, predicate, object);
}
}
recoverable.recoverFlush();
finishedRecovery = true;
_logger.warn("Recovered successfully!");
}
catch (IOException e)
{
LOG.warn("Recovered successfully!");
} catch (IOException e) {
throw new RuntimeException("Error during recovery!", e);
}
if (finishedRecovery)
{
if (finishedRecovery) {
_journalFile.delete();
}
}

private class AutoFlush implements Runnable
{
public AutoFlush() {
}

@Override
public void run()
{
while (true)
{
try
{
Thread.sleep(AUTO_FLUSH_INTERVAL);
}
catch (InterruptedException e)
{
// Who cares?
}
if (_writer != null)
{
try
{
_writer.flush();
}
catch (IOException e)
{
throw new RuntimeException("Can't flush journal!", e);
}
}
}
}
}
}
74 changes: 26 additions & 48 deletions Parliament/java/com/bbn/parliament/utilities/Timer.java
Expand Up @@ -25,18 +25,15 @@ public Timer()
init();
}

private void init()
private synchronized void init()
{
synchronized (this)
{
_running = false;
_start = 0L;
_end = 0L;
_total = 0L;
}
_running = false;
_start = 0L;
_end = 0L;
_total = 0L;
}

public void reset()
public synchronized void reset()
{
if (_running)
{
Expand All @@ -46,63 +43,44 @@ public void reset()
init();
}

public void start()
public synchronized void start()
{
synchronized (this)
if (_running)
{
if (_running)
{
throw new IllegalStateException("Timer is already started");
}

_running = true;
_start = Calendar.getInstance().getTimeInMillis();
throw new IllegalStateException("Timer is already started");
}

_running = true;
_start = Calendar.getInstance().getTimeInMillis();
}

public void stop()
public synchronized void stop()
{
synchronized (this)
if (!_running)
{
if (!_running)
{
throw new IllegalStateException("Timer has not been started");
}

_end = Calendar.getInstance().getTimeInMillis();
_total = _end - _start;
_running = false;
throw new IllegalStateException("Timer has not been started");
}

_end = Calendar.getInstance().getTimeInMillis();
_total = _end - _start;
_running = false;
}

public TimeValue getElapsedTime()
public synchronized TimeValue getElapsedTime()
{
TimeValue retval;

synchronized (this)
if (_running)
{
if (_running)
{
throw new IllegalStateException("Timer is still running");
}

retval = new TimeValue();
retval.setMsec(_total);
throw new IllegalStateException("Timer is still running");
}

TimeValue retval = new TimeValue();
retval.setMsec(_total);
return retval;
}

public boolean isRunning()
public synchronized boolean isRunning()
{
boolean retval;

synchronized (this)
{
retval = _running;
}

return retval;
return _running;
}

public static void main(String[] args)
Expand Down
Expand Up @@ -16,7 +16,11 @@ private static class IndexFactoryRegistryHolder {
}

/**
* Get the instance of the registry.
* Get the instance of the index factory registry.
*
* This follows the "lazy initialization holder class" idiom for lazy
* initialization of a static field. See Item 83 of Effective Java, Third
* Edition, by Joshua Bloch for details.
*
* @return the instance.
*/
Expand Down
Expand Up @@ -34,7 +34,13 @@ private static class IndexRegistryHolder {
private static final IndexManager INSTANCE = new IndexManager();
}

/** Get the singleton instance of the <code>IndexManager</code>. */
/**
* Get the singleton instance of the index manager. This follows the "lazy
* initialization holder class" idiom for lazy initialization of a static field.
* See Item 83 of Effective Java, Third Edition, by Joshua Bloch for details.
*
* @return the instance
*/
public static IndexManager getInstance() {
return IndexRegistryHolder.INSTANCE;
}
Expand Down
@@ -1,5 +1,6 @@
package com.bbn.parliament.jena.jetty;

// TODO: Replace this class with Java's CountdownLatch
public class InterThreadSignal {
private boolean signalSent;
private final Object lock;
Expand Down

0 comments on commit c086c91

Please sign in to comment.