Skip to content

Commit

Permalink
updated checkpointing logger names
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthieu Morel committed Jul 12, 2011
1 parent e628327 commit 0e61639
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 56 deletions.
10 changes: 8 additions & 2 deletions s4-core/src/main/java/io/s4/ft/BookKeeperStateStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class BookKeeperStateStorage
//DeleteCallback,
AddCallback,
ReadCallback, StatCallback, DeleteCallback, org.apache.zookeeper.AsyncCallback.StringCallback {
private static Logger logger = Logger.getLogger(BookKeeperStateStorage.class);
private static Logger logger = Logger.getLogger("s4-ft");

private String zkServers;
private BookKeeper bk;
Expand Down Expand Up @@ -183,6 +183,12 @@ public void saveState(SafeKeeperId key, byte[] state,
logger.debug("checkpointing: " + key);
}
SaveCtx sctx = new SaveCtx(key, state, callback);

// TODO
// if ledger exist for this prototype, use it, else fetch it from zookeeper, else create it
// then write entry to ledger
// then add entry id to index ledger

/*
* Creates a new ledger to store the checkpoint
*/
Expand Down Expand Up @@ -333,7 +339,7 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
fctx.state = seq.nextElement().getEntry();

} else {
logger.error("Reading checkpoint failed.");
logger.error("Reading checkpoint failed : " + rc);
}

if(fctx.sb != null){
Expand Down
85 changes: 35 additions & 50 deletions s4-core/src/main/java/io/s4/ft/DefaultFileSystemStateStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,8 +34,7 @@
*/
public class DefaultFileSystemStateStorage implements StateStorage {

private static Logger logger = Logger
.getLogger(DefaultFileSystemStateStorage.class);
private static Logger logger = Logger.getLogger("s4-ft");
private String storageRootPath;
ThreadPoolExecutor threadPool;
int maxWriteThreads = 1;
Expand All @@ -46,19 +46,24 @@ public DefaultFileSystemStateStorage() {

/**
* <p>
* Must be called by the dependency injection framework.<p/>
* Must be called by the dependency injection framework.
* <p/>
*/
public void init() {
checkStorageDir();
threadPool = new ThreadPoolExecutor(0, maxWriteThreads,
writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
threadPool = new ThreadPoolExecutor(0, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
}

@Override
public void saveState(SafeKeeperId key, byte[] state,
StorageCallback callback) {
threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
try {
threadPool.submit(new SaveTask(key, state, callback, storageRootPath));
} catch (RejectedExecutionException e) {
logger.error("Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests + "]");
}
}

@Override
Expand All @@ -82,23 +87,19 @@ public byte[] fetchState(SafeKeeperId key) {
* array.
*/
if (length > Integer.MAX_VALUE) {
throw new IOException("Error file is too large: "
+ file.getName() + " " + length + " bytes");
throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
}

byte[] buffer = new byte[(int) length];
int offSet = 0;
int numRead = 0;

while (offSet < buffer.length
&& (numRead = in.read(buffer, offSet, buffer.length
- offSet)) >= 0) {
while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
offSet += numRead;
}

if (offSet < buffer.length) {
throw new IOException("Error, could not read entire file: "
+ file.getName() + " " + offSet + "/"
throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+ buffer.length + " bytes read");
}

Expand Down Expand Up @@ -147,15 +148,10 @@ public boolean accept(File file) {
}

// files kept as : root/<partitionId>/<prototypeId>/encodedKeyWithFullInfo
private static File safeKeeperID2File(SafeKeeperId key,
String storageRootPath) {

return new File(storageRootPath
+ File.separator
+ key.getPrototypeId()
+ File.separator
+ Base64.encodeBase64URLSafeString(key
.getStringRepresentation().getBytes()));
private static File safeKeeperID2File(SafeKeeperId key, String storageRootPath) {

return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+ Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
}

private static SafeKeeperId file2SafeKeeperID(File file) {
Expand All @@ -173,8 +169,7 @@ public void setStorageRootPath(String storageRootPath) {
File rootPathFile = new File(storageRootPath);
if (!rootPathFile.exists()) {
if (!rootPathFile.mkdirs()) {
logger.error("could not create root storage directory : "
+ storageRootPath);
logger.error("could not create root storage directory : " + storageRootPath);
}

}
Expand Down Expand Up @@ -207,18 +202,16 @@ public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
public void checkStorageDir() {
if (storageRootPath == null) {

File defaultStorageDir = new File(System.getProperty("user.dir")
+ File.separator + "tmp" + File.separator + "storage");
File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+ "storage");
storageRootPath = defaultStorageDir.getAbsolutePath();
if (logger.isInfoEnabled()) {
logger.info("Unspecified storage dir; using default dir: "
+ defaultStorageDir.getAbsolutePath());
logger.info("Unspecified storage dir; using default dir: " + defaultStorageDir.getAbsolutePath());
}
if (!defaultStorageDir.exists()) {
if (!(defaultStorageDir.mkdirs())) {
logger.error("Storage directory not specified, and cannot create default storage directory : "
+ defaultStorageDir.getAbsolutePath()
+ "\n Checkpointing and recovery will be disabled.");
+ defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
}
}
}
Expand All @@ -227,16 +220,15 @@ public void checkStorageDir() {
/**
*
* Writing to storage is an asynchronous operation specified in this class.
*
*
*/
private static class SaveTask implements Runnable {
SafeKeeperId key;
byte[] state;
StorageCallback callback;
private String storageRootPath;

public SaveTask(SafeKeeperId key, byte[] state,
StorageCallback callback, String storageRootPath) {
public SaveTask(SafeKeeperId key, byte[] state, StorageCallback callback, String storageRootPath) {
super();
this.key = key;
this.state = state;
Expand All @@ -247,15 +239,13 @@ public SaveTask(SafeKeeperId key, byte[] state,
public void run() {
File f = safeKeeperID2File(key, storageRootPath);
if (logger.isDebugEnabled()) {
logger.debug("Checkpointing [" + key + "] into file: ["
+ f.getAbsolutePath() + "]");
logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
}
if (!f.exists()) {
if (!f.getParentFile().exists()) {
// parent file has prototype id
if (!f.getParentFile().mkdir()) {
callback.storageOperationResult(
SafeKeeper.StorageResultCode.FAILURE,
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
"Cannot create directory for storing PE for prototype: "
+ f.getParentFile().getAbsolutePath());
return;
Expand All @@ -265,17 +255,14 @@ public void run() {
try {
f.createNewFile();
} catch (IOException e) {
callback.storageOperationResult(
SafeKeeper.StorageResultCode.FAILURE,
e.getMessage());
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
return;
}
} else {
if (!f.delete()) {
callback.storageOperationResult(
SafeKeeper.StorageResultCode.FAILURE,
"Cannot delete previously saved checkpoint file ["
+ f.getParentFile().getAbsolutePath() + "]");
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
"Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath()
+ "]");
return;
}
}
Expand All @@ -284,11 +271,9 @@ public void run() {
fos = new FileOutputStream(f);
fos.write(state);
} catch (FileNotFoundException e) {
callback.storageOperationResult(
SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
} catch (IOException e) {
callback.storageOperationResult(
SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, e.getMessage());
} finally {
try {
if (fos != null) {
Expand Down
6 changes: 2 additions & 4 deletions s4-core/src/main/java/io/s4/processor/AbstractPE.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ public String getName() {
}
}

transient private static Logger LOG = Logger.getLogger(AbstractPE.class);

transient private Clock s4Clock;
// FIXME replaces monitor wait on AbstractPE, for triggering possible extra
// thread when checkpointing activated
Expand Down Expand Up @@ -624,11 +622,11 @@ private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, Abstra
// TODO use reflectasm
field.set(this, field.get(oldState));
} catch (IllegalArgumentException e) {
LOG.error("Cannot recover old state for this PE ["
Logger.getLogger("s4-ft").error("Cannot recover old state for this PE ["
+ this + "]", e);
return;
} catch (IllegalAccessException e) {
LOG.error("Cannot recover old state for this PE ["
Logger.getLogger("s4-ft").error("Cannot recover old state for this PE ["
+ this + "]", e);
return;
}
Expand Down
4 changes: 4 additions & 0 deletions s4-core/src/main/resources/s4-core/conf/default/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<level value="info"/>
<appender-ref ref="S"/>
</logger>
<logger name="s4-ft">
<level value="info"/>
<appender-ref ref="R"/>
</logger>
<appender name="R" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${log_loc}/s4-core/s4-core_${instanceId}.log" />
<layout class="org.apache.log4j.PatternLayout">
Expand Down

0 comments on commit 0e61639

Please sign in to comment.