# Data download and index build

In [None]:
from concurrent.futures import ThreadPoolExecutor
import random
import os
import urllib.request
# Bypass any proxy configured in the environment for every urllib request made
# from this kernel.
proxy_handler = urllib.request.ProxyHandler({})  # empty dictionary means no proxy
opener = urllib.request.build_opener(proxy_handler)
urllib.request.install_opener(opener)

LAION_URL_TEMPLATE = 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/laion/{:04d}.parquet'
LAION_LOCAL_DIR = '/public/xinyu/laion_100m'
LAION_FILE_COUNT = 107  # shards 0000 .. 0106

os.makedirs(LAION_LOCAL_DIR, exist_ok=True)
# Download all shards concurrently. The futures are kept so that a failed
# download (which a bare executor.submit() would silently drop) is reported
# instead of letting the index build below run on a partial data set.
with ThreadPoolExecutor(max_workers=100) as executor:
    downloads = {
        executor.submit(
            urllib.request.urlretrieve,
            LAION_URL_TEMPLATE.format(sample),
            os.path.join(LAION_LOCAL_DIR, '{:04d}.parquet'.format(sample)),
        ): sample
        for sample in range(LAION_FILE_COUNT)
    }
# Leaving the `with` block waits for every download to finish.
failed_downloads = {}
for future, sample in downloads.items():
    error = future.exception()
    if error is not None:
        failed_downloads[sample] = error
if failed_downloads:
    raise RuntimeError(
        f'{len(failed_downloads)} LAION shard download(s) failed: {failed_downloads}')
from autofaiss import build_index
import numpy as np
import time
# Build a FAISS index with autofaiss from the 'image_embedding' column of the
# parquet files under /public/xinyu/laion_100m/, save the index and its info
# JSON (relative paths, i.e. the working directory), and print the wall-clock
# build time in seconds.
begin = time.time()
build_options = {
    'save_on_disk': True,
    'index_path': 'autofaiss_100m.index',
    'index_infos_path': "autofaiss_100m.json",
    'file_format': 'parquet',
    'embedding_column_name': 'image_embedding',
}
index, index_infos = build_index('/public/xinyu/laion_100m/', **build_options)
print(time.time() - begin)

# Experiment

In [2]:
import pyarrow.parquet as pq
import pyarrow.orc as po
import pyarrow as pa
import sys
import os
import datetime
import pathlib
import pandas as pd
import time

# Locate the repository from the notebook's working directory: HOME_DIR is the
# part of the (resolved) path before the first '/OpenFormat' substring, and the
# OpenFormat root is put on sys.path so `python.scripts.utils` can be imported.
dir_path = pathlib.Path.cwd().resolve()
print(dir_path)
HOME_DIR = str(dir_path).partition('/OpenFormat')[0]

# Run identifier used to name the result CSVs written at the end of each sweep.
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

PROJ_SRC_DIR = HOME_DIR + '/OpenFormat'
sys.path.insert(1, PROJ_SRC_DIR)
from python.scripts.utils import *

from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
# Shared pool of 8 worker threads. NOTE(review): no live code in this section submits work to it.
executor = concurrent.futures.ThreadPoolExecutor(8)

/mnt/OpenFormat/vector_data


In [2]:
def create_row_id_map(directory):
    """Assign a contiguous, inclusive global row-id range to each data file.

    Entries of ``directory`` are visited in sorted name order. Only names ending
    in ``.parquet`` (row count from ``pq.read_metadata``) or ``.orc`` (row count
    from ``po.ORCFile(...).nrows``) are counted; everything else is skipped.

    Args:
        directory: folder containing the data files.

    Returns:
        dict mapping ``os.path.join(directory, name)`` to ``(first_row, last_row)``.
        Ranges are consecutive across files; a file with zero rows gets the empty
        range ``(n, n - 1)``.
    """
    def _parquet_rows(path):
        return pq.read_metadata(path).num_rows

    def _orc_rows(path):
        return po.ORCFile(path).nrows

    row_counters = (('.parquet', _parquet_rows), ('.orc', _orc_rows))

    row_id_map = {}
    next_row = 0
    for name in sorted(os.listdir(directory)):
        counter = next((fn for suffix, fn in row_counters if name.endswith(suffix)), None)
        if counter is None:
            continue
        file_path = os.path.join(directory, name)
        n_rows = counter(file_path)
        row_id_map[file_path] = (next_row, next_row + n_rows - 1)
        next_row += n_rows
    return row_id_map

def find_file(row_id, row_id_map):
    """Locate the file that holds a global row id.

    Args:
        row_id: global row id.
        row_id_map: mapping of file path -> inclusive ``(start, end)`` global
            range, e.g. the result of ``create_row_id_map``.

    Returns:
        ``(file_path, local_row_id)`` for the first range (in dict order) that
        contains ``row_id``, with ``local_row_id = row_id - start``; ``None`` when
        no range contains it, so callers that unpack the result must guard
        against that case.
    """
    return next(
        ((path, row_id - bounds[0])
         for path, bounds in row_id_map.items()
         if bounds[0] <= row_id <= bounds[1]),
        None,
    )

# Global row-id ranges for the LAION data in both formats, keyed by format name
# so the experiment loops can look up row_id_map[fmt]. The /s3-mnt paths are
# presumably the mountpoint-s3 mount — confirm.
row_id_map = {
    'parquet': create_row_id_map('/s3-mnt/laion_100m'),
    'orc': create_row_id_map('/s3-mnt/laion_100m_orc'),
}
# Local copies instead of the mount:
# row_id_map['parquet'] = create_row_id_map('./laion_100m')
# row_id_map['orc'] = create_row_id_map('./laion_100m_orc')

# S3 breakdown

In [3]:
from faiss import read_index
import pyarrow.dataset as dataset
import pyarrow as pa
import numpy as np
import time
import json

# SCALE: index size in millions of vectors (selects autofaiss_{SCALE}m.index).
# K: neighbours returned per query vector.
# QUERY_CNT: number of consecutive query batches to run per batch size.
SCALE, K, QUERY_CNT = 100, 10, 3

# Multi-file selection-scan binaries; each is invoked with a JSON file that maps
# file path -> comma-separated row ids.
# scan_exec_pq = f'{HOME_DIR}/arrow-private/cpp/out/build/openformat-release/release/selection_scan'
scan_exec_pq = '/mnt/arrow-private/cpp/out/build/openformat-release/release/selection_scan_multi_files'
scan_exec_orc = '/mnt/orc/build/c++/test/SelectionScanMultiFiles'

class LogReader:
    """Count matching lines appended to a log file since the previous call.

    The reader remembers a byte offset into ``filename``. Each call to
    ``count_new_lines_with_string`` scans only the complete lines written after
    that offset and counts those that contain ``string``.
    """

    def __init__(self, filename, string):
        self.filename = filename
        self.string = string
        # Byte offset just past the last complete line already scanned.
        self.position = 0

    def count_new_lines_with_string(self):
        """Return how many new complete lines contain ``self.string``.

        A trailing line without a newline is treated as still being written: it
        is neither counted nor consumed, so the next call examines it in full.
        Without this, a matching line split across two calls could be missed.
        """
        # Read bytes so self.position is a plain byte offset; assumes the log is
        # UTF-8 (or ASCII) encoded — TODO confirm.
        needle = self.string.encode('utf-8')
        count = 0
        with open(self.filename, 'rb') as f:
            f.seek(self.position)
            for line in f:
                if not line.endswith(b'\n'):
                    break
                if needle in line:
                    count += 1
                self.position += len(line)
        return count

# Track S3 GET requests in the mountpoint-s3 client log. The first call consumes
# (and prints the count of) everything already logged, so later calls report only
# GETs logged since the previous call.
S3_LOG_PATH = '/root/s3_log/mountpoint-s3-2023-07-31T02-55-45Z.log'
S3_GET_MARKER = 'mountpoint_s3_client::s3_crt_client::get_object: new request'
log_reader = LogReader(S3_LOG_PATH, S3_GET_MARKER)
initial_gets = log_reader.count_new_lines_with_string()
print(initial_gets)

# Query vectors: the 'image_embedding' values of the local shard 0139.parquet,
# stacked into a 2-D array with one row per embedding.
ds = dataset.dataset('0139.parquet', format='parquet')
df = ds.to_table(columns=['image_embedding'], use_threads=True).to_pandas()
queries = np.vstack([np.array(embedding) for embedding in df['image_embedding']])
index = read_index(f'autofaiss_{SCALE}m.index')
# Start from a clean stats file (parse_output presumably writes outputs/stats.json — confirm).
os.system('rm -f outputs/stats.json')
# Sweep query-batch sizes. For each batch of K-NN results, the matching rows are
# fetched from every format by the external multi-file scan binaries; the time
# each scanner reports and the number of S3 GETs logged since the previous count
# are recorded through parse_output (star-imported from python.scripts.utils).
output_stats = {}
fmts = ['orc', 'parquet']
# Per format: (scan binary, stats key for its time, stats key for its S3 GET count).
scan_setup = {
    'orc': (scan_exec_orc, 'time (s)_orc', 's3_gets_orc'),
    'parquet': (scan_exec_pq, 'time (s)', 's3_gets'),
}
# Earlier sweeps also used [512, 1024, 2048] and sizes up to 4096.
for batch_size in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
    output_stats['batch_size'] = batch_size
    for i in range(1):
        for j in range(1):
            batch_query = queries[j*batch_size:(j+1)*batch_size]
            begin = time.time()
            _, I = index.search(batch_query, K)
            output_stats['vector_search_time'] = time.time() - begin
            output_stats['i'] = i
            output_stats['query_id'] = j
            for fmt in fmts:
                scan_exec, time_key, gets_key = scan_setup[fmt]
                # Group the result ids by the file that stores them (first-seen file order).
                ids_by_file = {}
                for value in I.ravel():
                    if value < 0:
                        # faiss pads unfilled result slots with -1; find_file would
                        # return None for them and the unpack below would fail.
                        continue
                    file_path, local_row_id = find_file(value, row_id_map[fmt])
                    # NOTE(review): ORC is sent the global row id (local id + file
                    # start) while Parquet gets the file-local id; confirm each
                    # scanner expects this.
                    row_id = value if fmt == 'orc' else local_row_id
                    ids_by_file.setdefault(file_path, set()).add(int(row_id))
                file_to_ids = {
                    path: ','.join(str(r) for r in sorted(ids))
                    for path, ids in ids_by_file.items()
                }
                # Flush dirty pages and drop the Linux page cache before each scan.
                os.system('sync; echo 3 > /proc/sys/vm/drop_caches')
                with open('file_to_ids.json', 'w') as f:
                    json.dump(file_to_ids, f)
                output_lines = os.popen(f'{scan_exec} file_to_ids.json').read().split('\n')
                try:
                    # The time is the second-to-last space-separated token of the first line.
                    output_stats[time_key] = float(output_lines[0].split(' ')[-2])
                except (IndexError, ValueError) as exc:
                    raise RuntimeError(
                        f'unexpected {fmt} scanner output: {output_lines[:3]!r}') from exc
                output_stats[gets_key] = log_reader.count_new_lines_with_string()
            parse_output(output_stats)
collect_results()
os.system('mv outputs/stats.csv outputs/{}_{}.csv'.format(f'batch_vector_search_{SCALE}m_top{K}', timestamp))

90345


# Massive experiments

In [3]:
from faiss import read_index
import pyarrow.dataset as dataset
import pyarrow as pa
import numpy as np
import time
import json

# SCALE: index size in millions of vectors (selects autofaiss_{SCALE}m.index).
# K: neighbours returned per query vector.
# QUERY_CNT: number of consecutive query batches to run per batch size.
SCALE, K, QUERY_CNT = 100, 10, 3

# Multi-file selection-scan binaries; each is invoked with a JSON file that maps
# file path -> comma-separated row ids.
# scan_exec_pq = f'{HOME_DIR}/arrow-private/cpp/out/build/openformat-release/release/selection_scan'
scan_exec_pq = '/mnt/arrow-private/cpp/out/build/openformat-release/release/selection_scan_multi_files'
scan_exec_orc = '/mnt/orc/build/c++/test/SelectionScanMultiFiles'

# Query vectors: the 'image_embedding' values of the local shard 0139.parquet,
# stacked into a 2-D array with one row per embedding.
ds = dataset.dataset('0139.parquet', format='parquet')
df = ds.to_table(columns=['image_embedding'], use_threads=True).to_pandas()
queries = np.vstack([np.array(embedding) for embedding in df['image_embedding']])
index = read_index(f'autofaiss_{SCALE}m.index')
# Start from a clean stats file (parse_output presumably writes outputs/stats.json — confirm).
os.system('rm -f outputs/stats.json')
# Sweep query-batch sizes over QUERY_CNT consecutive query batches each. For
# every batch of K-NN results, the matching rows are fetched from every format
# by the external multi-file scan binaries, and the time each scanner reports
# is recorded through parse_output (star-imported from python.scripts.utils).
output_stats = {}
fmts = ['orc', 'parquet']
# Per format: (scan binary, stats key for its reported time).
scan_setup = {
    'orc': (scan_exec_orc, 'time (s)_orc'),
    'parquet': (scan_exec_pq, 'time (s)'),
}
# Earlier sweeps used [64, ..., 1024] and sizes up to 4096.
for batch_size in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
    output_stats['batch_size'] = batch_size
    for i in range(1):
        for j in range(QUERY_CNT):
            # Consecutive, non-overlapping slices of the query set.
            batch_query = queries[j*batch_size:(j+1)*batch_size]
            begin = time.time()
            _, I = index.search(batch_query, K)
            output_stats['vector_search_time'] = time.time() - begin
            output_stats['i'] = i
            output_stats['query_id'] = j
            for fmt in fmts:
                scan_exec, time_key = scan_setup[fmt]
                # Group the result ids by the file that stores them (first-seen file order).
                ids_by_file = {}
                for value in I.ravel():
                    if value < 0:
                        # faiss pads unfilled result slots with -1; find_file would
                        # return None for them and the unpack below would fail.
                        continue
                    file_path, local_row_id = find_file(value, row_id_map[fmt])
                    # NOTE(review): ORC is sent the global row id (local id + file
                    # start) while Parquet gets the file-local id; confirm each
                    # scanner expects this.
                    row_id = value if fmt == 'orc' else local_row_id
                    ids_by_file.setdefault(file_path, set()).add(int(row_id))
                file_to_ids = {
                    path: ','.join(str(r) for r in sorted(ids))
                    for path, ids in ids_by_file.items()
                }
                # Flush dirty pages and drop the Linux page cache before each scan.
                os.system('sync; echo 3 > /proc/sys/vm/drop_caches')
                with open('file_to_ids.json', 'w') as f:
                    json.dump(file_to_ids, f)
                output_lines = os.popen(f'{scan_exec} file_to_ids.json').read().split('\n')
                try:
                    # The time is the second-to-last space-separated token of the first line.
                    output_stats[time_key] = float(output_lines[0].split(' ')[-2])
                except (IndexError, ValueError) as exc:
                    raise RuntimeError(
                        f'unexpected {fmt} scanner output: {output_lines[:3]!r}') from exc
            parse_output(output_stats)
collect_results()
os.system('mv outputs/stats.csv outputs/{}_{}.csv'.format(f'batch_vector_search_{SCALE}m_top{K}', timestamp))