In [None]:
import json
import os
import pprint
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text as sql_text, inspect

# Ensure 'perf_data' directory exists so benchmark results can be written later
os.makedirs('perf_data', exist_ok=True)

# Connect to the database.
# The URL may be supplied through the AIRBNB_DB_URL environment variable so
# that real credentials never have to be stored in the notebook; when the
# variable is unset the original local development URL is used.
DEFAULT_DB_URL = 'postgresql+psycopg2://postgres:postgres@localhost:5432/airbnb'
db_url = os.environ.get('AIRBNB_DB_URL', DEFAULT_DB_URL)

db_eng = create_engine(db_url,
                       # Resolve unqualified table names in the new_york_city schema
                       connect_args={'options': '-csearch_path=new_york_city'},
                       pool_size=10,  # Use connection pooling
                       max_overflow=20,
                       isolation_level='SERIALIZABLE')


In [None]:
# Add and populate the tsvector column used by the full-text search queries,
# then verify that the column exists.
with db_eng.connect() as conn:
    alter_table_query = """
    ALTER TABLE reviews
    ADD COLUMN IF NOT EXISTS comments_tsv tsvector;
    """
    update_table_query = """
    UPDATE reviews
    SET comments_tsv = to_tsvector(comments);
    """
    # Earlier versions of this cell created a GIN index called
    # 'comments_tsv_in_reviews'. add_drop_index() only manages indexes named
    # 'idx_<column>_in_<table>', so that index was never dropped and every
    # "no index" measurement still had a GIN index available. Index creation is
    # therefore left to add_drop_index(), and any leftover copy is removed here.
    drop_unmanaged_index_query = """
    DROP INDEX IF EXISTS comments_tsv_in_reviews;
    """
    conn.execute(sql_text(alter_table_query))
    conn.execute(sql_text(update_table_query))
    conn.execute(sql_text(drop_unmanaged_index_query))
    conn.commit()

    # Check if the column 'comments_tsv' exists
    inspector = inspect(conn)
    columns = [col['name'] for col in inspector.get_columns('reviews')]
    if 'comments_tsv' in columns:
        print("'comments_tsv' column successfully added.")
    else:
        print("'comments_tsv' column was not added.")
        # Raise instead of calling exit(): the error stops "Run All" at this
        # cell and keeps the kernel (and its state) available for debugging.
        raise RuntimeError("'comments_tsv' column was not added.")


In [None]:
# Function to build the full-text (tsvector) count query
def build_ts_query(word, date_start, date_end):
    """Return SQL text counting reviews whose comments_tsv matches `word`.

    The values are interpolated directly into the SQL string, so they must be
    trusted literals.

    Args:
        word: Search term placed inside to_tsquery('...').
        date_start: Inclusive lower bound compared against the datetime column.
        date_end: Inclusive upper bound compared against the datetime column.

    Returns:
        The query as a multi-line string.
    """
    sql_lines = [
        "",
        "    SELECT count(*)",
        "    FROM reviews r",
        f"    WHERE comments_tsv @@ to_tsquery('{word}')",
        f"      AND datetime >= '{date_start}'",
        f"      AND datetime <= '{date_end}'",
        "    ",
    ]
    return "\n".join(sql_lines)

# Function to build the substring (ILIKE) count query
def build_text_search_query(word, date_start, date_end):
    """Return SQL text counting reviews whose comments contain `word` via ILIKE.

    The values are interpolated directly into the SQL string, so they must be
    trusted literals. The doubled percent signs around the word are emitted
    exactly as written.

    Args:
        word: Substring to look for in the comments column.
        date_start: Inclusive lower bound compared against the datetime column.
        date_end: Inclusive upper bound compared against the datetime column.

    Returns:
        The query as a multi-line string.
    """
    sql_lines = [
        "",
        "    SELECT count(*)",
        "    FROM reviews r",
        f"    WHERE comments ILIKE '%%{word}%%'",
        f"      AND datetime >= '{date_start}'",
        f"      AND datetime <= '{date_end}'",
        "    ",
    ]
    return "\n".join(sql_lines)


In [None]:
# Function to create or drop the benchmark index on one column of a table
def add_drop_index(engine, action, column, table):
    """Create or drop the index named ``idx_<column>_in_<table>``.

    A GIN index is used for the 'comments_tsv' column and a BTREE index for
    every other column. Identifiers are interpolated into the SQL text, so
    `column` and `table` must be trusted names.

    Args:
        engine: SQLAlchemy engine used to run the DDL statement.
        action: 'add' to create the index, 'drop' to remove it.
        column: Column the index covers.
        table: Table that owns the column.

    Raises:
        ValueError: If `action` is neither 'add' nor 'drop'.
    """
    # Validate first: previously an unknown action left the statement unbound
    # and surfaced as an UnboundLocalError.
    if action not in ('add', 'drop'):
        raise ValueError(f"action must be 'add' or 'drop', got {action!r}")

    index_name = f"idx_{column}_in_{table}"
    index_type = 'GIN' if column == 'comments_tsv' else 'BTREE'

    if action == 'add':
        # IF NOT EXISTS keeps repeated runs from failing on an existing index
        statement = (
            f"CREATE INDEX IF NOT EXISTS {index_name} "
            f"ON {table} USING {index_type}({column});"
        )
    else:
        statement = f"DROP INDEX IF EXISTS {index_name};"

    # engine.begin() commits when the block exits without error. With a plain
    # engine.connect() and no explicit commit, the DDL is rolled back when the
    # connection closes, so the index change would never take effect.
    with engine.begin() as conn:
        conn.execute(sql_text(statement))


In [None]:
# Function to calculate the elapsed time between two datetimes
def time_diff(start_time, end_time):
    """Return `end_time - start_time` expressed in seconds (float)."""
    elapsed = end_time - start_time
    return elapsed.total_seconds()

# Function to execute a query and measure time
def run_query(query, conn):
    """Execute `query` on `conn` and return the elapsed time in seconds.

    Args:
        query: SQL text to execute.
        conn: Open SQLAlchemy connection on which the statement is run.

    Returns:
        Elapsed seconds as a float.
    """
    # Build the statement object before starting the clock so only the
    # execution itself is measured.
    statement = sql_text(query)

    # perf_counter() is monotonic and high resolution; datetime.now() follows
    # the wall clock, which can jump if the system time is adjusted mid-run.
    started = time.perf_counter()
    conn.execute(statement)
    return time.perf_counter() - started

# Function to summarise a list of timings
def compute_metrics(times):
    """Summarise execution times.

    Args:
        times: Sequence of elapsed times.

    Returns:
        A dict with 'avg', 'min', 'max' and 'std' (each rounded to 4 decimal
        places), 'exec_count' (number of samples) and 'timestamp' (current
        local time formatted as YYYY-MM-DD HH:MM:SS).
    """
    statistics = (
        ("avg", np.mean),
        ("min", np.min),
        ("max", np.max),
        ("std", np.std),
    )
    metrics = {label: round(stat(times), 4) for label, stat in statistics}
    metrics["exec_count"] = len(times)
    metrics["timestamp"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return metrics

# Load previously saved performance data
def fetch_perf_data(filename):
    """Read the JSON performance summary stored in `filename`.

    Returns:
        The parsed JSON content, or an empty dict when the file does not
        exist or has a size of zero bytes.
    """
    try:
        with open(filename) as handle:
            file_is_empty = os.stat(filename).st_size == 0
            if not file_is_empty:
                return json.load(handle)
            return {}
    except FileNotFoundError:
        return {}

# Write performance data
def write_perf_data(data, filename):
    """Write `data` to `filename` as JSON indented by 4 spaces.

    Args:
        data: JSON-serialisable object to store.
        filename: Path of the file to (over)write.

    Raises:
        TypeError: If `data` contains a value json cannot serialise. In that
            case the existing file is left untouched.
    """
    # Serialise before opening: opening with 'w' truncates the file, so a
    # serialisation error raised afterwards would wipe out results saved by
    # earlier runs.
    payload = json.dumps(data, indent=4)
    with open(filename, 'w') as fp:
        fp.write(payload)


In [None]:
# Build one tsvector query and one ILIKE query per (word, year) pair for the
# selected years listed below
q_dict_ts = {}
q_dict_text = {}
years = [2009, 2010, 2011, 2012, 2013, 2014, 2017, 2019, 2023]
words = ['apartment', 'awesome', 'horrible']

for word in words:
    for yr in years:
        # The two strategies need distinct names: their results are later
        # merged into one summary dict keyed by query name, and identical
        # names made the ILIKE timings overwrite the tsvector timings.
        q_name_ts = f'{word}_{yr}_ts'
        q_name_text = f'{word}_{yr}_text'
        date_start = f'{yr}-01-01 00:00:00'
        date_end = f'{yr}-12-31 23:59:59'
        q_dict_ts[q_name_ts] = build_ts_query(word, date_start, date_end)
        q_dict_text[q_name_text] = build_text_search_query(word, date_start, date_end)

pprint.pp(q_dict_ts)
pprint.pp(q_dict_text)


In [None]:
# JSON file in which timing results are stored
perf_summary_path = 'perf_data/text_search_query.json'

# Create the file holding an empty JSON object if it is not there yet
if not os.path.exists(perf_summary_path):
    with open(perf_summary_path, 'w') as f:
        f.write(json.dumps({}))

# Start from whatever is already stored in the file
perf_summary = fetch_perf_data(perf_summary_path)

# Every [column, table] pair whose index is dropped before each configuration
all_indexes = [
    ['datetime', 'reviews'],
    ['comments_tsv', 'reviews'],
]

# Index configurations to benchmark; each entry lists the indexes to create
specs = [
    # no index
    [],
    # index on comments_tsv only
    [
        ['comments_tsv', 'reviews'],
    ],
    # index on datetime only
    [
        ['datetime', 'reviews'],
    ],
    # both indexes
    [
        ['datetime', 'reviews'],
        ['comments_tsv', 'reviews'],
    ],
]

def process_query(query_name, query, n_runs=1):
    """Time `query` on a fresh connection and summarise the measurements.

    Args:
        query_name: Label returned unchanged alongside the metrics.
        query: SQL text to execute.
        n_runs: How many times to execute the query. Defaults to 1, which is
            what the code previously hard-coded (its comment claimed 10). With
            a single run the reported 'std' is always 0; pass a larger value
            for a meaningful spread.

    Returns:
        A tuple ``(query_name, perf_profile)`` where perf_profile is the dict
        produced by compute_metrics().

    Raises:
        ValueError: If `n_runs` is smaller than 1.
    """
    if n_runs < 1:
        raise ValueError("n_runs must be at least 1")

    # All repetitions share one connection
    with db_eng.connect() as conn:
        time_list = [run_query(query, conn) for _ in range(n_runs)]

    perf_profile = compute_metrics(time_list)
    return query_name, perf_profile

def run_queries(query_dict):
    """Time every query in `query_dict` under every index configuration in `specs`.

    For each configuration, the indexes in `all_indexes` are dropped, the
    configuration's indexes are created, and only then are the queries
    submitted to the thread pool. The whole batch is awaited before the next
    configuration is set up.

    Previously the index changes for later configurations were issued while
    earlier queries were still queued or running in the pool, so a timing could
    be recorded under a different index set than the one its key named.

    Args:
        query_dict: Mapping of query name to SQL text.

    Returns:
        A list of ``(name, perf_profile)`` tuples, where name is
        ``<query name>__<spec key>``. The order is query-major (all
        configurations of the first query, then the second, ...), as before.
    """
    query_items = list(query_dict.items())
    timings = {}

    with ThreadPoolExecutor(max_workers=10) as executor:  # queries of one spec run in parallel
        for spec_pos, spec in enumerate(specs):
            # Drop all indexes first
            for index in all_indexes:
                add_drop_index(db_eng, 'drop', index[0], index[1])

            # Add necessary indexes
            for index in spec:
                add_drop_index(db_eng, 'add', index[0], index[1])

            # Build spec key for JSON output
            spec_key = '__' + '__'.join([f"{index[0]}_in_{index[1]}" for index in spec]) + '__'

            pending = [
                (query_pos, executor.submit(process_query, query_name + '__' + spec_key, query))
                for query_pos, (query_name, query) in enumerate(query_items)
            ]

            # Wait for the whole batch so no query is still running when the
            # next iteration changes the indexes.
            for query_pos, future in pending:
                timings[(query_pos, spec_pos)] = future.result()

    # Sorting by (query position, spec position) restores query-major order
    return [timings[key] for key in sorted(timings)]

# Benchmark one search word at a time, saving the summary after each word
for word in ['apartment', 'awesome', 'horrible']:
    print(f"Running experiments for word: {word}")

    # Select this word's queries from each strategy's dictionary
    word_queries_ts = {
        name: sql for name, sql in q_dict_ts.items() if name.startswith(word)
    }
    word_queries_text = {
        name: sql for name, sql in q_dict_text.items() if name.startswith(word)
    }

    # Full-text search queries first, then the text search queries, each under
    # the different index configurations
    results_ts = run_queries(word_queries_ts)
    results_text = run_queries(word_queries_text)

    # Merge the timings into the summary. The text results are applied after
    # the full-text ones, so an entry with the same name and spec key replaces
    # the earlier one.
    for query_name, perf_profile in [*results_ts, *results_text]:
        base_name, spec_key = query_name.split('__', 1)
        perf_summary.setdefault(base_name, {})[spec_key] = perf_profile

    # Persist after every word
    write_perf_data(perf_summary, perf_summary_path)

print("JSON files created successfully.")