In [6]:
import os
import io
import math
import string
import random
import requests
import numpy as np
import pandas as pd
from subprocess import run
import multiprocessing as mp

from copy import deepcopy
from ruamel.yaml import YAML
from tqdm import tqdm_notebook as tqdm
from typing import List, Dict, Any, Tuple


# Shared YAML serializer used by every yaml.dump() call in this notebook.
yaml = YAML()
# Fixed seed so the key shuffle in the init workload is reproducible.
random.seed(42)
# Buffer size passed to open() when writing the large workload files.
IO_BUFFER_SIZE = 4 * 1024 * 1024  # 4 MiB

In [223]:
# Helper functions

# Binary size multipliers (powers of two).
KiB = 1 << 10
MiB = 1 << 20
GiB = 1 << 30


def download_to_file(url: str, file: str, timeout: float = 60.0):
    """Stream ``url`` into ``file`` without ever leaving a truncated ``file`` behind.

    The body is written to ``file + ".part"`` and renamed onto ``file`` only after
    the whole response has been received. Callers that use the existence of
    ``file`` as a "download finished" marker therefore never see a partial file.

    Args:
        url: HTTP(S) URL to fetch.
        file: Destination path.
        timeout: Seconds to wait for the connection and for each read before
            giving up, so a stalled server cannot hang the notebook forever.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
        OSError: If the destination cannot be written.
    """
    partial_file = file + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(partial_file, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Atomic on POSIX: the destination appears only once it is complete.
        os.replace(partial_file, file)
    except BaseException:
        # Also covers KeyboardInterrupt from an interrupted notebook cell.
        if os.path.exists(partial_file):
            os.remove(partial_file)
        raise


def format_number(num: int) -> str:
    """Return ``num`` rendered with ``,`` as the thousands separator."""
    return f"{num:,}"


def format_bytes(num_bytes: int) -> str:
    """Format a byte count in human-readable form with two decimals.

    The value is divided by 1024 per step (binary multiples) but labelled with
    the short unit names B, KB, MB, GB, TB and PB.

    Args:
        num_bytes: Number of bytes; may be negative (e.g. a size delta).

    Returns:
        A string such as ``"1.50 KB"``. Values of 1024 TB and above are
        reported in PB.
    """
    value = float(num_bytes)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        # Compare on magnitude so negative values are scaled as well.
        if abs(value) < 1024:
            return f"{value:.2f} {unit}"
        value /= 1024
    return f"{value:.2f} PB"


In [224]:
# Inputs

# Previous Inputs:
# - cluster_17, 0, 12M rows
# - cluster_18, 0, 20M rows
# - cluster_34, 0, 20M rows
# - cluster_52, 0, 20M rows

# Trace to process; the source file for it is zstd compressed.
cluster_name: str = "cluster18"
# Number of leading trace rows that are parsed and replayed.
num_rows_to_keep: int = 20_000_000
# Remove the compressed trace in the final cell.
delete_trace_file: bool = False
# Regenerate outputs even when they already exist on disk.
force: bool = False


In [None]:
# part = 0
# Every path below is derived from cluster_name.
trace_file = f"/mydata/twitter_traces/{cluster_name}"
compressed_trace_file = f"{trace_file}.sort.zst"

# Workload files generated from the trace.
init_workload_file = f"{trace_file}_init.txt"
bench_workload_file = f"{trace_file}_bench.txt"

# Trace analysis
analyze_trace = False

# My-YCSB configuration files and the database directory they point at.
create_data_dir = True
myycsb_init_yaml_file = f"/mydata/My-YCSB/leveldb/config/twitter_{cluster_name}_init.yaml"
myycsb_bench_yaml_file = f"/mydata/My-YCSB/leveldb/config/twitter_{cluster_name}_bench.yaml"
data_dir = f"/mydata/leveldb_twitter_{cluster_name}_db"


In [225]:
# Fetch the compressed trace once; later runs reuse the local copy.
download_tmpl = "https://ftp.pdl.cmu.edu/pub/datasets/twemcacheWorkload/open_source/%s.sort.zst"
if not os.path.exists(compressed_trace_file):
    # open() does not create missing parent directories, so do it here.
    trace_dir = os.path.dirname(compressed_trace_file)
    if trace_dir:
        os.makedirs(trace_dir, exist_ok=True)
    print("Downloading trace file")
    download_to_file(download_tmpl % cluster_name, compressed_trace_file)

Downloading trace file


In [226]:
# Parse the trace file into a pandas DataFrame.
# Line format is comma separated:
# timestamp, key, key size, value size, client id, operation, ttl
columns = ["timestamp", "key", "key_size", "value_size", "client_id", "op_type", "ttl"]
trace = pd.read_csv(
    compressed_trace_file,
    names=columns,
    index_col=False,
    compression="zstd",
    nrows=num_rows_to_keep,
    on_bad_lines="warn",
    # Keys and operations are handled as text downstream (len(), zfill(), string
    # concatenation); without this, an all-numeric key column would be inferred
    # as integers and break that code.
    dtype={"key": str, "op_type": str},
)
print("Trace rows: ", len(trace))

Trace rows:  20000000


In [227]:
# Per-key size statistics: sizes are aggregated within each key first and then
# across keys, so frequently accessed keys do not dominate the result.
# Both size columns are coerced to numbers (non-numeric entries become NaN,
# which mean()/median() skip). Only the two columns are converted, so no copy
# of the whole trace is needed.
trace_keys = trace["key"]
value_sizes_by_key = pd.to_numeric(trace["value_size"], errors="coerce").groupby(trace_keys)
key_sizes_by_key = pd.to_numeric(trace["key_size"], errors="coerce").groupby(trace_keys)

mean_value_size = value_sizes_by_key.mean().mean()
median_value_size = value_sizes_by_key.median().median()
mean_key_size = key_sizes_by_key.mean().mean()
median_key_size = key_sizes_by_key.median().median()

# An empty trace (or one without numeric sizes) yields NaN; fail with a clear
# message instead of the opaque int(NaN) error.
if any(pd.isna(stat) for stat in (mean_value_size, median_value_size, mean_key_size, median_key_size)):
    raise ValueError("Trace contains no rows with numeric key/value sizes")

print("Mean value size: ", int(mean_value_size))
print("Median value size: ", int(median_value_size))
print("Mean key size: ", int(mean_key_size))
print("Median key size: ", int(median_key_size))

# Fixed sizes used for every key and value in the generated workloads.
# NOTE(review): keys longer than this (truncated) mean are cut to this length
# later in the notebook, which can merge distinct keys — confirm this is intended.
key_size = int(mean_key_size)
value_size = int(mean_value_size)

print("Key size: ", key_size)
print("Value size: ", value_size)


Mean value size:  34
Median value size:  28
Mean key size:  15
Median key size:  15
Key size:  15
Value size:  34


In [228]:
# Working set estimate: one fixed-size key/value pair per distinct key.
num_unique_keys = trace["key"].nunique()
bytes_per_entry = key_size + value_size
wss = bytes_per_entry * num_unique_keys
print("Working set size: ", format_bytes(wss))

Working set size:  54.62 MB


In [229]:
# The workload is replayed in file order, so refuse to continue unless the
# timestamps never decrease.
is_ordered = trace["timestamp"].is_monotonic_increasing
print("Timestamps are ordered:", is_ordered)

if not is_ordered:
    raise ValueError("Timestamps are not ordered")

# Cap the trace at num_rows_to_keep rows.
if len(trace) <= num_rows_to_keep:
    print(f"Trace has {len(trace):,} rows, no truncation needed")
else:
    print(f"Keeping first {num_rows_to_keep:,} rows out of {len(trace):,} total rows")
    trace = trace.iloc[:num_rows_to_keep]

Timestamps are ordered: True
Trace has 20,000,000 rows, no truncation needed


In [230]:
if analyze_trace:
    # Frequency of each distinct size, most common first.
    value_size_counts = trace["value_size"].value_counts()
    key_size_counts = trace["key_size"].value_counts()

    for heading, counts in (
        ("Value size counts:", value_size_counts),
        ("Key size counts:", key_size_counts),
    ):
        print(heading)
        print(counts)

In [231]:
if analyze_trace:
    # How many lines is the trace?
    print("Number of lines in trace: %d" % len(trace))

    # What is the proportion of read and write operations?
    # Boolean masks are summed instead of building filtered copies of the trace.
    # "gets" is counted as a read, matching how the bench workload maps it to "get".
    # Operations other than get/gets/set are not part of the proportion.
    op_types = trace["op_type"]
    read_ops = int(op_types.isin(["get", "gets"]).sum())
    write_ops = int((op_types == "set").sum())
    print("Read operations: %d" % read_ops)
    print("Write operations: %d" % write_ops)

    read_write_total = read_ops + write_ops
    if read_write_total > 0:
        print("Read proportion: %.2f%%" % (100 * read_ops / read_write_total))
    else:
        # Avoid ZeroDivisionError on traces without any get/set operation.
        print("Read proportion: n/a (no get/set operations in trace)")


In [232]:
# Make every key exactly the same length: keys shorter than the target are
# left-padded with zeros and keys longer than the target are truncated.
def pad_key(key: str, target_size: int) -> str:
    """Return ``key`` normalized to ``target_size`` characters.

    Keys longer than ``target_size`` keep only their first ``target_size``
    characters; shorter keys are zero-filled on the left with ``str.zfill``.
    """
    # Slicing leaves short keys untouched, and zfill leaves keys that already
    # have the target length untouched, so one expression covers both cases.
    return key[:target_size].zfill(target_size)


# Normalize every trace key to exactly key_size characters, then recount the
# distinct keys on the normalized column.
trace["key"] = trace["key"].map(lambda raw_key: pad_key(raw_key, key_size))
num_unique_keys = trace["key"].nunique()

def generate_init_workload(trace: pd.DataFrame, filename: str) -> None:
    """Write the init workload: every distinct trace key, one per line.

    The distinct keys are shuffled with the module-level ``random`` generator
    before being written. Lines are separated by a newline and the file has no
    trailing newline.

    Args:
        trace: Trace DataFrame with a ``key`` column.
        filename: Path of the init workload file to (over)write.
    """
    shuffled_keys = trace["key"].unique().tolist()

    # Permute the keys so they are not written in first-seen order.
    random.shuffle(shuffled_keys)

    print("Writing keys to file")
    with open(filename, "w", buffering=IO_BUFFER_SIZE) as f:
        f.write("\n".join(shuffled_keys))

# Reuse an existing init workload unless a rebuild is forced.
if os.path.exists(init_workload_file) and not force:
    print("Init workload file already exists")
else:
    print("Generating init workload file")
    generate_init_workload(trace, init_workload_file)


Writing keys to file


In [233]:
# Now generate the bench workload file
# Each line is an operation followed by a key (space separated)
# Operations are get, update or insert

def generate_bench_workload(trace: pd.DataFrame, filename: str) -> None:
    """Write the bench workload: one ``<operation> <key>`` line per trace row.

    Trace operations are mapped as follows: ``get``/``gets`` become ``get``,
    ``cas`` becomes ``update`` and every other operation becomes ``insert``.
    Lines are separated by a newline and the file has no trailing newline.

    Only the two needed columns are iterated. ``DataFrame.iterrows()`` builds a
    full Series object per row, which is far slower on a trace of this size.

    Args:
        trace: Trace DataFrame with ``op_type`` and ``key`` columns.
        filename: Path of the bench workload file to (over)write.
    """
    print("Generating bench workload")

    # Trace operation -> line prefix; operations not listed are written as inserts.
    # NOTE(review): this also turns operations such as deletes into inserts —
    # confirm that is what the replay expects.
    op_prefixes = {"get": "get ", "gets": "get ", "cas": "update "}
    operations = zip(trace["op_type"], trace["key"])

    with open(filename, "w", buffering=IO_BUFFER_SIZE) as f:
        progress = tqdm(operations, total=len(trace), desc="Writing operations")
        for i, (op_type, key) in enumerate(progress):
            # Add newline before all lines except the first
            if i > 0:
                f.write("\n")
            f.write(op_prefixes.get(op_type, "insert ") + key)

    print("Done generating bench workload")


# Reuse an existing bench workload unless a rebuild is forced.
if os.path.exists(bench_workload_file) and not force:
    print("Bench workload file already exists")
else:
    print("Generating bench workload file")
    generate_bench_workload(trace, bench_workload_file)

Generating bench workload


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for _, row in tqdm(trace.iterrows(), total=len(trace), desc="Writing operations"):


Writing operations:   0%|          | 0/20000000 [00:00<?, ?it/s]

Done generating bench workload


In [234]:
# Write a summary of the trace that might be useful later
summary_file = trace_file + "_summary.txt"
summary = {
    "num_rows": len(trace),
    "key_size": key_size,
    "value_size": value_size,
    # int() keeps the values plain Python ints for the YAML serializer.
    "num_unique_keys": int(num_unique_keys),
    # Derived from the same key count that is reported above, so the summary is
    # self-consistent: working_set_size == num_unique_keys * (key_size + value_size).
    # The earlier `wss` value was computed from the key count before keys were
    # padded/truncated, which can be larger.
    "working_set_size": int(num_unique_keys) * (key_size + value_size),
}

if not os.path.exists(summary_file) or force:
    print("Writing summary file")
    with open(summary_file, "w") as f:
        yaml.dump(summary, f)
else:
    print("Summary file already exists")

In [235]:
# Optional analysis of the trace with libCacheSim's traceAnalyzer.
libcachesim_dir = "/mydata/libCacheSim"
trace_analyzer = os.path.join(libcachesim_dir, "_build/bin/traceAnalyzer")

# Only report a missing analyzer when analysis was actually requested.
if analyze_trace:
    if not os.path.exists(trace_analyzer):
        print("libCacheSim traceAnalyzer not found. Skipping trace analysis.")
    else:
        # This notebook only ever creates the compressed trace, so fall back to it
        # when no uncompressed copy exists at trace_file.
        # NOTE(review): assumes traceAnalyzer was built with zstd support so it can
        # read the .zst file directly — confirm for this build.
        analysis_input = trace_file if os.path.exists(trace_file) else compressed_trace_file
        cmd = [trace_analyzer, analysis_input, "csv", "--common", "-t", "time-col=1, obj-id-col=2, obj-size-col=4, delimiter=,, has-header=false"]
        print("Running trace analysis")
        run(cmd, check=True)

In [236]:
def create_myycsb_yamls(init_filename: str, bench_filename: str, data_dir: str,
                        init_workload_file: str, bench_workload_file: str,
                        key_size: int, value_size: int, nr_entry: int) -> None:
    """Write the My-YCSB init and bench YAML configuration files.

    Both files share the same database, workload and leveldb settings; the
    bench config differs only in ``workload.trace_file`` and
    ``workload.trace_type``.

    Args:
        init_filename: Output path of the init configuration.
        bench_filename: Output path of the bench configuration.
        data_dir: Database directory stored under ``leveldb.data_dir``.
        init_workload_file: Trace file referenced by the init configuration.
        bench_workload_file: Trace file referenced by the bench configuration.
        key_size: Key length; written to the config as ``key_size + 1``.
        value_size: Value length; written to the config as ``value_size + 1``.
        nr_entry: Number of database entries.
    """
    # NOTE(review): the "+ 1" presumably reserves room for a string terminator
    # on the My-YCSB side — confirm.
    database_section = {
        "key_size": key_size + 1,
        "value_size": value_size + 1,
        "nr_entry": nr_entry,
    }
    workload_section = {
        "nr_warmup_op": 10000000,
        "warmup_runtime_seconds": 240,
        "runtime_seconds": 240,
        "nr_op": 10000000,
        "nr_thread": 8,
        "next_op_interval_ns": 0,
        "operation_proportion": {
            "read": 0.5,
            "update": 0.5,
            "insert": 0,
            "scan": 0,
            "read_modify_write": 0,
        },
        "request_distribution": "trace",
        "zipfian_constant": 0.99,
        "trace_file": init_workload_file,
        "trace_type": "twitter_init",
        "scan_length": 100,
    }
    leveldb_section = {
        "data_dir": data_dir,
        "options_file": "/mydata/My-YCSB/rocksdb/config/rocksdb_rubble_16gb_config.ini",
        "cache_size": 100000000,
        "print_stats": True,
    }
    config_init = {
        "database": database_section,
        "workload": workload_section,
        "leveldb": leveldb_section,
    }

    # The bench config is an independent copy that only swaps the trace settings.
    config_bench = deepcopy(config_init)
    config_bench["workload"].update(
        trace_file=bench_workload_file,
        trace_type="twitter_bench",
    )

    for path, config in ((init_filename, config_init), (bench_filename, config_bench)):
        with open(path, "w") as f:
            yaml.dump(config, f)


# Create and populate the database directory. The directory's existence is what
# later runs use to decide that initialization can be skipped.
if create_data_dir:
    if not os.path.exists(data_dir) or force:
        if os.path.exists(data_dir):
            print("Data directory already exists. Deleting.")
            run(["rm", "-rf", data_dir], check=True)
        print("Creating data directory")
        os.makedirs(data_dir)
        try:
            print("Creating My-YCSB init YAML file")
            create_myycsb_yamls(myycsb_init_yaml_file, myycsb_bench_yaml_file, data_dir,
                                init_workload_file, bench_workload_file,
                                key_size, value_size, num_unique_keys)
            cmd = ["/mydata/My-YCSB/build/init_leveldb", myycsb_init_yaml_file]
            print("Running My-YCSB init")
            run(cmd, check=True)
        except BaseException:
            # A failed or interrupted init must not leave the directory behind,
            # otherwise the next run would skip initialization and use a
            # partially populated database.
            print("My-YCSB init failed. Removing incomplete data directory.")
            run(["rm", "-rf", data_dir], check=False)
            raise
    else:
        print("Data directory already exists. Skipping creation.")

Data directory created
Creating My-YCSB init YAML file
Running My-YCSB init


LevelDBFactory: data_dir: /mydata/leveldb_twitter_cluster18_db, print_stats: 1
InitTraceWorkload: trace_path=/mydata/twitter_traces/cluster18_init.txt, trace_type=twitter_init
Number of lines in trace file: 1168683
Key size: 16, Value size: 35


Initialization (trace) (epoch 0, progress 0.00%): UPDATE throughput 0.00 ops/sec, INSERT throughput 0.00 ops/sec, READ throughput 0.00 ops/sec, SCAN throughput 0.00 ops/sec, READ_MODIFY_WRITE throughput 0.00 ops/sec, total throughput 0.00 ops/sec
Initialization (trace) (epoch 1, progress 8.19%): UPDATE throughput 0.00 ops/sec, INSERT throughput 95737.14 ops/sec, READ throughput 0.00 ops/sec, SCAN throughput 0.00 ops/sec, READ_MODIFY_WRITE throughput 0.00 ops/sec, total throughput 95737.14 ops/sec
Initialization (trace) (epoch 2, progress 16.43%): UPDATE throughput 0.00 ops/sec, INSERT throughput 96221.03 ops/sec, READ throughput 0.00 ops/sec, SCAN throughput 0.00 ops/sec, READ_MODIFY_WRITE throughput 0.00 ops/sec, total throughput 96221.03 ops/sec
Initialization (trace) (epoch 3, progress 24.65%): UPDATE throughput 0.00 ops/sec, INSERT throughput 96115.37 ops/sec, READ throughput 0.00 ops/sec, SCAN throughput 0.00 ops/sec, READ_MODIFY_WRITE throughput 0.00 ops/sec, total throughput 961

In [None]:
# Optionally reclaim disk space by removing the downloaded compressed trace.
if not delete_trace_file:
    print("Trace file not deleted")
else:
    print("Deleting trace file")
    os.remove(compressed_trace_file)
    print("Deleted trace file")