In [None]:
# Benchmark configuration: tune these values, then re-run the notebook top to bottom.

# Workload: where the points come from and how the plan interleaves inserts and queries.
input_dataset = "data/csv/osm21_sports.csv"
insert_batch_size = 10_000  # rows per INSERT statement
query_count = 1000          # queries generated after each insert batch

# Target dataverse / dataset and the HTTP endpoint of the query service.
dataverse = "LocationDb"
table_name = "Locations_RTree_Constant"
dbHost = "http://localhost:19002"

# Cap on the number of insert batches turned into commands; None means no limit.
debug_max_batches = None

# The plan file name encodes the configuration, so plans for different settings sit side by side.
benchmark_plan_path = "commands/benchmark_plan_{}_{}_{}_{}.sql".format(
    dataverse, table_name, insert_batch_size, query_count
)

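## Generate Benchmark Plan

Load the dataset, shuffle it, and write a plan file with one statement per line: each insert batch is followed by up to `query_count` random spatial-intersection queries over all points inserted so far.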

In [27]:
import csv
import random

# Fixed seed so the shuffled insert order (and therefore the generated benchmark plan) is reproducible.
shuffle_seed = 42

data = []
# csv.reader (rather than str.split(",")) keeps quoted fields intact, so descriptions containing
# commas are not truncated and surrounding CSV quote characters are removed.
# NOTE(review): assumes the export is UTF-8 encoded; confirm.
with open(input_dataset, "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f)
    next(reader, None)  # skip the header row
    for row in reader:
        if not row:  # tolerate blank lines instead of failing with IndexError
            continue
        row_id, location_x, location_y, description = row[0], row[1], row[2], row[3]
        data.append([row_id, location_x, location_y, description])

random.seed(shuffle_seed)
random.shuffle(data)

In [None]:
import json
import os
import random

import requests

from utils import point, rectangle, create_offset_rectangle

def generate_insert_command(batch):
    """Build a single SQL++ ``INSERT`` statement for one batch of rows.

    Args:
        batch: sequence of rows ``[id, location_x, location_y, description]`` as produced by
            the data-loading cell.

    Returns:
        ``INSERT INTO <table_name> ([{...},{...},...])`` as one string. Descriptions are
        JSON-escaped, so they cannot introduce quotes or line breaks that would corrupt the
        statement (the plan file stores one statement per line).
    """
    records = []
    for row in batch:
        row_id, location_x, location_y, description = row[0], row[1], row[2], row[3]
        # json.dumps quotes and escapes the text (quotes, backslashes, control characters), so a
        # description containing `"` or `\` cannot end the literal early.
        # ensure_ascii=False keeps non-ASCII characters as-is, as before.
        description_literal = json.dumps(description, ensure_ascii=False)
        records.append(
            f'{{"id": {row_id}, "location": {point(location_x, location_y)}, '
            f'"description": {description_literal}}}'
        )
    # Joining with "," puts a separator between every pair of records. Comparing each row to
    # batch[-1] dropped the comma whenever a duplicate of the last row appeared earlier.
    return f"INSERT INTO {table_name} ([" + ",".join(records) + "])"

def generate_query_commands(inserted_points, count=None):
    """Build spatial-intersection queries around randomly chosen inserted points.

    Args:
        inserted_points: sequence of ``(location_x, location_y)`` pairs inserted so far.
        count: number of queries to generate; defaults to the notebook-level ``query_count``.
            Clamped to ``len(inserted_points)`` because points are sampled without replacement.

    Returns:
        List of ``SELECT * FROM <table_name> WHERE spatial_intersect(location, ...)`` statements,
        one per sampled point, using the rectangle from ``create_offset_rectangle``.
    """
    if count is None:
        count = query_count
    # random.sample raises ValueError when asked for more items than the population holds.
    # That would abort plan generation for datasets (or debug runs) smaller than query_count.
    sample_size = min(count, len(inserted_points))

    queries = []
    for selected_point in random.sample(inserted_points, sample_size):
        start_x, start_y, end_x, end_y = create_offset_rectangle(selected_point[0], selected_point[1])
        query = f"SELECT * FROM {table_name} WHERE spatial_intersect(location, {rectangle(start_x, start_y, end_x, end_y)});"
        queries.append(query)

    return queries


In [29]:
inserted_points = []
commands = []
# Walk the shuffled data in insert_batch_size slices. Each slice becomes one INSERT, followed by a
# round of queries sampled from every point inserted so far (including this batch).
for current_batch, start in enumerate(range(0, len(data), insert_batch_size)):
    if debug_max_batches is not None and current_batch >= debug_max_batches:
        break
    batch_to_insert = data[start:start + insert_batch_size]

    commands.append(generate_insert_command(batch_to_insert))
    inserted_points.extend((row[1], row[2]) for row in batch_to_insert)
    commands.extend(generate_query_commands(inserted_points))

# Write the plan, one statement per line.
benchmark_dir = os.path.dirname(benchmark_plan_path)
if benchmark_dir:  # dirname() is "" for a bare file name, and os.makedirs("") would raise
    os.makedirs(benchmark_dir, exist_ok=True)
with open(benchmark_plan_path, "w") as f:
    f.writelines(command + "\n" for command in commands)
    

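## Create Database Schema

**Warning:** this drops and recreates the dataverse, deleting any previously loaded data. It then defines the four `Locations_*` datasets, each with an R-tree index on `location`.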

In [56]:
# Drops and recreates the whole dataverse (any previously loaded data is lost), then defines one
# dataset + index per benchmark variant. The dataverse name is taken from the config cell, so the
# schema is created in the same dataverse that execute_command targets.
# NOTE(review): the Locations_BTree_* indexes are declared `TYPE rtree`, identical to the RTree
# variants. Confirm whether a different index type was intended for these datasets.
create_database_script = f"""
DROP DATAVERSE {dataverse} IF EXISTS;
CREATE DATAVERSE {dataverse};
USE {dataverse};

CREATE TYPE LocationType AS {{
    id: bigint,
    location: point,
    description: string
}};

CREATE DATASET Locations_RTree_Constant(LocationType) PRIMARY KEY id;
CREATE INDEX Location_RTree_Constant_Index on Locations_RTree_Constant(location) TYPE rtree ENFORCED;

CREATE DATASET Locations_RTree_Concurrent(LocationType) PRIMARY KEY id;
CREATE INDEX Locations_RTree_Concurrent_Index on Locations_RTree_Concurrent(location) TYPE rtree ENFORCED;

CREATE DATASET Locations_BTree_Constant(LocationType) PRIMARY KEY id;
CREATE INDEX Locations_BTree_Constant_Index on Locations_BTree_Constant(location) TYPE rtree ENFORCED;

CREATE DATASET Locations_BTree_Concurrent(LocationType) PRIMARY KEY id;
CREATE INDEX Locations_BTree_Concurrent_Index on Locations_BTree_Concurrent(location) TYPE rtree ENFORCED;
"""
response = requests.post(f"{dbHost}/query/service", data={'statement': create_database_script})
print(response.text)
# Stop here if the service rejected the script, rather than letting later cells run against a
# missing or partial schema.
response.raise_for_status()

{
	"requestID": "9cc8401a-df72-4cfa-9353-d45d2f6e2f41",
	"plans":{},
	"status": "success",
	"metrics": {
		"elapsedTime": "980.334458ms",
		"executionTime": "838.241ms",
		"compileTime": "0ns",
		"queueWaitTime": "0ns",
		"resultCount": 0,
		"resultSize": 0,
		"processedObjects": 0,
		"bufferCacheHitRatio": "0.00%",
		"bufferCachePageReadCount": 0
	}
}



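## Execute Commands

Replay the plan file against the query service one statement at a time, logging the start and the completion (or error) of each statement.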

In [44]:
from utils import Logger
def execute_command(command, client_context_id=None):
    """POST one statement to the query service and return the raw ``requests`` response.

    The statement is sent as form data together with the configured ``dataverse``.
    ``client_context_id`` is forwarded so the request can be correlated with log records.
    """
    form_fields = {
        'statement': command,
        'dataverse': dataverse,
        'client_context_id': client_context_id,
    }
    service_url = f"{dbHost}/query/service"
    return requests.post(service_url, data=form_fields)


def _decode_json_body(response):
    """Return the decoded JSON body of ``response``, or None if the body is not valid JSON."""
    try:
        return response.json()
    except ValueError:  # requests' JSON decode errors subclass ValueError
        return None


log_file_parts = [dataverse, table_name, insert_batch_size, query_count]
logger = Logger(log_file_parts, console_log=False)

# Replay the plan one statement per line. Each statement is tagged with a zero-padded trace id
# (its 1-based line number). The id is sent as client_context_id and used in every log record.
with open(benchmark_plan_path, "r") as f:
    for index, line in enumerate(f, start=1):
        command = line.strip()
        if not command:
            continue

        command_type = "insert" if command.startswith("INSERT") else "query"
        event = f"command.{command_type}"
        trace_id = str(index).zfill(10)
        # Same "trace_id" key as the completion/error records, so start and end events can be joined.
        logger.log({"trace_id": trace_id, "event": event, "status": "start"})
        response = execute_command(command, client_context_id=trace_id)
        json_response = _decode_json_body(response)

        if response.status_code != 200 or json_response is None:
            # A non-JSON body (e.g. a server or proxy error page) is logged as an error instead of
            # raising and aborting the rest of the benchmark.
            body = json_response or {}
            logger.error({
                "trace_id": trace_id,
                "event": event,
                "status": body.get("status"),
                "http.status": response.status_code,
                "metrics": body.get("metrics"),
                "error": body.get("errors") if json_response is not None else response.text,
            })
        else:
            results = json_response.get("results")
            logger.log({
                "trace_id": trace_id,
                "event": event,
                "status": json_response.get("status"),
                "http.status": response.status_code,
                # An empty result list is a valid answer (0 rows); only a missing "results" field maps to None.
                "result_count": len(results) if results is not None else None,
                "metrics": json_response.get("metrics"),
            })
                        


KeyboardInterrupt: 