In [1]:
# S3 URI (s3://<bucket>/<key>) of the JSON configuration read by the test cells.
CONFIG_FILE_S3_URI = "<< S3 LOCATION URI FOR CONFIGURATION JSON FILE >>"
# Cluster endpoint, expected as <cluster-id>.<rest-of-hostname>[:port]/<database>:
# the text before the first '.' is used as the cluster identifier and the text
# after the first '/' as the database name.
RS_HOST = "<< YOUR REDSHIFT CLUSTER ENDPOINT >>"

In [2]:
%%bash
pip install sqlalchemy 
pip install psycopg2-binary



In [3]:
import json
import boto3
import psycopg2
import time
import pandas
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from urllib.parse import quote_plus as urlquote
import urllib

In [4]:
def connect_to_redshift(host, username):
    """Create a SQLAlchemy engine for a Redshift cluster using temporary credentials.

    Args:
        host: Cluster endpoint of the form
            ``<cluster-id>.<rest-of-hostname>[:port]/<database>``. The text before
            the first '.' is the cluster identifier; the text after the first '/'
            is the database name. The whole string is used as the URL host part.
        username: Database user to request temporary credentials for.

    Returns:
        A SQLAlchemy engine that opens a fresh connection on every checkout.

    Raises:
        ValueError: if ``host`` has no ``/<database>`` component.
    """
    # Local import keeps this cell self-contained (no extra top-of-notebook import).
    from sqlalchemy.pool import NullPool

    host_parts = host.split('/')
    if len(host_parts) < 2 or not host_parts[1]:
        raise ValueError(
            f"host must look like '<cluster-id>.<hostname>:<port>/<database>', got {host!r}")
    db_name = host_parts[1]
    cluster_identifier = host.split('.')[0]

    client = boto3.client('redshift')
    cluster_creds = client.get_cluster_credentials(DbUser=username,
                                                   DbName=db_name,
                                                   ClusterIdentifier=cluster_identifier)

    # Credentials are URL-quoted because they may contain ':', '@' or '/'.
    connection_string = ('postgresql://' + urlquote(cluster_creds['DbUser']) + ':'
                         + urlquote(cluster_creds['DbPassword']) + '@' + host)
    # NullPool: the default QueuePool allows at most pool_size + max_overflow (5 + 10)
    # simultaneous connections, which would silently serialize a concurrency test that
    # uses more threads than that.
    return create_engine(connection_string, poolclass=NullPool)

def get_json_config_from_s3(script_s3_path):
    """Download an S3 object and parse its UTF-8 body as JSON.

    Args:
        script_s3_path: Object location written as ``s3://<bucket>/<key>``.

    Returns:
        The decoded JSON value (used as a dict by the cells in this notebook).
    """
    location = script_s3_path.replace("s3://", "")
    bucket_name, object_key = location.split("/", 1)
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    raw_bytes = response['Body'].read()
    return json.loads(raw_bytes.decode('utf-8'))


def get_concurrency_scripts_from_s3(cluster_identifier, config_json, number_of_parallel_sessions):
    """Load the SQL workload from S3 and split it into individual statements.

    The S3 object is treated as a ``str.format`` template; the placeholders
    ``{redshift_iam_role}``, ``{bucket_name}`` and ``{cluster_identifier}`` are
    filled in before the text is split on ';'.

    Args:
        cluster_identifier: Value substituted for ``{cluster_identifier}``.
        config_json: Config dict; reads ``concurrent_user_queries_and_load_s3_path``
            (required, ``s3://bucket/key``), ``redshift_iam_role`` and ``s3_bucket_name``.
        number_of_parallel_sessions: Minimum number of statements to return; the
            statement list is repeated cyclically until it is at least this long.

    Returns:
        List of non-empty, whitespace-stripped SQL statements (without the ';').

    Raises:
        ValueError: if the S3 path is missing from the config or the script
            contains no statements.
    """
    script_s3_path = config_json.get('concurrent_user_queries_and_load_s3_path')
    if not script_s3_path:
        raise ValueError("config is missing 'concurrent_user_queries_and_load_s3_path'")
    redshift_iam_role = config_json.get('redshift_iam_role')
    bucket_name = config_json.get('s3_bucket_name')

    bucket, key = script_s3_path.replace("s3://", "").split("/", 1)
    obj = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    scripts = obj['Body'].read().decode('utf-8')
    # NOTE: str.format treats every '{' / '}' in the SQL as a placeholder, so literal
    # braces in the template must be doubled ('{{', '}}').
    scripts = scripts.format(redshift_iam_role=redshift_iam_role,
                             bucket_name=bucket_name,
                             cluster_identifier=cluster_identifier)
    # Keep every non-blank segment, including a final statement without a trailing ';'.
    # NOTE(review): naive split — a ';' inside a string literal or comment will cut a
    # statement in two.
    split_scripts = [statement.strip() for statement in scripts.split(';') if statement.strip()]
    if not split_scripts:
        # Without this guard the replication below would never reach the target length.
        raise ValueError(f"no SQL statements found in {script_s3_path}")
    if len(split_scripts) < number_of_parallel_sessions:
        repeats = -(-number_of_parallel_sessions // len(split_scripts))  # ceiling division
        split_scripts = split_scripts * repeats
    return split_scripts


def get_sql(engine, script, sequence_number):
    """Run one statement of the workload and return its result.

    Args:
        engine: Connectable passed straight to ``pandas.read_sql`` (a SQLAlchemy
            engine in this notebook).
        script: Sequence of SQL statements (as returned by
            get_concurrency_scripts_from_s3).
        sequence_number: Index into ``script`` of the statement to run.

    Returns:
        Whatever ``pandas.read_sql`` returns for the statement (a DataFrame).
    """
    # Disable the session result cache so repeated runs measure real execution.
    # NOTE(review): relies on the DB driver accepting two ';'-separated statements in
    # one execute call and returning the last statement's rows.
    sql = "set enable_result_cache_for_session to false;" + script[sequence_number]
    return pandas.read_sql(sql, engine)


def run_concurrency_test(number_of_parallel_sessions):
    """Run ``number_of_parallel_sessions`` statements concurrently and time them.

    Worker thread ``i`` runs statement ``i`` of the workload via ``get_sql``. The
    timer starts after config loading, credential retrieval and script download, so
    only query execution is measured.

    Args:
        number_of_parallel_sessions: Number of worker threads / statements (>= 1).

    Returns:
        Elapsed seconds (float) from submitting the first statement until all finish.

    Raises:
        ValueError: if ``number_of_parallel_sessions`` is less than 1.
        Any exception raised by a statement propagates once the executor has
        waited for the other submitted statements.
    """
    if number_of_parallel_sessions < 1:
        raise ValueError("number_of_parallel_sessions must be >= 1")
    config = get_json_config_from_s3(CONFIG_FILE_S3_URI)
    engine = connect_to_redshift(RS_HOST, config.get('master_user_name'))
    try:
        script = get_concurrency_scripts_from_s3(RS_HOST.split('.')[0], config,
                                                 number_of_parallel_sessions)
        # perf_counter is monotonic, so wall-clock adjustments cannot skew the timing.
        start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=number_of_parallel_sessions) as executor:
            futures = [executor.submit(get_sql, engine, script, sequence_number)
                       for sequence_number in range(number_of_parallel_sessions)]
            for future in as_completed(futures):
                # Rows are discarded; result() is called only to surface query errors.
                future.result()
        elapsed_time_in_secs = time.perf_counter() - start_time
    finally:
        # A new engine is built for every run; release its connections explicitly
        # instead of leaving them open until garbage collection.
        engine.dispose()
    print("--- %s seconds ---" % elapsed_time_in_secs)
    return elapsed_time_in_secs


In [None]:
################################################################################################################
# Set the parallel session counts to test below. By default each count in SESSION_COUNTS (1, 20, 40, 50)
# is run RUNS_PER_SESSION_COUNT (5) times, and the average elapsed time of those runs is the final outcome.
################################################################################################################
SESSION_COUNTS = [1, 20, 40, 50]
RUNS_PER_SESSION_COUNT = 5

average_secs_by_session_count = {}
for number_of_parallel_sessions in SESSION_COUNTS:
    print("running %s parallel threads .." % number_of_parallel_sessions)
    tm = [run_concurrency_test(number_of_parallel_sessions) for _ in range(RUNS_PER_SESSION_COUNT)]
    avg = sum(tm) / len(tm)
    average_secs_by_session_count[number_of_parallel_sessions] = avg
    print(f"average of {RUNS_PER_SESSION_COUNT} runs with {number_of_parallel_sessions} parallel sessions: {avg}")

# Summary table: average elapsed seconds per parallel-session count.
pandas.Series(average_secs_by_session_count, name="avg_elapsed_secs").rename_axis("parallel_sessions")


running 1 parallel threads ..
--- 15.19753122329712 seconds ---
--- 15.018664836883545 seconds ---
--- 14.543593883514404 seconds ---
--- 16.660288095474243 seconds ---
--- 15.54794955253601 seconds ---
average of five runs with 1 parallel sessions: 15.393605518341065
running 20 parallel threads ..
