diff --git a/src/common/config_opts.h b/src/common/config_opts.h index f17d61f386f9e6..e21c1749e9a43c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -166,6 +166,13 @@ OPTION(heartbeat_file, OPT_STR, "") OPTION(heartbeat_inject_failure, OPT_INT, 0) // force an unhealthy heartbeat for N seconds OPTION(perf, OPT_BOOL, true) // enable internal perf counters +OPTION(torrent_tracker, OPT_STR, "") // torrent_tracker +OPTION(torrent_createby, OPT_STR, "") // torrent_createby +OPTION(torrent_comment, OPT_STR, "") // torrent_comment +OPTION(torrent_encoding, OPT_STR, "") // torrent_encoding +OPTION(torrent_origin, OPT_STR, "") // torrent_origin +OPTION(sha_unit, OPT_INT, 512*1024) //sha_unit 521K + OPTION(ms_type, OPT_STR, "simple") // messenger backend OPTION(ms_tcp_nodelay, OPT_BOOL, true) OPTION(ms_tcp_rcvbuf, OPT_INT, 0) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 5bc7cb43fa051a..9c3692658ecf8b 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -31,10 +31,15 @@ #include "include/assert.h" +#include "include/rados/librados.hpp" + #define dout_subsys ceph_subsys_rgw using namespace std; using ceph::crypto::MD5; +using namespace librados; +using namespace boost; +using ceph::crypto::SHA1; static string mp_ns = RGW_OBJ_NS_MULTIPART; static string shadow_ns = RGW_OBJ_NS_SHADOW; @@ -1145,9 +1150,500 @@ int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len) gc_invalidate_time = start_time; gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2); } + return send_response_data(bl, bl_ofs, bl_len); } +seed::seed() +{ + seed::info.piece_length = 0; + seed::info.len = 0; + sha_len = 0; + is_torrent = false; +} + +seed::~seed() +{ + seed::info.sha1_bl.clear(); + bl.clear(); + torrent_bl.clear(); + list_info.clear(); + dic_info.clear(); + dic_list.clear(); + s = NULL; + store = NULL; +} + +void seed::init(struct req_state *p_req, RGWRados *p_store) +{ + s = p_req; + store = p_store; +} + +void seed::get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, uint64_t &total_len, + bufferlist &bl_data, rgw_obj &obj) +{ + string oid, key; + rgw_bucket bucket; + map m; + set obj_key; + get_obj_bucket_and_oid_loc(obj, bucket, oid, key); + INFO "oid = " << oid << dendl; + + obj_key.insert(RGW_OBJ_TORRENT); + op_ret = read_op.state.io_ctx.omap_get_vals_by_keys(oid, obj_key, &m); + if (op_ret < 0) + { + INFO "failed to omap_get_all ret=" << op_ret << dendl; + return; + } + + map::iterator iter; + for (iter = m.begin(); iter != m.end(); ++iter) + { + bufferlist bl_tmp = iter->second; + char *pbuff = bl_tmp.c_str(); + bl.append(pbuff, bl_tmp.length()); + } + + bl_data = bl; + total_len = bl.length(); + return; +} + +bool seed::get_flag() +{ + return is_torrent; +} + +void seed::save_data(bufferlist &bl) +{ + info.len += bl.length(); + torrent_bl.push_back(bl); +} + +off_t seed::get_data_len() +{ + return info.len; +} + +void seed::set_create_date(ceph::real_time value) +{ + utime_t date = ceph::real_clock::to_timespec(value); + create_date = date.sec(); +} + +void seed::set_info_pieces(char *buff) +{ + info.sha1_bl.append(buff, CEPH_CRYPTO_SHA1_DIGESTSIZE); +} + +void seed::set_info_name(string value) +{ + info.name = value; +} + +void seed::sha1(SHA1 *h, bufferlist &bl, off_t bl_len) +{ + off_t num = bl_len/info.piece_length; + off_t remain = 0; + remain = bl_len%info.piece_length; + + char *pstr = bl.c_str(); + char sha[25]; + + /* get sha1 */ + for (off_t i = 0; i < num; i++) + { + memset(sha, 0x00, sizeof(sha)); + h->Update((byte *)pstr, info.piece_length); + h->Final((byte *)sha); + + set_info_pieces(sha); + pstr += info.piece_length; + } + + /* process remain */ + if (0 != remain) + { + memset(sha, 0x00, sizeof(sha)); + + h->Update((byte *)pstr, remain); + h->Final((byte *)sha); + + set_info_pieces(sha); + } +} + +int seed::sha1_process() +{ + uint64_t remain = info.len%info.piece_length; + uint8_t remain_len = ((remain > 0)? 1 : 0); + sha_len = (info.len/info.piece_length + remain_len)*CEPH_CRYPTO_SHA1_DIGESTSIZE; + + SHA1 h; + list::iterator iter = torrent_bl.begin(); + for (; iter != torrent_bl.end(); iter++) + { + bufferlist &bl_info = *iter; + sha1(&h, bl_info, (*iter).length()); + } + + return 0; +} + +int seed::handle_data() +{ + int ret = 0; + + /* sha1 process */ + ret = sha1_process(); + if (0 != ret) + { + INFO "sha1_process fail ret= "<sha_unit; + create_by = g_conf->torrent_createby; + set_announce(g_conf->torrent_tracker); + encoding = g_conf->torrent_encoding; + origin = g_conf->torrent_origin; + comment = g_conf->torrent_comment; + + return 0; +} + +void seed::set_announce(string value) +{ + if (value.empty()) + { + INFO "torrent_tracker is empty" << dendl; + return; + } + const char *pstr = value.c_str(); + char *pstart = const_cast(pstr); + uint64_t len = value.length(); + char *pend = pstart + len; + char *location; + char flag = ','; + + list announce_list; // used to get announce list from conf + while (pstart < pend) + { + location = NULL; + location = strchr(pstart, flag); + if (!location) + { + string ann(pstart); + announce_list.push_back(ann); + break; + } + + char temp[100] = { 0 }; + snprintf(temp, location - pstart + 1, "%s", pstart); + string anno(temp); + announce_list.push_back(anno); + pstart = location + 1; + } + + list::iterator iter = announce_list.begin(); + announce = *iter; + iter++; + + for (; iter != announce_list.end(); ++iter) + { + listvalue info_list; + info_list.type = BE_LIST; + info_list.info = (*iter); + info_list.field = ""; + list_info.push_back(info_list); + } + announce_list.clear(); +} + +void seed::do_encode() +{ + if (!announce.empty()) + { + dic_list.push_back(dicvalue(BE_STR, ANNOUNCE, &announce)); + } + + if (!list_info.empty()) + { + dic_list.push_back(dicvalue(BE_LIST, ANNOUNCE_LIST, &list_info)); + } + + /* tracker and tracker list is empty, set announce to origin */ + if (announce.empty() && list_info.empty() && !origin.empty()) + { + dic_list.push_back(dicvalue(BE_STR, ANNOUNCE, &origin)); + } + + if (!comment.empty()) + { + dic_list.push_back(dicvalue(BE_STR, COMMENT, &comment)); + } + + if (!create_by.empty()) + { + dic_list.push_back(dicvalue(BE_STR, CREATED_BY, &create_by)); + } + + dic_list.push_back(dicvalue(BE_TIME_T, CREATION_DATE, &create_date)); + + if (!encoding.empty()) + { + dic_list.push_back(dicvalue(BE_STR, ENCODING, &encoding)); + } + + dic_list.push_back(dicvalue(BE_DICT, INFO_PIECES, &dic_info)); + dic_info.push_back(dicvalue(BE_INT64, LENGTH, &seed::info.len)); + dic_info.push_back(dicvalue(BE_STR, NAME, &seed::info.name)); + dic_info.push_back(dicvalue(BE_TIME_T, PIECE_LENGTH, &seed::info.piece_length)); + dic_info.push_back(dicvalue(BE_STR, PIECES, &seed::info.sha1_bl)); + + seed_encode(BE_DICT, &dic_list, ""); +} + +void seed::add_e() +{ + const char *p = "e"; + bl.append(p, 1); +} + +void seed::get_type(be_type type) +{ + switch(type) + { + case BE_STR: + { + return; + } + case BE_INT: + case BE_INT64: + case BE_TIME_T: + { + const char *p = "i"; + bl.append(p, 1); + break; + } + case BE_LIST: + { + const char *p = "l"; + bl.append(p, 1); + break; + } + case BE_DICT: + { + const char *p = "d"; + bl.append(p, 1); + break; + } + } +} + +void seed::seed_encode(be_type type, void* src_data, const char *field) +{ + uint64_t totalLen = 0; + uint64_t field_len_Len = 0; + char cfield_len[100] = { 0 }; + int field_len = 0; + field_len = strlen(field); + + if (0 != field_len) + { + totalLen += field_len; + sprintf(cfield_len, "%d", field_len); + field_len_Len = strlen(cfield_len); + totalLen += field_len_Len; + + char info[100] = { 0 }; + sprintf(info, "%d:%s", field_len, field); + bl.append(info, totalLen + 1); + get_type(type); + } + else + { + get_type(type); + } + + switch (type) + { + case BE_STR: + { + flush_bufflist(type, src_data, field); + break; + } + + case BE_INT: + case BE_INT64: + case BE_TIME_T: + { + flush_bufflist(type, src_data); + break; + } + + case BE_LIST: + { + list *pstr = static_cast *>(src_data); + list::iterator iter = pstr->begin(); + for (; iter != pstr->end(); iter++) + { + if ((*iter).type == BE_LIST) + { + get_type(BE_LIST); + seed_encode(BE_STR, &(*iter).info, (*iter).field.c_str()); + add_e(); + } + + if ((*iter).type == BE_STR) + { + seed_encode(BE_STR, &(*iter).info, (*iter).field.c_str()); + add_e(); + } + + if ((*iter).type == BE_INT || (*iter).type == BE_INT64 || (*iter).type == BE_TIME_T) + { + get_type((*iter).type); + seed_encode((*iter).type, &(*iter).info, (*iter).field.c_str()); + } + } + + add_e(); + break; + } + + case BE_DICT: + { + + list *pstr = static_cast *>(src_data); + list ::iterator iter = pstr->begin(); + for (; iter != pstr->end(); iter++) + { + if ((*iter).type == BE_LIST) + { + seed_encode(BE_LIST, (*iter).src_data, (*iter).field.c_str()); + } + else if ((*iter).type == BE_STR) + { + seed_encode(BE_STR, (*iter).src_data, (*iter).field.c_str()); + } + else if ((*iter).type == BE_INT || (*iter).type == BE_INT64 || (*iter).type == BE_TIME_T) + { + seed_encode((*iter).type, (*iter).src_data, (*iter).field.c_str()); + } + else if ((*iter).type == BE_DICT) + { + seed_encode(BE_DICT, (*iter).src_data, (*iter).field.c_str()); + } + else + { + INFO "error type"<(src_data))->c_str(); + int len = strlen(pstr); + sprintf(infolen, "%d:", len); + bl.append(infolen, strlen(infolen)); + bl.append(pstr, len); + break; + } + case BE_INT: + case BE_INT64: + case BE_TIME_T: + { + if (type == BE_INT) + { + int *p = static_cast(src_data); + sprintf(infolen, "%d", *p); + } + else if (type == BE_INT64) + { + uint64_t *p = static_cast(src_data); + sprintf(infolen, "%ld", *p); + } + else + { + long *p = static_cast(src_data); + sprintf(infolen, "%ld", *p); + } + bl.append(infolen, strlen(infolen)); + add_e(); + break; + } + default: + { + break; + } + } +} + +int seed::save_torrent_file() +{ + int op_ret = 0; + string key = RGW_OBJ_TORRENT; + rgw_obj obj(s->bucket, s->object.name); + + op_ret = store->omap_set(obj, key, bl); + if (op_ret < 0) + { + INFO "ret = " << op_ret<< dendl; + return op_ret; + } + + return op_ret; +} + bool RGWGetObj::prefetch_data() { /* HEAD request, stop prefetch*/ @@ -1244,6 +1740,27 @@ void RGWGetObj::execute() if (op_ret < 0) goto done_err; + /* start gettorrent */ + if (torrent.get_flag()) + { + torrent.get_torrent_file(op_ret, read_op, total_len, bl, obj); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret + << dendl; + goto done_err; + } + op_ret = send_response_data(bl, 0, total_len); + if (op_ret < 0) + { + ldout(s->cct, 0) << "ERROR: failed to send_response_data ret= " << op_ret + << dendl; + goto done_err; + } + return; + } + /* end gettorrent */ + attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST); if (attr_iter != attrs.end() && !skip_manifest) { op_ret = handle_user_manifest(attr_iter->second.c_str()); @@ -2531,6 +3048,9 @@ void RGWPutObj::execute() if (!len) break; + /* save data for producing torrent data */ + torrent.save_data(data); + /* do we need this operation to be synchronous? if we're dealing with an object with immutable * head, e.g., multipart object we need to make sure we're the first one writing to this object */ @@ -2680,6 +3200,18 @@ void RGWPutObj::execute() op_ret = processor->complete(etag, &mtime, real_time(), attrs, delete_at, if_match, if_nomatch); + /* produce torrent */ + if (ofs == torrent.get_data_len()) + { + torrent.init(s, store); + torrent.set_create_date(mtime); + op_ret = torrent.handle_data(); + if (0 != op_ret) + { + ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl; + goto done; + } + } done: dispose_processor(processor); diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 7bc6f84a21482a..e4361a91ae5b22 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -36,10 +36,124 @@ #include "include/assert.h" using namespace std; +using ceph::crypto::SHA1; struct req_state; class RGWHandler; +/* get torrent */ +#define INFO dout(1) <<__FILE__<<":"<<__func__<<":"<<__LINE__<<":"<< + +#define RGW_OBJ_TORRENT "rgw.torrent" + +#define ANNOUNCE "announce" +#define ANNOUNCE_LIST "announce-list" +#define COMMENT "comment" +#define CREATED_BY "created by" +#define CREATION_DATE "creation date" +#define ENCODING "encoding" +#define LENGTH "length" +#define NAME "name" +#define PIECE_LENGTH "piece length" +#define PIECES "pieces" +#define INFO_PIECES "info" +#define GET_TORRENT "get_torrent" + +typedef enum +{ + BE_STR, + BE_INT, + BE_INT64, + BE_TIME_T, + BE_LIST, + BE_DICT, +} be_type; + +// dictionary list struct +struct dicvalue +{ + dicvalue(be_type mtype, string name, void *value) + { + type = mtype; + field = name; + src_data = value; + } + be_type type; + string field; + void *src_data; +}; + +// announce list struct +struct listvalue +{ + be_type type; + string field; + string info; +}; + +// torrent file struct +class seed +{ +private: + struct + { + long piece_length; // each piece length + bufferlist sha1_bl; // save sha1 + string name; // file name + off_t len; // file total bytes + }info; + + string announce; // tracker + string origin; // origin + time_t create_date; // time of the file created + string comment; // comment + string create_by; // app name and version + string encoding; // if encode use gbk rather than gtf-8 use this field + + uint64_t sha_len; // sha1 length + bool is_torrent; // flag + bufferlist bl; // bufflist ready to send + list torrent_bl; // meate data + list list_info; // type is BE_LIST save data + list dic_info; // each type ready to generate to dictionary + list dic_list; // dictionary list + + struct req_state *s; + RGWRados *store; + +public: + seed(); + ~seed(); + + int get_params(); + void init(struct req_state *p_req, RGWRados *p_store); + void get_torrent_file(int &op_ret, RGWRados::Object::Read &read_op, + uint64_t &total_len, bufferlist &bl_data, rgw_obj &obj); + + off_t get_data_len(); + bool get_flag(); + + int handle_data(); + void save_data(bufferlist &bl); + void set_create_date(ceph::real_time value); + void set_info_name(string value); + +private: + void add_e(); + void do_encode (); + void set_announce(string value); + void set_exist(bool exist); + void get_type(be_type type); + void set_info_pieces(char *buff); + int sha1_process(); + void sha1(SHA1 *h, bufferlist &bl, off_t bl_len); + void flush_bufflist(be_type type, void *src_data, const char *field = NULL); + void seed_encode(be_type type, void* src_data, const char *field); + int save_torrent_file(); +}; +// end gettorrent + + /** * Provide the base class for all ops. */ @@ -101,6 +215,7 @@ RGWOp() : s(nullptr), dialect_handler(nullptr), store(nullptr), class RGWGetObj : public RGWOp { protected: + seed torrent; // get torrent const char *range_str; const char *if_mod; const char *if_unmod; @@ -639,6 +754,7 @@ class RGWPutObj : public RGWOp { friend class RGWPutObjProcessor; protected: + seed torrent; off_t ofs; const char *supplied_md5_b64; const char *supplied_etag; diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc index a6cadf79e8af35..f6ebff17c50852 100644 --- a/src/rgw/rgw_rest.cc +++ b/src/rgw/rgw_rest.cc @@ -790,6 +790,19 @@ int RGWGetObj_ObjStore::get_params() mod_pg_ver = s->info.env->get_int("HTTP_DEST_PG_VER", 0); } + /* start gettorrent */ + const char *query_str = s->info.env->get("QUERY_STRING"); + if (0 == strcmp(query_str, GET_TORRENT)) + { + int ret = 0; + ret = torrent.get_params(); + if (ret < 0) + { + return ret; + } + } + /* end gettorrent */ + return 0; } @@ -1000,6 +1013,15 @@ int RGWPutObj_ObjStore::verify_params() int RGWPutObj_ObjStore::get_params() { + /* start gettorrent */ + int ret = 0; + ret = torrent.get_params(); + if (ret < 0) + { + return ret; + } + torrent.set_info_name((s->object).name); + /* end gettorrent */ supplied_md5_b64 = s->info.env->get("HTTP_CONTENT_MD5"); return 0;