In [37]:
import re
import json
import time
import signal

from subprocess import Popen
from yanniszark_common.cmdutils import run, check_output

In [34]:
# Cluster configuration read by the module-level helpers and cells below
# (get_safe_time_info, the manual setup cell, monitor_safe_time_stats).
# These must be defined for the notebook to survive Restart & Run All.
# NOTE: the YugabyteDB class further down keeps its own copy of these values.
DB = "kv_store"
PRIMARY_MASTER = "172.18.0.2"
SECONDARY_MASTER = "172.18.0.3"
PRIMARY_CONTAINER = "yb-primary"
SECONDARY_CONTAINERS = ["yb-secondary-1", "yb-secondary-2"]


In [33]:
def docker_check_output(container, cmd):
    """Run a shell command line inside a running Docker container.

    The command is executed as ``docker exec <container> bash -c <cmd>``, so it
    is interpreted by bash (pipes such as ``| grep`` work). Never interpolate
    untrusted input into ``cmd``.

    Args:
        container (str): Name of the container to exec into.
        cmd (str): Command line handed to ``bash -c``.

    Returns:
        Whatever ``check_output`` returns for the ``docker exec`` invocation
        (callers in this notebook treat it as ``str``).
    """
    exec_argv = ["docker", "exec", container]
    exec_argv += ["bash", "-c", cmd]
    return check_output(exec_argv)

def get_safe_time_info(container=None, master_addresses=None):
    """Get cross-cluster safe time info from the secondary cluster.

    Runs ``yb-admin get_xcluster_safe_time include_lag_and_skew`` and parses
    its JSON output.

    Args:
        container (str, optional): Container to run yb-admin in. Defaults to
            ``SECONDARY_CONTAINERS[0]``.
        master_addresses (str, optional): Value passed to ``--master_addresses``.
            Defaults to ``SECONDARY_MASTER``.

    Returns:
        dict: Dictionary containing:
            - namespace_id (str): ID of the namespace (e.g. '000034cb000030008000000000000000')
            - namespace_name (str): Name of the namespace (e.g. 'yugabyte')
            - safe_time (str): Safe time in human readable format (e.g. '2025-06-23 21:24:52.764103')
            - safe_time_epoch (str): Safe time in epoch microseconds (e.g. '1750713892764103')
            - safe_time_lag_sec (str): Lag in seconds between clusters (e.g. '0.64')
            - safe_time_skew_sec (str): Clock skew in seconds between clusters (e.g. '0.30')

    Raises:
        ValueError: if yb-admin does not report exactly one entry.
    """
    # Resolve defaults lazily so callers (e.g. a class with its own config)
    # can pass explicit targets without relying on module-level globals.
    if container is None:
        container = SECONDARY_CONTAINERS[0]
    if master_addresses is None:
        master_addresses = SECONDARY_MASTER

    raw_output = docker_check_output(
        container,
        f"./bin/yb-admin --master_addresses {master_addresses} get_xcluster_safe_time include_lag_and_skew"
    )
    entries = json.loads(raw_output.strip())

    if len(entries) != 1:
        raise ValueError(f"Expected exactly 1 safe time info entry, got {len(entries)}")

    return entries[0]



In [None]:
# Manual, step-by-step xCluster setup (same steps as YugabyteDB.setup below).
# This cell does not start docker compose; the clusters must already be up.

# Create the same pre-split table on the primary and on the first secondary;
# replication is configured for this table further down.
create_table_sql = f"""
    CREATE TABLE {DB} (
        k TEXT,
        v TEXT,
        PRIMARY KEY (k ASC)
    ) SPLIT AT VALUES (('d'), ('m'), ('t'));"""
for container, host in [
    (PRIMARY_CONTAINER, PRIMARY_MASTER),
    (SECONDARY_CONTAINERS[0], SECONDARY_MASTER),
]:
    docker_check_output(container, f'ysqlsh -h {host} -c "{create_table_sql}"')

# Create snapshot schedule on secondary
# (args presumably: interval and retention in minutes, then the namespace — confirm in yb-admin docs)
docker_check_output(
    SECONDARY_CONTAINERS[0],
    f"./bin/yb-admin --master_addresses {SECONDARY_MASTER} create_snapshot_schedule 1 10 ysql.yugabyte"
)

# Get table ID
# Example output:
# yugabyte.kv_store [ysql_schema=public] [000034cb000030008000000000004000]
table_id_output = docker_check_output(
    PRIMARY_CONTAINER,
    f"./bin/yb-admin --master_addresses {PRIMARY_MASTER} list_tables | grep {DB}"
)
# `grep` matches any table whose line contains DB, so insist on exactly one line
# instead of silently taking the last match.
table_lines = table_id_output.strip().splitlines()
if len(table_lines) != 1:
    raise ValueError(f"Expected exactly 1 table matching {DB!r}, got {len(table_lines)}: {table_id_output!r}")
# The table ID is the trailing bracketed hex string.
table_id_match = re.search(r"\[([0-9a-f]+)\]$", table_lines[0].strip())
if table_id_match is None:
    raise ValueError(f"Could not parse table ID from list_tables output: {table_lines[0]!r}")
table_id = table_id_match.group(1)

print(f"Table ID: '{table_id}'")

# Get bootstrap ID
# Example output:
# table id: 000034cb000030008000000000004000, CDC bootstrap id: de0b5affa1d0a6acbd42ab48a22e653f
bootstrap_output = docker_check_output(
    PRIMARY_CONTAINER,
    f"./bin/yb-admin --master_addresses {PRIMARY_MASTER} bootstrap_cdc_producer {table_id}"
)
bootstrap_id_match = re.search(r"CDC bootstrap id: ([0-9a-f]+)", bootstrap_output)
if bootstrap_id_match is None:
    raise ValueError(f"Could not parse CDC bootstrap ID from output: {bootstrap_output!r}")
bootstrap_id = bootstrap_id_match.group(1)

print(f"Bootstrap ID: '{bootstrap_id}'")

# Set up universe replication on the first secondary
docker_check_output(
    SECONDARY_CONTAINERS[0],
    f"./bin/yb-admin --master_addresses {SECONDARY_MASTER} setup_universe_replication kv_store_replication {PRIMARY_MASTER} {table_id} {bootstrap_id} transactional"
)

# Set the secondary cluster to STANDBY mode
docker_check_output(
    SECONDARY_CONTAINERS[0],
    f"./bin/yb-admin --master_addresses {SECONDARY_MASTER} change_xcluster_role STANDBY"
)

safe_time_info = get_safe_time_info()
print(safe_time_info)


In [None]:
class YugabyteDB:
    """Drive a local primary/secondary YugabyteDB xCluster setup via docker compose.

    ``setup()`` launches ``docker compose up`` as a child process, creates the
    ``DB`` table on the primary and on the first secondary, and configures
    transactional replication from the primary to that secondary.
    ``teardown()`` stops the compose process again.
    """

    # Table created on both clusters and replicated (setup() previously read
    # self.DB without it being defined).
    DB = "kv_store"
    PRIMARY_MASTER = "172.18.0.2"
    SECONDARY_MASTER = "172.18.0.3"
    PRIMARY_CONTAINER = "yb-primary"
    SECONDARY_CONTAINERS = ["yb-secondary-1", "yb-secondary-2"]
    # Seconds to wait after launching `docker compose up` before checking that
    # the compose process is still alive.
    COMPOSE_STARTUP_WAIT_SEC = 5

    def __init__(self):
        # Popen handle of `docker compose up`; None until setup() starts it.
        self.__compose_process = None

    def setup(self):
        """Start the clusters and configure replication from primary to secondary.

        Prints the table ID, the CDC bootstrap ID and the initial safe-time info.

        Raises:
            Exception: if docker compose is already running or exits during startup.
            ValueError: if the table ID or bootstrap ID cannot be parsed, or the
                safe-time info does not contain exactly one entry.
        """
        self.__start_docker_compose()

        # The table is created on both sides before replication is set up for it.
        self._create_table(self.PRIMARY_CONTAINER, self.PRIMARY_MASTER)
        self._create_table(self.SECONDARY_CONTAINERS[0], self.SECONDARY_MASTER)

        # Create snapshot schedule on secondary
        # (args presumably: interval and retention in minutes, then the namespace — confirm in yb-admin docs)
        self._yb_admin(
            self.SECONDARY_CONTAINERS[0],
            self.SECONDARY_MASTER,
            "create_snapshot_schedule 1 10 ysql.yugabyte",
        )

        # Example output:
        # yugabyte.kv_store [ysql_schema=public] [000034cb000030008000000000004000]
        table_id = self._parse_table_id(
            self._yb_admin(self.PRIMARY_CONTAINER, self.PRIMARY_MASTER, f"list_tables | grep {self.DB}")
        )
        print(f"Table ID: '{table_id}'")

        # Example output:
        # table id: 000034cb000030008000000000004000, CDC bootstrap id: de0b5affa1d0a6acbd42ab48a22e653f
        bootstrap_id = self._parse_bootstrap_id(
            self._yb_admin(self.PRIMARY_CONTAINER, self.PRIMARY_MASTER, f"bootstrap_cdc_producer {table_id}")
        )
        print(f"Bootstrap ID: '{bootstrap_id}'")

        # Set up universe replication on the first secondary
        self._yb_admin(
            self.SECONDARY_CONTAINERS[0],
            self.SECONDARY_MASTER,
            f"setup_universe_replication kv_store_replication {self.PRIMARY_MASTER} {table_id} {bootstrap_id} transactional",
        )

        # Set the secondary cluster to STANDBY mode
        self._yb_admin(self.SECONDARY_CONTAINERS[0], self.SECONDARY_MASTER, "change_xcluster_role STANDBY")

        # Use this instance's configuration, not module-level globals.
        safe_time_info = self.get_safe_time_info()
        print(safe_time_info)

    def _yb_admin(self, container, master_addresses, args):
        """Run ``./bin/yb-admin --master_addresses <master_addresses> <args>`` in ``container``.

        ``args`` goes through ``bash -c`` (see docker_check_output), so it may
        contain shell syntax such as pipes.
        """
        return docker_check_output(
            container,
            f"./bin/yb-admin --master_addresses {master_addresses} {args}",
        )

    def _create_table(self, container, host):
        """Create the pre-split ``DB`` table via ysqlsh against ``host`` inside ``container``."""
        docker_check_output(
            container,
            f"""ysqlsh -h {host} -c "
                CREATE TABLE {self.DB} (
                    k TEXT,
                    v TEXT,
                    PRIMARY KEY (k ASC)
                ) SPLIT AT VALUES (('d'), ('m'), ('t'));"
            """
        )

    @staticmethod
    def _parse_table_id(list_tables_output):
        """Extract the table ID from ``list_tables | grep <DB>`` output.

        Expects exactly one line such as
        ``yugabyte.kv_store [ysql_schema=public] [000034cb000030008000000000004000]``.

        Raises:
            ValueError: if the output is not exactly one line, or that line does
                not end in a bracketed hex ID.
        """
        lines = list_tables_output.strip().splitlines()
        # grep matches substrings, so several tables could match; refuse to guess.
        if len(lines) != 1:
            raise ValueError(f"Expected exactly 1 matching table, got {len(lines)}: {list_tables_output!r}")
        match = re.search(r"\[([0-9a-f]+)\]$", lines[0].strip())
        if match is None:
            raise ValueError(f"Could not parse table ID from list_tables output: {lines[0]!r}")
        return match.group(1)

    @staticmethod
    def _parse_bootstrap_id(bootstrap_output):
        """Extract the CDC bootstrap ID from ``bootstrap_cdc_producer`` output.

        Raises:
            ValueError: if no ``CDC bootstrap id: <hex>`` is present.
        """
        match = re.search(r"CDC bootstrap id: ([0-9a-f]+)", bootstrap_output)
        if match is None:
            raise ValueError(f"Could not parse CDC bootstrap ID from output: {bootstrap_output!r}")
        return match.group(1)

    def __start_docker_compose(self):
        """Launch ``docker compose up --force-recreate`` as a child process.

        Raises:
            Exception: if a compose process started by this instance is still
                running, or if the new one has exited after the startup wait.
        """
        if self.is_running():
            raise Exception("Docker compose is already running")
        cmd = ["docker", "compose", "up", "--force-recreate"]
        self.__compose_process = Popen(cmd)
        # NOTE: this only checks that the compose process has not exited; it
        # does not verify that the database services are ready for queries.
        time.sleep(self.COMPOSE_STARTUP_WAIT_SEC)
        if not self.is_running():
            raise Exception("Failed to start docker compose")

    def teardown(self):
        """Stop the docker compose process by sending SIGINT (Ctrl+C).

        Blocks until the process exits. No-op if nothing was started.
        """
        if self.__compose_process:
            self.__compose_process.send_signal(signal.SIGINT)
            print("Sent SIGINT to docker compose. Waiting for it to exit...")
            self.__compose_process.wait()
            self.__compose_process = None

    def is_running(self):
        """Check if the docker compose process is still running.

        Returns:
            bool: True if running, False otherwise
        """
        if not self.__compose_process:
            return False
        return self.__compose_process.poll() is None

    def get_safe_time_info(self):
        """Get cross-cluster safe time info from the secondary cluster.

        Returns:
            dict: Dictionary containing:
                - namespace_id (str): ID of the namespace (e.g. '000034cb000030008000000000000000')
                - namespace_name (str): Name of the namespace (e.g. 'yugabyte')
                - safe_time (str): Safe time in human readable format (e.g. '2025-06-23 21:24:52.764103')
                - safe_time_epoch (str): Safe time in epoch microseconds (e.g. '1750713892764103')
                - safe_time_lag_sec (str): Lag in seconds between clusters (e.g. '0.64')
                - safe_time_skew_sec (str): Clock skew in seconds between clusters (e.g. '0.30')

        Raises:
            ValueError: if yb-admin does not report exactly one entry.
        """
        raw_output = self._yb_admin(
            self.SECONDARY_CONTAINERS[0],
            self.SECONDARY_MASTER,
            "get_xcluster_safe_time include_lag_and_skew",
        )
        entries = json.loads(raw_output.strip())

        if len(entries) != 1:
            raise ValueError(f"Expected exactly 1 safe time info entry, got {len(entries)}")

        return entries[0]



In [43]:
def monitor_safe_time_stats(interval_sec=1, duration_sec=None, yb=None):
    """Poll and print xCluster safe-time lag and skew, tracking the maxima.

    Args:
        interval_sec (float): Seconds to sleep between polls.
        duration_sec (float, optional): Stop once this many seconds have
            elapsed. ``None`` (default) polls until interrupted (Ctrl+C /
            Interrupt Kernel); ``0`` stops immediately.
        yb (YugabyteDB, optional): Instance whose ``get_safe_time_info()`` is
            polled. Defaults to the module-level ``get_safe_time_info()``.

    Returns:
        tuple[float, float]: ``(max_lag, max_abs_skew)`` observed, in seconds.

    Raises:
        Any exception from the safe-time fetcher other than KeyboardInterrupt
        (e.g. ``ValueError`` when no safe-time entry is reported). The final
        summary is printed before it propagates.
    """
    fetch_info = get_safe_time_info if yb is None else yb.get_safe_time_info
    max_lag = 0.0
    max_skew = 0.0
    # Monotonic clock so wall-clock adjustments cannot shorten/extend the run;
    # `is None` (not truthiness) so duration_sec=0 does not mean "forever".
    deadline = None if duration_sec is None else time.monotonic() + duration_sec

    try:
        while deadline is None or time.monotonic() < deadline:
            info = fetch_info()
            curr_lag = float(info["safe_time_lag_sec"])
            curr_skew = float(info["safe_time_skew_sec"])

            max_lag = max(max_lag, curr_lag)
            # Track the magnitude of the skew regardless of its sign.
            max_skew = max(max_skew, abs(curr_skew))

            print(f"Current lag: {curr_lag:.2f}s (max: {max_lag:.2f}s)")
            print(f"Current skew: {curr_skew:.2f}s (max: {max_skew:.2f}s)")
            print("-" * 50)

            time.sleep(interval_sec)

    except KeyboardInterrupt:
        print("\nStopping monitoring...")
    finally:
        # Report maxima however the loop ended: interrupt, deadline, or error.
        print(f"Final max lag: {max_lag:.2f}s")
        print(f"Final max skew: {max_skew:.2f}s")

    return max_lag, max_skew

# No duration_sec given, so this polls until interrupted (Interrupt Kernel /
# Ctrl+C); any error from get_safe_time_info() also ends the loop.
monitor_safe_time_stats()

Current lag: 7.14s (max: 7.14s)
Current skew: 7.02s (max: 7.02s)
--------------------------------------------------
Current lag: 8.26s (max: 8.26s)
Current skew: 8.04s (max: 8.04s)
--------------------------------------------------
Current lag: 9.38s (max: 9.38s)
Current skew: 8.97s (max: 8.97s)
--------------------------------------------------
Current lag: 10.49s (max: 10.49s)
Current skew: 10.00s (max: 10.00s)
--------------------------------------------------
Current lag: 11.62s (max: 11.62s)
Current skew: 11.03s (max: 11.03s)
--------------------------------------------------
Current lag: 12.73s (max: 12.73s)
Current skew: 11.96s (max: 11.96s)
--------------------------------------------------
Current lag: 13.84s (max: 13.84s)
Current skew: 12.98s (max: 12.98s)
--------------------------------------------------
Current lag: 14.96s (max: 14.96s)
Current skew: 14.01s (max: 14.01s)
--------------------------------------------------
Current lag: 16.07s (max: 16.07s)
Current skew: 15.9

ValueError: Expected exactly 1 safe time info entry, got 0