diff --git a/pulsar-client-cpp/docker-build.sh b/pulsar-client-cpp/docker-build.sh index f8a2321c3c4de..ab400a99df447 100755 --- a/pulsar-client-cpp/docker-build.sh +++ b/pulsar-client-cpp/docker-build.sh @@ -43,4 +43,4 @@ if [ "$1" != "skip-clean" ]; then find . -name CMakeFiles | xargs rm -rf fi -$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && cmake . $CMAKE_ARGS && make check-format && make -j8" +$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && ./docker-lib-check.sh && cmake . $CMAKE_ARGS && make check-format && make -j8" diff --git a/pulsar-client-cpp/docker-lib-check.sh b/pulsar-client-cpp/docker-lib-check.sh new file mode 100755 index 0000000000000..cefd0981618cf --- /dev/null +++ b/pulsar-client-cpp/docker-lib-check.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if ! dpkg -l libsnappy-dev; then + apt-get install -y libsnappy-dev +fi diff --git a/pulsar-client-cpp/docker-tests.sh b/pulsar-client-cpp/docker-tests.sh index 5336827b00b57..0b9f5ff992a9c 100755 --- a/pulsar-client-cpp/docker-tests.sh +++ b/pulsar-client-cpp/docker-tests.sh @@ -58,4 +58,4 @@ done # Start 2 Pulsar standalone instances (one with TLS and one without) # and execute the tests -$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && ./run-unit-tests.sh ${tests}" +$DOCKER_CMD bash -c "cd /pulsar/pulsar-client-cpp && ./docker-lib-check.sh && ./run-unit-tests.sh ${tests}" diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.cc b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc index f7a53efae4421..6241ea7770bb1 100644 --- a/pulsar-client-cpp/lib/CompressionCodecSnappy.cc +++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc @@ -20,39 +20,28 @@ #if HAS_SNAPPY #include -#include "snappy-c.h" +#include namespace pulsar { SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) { // Get the max size of the compressed data and allocate a buffer to hold it - int maxCompressedSize = snappy_max_compressed_length(raw.readableBytes()); - SharedBuffer compressed = SharedBuffer::allocate(maxCompressedSize); - - unsigned long bytesWritten = maxCompressedSize; - - snappy_status status = - snappy_compress(raw.data(), raw.readableBytes(), compressed.mutableData(), &bytesWritten); - - if (status != SNAPPY_OK) { - LOG_ERROR("Failed to compress to snappy. res=" << res); - abort(); - } - - compressed.bytesWritten(bytesWritten); - + size_t maxCompressedLength = snappy::MaxCompressedLength(raw.readableBytes()); + SharedBuffer compressed = SharedBuffer::allocate(static_cast(maxCompressedLength)); + snappy::ByteArraySource source(raw.data(), raw.readableBytes()); + snappy::UncheckedByteArraySink sink(compressed.mutableData()); + size_t compressedSize = snappy::Compress(&source, &sink); + compressed.setWriterIndex(static_cast(compressedSize)); return compressed; } bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize, SharedBuffer& decoded) { - SharedBuffer decompressed = SharedBuffer::allocate(uncompressedSize); - - snappy_status status = snappy_uncompress(encoded.data(), encoded.readableBytes(), - decompressed.mutableData(), uncompressedSize); - - if (status == SNAPPY_OK) { - decoded = decompressed; + SharedBuffer uncompressed = SharedBuffer::allocate(uncompressedSize); + snappy::ByteArraySource source(encoded.data(), encoded.readableBytes()); + snappy::UncheckedByteArraySink sink(uncompressed.mutableData()); + if (snappy::Uncompress(&source, &sink)) { + decoded = uncompressed; decoded.setWriterIndex(uncompressedSize); return true; } else { diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.h b/pulsar-client-cpp/lib/CompressionCodecSnappy.h index 47383761f095f..933b9efb4133b 100644 --- a/pulsar-client-cpp/lib/CompressionCodecSnappy.h +++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.h @@ -22,7 +22,7 @@ namespace pulsar { -class CompressionCodecSnappy : public CompressionCodec { +class PULSAR_PUBLIC CompressionCodecSnappy : public CompressionCodec { public: SharedBuffer encode(const SharedBuffer& raw); diff --git a/pulsar-client-cpp/tests/CompressionCodecSnappyTest.cc b/pulsar-client-cpp/tests/CompressionCodecSnappyTest.cc new file mode 100644 index 0000000000000..27d668f36e2a1 --- /dev/null +++ b/pulsar-client-cpp/tests/CompressionCodecSnappyTest.cc @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include "../lib/CompressionCodecSnappy.h" + +using namespace pulsar; + +TEST(CompressionCodecSnappyTest, testEncodeAndDecode) { + CompressionCodecSnappy compressionCodecSnappy; + char data[] = "snappy compression compresses snappy"; + size_t sz = sizeof(data); + SharedBuffer source = SharedBuffer::wrap(data, sz); + SharedBuffer compressed = compressionCodecSnappy.encode(source); + ASSERT_GT(compressed.readableBytes(), 0); + + SharedBuffer uncompressed; + bool res = compressionCodecSnappy.decode(compressed, static_cast(sz), uncompressed); + ASSERT_TRUE(res); + ASSERT_EQ(uncompressed.readableBytes(), sz); + ASSERT_STREQ(data, uncompressed.data()); +}