diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 10d91b4c0107b..c3d056d175f00 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1457,6 +1457,14 @@ OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entrie OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter +/* The following are tunables for torrent data */ +OPTION(rgw_torrent_tracker, OPT_STR, "") // torrent fields announce and announce-list +OPTION(rgw_torrent_createby, OPT_STR, "") // torrent field created by +OPTION(rgw_torrent_comment, OPT_STR, "") // torrent field comment +OPTION(rgw_torrent_encoding, OPT_STR, "") // torrent field encoding +OPTION(rgw_torrent_origin, OPT_STR, "") // torrent origin +OPTION(rgw_torrent_sha_unit, OPT_INT, 512*1024) // torrent field piece length, default 512K + // This will be set to true when it is safe to start threads. // Once it is true, it will never change. 
OPTION(internal_safe_to_start_threads, OPT_BOOL, false) diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 96c9c1a7bc3fd..613fa6a21711b 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -89,7 +89,8 @@ librgw_la_SOURCES = \ rgw/librgw.cc \ rgw/rgw_xml.cc \ rgw/rgw_xml_enc.cc \ - rgw/rgw_website.cc + rgw/rgw_website.cc \ + rgw/rgw_torrent.cc if WITH_OPENLDAP librgw_la_SOURCES += rgw/rgw_ldap.cc @@ -269,6 +270,7 @@ noinst_HEADERS += \ rgw/rgw_civetweb_log.h \ rgw/rgw_website.h \ rgw/rgw_rest_s3website.h \ + rgw/rgw_torrent.h \ civetweb/civetweb.h \ civetweb/include/civetweb.h \ civetweb/include/civetweb_conf.h \ diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index b3833e1e2e829..71390168aacbd 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1234,6 +1234,7 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) gc_invalidate_time = start_time; gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); } + return send_response_data(bl, bl_ofs, bl_len); } @@ -1333,6 +1334,28 @@ void RGWGetObj::execute() if (op_ret < 0) goto done_err; + /* start gettorrent */ + if (torrent.get_flag()) + { + torrent.init(s, store); + torrent.get_torrent_file(op_ret, read_op, total_len, bl, obj); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret + << dendl; + goto done_err; + } + op_ret = send_response_data(bl, 0, total_len); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to send_response_data ret= " << op_ret + << dendl; + goto done_err; + } + return; + } + /* end gettorrent */ + attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST); if (attr_iter != attrs.end() && !skip_manifest) { op_ret = handle_user_manifest(attr_iter->second.c_str()); @@ -2580,7 +2603,7 @@ void RGWPutObj::execute() int len; map::iterator iter; bool multipart; - + bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL); perfcounter->inc(l_rgw_put); @@ -2676,6 +2699,9 @@ void RGWPutObj::execute() len = 
data.length(); } + /* save data for producing torrent data */ + torrent.save_data(data_in); + /* do we need this operation to be synchronous? if we're dealing with an object with immutable * head, e.g., multipart object we need to make sure we're the first one writing to this object */ @@ -2828,6 +2854,18 @@ void RGWPutObj::execute() op_ret = processor->complete(etag, &mtime, real_time(), attrs, delete_at, if_match, if_nomatch); + /* produce torrent only after a successful complete(), so its result never masks a failed write */ + if (op_ret >= 0 && ofs == torrent.get_data_len()) + { + torrent.init(s, store); + torrent.set_create_date(mtime); + op_ret = torrent.handle_data(); + if (0 != op_ret) + { + ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl; + goto done; + } + } done: dispose_processor(processor); diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 49018f457c78f..8d201a27963e4 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -34,10 +34,12 @@ #include "rgw_acl.h" #include "rgw_cors.h" #include "rgw_quota.h" +#include "rgw_torrent.h" #include "include/assert.h" using namespace std; +using ceph::crypto::SHA1; struct req_state; class RGWHandler; @@ -103,6 +105,7 @@ RGWOp() : s(nullptr), dialect_handler(nullptr), store(nullptr), class RGWGetObj : public RGWOp { protected: + seed torrent; // get torrent const char *range_str; const char *if_mod; const char *if_unmod; @@ -642,6 +645,7 @@ class RGWPutObj : public RGWOp { friend class RGWPutObjProcessor; protected: + seed torrent; off_t ofs; const char *supplied_md5_b64; const char *supplied_etag; diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index 655bbd290e59e..d775dc8f86079 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -790,6 +790,19 @@ int RGWGetObj_ObjStore::get_params() mod_pg_ver = s->info.env->get_int("HTTP_DEST_PG_VER", 0); } + /* start gettorrent */ + bool is_torrent = s->info.args.exists(GET_TORRENT); + if (is_torrent) + { + int ret = 0; + ret = torrent.get_params(); + if (ret < 0) + { + return ret; + } + } + /* end gettorrent */ + 
return 0; } @@ -1000,6 +1013,15 @@ int RGWPutObj_ObjStore::verify_params() int RGWPutObj_ObjStore::get_params() { + /* start gettorrent */ + int ret = 0; + ret = torrent.get_params(); + if (ret < 0) + { + return ret; + } + torrent.set_info_name((s->object).name); + /* end gettorrent */ supplied_md5_b64 = s->info.env->get("HTTP_CONTENT_MD5"); return 0; diff --git a/src/rgw/rgw_torrent.cc b/src/rgw/rgw_torrent.cc new file mode 100644 index 0000000000000..7953809784c07 --- /dev/null +++ b/src/rgw/rgw_torrent.cc @@ -0,0 +1,323 @@ +#include +#include + +#include + +#include "rgw_torrent.h" +#include "include/str_list.h" +#include "include/rados/librados.hpp" + +#define dout_subsys ceph_subsys_rgw + +using namespace std; +using ceph::crypto::MD5; +using namespace librados; +using namespace boost; +using ceph::crypto::SHA1; + +seed::seed() +{ + seed::info.piece_length = 0; + seed::info.len = 0; + sha_len = 0; + is_torrent = false; + create_date = 0; + s = NULL; + store = NULL; +} + +seed::~seed() +{ + seed::info.sha1_bl.clear(); + bl.clear(); + torrent_bl.clear(); + s = NULL; + store = NULL; +} + +void seed::init(struct req_state *p_req, RGWRados *p_store) +{ + s = p_req; + store = p_store; +} + +void seed::get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, uint64_t &total_len, + bufferlist &bl_data, rgw_obj &obj) +{ + /* add other field if config is set */ + dencode.bencode_dict(bl); + set_announce(); + if (!comment.empty()) + { + dencode.bencode(COMMENT, comment, bl); + } + if (!create_by.empty()) + { + dencode.bencode(CREATED_BY, create_by, bl); + } + if (!encoding.empty()) + { + dencode.bencode(ENCODING, encoding, bl); + } + + string oid, key; + rgw_bucket bucket; + map m; + set obj_key; + get_obj_bucket_and_oid_loc(obj, bucket, oid, key); + ldout(s->cct, 0) << "NOTICE: head obj oid= " << oid << dendl; + + obj_key.insert(RGW_OBJ_TORRENT); + op_ret = read_op.state.io_ctx.omap_get_vals_by_keys(oid, obj_key, &m); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to omap_get_vals_by_keys op_ret = 
" << op_ret << dendl; + return; + } + + /* the read can succeed without returning the key; with no stored info there is no valid torrent */ + if (m.empty()) + { + ldout(s->cct, 0) << "ERROR: no torrent data stored for oid= " << oid << dendl; + op_ret = -ENOENT; + return; + } + + map::iterator iter; + for (iter = m.begin(); iter != m.end(); ++iter) + { + bufferlist bl_tmp = iter->second; + char *pbuff = bl_tmp.c_str(); + bl.append(pbuff, bl_tmp.length()); + } + dencode.bencode_end(bl); + + bl_data = bl; + total_len = bl.length(); + return; +} + +bool seed::get_flag() +{ + return is_torrent; +} + +void seed::save_data(bufferlist &bl) +{ + info.len += bl.length(); + torrent_bl.push_back(bl); +} + +off_t seed::get_data_len() +{ + return info.len; +} + +void seed::set_create_date(ceph::real_time& value) +{ + utime_t date = ceph::real_clock::to_timespec(value); + create_date = date.sec(); +} + +void seed::set_info_pieces(char *buff) +{ + info.sha1_bl.append(buff, CEPH_CRYPTO_SHA1_DIGESTSIZE); +} + +void seed::set_info_name(const string& value) +{ + info.name = value; +} + +void seed::sha1(SHA1 *h, bufferlist &bl, off_t bl_len) +{ + off_t num = bl_len/info.piece_length; + off_t remain = 0; + remain = bl_len%info.piece_length; + + char *pstr = bl.c_str(); + char sha[25]; + + /* get sha1 */ + for (off_t i = 0; i < num; i++) + { + memset(sha, 0x00, sizeof(sha)); + h->Update((byte *)pstr, info.piece_length); + h->Final((byte *)sha); + set_info_pieces(sha); + pstr += info.piece_length; + } + + /* process remain */ + if (0 != remain) + { + memset(sha, 0x00, sizeof(sha)); + h->Update((byte *)pstr, remain); + h->Final((byte *)sha); + set_info_pieces(sha); + } +} + +int seed::sha1_process() +{ + uint64_t remain = info.len%info.piece_length; + uint8_t remain_len = ((remain > 0)? 
1 : 0); + sha_len = (info.len/info.piece_length + remain_len)*CEPH_CRYPTO_SHA1_DIGESTSIZE; + + /* A piece may span several buffers, so the per-buffer helper sha1() cannot be + * used here: keep feeding one digest until piece_length bytes are hashed. */ + SHA1 h; + char sha[CEPH_CRYPTO_SHA1_DIGESTSIZE]; + off_t filled = 0; /* bytes already hashed into the current piece */ + list::iterator iter = torrent_bl.begin(); + for (; iter != torrent_bl.end(); ++iter) + { + char *pstr = (*iter).c_str(); + off_t left = (*iter).length(); + while (left > 0) + { + off_t want = info.piece_length - filled; + off_t n = (left < want) ? left : want; + h.Update((byte *)pstr, n); + pstr += n; + left -= n; + filled += n; + if (filled == info.piece_length) + { + h.Final((byte *)sha); + set_info_pieces(sha); + filled = 0; + } + } + } + + /* the last piece is usually shorter than piece_length */ + if (filled > 0) + { + h.Final((byte *)sha); + set_info_pieces(sha); + } + + return 0; +} + +int seed::handle_data() +{ + int ret = 0; + + /* get_params() never ran, so piece_length is still 0 and must not be used as a divisor */ + if (!is_torrent) + { + return 0; + } + + /* sha1 process */ + ret = sha1_process(); + if (0 != ret) + { + ldout(s->cct, 0) << "ERROR: failed to sha1_process() ret= "<< ret << dendl; + return ret; + } + + /* produce torrent data */ + do_encode(); + + /* save torrent data into OMAP */ + ret = save_torrent_file(); + if (0 != ret) + { + ldout(s->cct, 0) << "ERROR: failed to save_torrent_file() ret= "<< ret << dendl; + return ret; + } + + return 0; +} + +int seed::get_params() +{ + /* piece length is used as a divisor when hashing; reject a non-positive setting */ + if (g_conf->rgw_torrent_sha_unit <= 0) + { + return -EINVAL; + } + + is_torrent = true; + info.piece_length = g_conf->rgw_torrent_sha_unit; + create_by = g_conf->rgw_torrent_createby; + encoding = g_conf->rgw_torrent_encoding; + origin = g_conf->rgw_torrent_origin; + comment = g_conf->rgw_torrent_comment; + announce = g_conf->rgw_torrent_tracker; + + /* tracker and tracker list is empty, set announce to origin */ + if (announce.empty() && !origin.empty()) + { + announce = origin; + } + + return 0; +} + +void seed::set_announce() +{ + list announce_list; // used to get announce list from conf + get_str_list(announce, ",", announce_list); + + if (announce_list.empty()) + { + ldout(s->cct, 5) << "NOTICE: announce_list is empty " << dendl; + return; + } + + list::iterator iter = announce_list.begin(); + dencode.bencode_key(ANNOUNCE, bl); + dencode.bencode_key((*iter), bl); + + dencode.bencode_key(ANNOUNCE_LIST, bl); + dencode.bencode_list(bl); + for (; iter != announce_list.end(); ++iter) + { + dencode.bencode_list(bl); + dencode.bencode_key((*iter), bl); + dencode.bencode_end(bl); + } + dencode.bencode_end(bl); +} + +void seed::do_encode() +{ + /*Only encode create_date and sha1 info. 
+ Other fields will be added if config is set when running get torrent*/ + dencode.bencode(CREATION_DATE, create_date, bl); + + dencode.bencode_key(INFO_PIECES, bl); + dencode.bencode_dict(bl); + dencode.bencode(LENGTH, info.len, bl); + dencode.bencode(NAME, info.name, bl); + dencode.bencode(PIECE_LENGTH, info.piece_length, bl); + + char info_sha[100] = { 0 }; + snprintf(info_sha, sizeof(info_sha), "%llu", static_cast<unsigned long long>(info.sha1_bl.length())); /* prefix must equal the bytes appended below */ + string sha_len_str = info_sha; + dencode.bencode_key(PIECES, bl); + bl.append(sha_len_str.c_str(), sha_len_str.length()); + bl.append(':'); + bl.append(info.sha1_bl.c_str(), info.sha1_bl.length()); + dencode.bencode_end(bl); +} + +int seed::save_torrent_file() +{ + int op_ret = 0; + string key = RGW_OBJ_TORRENT; + rgw_obj obj(s->bucket, s->object.name); + + op_ret = store->omap_set(obj, key, bl); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to omap_set() op_ret = " << op_ret << dendl; + return op_ret; + } + + return op_ret; +} diff --git a/src/rgw/rgw_torrent.h b/src/rgw/rgw_torrent.h new file mode 100644 index 0000000000000..268d2671c245a --- /dev/null +++ b/src/rgw/rgw_torrent.h @@ -0,0 +1,139 @@ +#ifndef CEPH_RGW_TORRENT_H +#define CEPH_RGW_TORRENT_H + +#include +#include +#include +#include + +#include "common/ceph_time.h" + +#include "rgw_rados.h" +#include "rgw_common.h" + +using namespace std; +using ceph::crypto::SHA1; + +struct req_state; + +#define RGW_OBJ_TORRENT "rgw.torrent" + +#define ANNOUNCE "announce" +#define ANNOUNCE_LIST "announce-list" +#define COMMENT "comment" +#define CREATED_BY "created by" +#define CREATION_DATE "creation date" +#define ENCODING "encoding" +#define LENGTH "length" +#define NAME "name" +#define PIECE_LENGTH "piece length" +#define PIECES "pieces" +#define INFO_PIECES "info" +#define GET_TORRENT "get_torrent" + +class TorrentBencode +{ +public: + TorrentBencode() {} + ~TorrentBencode() {} + + //control characters + void bencode_dict(bufferlist& bl) { bl.append('d'); } + void bencode_list(bufferlist& bl) { bl.append('l'); } + void 
bencode_end(bufferlist& bl) { bl.append('e'); } + + //single integer value; long long so object lengths and dates are not truncated to int + void bencode(long long value, bufferlist& bl) + { + bl.append('i'); + char info[100] = { 0 }; + snprintf(info, sizeof(info), "%lld", value); + bl.append(info, strlen(info)); + bencode_end(bl); + } + + //single string value + void bencode(const std::string& str, bufferlist& bl) + { + bencode_key(str, bl); + } + + //dictionary element with an integer value + void bencode(const std::string& key, long long value, bufferlist& bl) + { + bencode_key(key, bl); + bencode(value, bl); + } + + //dictionary element with a string value + void bencode(const std::string& key, const std::string& value, bufferlist& bl) + { + bencode_key(key, bl); + bencode(value, bl); + } + + //length-prefixed string: "<len>:<bytes>" + void bencode_key(const std::string& key, bufferlist& bl) + { + int len = key.length(); + char info[100] = { 0 }; + sprintf(info, "%d:", len); + bl.append(info, strlen(info)); + bl.append(key.c_str(), len); + } +}; + +/* torrent file struct */ +class seed +{ +private: + struct + { + int piece_length; // each piece length + bufferlist sha1_bl; // save sha1 + string name; // file name + off_t len; // file total bytes + }info; + + string announce; // tracker + string origin; // origin + time_t create_date; // time of the file created + string comment; // comment + string create_by; // app name and version + string encoding; // set this field if the encoding is gbk rather than utf-8 + uint64_t sha_len; // sha1 length + bool is_torrent; // flag + bufferlist bl; // bufferlist ready to send + list torrent_bl; // buffered object data used to compute the piece hashes + + struct req_state *s; + RGWRados *store; + + TorrentBencode dencode; +public: + seed(); + ~seed(); + + int get_params(); + void init(struct req_state *p_req, RGWRados *p_store); + void get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, + uint64_t &total_len, bufferlist &bl_data, rgw_obj &obj); + + off_t get_data_len(); + bool get_flag(); + + int handle_data(); + void save_data(bufferlist &bl); + void set_create_date(ceph::real_time& value); + void set_info_name(const string& value); + 
+private: + void do_encode (); + void set_announce(); + void set_exist(bool exist); + void set_info_pieces(char *buff); + int sha1_process(); + void sha1(SHA1 *h, bufferlist &bl, off_t bl_len); + int save_torrent_file(); +}; +#endif /* CEPH_RGW_TORRENT_H */ diff --git a/src/test/Makefile-client.am b/src/test/Makefile-client.am index f9534e5e1a301..951e195930bc5 100644 --- a/src/test/Makefile-client.am +++ b/src/test/Makefile-client.am @@ -687,6 +687,13 @@ ceph_test_cors_LDADD = \ ceph_test_cors_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_cors +unittest_rgw_bencode_SOURCES = test/rgw/test_rgw_bencode.cc +unittest_rgw_bencode_LDADD = \ + $(LIBRADOS) $(LIBRGW) $(LIBRGW_DEPS) $(CEPH_GLOBAL) \ + $(UNITTEST_LDADD) $(CRYPTO_LIBS) -lcurl -lexpat +unittest_rgw_bencode_CXXFLAGS = $(UNITTEST_CXXFLAGS) +check_TESTPROGRAMS += unittest_rgw_bencode + ceph_test_rgw_manifest_SOURCES = test/rgw/test_rgw_manifest.cc ceph_test_rgw_manifest_LDADD = \ $(LIBRADOS) $(LIBRGW) $(LIBRGW_DEPS) $(CEPH_GLOBAL) \ diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 3c4e6d88d9049..f4797442836f6 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -1,3 +1,8 @@ +#unittest_rgw_bencode +add_executable(unittest_rgw_bencode test_rgw_bencode.cc) +add_ceph_unittest(unittest_rgw_bencode ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_rgw_bencode) +target_link_libraries(unittest_rgw_bencode rgw_a) + #unitttest_rgw_period_history add_executable(unittest_rgw_period_history test_rgw_period_history.cc) add_ceph_unittest(unittest_rgw_period_history ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_rgw_period_history) @@ -61,5 +66,4 @@ target_link_libraries(ceph_test_rgw_obj ${CRYPTO_LIBS} ) set_target_properties(ceph_test_rgw_obj PROPERTIES COMPILE_FLAGS - ${UNITTEST_CXX_FLAGS}) - + ${UNITTEST_CXX_FLAGS}) \ No newline at end of file diff --git a/src/test/rgw/test_rgw_bencode.cc b/src/test/rgw/test_rgw_bencode.cc new file mode 100644 index 
0000000000000..177d72f327e0d --- /dev/null +++ b/src/test/rgw/test_rgw_bencode.cc @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "gtest/gtest.h" + +#include "rgw/rgw_torrent.h" + +TEST(Bencode, String) +{ + TorrentBencode decode; + bufferlist bl; + + decode.bencode("foo", bl); + decode.bencode("bar", bl); + decode.bencode("baz", bl); + + ASSERT_EQ(std::string("3:foo3:bar3:baz"), std::string(bl.c_str(), bl.length())); /* buffer is not NUL-terminated */ +} + +TEST(Bencode, Integers) +{ + TorrentBencode decode; + bufferlist bl; + + decode.bencode(0, bl); + decode.bencode(-3, bl); + decode.bencode(7, bl); + + ASSERT_EQ(std::string("i0ei-3ei7e"), std::string(bl.c_str(), bl.length())); +} + +TEST(Bencode, Dict) +{ + TorrentBencode decode; + bufferlist bl; + + decode.bencode_dict(bl); + decode.bencode("foo", 5, bl); + decode.bencode("bar", "baz", bl); + decode.bencode_end(bl); + + ASSERT_EQ(std::string("d3:fooi5e3:bar3:baze"), std::string(bl.c_str(), bl.length())); +} + +TEST(Bencode, List) +{ + TorrentBencode decode; + bufferlist bl; + + decode.bencode_list(bl); + decode.bencode("foo", 5, bl); + decode.bencode("bar", "baz", bl); + decode.bencode_end(bl); + + ASSERT_EQ(std::string("l3:fooi5e3:bar3:baze"), std::string(bl.c_str(), bl.length())); +}