diff --git a/CMakeLists.txt b/CMakeLists.txt index 707f2132e..f27ca50b6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 2.6.4) -set(PROJECT_NAME_STR cql) +set(PROJECT_NAME_STR datastaxcql) # Alias for project name used in unit tests set(CQL_DRIVER_PROJECT_NAME ${PROJECT_NAME_STR}) project(${PROJECT_NAME_STR} C CXX) @@ -8,7 +8,7 @@ project(${PROJECT_NAME_STR} C CXX) # The version number #------------------- set(PROJECT_VERSION_MAJOR 0) -set(PROJECT_VERSION_MINOR 7) +set(PROJECT_VERSION_MINOR 8) set(PROJECT_VERSION_PATCH 0) set(PROJECT_VERSION_STRING ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}) @@ -224,3 +224,8 @@ add_custom_target(UNINSTALL # the demo program #------------------- add_subdirectory(demo) + +#------------------- +# the stress test +#------------------- +add_subdirectory(stress) diff --git a/demo/main.cpp b/demo/main.cpp index b4e9723d2..875026685 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -75,14 +75,17 @@ log_callback( void demo( - const std::string& host, + const std::vector& hosts, bool use_ssl) { try { boost::shared_ptr builder = cql::cql_cluster_t::builder(); builder->with_log_callback(&log_callback); - builder->add_contact_point(boost::asio::ip::address::from_string(host)); + + for (std::vector::const_iterator it = hosts.begin(); it != hosts.end(); it++) { + builder->add_contact_point(boost::asio::ip::address::from_string(*it)); + } if (use_ssl) { builder->with_ssl(); @@ -140,13 +143,14 @@ main( char** vargs) { bool ssl = false; - std::string host; + std::string hoststr; + std::vector hosts; boost::program_options::options_description desc("Options"); desc.add_options() ("help", "produce help message") ("ssl", boost::program_options::value(&ssl)->default_value(false), "use SSL") - ("host", boost::program_options::value(&host)->default_value("127.0.0.1"), "node to use as initial contact point"); + ("hosts", boost::program_options::value(&hoststr)->default_value("127.0.0.1"), "comma separated list of notes to use as initial contact point"); boost::program_options::variables_map variables_map; try { @@ -164,8 +168,14 @@ main( return 0; } + std::stringstream ss(hoststr); + std::string s; + while (getline(ss, s, ',')) { + hosts.push_back(s); + } + cql::cql_initialize(); - demo(host, ssl); + demo(hosts, ssl); cql::cql_terminate(); return 0; diff --git a/stress/CMakeLists.txt b/stress/CMakeLists.txt new file mode 100644 index 000000000..52b61a125 --- /dev/null +++ b/stress/CMakeLists.txt @@ -0,0 +1,32 @@ +#------------------- +# Stress +#------------------- +cmake_minimum_required(VERSION 2.6.4) + +# set(STRESS Stress_${CQL_DRIVER_PROJECT_NAME}) +# project(${STRESS} C CXX) + +# file(GLOB STRESS_SRC_FILES *.cpp) + +# # Add EXCLUDE_FROM_ALL flag to exclude stress test +# # from ALL target +# add_executable(${STRESS} ${STRESS_SRC_FILES}) + +# set_property( +# TARGET ${STRESS} +# APPEND PROPERTY COMPILE_FLAGS ${PROJECT_COMPILER_FLAGS}) + + + + +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ".") +set(PROJECT_STRESS_NAME ${PROJECT_NAME_STR}_stress) + +file(GLOB STRESS_SRC_FILES ${PROJECT_SOURCE_DIR}/stress/*.cpp) +include_directories(${INCLUDES}) +add_executable(${PROJECT_STRESS_NAME} ${STRESS_SRC_FILES}) +target_link_libraries(${PROJECT_STRESS_NAME} ${LIBS} ${PROJECT_LIB_NAME}) + +set_property( + TARGET ${PROJECT_STRESS_NAME} + APPEND PROPERTY COMPILE_FLAGS ${PROJECT_COMPILER_FLAGS}) \ No newline at end of file diff --git a/stress/main.cpp b/stress/main.cpp new file mode 100644 index 000000000..42cc22df8 --- /dev/null +++ b/stress/main.cpp @@ -0,0 +1,307 @@ +/* + Copyright (c) 2013 Matthew Stump + + This file is part of cassandra. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MIN_KEY 10000 +#define MAX_KEY 19999 + +#define TEST_PREFIX "LOADTEST_" +#define TEST_VALUE "some payload but not too much" + + +static bool terminate = false; +static int64_t count; +static pthread_mutex_t mutex; + + +static void* writeThread( void* args ) { + cql::cql_session_t* session = (cql::cql_session_t*) args; + + while ( !terminate ) { + pthread_mutex_lock( &mutex ); + count++; + pthread_mutex_unlock( &mutex ); + + char buf[256]; + int key = (rand() % ( MAX_KEY - MIN_KEY ) + MIN_KEY); + + snprintf(buf, sizeof(buf), "INSERT INTO test.loadtest(mykey, mytext) VALUES ('%s%d', '%s') USING TTL 60;", TEST_PREFIX, key, TEST_VALUE); + + // execute a query, select all rows from the keyspace + boost::shared_ptr insert( + new cql::cql_query_t(buf, cql::CQL_CONSISTENCY_ONE)); + + boost::shared_future future = session->query(insert); + + future.wait(); + + if(future.get().error.is_err()){ + std::cout << "insert failed: " << future.get().error.message << std::endl; + continue; + } + } + return 0; +} + + +static void* readThread( void* args ) { + cql::cql_session_t* session = (cql::cql_session_t*) args; + + while ( !terminate ) { + pthread_mutex_lock( &mutex ); + count++; + pthread_mutex_unlock( &mutex ); + + char buf[256]; + int key = (rand() % ( MAX_KEY - MIN_KEY ) + MIN_KEY); + + snprintf(buf, sizeof(buf), "SELECT * FROM test.loadtest WHERE mykey='%s%d';", TEST_PREFIX, key); + + // execute a query, select all rows from the keyspace + boost::shared_ptr insert( + new cql::cql_query_t(buf, cql::CQL_CONSISTENCY_ONE)); + + boost::shared_future future = session->query(insert); + + future.wait(); + + if(future.get().error.is_err()){ + std::cout << "read failed: " << future.get().error.message << std::endl; + continue; + } + + boost::shared_ptr result(future.get().result); + if (result->row_count() > 1) { + std::cout << "supicious number of results for key " << TEST_PREFIX << key << ": " << result->row_count() << std::endl; + continue; + } + + while (result->next()) { + std::string mytext; + + if ( !result->get_string( "mytext", mytext )) { + std::cout << "problem parsing value for key " << TEST_PREFIX << key << std::endl; + continue; + } + + if (mytext != TEST_VALUE) { + std::cout << "unexpected value for key " << TEST_PREFIX << key << ": " << mytext << std::endl; + continue; + } + } + } + return 0; +} + + + +void +stress( + const std::vector& hosts, + bool use_ssl, + int numthreads, + int runtime) +{ + + std::vector pids; + + try + { + boost::shared_ptr builder = cql::cql_cluster_t::builder(); + // Logging callback is way to verbose so leave it off. + // builder->with_log_callback(&log_callback); + + for (std::vector::const_iterator it = hosts.begin(); it != hosts.end(); it++) { + std::cout << "adding contact point " << *it << std::endl; + builder->add_contact_point(boost::asio::ip::address::from_string(*it)); + } + + if (use_ssl) { + builder->with_ssl(); + } + + boost::shared_ptr cluster(builder->build()); + boost::shared_ptr session(cluster->connect()); + + if (session) { + + // Step 1: set up structure + // Note: Don't check for errors - we'll find out soon enough. + + boost::shared_future future; + + boost::shared_ptr use_stmt( + new cql::cql_query_t("USE test;", cql::CQL_CONSISTENCY_ONE)); + future = session->query(use_stmt); + future.wait(); + + boost::shared_ptr drop_stmt( + new cql::cql_query_t("DROP TABLE loadtest;", cql::CQL_CONSISTENCY_ONE)); + future = session->query(drop_stmt); + future.wait(); + + boost::shared_ptr create_stmt( + new cql::cql_query_t("CREATE TABLE loadtest (mykey text, myblob blob, mytext text, PRIMARY KEY (mykey)) WITH caching='ALL'", cql::CQL_CONSISTENCY_ONE)); + future = session->query(create_stmt); + future.wait(); + + // Step 2: write test + count = 0; + terminate = false; + pids.clear(); + + for (int i = 0; i < numthreads; i++) { + pthread_t pid; + pthread_create( &pid, NULL, writeThread, (void*) (session.get()) ); + pids.push_back(pid); + } + + // Let the threads run for a bit. + sleep (runtime); + + terminate = true; + std::cout << "wrapping up write test" << std::endl; + + // give all threads the chance to finish + while (pids.size() > 0) { + void *status; + pthread_join(pids.back(), &status); + pids.pop_back(); + } + + std::cout << "writes from " << numthreads << " threads for " << runtime << " secs:\t" << count << " total,\t " << count/runtime << " per sec" << std::endl; + + // Step 3: read test + count = 0; + terminate = false; + pids.clear(); + + for (int i = 0; i < numthreads; i++) { + pthread_t pid; + pthread_create( &pid, NULL, readThread, (void*) (session.get()) ); + pids[i] = pid; + } + + // Let the threads run for a bit. + sleep (runtime); + + terminate = true; + std::cout << "wrapping up read test" << std::endl; + + // give all threads the chance to finish + for (int i = 0; i < numthreads; i++) { + void *status; + pthread_join(pids[i], &status); + } + std::cout << "reads from " << numthreads << " threads for " << runtime << " secs:\t" << count << " total,\t " << count/runtime << " per sec" << std::endl; + + + // close the connection session + session->close(); + } + + cluster->shutdown(); + std::cout << "THE END" << std::endl; + } + catch (std::exception& e) + { + std::cout << "Exception: " << e.what() << std::endl; + } +} + + + +int +main( + int argc, + char** vargs) +{ + bool ssl = false; + std::string hoststr; + std::vector hosts; + int numthreads; + int runtime; + + boost::program_options::options_description desc("Options"); + desc.add_options() + ("help", "produce help message") + ("ssl", boost::program_options::value(&ssl)->default_value(false), "use SSL") + ("hosts", boost::program_options::value(&hoststr)->default_value("127.0.0.1"), "comma separated list of notes to use as initial contact point") + ("threads", boost::program_options::value(&numthreads)->default_value(1), "number of threads for stress test") + ("runtime", boost::program_options::value(&runtime)->default_value(3), "number seconds each segment of the stress test is run"); + + boost::program_options::variables_map variables_map; + try { + boost::program_options::store(boost::program_options::parse_command_line(argc, vargs, desc), variables_map); + boost::program_options::notify(variables_map); + } + catch (boost::program_options::unknown_option ex) { + std::cerr << desc << "\n"; + std::cerr << ex.what() << "\n"; + return 1; + } + + if (variables_map.count("help")) { + std::cerr << desc << "\n"; + return 0; + } + + std::stringstream ss(hoststr); + std::string s; + while (getline(ss, s, ',')) { + hosts.push_back(s); + } + + std::cout << "start" << std::endl; + cql::cql_initialize(); + + pthread_mutex_init( &mutex, NULL ); + + std::cout << "stress start" << std::endl; + stress(hosts, ssl, numthreads, runtime); + + pthread_mutex_destroy( &mutex ); + + cql::cql_terminate(); + return 0; +}