# Questions about effects of multiple shards, nodes on query speed

Trying to demonstrate two issues in this notebook:

1. Searches on an index with $n$ shards and search threads are not $n$ times faster than searches on an index with 1 shard.
2. Searches on a single node with $n$ shards and search threads are substantially slower than searches on 2 nodes each with $n/2$ shards.

## Background

My assumption was always that adding shards would roughly linearly decrease the query latency. I.e., 8 shards should be ~8x faster than 1 shard. However, when I measured it for a few cases, I noticed that 8 shards aren't even 2x faster than 1 shard (in the runs below, about 274 vs. 258 queries/s on a single node), hence issue #1. Then I ran into issue #2 in the process of debugging issue #1.

I have a hunch about #1. Specifically, there's some overhead in serializing/transferring the query from the client to the server and some more overhead in collecting the results from each shard and serializing/transferring them back to the client. This overhead is inherently serial, so it negates some of the speedup from having more shards. This shows up in the sampled search thread state in VisualVM: the threads spend about half their runtime in a "Parked" state. AFAIK, this means they just don't have any work to do. You can keep them busy by executing multiple search requests in parallel. I posted some screenshots in this GitHub issue: https://github.com/alexklibisz/elastiknn/issues/220
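
The hunch about #1 is essentially Amdahl's law: if a fraction $s$ of each request is serial, $n$ shards can speed it up by at most $1 / (s + (1 - s)/n)$. Here is a rough sketch of what that would imply. It assumes this simple model applies and that throughput with one request at a time is a fair proxy for latency; the function names are only for illustration. The throughput figures are the ones tqdm reports in the single-node runs later in this notebook.

```python
def amdahl_speedup(serial_fraction, n):
    """Upper bound on speedup with n parallel workers (Amdahl's law)."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n)

def implied_serial_fraction(observed_speedup, n):
    """Invert Amdahl's law: the serial fraction that would explain a speedup."""
    return (1.0 / observed_speedup - 1.0 / n) / (1.0 - 1.0 / n)

# Queries per second reported by tqdm in the single-node runs of this notebook.
qps_1_shard = 258.51
qps_8_shards = 274.16

observed = qps_8_shards / qps_1_shard
serial = implied_serial_fraction(observed, n=8)
print(f"observed speedup: {observed:.2f}x")
print(f"implied serial fraction: {serial:.0%}")
```

Under this model, a speedup this small would mean that most of each request's wall-clock time is not spent in the part that shards can parallelize. That is consistent with the hunch, though it doesn't prove it.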

I have no idea why #2 is happening.
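
One thing that seems worth ruling out for #2 is whether the shards really are split evenly across the data nodes. A minimal sketch of that check, assuming rows shaped like the JSON output of the `_cat/shards` API (e.g. from `es.cat.shards(index="ix-8", format="json")`). The helper name and the example node names are made up for illustration.

```python
from collections import Counter

def shards_per_node(cat_shards_rows):
    """Count started primary shards per node from `_cat/shards` JSON rows."""
    return Counter(
        row["node"]
        for row in cat_shards_rows
        if row["prirep"] == "p" and row["state"] == "STARTED"
    )

# Hypothetical rows standing in for real `_cat/shards` output,
# so the sketch runs without a cluster. Node names are invented.
example_rows = [
    {"index": "ix-8", "shard": str(i), "prirep": "p", "state": "STARTED",
     "node": "data-a" if i % 2 == 0 else "data-b"}
    for i in range(8)
]
print(shards_per_node(example_rows))
```

An uneven count here would point at shard allocation rather than at the search path itself.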


## Setup

To run Elasticsearch, I'm using a docker-compose stack with:
- one master node with a 1GB heap, which stores no data
- one or two data nodes, each with 8 search threads and a 4GB heap

To generate an index and queries, I'm using the [Amazon Reviews Dataset](https://jmcauley.ucsd.edu/data/amazon/), specifically the 5-core Clothing, Shoes and Jewelry reviews, containing 278k reviews. I index the full review text in a `text` field, use the shorter review summary to build a `match` query, and run the queries one at a time.

I'm running this on my Dell XPS 9570 laptop with Ubuntu 20.04, 32GB memory, an SSD, and this CPU: i7-8750H CPU @ 2.20GHz × 12.

In [1]:
# Download the dataset.
!wget -nc http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Clothing_Shoes_and_Jewelry_5.json.gz
!du -hs *.json.gz

File ‘reviews_Clothing_Shoes_and_Jewelry_5.json.gz’ already there; not retrieving.

46M	reviews_Clothing_Shoes_and_Jewelry_5.json.gz


In [2]:
# Helpers for benchmarking.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from tqdm import tqdm
from itertools import islice
import json
import gzip 

es = Elasticsearch(hosts=["localhost:9200"])
review_file = 'reviews_Clothing_Shoes_and_Jewelry_5.json.gz'

def build_index(shards):
  index = f"ix-{shards}"
  body = {
    "settings": {
      "number_of_shards": shards,
      "number_of_replicas": 0
    }
  }
  mapping = {
    "properties": {
      "review": {
        "type": "text"
      }
    }
  }
  if es.indices.exists(index):
      print(f"Index {index} already exists")
      return index
  es.indices.create(index, body=body)
  es.indices.put_mapping(mapping, index)
  
  def bulkgen():
    with gzip.open(review_file, 'r') as g:
      for l in tqdm(g, desc=f"Indexing with {shards} shards"):
        # Each line is one JSON object; json.loads avoids eval() on downloaded data.
        d = json.loads(l)
        yield {"_op_type": "index", "_index": index, "review": d['reviewText']}
      
  (_, errors) = bulk(es, bulkgen(), chunk_size=5000, max_retries=9)
  assert len(errors) == 0, errors
  es.indices.refresh(index=index)
  es.indices.forcemerge(index=index, max_num_segments=1)
  return index

def search(index, n=1000, size=10):
  # Pre-generate the queries so that building them is not the bottleneck.
  
  def querygen():
    with gzip.open(review_file, 'r') as g:
      for l in g:
        # Each line is one JSON object; json.loads avoids eval() on downloaded data.
        d = json.loads(l)
        yield {
          "query": {
            "match": {
              "review": {
                "query": d['summary'],
              }
            }
          }
        }
          
  queries = list(islice(querygen(), n))
          
  for query in tqdm(queries, desc=f"Running {n} queries on index {index}"):
    res = es.search(index=index, body=query, size=size, _source=False)
    
def start_cluster(data_nodes=1):
  !docker-compose up -d --force-recreate --scale elasticsearch_data=$data_nodes
  !sleep 30
  !echo '\n'
  !curl localhost:9200/_cat/nodes?v
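
The `search` helper above issues one request at a time, which is the case where the search threads end up parked. For comparison, here is a minimal sketch of a concurrent variant that keeps several requests in flight. `run_concurrently` and `fake_search` are hypothetical names, not part of the benchmark above, and the fake search function only stands in for a real request so the sketch runs without a cluster.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def run_concurrently(search_fn, queries, workers=8):
    """Run search_fn over queries using a pool of client threads.

    Returns (results, queries_per_second).
    """
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(search_fn, queries))
    elapsed = time.perf_counter() - start
    return results, len(queries) / elapsed

# Stand-in for a real search call (assumption: ~10 ms per request).
def fake_search(query):
    time.sleep(0.01)
    return {"took": 10, "query": query}

results, qps = run_concurrently(fake_search, [f"q{i}" for i in range(40)], workers=8)
print(f"{len(results)} queries at {qps:.0f} queries/s")
```

Against the cluster, `search_fn` could be something like `lambda q: es.search(index=index, body=q, size=10, _source=False)`. Note that this measures throughput under concurrent load, not the latency of a single query, so it is a different number from the ones reported below.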

## Baseline: Single data node, single shard

In [3]:
start_cluster(1)
index = build_index(1)
search(index, n=50000, size=10)

Recreating parallel-shards-question_elasticsearch_data_1 ... 
Recreating elasticsearch_master                          ... done
ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.18.0.3            7          27   8    1.45    0.76     0.31 dilrt     -      07d88c0d6541
172.18.0.2           61          27   6    1.45    0.76     0.31 mr        *      elasticsearch_master


Indexing with 1 shards: 278677it [00:27, 10262.50it/s]
Running 50000 queries on index ix-1: 100%|██████████| 50000/50000 [03:13<00:00, 258.51it/s]


## Single data node, eight shards

In [4]:
index = build_index(8)
search(index, n=50000, size=10)

Indexing with 8 shards: 278677it [00:19, 13962.05it/s]
Running 50000 queries on index ix-8: 100%|██████████| 50000/50000 [03:02<00:00, 274.16it/s]


## Two data nodes, eight shards

In [5]:
start_cluster(2)
index = build_index(8)
search(index, n=50000, size=10)

Recreating parallel-shards-question_elasticsearch_data_1 ... 
Recreating elasticsearch_master                          ... 
Creating parallel-shards-question_elasticsearch_data_2   ... done
ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.18.0.4            7          43  12    3.53    2.44     1.23 dilrt     -      7bd2b11572db
172.18.0.3            7          43  12    3.53    2.44     1.23 dilrt     -      74cca019a317
172.18.0.2           21          43  16    3.53    2.44     1.23 mr        *      elasticsearch_master


Indexing with 8 shards: 278677it [00:22, 12482.99it/s]
Running 50000 queries on index ix-8: 100%|██████████| 50000/50000 [02:36<00:00, 318.85it/s]
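
To put the three runs side by side, here is a small sketch that turns the tqdm rates above into a mean time per query and a speedup relative to the baseline. Since the queries run one at a time, 1/throughput is used as the mean wall-clock latency per query; that figure includes client-side overhead, so it is only an approximation of server-side latency.

```python
# Queries per second reported by tqdm for the three runs above.
runs = {
    "1 node, 1 shard": 258.51,
    "1 node, 8 shards": 274.16,
    "2 nodes, 8 shards": 318.85,
}

baseline = runs["1 node, 1 shard"]
for name, qps in runs.items():
    latency_ms = 1000.0 / qps  # mean wall-clock time per sequential query
    print(f"{name:<18} {qps:7.2f} q/s  {latency_ms:5.2f} ms/query  {qps / baseline:4.2f}x")
```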
