In [12]:
import os

# Export the OpenSearch host and the dataset locations as process environment
# variables, so shell commands launched from later cells (e.g. `$BBUY_QUERIES`)
# can read them.
os.environ.update({
    'HOST': 'localhost',
    'BBUY_DATA': '/workspace/datasets/product_data/products/',
    'BBUY_QUERIES': '/workspace/datasets/',
})

## 1. Cluster manager and Leader Election

In [47]:
# List the running Docker containers
!docker ps

CONTAINER ID   IMAGE                                           COMMAND                  CREATED             STATUS                    PORTS                                                                                                      NAMES
ebd2598b59b6   grafana/grafana:9.4.7                           "/run.sh"                About an hour ago   Up 48 seconds             0.0.0.0:3000->3000/tcp, :::3000->3000/tcp                                                                  grafana
8ad0f364798a   prom/prometheus:v2.43.0                         "/bin/prometheus --c…"   About an hour ago   Up 49 seconds             0.0.0.0:9090->9090/tcp, :::9090->9090/tcp                                                                  prometheus
60e969dc5b6e   gcr.io/cadvisor/cadvisor:v0.47.1                "/usr/bin/cadvisor -…"   About an hour ago   Up 49 seconds (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp                                                                  cadvisor
66

### Notes

Confirming that the cluster is healthy (green) with three active nodes.

![](assets/cluster-green.png)

In [48]:
# List the cluster's nodes; the `*` in the cluster_manager column marks the elected cluster manager.
# NOTE(review): the admin:admin credentials and -k (skip TLS certificate verification)
# are only acceptable against a local demo cluster.
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/nodes?v'

ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role node.roles                                        cluster_manager name
172.18.0.4           36          97   5    2.41    2.03     2.67 dimr      cluster_manager,data,ingest,remote_cluster_client *               opensearch-node2
172.18.0.5           34          97   4    2.41    2.03     2.67 dimr      cluster_manager,data,ingest,remote_cluster_client -               opensearch-node3
172.18.0.3           31          97   4    2.41    2.03     2.67 dimr      cluster_manager,data,ingest,remote_cluster_client -               opensearch-node1


### Notes

`opensearch-node2` is the cluster manager, as indicated by the `*` in the `cluster_manager` column.

In [49]:
# Stop the container of the current cluster manager (opensearch-node2)
# to force the remaining nodes to elect a new one
!docker stop opensearch-node2

opensearch-node2


In [51]:
# List the nodes again to see which remaining node took over as cluster manager
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/nodes?v'

ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role node.roles                                        cluster_manager name
172.18.0.5           39          94   3    1.87    1.93     2.62 dimr      cluster_manager,data,ingest,remote_cluster_client *               opensearch-node3
172.18.0.3           35          94   4    1.87    1.93     2.62 dimr      cluster_manager,data,ingest,remote_cluster_client -               opensearch-node1


### Notes

Via the Grafana dashboard, we can also observe that there are only two active nodes.

![](assets/two-nodes.png)

In [None]:
# Inspect the logs of opensearch-node3, the newly elected cluster manager
!docker logs opensearch-node3

In [53]:
# Restart the previously stopped opensearch-node2 container so it can rejoin the cluster
!docker start opensearch-node2

opensearch-node2


In [58]:
# Verify that all three nodes are listed again and see which one is now the cluster manager
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/nodes?v'

ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role node.roles                                        cluster_manager name
172.18.0.4           24          97  17    2.32    2.03     2.63 dimr      cluster_manager,data,ingest,remote_cluster_client -               opensearch-node2
172.18.0.5           47          97  15    2.32    2.03     2.63 dimr      cluster_manager,data,ingest,remote_cluster_client *               opensearch-node3
172.18.0.3           41          97  15    2.32    2.03     2.63 dimr      cluster_manager,data,ingest,remote_cluster_client -               opensearch-node1


### Notes
Now we're back to three active nodes, and `opensearch-node3` remains the cluster manager; the restarted `opensearch-node2` rejoined without reclaiming the role.

## 2. Creating a sharded index

In [74]:
# Delete the existing bbuy_products index so it can be recreated with new shard settings
!curl -k -X DELETE -u admin:admin "https://localhost:9200/bbuy_products"

{"acknowledged":true}

In [87]:
# Recreate bbuy_products from the bbuy_products_3primary2replica.json definition:
# 3 primary shards, each with 2 replicas (9 shard copies in total)
!curl -k -X PUT -u admin:admin "https://localhost:9200/bbuy_products" -H 'Content-Type: application/json' -d @/workspace/search_engineering/week3/bbuy_products_3primary2replica.json

{"acknowledged":true,"shards_acknowledged":true,"index":"bbuy_products"}

In [89]:
# List the index's shards, sorted by shard number and then primary/replica;
# right after creation every shard should be empty (0 docs)
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state   docs store ip         node
bbuy_products 0     p      STARTED    0  208b 172.18.0.5 opensearch-node3
bbuy_products 0     r      STARTED    0  208b 172.18.0.4 opensearch-node2
bbuy_products 0     r      STARTED    0  208b 172.18.0.3 opensearch-node1
bbuy_products 1     p      STARTED    0  208b 172.18.0.4 opensearch-node2
bbuy_products 1     r      STARTED    0  208b 172.18.0.5 opensearch-node3
bbuy_products 1     r      STARTED    0  208b 172.18.0.3 opensearch-node1
bbuy_products 2     p      STARTED    0  208b 172.18.0.3 opensearch-node1
bbuy_products 2     r      STARTED    0  208b 172.18.0.5 opensearch-node3
bbuy_products 2     r      STARTED    0  208b 172.18.0.4 opensearch-node2


In [78]:
# Index the BBUY product data with 8 workers (-w) and batches of 3200 docs (-b).
# Note: -m 100000 caps the docs sent per file per worker (see the log line below);
# it is NOT a total of 100k documents.
!python index.py -s /workspace/datasets/product_data/products/ -w 8 -b 3200 --refresh_interval 60s -m 100000

INFO:Indexing /workspace/datasets/product_data/products/ to bbuy_products with 8 workers, refresh_interval of 60s to host localhost with a maximum number of docs sent per file per worker of 100000 and 3200 per batch.
INFO:Done. 1275077 were indexed in 10.108549114650001 minutes.  Total accumulated time spent in `bulk` indexing: 25.73044800694946 minutes


In [85]:
# List the shards again to compare doc counts and store sizes across primaries and replicas
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 442.1mb 172.18.0.5 opensearch-node3
bbuy_products 0     r      STARTED 423938 431.5mb 172.18.0.4 opensearch-node2
bbuy_products 0     r      STARTED 423938   436mb 172.18.0.3 opensearch-node1
bbuy_products 1     p      STARTED 425833 422.8mb 172.18.0.4 opensearch-node2
bbuy_products 1     r      STARTED 425833 417.9mb 172.18.0.5 opensearch-node3
bbuy_products 1     r      STARTED 425833 418.7mb 172.18.0.3 opensearch-node1
bbuy_products 2     p      STARTED 425306 424.2mb 172.18.0.3 opensearch-node1
bbuy_products 2     r      STARTED 425306 423.6mb 172.18.0.5 opensearch-node3
bbuy_products 2     r      STARTED 425306 435.9mb 172.18.0.4 opensearch-node2


### Notes
The number of docs is pretty similar across shards (roughly 424k–426k each).

Comparing each primary shard with its replicas (same shard number), the document count on the primary is equal to that on the replicas.

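Nonetheless, there are some differences in storage size between copies of the same shard, presumably because each copy indexes the documents and merges its segments independently: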

```
index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 442.1mb 172.18.0.5 opensearch-node3
bbuy_products 0     r      STARTED 423938 431.5mb 172.18.0.4 opensearch-node2
bbuy_products 0     r      STARTED 423938   436mb 172.18.0.3 opensearch-node1
bbuy_products 1     p      STARTED 425833 422.8mb 172.18.0.4 opensearch-node2
bbuy_products 1     r      STARTED 425833 417.9mb 172.18.0.5 opensearch-node3
bbuy_products 1     r      STARTED 425833 418.7mb 172.18.0.3 opensearch-node1
bbuy_products 2     p      STARTED 425306 424.2mb 172.18.0.3 opensearch-node1
bbuy_products 2     r      STARTED 425306 423.6mb 172.18.0.5 opensearch-node3
bbuy_products 2     r      STARTED 425306 435.9mb 172.18.0.4 opensearch-node2
```

### Re-index the data but with no replicas

In [94]:
# Delete the existing bbuy_products index before recreating it without replicas
!curl -k -X DELETE -u admin:admin "https://localhost:9200/bbuy_products"

{"acknowledged":true}

In [95]:
# Recreate bbuy_products from the bbuy_products_3primary.json definition
# (3 primary shards; the shard listing below shows no replicas)
!curl -k -X PUT -u admin:admin "https://localhost:9200/bbuy_products" -H 'Content-Type: application/json' -d @/workspace/search_engineering/week3/bbuy_products_3primary.json

{"acknowledged":true,"shards_acknowledged":true,"index":"bbuy_products"}

In [96]:
# Confirm that the new index consists of only the three (still empty) primary shards
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state   docs store ip         node
bbuy_products 0     p      STARTED    0  208b 172.18.0.5 opensearch-node3
bbuy_products 1     p      STARTED    0  208b 172.18.0.4 opensearch-node2
bbuy_products 2     p      STARTED    0  208b 172.18.0.3 opensearch-node1


In [97]:
# Re-run the same indexing job against the replica-free index for a timing comparison.
# Note: -m 100000 caps the docs sent per file per worker (see the log line below);
# it is NOT a total of 100k documents.
!python index.py -s /workspace/datasets/product_data/products/ -w 8 -b 3200 --refresh_interval 60s -m 100000

INFO:Indexing /workspace/datasets/product_data/products/ to bbuy_products with 8 workers, refresh_interval of 60s to host localhost with a maximum number of docs sent per file per worker of 100000 and 3200 per batch.
INFO:Done. 1275077 were indexed in 6.422385269833285 minutes.  Total accumulated time spent in `bulk` indexing: 10.447608690066833 minutes


### Notes
- Indexing time taken with 3 primary 2 replica: 10 min 7 seconds
- Indexing time taken with 3 primary 0 replica: 6 min 25 seconds

### Dynamically add replicas

In [99]:
# List the shards before adding replicas: only the three primaries exist
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 378465 501.2mb 172.18.0.5 opensearch-node3
bbuy_products 1     p      STARTED 378527 508.1mb 172.18.0.4 opensearch-node2
bbuy_products 2     p      STARTED 377788 512.1mb 172.18.0.3 opensearch-node1


In [18]:
# Dynamically raise number_of_replicas to 2 on the existing index via the _settings API
!curl -k -XPUT -u admin:admin 'https://localhost:9200/bbuy_products/_settings' -H 'Content-Type: application/json' -d '{"index": {"number_of_replicas": 2}}'

{"acknowledged":true}

In [19]:
# List the shards to confirm that each primary now has two started replicas
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 441.7mb 172.18.0.7 opensearch-node3
bbuy_products 0     r      STARTED 423938 438.5mb 172.18.0.6 opensearch-node2
bbuy_products 0     r      STARTED 423938 438.8mb 172.18.0.8 opensearch-node1
bbuy_products 1     p      STARTED 425833 435.3mb 172.18.0.8 opensearch-node1
bbuy_products 1     r      STARTED 425833 436.4mb 172.18.0.7 opensearch-node3
bbuy_products 1     r      STARTED 425833 440.9mb 172.18.0.6 opensearch-node2
bbuy_products 2     p      STARTED 425306   441mb 172.18.0.8 opensearch-node1
bbuy_products 2     r      STARTED 425306 438.4mb 172.18.0.7 opensearch-node3
bbuy_products 2     r      STARTED 425306 439.4mb 172.18.0.6 opensearch-node2


### Notes

Via Grafana, we observe that the cluster was yellow for ~2 minutes.

![](assets/adding-2-replicas.png)

If we look at the logs, allocating the replicas took about 1 minute 20 seconds (from the settings update until the cluster status turned green again).

```
[2023-05-12T18:51:24,243][INFO ][o.o.c.m.MetadataUpdateSettingsService] [opensearch-node3] updating number_of_replicas to [2] for indices [bbuy_products]

...

[2023-05-12T18:52:43,246][INFO ][o.o.c.r.a.AllocationService] [opensearch-node3] Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[bbuy_products][2]]]).
```

We also observe that the logs were from the cluster manager (`opensearch-node3`) because it's the source of truth for cluster-wide information.

Also, when we check the shards, we observe that the replicas have nearly the same storage size as their primaries (within about 1–2%), a smaller spread than when the replicas were populated during indexing.

In [20]:
# List the shards to compare the store size of each replica with that of its primary
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 441.7mb 172.18.0.7 opensearch-node3
bbuy_products 0     r      STARTED 423938 438.5mb 172.18.0.6 opensearch-node2
bbuy_products 0     r      STARTED 423938 438.8mb 172.18.0.8 opensearch-node1
bbuy_products 1     p      STARTED 425833 435.3mb 172.18.0.8 opensearch-node1
bbuy_products 1     r      STARTED 425833 436.4mb 172.18.0.7 opensearch-node3
bbuy_products 1     r      STARTED 425833 440.9mb 172.18.0.6 opensearch-node2
bbuy_products 2     p      STARTED 425306   441mb 172.18.0.8 opensearch-node1
bbuy_products 2     r      STARTED 425306 438.4mb 172.18.0.7 opensearch-node3
bbuy_products 2     r      STARTED 425306 439.4mb 172.18.0.6 opensearch-node2


## 3. Query performance with replica shards

In [11]:
# Confirm the index layout (3 primaries, 2 replicas each) before running the query benchmark
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 441.7mb 172.18.0.7 opensearch-node3
bbuy_products 0     r      STARTED 423938 438.5mb 172.18.0.6 opensearch-node2
bbuy_products 0     r      STARTED 423938 438.8mb 172.18.0.8 opensearch-node1
bbuy_products 1     p      STARTED 425833 435.3mb 172.18.0.8 opensearch-node1
bbuy_products 1     r      STARTED 425833 436.4mb 172.18.0.7 opensearch-node3
bbuy_products 1     r      STARTED 425833 440.9mb 172.18.0.6 opensearch-node2
bbuy_products 2     p      STARTED 425306   441mb 172.18.0.8 opensearch-node1
bbuy_products 2     r      STARTED 425306 438.4mb 172.18.0.7 opensearch-node3
bbuy_products 2     r      STARTED 425306 439.4mb 172.18.0.6 opensearch-node2


In [None]:
# Run the query benchmark against train.csv with four worker threads (-w 4).
# NOTE(review): -m 25000 presumably caps the number of queries sent; confirm against query.py.
# BBUY_QUERIES is set with a trailing '/', so the path expands to
# /workspace/datasets//train.csv, which still resolves to the same file.
!python ./query.py -q $BBUY_QUERIES/train.csv -w 4 -m 25000

### Notes
- We achieve about 330 queries/second.
- This is about 4x more than the previous run which achieved 70-80 queries/second.
- Note that it's not an apples-to-apples comparison on replicas though, because here we're using a bigger machine with 2x CPU (8 cores instead of 4) and 2x RAM (16GB instead of 8GB). 
- Thus, if we factor in the 2x larger machine, it's approximately a 2x increase in query rate.

![](assets/query-rates.png)

## 4. Resharding

In [14]:
# List the current shards of bbuy_products before shrinking it
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 441.7mb 172.18.0.7 opensearch-node3
bbuy_products 0     r      STARTED 423938 438.5mb 172.18.0.6 opensearch-node2
bbuy_products 0     r      STARTED 423938 438.8mb 172.18.0.8 opensearch-node1
bbuy_products 1     p      STARTED 425833 435.3mb 172.18.0.8 opensearch-node1
bbuy_products 1     r      STARTED 425833 436.4mb 172.18.0.7 opensearch-node3
bbuy_products 1     r      STARTED 425833 440.9mb 172.18.0.6 opensearch-node2
bbuy_products 2     p      STARTED 425306   441mb 172.18.0.8 opensearch-node1
bbuy_products 2     r      STARTED 425306 438.4mb 172.18.0.7 opensearch-node3
bbuy_products 2     r      STARTED 425306 439.4mb 172.18.0.6 opensearch-node2


In [23]:
# Block writes on the source index (index.blocks.write = true);
# this is a prerequisite for the shrink operation that follows
!curl -k -XPUT -u admin:admin 'https://localhost:9200/bbuy_products/_settings' -H 'Content-Type: application/json' -d '{"index.blocks.write": true}'

{"acknowledged":true}

In [24]:
# Shrink bbuy_products into a NEW index, bbuy_products_1shard, with a single primary shard
# and 2 replicas; the source index itself is left in place
!curl -k -X POST -u admin:admin "https://localhost:9200/bbuy_products/_shrink/bbuy_products_1shard" -H 'Content-Type: application/json' -d '{"settings": {"index.number_of_replicas": 2, "index.number_of_shards": 1}}'

{"acknowledged":true,"shards_acknowledged":true,"index":"bbuy_products_1shard"}

In [27]:
# List the shards of the source index to confirm its layout is unchanged by the shrink
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products?v&s=shard,prirep'

index         shard prirep state     docs   store ip         node
bbuy_products 0     p      STARTED 423938 441.7mb 172.18.0.7 opensearch-node3
bbuy_products 0     r      STARTED 423938 438.5mb 172.18.0.6 opensearch-node2
bbuy_products 0     r      STARTED 423938 438.8mb 172.18.0.8 opensearch-node1
bbuy_products 1     p      STARTED 425833 435.3mb 172.18.0.8 opensearch-node1
bbuy_products 1     r      STARTED 425833 436.4mb 172.18.0.7 opensearch-node3
bbuy_products 1     r      STARTED 425833 440.9mb 172.18.0.6 opensearch-node2
bbuy_products 2     p      STARTED 425306   441mb 172.18.0.8 opensearch-node1
bbuy_products 2     r      STARTED 425306 438.4mb 172.18.0.7 opensearch-node3
bbuy_products 2     r      STARTED 425306 439.4mb 172.18.0.6 opensearch-node2


In [28]:
# List the shards of the shrunken index; its replicas may still be INITIALIZING right after the shrink
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products_1shard?v&s=shard,prirep'

index                shard prirep state           docs store ip         node
bbuy_products_1shard 0     p      STARTED      1275077 1.6gb 172.18.0.7 opensearch-node3
bbuy_products_1shard 0     r      INITIALIZING               172.18.0.6 opensearch-node2
bbuy_products_1shard 0     r      INITIALIZING               172.18.0.8 opensearch-node1


In [34]:
# List the shards of the shrunken index again, once both replicas have reached STARTED
!curl -XGET -k 'https://admin:admin@localhost:9200/_cat/shards/bbuy_products_1shard?v&s=shard,prirep'

index                shard prirep state      docs store ip         node
bbuy_products_1shard 0     p      STARTED 1275077   2gb 172.18.0.7 opensearch-node3
bbuy_products_1shard 0     r      STARTED 1275077 1.2gb 172.18.0.6 opensearch-node2
bbuy_products_1shard 0     r      STARTED 1275077 1.2gb 172.18.0.8 opensearch-node1
