Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ if(CASS_BUILD_EXAMPLES)
add_subdirectory(examples/callbacks)
add_subdirectory(examples/collections)
add_subdirectory(examples/date_time)
add_subdirectory(examples/duration)
add_subdirectory(examples/logging)
add_subdirectory(examples/maps)
add_subdirectory(examples/prepared)
Expand Down
34 changes: 17 additions & 17 deletions examples/duration/duration.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <stdio.h>
#include "cassandra.h"

#define NANOS_IN_A_SEC (1000LL * 1000LL * 1000LL)

void print_error(CassFuture* future) {
const char* message;
size_t message_length;
Expand Down Expand Up @@ -74,14 +76,11 @@ CassError execute_query(CassSession* session, const char* query) {
return rc;
}

CassError insert_into(CassSession* session, const char* key, cass_int32_t months, cass_int32_t days, cass_int32_t nanos) {
CassError insert_into(CassSession* session, const char* key, cass_int32_t months, cass_int32_t days, cass_int64_t nanos) {
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture* future = NULL;
const char* query = "INSERT INTO examples.duration (key, d) VALUES (?, ?);";
cass_byte_t* data;
size_t data_size;


statement = cass_statement_new(query, 2);

Expand Down Expand Up @@ -123,11 +122,12 @@ CassError select_from(CassSession* session, const char* key) {
CassIterator* iterator = cass_iterator_from_result(result);

if (cass_iterator_next(iterator)) {
cass_int32_t months, days, nanos;
cass_int32_t months, days;
cass_int64_t nanos;
const CassRow* row = cass_iterator_get_row(iterator);

cass_value_get_duration(cass_row_get_column(row, 0), &months, &days, &nanos);
printf("months: %d days: %d nanos: %d\n", months, days, nanos);
printf("months: %d days: %d nanos: %lld\n", months, days, nanos);
}

cass_result_free(result);
Expand Down Expand Up @@ -158,23 +158,23 @@ int main(int argc, char* argv[]) {
}

execute_query(session,
"CREATE KEYSPACE examples WITH replication = { \
'class': 'SimpleStrategy', 'replication_factor': '3' };");
"CREATE KEYSPACE IF NOT EXISTS examples WITH replication = { "
"'class': 'SimpleStrategy', 'replication_factor': '3' };");


execute_query(session,
"CREATE TABLE examples.duration (key text PRIMARY KEY, d duration)");
"CREATE TABLE IF NOT EXISTS examples.duration "
"(key text PRIMARY KEY, d duration)");

// Insert some rows into the table, playing with edge values; each of months, days, and nanos may be
// a 64-bit long.
/* Insert some rows into the table and read them back out */

insert_into(session, "base", 0, 0, 0);
insert_into(session, "simple", 1, 2, 3);
insert_into(session, "negative", -1, -2, -3);
insert_into(session, "zero", 0, 0, 0);
insert_into(session, "one_month_two_days_three_seconds", 1, 2, 3 * NANOS_IN_A_SEC);
insert_into(session, "negative_one_month_two_days_three_seconds", -1, -2, -3 * NANOS_IN_A_SEC);

select_from(session, "base");
select_from(session, "simple");
select_from(session, "negative");
select_from(session, "zero");
select_from(session, "one_month_two_days_three_seconds");
select_from(session, "negative_one_month_two_days_three_seconds");

close_future = cass_session_close(session);
cass_future_wait(close_future);
Expand Down
47 changes: 36 additions & 11 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*/

#define CASS_VERSION_MAJOR 2
#define CASS_VERSION_MINOR 6
#define CASS_VERSION_MINOR 7
#define CASS_VERSION_PATCH 0
#define CASS_VERSION_SUFFIX ""

Expand Down Expand Up @@ -635,13 +635,20 @@ typedef enum CassLogLevel_ {
/* @endcond */
} CassLogLevel;

typedef enum CassSslVerifyFlags {
typedef enum CassSslVerifyFlags_ {
CASS_SSL_VERIFY_NONE = 0x00,
CASS_SSL_VERIFY_PEER_CERT = 0x01,
CASS_SSL_VERIFY_PEER_IDENTITY = 0x02,
CASS_SSL_VERIFY_PEER_IDENTITY_DNS = 0x04
} CassSslVerifyFlags;

typedef enum CassProtocolVersion_ {
CASS_PROTOCOL_VERSION_V1 = 0x01,
CASS_PROTOCOL_VERSION_V2 = 0x02,
CASS_PROTOCOL_VERSION_V3 = 0x03,
CASS_PROTOCOL_VERSION_V4 = 0x04
} CassProtocolVersion;

typedef enum CassErrorSource_ {
CASS_ERROR_SOURCE_NONE,
CASS_ERROR_SOURCE_LIB,
Expand Down Expand Up @@ -976,11 +983,29 @@ cass_cluster_set_authenticator_callbacks(CassCluster* cluster,
* @param[in] cluster
* @param[in] protocol_version
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_cluster_set_use_beta_protocol_version()
*/
CASS_EXPORT CassError
cass_cluster_set_protocol_version(CassCluster* cluster,
int protocol_version);

/**
* Use the newest beta protocol version. This currently enables the use of
* protocol version 5.
*
* <b>Default:</b> cass_false
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] enable if false the highest non-beta protocol version will be used
* @return CASS_OK if successful, otherwise an error occurred.
*/
CASS_EXPORT CassError
cass_cluster_set_use_beta_protocol_version(CassCluster* cluster,
cass_bool_t enable);

/**
* Sets the number of IO threads. This is the number of threads
* that will handle query requests.
Expand Down Expand Up @@ -4973,7 +4998,7 @@ cass_statement_bind_duration(CassStatement* statement,
size_t index,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Binds a "duration" to all the values with the specified name.
Expand All @@ -4994,7 +5019,7 @@ cass_statement_bind_duration_by_name(CassStatement* statement,
const char* name,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Same as cass_statement_bind_duration_by_name(), but with lengths for string
Expand All @@ -5020,7 +5045,7 @@ cass_statement_bind_duration_by_name_n(CassStatement* statement,
size_t name_length,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Bind a "list", "map" or "set" to a query or bound statement at the
Expand Down Expand Up @@ -6193,7 +6218,7 @@ CASS_EXPORT CassError
cass_collection_append_duration(CassCollection* collection,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Appends a "list", "map" or "set" to the collection.
Expand Down Expand Up @@ -6630,7 +6655,7 @@ cass_tuple_set_duration(CassTuple* tuple,
size_t index,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Sets a "list", "map" or "set" in a tuple at the specified index.
Expand Down Expand Up @@ -7663,7 +7688,7 @@ cass_user_type_set_duration(CassUserType* user_type,
size_t index,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Sets "duration" in a user defined type at the specified name.
Expand All @@ -7684,7 +7709,7 @@ cass_user_type_set_duration_by_name(CassUserType* user_type,
const char* name,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Same as cass_user_type_set_duration_by_name(), but with lengths for string
Expand All @@ -7710,7 +7735,7 @@ cass_user_type_set_duration_by_name_n(CassUserType* user_type,
size_t name_length,
cass_int32_t months,
cass_int32_t days,
cass_int32_t nanos);
cass_int64_t nanos);

/**
* Sets a "list", "map" or "set" in a user defined type at the
Expand Down Expand Up @@ -9202,7 +9227,7 @@ CASS_EXPORT CassError
cass_value_get_duration(const CassValue* value,
cass_int32_t* months,
cass_int32_t* days,
cass_int32_t* nanos);
cass_int64_t* nanos);

/**
* Gets the type of the specified value.
Expand Down
1 change: 0 additions & 1 deletion src/abstract_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class AbstractData {
virtual ~AbstractData() { }

const ElementVec& elements() const { return elements_; }
size_t elements_count() const { return elements_.size(); }

void reset(size_t count) {
elements_.clear();
Expand Down
26 changes: 22 additions & 4 deletions src/batch_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,19 @@ CassError cass_batch_add_statement(CassBatch* batch, CassStatement* statement) {

namespace cass {

// Format: <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
// where:
// <type> is a [byte]
// <n> is a [short]
// <query> has the format <kind><string_or_id><n>[<name_1>]<value_1>...[<name_n>]<value_n>
// <consistency> is a [short]
// Only protocol v3 and higher for the following:
// <flags> is a [byte] (or [int] for protocol v5)
// <serial_consistency> is a [short]
// <timestamp> is a [long]
int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const {
int length = 0;
uint8_t flags = 0;
uint32_t flags = 0;

if (version == 1) {
return REQUEST_ERROR_UNSUPPORTED_PROTOCOL;
Expand All @@ -115,7 +125,7 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs
"Batches cannot contain queries with named values");
return REQUEST_ERROR_BATCH_WITH_NAMED_VALUES;
}
int32_t result = (*i)->encode_batch(version, bufs, callback);
int32_t result = statement->encode_batch(version, callback, bufs);
if (result < 0) {
return result;
}
Expand All @@ -127,7 +137,11 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs
size_t buf_size = sizeof(uint16_t);
if (version >= 3) {
// <flags>[<serial_consistency><timestamp>]
buf_size += sizeof(uint8_t); // [byte]
if (version >= 5) {
buf_size += sizeof(int32_t); // [int]
} else {
buf_size += sizeof(uint8_t); // [byte]
}

if (serial_consistency() != 0) {
buf_size += sizeof(uint16_t); // [short]
Expand All @@ -144,7 +158,11 @@ int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs

size_t pos = buf.encode_uint16(0, callback->consistency());
if (version >= 3) {
pos = buf.encode_byte(pos, flags);
if (version >= 5) {
pos = buf.encode_int32(pos, flags);
} else {
pos = buf.encode_byte(pos, flags);
}

if (serial_consistency() != 0) {
pos = buf.encode_uint16(pos, serial_consistency());
Expand Down
19 changes: 19 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "cluster.hpp"

#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "external.hpp"
#include "logger.hpp"
Expand Down Expand Up @@ -50,10 +51,28 @@ CassError cass_cluster_set_protocol_version(CassCluster* cluster,
if (protocol_version < 1) {
return CASS_ERROR_LIB_BAD_PARAMS;
}
if (cluster->config().use_beta_protocol_version()) {
LOG_ERROR("The protocol version is already set to the newest beta version v%d "
"and cannot be explicitly set.", CASS_NEWEST_BETA_PROTOCOL_VERSION);
return CASS_ERROR_LIB_BAD_PARAMS;
} else if (protocol_version > CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION) {
LOG_ERROR("Protocol version v%d is higher than the highest supported "
"protocol version v%d (consider using the newest beta protocol version).",
protocol_version, CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION);
return CASS_ERROR_LIB_BAD_PARAMS;
}
cluster->config().set_protocol_version(protocol_version);
return CASS_OK;
}

CassError cass_cluster_set_use_beta_protocol_version(CassCluster* cluster,
cass_bool_t enable) {
cluster->config().set_use_beta_protocol_version(enable == cass_true);
cluster->config().set_protocol_version(enable ? CASS_NEWEST_BETA_PROTOCOL_VERSION
: CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION);
return CASS_OK;
}

CassError cass_cluster_set_num_threads_io(CassCluster* cluster,
unsigned num_threads) {
if (num_threads == 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ CASS_COLLECTION_APPEND(decimal,
THREE_PARAMS_(const cass_byte_t* varint, size_t varint_size, int scale),
cass::CassDecimal(varint, varint_size, scale))
CASS_COLLECTION_APPEND(duration,
THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int32_t nanos),
THREE_PARAMS_(cass_int32_t months, cass_int32_t days, cass_int64_t nanos),
cass::CassDuration(months, days, nanos))

#undef CASS_COLLECTION_APPEND
Expand Down
13 changes: 12 additions & 1 deletion src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "auth.hpp"
#include "cassandra.h"
#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "host_targeting_policy.hpp"
#include "latency_aware_policy.hpp"
Expand All @@ -43,7 +44,8 @@ class Config {
public:
Config()
: port_(9042)
, protocol_version_(4)
, protocol_version_(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION)
, use_beta_protocol_version_(false)
, thread_count_io_(1)
, queue_size_io_(8192)
, queue_size_event_(8192)
Expand Down Expand Up @@ -217,6 +219,14 @@ class Config {
protocol_version_ = protocol_version;
}

bool use_beta_protocol_version() const {
return use_beta_protocol_version_;
}

void set_use_beta_protocol_version(bool enable) {
use_beta_protocol_version_ = enable;
}

CassLogLevel log_level() const { return log_level_; }

void set_log_level(CassLogLevel log_level) {
Expand Down Expand Up @@ -388,6 +398,7 @@ class Config {
private:
int port_;
int protocol_version_;
bool use_beta_protocol_version_;
ContactPointList contact_points_;
unsigned thread_count_io_;
unsigned queue_size_io_;
Expand Down
Loading