Skip to content

Commit

Permalink
Merge pull request #310 from BigVan/zfile_stdin
Browse files (browse the repository at this point in the history)
support stdin as source for ZFile compression
  • Loading branch information
liulanzheng committed Jan 25, 2024
2 parents b33ab47 + d6aebe7 commit 08847cf
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/overlaybd/zfile/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ TEST_F(ZFileTest, verify_compression) {
opt.block_size = 1<<bs;
CompressArgs args(opt);
zfile_compress(fsrc.get(), nullptr, &args);
fsrc->lseek(0, SEEK_SET);
int ret = zfile_compress(fsrc.get(), fdst.get(), &args);
auto fzfile = zfile_open_ro(fdst.get(), opt.verify);
EXPECT_EQ(ret, 0);
Expand Down
44 changes: 22 additions & 22 deletions src/overlaybd/zfile/zfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,8 +1106,6 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
if (ret < 0) {
LOG_ERRNO_RETURN(0, -1, "failed to write header");
}
auto raw_data_size = file->lseek(0, SEEK_END);
LOG_INFO("source data size: `", raw_data_size);
auto block_size = opt.block_size;
LOG_INFO("block size: `", block_size);
auto buf_size = block_size + BUF_SIZE;
Expand All @@ -1124,39 +1122,41 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
compressed_len.resize(nbatch);
raw_chunk_len.resize(nbatch);
LOG_INFO("compress with start....");
off_t i = 0;
while (i < raw_data_size) {
off_t infile_size = 0;
while (true) {
int n = 0;
auto step = std::min((ssize_t)block_size * nbatch, (ssize_t)(raw_data_size - i));
auto ret = file->pread(raw_data, step, i);
if (ret < step) {
LOG_ERRNO_RETURN(0, -1, "failed to read from source file. (readn: `)", ret);
}
i += step;
while (step > 0) {
if (step < block_size) {
raw_chunk_len[n++] = step;
auto readn = file->read(raw_data, block_size * nbatch);
if (readn == 0) {
break;
}
if (readn < 0) {
LOG_ERRNO_RETURN(0, -1, "failed to read from source file. (readn: `)", readn);
}
infile_size += readn;
while (readn > 0) {
if (readn < block_size) {
raw_chunk_len[n++] = readn;
break;
}
raw_chunk_len[n++] = block_size;
step -= block_size;
readn -= block_size;
}
ret = compressor->compress_batch(raw_data, &(raw_chunk_len[0]), compressed_data,
readn = compressor->compress_batch(raw_data, &(raw_chunk_len[0]), compressed_data,
n * buf_size, &(compressed_len[0]), n);
if (ret != 0)
if (readn != 0)
return -1;
for (off_t j = 0; j < n; j++) {
ret = as->write(&compressed_data[j * buf_size], compressed_len[j]);
if (ret < (ssize_t)compressed_len[j]) {
readn = as->write(&compressed_data[j * buf_size], compressed_len[j]);
if (readn < (ssize_t)compressed_len[j]) {
LOG_ERRNO_RETURN(0, -1, "failed to write compressed data.");
}
if (crc32_verify) {
auto crc32_code = crc32c_salt(&compressed_data[j * buf_size], compressed_len[j]);
LOG_DEBUG("append ` bytes crc32_code: {offset: `, count: `, crc32: `}",
sizeof(uint32_t), moffset, compressed_len[j], HEX(crc32_code).width(8));
compressed_len[j] += sizeof(uint32_t);
ret = as->write(&crc32_code, sizeof(uint32_t));
if (ret < (ssize_t)sizeof(uint32_t)) {
readn = as->write(&crc32_code, sizeof(uint32_t));
if (readn < (ssize_t)sizeof(uint32_t)) {
LOG_ERRNO_RETURN(0, -1, "failed to write crc32code, offset: `, crc32: `",
moffset, HEX(crc32_code).width(8));
}
Expand All @@ -1176,8 +1176,8 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
LOG_INFO("index checksum: `", HEX(pht->index_crc).width(8));
pht->index_offset = index_offset;
pht->index_size = index_size;
pht->original_file_size = raw_data_size;
LOG_INFO("write trailer.");
pht->original_file_size = infile_size;
LOG_INFO("write trailer. (source file size: `)", infile_size);
ret = write_header_trailer(as, false, true, true, pht);
if (ret < 0)
LOG_ERRNO_RETURN(0, -1, "failed to write trailer");
Expand Down
47 changes: 43 additions & 4 deletions src/tools/overlaybd-zfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

#include "../overlaybd/zfile/zfile.h"
#include "../overlaybd/tar/tar_file.h"
#include <cstdint>
#include <photon/common/uuid.h>
#include <photon/common/utility.h>
#include <photon/fs/localfs.h>
#include <photon/common/alog.h>
#include <photon/net/basic_socket.h>
#include <photon/fs/virtual-file.h>
#include <cstdio>
#include <cstdlib>
#include <errno.h>
Expand All @@ -31,6 +34,7 @@
#include <unistd.h>
#include <photon/photon.h>
#include "CLI11.hpp"
#include "photon/net/basic_socket.h"
#include "photon/fs/filesystem.h"

using namespace std;
Expand All @@ -39,6 +43,24 @@ using namespace ZFile;

IFileSystem *lfs = nullptr;

class IStreamFile : public VirtualReadOnlyFile{
public:
virtual ssize_t read(void *buf, size_t count) override {
return photon::net::read(0, buf, count);
}

virtual off_t lseek(off_t offset, int whence) override {
return INT64_MAX;
}

UNIMPLEMENTED(int fstat(struct stat *buf) override);
UNIMPLEMENTED_POINTER(IFileSystem* filesystem() override);
};

/// Allocates a new IStreamFile with `new` and returns it as an IFile pointer.
/// The object is returned as a raw pointer; nothing in this function
/// releases it.
IFile *new_streamFile() {
    IFile *stream = new IStreamFile;
    return stream;
}

int verify_crc(IFile* src_file) {

if (is_zfile(src_file) != 1) {
Expand Down Expand Up @@ -71,7 +93,7 @@ int main(int argc, char **argv) {
->default_val(4);
app.add_option("source_file", fn_src, "source file path")
->type_name("FILEPATH")
->check(CLI::ExistingFile)
// ->check(CLI::ExistingFile)
->required();
app.add_option("target_file", fn_dst, "target file path")->type_name("FILEPATH");
app.add_flag("--verbose", verbose, "output debug info")->default_val(false);
Expand All @@ -82,8 +104,16 @@ int main(int argc, char **argv) {
DEFER({photon::fini();});

lfs = new_localfs_adaptor();


if (verify) {
auto file = lfs->open(fn_src.c_str(), O_RDONLY);
IFile *file = nullptr;
if (fn_src.empty()) {
LOG_INFO("read source from STDIN");
file = new_streamFile();
} else {
file = lfs->open(fn_src.c_str(), O_RDONLY);
}
if (!file) {
fprintf(stderr, "failed to open file %s\n", fn_src.c_str());
exit(-1);
Expand All @@ -95,6 +125,13 @@ int main(int argc, char **argv) {
printf("%s is a valid zfile blob.\n", fn_src.c_str());
return 0;
}
bool pipe = false;
if (fn_dst == "") {
LOG_INFO("read source from STDIN");
pipe = true;
fn_dst = fn_src;
fn_src = "";
}

CompressOptions opt;
opt.verify = 1;
Expand All @@ -119,7 +156,7 @@ int main(int argc, char **argv) {
CompressArgs args(opt);
if (!extract) {
printf("compress file %s as %s\n", fn_src.c_str(), fn_dst.c_str());
IFile *infile = lfs->open(fn_src.c_str(), O_RDONLY);
IFile *infile = (!pipe ? lfs->open(fn_src.c_str(), O_RDONLY) : new_streamFile() );
if (infile == nullptr) {
fprintf(stderr, "failed to open file %s\n", fn_src.c_str());
exit(-1);
Expand All @@ -141,8 +178,10 @@ int main(int argc, char **argv) {
printf("compress file done.\n");
return ret;
} else {
if (pipe) {
LOG_ERROR_RETURN(0, -1, "decompression can't use STDIN");
}
printf("decompress file %s as %s\n", fn_src.c_str(), fn_dst.c_str());

IFile *infile = fs->open(fn_src.c_str(), O_RDONLY);
if (infile == nullptr) {
fprintf(stderr, "failed to open file %s\n", fn_dst.c_str());
Expand Down

0 comments on commit 08847cf

Please sign in to comment.