Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions examples/perf/perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ CassCluster* create_cluster(const char* hosts) {

CassError connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect_keyspace(session, cluster, "examples");
CassFuture* future = cass_session_connect(session, cluster);

cass_future_wait(future);
rc = cass_future_error_code(future);
Expand Down Expand Up @@ -198,6 +198,8 @@ void insert_into_perf(CassSession* session, const char* query, const CassPrepare
statement = cass_statement_new(query, 5);
}

cass_statement_set_is_idempotent(statement, cass_true);

cass_uuid_gen_time(uuid_gen, &id);
cass_statement_bind_uuid(statement, 0, id);
cass_statement_bind_string(statement, 1, big_string);
Expand Down Expand Up @@ -227,7 +229,7 @@ void run_insert_queries(void* data) {
CassSession* session = (CassSession*)data;

const CassPrepared* insert_prepared = NULL;
const char* insert_query = "INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);";
const char* insert_query = "INSERT INTO stress.songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);";

#if USE_PREPARED
if (prepare_query(session, insert_query, &insert_prepared) == CASS_OK) {
Expand Down Expand Up @@ -256,6 +258,8 @@ void select_from_perf(CassSession* session, const char* query, const CassPrepare
statement = cass_statement_new(query, 0);
}

cass_statement_set_is_idempotent(statement, cass_true);

futures[i] = cass_session_execute(session, statement);

cass_statement_free(statement);
Expand All @@ -279,7 +283,7 @@ void run_select_queries(void* data) {
int i;
CassSession* session = (CassSession*)data;
const CassPrepared* select_prepared = NULL;
const char* select_query = "SELECT * FROM songs WHERE id = a98d21b2-1900-11e4-b97b-e5e358e71e0d";
const char* select_query = "SELECT * FROM stress.songs WHERE id = a98d21b2-1900-11e4-b97b-e5e358e71e0d";

#if USE_PREPARED
if (prepare_query(session, select_query, &select_prepared) == CASS_OK) {
Expand All @@ -301,7 +305,6 @@ int main(int argc, char* argv[]) {
uv_thread_t threads[NUM_THREADS];
CassCluster* cluster = NULL;
CassSession* session = NULL;
CassFuture* close_future = NULL;
char* hosts = "127.0.0.1";
if (argc > 1) {
hosts = argv[1];
Expand All @@ -321,8 +324,17 @@ int main(int argc, char* argv[]) {
return -1;
}

execute_query(session, "DROP KEYSPACE stress");

execute_query(session, "CREATE KEYSPACE IF NOT EXISTS stress WITH "
"replication = { 'class': 'SimpleStrategy', 'replication_factor': '3'}");

execute_query(session, "CREATE TABLE IF NOT EXISTS stress.songs (id uuid PRIMARY KEY, "
"title text, album text, artist text, "
"tags set<text>, data blob)");

execute_query(session,
"INSERT INTO songs (id, title, album, artist, tags) VALUES "
"INSERT INTO stress.songs (id, title, album, artist, tags) VALUES "
"(a98d21b2-1900-11e4-b97b-e5e358e71e0d, "
"'La Petite Tonkinoise', 'Bye Bye Blackbird', 'Joséphine Baker', { 'jazz', '2013' });");

Expand Down Expand Up @@ -355,10 +367,8 @@ int main(int argc, char* argv[]) {
uv_thread_join(&threads[i]);
}

close_future = cass_session_close(session);
cass_future_wait(close_future);
cass_future_free(close_future);
cass_cluster_free(cluster);
cass_session_free(session);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the session be freed before the cluster that presumably owns it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter. Session copies what it needs from cluster. In general the driver API uses the pattern "const *" for a parameter when it's going to either copy or keep a const reference to the object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool.

cass_uuid_gen_free(uuid_gen);

status_destroy(&status);
Expand Down
63 changes: 61 additions & 2 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1714,13 +1714,39 @@ cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
* @param[in] cluster
* @param[in] enabled
* @return CASS_OK if successful, otherwise an error occurred
*
* @see cass_cluster_set_resolve_timeout()
*/
CASS_EXPORT CassError
cass_cluster_set_use_randomized_contact_points(CassCluster* cluster,
cass_bool_t enabled);

/**
* Enable constant speculative executions with the supplied settings.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] constant_delay_ms
* @param[in] max_speculative_executions
* @return CASS_OK if successful, otherwise an error occurred
*/
CASS_EXPORT CassError
cass_cluster_set_constant_speculative_execution_policy(CassCluster* cluster,
cass_int64_t constant_delay_ms,
int max_speculative_executions);

/**
* Disable speculative executions
*
* <b>Default:</b> This is the default speculative execution policy.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @return CASS_OK if successful, otherwise an error occurred
*/
CASS_EXPORT CassError
cass_cluster_set_no_speculative_execution_policy(CassCluster* cluster);

/***********************************************************************************
*
* Session
Expand Down Expand Up @@ -3938,6 +3964,22 @@ CASS_EXPORT CassError
cass_statement_set_request_timeout(CassStatement* statement,
cass_uint64_t timeout_ms);

/**
* Sets whether the statement is idempotent. Idempotent statements are able to be
* automatically retried after timeouts/errors and can be speculatively executed.
*
* @public @memberof CassStatement
*
* @param[in] statement
* @param[in] is_idempotent
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_cluster_set_constant_speculative_execution_policy()
*/
CASS_EXPORT CassError
cass_statement_set_is_idempotent(CassStatement* statement,
cass_bool_t is_idempotent);

/**
* Sets the statement's retry policy.
*
Expand Down Expand Up @@ -5214,6 +5256,23 @@ CASS_EXPORT CassError
cass_batch_set_request_timeout(CassBatch* batch,
cass_uint64_t timeout_ms);

/**
* Sets whether the statements in a batch are idempotent. Idempotent batches
* are able to be automatically retried after timeouts/errors and can be
* speculatively executed.
*
* @public @memberof CassBatch
*
* @param[in] batch
* @param[in] is_idempotent
* @return CASS_OK if successful, otherwise an error occurred.
*
* @see cass_cluster_set_constant_speculative_execution_policy()
*/
CASS_EXPORT CassError
cass_batch_set_is_idempotent(CassBatch* batch,
cass_bool_t is_idempotent);

/**
* Sets the batch's retry policy.
*
Expand Down
29 changes: 21 additions & 8 deletions src/auth.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class V1Authenticator {

class Authenticator : public RefCounted<Authenticator> {
public:
typedef SharedRefPtr<Authenticator> Ptr;

Authenticator() { }
virtual ~Authenticator() { }

Expand Down Expand Up @@ -78,13 +80,22 @@ class PlainTextAuthenticator : public V1Authenticator, public Authenticator {

class AuthProvider : public RefCounted<AuthProvider> {
public:
typedef SharedRefPtr<AuthProvider> Ptr;

AuthProvider()
: RefCounted<AuthProvider>() { }

virtual ~AuthProvider() { }

virtual V1Authenticator* new_authenticator_v1(const Host::ConstPtr& host, const std::string& class_name) const { return NULL; }
virtual Authenticator* new_authenticator(const Host::ConstPtr& host, const std::string& class_name) const { return NULL; }
virtual V1Authenticator* new_authenticator_v1(const Host::ConstPtr& host,
const std::string& class_name) const {
return NULL;
}

virtual Authenticator::Ptr new_authenticator(const Host::ConstPtr& host,
const std::string& class_name) const {
return Authenticator::Ptr();
}

private:
DISALLOW_COPY_AND_ASSIGN(AuthProvider);
Expand Down Expand Up @@ -136,9 +147,9 @@ class ExternalAuthProvider : public AuthProvider {
}
}

virtual V1Authenticator* new_authenticator_v1(const Host::ConstPtr& host, const std::string& class_name) const { return NULL; }
virtual Authenticator* new_authenticator(const Host::ConstPtr& host, const std::string& class_name) const {
return new ExternalAuthenticator(host, class_name, &exchange_callbacks_, data_);
virtual Authenticator::Ptr new_authenticator(const Host::ConstPtr& host,
const std::string& class_name) const {
return Authenticator::Ptr(new ExternalAuthenticator(host, class_name, &exchange_callbacks_, data_));
}

private:
Expand All @@ -154,12 +165,14 @@ class PlainTextAuthProvider : public AuthProvider {
: username_(username)
, password_(password) { }

virtual V1Authenticator* new_authenticator_v1(const Host::ConstPtr& host, const std::string& class_name) const {
virtual V1Authenticator* new_authenticator_v1(const Host::ConstPtr& host,
const std::string& class_name) const {
return new PlainTextAuthenticator(username_, password_);
}

virtual Authenticator* new_authenticator(const Host::ConstPtr& host, const std::string& class_name) const {
return new PlainTextAuthenticator(username_, password_);
virtual Authenticator::Ptr new_authenticator(const Host::ConstPtr& host,
const std::string& class_name) const {
return Authenticator::Ptr(new PlainTextAuthenticator(username_, password_));
}

private:
Expand Down
4 changes: 2 additions & 2 deletions src/auth_requests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace cass {

int CredentialsRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
int CredentialsRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const {
if (version != 1) {
return -1;
}
Expand All @@ -39,7 +39,7 @@ int CredentialsRequest::encode(int version, Handler* handler, BufferVec* bufs) c
return length;
}

int AuthResponseRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
int AuthResponseRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const {
if (version < 2) {
return -1;
}
Expand Down
10 changes: 5 additions & 5 deletions src/auth_requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CredentialsRequest : public Request {
, credentials_(credentials) { }

private:
int encode(int version, Handler* handler, BufferVec* bufs) const;
int encode(int version, RequestCallback* callback, BufferVec* bufs) const;

private:
V1Authenticator::Credentials credentials_;
Expand All @@ -40,19 +40,19 @@ class CredentialsRequest : public Request {
class AuthResponseRequest : public Request {
public:
AuthResponseRequest(const std::string& token,
const SharedRefPtr<Authenticator>& auth)
const Authenticator::Ptr& auth)
: Request(CQL_OPCODE_AUTH_RESPONSE)
, token_(token)
, auth_(auth) { }

const SharedRefPtr<Authenticator>& auth() const { return auth_; }
const Authenticator::Ptr& auth() const { return auth_; }

private:
int encode(int version, Handler* handler, BufferVec* bufs) const;
int encode(int version, RequestCallback* callback, BufferVec* bufs) const;

private:
std::string token_;
SharedRefPtr<Authenticator> auth_;
Authenticator::Ptr auth_;
};

} // namespace cass
Expand Down
28 changes: 17 additions & 11 deletions src/batch_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ CassError cass_batch_set_request_timeout(CassBatch *batch,
return CASS_OK;
}

CassError cass_batch_set_is_idempotent(CassBatch* batch,
cass_bool_t is_idempotent) {
batch->set_is_idempotent(is_idempotent == cass_true);
return CASS_OK;
}

CassError cass_batch_set_retry_policy(CassBatch* batch,
CassRetryPolicy* retry_policy) {
batch->set_retry_policy(retry_policy);
Expand All @@ -79,12 +85,12 @@ CassError cass_batch_add_statement(CassBatch* batch, CassStatement* statement) {

namespace cass {

int BatchRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
int BatchRequest::encode(int version, RequestCallback* callback, BufferVec* bufs) const {
int length = 0;
uint8_t flags = 0;

if (version == 1) {
return ENCODE_ERROR_UNSUPPORTED_PROTOCOL;
return REQUEST_ERROR_UNSUPPORTED_PROTOCOL;
}

{
Expand All @@ -102,13 +108,13 @@ int BatchRequest::encode(int version, Handler* handler, BufferVec* bufs) const {

for (BatchRequest::StatementList::const_iterator i = statements_.begin(),
end = statements_.end(); i != end; ++i) {
const SharedRefPtr<Statement>& statement(*i);
const Statement::Ptr& statement(*i);
if (statement->has_names_for_values()) {
handler->on_error(CASS_ERROR_LIB_BAD_PARAMS,
callback->on_error(CASS_ERROR_LIB_BAD_PARAMS,
"Batches cannot contain queries with named values");
return ENCODE_ERROR_BATCH_WITH_NAMED_VALUES;
return REQUEST_ERROR_BATCH_WITH_NAMED_VALUES;
}
int32_t result = (*i)->encode_batch(version, bufs, handler);
int32_t result = (*i)->encode_batch(version, bufs, callback);
if (result < 0) {
return result;
}
Expand All @@ -127,24 +133,24 @@ int BatchRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
}

if (handler->timestamp() != CASS_INT64_MIN) {
if (callback->timestamp() != CASS_INT64_MIN) {
buf_size += sizeof(int64_t); // [long]
flags |= CASS_QUERY_FLAG_DEFAULT_TIMESTAMP;
}
}

Buffer buf(buf_size);

size_t pos = buf.encode_uint16(0, handler->consistency());
size_t pos = buf.encode_uint16(0, callback->consistency());
if (version >= 3) {
pos = buf.encode_byte(pos, flags);

if (serial_consistency() != 0) {
pos = buf.encode_uint16(pos, serial_consistency());
}

if (handler->timestamp() != CASS_INT64_MIN) {
pos = buf.encode_int64(pos, handler->timestamp());
if (callback->timestamp() != CASS_INT64_MIN) {
pos = buf.encode_int64(pos, callback->timestamp());
}
}

Expand All @@ -160,7 +166,7 @@ void BatchRequest::add_statement(Statement* statement) {
ExecuteRequest* execute_request = static_cast<ExecuteRequest*>(statement);
prepared_statements_[execute_request->prepared()->id()] = execute_request;
}
statements_.push_back(SharedRefPtr<Statement>(statement));
statements_.push_back(Statement::Ptr(statement));
}

bool BatchRequest::prepared_statement(const std::string& id,
Expand Down
Loading