# Parallel Processing

Parallel processing lets a program take advantage of multiple CPU cores. The tool below is a worked example.


# ES/OpenSearch Benchmark Tool

This tool sends bulk dummy data to a specified Elasticsearch or OpenSearch cluster to test how much data can be sent at once and how fast. It runs multiple worker processes, one per core, to maximize the number of requests sent to the cluster, essentially stress testing it.
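
As a rough illustration of what "bulk dummy data" means here, the sketch below builds the kind of request body the Bulk API expects: an action entry followed by a document entry for each doc. `build_bulk_body` and the index name `demo-index` are hypothetical names used only for this example; the field names mirror two of the fields the script uses.

```python
def build_bulk_body(docs, index_name):
    """Interleave an 'index' action with each document (hypothetical helper)."""
    body = []
    for doc in docs:
        body.append({"index": {"_index": index_name}})  # action entry
        body.append(doc)                                 # document entry
    return body


# Three small dummy docs; the real script generates much larger ones.
docs = [{"field1": f"dummy text {j}", "field2": j} for j in range(3)]
body = build_bulk_body(docs, "demo-index")
print(len(body))  # two entries per document
```

The body is twice as long as the list of docs, so the chunk size passed to the tool counts documents, not entries in the request.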

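`ThreadPoolExecutor` provides multi-threading, but because of the GIL in Python, threads cannot run Python code on multiple cores at the same time, so the CPU-bound part of the work (building the documents) would still run one thread at a time. The script therefore uses `ProcessPoolExecutor`, which starts separate processes.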
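
The sketch below illustrates the difference for CPU-bound work such as building documents. `build_chunk` and `run_chunks` are hypothetical names, not part of the tool. The same function is submitted to either executor, and only `ProcessPoolExecutor` can spread the pure-Python work across cores. No timings are claimed here, since they depend on the machine.

```python
import concurrent.futures as cf


def build_chunk(start, end):
    """CPU-bound stand-in for document generation: returns how many docs were built."""
    docs = [{"field1": f"dummy text {j}", "field2": j} for j in range(start, end)]
    return len(docs)


def run_chunks(executor_cls, total, chunk_size, max_workers):
    """Submit one task per chunk and sum the results (hypothetical helper)."""
    with executor_cls(max_workers=max_workers) as executor:
        futures = [
            executor.submit(build_chunk, i, min(i + chunk_size, total))
            for i in range(0, total, chunk_size)
        ]
        # result() re-raises any exception raised inside a worker
        return sum(f.result() for f in futures)


if __name__ == "__main__":
    # Same work, two executors: threads share one GIL, processes each have their own.
    print(run_chunks(cf.ThreadPoolExecutor, 10_000, 1_000, 4))
    print(run_chunks(cf.ProcessPoolExecutor, 10_000, 1_000, 4))
```

The `if __name__ == "__main__":` guard matters for the process pool: worker processes may re-import the module, and the guard keeps them from submitting the work again.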

from elasticsearch import Elasticsearch, RequestsHttpConnection
import os
from requests_aws4auth import AWS4Auth
import concurrent.futures as cf
import boto3
import time
import argparse


def aws_client():
    # Get Temporary Credentials
    credentials = boto3.Session().get_credentials()
    service = 'es'
    region = 'us-west-2'
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
    return awsauth


def es_client(awsauth):
    # Connect to the Elasticsearch cluster
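    # DOMAIN looks like https://<endpoint>/ ; keep only the host name,
    # whether or not the trailing slash is present
    host = os.environ['DOMAIN'].replace('https://', '', 1).rstrip('/')

    es = Elasticsearch(
        hosts=[{'host': host, 'port': 443}],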
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    return es


def parse_args():
    parser = argparse.ArgumentParser(description='Read arguments')
    parser.add_argument('-t', '--total', type=int, help='total number of docs to create', required=True)
    parser.add_argument('-s', '--chunk_size', type=int, help='Number of docs to send per Bulk API request', required=True)
    parser.add_argument('-n', '--num_machines', type=int, help='Number of machines', required=True)
    parser.add_argument('-i', '--machine_id', type=int, help='Id of machine (should be > 0 and <= num_machines)', required=True)
    parser.add_argument('-index', '--index_name', help='Name of index to add docs to', required=True)
    parser.add_argument('-w', '--max_workers', type=int, help='Max number of workers or processes to spawn', required=True)
    parser.add_argument('-c', '--create_index', help='Including this means this instance of the script will create the index (only the first machine should)', action='store_true')
    args = vars(parser.parse_args())
    return args


def create_index(index_name, es):
    # Set the settings for the index
    settings = {
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                "field1": {
                    "type": "text"
                },
                "field2": {
                    "type": "integer"
                },
                "field3": {
                    "type": "date"
                },
                "field4": {
                    "type": "boolean"
                },
                "field5": {
                    "type": "text"
                },
                "field6": {
                    "type": "text"
                }
            }
        }
    }

    # Create the index
    es.indices.create(index=index_name, body=settings)


def create_docs(start, end, index_name, last_doc):
        # Generate the dummy data
        dummy_data = []
        for j in range(start, end):  # Generates (end - start) documents for this chunk
            doc = {
                "field1": f"dummy text {j}",
                "field2": j,
                "field3": "2022-01-01",
                "field4": True,
                "field5": "The problem is, you are setting the dataset to chart and then you are setting its X Range instead of setting X Range before setting the dataset to chart. You can add either following code in else part at the end: horizontalBarChart.invalidate(); horizontalBarChart.refreshDrawableState(); Or following code in your initCombinedChart: horizontalBarChart.setVisibleXRangeMaximum(10); horizontalBarChart.setVisibleXRangeMinimum(7); Edit 1: Try with setVisibleYRangeMaximum instead of setVisibleXRangeMaximum. ",
                "field6": "I am using IBM MobileFirst 7.0. I am getting following error message when I invoke a procedure. [6/6/16 15:13:18:516 IST] 000000e6 DataAccessSer E logError FWLSE0099E: An error occurred while invoking procedure [project PNotifications]AllInOneAdapter/HttpRequestFWLSE0100E: parameters: [project PNotifications] Http request failed: java.net.SocketTimeoutException: Read timed out FWLSE0101E: Caused by: [project PNotifications]java.net.SocketTimeoutException: Read timed outjava.lang.RuntimeException: Http request failed: java.net.SocketTimeoutException: Read timed out Below is log of adapter : [6/6/16 15:13:18:519 IST] 000000e6 JavaScriptInt I com.worklight.integration.js.JavaScriptIntegrationLibraryImplementation info { \"errors\": [ \"Runtime: Http request failed: java.net.SocketTimeoutException: Read timed out\" ], \"info\": [ ], \"isSuccessful\": false, \"warnings\": [ ] } [project PNotifications] I am facing this issue intermittently on the QA server which is at client-side. The app works perfectly at times, before this issue occurs randomly. I never faced this issue in the development environment. I also tried increasing the connectionTimeoutInMilliseconds value to 90000 but the issue still persists. As this issue occurs randomly we haven't been able to find the cause for it. Any help would be much appreciated. "
            }
            dummy_data.append(doc)

        # The Bulk API expects an action entry followed by the document itself
        actions = []
        for doc in dummy_data:
            actions.append({"index": {"_index": index_name}})
            actions.append(doc)
        es.bulk(body=actions)
        # Report progress only after the chunk has actually been sent
        print(f'{end} docs added / {last_doc}')


awsauth = aws_client()
es = es_client(awsauth)


def main():
    # Program starts here. The es client is created at module level (above)
    # because create_docs, which runs in the worker processes, relies on it.
    args = parse_args()
    # For 100GB
    #num_docs = 674000000
    num_docs = args['total']
    # Use the bulk API to index the dummy data in chunks
    chunk_size = args['chunk_size']
    # Number of machines this workload will be divided up by
    num_machines = args['num_machines']
    machine_id = args['machine_id']
    index_name = args['index_name']

    # Create the index (only when --create_index is passed)
    if args['create_index']:
        create_index(index_name, es)

    first_doc = (num_docs // num_machines) * (machine_id - 1)
    last_doc = (num_docs // num_machines) * machine_id

    # Start timer when adding docs to index
    start_time = time.time()

    # Create a ProcessPoolExecutor with the desired number of processes
    with cf.ProcessPoolExecutor(max_workers=args['max_workers']) as executor:
        futures = []
        for i in range(first_doc, last_doc, chunk_size):
            # Don't go over the limit
            ending_doc = min(i + chunk_size, last_doc)
            futures.append(executor.submit(create_docs, i, ending_doc, index_name, last_doc))

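        # Wait for every chunk; result() re-raises any exception from a worker,
        # so failed bulk requests are not silently ignored
        for future in cf.as_completed(futures):
            future.result()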

    # Print end time
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f'Elapsed time: {elapsed_time:.2f} seconds')

    # Refresh the index to make the indexed data searchable
    es.indices.refresh(index=index_name)

    # Check the size of the index
    index_stats = es.indices.stats(index=index_name)
    index_size_bytes = index_stats['indices'][index_name]['total']['store']['size_in_bytes']
    index_size_gb = index_size_bytes / (1024 ** 3)  # Convert bytes to gigabytes

    # Print the size of the index
    print(f"Index size: {index_size_gb:.2f} GB")

if __name__ == "__main__":
    main()

## Usage

This script can be deployed to multiple machines and makes use of multiple cores on each machine.

Steps to start using the script are:

- Download this python script to each machine you want this to run on
- `pip3 install` all of these Python packages:
```
elasticsearch==7.10 # or whatever version you're using
requests
requests_aws4auth
boto3
```
- `export DOMAIN=https://cluster_endpoint/` (include the `https://` prefix and the trailing `/`; the script strips both to get the host name)
- Run this script with required command line arguments
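
The script only needs the host name part of `DOMAIN`. One way to extract it that tolerates a present or missing trailing slash, sketched here with the standard library, is to parse the URL. `host_from_domain` is a hypothetical helper, not part of the tool, and the endpoint is a placeholder.

```python
from urllib.parse import urlparse


def host_from_domain(domain):
    """Return the bare host name from a URL such as https://host/ (hypothetical helper)."""
    parsed = urlparse(domain)
    if parsed.scheme != "https" or not parsed.hostname:
        raise ValueError(f"DOMAIN should look like https://<endpoint>/, got {domain!r}")
    return parsed.hostname


# Placeholder endpoint, with and without the trailing slash
print(host_from_domain("https://cluster_endpoint/"))
print(host_from_domain("https://cluster_endpoint"))
```

Both calls yield the same host name, and a value without the `https://` prefix is rejected up front instead of producing a mangled host.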

## Command Line Args

The script takes the following command line arguments. All are required except `--create_index`, which should be passed on only one machine.

- `--total`: total number of docs to create
- `--chunk_size`: number of docs to send per Bulk API request
- `--num_machines`: number of machines to distribute the load onto
- `--machine_id`: id of this machine (should be > 0 and less than or equal to `num_machines`)
- `--index_name`: name of the index to add docs to
- `--max_workers`: max number of worker processes to spawn
- `--create_index`: including this flag means this instance of the script will create the index (only the first machine should)
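
The script does not itself enforce the `machine_id` range described above. If that check is wanted, a minimal sketch could look like the following. `validate_machine_args` is a hypothetical helper, not part of the tool.

```python
def validate_machine_args(num_machines, machine_id):
    """Raise ValueError unless 0 < machine_id <= num_machines (hypothetical helper)."""
    if num_machines < 1:
        raise ValueError("num_machines must be at least 1")
    if not 0 < machine_id <= num_machines:
        raise ValueError(
            f"machine_id must be between 1 and {num_machines}, got {machine_id}"
        )


validate_machine_args(num_machines=4, machine_id=4)  # accepted
try:
    validate_machine_args(num_machines=4, machine_id=5)
except ValueError as err:
    print(err)
```

Calling such a check right after parsing the arguments would catch a mistyped id before any documents are sent.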

## Example Usage

Say we want to run this tool across 4 machines, sending 2,300,000 docs in total, with at most 10,000 docs per bulk request for stability and at most 10 worker processes (CPU cores) in use on each machine at any given time.

This would be the layout across the 4 machines. First set the cluster endpoint on each one:

```
export DOMAIN=https://opensearch_cluster_endpoint.com/
```

Machine 1:
```
python3 create_index_multithreaded.py --total 2300000 --chunk_size 10000 --num_machines 4 --machine_id 1 --index_name test100gb-index --max_workers=10 --create_index
```
Machine 2:
```
python3 create_index_multithreaded.py --total 2300000 --chunk_size 10000 --num_machines 4 --machine_id 2 --index_name test100gb-index --max_workers=10
```
Machine 3:
```
python3 create_index_multithreaded.py --total 2300000 --chunk_size 10000 --num_machines 4 --machine_id 3 --index_name test100gb-index --max_workers=10
```
Machine 4:
```
python3 create_index_multithreaded.py --total 2300000 --chunk_size 10000 --num_machines 4 --machine_id 4 --index_name test100gb-index --max_workers=10
```
The four machines will:
- Divide the workload of 2,300,000 dummy docs equally among themselves (575,000 docs each)
- Show progress of docs added to the index after each chunk is bulk indexed
- Print the total time the operation took
- Print the size of the index created
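
To make the split concrete, the sketch below reproduces the range arithmetic from `main()` for the example above. `machine_range` is a hypothetical helper name; the formulas are the ones the script uses. Because the script uses integer division, any remainder of `total / num_machines` would not be assigned to a machine, though 2,300,000 divides evenly by 4.

```python
def machine_range(total, num_machines, machine_id):
    """Return (first_doc, last_doc) for a 1-based machine_id, as computed in main()."""
    per_machine = total // num_machines
    return per_machine * (machine_id - 1), per_machine * machine_id


total, num_machines, chunk_size = 2_300_000, 4, 10_000
for machine_id in range(1, num_machines + 1):
    first_doc, last_doc = machine_range(total, num_machines, machine_id)
    # One bulk request per chunk; the last chunk is clipped to last_doc
    num_chunks = len(range(first_doc, last_doc, chunk_size))
    print(machine_id, first_doc, last_doc, num_chunks)
```

Each machine handles 575,000 docs in 58 bulk requests: 57 full chunks of 10,000 docs and a final chunk of 5,000.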
