Skip to content

Commit

Permalink
Merge pull request #8780 from reyoung/feature/recordio
Browse files Browse the repository at this point in the history
Feature/recordio
  • Loading branch information
reyoung committed Mar 6, 2018
2 parents f608bb2 + 9dc6958 commit 82b149c
Show file tree
Hide file tree
Showing 11 changed files with 516 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ include(external/eigen) # download eigen3
include(external/pybind11) # download pybind11
include(external/cares)
include(external/grpc)
include(external/snappy) # download snappy
include(external/snappystream) # download snappystream; its build is pointed at the snappy install dir, so keep it after snappy

include(cudnn) # set cudnn libraries, must before configure
include(cupti)
Expand Down
57 changes: 57 additions & 0 deletions cmake/external/snappy.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Fetches and builds snappy as an external project and exposes the result as
# the imported static library target `snappy`. Nothing is defined for
# MOBILE_INFERENCE builds.
if(MOBILE_INFERENCE)
  return()
endif()

include(ExternalProject)

# NOTE: snappy is needed when linking with recordio

# Source/build tree and install tree of the external project.
set(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy)
set(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy)
set(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/"
    CACHE PATH "snappy include directory." FORCE)

ExternalProject_Add(
  extern_snappy
  GIT_REPOSITORY   "https://github.com/google/snappy"
  GIT_TAG          "1.1.7"
  PREFIX           ${SNAPPY_SOURCES_DIR}
  UPDATE_COMMAND   ""
  CMAKE_ARGS       -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                   -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
                   -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
                   -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
                   -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR}
                   -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib
                   -DCMAKE_POSITION_INDEPENDENT_CODE=ON
                   -DBUILD_TESTING=OFF
                   -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
                   ${EXTERNAL_OPTIONAL_ARGS}
  CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR}
                   -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib
                   -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
                   -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
  BUILD_COMMAND    make -j8
  INSTALL_COMMAND  make install
)

# Imported target that points at the static archive installed above.
add_library(snappy STATIC IMPORTED GLOBAL)
set_target_properties(snappy PROPERTIES
  IMPORTED_LOCATION "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a")
# Make sure the archive is built before anything that uses `snappy`.
add_dependencies(snappy extern_snappy)

include_directories(${SNAPPY_INCLUDE_DIR})
58 changes: 58 additions & 0 deletions cmake/external/snappystream.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Fetches and builds snappystream as an external project and exposes the
# result as the imported static library target `snappystream`. Nothing is
# defined for MOBILE_INFERENCE builds.
IF(MOBILE_INFERENCE)
  return()
ENDIF()

include (ExternalProject)

# NOTE: snappystream is needed when linking with recordio. It is built against
# the snappy install tree (SNAPPY_INSTALL_DIR), so external/snappy must be
# included before this file.

SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE)

ExternalProject_Add(
    extern_snappystream
    GIT_REPOSITORY  "https://github.com/hoxnox/snappystream.git"
    GIT_TAG         "0.2.8"
    PREFIX          ${SNAPPYSTREAM_SOURCES_DIR}
    UPDATE_COMMAND  ""
    # The install prefix/libdir must name the snappystream install tree here as
    # well, so that CMAKE_ARGS, CMAKE_CACHE_ARGS and IMPORTED_LOCATION below
    # all agree on where libsnappystream.a ends up.
    CMAKE_ARGS      -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                    -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
                    -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
                    -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
                    -DCMAKE_INSTALL_PREFIX=${SNAPPYSTREAM_INSTALL_DIR}
                    -DCMAKE_INSTALL_LIBDIR=${SNAPPYSTREAM_INSTALL_DIR}/lib
                    -DCMAKE_POSITION_INDEPENDENT_CODE=ON
                    -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
                    -DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR}
                    ${EXTERNAL_OPTIONAL_ARGS}
    CMAKE_CACHE_ARGS
                    -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR}
                    -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib
                    -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
    BUILD_COMMAND   make -j8
    INSTALL_COMMAND make install
    # snappy must be built and installed first; SNAPPY_ROOT points at it.
    DEPENDS snappy
)

add_library(snappystream STATIC IMPORTED GLOBAL)
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION
             "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")

include_directories(${SNAPPYSTREAM_INCLUDE_DIR})
add_dependencies(snappystream extern_snappystream)
1 change: 1 addition & 0 deletions paddle/fluid/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ add_subdirectory(operators)
add_subdirectory(pybind)
add_subdirectory(inference)
add_subdirectory(string)
add_subdirectory(recordio)
6 changes: 6 additions & 0 deletions paddle/fluid/recordio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# internal library.
# header: built from header.cc, with its own unit test.
cc_library(header SRCS header.cc)
cc_test(header_test SRCS header_test.cc DEPS header)
# chunk: built from chunk.cc, which uses snappystream/snappy for the optional
# compression and zlib for the crc32 checksum.
cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib)
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
# recordio: has no sources of its own; it only depends on chunk and header.
cc_library(recordio DEPS chunk header)
134 changes: 134 additions & 0 deletions paddle/fluid/recordio/chunk.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

#include <memory>
#include <sstream>
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"

namespace paddle {
namespace recordio {
// Size in bytes of the stack buffer used to drain a stream piece by piece.
constexpr size_t kMaxBufSize = 1024;

// Reads `in` in pieces of at most kMaxBufSize bytes and hands every piece to
// `callback(const char* data, size_t size)`.
//
//   limit > 0  : stop after exactly `limit` bytes (or earlier at end of input).
//   limit <= 0 : read until the end of the stream.
//
// The data is pulled with read()/gcount() rather than readsome(): readsome()
// only returns what already sits in the stream's internal buffer, so it may
// legally return 0 (or a short count) while more data is still available,
// which silently truncated the input.
//
// If the end of the stream is reached, the eof/fail state raised by the short
// read() is cleared again so that callers can seekg() back afterwards. A
// stream whose badbit is set is left untouched.
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) {
  char buf[kMaxBufSize];
  const bool bounded = limit > 0;
  size_t remaining = bounded ? static_cast<size_t>(limit) : 0;

  while (!bounded || remaining > 0) {
    const size_t want =
        (bounded && remaining < kMaxBufSize) ? remaining : kMaxBufSize;
    in.read(buf, static_cast<std::streamsize>(want));
    const size_t got = static_cast<size_t>(in.gcount());
    if (got > 0) {
      callback(buf, got);
      if (bounded) {
        remaining -= got;
      }
    }
    if (got < want) {
      // Short read: end of input (or an error). Nothing more to deliver.
      if (in.eof() && !in.bad()) {
        in.clear();
      }
      break;
    }
  }
}

// Forwards every buffer that ReadStreamByBuf reads from `in` (called with
// limit -1) to `os`.
static void PipeStream(std::istream& in, std::ostream& os) {
  auto forward_to_output = [&os](const char* data, size_t size) {
    os.write(data, size);
  };
  ReadStreamByBuf(in, -1, forward_to_output);
}
static uint32_t Crc32Stream(std::istream& in, int limit = -1) {
auto crc = crc32(0, nullptr, 0);
ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
crc = crc32(crc, reinterpret_cast<const Bytef*>(buf), len);
});
return crc;
}

// Serializes the chunk into `os`: first a Header (record count, crc32 of the
// payload, compressor and payload length), then the payload itself. The
// payload is every record written as a 4-byte length followed by its bytes,
// optionally passed through a snappy stream.
//
// Returns false and writes nothing when the chunk holds no records; returns
// true otherwise. The chunk itself is not modified.
bool Chunk::Write(std::ostream& os, Compressor ct) const {
  // NOTE(dzhwinter): don't check records.numBytes instead, because
  // empty records are allowed.
  if (records_.empty()) {
    return false;
  }
  std::stringstream sout;
  std::unique_ptr<std::ostream> compressed_stream;
  switch (ct) {
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
      compressed_stream.reset(new snappy::oSnappyStream(sout));
      break;
    default:
      PADDLE_THROW("Not implemented");
  }

  std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;

  for (auto& record : records_) {
    // The length prefix occupies exactly sizeof(uint32_t) bytes, so it must
    // be written from a uint32_t. Writing the first four bytes of a size_t
    // only happens to work on little-endian hosts.
    uint32_t sz = static_cast<uint32_t>(record.size());
    buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(sz))
        .write(record.data(), record.size());
  }

  // Destroy the compressing stream before measuring the payload; presumably
  // this is what makes it hand its remaining buffered data over to sout.
  compressed_stream.reset();

  // Payload length: move the get pointer to the end and read its position.
  // (Asking tellg() before any seek reports the untouched get position, 0.)
  sout.seekg(0, std::ios::end);
  uint32_t len = static_cast<uint32_t>(sout.tellg());
  sout.seekg(0, std::ios::beg);
  uint32_t crc = Crc32Stream(sout);
  // Draining sout for the checksum may leave eof/fail set; reset the state so
  // the payload can be read a second time.
  sout.clear();
  sout.seekg(0, std::ios::beg);

  Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
  hdr.Write(os);
  PipeStream(sout, os);
  return true;
}

// Replaces the content of this chunk with the chunk serialized in `sin`.
//
// Reads the Header, checks the crc32 of the payload against the checksum
// stored in the header, then decodes hdr.NumRecords() records, each stored as
// a 4-byte length followed by that many bytes (read through a snappy stream
// when the header says so).
//
// Throws (via PADDLE_ENFORCE_EQ / PADDLE_THROW) on a checksum mismatch, an
// unknown compressor, or when the stream ends before all records were read.
// The existing records are dropped once the checksum has been verified.
void Chunk::Parse(std::istream& sin) {
  Header hdr;
  hdr.Parse(sin);
  // Remember where the payload starts: it is read twice, once for the
  // checksum and once for decoding.
  auto beg_pos = sin.tellg();
  auto crc = Crc32Stream(sin, hdr.CompressSize());
  PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);

  Clear();

  sin.seekg(beg_pos, std::ios::beg);
  std::unique_ptr<std::istream> compressed_stream;
  switch (hdr.CompressType()) {
    case Compressor::kNoCompress:
      break;
    case Compressor::kSnappy:
      compressed_stream.reset(new snappy::iSnappyStream(sin));
      break;
    default:
      PADDLE_THROW("Not implemented");
  }

  std::istream& stream = compressed_stream ? *compressed_stream : sin;

  for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
    // Initialized so that a failed read can never leave a garbage length.
    uint32_t rec_len = 0;
    stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
    if (!stream) {
      PADDLE_THROW("Failed to read the length of a record");
    }
    std::string buf;
    buf.resize(rec_len);
    if (rec_len > 0) {
      stream.read(&buf[0], rec_len);
      if (!stream) {
        PADDLE_THROW("Failed to read the content of a record");
      }
    }
    Add(buf);
  }
}

} // namespace recordio
} // namespace paddle
56 changes: 56 additions & 0 deletions paddle/fluid/recordio/chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <string>
#include <vector>

#include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/recordio/header.h"

namespace paddle {
namespace recordio {

// A Chunk contains the Header and optionally compressed records.
//
// Records are kept in insertion order as raw byte strings; num_bytes_ tracks
// the sum of their sizes. Copying is disabled.
class Chunk {
 public:
  Chunk() : num_bytes_(0) {}

  // Appends `buf` as a new record and adds its size to the byte total.
  void Add(std::string buf) {
    records_.push_back(buf);
    num_bytes_ += buf.size();
  }

  // Dumps the chunk (header followed by the records, compressed according to
  // `ct`) into `fo`. Returns false and writes nothing when the chunk has no
  // records. The chunk is NOT modified: call Clear() afterwards to make it
  // ready for the next Add invocation.
  bool Write(std::ostream& fo, Compressor ct) const;

  // Drops all records and resets the byte total to zero.
  void Clear() {
    records_.clear();
    num_bytes_ = 0;
  }

  // Replaces the content of this chunk with the chunk serialized in `sin`.
  void Parse(std::istream& sin);

  // Sum of the sizes of all records currently held, in bytes.
  size_t NumBytes() const { return num_bytes_; }

  // Returns the i-th record. `i` is not range-checked.
  const std::string& Record(int i) const { return records_[i]; }

 private:
  std::vector<std::string> records_;
  // sum of record lengths in bytes.
  size_t num_bytes_;
  DISABLE_COPY_AND_ASSIGN(Chunk);
};

// NOTE(review): declaration only — the chunk.cc added alongside this header
// does not define it; confirm a definition exists before calling.
// Presumably compresses `in_length` bytes at `in` with compressor `ct` into
// `out` and returns the number of bytes produced — TODO confirm.
size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out);

// NOTE(review): declaration only — the chunk.cc added alongside this header
// does not define it; confirm a definition exists before calling.
// Presumably the counterpart of CompressData for `in_length` bytes at `in`,
// writing its result to `out` — TODO confirm the direction and buffer sizing.
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out);

} // namespace recordio
} // namespace paddle
49 changes: 49 additions & 0 deletions paddle/fluid/recordio/chunk_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

#include <sstream>

#include "gtest/gtest.h"

using namespace paddle::recordio;

// Round-trips an uncompressed chunk through a stringstream and verifies that
// the records themselves, not only their total size, come back unchanged.
TEST(Chunk, SaveLoad) {
  Chunk ch;
  // The explicit lengths include the trailing '\0' of each literal.
  ch.Add(std::string("12345", 6));
  ch.Add(std::string("123", 4));
  std::stringstream ss;
  ASSERT_TRUE(ch.Write(ss, Compressor::kNoCompress));
  ch.Clear();
  ASSERT_EQ(ch.NumBytes(), 0U);
  ch.Parse(ss);
  ASSERT_EQ(ch.NumBytes(), 10U);
  ASSERT_EQ(ch.Record(0), std::string("12345", 6));
  ASSERT_EQ(ch.Record(1), std::string("123", 4));
}

// Writes the same chunk with and without snappy, checks that the compressed
// form is not larger, and round-trips the compressed form.
TEST(Chunk, Compressor) {
  Chunk ch;
  // The explicit lengths include the trailing '\0' of each literal.
  ch.Add(std::string("12345", 6));
  ch.Add(std::string("123", 4));
  ch.Add(std::string("123", 4));
  ch.Add(std::string("123", 4));
  std::stringstream ss;
  ASSERT_TRUE(ch.Write(ss, Compressor::kSnappy));
  std::stringstream ss2;
  ASSERT_TRUE(ch.Write(ss2, Compressor::kNoCompress));
  ASSERT_LE(ss.tellp(), ss2.tellp());  // Compress should contain less data;

  ch.Clear();
  ch.Parse(ss);
  // Unsigned literal: NumBytes() returns size_t (same form as in SaveLoad).
  ASSERT_EQ(ch.NumBytes(), 18U);
  ASSERT_EQ(ch.Record(0), std::string("12345", 6));
  ASSERT_EQ(ch.Record(1), std::string("123", 4));
  ASSERT_EQ(ch.Record(2), std::string("123", 4));
  ASSERT_EQ(ch.Record(3), std::string("123", 4));
}
Loading

0 comments on commit 82b149c

Please sign in to comment.