not trying to hold leases on WAL files if we are holding them already. #142

Merged
merged 8 commits into from Aug 17, 2017

Conversation

Contributor

@skyahead skyahead commented Oct 21, 2016

This PR is for: #141

@ConfluentJenkins
Collaborator

Can one of the admins verify this patch?

@lakeofsand

@skyahead We ran into the same problem recently.
From the stack trace in the log, however, we found that ours was triggered by a failed close:

ERROR Error closing hdfs://192.168.101.55:8020/logs/*_/1/log.
org.apache.kafka.connect.errors.ConnectException: Error closing hdfs://192.168.101.55:8020/logs/_
/1/log
at io.confluent.connect.hdfs.wal.FSWAL.close(FSWAL.java:156)
at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:325)
at io.confluent.connect.hdfs.DataWriter.onPartitionsRevoked(DataWriter.java:318)
at io.confluent.connect.hdfs.HdfsSinkTask.onPartitionsRevoked(HdfsSinkTask.java:108)
...
Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2132)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2230)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at io.confluent.connect.hdfs.wal.WALFile$Writer.close(WALFile.java:329)
at io.confluent.connect.hdfs.wal.FSWAL.close(FSWAL.java:150)

@skyahead
Contributor Author

@lakeofsand I do not see exactly the same exceptions, but I do see lots of similar ones :-)

But I think after all those 'Error closing' exceptions, there is a 'topicPartitionWriters.remove(tp)' call in DataWriter.java's close() method, which removes this writer so that a new one is created for the current partition. I.e., the connect code should survive these exceptions. Does your code keep generating these errors forever?

However, it seems you are using a version older than this commit: b2b1c61#diff-d4f63c72e615f6185c4d472918ba1e95, since close() is called from onPartitionsRevoked() in your version.

@lakeofsand

@skyahead Our issue seems to be caused by a bug in the HDFS client:

org.apache.hadoop.hdfs.DFSOutputStream.java:
@Override
public void close() throws IOException {
  synchronized (this) {
    TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#close", src);
    try {
      closeImpl(); // ==> the earlier "FSWAL.close" exception is thrown here, so dfsClient.endFileLease is skipped
    } finally {
      scope.close();
    }
  }
  dfsClient.endFileLease(fileId);
}

@skyahead
Contributor Author

@lakeofsand When your errors happen, do you see anything wrong in your HDFS namenode log file?

@lakeofsand

@skyahead
The "close exception" leaves a lease renewer running in the Connect process, and the HDFS namenode does not know about it. Connect keeps renewing the file lease periodically.

Operations on this file, whether from the process itself or from other processes, will fail because the lease is still owned by the previous process.

"
Failed to APPEND_FILE /logs/beaver_http_response/1/log for DFSClient_NONMAPREDUCE_450119871_31 on 192.168.101.101 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1501121577_33 on 192.168.101.102
"

@cotedm
Contributor

cotedm commented Jan 6, 2017

@skyahead it's definitely dangerous to parse an exception message (these can change without warning if we, say, upgrade the HDFS client dependency). However, the symptom you describe in issue #141 looks like the writer isn't being set to null, which happens here:
https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L154

It seems possible that the writer didn't close properly in your scenario because you cut the network. In that case we throw an exception and never null out the writer, so we never create a new writer (and thus never obtain a new lease). Maybe a better approach is to null out the writer and reader in a finally block in the close() method presented above.
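
As a rough illustration of the finally-block idea, here is a minimal sketch. SketchWal and its fields are hypothetical stand-ins rather than the connector's FSWAL, and plain java.io.Closeable objects are assumed in place of the real WAL reader and writer.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for a WAL that owns a writer and a reader.
class SketchWal {
  private Closeable writer;
  private Closeable reader;

  SketchWal(Closeable writer, Closeable reader) {
    this.writer = writer;
    this.reader = reader;
  }

  // True while a handle is still referenced.
  boolean hasOpenHandles() {
    return writer != null || reader != null;
  }

  void close() throws IOException {
    try {
      // Try the graceful path first.
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    } finally {
      // Runs even when a close() above throws, so a failed close
      // never leaves a stale handle behind for later calls to reuse.
      writer = null;
      reader = null;
    }
  }
}
```

With this shape, a caller that sees close() fail can still count on the next operation building a fresh writer. Note that if the writer's close() throws, the reader is dropped without being closed; whether that is acceptable depends on the real classes.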

Contributor

@cotedm cotedm left a comment

@skyahead left you a few comments and suggestions. I think there are a couple of accidental changes in here, and I think we can be a bit cleaner by modifying close() instead of adding a hard reset option.

@@ -96,6 +97,11 @@ public void acquireLease() throws ConnectException {
}
}

private void reset() {
Contributor

Instead of adding a new method, could we just do this in the close() method in a finally block so that we attempt to close the reader/writer gracefully first before nulling them out? Then replace calls to reset() with close() in this file instead?

Contributor Author

Great point! Will do.

out = streamOption.getValue();

init(conf, out, ownStream);
} catch (RemoteException re) {
Contributor

Can you explain this part a bit more? I'm not sure why we need to look for this exception here but I might just be missing it.

Contributor Author

This is how the leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" can be caught. See here: https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/wal/FSWAL.java#L78.

When this exception is seen, it means we are trying to create a new lease from the same DFSClient. Previously, we gave up and kept the original lease, which could lead to the wait-forever issue. The change closes open files and clears the FileSystem cache. A new lease will be obtained by future writes/reads.

Contributor

Yeah, I can't remember the details but I think the issue is which jar the class is defined in or something like that. So we had to go down the hacky route of checking the class name.

@@ -73,6 +73,7 @@ public void run() {
});
thread.start();

Thread.sleep(3001);
Contributor

this seems unrelated?

Contributor Author

I was hoping to fix the test but it seems I was wrong. Will commit something new for this.

long start = startOpt == null ? 0 : startOpt.getValue();
// really set up
initialize(filename, file, start, len, conf, headerOnly != null);
} catch (RemoteException re) {
Contributor

Same as the other RemoteException, can you explain why we need to catch this?

Contributor Author

@skyahead skyahead left a comment

WALTest fixed, please review

@@ -60,12 +61,16 @@ public void append(String tempFile, String committedFile) throws ConnectExceptio
writer.append(key, value);
writer.hsync();
} catch (IOException e) {
close();
Contributor

Hmm, so just for the sake of debuggability, if we're doing work in the exception handler that could itself throw exceptions, I think we might want to at least log that something went wrong. Because with the current code, if close() throws an exception, then all the info about the original exception is lost.

This is true for a couple of the other similar changes below. The only alternative I could think of is allowing close() to take an extra parameter that's the original cause of calling close (or null if there isn't one) and attaching that as the cause if it fails, but then we also lose info about the other exception. I think logging this one proactively is probably fine since these should all be exceptional conditions.
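
To make the "log it proactively" option concrete, here is a minimal sketch. AppendSketch, IoStep, and the list-backed errorLog are hypothetical stand-ins (the real code uses ConnectException and a logger); it only shows why logging before calling close() keeps the original failure visible.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the WAL append path; not the connector's code.
class AppendSketch {
  // Any step that may fail with an I/O error (a write, a close, ...).
  interface IoStep {
    void run() throws IOException;
  }

  // Stand-in for a real logger so the sketch needs no logging library.
  final List<String> errorLog = new ArrayList<>();

  void append(IoStep write, IoStep close) throws IOException {
    try {
      write.run();
    } catch (IOException e) {
      // Log before cleaning up: if close fails too, its exception is the
      // one that propagates, and this entry is the only trace of the original.
      errorLog.add("Error appending to WAL: " + e.getMessage());
      close.run();
      throw e;
    }
  }
}
```

When both steps fail, the caller sees the close failure while the log still records the write failure, which is the trade-off described above.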

Contributor Author

Please have a look now.

init(conf, out, ownStream);
} catch (RemoteException re) {
log.error("Failed creating a WAL Writer: " + re.getMessage());
Contributor

Can we do

log.error("Failed creating a WAL Writer: ", re);

instead so the log will include stack trace info?

Contributor

In fact, is the log even going to be useful since it gets logged again by FSWAL which calls this constructor? I think this might be the case in the other case below as well, though I haven't extensively checked all callers. I guess worst case we're just logging a bit more information, so perhaps being conservative about logging the details isn't a bad idea.

Contributor Author

Yes, I was hoping to log a bit more information so that it is easier to read the debug log which is VERY long.

So re.getMessage() is the short version that I like :-) If we log the stack trace, the same info will be logged again all the way up at https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L324.

log.error("Failed creating a WAL Writer: " + re.getMessage());
if (re.getClassName().equals(leaseException)) {
if (fs != null) {
fs.close();
Contributor

Does this need protection from a possible IOException?

Contributor Author

@@ -27,4 +27,5 @@
void truncate() throws ConnectException;
void close() throws ConnectException;
String getLogFile();
long getSleepIntervalMs();
Contributor

Hmm, a bit of a nit, but since this is only used for testing I'm not sure about making it public for testing. In fact, we separately have some work going on to refactor some of this code to make it more reusable and this seems like an odd addition to the interface given that it's really just public for testing. /cc @kkonstantine

I'm wondering if just casting to the more specific FSWAL class in the test would be a better solution (and label the FSWAL public API as public for testing)? Keeps the interface clean but allows the test to access the info that it needs.
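
A minimal sketch of the casting approach, assuming simplified stand-ins: Wal, FsWalSketch, and WalFactory are hypothetical names and not the connector's WAL, FSWAL, or storage classes. The interface stays free of the test-only accessor, and the test downcasts to reach it.

```java
// Hypothetical, trimmed-down interface: no test-only accessor here.
interface Wal {
  void close();
}

// Hypothetical implementation that exposes an extra accessor.
class FsWalSketch implements Wal {
  private final long sleepIntervalMs;

  FsWalSketch(long sleepIntervalMs) {
    this.sleepIntervalMs = sleepIntervalMs;
  }

  @Override
  public void close() {
    // Nothing to release in this sketch.
  }

  // Public for testing only; not part of the Wal interface.
  public long getSleepIntervalMs() {
    return sleepIntervalMs;
  }
}

// Hypothetical factory that hands out the interface type, as production code would.
class WalFactory {
  static Wal create() {
    return new FsWalSketch(1000L);
  }
}
```

A test would then write something like `FsWalSketch wal = (FsWalSketch) WalFactory.create();` and call `wal.getSleepIntervalMs()`. The cost is that the test is tied to the concrete class.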

Contributor Author

great idea

@cotedm
Contributor

cotedm commented Feb 7, 2017

I don't have any other comments, what do you think @ewencp ?

Member

@kkonstantine kkonstantine left a comment

Thanks for your PR. I got a pointer to review an addition to the WAL interface that's not present any more, so I thought I'd give it a look.

Looks good in general. I've added some nitpicks that would help clean-up the code a little bit.

throw new ConnectException(e);
}
}

public long getSleepIntervalMs() {
Member

nit: Could have "package private" scope instead and a non-javadoc comment above to show it's destined for testing.

@@ -143,6 +151,8 @@ public void truncate() throws ConnectException {
close();
Member

Redundant call, since close has been added in the finally block.

@@ -159,6 +169,9 @@ public void close() throws ConnectException {
}
} catch (IOException e) {
throw new ConnectException("Error closing " + logFile, e);
} finally {
writer = null;
Member

I can't point to the actual lines, but nullifying reader and writer above is redundant now that it's done within the finally block.

@@ -56,6 +57,8 @@
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash

private static final String leaseException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
Member

nit: It'd be nice for variable naming to be consistent with our code style for static final member fields.
Thus, it should be something like: LEASE_EXCEPTION_CLASS_NAME
(I added more to the name to show it's a class name string and not to actual class. Your call).

I know we don't run checkstyle currently, but we will soon.

@@ -43,7 +43,7 @@ public void testWALMultiClient() throws Exception {
Storage storage = StorageFactory.createStorage(storageClass, conf, url);

final WAL wal1 = storage.wal(topicsDir, TOPIC_PARTITION);
- final WAL wal2 = storage.wal(topicsDir, TOPIC_PARTITION);
+ final FSWAL wal2 = (FSWAL)storage.wal(topicsDir, TOPIC_PARTITION);
Member

Single white space needed between casting and storage

- // holding the lease for awhile
- Thread.sleep(3000);
+ // holding the lease for time that is less than wal2's retry interval, which is 1000 ms.
+ Thread.sleep(wal2.getSleepIntervalMs()-100);
Member

White space needed around -
(Again checkstyle coming soon :) )

@@ -73,6 +73,8 @@ public void run() {
});
thread.start();

// AcquireLease will try to acquire the same lease that wal1 is holding and fail
Member

nit: I'd use the actual method name (acquireLease) if I wanted to refer to it, instead of the concept ("Acquiring the lease")

@skyahead
Contributor Author

skyahead commented Feb 9, 2017

@kkonstantine Thanks for the review! How about now?

@kkonstantine
Member

Great, thanks. I see you also pushed the declaration of initial and max intervals as constants outside the method, I forgot to mention that.

Ok by me, I'd wait for final comments from @ewencp.

@skyahead
Contributor Author

skyahead commented Feb 9, 2017

@kkonstantine thanks for replying so promptly :-)

@skyahead
Contributor Author

@ewencp Can you have a final look at this PR please?

@Perdjesk

Perdjesk commented May 18, 2017

@ewencp @kkonstantine
I hope everyone is doing well. Is there anything that can help to go forward with this PR?
#143 (comment)

@pronvis

pronvis commented Aug 15, 2017

Good day, everyone!
I have the same issue in my cluster and need to execute hdfs debug recoverLease -path ${path} every day...
It looks like you fixed this issue a long time ago, so why is it still not merged?

@ConfluentJenkins
Collaborator

Can one of the admins verify this patch?

@ewencp
Contributor

ewencp commented Aug 15, 2017

ok to test

Contributor

@ewencp ewencp left a comment

This LGTM. I merged w/ master to clean up the conflicts. @kkonstantine any further thoughts?

Member

@kkonstantine kkonstantine left a comment

Seems ok overall, but one change needs justification (see comment).


private WALFile.Writer writer = null;
private WALFile.Reader reader = null;
private String logFile = null;
private HdfsSinkConnectorConfig conf = null;
private HdfsStorage storage = null;
private long sleepIntervalMs = WALConstants.INITIAL_SLEEP_INTERVAL_MS;
Member

Why is this one upgraded from a local variable to a member field? The behavior definitely changes on repeated calls on the same object. Notice that this variable is mutated iteratively as part of the while loop in acquireLease. Even if it happens that there's only one call of acquireLease per FSWAL object, I think it's better practice to keep it local if that's what we intend to do here.
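
A minimal sketch of why the scope matters, under stated assumptions: LeaseRetrySketch is a hypothetical stand-in for the retry loop, the 1000 ms initial interval comes from the test comment in this thread, and the maximum interval and doubling step are assumed for illustration. Waits are recorded instead of slept so the sketch runs instantly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a lease-acquisition retry loop with backoff.
class LeaseRetrySketch {
  static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
  static final long MAX_SLEEP_INTERVAL_MS = 16000L; // assumed value

  // Records each backoff interval instead of actually sleeping.
  final List<Long> waits = new ArrayList<>();

  void acquireLease(BooleanSupplier tryAcquire) {
    // Local variable: every call starts again from the initial interval.
    // As a member field, a second call would resume from wherever the
    // previous call left off.
    long sleepIntervalMs = INITIAL_SLEEP_INTERVAL_MS;
    while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
      if (tryAcquire.getAsBoolean()) {
        return;
      }
      waits.add(sleepIntervalMs);
      sleepIntervalMs *= 2;
    }
    throw new IllegalStateException("Cannot acquire lease after timeout");
  }
}
```

With the local variable, two consecutive calls that each fail twice before succeeding both back off from the initial interval. With a field, the second call would start from the already-doubled value.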

Contributor Author

Good catch!

@@ -0,0 +1,23 @@
/**
* Copyright 2015 Confluent Inc.
Member

nit: If you apply any changes based on the comment above, you may change this one too to be 2017 instead. Otherwise, never mind.

Member

@kkonstantine kkonstantine left a comment

Great! Thanks for the fix @skyahead!
LGTM

@kkonstantine kkonstantine changed the base branch from master to 3.4.x August 16, 2017 20:49
@kkonstantine kkonstantine changed the base branch from 3.4.x to master August 16, 2017 20:53
@kkonstantine kkonstantine merged commit cd0656f into confluentinc:master Aug 17, 2017