Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compressor/zstd: add zstd compression plugin #13075

Merged
merged 2 commits into from Jan 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Expand Up @@ -47,3 +47,6 @@
[submodule "src/dpdk"]
path = src/dpdk
url = https://github.com/ceph/dpdk
[submodule "src/zstd"]
path = src/zstd
url = https://github.com/facebook/zstd
6 changes: 4 additions & 2 deletions src/compressor/CMakeLists.txt
Expand Up @@ -10,14 +10,16 @@ set(compressor_plugin_dir ${CMAKE_INSTALL_PKGLIBDIR}/compressor)

add_subdirectory(snappy)
add_subdirectory(zlib)
add_subdirectory(zstd)

add_custom_target(compressor_plugins DEPENDS
ceph_snappy
ceph_zlib)
ceph_zlib
ceph_zstd)

if(WITH_EMBEDDED)
include(MergeStaticLibraries)
add_library(cephd_compressor_base STATIC ${compressor_srcs})
set_target_properties(cephd_compressor_base PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
merge_static_libraries(cephd_compressor cephd_compressor_base cephd_compressor_snappy cephd_compressor_zlib)
merge_static_libraries(cephd_compressor cephd_compressor_base cephd_compressor_snappy cephd_compressor_zlib cephd_compressor_zstd)
endif()
11 changes: 7 additions & 4 deletions src/compressor/Compressor.cc
Expand Up @@ -19,10 +19,11 @@

const char * Compressor::get_comp_alg_name(int a) {
switch (a) {
case COMP_ALG_NONE: return "none";
case COMP_ALG_SNAPPY: return "snappy";
case COMP_ALG_ZLIB: return "zlib";
default: return "???";
case COMP_ALG_NONE: return "none";
case COMP_ALG_SNAPPY: return "snappy";
case COMP_ALG_ZLIB: return "zlib";
case COMP_ALG_ZSTD: return "zstd";
default: return "???";
}
}

Expand All @@ -31,6 +32,8 @@ boost::optional<Compressor::CompressionAlgorithm> Compressor::get_comp_alg_type(
return COMP_ALG_SNAPPY;
if (s == "zlib")
return COMP_ALG_ZLIB;
if (s == "zstd")
return COMP_ALG_ZSTD;
if (s == "")
return COMP_ALG_NONE;

Expand Down
1 change: 1 addition & 0 deletions src/compressor/Compressor.h
Expand Up @@ -29,6 +29,7 @@ class Compressor {
COMP_ALG_NONE = 0,
COMP_ALG_SNAPPY = 1,
COMP_ALG_ZLIB = 2,
COMP_ALG_ZSTD = 3,
COMP_ALG_LAST //the last value for range checks
};
// compression options
Expand Down
42 changes: 42 additions & 0 deletions src/compressor/zstd/CMakeLists.txt
@@ -0,0 +1,42 @@
# zstd

# libzstd - build it statically
set(ZSTD_C_FLAGS -fPIC -Wno-unused-variable -O3)

include(ExternalProject)
ExternalProject_Add(zstd_ext
SOURCE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/build/cmake
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_C_FLAGS=${ZSTD_C_FLAGS}
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzstd
BUILD_COMMAND $(MAKE) libzstd_static
INSTALL_COMMAND "true")

# force zstd make to be called on each time
ExternalProject_Add_Step(zstd_ext forcebuild
DEPENDEES configure
DEPENDERS build
COMMAND "true"
ALWAYS 1)

add_library(zstd STATIC IMPORTED)
set_property(TARGET zstd PROPERTY
IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/libzstd/lib/libzstd.a")
add_dependencies(zstd zstd_ext)
set(ZSTD_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/lib)

#
set(zstd_sources
CompressionPluginZstd.cc
)

add_library(ceph_zstd SHARED ${zstd_sources})
add_dependencies(ceph_zstd ${CMAKE_SOURCE_DIR}/src/ceph_ver.h)
target_link_libraries(ceph_zstd zstd)
set_target_properties(ceph_zstd PROPERTIES VERSION 2.0.0 SOVERSION 2)
install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir})

if(WITH_EMBEDDED)
add_library(cephd_compressor_zstd STATIC ${zstd_sources})
set_target_properties(cephd_compressor_zstd PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
endif()
58 changes: 58 additions & 0 deletions src/compressor/zstd/CompressionPluginZstd.cc
@@ -0,0 +1,58 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Mirantis, Inc.
*
* Author: Alyona Kiseleva <akiselyova@mirantis.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
*/

#include <ostream>

// -----------------------------------------------------------------------------
#include "ceph_ver.h"
#include "compressor/CompressionPlugin.h"
#include "ZstdCompressor.h"
// -----------------------------------------------------------------------------

class CompressionPluginZstd : public CompressionPlugin {

public:

explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct)
{}

virtual int factory(CompressorRef *cs,
std::ostream *ss)
{
if (compressor == 0) {
ZstdCompressor *interface = new ZstdCompressor();
compressor = CompressorRef(interface);
}
*cs = compressor;
return 0;
}
};

// -----------------------------------------------------------------------------

const char *__ceph_plugin_version()
{
return CEPH_GIT_NICE_VER;
}

// -----------------------------------------------------------------------------

int __ceph_plugin_init(CephContext *cct,
const std::string& type,
const std::string& name)
{
PluginRegistry *instance = cct->get_plugin_registry();

return instance->add(type, name, new CompressionPluginZstd(cct));
}
99 changes: 99 additions & 0 deletions src/compressor/zstd/ZstdCompressor.h
@@ -0,0 +1,99 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_ZSTDCOMPRESSOR_H
#define CEPH_ZSTDCOMPRESSOR_H

#include "zstd/lib/zstd.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "compressor/Compressor.h"

#define COMPRESSION_LEVEL 5

class ZstdCompressor : public Compressor {
public:
ZstdCompressor() : Compressor(COMP_ALG_ZSTD, "zstd") {}

int compress(const bufferlist &src, bufferlist &dst) override {
bufferptr outptr = buffer::create_page_aligned(
ZSTD_compressBound(src.length()));
ZSTD_outBuffer_s outbuf;
outbuf.dst = outptr.c_str();
outbuf.size = outptr.length();
outbuf.pos = 0;

ZSTD_CStream *s = ZSTD_createCStream();
ZSTD_initCStream(s, COMPRESSION_LEVEL);
auto p = src.begin();
size_t left = src.length();
while (left) {
assert(!p.end());
struct ZSTD_inBuffer_s inbuf;
inbuf.pos = 0;
inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src);
ZSTD_compressStream(s, &outbuf, &inbuf);
left -= inbuf.size;
}
assert(p.end());
ZSTD_flushStream(s, &outbuf);
ZSTD_endStream(s, &outbuf);
ZSTD_freeCStream(s);

// prefix with decompressed length
::encode((uint32_t)src.length(), dst);
dst.append(outptr, 0, outbuf.pos);
return 0;
}

int decompress(const bufferlist &src, bufferlist &dst) override {
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}

int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
if (compressed_len < 4) {
return -1;
}
compressed_len -= 4;
uint32_t dst_len;
::decode(dst_len, p);

bufferptr dstptr(dst_len);
ZSTD_outBuffer_s outbuf;
outbuf.dst = dstptr.c_str();
outbuf.size = dstptr.length();
outbuf.pos = 0;
ZSTD_DStream *s = ZSTD_createDStream();
ZSTD_initDStream(s);
while (compressed_len > 0) {
if (p.end()) {
return -1;
}
ZSTD_inBuffer_s inbuf;
inbuf.pos = 0;
inbuf.size = p.get_ptr_and_advance(compressed_len, (const char**)&inbuf.src);
ZSTD_decompressStream(s, &outbuf, &inbuf);
compressed_len -= inbuf.size;
}
ZSTD_freeDStream(s);

dst.append(dstptr, 0, outbuf.pos);
return 0;
}
};

#endif
3 changes: 2 additions & 1 deletion src/test/compressor/test_compression.cc
Expand Up @@ -324,7 +324,8 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(
"zlib/isal",
"zlib/noisal",
"snappy"));
"snappy",
"zstd"));

TEST(ZlibCompressor, zlib_isal_compatibility)
{
Expand Down
1 change: 1 addition & 0 deletions src/zstd
Submodule zstd added at dc9931