Veilid Chunking #8558

Merged
merged 36 commits into dev from tauquir/veilid-chunking on Mar 20, 2024
Commits
a2b7e4b
add VeilidStreamer for sending large messages in chunks
itstauq Feb 27, 2024
450981a
Refactor everything
itstauq Feb 29, 2024
de42ee2
fix bugs in veilid streamer and add notebooks for testing
itstauq Mar 1, 2024
5bd08e7
Make VeilidStreamer compatible with app_call by adding callback to re…
itstauq Mar 3, 2024
107b1c0
Add instructions to use VeilidStreamer in the docstring
itstauq Mar 3, 2024
ab65443
Send chunks parallelly, add batching and retries, improve test notebooks
itstauq Mar 4, 2024
2ecd1e8
fix linting issues
itstauq Mar 6, 2024
76fc280
fix linting issues try 2
itstauq Mar 6, 2024
5c5c5eb
fix linting issues try 3
itstauq Mar 6, 2024
219ee62
add architecture diagram for VeilidStreamer
itstauq Mar 8, 2024
78c1d95
Integrate VeilidStreamer with veilid_core
itstauq Mar 8, 2024
880439c
Add code to benchmark Veilid's performance with different message sizes
itstauq Mar 8, 2024
fd6895f
Add a single notebook with instructions to send large veilid messages
itstauq Mar 8, 2024
5066cac
replace batching and retries with semaphores; update test notebook
itstauq Mar 11, 2024
bf894d6
- Use random UUID to identify a stream call instead of message hash
itstauq Mar 11, 2024
acb8491
Merge branch 'rasswanth/veilid-prototype' into tauquir/veilid-chunking
itstauq Mar 12, 2024
e002dce
Merge branch 'dev' into tauquir/veilid-chunking
rasswanth-s Mar 13, 2024
49c976c
Merge branch 'dev' into tauquir/veilid-chunking
madhavajay Mar 14, 2024
e2f30c3
Add ability to send responses greater than 32 KB in size
itstauq Mar 15, 2024
4b693e2
Merge remote-tracking branch 'origin' into tauquir/veilid-chunking
itstauq Mar 15, 2024
b237552
Merge branch 'tauquir/veilid-chunking' of github.com:OpenMined/PySyft…
itstauq Mar 15, 2024
5551e4f
Make VeilidStreamer compatible with vld_key and update notebooks
itstauq Mar 15, 2024
3714bc0
Add retries to VeilidStreamer; remove unnecessary notebooks
itstauq Mar 15, 2024
b222058
Add endpoint for testing handle_app_call and add more tests to notebook
itstauq Mar 17, 2024
cc3f8ce
- Abstract away app_call and app_call_reply entirely
itstauq Mar 18, 2024
2a1ddf1
add changes for resolving PR comments
itstauq Mar 18, 2024
5f2999a
Merge branch 'dev' into tauquir/veilid-chunking
itstauq Mar 18, 2024
be44b74
Update docstring for STREAM_SINGLE request
itstauq Mar 19, 2024
03ad5aa
Add BytesEnum to utils.py and use it in veilid_streamer.py
itstauq Mar 19, 2024
b87b24a
Add changes for resolving PR comments; fix logging
itstauq Mar 19, 2024
105e1c3
Add suggestions from PR comments
itstauq Mar 20, 2024
0d07473
Ensure that all the chunks are received before reassembling the message
itstauq Mar 20, 2024
ab741db
Merge branch 'dev' into tauquir/veilid-chunking
itstauq Mar 20, 2024
4c2d722
removed compression proxy route
rasswanth-s Mar 20, 2024
d2e5c27
Merge branch 'dev' into tauquir/veilid-chunking
rasswanth-s Mar 20, 2024
9735e72
skip veilid tests temporarily
rasswanth-s Mar 20, 2024
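Several of the commits above reference a `BytesEnum` and a `STREAM_SINGLE` request type that `VeilidStreamer` uses to tag its app_call payloads. The actual definitions live in `utils.py` and `veilid_streamer.py`, which are not part of the excerpt below; the sketch here only illustrates the general pattern of an enum with byte-string values acting as wire-format prefixes. All member names and byte values other than `STREAM_SINGLE` are hypothetical.

```python
# stdlib
from enum import Enum


class BytesEnum(bytes, Enum):
    """Enum whose members are byte strings, usable directly as wire-format tags."""

    def __str__(self) -> str:
        return self.name


# Hypothetical request tags; only STREAM_SINGLE is named in the commit log,
# and the byte values here are made up for illustration.
class RequestType(BytesEnum):
    STREAM_SINGLE = b"@VS@SS"
    STREAM_START = b"@VS@ST"
    STREAM_CHUNK = b"@VS@SC"
    STREAM_END = b"@VS@SE"


# A sender could prefix each app_call payload with a tag, and the receiver
# could dispatch on that tag before reassembling chunks.
payload = RequestType.STREAM_SINGLE + b"<message bytes>"
assert payload.startswith(RequestType.STREAM_SINGLE)
print(RequestType.STREAM_SINGLE)  # -> STREAM_SINGLE
```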
363 changes: 363 additions & 0 deletions notebooks/Testing/Veilid/Large-Message-Testing.ipynb
@@ -0,0 +1,363 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Instructions\n",
"\n",
"1. Follow these instructions from `packages/grid/veilid/development.md` to build veilid docker containers:\n",
" ```bash\n",
" cd packages/grid/veilid && docker build -f veilid.dockerfile -t veilid:0.1 .\n",
" ```\n",
"2. From within the `packages/grid/veilid` directory run the receiver docker container on port 4000:\n",
" ```bash\n",
" docker run -it -e DEV_MODE=True -p 4000:4000 -v $(pwd)/server:/app/server veilid:0.1\n",
" ```\n",
"3. On a separate terminal tab/window, cd into `packages/grid/veilid` directory again and run the sender docker container on port 4001:\n",
" ```bash\n",
" docker run -it -e DEV_MODE=True -p 4001:4000 -v $(pwd)/server:/app/server veilid:0.1\n",
" ```\n",
"4. Follow and run the below cells to test out sending large messages through Veilid. You may also use the **`Run All`** notebook function once the above two docker containers are up and running."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1. Set up imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# stdlib\n",
"import json\n",
"import logging\n",
"from pprint import pprint\n",
"import random\n",
"import time\n",
"\n",
"# third party\n",
"import requests\n",
"\n",
"logging.basicConfig(level=logging.INFO, format=\"%(message)s\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2. Set up receiver"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"RECEIVER_HOST = \"localhost\"\n",
"RECEIVER_PORT = 4000\n",
"RECEIVER_BASE_ADDRESS = f\"http://{RECEIVER_HOST}:{RECEIVER_PORT}\"\n",
"\n",
"requests.post(f\"{RECEIVER_BASE_ADDRESS}/generate_vld_key\")\n",
"res = requests.get(f\"{RECEIVER_BASE_ADDRESS}/retrieve_vld_key\")\n",
"receiver_vld_key = res.json()[\"message\"]\n",
"logging.info(f\"{'=' * 30}\\n{receiver_vld_key}\\n{'=' * 30}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Set up sender"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"SENDER_HOST = \"localhost\"\n",
"SENDER_PORT = 4001\n",
"SENDER_BASE_ADDRESS = f\"http://{SENDER_HOST}:{SENDER_PORT}\"\n",
"\n",
"requests.post(f\"{SENDER_BASE_ADDRESS}/generate_vld_key\")\n",
"res = requests.get(f\"{SENDER_BASE_ADDRESS}/retrieve_vld_key\")\n",
"sender_vld_key = res.json()[\"message\"]\n",
"logging.info(f\"{'=' * 30}\\n{sender_vld_key}\\n{'=' * 30}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Declare utility functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def send_test_request(request_size_bytes, response_size_bytes):\n",
" \"\"\"\n",
" Send a test request of the specified size and receive a response of.\n",
"\n",
" Args:\n",
" request_size_bytes (int): Size of the request body in bytes.\n",
" response_size_bytes (int): Expected size of the response body in bytes.\n",
"\n",
" Returns:\n",
" tuple: A tuple containing the total transfer size, total time taken and success status.\n",
" \"\"\"\n",
" message = build_vld_message(request_size_bytes, response_size_bytes)\n",
" json_data = {\n",
" \"vld_key\": receiver_vld_key,\n",
" \"message\": message,\n",
" }\n",
"\n",
" logging.debug(f\"Sending message of size {len(message) // 1024} KB...\")\n",
"\n",
" start = time.time()\n",
" app_call = requests.post(f\"{SENDER_BASE_ADDRESS}/app_call\", json=json_data)\n",
" end = time.time()\n",
"\n",
" response = app_call.content\n",
" response_len = len(response)\n",
" response = response.decode()\n",
" response_pretty = (\n",
" response if len(response) <= 100 else f\"{response[:50]}...{response[-50:]}\"\n",
" )\n",
"\n",
" total_xfer = request_size_bytes + response_size_bytes\n",
" total_time = round(end - start, 2)\n",
"\n",
" success = \"received_request_body_length\" in response\n",
" logging.debug(f\"[{total_time}s] Response({response_len} B): {response_pretty}\")\n",
" return total_xfer, total_time, success\n",
"\n",
"\n",
"def build_vld_message(request_size_bytes, response_size_bytes):\n",
" \"\"\"\n",
" Build a message of length `request_size_bytes`. Padded with random characters.\n",
"\n",
" Args:\n",
" request_size_bytes (int): Size of the request body in bytes.\n",
" response_size_bytes (int): Expected size of the response body in bytes.\n",
"\n",
" Returns:\n",
" dict: The constructed request body.\n",
" \"\"\"\n",
" endpoint = f\"{RECEIVER_BASE_ADDRESS}/test_veilid_streamer\"\n",
" message = {\n",
" \"method\": \"POST\",\n",
" \"url\": endpoint,\n",
" \"json\": {\n",
" \"expected_response_length\": response_size_bytes,\n",
" \"random_padding\": \"\",\n",
" },\n",
" }\n",
" padding_length = request_size_bytes - len(json.dumps(message))\n",
" random_padding = generate_random_alphabets(padding_length)\n",
" message[\"json\"][\"random_padding\"] = random_padding\n",
" return json.dumps(message)\n",
"\n",
"\n",
"def generate_random_alphabets(length):\n",
" return \"\".join([random.choice(\"abcdefghijklmnopqrstuvwxyz\") for _ in range(length)])\n",
"\n",
"\n",
"def bytes_to_human_readable(size_in_bytes):\n",
" if size_in_bytes >= (2**20):\n",
" size_in_mb = size_in_bytes / (2**20)\n",
" return f\"{size_in_mb:.2f} MB\"\n",
" else:\n",
" size_in_kb = size_in_bytes / (2**10)\n",
" return f\"{size_in_kb:.2f} KB\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Run manual tests"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"MIN_MESSAGE_SIZE = 1024\n",
"MAX_CHUNK_SIZE = 32768\n",
"\n",
"\n",
"def get_random_single_chunk_size():\n",
" return random.randint(MIN_MESSAGE_SIZE, MAX_CHUNK_SIZE)\n",
"\n",
"\n",
"def get_random_multi_chunk_size():\n",
" return random.randint(4 * MAX_CHUNK_SIZE, 8 * MAX_CHUNK_SIZE)\n",
"\n",
"\n",
"def test_for_single_chunk_request_and_single_chunk_response():\n",
" request_size = get_random_single_chunk_size()\n",
" response_size = get_random_single_chunk_size()\n",
" total_xfer, total_time, success = send_test_request(request_size, response_size)\n",
" result = \"Success\" if success else \"Failure\"\n",
" logging.info(\n",
" f\"[{request_size} B ⇅ {response_size} B] \"\n",
" f\"Transferred {bytes_to_human_readable(total_xfer)} \"\n",
" f\"in {total_time}s; \"\n",
" f\"Result: {result}\"\n",
" )\n",
"\n",
"\n",
"def test_for_multi_chunk_request_and_single_chunk_response():\n",
" request_size = get_random_multi_chunk_size()\n",
" response_size = get_random_single_chunk_size()\n",
" total_xfer, total_time, success = send_test_request(request_size, response_size)\n",
" result = \"Success\" if success else \"Failure\"\n",
" logging.info(\n",
" f\"[{request_size} B ⇅ {response_size} B] \"\n",
" f\"Transferred {bytes_to_human_readable(total_xfer)} \"\n",
" f\"in {total_time}s; \"\n",
" f\"Result: {result}\"\n",
" )\n",
"\n",
"\n",
"def test_for_single_chunk_request_and_multi_chunk_response():\n",
" request_size = get_random_single_chunk_size()\n",
" response_size = get_random_multi_chunk_size()\n",
" total_xfer, total_time, success = send_test_request(request_size, response_size)\n",
" result = \"Success\" if success else \"Failure\"\n",
" logging.info(\n",
" f\"[{request_size} B ⇅ {response_size} B] \"\n",
" f\"Transferred {bytes_to_human_readable(total_xfer)} \"\n",
" f\"in {total_time}s; \"\n",
" f\"Result: {result}\"\n",
" )\n",
"\n",
"\n",
"def test_for_multi_chunk_request_and_multi_chunk_response():\n",
" request_size = get_random_multi_chunk_size()\n",
" response_size = get_random_multi_chunk_size()\n",
" total_xfer, total_time, success = send_test_request(request_size, response_size)\n",
" result = \"Success\" if success else \"Failure\"\n",
" logging.info(\n",
" f\"[{request_size} B ⇅ {response_size} B] \"\n",
" f\"Transferred {bytes_to_human_readable(total_xfer)} \"\n",
" f\"in {total_time}s; \"\n",
" f\"Result: {result}\"\n",
" )\n",
"\n",
"\n",
"test_for_single_chunk_request_and_single_chunk_response()\n",
"test_for_multi_chunk_request_and_single_chunk_response()\n",
"test_for_single_chunk_request_and_multi_chunk_response()\n",
"test_for_multi_chunk_request_and_multi_chunk_response()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 6. Run benchmarks on requests-responses of sizes from 1 KB to 512 MB"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"benchmarks = {}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Baseline tests (Tests with single chunk messages i.e. 1 KB to 32 KB)\n",
"for powers_of_two in range(0, 6): # Test from 1 KB to 32 KB\n",
" message_size = 2**powers_of_two * 1024\n",
" total_xfer, total_time = send_test_request(message_size, message_size)\n",
" benchmarks[bytes_to_human_readable(total_xfer)] = total_time\n",
"pprint(benchmarks, sort_dicts=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Tests with smaller messages\n",
"for powers_of_two in range(6, 13): # Test from 64 KB to 4 MB\n",
" message_size = 2**powers_of_two * 1024\n",
" total_xfer, total_time = send_test_request(message_size, message_size)\n",
" benchmarks[bytes_to_human_readable(total_xfer)] = total_time\n",
"pprint(benchmarks, sort_dicts=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Tests with larger messages\n",
"for powers_of_two in range(13, 16): # Test from 8 MB to 32 MB\n",
" message_size = 2**powers_of_two * 1024\n",
" total_xfer, total_time = send_test_request(message_size, message_size)\n",
" benchmarks[bytes_to_human_readable(total_xfer)] = total_time\n",
"pprint(benchmarks, sort_dicts=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Tests with super large messages\n",
"for powers_of_two in range(16, 19): # Test from 64 MB to 256 MB\n",
" message_size = 2**powers_of_two * 1024\n",
" total_xfer, total_time = send_test_request(message_size, message_size)\n",
" benchmarks[bytes_to_human_readable(total_xfer)] = total_time\n",
"pprint(benchmarks, sort_dicts=False)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySyft",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
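As a quick way to read the numbers collected in section 6 of the notebook above, a small helper like the one below can turn the `benchmarks` dict (human-readable total-transfer size mapped to seconds, as produced by `bytes_to_human_readable` and `send_test_request`) into rough throughput figures. This is a local convenience sketch for interpreting the results, not part of the PR.

```python
def summarize_throughput(benchmarks: dict) -> None:
    """Print approximate throughput for each benchmark entry.

    Assumes keys like "2.00 KB" or "512.00 MB" (total transfer) and values
    in seconds, matching the dict built by the benchmark cells above.
    """
    units = {"KB": 2**10, "MB": 2**20}
    for size_label, seconds in benchmarks.items():
        value, unit = size_label.split()
        total_bytes = float(value) * units[unit]
        mb_per_s = (total_bytes / 2**20) / seconds if seconds else float("inf")
        print(f"{size_label:>10} in {seconds:7.2f}s  ≈ {mb_per_s:6.2f} MB/s")


# Example (run in the notebook after the benchmark cells):
# summarize_throughput(benchmarks)
```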
1 change: 1 addition & 0 deletions packages/grid/veilid/server/constants.py
@@ -9,3 +9,4 @@
DHT_KEY_CREDS = "syft-dht-key-creds"

USE_DIRECT_CONNECTION = True
MAX_MESSAGE_SIZE = 32768 # 32KB
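For context on the added constant: the notebook's `MAX_CHUNK_SIZE` and this `MAX_MESSAGE_SIZE` are both 32768 bytes, i.e. a single Veilid app_call in this setup carries at most 32 KB, which is why `VeilidStreamer` splits larger payloads into chunks and reassembles them on the other side. The snippet below only illustrates that arithmetic under a simplifying assumption (no per-chunk header); the real streamer presumably reserves room in each chunk for metadata such as the stream UUID and chunk index mentioned in the commit messages, so its effective chunk count can be slightly higher.

```python
# stdlib
import math

MAX_MESSAGE_SIZE = 32768  # 32 KB, mirroring packages/grid/veilid/server/constants.py


def split_into_chunks(message: bytes, chunk_size: int = MAX_MESSAGE_SIZE) -> list:
    """Illustrative only: slice a payload into pieces of at most chunk_size bytes."""
    return [message[i : i + chunk_size] for i in range(0, len(message), chunk_size)]


# A 1 MB payload needs ceil(2**20 / 32768) = 32 chunks under this simplification.
payload = b"x" * 2**20
chunks = split_into_chunks(payload)
assert len(chunks) == math.ceil(len(payload) / MAX_MESSAGE_SIZE) == 32
```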