Skip to content

Commit

Permalink
Merge pull request #142 from skyahead/issue141
Browse files Browse the repository at this point in the history
not trying to hold leases on WAL files if we are holding them already.
  • Loading branch information
kkonstantine committed Aug 17, 2017
2 parents dcc201e + e28ec4f commit cd0656f
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 66 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -9,7 +9,7 @@
<!-- TODO: fix all of these -->
<suppress
checks="AbbreviationAsWordInName"
files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile).java"
files="(DataWriter|FSWAL|TopicPartitionWriter|WAL|WALEntry|WALFile|WALConstants).java"
/>

<suppress
Expand Down
Expand Up @@ -385,7 +385,7 @@ public void write() {
closeTempFile();
appendToWAL();
commitFile();
} catch (DataException e) {
} catch (ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = System.currentTimeMillis();
setRetryTimeout(timeoutMs);
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java
Expand Up @@ -35,9 +35,6 @@
public class FSWAL implements WAL {

private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
private static final String leaseException =
"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
private static final long MAX_SLEEP_INTERVAL_MS = 16000L;

private WALFile.Writer writer = null;
private WALFile.Reader reader = null;
Expand All @@ -62,13 +59,15 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio
writer.append(key, value);
writer.hsync();
} catch (IOException e) {
log.error("Error appending WAL file: {}, {}", logFile, e);
close();
throw new DataException(e);
}
}

public void acquireLease() throws ConnectException {
long sleepIntervalMs = 1000L;
while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
long sleepIntervalMs = WALConstants.INITIAL_SLEEP_INTERVAL_MS;
while (sleepIntervalMs < WALConstants.MAX_SLEEP_INTERVAL_MS) {
try {
if (writer == null) {
writer = WALFile.createWriter(conf, Writer.file(new Path(logFile)),
Expand All @@ -77,7 +76,7 @@ public void acquireLease() throws ConnectException {
}
break;
} catch (RemoteException e) {
if (e.getClassName().equals(leaseException)) {
if (e.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
log.info("Cannot acquire lease on WAL {}", logFile);
try {
Thread.sleep(sleepIntervalMs);
Expand All @@ -92,7 +91,7 @@ public void acquireLease() throws ConnectException {
throw new DataException("Error creating writer for log file " + logFile, e);
}
}
if (sleepIntervalMs >= MAX_SLEEP_INTERVAL_MS) {
if (sleepIntervalMs >= WALConstants.MAX_SLEEP_INTERVAL_MS) {
throw new ConnectException("Cannot acquire lease after timeout, will retry.");
}
}
Expand Down Expand Up @@ -129,33 +128,37 @@ public void apply() throws ConnectException {
}
}
} catch (IOException e) {
log.error("Error applying WAL file: {}, {}", logFile, e);
close();
throw new DataException(e);
}
}

@Override
public void truncate() throws ConnectException {
String oldLogFile = logFile + ".1";
storage.delete(oldLogFile);
storage.commit(logFile, oldLogFile);
// Clean out references to the current WAL file.
// Open a new one on the next lease acquisition.
close();
try {
String oldLogFile = logFile + ".1";
storage.delete(oldLogFile);
storage.commit(logFile, oldLogFile);
} finally {
close();
}
}

@Override
public void close() throws ConnectException {
try {
if (writer != null) {
writer.close();
writer = null;
}
if (reader != null) {
reader.close();
reader = null;
}
} catch (IOException e) {
throw new DataException("Error closing " + logFile, e);
} finally {
writer = null;
reader = null;
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/wal/WALConstants.java
@@ -0,0 +1,23 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
**/


package io.confluent.connect.hdfs.wal;

public class WALConstants {
protected static final String LEASE_EXCEPTION_CLASS_NAME
= "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
protected static final long MAX_SLEEP_INTERVAL_MS = 16000L;
protected static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
}
119 changes: 74 additions & 45 deletions src/main/java/io/confluent/connect/hdfs/wal/WALFile.java
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Time;

Expand Down Expand Up @@ -132,42 +133,56 @@ public static class Writer implements Closeable, Syncable {
throw new IllegalArgumentException("file modifier options not compatible with stream");
}

FileSystem fs = null;
FSDataOutputStream out;
boolean ownStream = fileOption != null;
if (ownStream) {
Path p = fileOption.getValue();
FileSystem fs;
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
bufferSizeOption.getValue();
short replication = replicationOption == null
? fs.getDefaultReplication(p)
: (short) replicationOption.getValue();
long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue()
&& fs.exists(p)) {
// Read the file and verify header details
try (
WALFile.Reader reader =
new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())
) {
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());

try {
if (ownStream) {
Path p = fileOption.getValue();
fs = p.getFileSystem(conf);
int bufferSize = bufferSizeOption == null
? getBufferSize(conf)
: bufferSizeOption.getValue();
short replication = replicationOption == null
? fs.getDefaultReplication(p)
: (short) replicationOption.getValue();
long blockSize = blockSizeOption == null
? fs.getDefaultBlockSize(p)
: blockSizeOption.getValue();

if (appendIfExistsOption != null && appendIfExistsOption.getValue() && fs.exists(p)) {
// Read the file and verify header details
try (WALFile.Reader reader = new WALFile.Reader(
connectorConfig.getHadoopConfiguration(),
WALFile.Reader.file(p),
new Reader.OnlyHeaderOption()
)) {
if (reader.getVersion() != VERSION[3]) {
throw new VersionMismatchException(VERSION[3], reader.getVersion());
}
sync = reader.getSync();
}
sync = reader.getSync();
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
}
out = fs.append(p, bufferSize);
this.appendMode = true;
} else {
out = fs.create(p, true, bufferSize, replication, blockSize);
out = streamOption.getValue();
}
} else {
out = streamOption.getValue();

init(connectorConfig, out, ownStream);
} catch (RemoteException re) {
log.error("Failed creating a WAL Writer: " + re.getMessage());
if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
if (fs != null) {
fs.close();
}
}
throw re;
}

init(connectorConfig, out, ownStream);
}

public static Option file(Path value) {
Expand Down Expand Up @@ -394,6 +409,7 @@ public Reader(Configuration conf, Option... opts) throws IOException {
InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts);
LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);

// check for consistency
if ((fileOpt == null) == (streamOpt == null)) {
throw new
Expand All @@ -402,27 +418,40 @@ public Reader(Configuration conf, Option... opts) throws IOException {
if (fileOpt == null && bufOpt != null) {
throw new IllegalArgumentException("buffer size can only be set when a file is specified.");
}

// figure out the real values
Path filename = null;
FSDataInputStream file;
final long len;
if (fileOpt != null) {
filename = fileOpt.getValue();
FileSystem fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
StartOption startOpt = Options.getOption(StartOption.class, opts);
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
initialize(filename, file, start, len, conf, headerOnly != null);
FileSystem fs = null;

try {
if (fileOpt != null) {
filename = fileOpt.getValue();
fs = filename.getFileSystem(conf);
int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
len = null == lenOpt
? fs.getFileStatus(filename).getLen()
: lenOpt.getValue();
file = openFile(fs, filename, bufSize, len);
} else {
len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
file = streamOpt.getValue();
}
StartOption startOpt = Options.getOption(StartOption.class, opts);
long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts);
initialize(filename, file, start, len, conf, headerOnly != null);
} catch (RemoteException re) {
log.error("Failed creating a WAL Reader: " + re.getMessage());
if (re.getClassName().equals(WALConstants.LEASE_EXCEPTION_CLASS_NAME)) {
if (fs != null) {
fs.close();
}
}
throw re;
}
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/test/java/io/confluent/connect/hdfs/wal/WALTest.java
Expand Up @@ -36,7 +36,7 @@ public class WALTest extends TestWithMiniDFSCluster {
private static final String extension = ".avro";

@Test
public void testWALMultiClient() throws Exception {
public void testMultiWALFromOneDFSClient() throws Exception {
setUp();
fs.delete(new Path(FileUtils.directoryName(url, topicsDir, TOPIC_PARTITION)), true);

Expand All @@ -50,7 +50,7 @@ public void testWALMultiClient() throws Exception {
url
);
final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
final FSWAL wal2 = (FSWAL) storage.wal(topicsDir, TOPIC_PARTITION);

String directory = TOPIC + "/" + String.valueOf(PARTITION);
final String tempfile = FileUtils.tempFileName(url, topicsDir, directory, extension);
Expand All @@ -75,8 +75,8 @@ public void testWALMultiClient() throws Exception {
@Override
public void run() {
try {
// holding the lease for awhile
Thread.sleep(3000);
// holding the lease for time that is less than wal2's initial retry interval, which is 1000 ms.
Thread.sleep(WALConstants.INITIAL_SLEEP_INTERVAL_MS - 100);
closed = true;
wal1.close();
} catch (ConnectException | InterruptedException e) {
Expand All @@ -86,6 +86,7 @@ public void run() {
});
thread.start();

// acquireLease() will try to acquire the lease that wal1 is holding and fail. It will retry after 1000 ms.
wal2.acquireLease();
assertTrue(closed);
wal2.apply();
Expand Down

0 comments on commit cd0656f

Please sign in to comment.