diff --git a/CMakeLists.txt b/CMakeLists.txt index 4ba060bbc..d1f8f405f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -291,6 +291,7 @@ if(CASS_BUILD_EXAMPLES) add_subdirectory(examples/callbacks) add_subdirectory(examples/collections) add_subdirectory(examples/date_time) + add_subdirectory(examples/duration) add_subdirectory(examples/logging) add_subdirectory(examples/maps) add_subdirectory(examples/prepared) diff --git a/examples/duration/duration.c b/examples/duration/duration.c index 7c582d6db..9ddf0a999 100644 --- a/examples/duration/duration.c +++ b/examples/duration/duration.c @@ -28,6 +28,8 @@ #include #include "cassandra.h" +#define NANOS_IN_A_SEC (1000LL * 1000LL * 1000LL) + void print_error(CassFuture* future) { const char* message; size_t message_length; @@ -74,14 +76,11 @@ CassError execute_query(CassSession* session, const char* query) { return rc; } -CassError insert_into(CassSession* session, const char* key, cass_int32_t months, cass_int32_t days, cass_int32_t nanos) { +CassError insert_into(CassSession* session, const char* key, cass_int32_t months, cass_int32_t days, cass_int64_t nanos) { CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture* future = NULL; const char* query = "INSERT INTO examples.duration (key, d) VALUES (?, ?);"; - cass_byte_t* data; - size_t data_size; - statement = cass_statement_new(query, 2); @@ -123,11 +122,12 @@ CassError select_from(CassSession* session, const char* key) { CassIterator* iterator = cass_iterator_from_result(result); if (cass_iterator_next(iterator)) { - cass_int32_t months, days, nanos; + cass_int32_t months, days; + cass_int64_t nanos; const CassRow* row = cass_iterator_get_row(iterator); cass_value_get_duration(cass_row_get_column(row, 0), &months, &days, &nanos); - printf("months: %d days: %d nanos: %d\n", months, days, nanos); + printf("months: %d days: %d nanos: %lld\n", months, days, nanos); } cass_result_free(result); @@ -158,23 +158,23 @@ int main(int argc, char* argv[]) { } execute_query(session, - "CREATE KEYSPACE examples WITH replication = { \ - 'class': 'SimpleStrategy', 'replication_factor': '3' };"); + "CREATE KEYSPACE IF NOT EXISTS examples WITH replication = { " + "'class': 'SimpleStrategy', 'replication_factor': '3' };"); execute_query(session, - "CREATE TABLE examples.duration (key text PRIMARY KEY, d duration)"); + "CREATE TABLE IF NOT EXISTS examples.duration " + "(key text PRIMARY KEY, d duration)"); - // Insert some rows into the table, playing with edge values; each of months, days, and nanos may be - // a 64-bit long. + /* Insert some rows into the table and read them back out */ - insert_into(session, "base", 0, 0, 0); - insert_into(session, "simple", 1, 2, 3); - insert_into(session, "negative", -1, -2, -3); + insert_into(session, "zero", 0, 0, 0); + insert_into(session, "one_month_two_days_three_seconds", 1, 2, 3 * NANOS_IN_A_SEC); + insert_into(session, "negative_one_month_two_days_three_seconds", -1, -2, -3 * NANOS_IN_A_SEC); - select_from(session, "base"); - select_from(session, "simple"); - select_from(session, "negative"); + select_from(session, "zero"); + select_from(session, "one_month_two_days_three_seconds"); + select_from(session, "negative_one_month_two_days_three_seconds"); close_future = cass_session_close(session); cass_future_wait(close_future); diff --git a/include/cassandra.h b/include/cassandra.h index 02d4ed1cd..d9de5f8d2 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -51,7 +51,7 @@ */ #define CASS_VERSION_MAJOR 2 -#define CASS_VERSION_MINOR 6 +#define CASS_VERSION_MINOR 7 #define CASS_VERSION_PATCH 0 #define CASS_VERSION_SUFFIX "" @@ -635,13 +635,20 @@ typedef enum CassLogLevel_ { /* @endcond */ } CassLogLevel; -typedef enum CassSslVerifyFlags { +typedef enum CassSslVerifyFlags_ { CASS_SSL_VERIFY_NONE = 0x00, CASS_SSL_VERIFY_PEER_CERT = 0x01, CASS_SSL_VERIFY_PEER_IDENTITY = 0x02, CASS_SSL_VERIFY_PEER_IDENTITY_DNS = 0x04 } CassSslVerifyFlags; +typedef enum CassProtocolVersion_ { + CASS_PROTOCOL_VERSION_V1 = 0x01, + CASS_PROTOCOL_VERSION_V2 = 0x02, + CASS_PROTOCOL_VERSION_V3 = 0x03, + CASS_PROTOCOL_VERSION_V4 = 0x04 +} CassProtocolVersion; + typedef enum CassErrorSource_ { CASS_ERROR_SOURCE_NONE, CASS_ERROR_SOURCE_LIB, @@ -976,11 +983,29 @@ cass_cluster_set_authenticator_callbacks(CassCluster* cluster, * @param[in] cluster * @param[in] protocol_version * @return CASS_OK if successful, otherwise an error occurred. + * + * @see cass_cluster_set_use_beta_protocol_version() */ CASS_EXPORT CassError cass_cluster_set_protocol_version(CassCluster* cluster, int protocol_version); +/** + * Use the newest beta protocol version. This currently enables the use of + * protocol version 5. + * + * Default: cass_false + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] enable if false the highest non-beta protocol version will be used + * @return CASS_OK if successful, otherwise an error occurred. + */ +CASS_EXPORT CassError +cass_cluster_set_use_beta_protocol_version(CassCluster* cluster, + cass_bool_t enable); + /** * Sets the number of IO threads. This is the number of threads * that will handle query requests. @@ -4973,7 +4998,7 @@ cass_statement_bind_duration(CassStatement* statement, size_t index, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Binds a "duration" to all the values with the specified name. @@ -4994,7 +5019,7 @@ cass_statement_bind_duration_by_name(CassStatement* statement, const char* name, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Same as cass_statement_bind_duration_by_name(), but with lengths for string @@ -5020,7 +5045,7 @@ cass_statement_bind_duration_by_name_n(CassStatement* statement, size_t name_length, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Bind a "list", "map" or "set" to a query or bound statement at the @@ -6193,7 +6218,7 @@ CASS_EXPORT CassError cass_collection_append_duration(CassCollection* collection, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Appends a "list", "map" or "set" to the collection. @@ -6630,7 +6655,7 @@ cass_tuple_set_duration(CassTuple* tuple, size_t index, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Sets a "list", "map" or "set" in a tuple at the specified index. @@ -7663,7 +7688,7 @@ cass_user_type_set_duration(CassUserType* user_type, size_t index, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Sets "duration" in a user defined type at the specified name. @@ -7684,7 +7709,7 @@ cass_user_type_set_duration_by_name(CassUserType* user_type, const char* name, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Same as cass_user_type_set_duration_by_name(), but with lengths for string @@ -7710,7 +7735,7 @@ cass_user_type_set_duration_by_name_n(CassUserType* user_type, size_t name_length, cass_int32_t months, cass_int32_t days, - cass_int32_t nanos); + cass_int64_t nanos); /** * Sets a "list", "map" or "set" in a user defined type at the @@ -9202,7 +9227,7 @@ CASS_EXPORT CassError cass_value_get_duration(const CassValue* value, cass_int32_t* months, cass_int32_t* days, - cass_int32_t* nanos); + cass_int64_t* nanos); /** * Gets the type of the specified value. diff --git a/src/abstract_data.hpp b/src/abstract_data.hpp index 3e0c0ea50..e319eab9f 100644 --- a/src/abstract_data.hpp +++ b/src/abstract_data.hpp @@ -89,7 +89,6 @@ class AbstractData { virtual ~AbstractData() { } const ElementVec& elements() const { return elements_; } - size_t elements_count() const { return elements_.size(); } void reset(size_t count) { elements_.clear(); diff --git a/src/batch_request.cpp b/src/batch_request.cpp index 00f990c2c..06036129e 100644 --- a/src/batch_request.cpp +++ b/src/batch_request.cpp @@ -86,9 +86,19 @@ CassError cass_batch_add_statement(CassBatch* batch, CassStatement* statement) { namespace cass { +// Format: ...[][] +// where: +// is a [byte] +// is a [short] +// has the format []...[] +// is a [short] +// Only protocol v3 and higher for the following: +// is a [byte] (or [int] for protocol v5) +// is a [short] +// is a [long] int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const { int length = 0; - uint8_t flags = 0; + uint32_t flags = 0; if (version == 1) { return REQUEST_ERROR_UNSUPPORTED_PROTOCOL; @@ -115,7 +125,7 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs "Batches cannot contain queries with named values"); return REQUEST_ERROR_BATCH_WITH_NAMED_VALUES; } - int32_t result = (*i)->encode_batch(version, bufs, callback); + int32_t result = statement->encode_batch(version, callback, bufs); if (result < 0) { return result; } @@ -127,7 +137,11 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs size_t buf_size = sizeof(uint16_t); if (version >= 3) { // [] - buf_size += sizeof(uint8_t); // [byte] + if (version >= 5) { + buf_size += sizeof(int32_t); // [int] + } else { + buf_size += sizeof(uint8_t); // [byte] + } if (serial_consistency() != 0) { buf_size += sizeof(uint16_t); // [short] @@ -144,7 +158,11 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs size_t pos = buf.encode_uint16(0, callback->consistency()); if (version >= 3) { - pos = buf.encode_byte(pos, flags); + if (version >= 5) { + pos = buf.encode_int32(pos, flags); + } else { + pos = buf.encode_byte(pos, flags); + } if (serial_consistency() != 0) { pos = buf.encode_uint16(pos, serial_consistency()); diff --git a/src/cluster.cpp b/src/cluster.cpp index da9d635d9..cec4d31b6 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -16,6 +16,7 @@ #include "cluster.hpp" +#include "constants.hpp" #include "dc_aware_policy.hpp" #include "external.hpp" #include "logger.hpp" @@ -50,10 +51,28 @@ CassError cass_cluster_set_protocol_version(CassCluster* cluster, if (protocol_version < 1) { return CASS_ERROR_LIB_BAD_PARAMS; } + if (cluster->config().use_beta_protocol_version()) { + LOG_ERROR("The protocol version is already set to the newest beta version v%d " + "and cannot be explicitly set.", CASS_NEWEST_BETA_PROTOCOL_VERSION); + return CASS_ERROR_LIB_BAD_PARAMS; + } else if (protocol_version > CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION) { + LOG_ERROR("Protocol version v%d is higher than the highest supported " + "protocol version v%d (consider using the newest beta protocol version).", + protocol_version, CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION); + return CASS_ERROR_LIB_BAD_PARAMS; + } cluster->config().set_protocol_version(protocol_version); return CASS_OK; } +CassError cass_cluster_set_use_beta_protocol_version(CassCluster* cluster, + cass_bool_t enable) { + cluster->config().set_use_beta_protocol_version(enable == cass_true); + cluster->config().set_protocol_version(enable ? CASS_NEWEST_BETA_PROTOCOL_VERSION + : CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION); + return CASS_OK; +} + CassError cass_cluster_set_num_threads_io(CassCluster* cluster, unsigned num_threads) { if (num_threads == 0) { diff --git a/src/collection.cpp b/src/collection.cpp index e3509f040..7ac064cff 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -79,7 +79,7 @@ CASS_COLLECTION_APPEND(decimal, THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale), cass::CassDecimal(varint, varint_size, scale)) CASS_COLLECTION_APPEND(duration, - THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int32_t nanos), + THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos), cass::CassDuration(months, days, nanos)) #undef CASS_COLLECTION_APPEND diff --git a/src/config.hpp b/src/config.hpp index b1ce210ac..3705efd0d 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -19,6 +19,7 @@ #include "auth.hpp" #include "cassandra.h" +#include "constants.hpp" #include "dc_aware_policy.hpp" #include "host_targeting_policy.hpp" #include "latency_aware_policy.hpp" @@ -43,7 +44,8 @@ class Config { public: Config() : port_(9042) - , protocol_version_(4) + , protocol_version_(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION) + , use_beta_protocol_version_(false) , thread_count_io_(1) , queue_size_io_(8192) , queue_size_event_(8192) @@ -217,6 +219,14 @@ class Config { protocol_version_ = protocol_version; } + bool use_beta_protocol_version() const { + return use_beta_protocol_version_; + } + + void set_use_beta_protocol_version(bool enable) { + use_beta_protocol_version_ = enable; + } + CassLogLevel log_level() const { return log_level_; } void set_log_level(CassLogLevel log_level) { @@ -388,6 +398,7 @@ class Config { private: int port_; int protocol_version_; + bool use_beta_protocol_version_; ContactPointList contact_points_; unsigned thread_count_io_; unsigned queue_size_io_; diff --git a/src/constants.hpp b/src/constants.hpp index eff58c96c..40fdd8047 100644 --- a/src/constants.hpp +++ b/src/constants.hpp @@ -39,6 +39,7 @@ #define CQL_OPCODE_AUTH_CHALLENGE 0x0E #define CQL_OPCODE_AUTH_RESPONSE 0x0F #define CQL_OPCODE_AUTH_SUCCESS 0x10 +#define CQL_OPCODE_CANCEL 0xFF // TODO(mpenick): We need to expose these (not as "CQL") #define CQL_ERROR_SERVER_ERROR 0x0000 @@ -59,19 +60,21 @@ #define CQL_ERROR_CONFIG_ERROR 0x2300 #define CQL_ERROR_ALREADY_EXISTS 0x2400 #define CQL_ERROR_UNPREPARED 0x2500 +#define CQL_ERROR_CLIENT_WRITE_FAILURE 0x8000 #define CASS_FLAG_COMPRESSION 0x01 #define CASS_FLAG_TRACING 0x02 #define CASS_FLAG_CUSTOM_PAYLOAD 0x04 #define CASS_FLAG_WARNING 0x08 +#define CASS_FLAG_BETA 0x10 -#define CASS_QUERY_FLAG_VALUES 0x01 -#define CASS_QUERY_FLAG_SKIP_METADATA 0x02 -#define CASS_QUERY_FLAG_PAGE_SIZE 0x04 -#define CASS_QUERY_FLAG_PAGING_STATE 0x08 -#define CASS_QUERY_FLAG_SERIAL_CONSISTENCY 0x10 -#define CASS_QUERY_FLAG_DEFAULT_TIMESTAMP 0x20 -#define CASS_QUERY_FLAG_NAMES_FOR_VALUES 0x40 +#define CASS_QUERY_FLAG_VALUES 0x00000001 +#define CASS_QUERY_FLAG_SKIP_METADATA 0x00000002 +#define CASS_QUERY_FLAG_PAGE_SIZE 0x00000004 +#define CASS_QUERY_FLAG_PAGING_STATE 0x00000008 +#define CASS_QUERY_FLAG_SERIAL_CONSISTENCY 0x00000010 +#define CASS_QUERY_FLAG_DEFAULT_TIMESTAMP 0x00000020 +#define CASS_QUERY_FLAG_NAMES_FOR_VALUES 0x00000040 #define CASS_BATCH_KIND_QUERY 0 #define CASS_BATCH_KIND_PREPARED 1 @@ -82,9 +85,9 @@ #define CASS_RESULT_KIND_PREPARED 4 #define CASS_RESULT_KIND_SCHEMA_CHANGE 5 -#define CASS_RESULT_FLAG_GLOBAL_TABLESPEC 1 -#define CASS_RESULT_FLAG_HAS_MORE_PAGES 2 -#define CASS_RESULT_FLAG_NO_METADATA 4 +#define CASS_RESULT_FLAG_GLOBAL_TABLESPEC 0x00000001 +#define CASS_RESULT_FLAG_HAS_MORE_PAGES 0x00000002 +#define CASS_RESULT_FLAG_NO_METADATA 0x00000004 #define CASS_EVENT_TOPOLOGY_CHANGE 1 #define CASS_EVENT_STATUS_CHANGE 2 @@ -93,6 +96,7 @@ #define CASS_HEADER_SIZE_V1_AND_V2 8 #define CASS_HEADER_SIZE_V3 9 -#define CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION 3 +#define CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION 4 +#define CASS_NEWEST_BETA_PROTOCOL_VERSION 5 #endif diff --git a/src/encode.cpp b/src/encode.cpp index f9e7eaa82..79c5f47d5 100644 --- a/src/encode.cpp +++ b/src/encode.cpp @@ -20,8 +20,7 @@ namespace cass { -static char* encode_vint(char* output, uint64_t value, size_t value_size) -{ +static char* encode_vint(char* output, uint64_t value, size_t value_size) { if (value_size == 1) { // This is just a one byte value; write it and get out. *output = value; diff --git a/src/error_response.cpp b/src/error_response.cpp index d7a906c46..080a45f41 100644 --- a/src/error_response.cpp +++ b/src/error_response.cpp @@ -151,6 +151,9 @@ bool ErrorResponse::decode(int version, char* buffer, size_t size) { pos = decode_int32(pos, received_); pos = decode_int32(pos, required_); pos = decode_int32(pos, num_failures_); + if (version >= 5) { + pos = decode_failures(pos); + } decode_byte(pos, data_present_); break; case CQL_ERROR_FUNCTION_FAILURE: @@ -163,6 +166,9 @@ bool ErrorResponse::decode(int version, char* buffer, size_t size) { pos = decode_int32(pos, received_); pos = decode_int32(pos, required_); pos = decode_int32(pos, num_failures_); + if (version >= 5) { + pos = decode_failures(pos); + } decode_write_type(pos); break; case CQL_ERROR_UNPREPARED: @@ -176,6 +182,21 @@ bool ErrorResponse::decode(int version, char* buffer, size_t size) { return true; } +// Format: +// where: +// is a [inetaddr] +// is a [short] +char* ErrorResponse::decode_failures(char* pos) { + failures_.reserve(num_failures_); + for (int32_t i = 0; i < num_failures_; ++i) { + Failure failure; + pos = decode_inet(pos, &failure.endpoint); + pos = decode_uint16(pos, failure.failurecode); + failures_.push_back(failure); + } + return pos; +} + void ErrorResponse::decode_write_type(char* pos) { StringRef write_type; decode_string(pos, &write_type); diff --git a/src/error_response.hpp b/src/error_response.hpp index 5934a735f..4dc56d5ad 100644 --- a/src/error_response.hpp +++ b/src/error_response.hpp @@ -28,6 +28,7 @@ #include #include +#include namespace cass { @@ -62,8 +63,17 @@ class ErrorResponse : public Response { bool decode(int version, char* buffer, size_t size); private: + char* decode_failures(char* pos); void decode_write_type(char* pos); +private: + struct Failure { + CassInet endpoint; + uint16_t failurecode; + }; + + typedef std::vector FailureVec; + private: int32_t code_; StringRef message_; @@ -72,6 +82,7 @@ class ErrorResponse : public Response { int32_t received_; int32_t required_; int32_t num_failures_; + FailureVec failures_; uint8_t data_present_; CassWriteType write_type_; StringRef keyspace_; diff --git a/src/execute_request.cpp b/src/execute_request.cpp index 343183c25..62f64aad5 100644 --- a/src/execute_request.cpp +++ b/src/execute_request.cpp @@ -21,156 +21,18 @@ namespace cass { -int32_t ExecuteRequest::encode_batch(int version, BufferVec* bufs, RequestCallback* callback) const { - int32_t length = 0; - const std::string& id(prepared_->id()); - - // ... ([byte][short bytes][short][bytes]...[bytes]) - int buf_size = sizeof(uint8_t) + sizeof(uint16_t) + id.size() + sizeof(uint16_t); - - bufs->push_back(Buffer(buf_size)); - length += buf_size; - - Buffer& buf = bufs->back(); - size_t pos = buf.encode_byte(0, kind()); - pos = buf.encode_string(pos, id.data(), id.size()); - - buf.encode_uint16(pos, elements_count()); - if (elements_count() > 0) { - int32_t result = copy_buffers(version, bufs, callback); - if (result < 0) return result; - length += result; - } - - return length; -} - int ExecuteRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const { if (version == 1) { - return internal_encode_v1(callback, bufs); + return encode_v1(callback, bufs); } else { - return internal_encode(version, callback, bufs); - } -} - -int ExecuteRequest::internal_encode_v1(RequestCallback* callback, BufferVec* bufs) const { - size_t length = 0; - const int version = 1; - - const std::string& prepared_id = prepared_->id(); - - // [short bytes] + [short] - size_t prepared_buf_size = sizeof(uint16_t) + prepared_id.size() + - sizeof(uint16_t); - - { - bufs->push_back(Buffer(prepared_buf_size)); - length += prepared_buf_size; - - Buffer& buf = bufs->back(); - size_t pos = buf.encode_string(0, - prepared_id.data(), - prepared_id.size()); - buf.encode_uint16(pos, elements_count()); - // ... - int32_t result = copy_buffers(version, bufs, callback); + int32_t length = 0; + length += encode_begin(version, elements().size(), callback, bufs); + int32_t result = encode_values(version, callback, bufs); if (result < 0) return result; length += result; + length += encode_end(version, callback, bufs); + return length; } - - { - // [short] - size_t buf_size = sizeof(uint16_t); - - Buffer buf(buf_size); - buf.encode_uint16(0, callback->consistency()); - bufs->push_back(buf); - length += buf_size; - } - - return length; -} - -int ExecuteRequest::internal_encode(int version, RequestCallback* callback, BufferVec* bufs) const { - int length = 0; - uint8_t flags = this->flags(); - - const std::string& prepared_id = prepared_->id(); - - // [short bytes] + [short] + [byte] - size_t prepared_buf_size = sizeof(uint16_t) + prepared_id.size() + - sizeof(uint16_t) + sizeof(uint8_t); - size_t paging_buf_size = 0; - - if (elements_count() > 0) { // = ... - prepared_buf_size += sizeof(uint16_t); // [short] - flags |= CASS_QUERY_FLAG_VALUES; - } - - if (page_size() >= 0) { - paging_buf_size += sizeof(int32_t); // [int] - flags |= CASS_QUERY_FLAG_PAGE_SIZE; - } - - if (!paging_state().empty()) { - paging_buf_size += sizeof(int32_t) + paging_state().size(); // [bytes] - flags |= CASS_QUERY_FLAG_PAGING_STATE; - } - - if (serial_consistency() != 0) { - paging_buf_size += sizeof(uint16_t); // [short] - flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY; - } - - if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { - paging_buf_size += sizeof(int64_t); // [long] - flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP; - } - - { - bufs->push_back(Buffer(prepared_buf_size)); - length += prepared_buf_size; - - Buffer& buf = bufs->back(); - size_t pos = buf.encode_string(0, - prepared_id.data(), - prepared_id.size()); - pos = buf.encode_uint16(pos, callback->consistency()); - pos = buf.encode_byte(pos, flags); - - if (elements_count() > 0) { - buf.encode_uint16(pos, elements_count()); - int32_t result = copy_buffers(version, bufs, callback); - if (result < 0) return result; - length += result; - } - } - - if (paging_buf_size > 0) { - bufs->push_back(Buffer(paging_buf_size)); - length += paging_buf_size; - - Buffer& buf = bufs->back(); - size_t pos = 0; - - if (page_size() >= 0) { - pos = buf.encode_int32(pos, page_size()); - } - - if (!paging_state().empty()) { - pos = buf.encode_bytes(pos, paging_state().data(), paging_state().size()); - } - - if (serial_consistency() != 0) { - pos = buf.encode_uint16(pos, serial_consistency()); - } - - if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { - pos = buf.encode_int64(pos, callback->timestamp()); - } - } - - return length; } } // namespace cass diff --git a/src/execute_request.hpp b/src/execute_request.hpp index 094e70fb4..31f738e14 100644 --- a/src/execute_request.hpp +++ b/src/execute_request.hpp @@ -30,12 +30,8 @@ namespace cass { class ExecuteRequest : public Statement { public: ExecuteRequest(const Prepared* prepared) - : Statement(CQL_OPCODE_EXECUTE, CASS_BATCH_KIND_PREPARED, - prepared->result()->column_count(), - prepared->key_indices(), - prepared->result()->keyspace().to_string()) - , prepared_(prepared) - , metadata_(prepared->result()->metadata()){ + : Statement(prepared) + , prepared_(prepared) { // If the prepared statement has result metadata then there is no // need to get the metadata with this request too. if (prepared->result()->result_metadata()) { @@ -45,25 +41,23 @@ class ExecuteRequest : public Statement { const Prepared::ConstPtr& prepared() const { return prepared_; } + virtual int encode(int version, RequestCallback* callback, BufferVec* bufs) const; + + bool get_routing_key(std::string* routing_key, EncodingCache* cache) const { + return calculate_routing_key(prepared_->key_indices(), routing_key, cache); + } + private: virtual size_t get_indices(StringRef name, IndexVec* indices) { - return metadata_->get_indices(name, indices); + return prepared_->result()->metadata()->get_indices(name, indices); } virtual const DataType::ConstPtr& get_type(size_t index) const { - return metadata_->get_column_definition(index).data_type; + return prepared_->result()->metadata()->get_column_definition(index).data_type; } - virtual int32_t encode_batch(int version, BufferVec* bufs, RequestCallback* callback) const; - -private: - int encode(int version, RequestCallback* callback, BufferVec* bufs) const; - int internal_encode_v1(RequestCallback* callback, BufferVec* bufs) const; - int internal_encode(int version, RequestCallback* callback, BufferVec* bufs) const; - private: Prepared::ConstPtr prepared_; - ResultMetadata::Ptr metadata_; }; } // namespace cass diff --git a/src/logger.hpp b/src/logger.hpp index 4e64d6640..68188b7ec 100644 --- a/src/logger.hpp +++ b/src/logger.hpp @@ -102,12 +102,12 @@ class Logger { # define LOG_FUNCTION_ "" #endif -#define LOG_CHECK_LEVEL(severity, ...) do { \ - if (severity <= Logger::log_level()) { \ - Logger::log(severity, \ - LOG_FILE_, __LINE__, LOG_FUNCTION_, \ - LOG_FIRST_(__VA_ARGS__) LOG_REST_(__VA_ARGS__)); \ - } \ +#define LOG_CHECK_LEVEL(severity, ...) do { \ + if (severity <= cass::Logger::log_level()) { \ + cass::Logger::log(severity, \ + LOG_FILE_, __LINE__, LOG_FUNCTION_, \ + LOG_FIRST_(__VA_ARGS__) LOG_REST_(__VA_ARGS__)); \ + } \ } while(0) #define LOG_CRITICAL(...) LOG_CHECK_LEVEL(CASS_LOG_CRITICAL, __VA_ARGS__) diff --git a/src/query_request.cpp b/src/query_request.cpp index 76c4ff5de..64fbcb055 100644 --- a/src/query_request.cpp +++ b/src/query_request.cpp @@ -23,65 +23,37 @@ namespace cass { -int32_t QueryRequest::encode_batch(int version, BufferVec* bufs, RequestCallback* callback) const { - int32_t length = 0; - const std::string& query(query_); - - // [name_1]...[name_n] ([byte][long string][short][bytes]...[bytes]) - int buf_size = sizeof(uint8_t) + sizeof(int32_t) + query.size() + sizeof(uint16_t); - - bufs->push_back(Buffer(buf_size)); - length += buf_size; - - Buffer& buf = bufs->back(); - size_t pos = buf.encode_byte(0, kind()); - pos = buf.encode_long_string(pos, query.data(), query.size()); - - if (has_names_for_values()) { - if (version < 3) { - LOG_ERROR("Protocol version %d does not support named values", version); - return REQUEST_ERROR_UNSUPPORTED_PROTOCOL; - } - buf.encode_uint16(pos, value_names_.size()); - length += copy_buffers_with_names(version, bufs, callback->encoding_cache()); +int QueryRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const { + if (version == 1) { + return encode_v1(callback, bufs); } else { - buf.encode_uint16(pos, elements_count()); - if (elements_count() > 0) { - int32_t result = copy_buffers(version, bufs, callback); - if (result < 0) return result; - length += result; - } - } - - return length; -} - -size_t QueryRequest::get_indices(StringRef name, IndexVec* indices) { - set_has_names_for_values(true); - - if (value_names_.get_indices(name, indices) == 0) { - if (value_names_.size() > elements_count()) { - // No more space left for new named values - return 0; - } - if (name.size() > 0 && name.front() == '"' && name.back() == '"') { - name = name.substr(1, name.size() - 2); + int32_t length = 0; + int32_t result; + if (has_names_for_values()) { + length += encode_begin(version, value_names_->size(), callback, bufs); + result = encode_values_with_names(version, callback, bufs); + } else { + length += encode_begin(version, elements().size(), callback, bufs); + result = encode_values(version, callback, bufs); } - indices->push_back(value_names_.add(ValueName(name.to_string()))); + if (result < 0) return result; + length += result; + length += encode_end(version, callback, bufs); + return length; } - - return indices->size(); } -int32_t QueryRequest::copy_buffers_with_names(int version, - BufferVec* bufs, - EncodingCache* cache) const { +// Format: [...] +// where: +// is a [string] +// is a [bytes] +int32_t QueryRequest::encode_values_with_names(int version, RequestCallback* callback, BufferVec* bufs) const { int32_t size = 0; - for (size_t i = 0; i < value_names_.size(); ++i) { - const Buffer& name_buf = value_names_[i].buf; + for (size_t i = 0; i < value_names_->size(); ++i) { + const Buffer& name_buf = (*value_names_)[i].buf; bufs->push_back(name_buf); - Buffer value_buf(elements()[i].get_buffer_cached(version, cache, false)); + Buffer value_buf(elements()[i].get_buffer_cached(version, callback->encoding_cache(), false)); bufs->push_back(value_buf); size += name_buf.size() + value_buf.size(); @@ -89,109 +61,24 @@ int32_t QueryRequest::copy_buffers_with_names(int version, return size; } -int QueryRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const { - if (version == 1) { - return internal_encode_v1(callback, bufs); - } else { - return internal_encode(version, callback, bufs); - } -} - -int QueryRequest::internal_encode_v1(RequestCallback* callback, BufferVec* bufs) const { - // [long string] + [short] - size_t length = sizeof(int32_t) + query_.size() + sizeof(uint16_t); - - Buffer buf(length); - size_t pos = buf.encode_long_string(0, query_.data(), query_.size()); - buf.encode_uint16(pos, callback->consistency()); - bufs->push_back(buf); - - return length; -} - -int QueryRequest::internal_encode(int version, RequestCallback* callback, BufferVec* bufs) const { - int length = 0; - uint8_t flags = this->flags(); - - // [long string] + [short] + [byte] - size_t query_buf_size = sizeof(int32_t) + query_.size() + - sizeof(uint16_t) + sizeof(uint8_t); - size_t paging_buf_size = 0; - - if (elements_count() > 0) { // = ... - query_buf_size += sizeof(uint16_t); // [short] - flags |= CASS_QUERY_FLAG_VALUES; - } - - if (page_size() > 0) { - paging_buf_size += sizeof(int32_t); // [int] - flags |= CASS_QUERY_FLAG_PAGE_SIZE; - } - - if (!paging_state().empty()) { - paging_buf_size += sizeof(int32_t) + paging_state().size(); // [bytes] - flags |= CASS_QUERY_FLAG_PAGING_STATE; - } - - if (serial_consistency() != 0) { - paging_buf_size += sizeof(uint16_t); // [short] - flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY; - } - - if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { - paging_buf_size += sizeof(int64_t); // [long] - flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP; - } - - { - bufs->push_back(Buffer(query_buf_size)); - length += query_buf_size; - - Buffer& buf = bufs->back(); - size_t pos = buf.encode_long_string(0, query_.data(), query_.size()); - pos = buf.encode_uint16(pos, callback->consistency()); - pos = buf.encode_byte(pos, flags); - - if (has_names_for_values()) { - if (version < 3) { - LOG_ERROR("Protocol version %d does not support named values", version); - return REQUEST_ERROR_UNSUPPORTED_PROTOCOL; - } - buf.encode_uint16(pos, value_names_.size()); - length += copy_buffers_with_names(version, bufs, callback->encoding_cache()); - } else if (elements_count() > 0) { - buf.encode_uint16(pos, elements_count()); - int32_t result = copy_buffers(version, bufs, callback); - if (result < 0) return result; - length += result; - } +size_t QueryRequest::get_indices(StringRef name, IndexVec* indices) { + if (!value_names_) { + set_has_names_for_values(true); + value_names_.reset(new ValueNameHashTable(elements().size())); } - if (paging_buf_size > 0) { - bufs->push_back(Buffer(paging_buf_size)); - length += paging_buf_size; - - Buffer& buf = bufs->back(); - size_t pos = 0; - - if (page_size() >= 0) { - pos = buf.encode_int32(pos, page_size()); - } - - if (!paging_state().empty()) { - pos = buf.encode_bytes(pos, paging_state().data(), paging_state().size()); - } - - if (serial_consistency() != 0) { - pos = buf.encode_uint16(pos, serial_consistency()); + if (value_names_->get_indices(name, indices) == 0) { + if (value_names_->size() > elements().size()) { + // No more space left for new named values + return 0; } - - if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { - pos = buf.encode_int64(pos, callback->timestamp()); + if (name.size() > 0 && name.front() == '"' && name.back() == '"') { + name = name.substr(1, name.size() - 2); } + indices->push_back(value_names_->add(ValueName(name.to_string()))); } - return length; + return indices->size(); } } // namespace cass diff --git a/src/query_request.hpp b/src/query_request.hpp index 03ed03c9b..dca3a2eb4 100644 --- a/src/query_request.hpp +++ b/src/query_request.hpp @@ -28,58 +28,40 @@ namespace cass { class QueryRequest : public Statement { public: - struct ValueName : HashTableEntry { - ValueName() { } - - ValueName(const std::string& name) - : name(name) - , buf(sizeof(uint16_t) + name.size()) { - buf.encode_string(0, name.data(), name.size()); - } - - std::string name; - Buffer buf; - }; - - explicit QueryRequest(size_t value_count = 0) - : Statement(CQL_OPCODE_QUERY, CASS_BATCH_KIND_QUERY, - value_count) - , value_names_(value_count) { } - QueryRequest(const std::string& query, size_t value_count = 0) - : Statement(CQL_OPCODE_QUERY, CASS_BATCH_KIND_QUERY, - value_count) - , query_(query) - , value_names_(value_count) { } + : Statement(query.data(), query.size(), value_count) { } QueryRequest(const char* query, size_t query_length, - size_t value_count = 0) - : Statement(CQL_OPCODE_QUERY, CASS_BATCH_KIND_QUERY, - value_count) - , query_(query, query_length) - , value_names_(value_count) { } + size_t value_count) + : Statement(query, query_length, value_count) { } - virtual int32_t encode_batch(int version, BufferVec* bufs, RequestCallback* callback) const; + virtual int encode(int version, RequestCallback* callback, BufferVec* bufs) const; private: - virtual size_t get_indices(StringRef name, - IndexVec* indices); + int32_t encode_values_with_names(int version, RequestCallback* callback, BufferVec* bufs) const; + + virtual size_t get_indices(StringRef name, IndexVec* indices); virtual const DataType::ConstPtr& get_type(size_t index) const { return DataType::NIL; } private: - int32_t copy_buffers_with_names(int version, BufferVec* bufs, EncodingCache* cache) const; + struct ValueName : HashTableEntry { + ValueName() { } - int encode(int version, RequestCallback* callback, BufferVec* bufs) const; - int internal_encode_v1(RequestCallback* callback, BufferVec* bufs) const; - int internal_encode(int version, RequestCallback* callback, BufferVec* bufs) const; + ValueName(const std::string& name) + : name(name) + , buf(sizeof(uint16_t) + name.size()) { + buf.encode_string(0, name.data(), name.size()); + } -private: - std::string query_; - CaseInsensitiveHashTable value_names_; + std::string name; + Buffer buf; + }; + typedef CaseInsensitiveHashTable ValueNameHashTable; + ScopedPtr value_names_; }; } // namespace cass diff --git a/src/request_callback.cpp b/src/request_callback.cpp index d82837fe7..c75f7beb2 100644 --- a/src/request_callback.cpp +++ b/src/request_callback.cpp @@ -35,16 +35,16 @@ void RequestCallback::start(Connection* connection, int stream) { } int32_t RequestCallback::encode(int version, int flags, BufferVec* bufs) { - if (version < 1 || version > 4) { - return Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL; - } - size_t index = bufs->size(); bufs->push_back(Buffer()); // Placeholder const Request* req = request(); int32_t length = 0; + if (version == CASS_NEWEST_BETA_PROTOCOL_VERSION) { + flags |= CASS_FLAG_BETA; + } + if (version >= 4 && req->custom_payload()) { flags |= CASS_FLAG_CUSTOM_PAYLOAD; length += req->custom_payload()->encode(bufs); diff --git a/src/serialization.hpp b/src/serialization.hpp index c0e5de219..764e1b137 100644 --- a/src/serialization.hpp +++ b/src/serialization.hpp @@ -240,6 +240,15 @@ inline char* decode_inet(char* input, Address* output) { return pos; } +inline char* decode_inet(char* input, CassInet* output) { + char* pos = decode_byte(input, output->address_length); + + assert(output->address_length <= 16); + memcpy(output->address, pos, output->address_length); + + return pos + output->address_length; +} + inline char* decode_string_map(char* input, std::map& map) { @@ -385,6 +394,45 @@ inline uint64_t encode_zig_zag(int64_t n) { return (n << 1) ^ (n >> 63); } +inline const uint8_t* decode_vint(const uint8_t* input, const uint8_t* end, uint64_t* output) { + int num_extra_bytes; + int i; + uint8_t first_byte = *input++; + if (first_byte <= 127) { + // If this is a multibyte vint, at least the MSB of the first byte + // will be set. Since that's not the case, this is a one-byte value. + *output = first_byte; + } else { + // The number of consecutive most significant bits of the first-byte tell us how + // many additional bytes are in this vint. Count them like this: + // 1. Invert the firstByte so that all leading 1s become 0s. + // 2. Count the number of leading zeros; num_leading_zeros assumes a 64-bit long. + // 3. We care about leading 0s in the byte, not int, so subtract out the + // appropriate number of extra bits (56 for a 64-bit int). + + // We mask out high-order bits to prevent sign-extension as the value is placed in a 64-bit arg + // to the num_leading_zeros function. + num_extra_bytes = cass::num_leading_zeros(~first_byte & 0xff) - 56; + + // Error out if we don't have num_extra_bytes left in our data. + if (input + num_extra_bytes > end) { + // There aren't enough bytes. This duration object is not fully defined. + return NULL; + } + + // Build up the vint value one byte at a time from the data bytes. + // The firstByte contains size as well as the most significant bits of + // the value. Extract just the value. + *output = first_byte & (0xff >> num_extra_bytes); + for (i = 0; i < num_extra_bytes; ++i) { + uint8_t b = *input++; + *output <<= 8; + *output |= b & 0xff; + } + } + return input; +} + } // namespace cass #endif diff --git a/src/statement.cpp b/src/statement.cpp index ba5a03a28..6db5cfe2f 100644 --- a/src/statement.cpp +++ b/src/statement.cpp @@ -53,7 +53,7 @@ CassError cass_statement_reset_parameters(CassStatement* statement, size_t count CassError cass_statement_add_key_index(CassStatement* statement, size_t index) { if (statement->kind() != CASS_BATCH_KIND_QUERY) return CASS_ERROR_LIB_BAD_PARAMS; - if (index >= statement->elements_count()) return CASS_ERROR_LIB_BAD_PARAMS; + if (index >= statement->elements().size()) return CASS_ERROR_LIB_BAD_PARAMS; statement->add_key_index(index); return CASS_OK; } @@ -171,7 +171,7 @@ CASS_STATEMENT_BIND(decimal, THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale), cass::CassDecimal(varint, varint_size, scale)) CASS_STATEMENT_BIND(duration, - THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int32_t nanos), + THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos), cass::CassDuration(months, days, nanos)) #undef CASS_STATEMENT_BIND @@ -250,8 +250,154 @@ CassError cass_statement_bind_custom_by_name_n(CassStatement* statement, namespace cass { -int32_t Statement::copy_buffers(int version, BufferVec* bufs, RequestCallback* callback) const { - int32_t size = 0; +// Format: ... +// where: +// is a [byte] +// is a [long string] for and a [short bytes] for +// is a [short] +// is a [bytes] +int32_t Statement::encode_batch(int version, RequestCallback* callback, BufferVec* bufs) const { + int32_t length = 0; + + { // [byte] + bufs->push_back(Buffer(sizeof(uint8_t))); + Buffer& buf = bufs->back(); + buf.encode_byte(0, kind()); + length += sizeof(uint8_t); + } + + bufs->push_back(query_or_id_); + length += query_or_id_.size(); + + { // [short] + bufs->push_back(Buffer(sizeof(uint16_t))); + Buffer& buf = bufs->back(); + buf.encode_uint16(0, elements().size()); + length += sizeof(uint16_t); + } + + if (elements().size() > 0) { + int32_t result = encode_values(version, callback, bufs); + if (result < 0) return result; + length += result; + } + + return length; +} + +// Format: [...] +// where: +// is a [long string] for and a [short bytes] for +// is a [short] (only for execute statements) +// is a [bytes] (only for execute statements) +// is a [short] +int32_t Statement::encode_v1(RequestCallback* callback, BufferVec* bufs) const { + int32_t length = 0; + const int version = 1; + + bufs->push_back(query_or_id_); + length += query_or_id_.size(); + + if (opcode() == CQL_OPCODE_EXECUTE) { + { // [short] + Buffer buf(sizeof(uint16_t)); + buf.encode_uint16(0, elements().size()); + bufs->push_back(buf); + length += sizeof(uint16_t); + } + // ... + int32_t result = encode_values(version, callback, bufs); + if (result < 0) return result; + length += result; + } + + { // [short] + Buffer buf(sizeof(uint16_t)); + buf.encode_uint16(0, callback->consistency()); + bufs->push_back(buf); + length += sizeof(uint16_t); + } + + return length; +} + +// For query statements the format is: +// +// where: +// has the format [long string] +// is a [short] +// is a [byte] (or [int] for protocol v5) +// is a [short] +// +// For execute statements the format is: +// +// where: +// has the format [short bytes] (or [string]) +// is a [short] +// is a [byte] (or [int] for protocol v5) +// is a [short] +int32_t Statement::encode_begin(int version, uint16_t element_count, + RequestCallback* callback, BufferVec* bufs) const { + int32_t length = 0; + size_t query_params_buf_size = 0; + int32_t flags = flags_; + + bufs->push_back(query_or_id_); + length += query_or_id_.size(); + + query_params_buf_size += sizeof(uint16_t); // [short] + + if (version >= 5) { + query_params_buf_size += sizeof(int32_t); // [int] + } else { + query_params_buf_size += sizeof(uint8_t); // [byte] + } + + if (element_count > 0) { + query_params_buf_size += sizeof(uint16_t); // [short] + flags |= CASS_QUERY_FLAG_VALUES; + } + + if (page_size() > 0) { + flags |= CASS_QUERY_FLAG_PAGE_SIZE; + } + + if (!paging_state().empty()) { + flags |= CASS_QUERY_FLAG_PAGING_STATE; + } + + if (serial_consistency() != 0) { + flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY; + } + + if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { + flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP; + } + + bufs->push_back(Buffer(query_params_buf_size)); + length += query_params_buf_size; + + Buffer& buf = bufs->back(); + size_t pos = buf.encode_uint16(0, callback->consistency()); + + if (version >= 5) { + pos = buf.encode_int32(pos, flags); + } else { + pos = buf.encode_byte(pos, flags); + } + + if (element_count > 0) { + buf.encode_uint16(pos, element_count); + } + + return length; +} + +// Format: [...] +// where: +// is a [bytes] +int32_t Statement::encode_values(int version, RequestCallback* callback, BufferVec* bufs) const { + int32_t length = 0; for (size_t i = 0; i < elements().size(); ++i) { const Element& element = elements()[i]; if (!element.is_unset()) { @@ -266,29 +412,83 @@ int32_t Statement::copy_buffers(int version, BufferVec* bufs, RequestCallback* c return Request::REQUEST_ERROR_PARAMETER_UNSET; } } - size += bufs->back().size(); + length += bufs->back().size(); } - return size; + return length; } -bool Statement::get_routing_key(std::string* routing_key, EncodingCache* cache) const { - if (key_indices_.empty()) return false; +// Format: [][][][] +// where: +// is a [int] +// is a [bytes] +// is a [short] +// is a [long] +int32_t Statement::encode_end(int version, RequestCallback* callback, BufferVec* bufs) const { + int32_t length = 0; + size_t paging_buf_size = 0; + + if (page_size() > 0) { + paging_buf_size += sizeof(int32_t); // [int] + } - if (key_indices_.size() == 1) { - assert(key_indices_.front() < elements_count()); - const AbstractData::Element& element(elements()[key_indices_.front()]); - if (element.is_unset() || element.is_null()) { - return false; - } - Buffer buf(element.get_buffer_cached(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION, cache, true)); - routing_key->assign(buf.data() + sizeof(int32_t), - buf.size() - sizeof(int32_t)); + if (!paging_state().empty()) { + paging_buf_size += sizeof(int32_t) + paging_state().size(); // [bytes] + } + + if (serial_consistency() != 0) { + paging_buf_size += sizeof(uint16_t); // [short] + } + + if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { + paging_buf_size += sizeof(int64_t); // [long] + } + + if (paging_buf_size > 0) { + bufs->push_back(Buffer(paging_buf_size)); + length += paging_buf_size; + + Buffer& buf = bufs->back(); + size_t pos = 0; + + if (page_size() >= 0) { + pos = buf.encode_int32(pos, page_size()); + } + + if (!paging_state().empty()) { + pos = buf.encode_bytes(pos, paging_state().data(), paging_state().size()); + } + + if (serial_consistency() != 0) { + pos = buf.encode_uint16(pos, serial_consistency()); + } + + if (version >= 3 && callback->timestamp() != CASS_INT64_MIN) { + pos = buf.encode_int64(pos, callback->timestamp()); + } + } + + return length; +} + +bool Statement::calculate_routing_key(const std::vector& key_indices, + std::string* routing_key, EncodingCache* cache) const { + if (key_indices.empty()) return false; + + if (key_indices.size() == 1) { + assert(key_indices.front() < elements().size()); + const AbstractData::Element& element(elements()[key_indices.front()]); + if (element.is_unset() || element.is_null()) { + return false; + } + Buffer buf(element.get_buffer_cached(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION, cache, true)); + routing_key->assign(buf.data() + sizeof(int32_t), + buf.size() - sizeof(int32_t)); } else { size_t length = 0; - for (std::vector::const_iterator i = key_indices_.begin(); - i != key_indices_.end(); ++i) { - assert(*i < elements_count()); + for (std::vector::const_iterator i = key_indices.begin(); + i != key_indices.end(); ++i) { + assert(*i < elements().size()); const AbstractData::Element& element(elements()[*i]); if (element.is_unset() || element.is_null()) { return false; @@ -300,8 +500,8 @@ bool Statement::get_routing_key(std::string* routing_key, EncodingCache* cache) routing_key->clear(); routing_key->reserve(length); - for (std::vector::const_iterator i = key_indices_.begin(); - i != key_indices_.end(); ++i) { + for (std::vector::const_iterator i = key_indices.begin(); + i != key_indices.end(); ++i) { const AbstractData::Element& element(elements()[*i]); Buffer buf(element.get_buffer_cached(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION, cache, true)); size_t size = buf.size() - sizeof(int32_t); diff --git a/src/statement.hpp b/src/statement.hpp index 671fc6e92..9de095c53 100644 --- a/src/statement.hpp +++ b/src/statement.hpp @@ -21,10 +21,12 @@ #include "constants.hpp" #include "external.hpp" #include "macros.hpp" +#include "prepared.hpp" #include "request.hpp" #include "result_metadata.hpp" #include "result_response.hpp" #include "retry_policy.hpp" +#include "scoped_ptr.hpp" #include #include @@ -37,27 +39,31 @@ class Statement : public RoutableRequest, public AbstractData { public: typedef SharedRefPtr Ptr; - Statement(uint8_t opcode, uint8_t kind, size_t values_count = 0) - : RoutableRequest(opcode) + Statement(const char* query, size_t query_length, + size_t values_count) + : RoutableRequest(CQL_OPCODE_QUERY) , AbstractData(values_count) + , query_or_id_(sizeof(int32_t) + query_length) , flags_(0) - , page_size_(-1) - , kind_(kind) { } + , page_size_(-1) { + // [long string] + query_or_id_.encode_long_string(0, query, query_length); + } - Statement(uint8_t opcode, uint8_t kind, size_t values_count, - const std::vector& key_indices, - const std::string& keyspace) - : RoutableRequest(opcode, keyspace) - , AbstractData(values_count) + Statement(const Prepared* prepared) + : RoutableRequest(CQL_OPCODE_EXECUTE, + prepared->result()->keyspace().to_string()) + , AbstractData(prepared->result()->column_count()) + , query_or_id_(sizeof(uint16_t) + prepared->id().size()) , flags_(0) - , page_size_(-1) - , kind_(kind) - , key_indices_(key_indices) { } + , page_size_(-1) { + // [short bytes] (or [string]) + const std::string& id = prepared->id(); + query_or_id_.encode_string(0, id.data(), id.size()); + } virtual ~Statement() { } - uint8_t flags() const { return flags_; } - bool skip_metadata() const { return flags_ & CASS_QUERY_FLAG_SKIP_METADATA; } @@ -82,7 +88,7 @@ class Statement : public RoutableRequest, public AbstractData { return flags_ & CASS_QUERY_FLAG_NAMES_FOR_VALUES; } - int32_t page_size() const { return page_size_; } + int32_t page_size() const { return page_size_; } void set_page_size(int32_t page_size) { page_size_ = page_size; } @@ -92,22 +98,35 @@ class Statement : public RoutableRequest, public AbstractData { paging_state_ = paging_state; } - uint8_t kind() const { return kind_; } + uint8_t kind() const { + return opcode() == CQL_OPCODE_QUERY ? CASS_BATCH_KIND_QUERY + : CASS_BATCH_KIND_PREPARED; + } void add_key_index(size_t index) { key_indices_.push_back(index); } - virtual bool get_routing_key(std::string* routing_key, EncodingCache* cache) const; + virtual bool get_routing_key(std::string* routing_key, EncodingCache* cache) const { + return calculate_routing_key(key_indices_, routing_key, cache); + } - virtual int32_t encode_batch(int version, BufferVec* bufs, RequestCallback* callback) const = 0; + int32_t encode_batch(int version, RequestCallback* callback, BufferVec* bufs) const; protected: - int32_t copy_buffers(int version, BufferVec* bufs, RequestCallback* callback) const; + int32_t encode_v1(RequestCallback* callback, BufferVec* bufs) const; + + int32_t encode_begin(int version, uint16_t element_count, + RequestCallback* callback, BufferVec* bufs) const; + int32_t encode_values(int version, RequestCallback* callback, BufferVec* bufs) const; + int32_t encode_end(int version, RequestCallback* callback, BufferVec* bufs) const; + + bool calculate_routing_key(const std::vector& key_indices, + std::string* routing_key, EncodingCache* cache) const; private: - uint8_t flags_; + Buffer query_or_id_; + int32_t flags_; int32_t page_size_; std::string paging_state_; - uint8_t kind_; std::vector key_indices_; private: diff --git a/src/tuple.cpp b/src/tuple.cpp index 7878165f0..79760df7a 100644 --- a/src/tuple.cpp +++ b/src/tuple.cpp @@ -74,7 +74,7 @@ CASS_TUPLE_SET(decimal, THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale), cass::CassDecimal(varint, varint_size, scale)) CASS_TUPLE_SET(duration, - THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int32_t nanos), + THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos), cass::CassDuration(months, days, nanos)) #undef CASS_TUPLE_SET diff --git a/src/types.hpp b/src/types.hpp index f189dd44f..c06b2746c 100644 --- a/src/types.hpp +++ b/src/types.hpp @@ -65,13 +65,13 @@ struct CassDecimal { struct CassDuration { CassDuration(cass_int32_t months, cass_int32_t days, - cass_int32_t nanos) + cass_int64_t nanos) : months(months) , days(days) , nanos(nanos) { } cass_int32_t months; cass_int32_t days; - cass_int32_t nanos; + cass_int64_t nanos; }; } // namespace cass diff --git a/src/user_type_value.cpp b/src/user_type_value.cpp index 130bfec75..0ed345388 100644 --- a/src/user_type_value.cpp +++ b/src/user_type_value.cpp @@ -79,7 +79,7 @@ CASS_USER_TYPE_SET(decimal, THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale), cass::CassDecimal(varint, varint_size, scale)) CASS_USER_TYPE_SET(duration, - THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int32_t nanos), + THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos), cass::CassDuration(months, days, nanos)) #undef CASS_USER_TYPE_SET diff --git a/src/value.cpp b/src/value.cpp index c0d575d80..3693e1e28 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -139,77 +139,33 @@ CassError cass_value_get_bytes(const CassValue* value, return CASS_OK; } -static const cass_byte_t* decode_vint(const cass_byte_t* input, const cass_byte_t* end, cass_int64_t* output) -{ - int num_extra_bytes; - int i; - cass_byte_t first_byte = *input++; - if (first_byte <= 127) { - // If this is a multibyte vint, at least the MSB of the first byte - // will be set. Since that's not the case, this is a one-byte value. - *output = first_byte; - } else { - // The number of consecutive most significant bits of the first-byte tell us how - // many additional bytes are in this vint. Count them like this: - // 1. Invert the firstByte so that all leading 1s become 0s. - // 2. Count the number of leading zeros; num_leading_zeros assumes a 64-bit long. - // 3. We care about leading 0s in the byte, not int, so subtract out the - // appropriate number of extra bits (56 for a 64-bit int). - - // We mask out high-order bits to prevent sign-extension as the value is placed in a 64-bit arg - // to the num_leading_zeros function. - num_extra_bytes = cass::num_leading_zeros(~first_byte & 0xff) - 56; - - // Error out if we don't have num_extra_bytes left in our data. - if (input + num_extra_bytes > end) { - // There aren't enough bytes. This duration object is not fully defined. - return NULL; - } - - // Build up the vint value one byte at a time from the data bytes. - // The firstByte contains size as well as the most significant bits of - // the value. Extract just the value. - *output = first_byte & (0xff >> num_extra_bytes); - for (i = 0; i < num_extra_bytes; ++i) { - cass_byte_t b = *input++; - *output <<= 8; - *output |= b & 0xff; - } - } - return input; -} - -CassError cass_value_get_duration(const CassValue* value, cass_int32_t* months, cass_int32_t* days, cass_int32_t* nanos) -{ - cass_int32_t *outs[3]; - int ctr; - size_t data_size = 0; - const cass_byte_t* cur_byte = NULL; - const cass_byte_t* end = NULL; +CassError cass_value_get_duration(const CassValue* value, + cass_int32_t* months, + cass_int32_t* days, + cass_int64_t* nanos) { + const uint8_t* cur_byte = NULL; + const uint8_t* end = NULL; if (value == NULL || value->is_null()) return CASS_ERROR_LIB_NULL_VALUE; if (!cass_value_is_duration(value)) return CASS_ERROR_LIB_INVALID_VALUE_TYPE; - // Package up the out-args in an array. Duration's always have months, then days, then nanos. - outs[0] = months; - outs[1] = days; - outs[2] = nanos; + cur_byte = reinterpret_cast(value->data()); + end = cur_byte + value->size(); - cass_value_get_bytes(value, &cur_byte, &data_size); - end = cur_byte + data_size; + uint64_t decoded = 0; - for (ctr = 0; ctr < 3 && cur_byte != end; ++ctr) { - cass_int64_t decoded = 0; - cur_byte = decode_vint(cur_byte, end, &decoded); - if (cur_byte == NULL) - return CASS_ERROR_LIB_BAD_PARAMS; - *outs[ctr] = static_cast(cass::decode_zig_zag(decoded)); - } + cur_byte = cass::decode_vint(cur_byte, end, &decoded); + if (cur_byte == NULL || cur_byte == end) return CASS_ERROR_LIB_BAD_PARAMS; + *months = static_cast(cass::decode_zig_zag(decoded)); + + cur_byte = cass::decode_vint(cur_byte, end, &decoded); + if (cur_byte == NULL || cur_byte == end) return CASS_ERROR_LIB_BAD_PARAMS; + *days = static_cast(cass::decode_zig_zag(decoded)); + + cur_byte = cass::decode_vint(cur_byte, end, &decoded); + if (cur_byte == NULL || cur_byte != end) return CASS_ERROR_LIB_BAD_PARAMS; + *nanos = static_cast(cass::decode_zig_zag(decoded)); - if (ctr < 3) { - // There aren't enough bytes. This duration object is not fully defined. - return CASS_ERROR_LIB_BAD_PARAMS; - } return CASS_OK; } diff --git a/test/integration_tests/src/test_datatypes.cpp b/test/integration_tests/src/test_datatypes.cpp index 6f648f18e..966c286e2 100644 --- a/test/integration_tests/src/test_datatypes.cpp +++ b/test/integration_tests/src/test_datatypes.cpp @@ -150,10 +150,10 @@ BOOST_AUTO_TEST_CASE(read_write_primitives) { value = CassDuration(1, 2, 3); insert_value(CASS_VALUE_TYPE_DURATION, value); - value = CassDuration((1ULL << 31) - 1, (1ULL << 31) - 1, (1ULL << 31) - 1); + value = CassDuration((1ULL << 31) - 1, (1ULL << 31) - 1, (1ULL << 63) - 1); insert_value(CASS_VALUE_TYPE_DURATION, value); - value = CassDuration(1LL << 31, 1LL << 31, 1LL << 31); + value = CassDuration(1LL << 31, 1LL << 31, 1LL << 63); insert_value(CASS_VALUE_TYPE_DURATION, value); } diff --git a/test/integration_tests/src/test_utils.cpp b/test/integration_tests/src/test_utils.cpp index 9eb377cee..90022424c 100644 --- a/test/integration_tests/src/test_utils.cpp +++ b/test/integration_tests/src/test_utils.cpp @@ -244,7 +244,12 @@ MultipleNodesTest::MultipleNodesTest(unsigned int num_nodes_dc1, cass_cluster_set_max_connections_per_host(cluster, 4); cass_cluster_set_num_threads_io(cluster, 4); cass_cluster_set_max_concurrent_creation(cluster, 8); - cass_cluster_set_protocol_version(cluster, protocol_version); + if (version.major_version >= 3 && version.minor_version >= 10 && + protocol_version == CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION) { + cass_cluster_set_use_beta_protocol_version(cluster, cass_true); + } else { + cass_cluster_set_protocol_version(cluster, protocol_version); + } cass_cluster_set_use_randomized_contact_points(cluster, cass_false); } diff --git a/test/integration_tests/src/test_utils.hpp b/test/integration_tests/src/test_utils.hpp index 2254c4625..0d0a21963 100644 --- a/test/integration_tests/src/test_utils.hpp +++ b/test/integration_tests/src/test_utils.hpp @@ -36,6 +36,7 @@ #include "cassandra.h" #include "bridge.hpp" +#include "constants.hpp" #ifdef min #undef min @@ -88,11 +89,11 @@ struct CassDuration { : months(0) , days(0) , nanos(0) {} - CassDuration(cass_int32_t months, cass_int32_t days, cass_int32_t nanos) + CassDuration(cass_int32_t months, cass_int32_t days, cass_int64_t nanos) : months(months), days(days), nanos(nanos) {} cass_int32_t months; cass_int32_t days; - cass_int32_t nanos; + cass_int64_t nanos; }; struct CassDate { @@ -1169,7 +1170,7 @@ struct Value { */ struct MultipleNodesTest { MultipleNodesTest(unsigned int num_nodes_dc1, unsigned int num_nodes_dc2, - unsigned int protocol_version = 4, bool with_vnodes = false, + unsigned int protocol_version = CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION, bool with_vnodes = false, bool is_ssl = false); virtual ~MultipleNodesTest(); @@ -1182,7 +1183,7 @@ struct MultipleNodesTest { struct SingleSessionTest : public MultipleNodesTest { SingleSessionTest(unsigned int num_nodes_dc1, unsigned int num_nodes_dc2, - bool with_session = true, unsigned int protocol_version = 4, + bool with_session = true, unsigned int protocol_version = CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION, bool with_vnodes = false, bool is_ssl = false); virtual ~SingleSessionTest(); @@ -1429,4 +1430,4 @@ inline bool operator<(CassDuration a, CassDuration b) { } return a.nanos < b.nanos; -} \ No newline at end of file +} diff --git a/test/unit_tests/src/test_encode.cpp b/test/unit_tests/src/test_encode.cpp index 414847ee0..3b60fd68b 100644 --- a/test/unit_tests/src/test_encode.cpp +++ b/test/unit_tests/src/test_encode.cpp @@ -62,17 +62,20 @@ BOOST_AUTO_TEST_CASE(simple_negative) BOOST_AUTO_TEST_CASE(edge_positive) { - CassDuration value((1UL << 31) - 1, (1UL << 31) - 1, (1UL << 31) - 1); + CassDuration value((1UL << 31) - 1, (1UL << 31) - 1, (1ULL << 63) - 1); Buffer result = encode(value); - BOOST_CHECK_EQUAL(15, result.size()); + BOOST_CHECK_EQUAL(19, result.size()); unsigned const char* result_data = reinterpret_cast(result.data()); - // The first 5 bytes represent (1UL<<31 - 1), the max 32-bit number. Byte 0 + // The first 5 bytes represent (1UL << 31 - 1), the max 32-bit number. Byte 0 // has the first 4 bits set to indicate that there are 4 bytes beyond this one that // define this field (each field is a vint of a zigzag encoding of the original // value). Encoding places the least-significant byte at byte 4 and works backwards // to record more significant bytes. Zigzag encoding just left shifts a value - // by one bit for positive values, so byte 4 ends in a 0. + // by one bit for positive values, so byte 4 ends in a 0. The last 9 bytes represent + // (1UL << 63 - 1), the max 64-bit integer. Byte 10 has the first 8 bits set to indicate + // there are 8 follow up bytes to encode this value. The last byte also ends in a 0 because + // it is a positive value. BOOST_CHECK_EQUAL(result_data[0], 0xf0); for (int ind = 1; ind < 4; ++ind) { @@ -87,23 +90,25 @@ BOOST_AUTO_TEST_CASE(edge_positive) } BOOST_CHECK_EQUAL(result_data[9], 0xfe); - BOOST_CHECK_EQUAL(result_data[10], 0xf0); - for (int ind = 11; ind < 14; ++ind) { + BOOST_CHECK_EQUAL(result_data[10], 0xff); + for (int ind = 11; ind < 18; ++ind) { BOOST_CHECK_EQUAL(result_data[ind], 0xff); } - BOOST_CHECK_EQUAL(result_data[14], 0xfe); + BOOST_CHECK_EQUAL(result_data[18], 0xfe); } BOOST_AUTO_TEST_CASE(edge_negative) { - CassDuration value(1L << 31, 1L << 31, 1L << 31); + CassDuration value(1L << 31, 1L << 31, 1LL << 63); Buffer result = encode(value); - BOOST_CHECK_EQUAL(15, result.size()); + BOOST_CHECK_EQUAL(19, result.size()); unsigned const char* result_data = reinterpret_cast(result.data()); // We have 5-bytes for 1L << 31, the min 32-bit number. The zigzag // representation is 4 bytes of 0xff, and the first byte is 0xf0 to say we have - // 4 bytes of value beyond these size-spec-bits. + // 4 bytes of value beyond these size-spec-bits. The last 9 bytes represent + // 1L << 63 with all the first byte bits set to indicate 8 more bytes are needed to + // encode this value. BOOST_CHECK_EQUAL(result_data[0], 0xf0); for (int ind = 1; ind <= 4; ++ind) { @@ -116,8 +121,8 @@ BOOST_AUTO_TEST_CASE(edge_negative) BOOST_CHECK_EQUAL(result_data[ind], 0xff); } - BOOST_CHECK_EQUAL(result_data[10], 0xf0); - for (int ind = 11; ind <= 14; ++ind) { + BOOST_CHECK_EQUAL(result_data[10], 0xff); + for (int ind = 11; ind <= 18; ++ind) { BOOST_CHECK_EQUAL(result_data[ind], 0xff); } } diff --git a/test/unit_tests/src/test_load_balancing.cpp b/test/unit_tests/src/test_load_balancing.cpp index 5397e9d48..b5000a03c 100644 --- a/test/unit_tests/src/test_load_balancing.cpp +++ b/test/unit_tests/src/test_load_balancing.cpp @@ -439,7 +439,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) policy.init(cass::SharedRefPtr(), hosts, NULL); // Set local CL - cass::SharedRefPtr request(new cass::QueryRequest()); + cass::SharedRefPtr request(new cass::QueryRequest("", 0)); request->set_consistency(CASS_CONSISTENCY_LOCAL_ONE); cass::SharedRefPtr request_handler( new cass::RequestHandler(request, cass::ResponseFuture::Ptr(), NULL)); @@ -457,7 +457,7 @@ BOOST_AUTO_TEST_CASE(allow_remote_dcs_for_local_cl) policy.init(cass::SharedRefPtr(), hosts, NULL); // Set local CL - cass::SharedRefPtr request(new cass::QueryRequest()); + cass::SharedRefPtr request(new cass::QueryRequest("", 0)); request->set_consistency(CASS_CONSISTENCY_LOCAL_QUORUM); cass::SharedRefPtr request_handler( new cass::RequestHandler(request, cass::ResponseFuture::Ptr(), NULL)); @@ -530,7 +530,7 @@ BOOST_AUTO_TEST_CASE(simple) cass::TokenAwarePolicy policy(new cass::RoundRobinPolicy()); policy.init(cass::SharedRefPtr(), hosts, NULL); - cass::SharedRefPtr request(new cass::QueryRequest(1)); + cass::SharedRefPtr request(new cass::QueryRequest("", 1)); const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887 request->set(0, cass::CassString(value, strlen(value))); request->add_key_index(0); @@ -610,7 +610,7 @@ BOOST_AUTO_TEST_CASE(network_topology) cass::TokenAwarePolicy policy(new cass::DCAwarePolicy(LOCAL_DC, num_hosts / 2, false)); policy.init(cass::SharedRefPtr(), hosts, NULL); - cass::SharedRefPtr request(new cass::QueryRequest(1)); + cass::SharedRefPtr request(new cass::QueryRequest("", 1)); const char* value = "abc"; // hash: -5434086359492102041 request->set(0, cass::CassString(value, strlen(value))); request->add_key_index(0); diff --git a/test/unit_tests/src/test_routing_key.cpp b/test/unit_tests/src/test_routing_key.cpp index 91fb919cd..75d902cc6 100644 --- a/test/unit_tests/src/test_routing_key.cpp +++ b/test/unit_tests/src/test_routing_key.cpp @@ -42,7 +42,7 @@ BOOST_FIXTURE_TEST_SUITE(routing_key, RoutingKeyTest) BOOST_AUTO_TEST_CASE(single) { { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; @@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(single) } { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; cass_int32_t value = 123456789; @@ -75,7 +75,7 @@ BOOST_AUTO_TEST_CASE(single) } { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; cass_int64_t value = 123456789; @@ -90,7 +90,7 @@ BOOST_AUTO_TEST_CASE(single) } { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; query.set(0, cass_true); @@ -104,7 +104,7 @@ BOOST_AUTO_TEST_CASE(single) } { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; const char* value = "abcdefghijklmnop"; @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(single) BOOST_AUTO_TEST_CASE(empty_and_null) { - cass::QueryRequest query(1); + cass::QueryRequest query("", 1); cass::Request::EncodingCache cache; std::string routing_key; @@ -136,7 +136,7 @@ BOOST_AUTO_TEST_CASE(empty_and_null) BOOST_AUTO_TEST_CASE(composite) { { - cass::QueryRequest query(3); + cass::QueryRequest query("", 3); cass::Request::EncodingCache cache; CassUuid uuid; @@ -160,7 +160,7 @@ BOOST_AUTO_TEST_CASE(composite) } { - cass::QueryRequest query(3); + cass::QueryRequest query("", 3); cass::Request::EncodingCache cache; query.set(0, cass_false); diff --git a/test/unit_tests/src/test_value.cpp b/test/unit_tests/src/test_value.cpp index 40ad8da29..142f18652 100644 --- a/test/unit_tests/src/test_value.cpp +++ b/test/unit_tests/src/test_value.cpp @@ -55,7 +55,8 @@ TEST_TYPE(inet, CassInet) BOOST_AUTO_TEST_CASE(bad_duration) { - cass_int32_t months, days, nanos; + cass_int32_t months, days; + cass_int64_t nanos; BOOST_CHECK_EQUAL(cass_value_get_duration(s_text_value, &months, &days, &nanos), CASS_ERROR_LIB_INVALID_VALUE_TYPE); }