From 7368f6969959fbb8a08381170f13f595dda0158e Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 4 Jul 2019 15:13:55 -0400 Subject: [PATCH] Add 2 concurrent execution examples --- .../execute_async_with_queue.py | 65 +++++++++++++++++ .../execute_with_threads.py | 72 +++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 examples/concurrent_executions/execute_async_with_queue.py create mode 100644 examples/concurrent_executions/execute_with_threads.py diff --git a/examples/concurrent_executions/execute_async_with_queue.py b/examples/concurrent_executions/execute_async_with_queue.py new file mode 100644 index 0000000000..60d2a69c3c --- /dev/null +++ b/examples/concurrent_executions/execute_async_with_queue.py @@ -0,0 +1,65 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Inserts multiple rows in a table asynchronously, limiting the amount +of parallel requests with a Queue. 
+"""
+
+import time
+import uuid
+from six.moves import queue
+
+from cassandra.cluster import Cluster
+
+
+CONCURRENCY_LEVEL = 32
+TOTAL_QUERIES = 10000
+
+cluster = Cluster()
+session = cluster.connect()
+
+session.execute(("CREATE KEYSPACE IF NOT EXISTS examples "
+                 "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' }"))
+session.execute("USE examples")
+session.execute("CREATE TABLE IF NOT EXISTS tbl_sample_kv (id uuid, value text, PRIMARY KEY (id))")
+prepared_insert = session.prepare("INSERT INTO tbl_sample_kv (id, value) VALUES (?, ?)")
+
+
+def clear_queue():
+    """Empty the module-level ``futures`` queue, calling ``result()`` on each queued future."""
+    while True:
+        try:
+            futures.get_nowait().result()
+        except queue.Empty:
+            break
+
+
+start = time.time()
+futures = queue.Queue(maxsize=CONCURRENCY_LEVEL)
+
+# Chunked approach: once CONCURRENCY_LEVEL requests are in flight, wait for the
+# whole chunk to finish *before* sending the next request, so the limit holds.
+for i in range(TOTAL_QUERIES):
+    if futures.full():
+        clear_queue()
+    futures.put_nowait(session.execute_async(prepared_insert, (uuid.uuid4(), str(i))))
+
+clear_queue()
+end = time.time()
+
+print("Finished executing {} queries with a concurrency level of {} in {:.2f} seconds.".
+      format(TOTAL_QUERIES, CONCURRENCY_LEVEL, (end-start)))
+
+cluster.shutdown()  # release the driver's sessions and connections
diff --git a/examples/concurrent_executions/execute_with_threads.py b/examples/concurrent_executions/execute_with_threads.py
new file mode 100644
index 0000000000..e3c80f5d6b
--- /dev/null
+++ b/examples/concurrent_executions/execute_with_threads.py
@@ -0,0 +1,72 @@
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Inserts multiple rows in a table limiting the amount of parallel requests.
+
+Note that the driver also provides convenient utility functions to accomplish this.
+See https://docs.datastax.com/en/developer/python-driver/latest/api/cassandra/concurrent/
+"""
+
+import time
+import uuid
+import threading
+from cassandra.cluster import Cluster
+
+
+CONCURRENCY_LEVEL = 32
+TOTAL_QUERIES = 10000
+COUNTER = 0
+COUNTER_LOCK = threading.Lock()
+
+cluster = Cluster()
+session = cluster.connect()
+
+session.execute(("CREATE KEYSPACE IF NOT EXISTS examples "
+                 "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1' }"))
+session.execute("USE examples")
+session.execute("CREATE TABLE IF NOT EXISTS tbl_sample_kv (id uuid, value text, PRIMARY KEY (id))")
+prepared_insert = session.prepare("INSERT INTO tbl_sample_kv (id, value) VALUES (?, ?)")
+
+
+class SimpleQueryExecutor(threading.Thread):
+    """Thread that claims query numbers from COUNTER under COUNTER_LOCK and inserts one row each."""
+
+    def run(self):
+        global COUNTER
+        while True:
+            with COUNTER_LOCK:
+                current = COUNTER
+                COUNTER += 1
+
+            if current >= TOTAL_QUERIES:
+                break
+
+            session.execute(prepared_insert, (uuid.uuid4(), str(current)))
+
+
+# Run n worker threads in parallel (n being the concurrency level). Each thread
+# executes its queries synchronously, one request at a time.
+start = time.time()
+threads = [SimpleQueryExecutor() for _ in range(CONCURRENCY_LEVEL)]
+for thread in threads:
+    thread.start()
+for thread in threads:
+    thread.join()
+end = time.time()
+
+print("Finished executing {} queries with a concurrency level of {} in {:.2f} seconds.".
+      format(TOTAL_QUERIES, CONCURRENCY_LEVEL, (end-start)))
+
+cluster.shutdown()  # release the driver's sessions and connections