Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recordio cloud and local interface #2665

Merged
merged 19 commits into from
Jul 7, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/master/c/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake")

project(cxx_go C Go)

include(golang)
#include(golang)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个文件在最新的develop branch已经大改了,需要rebase或者pull一下。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

include(flags)

set(MASTER_LIB_NAME "paddle_master")
Expand Down
21 changes: 20 additions & 1 deletion go/master/c/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ typedef int paddle_master_client;
import "C"

import (
"io"
"sync"
"unsafe"

Expand Down Expand Up @@ -84,11 +85,29 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
return C.PADDLE_MASTER_OK
}

// return value:
// 0:ok
// -1:EOF
// -2:error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not -1 error? -2 is a little strange.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

//export paddle_next_record
func paddle_next_record(client C.paddle_master_client, record **C.uchar) C.int {
c := get(client)
r := c.NextRecord()
r, err := c.NextRecord()
if err == io.EOF {
// EOF
*record = (*C.uchar)(nullPtr)
return -1
}

if err != nil {
// Error
// TODO: return the type of error?
*record = (*C.uchar)(nullPtr)
return -2
}

if len(r) == 0 {
// Empty record
*record = (*C.uchar)(nullPtr)
return 0
}
Expand Down
20 changes: 15 additions & 5 deletions go/master/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package master

import (
"io"
"os"
"time"

Expand All @@ -17,7 +18,12 @@ type Addresser interface {
// Client is the client of the master server.
type Client struct {
conn *connection.Conn
ch chan []byte
ch chan record
}

type record struct {
r []byte
err error
}

// NewClient creates a new Client.
Expand All @@ -27,7 +33,7 @@ type Client struct {
func NewClient(addr Addresser, bufSize int) *Client {
c := &Client{}
c.conn = connection.New()
c.ch = make(chan []byte, bufSize)
c.ch = make(chan record, bufSize)
go c.monitorMaster(addr)
go c.getRecords()
return c
Expand All @@ -52,17 +58,20 @@ func (c *Client) getRecords() {

s := recordio.NewRangeScanner(f, &chunk.Index, -1, -1)
for s.Scan() {
c.ch <- s.Record()
c.ch <- record{s.Record(), nil}
}

if s.Err() != nil {
c.ch <- record{nil, s.Err()}
log.Errorln(err, chunk.Path)
}

err = f.Close()
if err != nil {
log.Errorln(err)
}

c.ch <- record{nil, io.EOF}
}

// We treat a task as finished whenever the last data
Expand Down Expand Up @@ -132,6 +141,7 @@ func (c *Client) taskFinished(taskID int) error {
//
// NextRecord will block until the next record is available. It is
// thread-safe.
func (c *Client) NextRecord() []byte {
return <-c.ch
func (c *Client) NextRecord() ([]byte, error) {
r := <-c.ch
return r.r, r.err
}
18 changes: 14 additions & 4 deletions go/master/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package master_test

import (
"fmt"
"io"
"net"
"net/http"
"net/rpc"
Expand Down Expand Up @@ -69,13 +70,22 @@ func TestNextRecord(t *testing.T) {

for pass := 0; pass < 50; pass++ {
received := make(map[byte]bool)
for i := 0; i < total; i++ {
r := c.NextRecord()
for i := 0; i <= total; i++ {
r, err := c.NextRecord()
if err == io.EOF {
break
}

if err != nil {
t.Fatal(pass, i, "Read error:", err)
}

if len(r) != 1 {
t.Fatal("Length should be 1.", r)
t.Fatal(pass, i, "Length should be 1.", r)
}

if received[r[0]] {
t.Fatal("Received duplicate.", received, r)
t.Fatal(pass, i, "Received duplicate.", received, r)
}
received[r[0]] = true
}
Expand Down
17 changes: 15 additions & 2 deletions python/paddle/v2/master/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,27 @@ def set_dataset(self, paths):
holder[idx] = c_ptr
lib.paddle_set_dataset(self.c, holder, len(paths))

# return format: (record, errno)
# errno = 0: ok
# = -1: EOF
Copy link
Contributor

@helinwang helinwang Jun 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need to return EOF. The function name is next_record , from the user's perspective, it's an infinite stream of data. EOF does not make sense here.

Actually, do we really need to return error here? Current implementation is next_record will log error and try to recover from the error.
If we return error here, what can user do with it? (the retry logic is already implemented inside next_record).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如线下讨论:错误是需要暴露的,cloud端EOF不需要暴露。
我在这里建立了一个ISSUE:#2678
这个PR的py接口先不暴露错误,以便保持本地和cloud接口的统一。

# < -1: error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

< 0: error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

def next_record(self):
p = ctypes.c_char_p()
ret = ctypes.pointer(p)
size = lib.paddle_next_record(self.c, ret)
if size == -1:
# EOF
return None, -1

if size < -1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps size < 0 is better. Errors are typically < 0, not < -1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# Error
return None, size

if size == 0:
# Empty record
return ""
return "", 0

record = ret.contents.value[:size]
# Memory created from C should be freed.
lib.mem_free(ret.contents)
return record
return record, 0
43 changes: 36 additions & 7 deletions python/paddle/v2/reader/creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,51 @@ def reader():
return reader


def recordio(path):
def recordio_local(paths):
"""
Creates a data reader that outputs record one one by one from given recordio file
:path: path of recordio file
:returns: data reader of recordio file
Creates a data reader that outputs record one one by one
from given local recordio fils path.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change Creates a data reader that outputs record one one by one from given local recordio fils path. to Creates a data reader from given RecordIO file paths separated by ",", glob pattern is supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Done.

:path: path of recordio files.
:returns: data reader of recordio files.
"""

import recordio as rec

def reader():
f = rec.reader(path)
for i, path in enumerate(paths):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need enum here, rec.reader support "," separated paths: https://github.com/PaddlePaddle/recordio/blob/master/c/crecordio.go#L79

Copy link
Contributor Author

@gongweibao gongweibao Jul 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方我觉得有些疑惑,我们让用户输入的是一个以','分割的字符串?还是一个字符串的array?还是两者的混合?
https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/master/client.py#L24

Copy link
Contributor

@helinwang helinwang Jul 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不好意思,不是很一致。这里跟set_dataset不同哈~set_dataset接受的是一个list,这里是","分割的字符串。具体接受什么参数要看一下Go那边是怎么实现的。

f = rec.reader(path)
while True:
r = f.read()
if r is None:
break
yield r
f.close()

return reader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the recordio function takes the buf_size argument, we need to honor that argument in local reader as well. So perhaps return buffered(reader, buf_size), please see: https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/reader/decorator.py#L162

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Done.


def recordio(paths, addr="", buf_size=100):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put this function name into __all__ variable inside this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经放了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think addr need to be an environment variable, since user would never know the address of the etcd cluster. (so no way to provide this argument).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!Done.

"""
Creates a data reader that outputs record one one by one
from given local or cloud recordio path.
:path: path of recordio files.
:returns: data reader of recordio files.
"""
import os
import paddle.v2.master.client as cloud

if "KUBERNETES_SERVICE_HOST" not in os.environ.keys():
return recordio_local(paths)

def reader():
c = cloud(addr, buf_size)
c.set_dataset(paths)

while True:
r = f.read()
r, err = client.next_record()
if r is None:
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to break only if error happens. None could be an empty record (which we should not break).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.Done.

yield r
f.close()

c.close()

return reader
2 changes: 1 addition & 1 deletion python/paddle/v2/reader/tests/creator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TestRecordIO(unittest.TestCase):
def test_recordio(self):
path = os.path.join(
os.path.dirname(__file__), "test_recordio_creator.dat")
reader = paddle.v2.reader.creator.recordio(path)
reader = paddle.v2.reader.creator.recordio([path])
for idx, r in enumerate(reader()):
self.assertSequenceEqual(r, str(idx))

Expand Down