Datanode failover is not supported for writes #86

Open
kof02guy opened this issue Aug 19, 2017 · 6 comments
@kof02guy

What is the correct way to retry when errors such as EOF or broken pipe are encountered while writing?
I saw a comment in the code saying that when a failure is encountered, the stream currently cannot recover by itself. So should I Close this stream and invoke client.Append() to get a new one?

@kof02guy

I saw several comments saying that the file write stream does not recover automatically. Is a fix scheduled for the near future?

@colinmarc (Owner) commented Aug 23, 2017

Currently, failover during writes is sketched out, but not implemented. See this comment:

hdfs/rpc/block_writer.go, lines 53 to 58 in cdda132:

// Unlike BlockReader, BlockWriter currently has no ability to recover from
// write failures (timeouts, datanode failure, etc). Once it returns an error
// from Write or Close, it may be in an invalid state.
//
// This will hopefully be fixed in a future release.
func (bw *BlockWriter) Write(b []byte) (int, error) {

There's even a (skipped) test for it, here:

func TestWriteFailsOver(t *testing.T) {
    t.Skip("Write failover isn't implemented")
    name := "/_test/create/6.txt"
    baleet(t, name)

    mobydick, err := os.Open("../test/mobydick.txt")
    require.NoError(t, err)

    bw := createBlock(t, name)
    bw.connectNext()
    bw.stream.ackError = ackError{0, 0, hdfs.Status_ERROR}

    _, err = io.CopyN(bw, mobydick, 1048576)
    require.NoError(t, err)
    finishBlock(t, name, bw)

    br, _ := getBlockReader(t, name)
    hash := crc32.NewIEEE()
    n, err := io.Copy(hash, br)
    require.NoError(t, err)
    assert.EqualValues(t, 1048576, n)
    assert.EqualValues(t, 0xb35a6a0e, hash.Sum32())
}

Unfortunately, the process is extremely client-heavy, so it's very complex to implement. I'll leave this issue open to track it.

To answer your question, you should indeed close the stream and start a new write.
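
For illustration, a rough sketch of that close-and-reopen pattern using the public client API might look like the following (the retry count, the lack of backoff, and the namenode address are placeholders, not a recommendation):

package main

import (
    "log"

    "github.com/colinmarc/hdfs"
)

// writeWithReopen appends b to name, closing and reopening the writer when a
// Write fails. maxRetries and the target path below are placeholders.
func writeWithReopen(client *hdfs.Client, name string, b []byte) error {
    w, err := client.Append(name)
    if err != nil {
        return err
    }

    const maxRetries = 3
    for attempt := 0; ; attempt++ {
        if _, err = w.Write(b); err == nil {
            return w.Close()
        }
        if attempt >= maxRetries {
            return err
        }

        // After a failed Write the writer may be in an invalid state, so
        // close it (its error is expected here) and reopen with Append.
        _ = w.Close()
        if w, err = client.Append(name); err != nil {
            return err
        }
        log.Printf("reopened %s after write error (attempt %d)", name, attempt+1)
    }
}

func main() {
    client, err := hdfs.New("namenode:8020") // placeholder namenode address
    if err != nil {
        log.Fatal(err)
    }
    if err := writeWithReopen(client, "/tmp/example.log", []byte("hello\n")); err != nil {
        log.Fatal(err)
    }
}

One caveat: if part of the failed write already reached the datanodes, re-writing the same bytes can duplicate data, and if the previous Close failed, the namenode may still hold the file's lease for a while, so the Append itself can fail until the lease is recovered.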

@kof02guy

I think this package is the 'native' HDFS client package for Go, and it would be very nice if you could add the failover recovery part.
For the retry, do you mean closing the same file and using Append() to get a new stream, or creating another file?
By the way, if the write fails, how can I ensure that Close() will succeed? If Close() fails, I'll lose the data that wasn't written to HDFS.

@kof02guy

Hi, @colinmarc. I took your advice and used close-and-reopen to retry, but I constantly get the following errors:

2017/08/25 03:26:00.320699 file_writer.go:108:  [INFO] Close error:updateBlockForPipeline call failed with ERROR_APPLICATION (java.io.IOException) file:hdfs://rndhadoop001.rnd.fwmrm.net:8020/user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log
2017/08/25 03:26:00.321857 file_writer.go:114:  [WARNING] Reopen error:append /user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log: append call failed with ERROR_APPLICATION (org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException) file:hdfs://rndhadoop001.rnd.fwmrm.net:8020/user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log

The retry logic:

        for {
            _, err = w.hdfsWriter.Write(b)
            if nil == err {
                break
            }
            retry := 0
            lastSleepTime := 0
            closeOpenRetry := 0
            // First, retry the write on the existing writer with backoff.
            for nil != err && retry < MAX_WRITE_RETRY {
                LogWarning(WARN, INTERNAL_LOGIC, "Error:%s retry:%s", err.Error(), w.name)
                lastSleepTime = GetRetrySleepTime(BUSY_RETRY, lastSleepTime)
                time.Sleep(time.Duration(lastSleepTime) * time.Second)
                _, err = w.hdfsWriter.Write(b)
                retry++
            }
            // If that still fails, close the writer and reopen the file with
            // Append(); the outer loop then retries the write on the new writer.
            for nil != err && closeOpenRetry < MAX_CLOSE_OPEN_RETRY {
                LogWarning(WARN, INTERNAL_LOGIC, "Error:%s closeopenretry:%s", err.Error(), w.name)
                lastSleepTime = GetRetrySleepTime(BUSY_RETRY, lastSleepTime)
                time.Sleep(time.Duration(lastSleepTime) * time.Second)
                closeErr := w.hdfsWriter.Close()
                if nil != closeErr {
                    LogInfo("Close error:%s file:%s", closeErr.Error(), w.name)
                }
                tFn := strings.TrimPrefix(w.name, HDFS_FILE_PREFIX)
                protocol := tFn[:strings.Index(tFn, "/")]
                // Assign the Append result to the outer err so the loop condition sees it.
                tmpWriter, appendErr := GetHdfsClient(protocol).Append(w.name[len(HDFS_FILE_PREFIX)+len(protocol):])
                err = appendErr
                if nil != err {
                    LogWarning(WARN, INTERNAL_LOGIC, "Reopen error:%s file:%s", err.Error(), w.name)
                } else {
                    w.hdfsWriter = tmpWriter
                }
                closeOpenRetry++
            }
        }

GetHdfsClient() returns an hdfs.Client, and w.hdfsWriter is an hdfs.FileWriter.

Can you help me with this? Thanks a lot.

@colinmarc colinmarc changed the title Correct way to retry when failure occurs when writing Datanode failover is not supported Aug 6, 2018
@colinmarc colinmarc changed the title Datanode failover is not supported Datanode failover is not supported for writes Aug 6, 2018
@mohanraj1311 commented Sep 8, 2019

@colinmarc: has this issue been fixed and just left open, or is it yet to be fixed? I'm still getting this error.

@KasonBraley

@colinmarc since this feature is not currently implemented, and it seems relatively easy to implement the basic logic in my own application, I was hoping you could expand a little more on @kof02guy's question about closing the open file to retry.

To answer your question, you should indeed close the stream and start a new write.

Based on the BlockWriter comment, how would one "close the stream" in the most correct way? I would imagine calling Close on that current FileWriter would fail due to the same error that caused Write to fail.
