In [None]:
from google.cloud import bigquery
import os
import json
import pandas as pd
import numpy as np
from pathlib import Path
import datetime

from importlib import reload

import src.table_stats
reload(src.table_stats)

from src.table_stats import print_stats

# Initialize BigQuery client (no explicit project/credentials are passed here)
client = bigquery.Client()

# Display options: cap the column width and the number of rows rendered for tables
pd.set_option('display.max_colwidth', 60)
pd.set_option('display.max_rows', 200)
# Render `DataFrame.plot(...)` through plotly instead of matplotlib
pd.options.plotting.backend = "plotly"

# Root folder for per-run artefacts (`<WORKDIR>/data`).
# Fail with an explicit message instead of the opaque TypeError that
# `Path(None)` raises when the variable is missing.
_workdir = os.getenv("WORKDIR")
if _workdir is None:
    raise RuntimeError(
        "The WORKDIR environment variable must be set; "
        "run data is read from and written to <WORKDIR>/data"
    )
DATA_FOLDER = Path(_workdir).joinpath("data")

In [None]:
# Configure query by run id
# PROJECT_ID / DATASET_ID / TABLE_ID are combined into the fully-qualified
# BigQuery table name (`project.dataset.table`) used by the queries below.
# RUN_ID selects the rows to analyse (matched against the `run` column) and
# also names the sub-folder of DATA_FOLDER used for this run's files.

PROJECT_ID = "symphony-dev-2"
DATASET_ID = "log_dataset_default"
TABLE_ID = "logs-2"
RUN_ID = "test-5000-iter-3"
# Alternative runs; uncomment one to switch the analysis target.
# RUN_ID = "test-3000-iter-2"
# RUN_ID = "test-fail-2"

In [None]:
# List every distinct run id stored in the log table
QUERY = f"""
SELECT DISTINCT run from `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
"""

query_job = client.query(QUERY)
rows = query_job.result()
df = rows.to_dataframe()
runs = df["run"].to_list()

# Sanity check: displays True when the configured run is present in the table
RUN_ID in runs


In [None]:
# Fetch every log row that belongs to the configured run
QUERY = f"""
SELECT * from `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE run = "{RUN_ID}"
ORDER BY time DESC
-- LIMIT 1000 
-- Optionally limit the query when dealing with too big datasets...
"""

query_job = client.query(QUERY)
rows = query_job.result()
df = rows.to_dataframe()

# `detail` holds a JSON string (or None); decode it into Python objects
df["detail"] = df["detail"].apply(
    lambda raw: None if raw is None else json.loads(raw)
)

# Order chronologically and rebuild a clean 0..n-1 index
df = df.set_index("time").sort_index().reset_index()

# Optionally identify when grr changed: a row is flagged when its detail differs
# from the previous event of the same kind. Rows of other events stay NaN.
for direction in ("out", "in"):
    grr_detail = df.loc[df.event == f"cli:grr_{direction}", "detail"]
    df[f"grr_shift_{direction}"] = grr_detail != grr_detail.shift()

print_stats(df)


In [None]:
# TODO: Backpropagate grs_in requestId
# TODO: Backpropagate grs_out machineId
# TODO:

In [None]:
# Reference timestamps (ISO 8601) of the request / return API calls.
# Set either one to None to plot against absolute timestamps instead of offsets.
REQUEST_TIMESTAMP = "2025-09-04T12:03:37+00:00"
RETURN_TIMESTAMP = "2025-09-04T12:38:26+00:00"

# Convert to timezone-aware UTC datetimes. None is left untouched so the
# `is not None` checks in the plotting cells keep working.
if REQUEST_TIMESTAMP is not None:
    REQUEST_TIMESTAMP = datetime.datetime.fromisoformat(REQUEST_TIMESTAMP).astimezone(datetime.timezone.utc)
if RETURN_TIMESTAMP is not None:
    RETURN_TIMESTAMP = datetime.datetime.fromisoformat(RETURN_TIMESTAMP).astimezone(datetime.timezone.utc)


# Folder holding the files of this run
test_folder = DATA_FOLDER.joinpath(RUN_ID)

cloud_hosts_path = test_folder.joinpath("cloud_hosts.csv")

# Optional export of cloud hosts; stays None when the file is absent
cloud_hosts_df = None
if os.path.isfile(cloud_hosts_path):
    cloud_hosts_df = pd.read_csv(
        cloud_hosts_path,
        index_col=0,
        converters={
            "releaseTime": datetime.datetime.fromisoformat,
            "launchTime": datetime.datetime.fromisoformat
        },
    )

# Later cells test `cloud_hosts is not None` to decide whether the cloud host
# data is available, so it must reflect the load result rather than stay None.
cloud_hosts = cloud_hosts_df

# Pod Parsing and back propagation

In [None]:
# TODO: Add preemption analysis with `dfc_pivot_preempted` and `cloud_hosts`

# Events that take part in the pod life-cycle analysis
pod_lifecycle_events = [
    "cli:rm_out",
    "pod:create",
    "pod:scheduled",
    "container:started",
    "node:preempted",
    "cli:grs_out",
    "cli:rrm_in",
    "pod:delete",
]

# Work on a deep copy so the raw `df` stays untouched, then keep only the
# pertinent events
dfc = df.copy(deep=True)
dfc = dfc[dfc["event"].isin(pod_lifecycle_events)]

# ========= Parse Node Preemption =========

# This transforms each `node:preempted` event into multiple `pod:node_preempted`
# rows, one for every pod that was scheduled to the node before its preemption.
# This is later used to remove lines from the pivoted table

def parse_preemption(row):
    """Attach to a `node:preempted` row the pods scheduled on that node.

    Looks up, in the unfiltered `df`, every `pod:scheduled` event on the same
    node whose time is not later than the preemption, stores the list of pod
    names in `row.pod` and renames the event to `pod:node_preempted`.
    The row is modified in place and returned.
    """
    scheduled_before_preemption = (
        (df.event == "pod:scheduled")
        & (df.node == row.node)
        & (df.time <= row.time)
    )
    row.pod = df.loc[scheduled_before_preemption, "pod"].to_list()
    row.event = "pod:node_preempted"
    return row


# Replace each node:preempted row by one pod:node_preempted row per affected pod
is_node_preemption = dfc.event == "node:preempted"
pod_preemption_rows = (
    dfc[is_node_preemption]
    .apply(parse_preemption, axis=1)
    .explode("pod")
)
dfc = pd.concat(
    [dfc.loc[~is_node_preemption], pod_preemption_rows]
).reset_index(drop=True)

# ========= Parse RM =========

# This transforms a single cli:rm_out into multiple cli:rm rows (one per pod),
# looking up the pods associated with the request id

def parse_rm_out(row):
    """Attach to a `cli:rm_out` row the pods created for its request.

    The request id is read from `row.detail["payload"]["requestId"]` and
    matched against the `sym_request` column of `dfc`. The matching pod names
    are stored in `row.pod` and the event is renamed to `cli:rm`.
    The row is modified in place and returned.
    """
    request_id = row.detail["payload"]["requestId"]
    request_pods = dfc.loc[dfc.sym_request == request_id, "pod"]
    # Each pod appears once per logged event, and some matching rows carry no
    # pod at all: keep a single entry per pod so that `explode` produces one
    # cli:rm row per pod rather than one per matching log line.
    row.pod = request_pods.dropna().drop_duplicates().to_list()
    # Change event name
    row.event = "cli:rm"
    return row

# Replace each cli:rm_out row by one cli:rm row per associated pod
is_rm_out = dfc.event == "cli:rm_out"
rm_rows = (
    dfc[is_rm_out]
    .apply(parse_rm_out, axis=1)
    .explode("pod")
)
dfc = pd.concat([dfc.loc[~is_rm_out], rm_rows]).reset_index(drop=True)

# ========= Parse RRM =========

# This transforms a single cli:rrm_in into multiple cli:rrm rows (one per pod),
# extracting the pod names from the machines listed in the payload

def parse_rrm_in(row):
    """Attach to a `cli:rrm_in` row the pods listed in its payload.

    Collects the `name` of every entry of `row.detail["payload"]["machines"]`
    into `row.pod` and renames the event to `cli:rrm`.
    The row is modified in place and returned.
    """
    pod_names = []
    for machine in row.detail["payload"]["machines"]:
        pod_names.append(machine["name"])

    row.pod = pod_names
    row.event = "cli:rrm"
    return row

# Replace each cli:rrm_in row by one cli:rrm row per returned pod
is_rrm_in = dfc.event == "cli:rrm_in"
rrm_rows = (
    dfc[is_rrm_in]
    .apply(parse_rrm_in, axis=1)
    .explode("pod")
)
dfc = pd.concat([dfc.loc[~is_rrm_in], rrm_rows]).reset_index(drop=True)

# ========= Parse GRS Output =========

# This extracts from the `cli:grs_out` when a machine 
# was first recognized as running by Symphony.

def parse_grs_out(row):
    """Expand a `cli:grs_out` row into parallel per-machine lists.

    Every machine of every request in `row.detail["payload"]["requests"]`
    contributes one entry to three lists of equal length:

    * `row.pod`    - the machine `name`
    * `row.detail` - the machine `status`
    * `row.event`  - `cli:grs_<result>` built from the machine `result`

    The row is modified in place and returned; the three columns are meant to
    be exploded together afterwards. A payload without machines yields three
    empty lists.
    """
    # Flatten the machines of all requests, preserving their order
    machines = [
        machine
        for request in row.detail["payload"]["requests"]
        for machine in request["machines"]
    ]

    row.pod = [machine["name"] for machine in machines]

    # `row.detail` is overwritten only after the payload has been read above
    row.detail = [machine["status"] for machine in machines]

    # Single quotes inside the f-string keep this valid on Python < 3.12
    row.event = [f"cli:grs_{machine['result']}" for machine in machines]

    return row

# Replace each cli:grs_out row by one cli:grs_<result> row per reported machine;
# pod, detail and event are exploded together so they stay aligned
is_grs_out = dfc.event == "cli:grs_out"
grs_rows = (
    dfc[is_grs_out]
    .apply(parse_grs_out, axis=1)
    .explode(["pod","detail", "event"])
)
dfc = pd.concat([dfc.loc[~is_grs_out], grs_rows]).reset_index(drop=True)

# ========= Load Cloud Hosts if existing =========

if cloud_hosts is not None:
    # Keep only the columns of interest, indexed by pod, and name the two
    # timestamps like the other events
    cloud_hosts = cloud_hosts_df.copy(deep=True)
    cloud_hosts = cloud_hosts[[
            "hostname", "releaseTime", "launchTime"
    ]]
    cloud_hosts = cloud_hosts.rename_axis(index="pod").rename(
        columns={
            "launchTime": "hf:launched",
            "releaseTime": "hf:released"
        }
    )


# ========= Pivot table =========

# This step pivots the table to have pods as indexes, events as columns
# and values as time. Both the first and the last occurrence of each event
# are kept, under the top-level column keys "first" and "last".

dfc_pivot = dfc.pivot_table(
    index="pod",
    columns="event",
    values="time",
    aggfunc=["first", "last"]
)

preempted_pods = None
dfc_pivot_preempted = None
# Move preempted pods out of the main table into `dfc_pivot_preempted`
if "pod:node_preempted" in dfc_pivot["first"].columns:
    preempted_pods = dfc_pivot[~dfc_pivot["first"]["pod:node_preempted"].isna()].index
    dfc_pivot_preempted = dfc_pivot.loc[preempted_pods]
    dfc_pivot = dfc_pivot.drop(index=preempted_pods)

pod_schedule = dfc_pivot.copy(deep=True)

# ========= Calculate deltas =========


def _elapsed_seconds(start, end):
    """Return `end - start` in seconds, element-wise, for two time series."""
    return (end - start).apply(lambda delta: delta.total_seconds())


first_seen = dfc_pivot["first"]
last_seen = dfc_pivot["last"]

# Scaleup Delta (every "A->B" column is the time elapsed from A to B)
pod_scale_up = pd.DataFrame()
pod_scale_up["cli:rm->pod:create"] = _elapsed_seconds(first_seen["cli:rm"], first_seen["pod:create"])
if cloud_hosts is not None:
    pod_scale_up["hf:launched->pod:scheduled"] = _elapsed_seconds(cloud_hosts["hf:launched"], last_seen["pod:scheduled"])
pod_scale_up["pod:create->pod:scheduled"] = _elapsed_seconds(first_seen["pod:create"], first_seen["pod:scheduled"])
# pod_scale_up["container:started->cli:grs_succeed"] = _elapsed_seconds(first_seen["container:started"], first_seen["cli:grs_succeed"])
pod_scale_up["pod:create->cli:grs_executing"] = _elapsed_seconds(first_seen["pod:create"], first_seen["cli:grs_executing"])
pod_scale_up["pod:schedule->cli:grs_succeed"] = _elapsed_seconds(first_seen["pod:scheduled"], first_seen["cli:grs_succeed"])

# Scaledown Delta
pod_scale_down = pd.DataFrame()
pod_scale_down["cli:rrm->pod:delete"] = _elapsed_seconds(first_seen["cli:rrm"], last_seen["pod:delete"])

if cloud_hosts is not None:
    # Time from the pod deletion to the host release (release minus deletion),
    # following the same A->B convention as the other columns
    pod_scale_down["pod:delete->hf:released"] = _elapsed_seconds(last_seen["pod:delete"], cloud_hosts["hf:released"])

print(f"Run ID: {RUN_ID}")

if preempted_pods is not None:
    print(f"Number of preempted pods: {len(preempted_pods)}")

# Summarise both delta tables (count, mean, std, min, percentiles, max)
pod_scale_down = pod_scale_down.describe(
    percentiles=[
        0.25, 0.5, 0.75, 0.99
    ]
)

pod_scale_up = pod_scale_up.describe(
    percentiles=[
        0.25, 0.5, 0.75, 0.99
    ]
)

print(pod_scale_down)
pod_scale_up

In [None]:
# Chronological timeline of a single pod in the parsed table; extra filters can
# be enabled by uncommenting them
pod_timeline_filters = [
    dfc.pod == "g5c406243-248a-47c3-bd6b-52781a89237e-5ltnp-pod-785",
    # dfc.event=="pod:delete"
]
dfc[np.logical_and.reduce(pod_timeline_filters)].set_index("time").sort_index()

2025-09-04T12:04:09+00:00	

In [None]:
# All per-pod preemption rows produced by the parsing step
dfc.loc[dfc["event"] == "pod:node_preempted"]

In [None]:
# Raw log rows of a single pod
df.loc[df["pod"] == "g5c406243-248a-47c3-bd6b-52781a89237e-5ltnp-pod-1456"]

In [None]:
# Parsed rows of a single pod, ordered by time
dfc.loc[dfc["pod"] == "gb812e8c9-f2b4-4ab4-b356-e473ef841312-74wvg-pod-744"].sort_values("time")

In [None]:
# Raw input/output rows of the CLI calls
cli_events = [
    "cli:grs_in",
    "cli:grs_out",
    "cli:grr_in",
    "cli:grr_out",
    "cli:rrm_in",
    "cli:rrm_out",
]
df.loc[df["event"].isin(cli_events)]

In [None]:
# Raw log rows of a single pod
df.loc[df["pod"] == "gb812e8c9-f2b4-4ab4-b356-e473ef841312-74wvg-pod-744"]

In [None]:
# Timestamp of the last pod deletion
dfc.loc[dfc["event"] == "pod:delete", "time"].max()

In [None]:
# Compute both values here so this cell also works on a fresh kernel
# (Restart & Run All) instead of relying on names left over from a later cell.
# Pods that never appear in a cli:rrm row
pods_not_removed = set(dfc.pod.dropna().unique()) - set(dfc[dfc.event == "cli:rrm"].pod.unique())
# Pods that have at least one cli:grs_fail row
failed_grs = dfc[dfc.event == "cli:grs_fail"].pod.dropna().unique()

pods_not_removed, failed_grs

In [None]:
# Pods that never appear in a cli:rrm row (rows without a pod are ignored so
# NaN does not show up as a "pod")
pods_not_removed = set(dfc.pod.dropna().unique()) - set(dfc[dfc.event == "cli:rrm"].pod.unique())
# Pods that have at least one cli:grs_fail row
failed_grs = dfc[dfc.event == "cli:grs_fail"].pod.dropna().unique()
# `dfc_pivot_preempted` is None when the run had no preemption at all
preempted_pods = [] if dfc_pivot_preempted is None else dfc_pivot_preempted.index.to_list()

# Failed pods whose failure is not explained by a node preemption
set(failed_grs) - set(preempted_pods)


# Pod Scale Up Plot

In [None]:
# Events shown on the scale-up plot, in legend order
scale_up_events = [
    "cli:rm",
    "pod:create",
    "cli:grs_executing",
    "pod:scheduled",
    "cli:grs_succeed",
    # "hf:launched"
]

# One column per event holding, for every pod, the first time it was seen
pod_scale_up_plot = pd.DataFrame()
for event_name in scale_up_events:
    pod_scale_up_plot[event_name] = dfc_pivot["first"][event_name]

if cloud_hosts is not None:
    pod_scale_up_plot["hf:launched"] = cloud_hosts["hf:launched"]

# Long format ordered by time, then a running count per event
pod_scale_up_plot = pod_scale_up_plot.melt().set_index("value").sort_index().reset_index()
pod_scale_up_plot["count"] = pod_scale_up_plot.groupby("variable").cumcount()
scale_up_index = pod_scale_up_plot["value"]

# Back to one column per event; each row carries the count of a single event
pod_scale_up_plot = pod_scale_up_plot.pivot(
    columns="variable",
    values="count"
)

# X axis: seconds since the API request when known, absolute timestamps otherwise
if REQUEST_TIMESTAMP is not None:
    pod_scale_up_plot["time"] = scale_up_index.apply(
        lambda x: (x - REQUEST_TIMESTAMP).total_seconds()
    )
else:
    pod_scale_up_plot["time"] = scale_up_index

pod_scale_up_plot = pod_scale_up_plot.set_index("time")


# ============= Configure Plot =============

# Reorder columns and give them readable legend labels
pod_scale_up_plot = pod_scale_up_plot[scale_up_events].rename(
    columns={
        "cli:rm": "CLI Request Machine",
        "pod:create": "Pod Created",
        "cli:grs_executing": "CLI Get Request Status: Executing",
        "pod:scheduled": "Pod Scheduled",
        "cli:grs_succeed": "CLI Get Request Status: Succeeded",
        # "hf:launched": "Host Factory API Value: Launched"
    }
)

if REQUEST_TIMESTAMP is not None:
    time_axis_label = "Time after API request (seconds)"
else:
    time_axis_label = "Timestamp"

pod_scale_up_plot = pod_scale_up_plot.rename_axis(
    index=time_axis_label,
    columns="Event Count"
)

pod_scale_up_fig = pod_scale_up_plot.plot(
    kind="scatter",
    title="Pod Scale Up"
)

pod_scale_up_fig.show()


In [None]:
# Export the scale-up figure next to the run data: static SVG + interactive HTML
pod_scale_up_svg = test_folder / "pod_scale_up.svg"
pod_scale_up_html = test_folder / "pod_scale_up.html"

pod_scale_up_fig.write_image(file=pod_scale_up_svg, format="svg", width=900, height=500)
pod_scale_up_fig.write_html(file=pod_scale_up_html)

# Pod Scale Down Plot

In [None]:

# One column per event holding, for every pod, the time it was seen
pod_scale_down_plot = pd.DataFrame()
pod_scale_down_plot["cli:rrm"] = dfc_pivot["first"]["cli:rrm"]
pod_scale_down_plot["pod:delete"] = dfc_pivot["last"]["pod:delete"]

if cloud_hosts is not None:
    pod_scale_down_plot["hf:released"] = cloud_hosts["hf:released"]

# Long format ordered by time, then a running count per event
pod_scale_down_plot = pod_scale_down_plot.melt().set_index("value").sort_index().reset_index()
pod_scale_down_plot["count"] = pod_scale_down_plot.groupby("variable").cumcount()
scale_down_index = pod_scale_down_plot["value"]

# Back to one column per event; each row carries the count of a single event
pod_scale_down_plot = pod_scale_down_plot.pivot(
    columns="variable",
    values="count"
)

# X axis: seconds since the return API request when known, absolute timestamps otherwise
if RETURN_TIMESTAMP is not None:
    pod_scale_down_plot["time"] = scale_down_index.apply(
        lambda x: (x - RETURN_TIMESTAMP).total_seconds()
    )
else:
    pod_scale_down_plot["time"] = scale_down_index

pod_scale_down_plot = pod_scale_down_plot.set_index("time")


# ============= Configure Plot =============

# Reorder columns
pod_scale_down_plot = pod_scale_down_plot[[
    "cli:rrm",
    # "hf:released",
    "pod:delete",
]]

pod_scale_down_plot.columns.rename("Event Count", inplace=True)

pod_scale_down_plot.rename(
    columns={
        "cli:rrm": "CLI Request Return Machines",
        "hf:released": "Host Factory API Timestamp: Released",
        "pod:delete": "Pod Deleted"
    },
    inplace=True
)

# The axis label must follow the same switch as the axis values above, i.e.
# RETURN_TIMESTAMP (not REQUEST_TIMESTAMP)
if RETURN_TIMESTAMP is not None:
    pod_scale_down_plot.index.rename("Time after return API request (seconds)", inplace=True)
else:
    pod_scale_down_plot.index.rename("Timestamp", inplace=True)

pod_scale_down_fig = pod_scale_down_plot.plot(
    kind="scatter",
    title="Pod Scale Down"
)

pod_scale_down_fig.show()


In [None]:
# Export the scale-down figure next to the run data: static SVG + interactive HTML
pod_scale_down_svg = test_folder / "pod_scale_down.svg"
pod_scale_down_html = test_folder / "pod_scale_down.html"

pod_scale_down_fig.write_image(file=pod_scale_down_svg, format="svg", width=900, height=500)
pod_scale_down_fig.write_html(file=pod_scale_down_html)

# Node General Analysis

In [None]:

# Events that take part in the node life-cycle analysis
node_lifecycle_events = [
    "pod:scheduled",
    "node:preempted",
    "node:create",
    "node:ready_patch",
    "pod:delete",
    "node:delete",
    "pod:create",
]

# Start again from the raw logs and keep only the pertinent events
dfc = df.copy(deep=True)
dfc = dfc[dfc["event"].isin(node_lifecycle_events)]

# For every pod, the first node recorded on any of its rows
first_node_by_pod = dfc.pivot_table(index="pod", values="node", aggfunc="first")["node"]

def backprogate_node_in_pod_creation(row):
    """Fill `row.node` with the first node recorded for the row's pod.

    The node is looked up in `first_node_by_pod`. A pod that has no entry
    there (no node was ever recorded for it) keeps its current `node` value
    instead of raising a KeyError. The row is modified in place and returned.
    """
    row.node = first_node_by_pod.get(row.pod, row.node)
    return row

# Give every pod:create row the node recorded for its pod, so pod creation can
# be analysed per node
dfc[dfc.event == "pod:create"] = dfc[dfc.event == "pod:create"].apply(backprogate_node_in_pod_creation, axis=1)

# First occurrence of every event per node
dfc_first = dfc.pivot_table(
    index="node",
    columns="event",
    values="time",
    aggfunc="first"
)

# Last occurrence of every event per node, except pod:scheduled for which the
# first occurrence is used
dfc = dfc.pivot_table(
    index="node",
    columns="event",
    values="time",
    aggfunc="last"
).drop(
    columns=[
        "pod:scheduled",
    ]
).join(dfc_first[["pod:scheduled"]])


# Reference timestamps taken from the pod-level pivot table
first_pod_create_timestamp = dfc_pivot["first"]["pod:create"].min()
first_pod_delete_timestamp = dfc_pivot["last"]["pod:delete"].min()

# Removes nodes without pods
nodes_without_pods = dfc[dfc["pod:scheduled"].isna()].index
dfc.drop(index=nodes_without_pods, inplace=True)

# # Remove preempted nodes
if "node:preempted" in dfc.columns:
    preempted_nodes = dfc[~dfc["node:preempted"].isna()].index
    dfc.drop(index=preempted_nodes, inplace=True)

# Calculate delta (both columns are stored as float seconds)
dfc["first-pod-create->node:create"] = (
    dfc["node:create"] - first_pod_create_timestamp
).apply(lambda x: x.total_seconds())

dfc["pod:delete->node:delete"] = (dfc["node:delete"] - dfc["pod:delete"]).apply(lambda x: x.total_seconds())


# Single quotes inside the f-strings keep them valid on Python < 3.12
print(f"Run ID: {RUN_ID}")
print(f"Number of node create events: {(df.event == 'node:create').sum()}")
print(f"Number of node preemption events: {(df.event == 'node:preempted').sum()}")
print(f"Number of unique nodes preempted: {len(df[df.event == 'node:preempted'].node.unique())}")
print(f"Number of unique nodes: {len(df.node.dropna().unique())}")
print(f"Number of nodes without pods scheduled: {len(nodes_without_pods)}")

# Same percentiles as the pod-level summaries (0.75, not 0.77)
dfc_desc = dfc.describe(
    percentiles=[
        0.25, 0.5, 0.75, 0.99
    ]
)

dfc_desc

# First (overall pod creation) 
# 

# Node Scale Up Plot

In [None]:

# Long format of the node scale-up events, ordered by time
node_scale_up = (
    dfc.copy(deep=True)
    .loc[:, [
        "node:create",
        # "node:preempted",
        "node:ready_patch",
        "pod:scheduled",
        "pod:create",
    ]]
    .melt()
    .set_index("value")
    .sort_index()
    .reset_index()
)

# Running count of each event
node_scale_up["count"] = node_scale_up.groupby("event").cumcount()

# Seconds elapsed since the very first pod creation
node_scale_up_index = node_scale_up["value"].apply(
    lambda x: (x - first_pod_create_timestamp).total_seconds()
)

# =========== Format plot ===========

# One column per event, indexed by elapsed time, with readable labels
node_scale_up = (
    node_scale_up
    .pivot(columns="event", values="count")
    .assign(time=node_scale_up_index)
    .set_index("time")
    .rename_axis(
        index="Time after first pod creation (seconds)",
        columns="Event Count",
    )
    .rename(
        columns={
            "node:create": "Node Created",
            "pod:scheduled": "First Pod Scheduled",
            "pod:create": "First Pod Created",
        }
    )
)

node_scale_up_fig = node_scale_up.plot(
    kind="scatter",
    title="Node Scaleup",
)

node_scale_up_fig.show()

In [None]:
# Export the node scale-up figure next to the run data: static SVG + interactive HTML
node_scale_up_svg = test_folder / "node_scale_up.svg"
node_scale_up_html = test_folder / "node_scale_up.html"

node_scale_up_fig.write_image(file=node_scale_up_svg, format="svg", width=900, height=500)
node_scale_up_fig.write_html(node_scale_up_html)

# Node Scale Down Plot

In [None]:
# Long format of the node scale-down events, ordered by time
node_scale_down = (
    dfc.copy(deep=True)
    .loc[:, [
        "pod:delete",
        "node:delete",
        # "node:preempted"
    ]]
    .melt()
    .set_index("value")
    .sort_index()
    .reset_index()
)

# Running count of each event
node_scale_down["count"] = node_scale_down.groupby("event").cumcount()

node_scale_down.head()

In [None]:

# Seconds elapsed since the earliest value of `first_pod_delete_timestamp`.
# This cell consumes the long-format `node_scale_down` (its "value" column is
# gone afterwards), so the previous cell must be re-run before re-running it.
node_scale_down_index = node_scale_down["value"].apply(
    lambda x: (x - first_pod_delete_timestamp).total_seconds()
)

# =========== Format plot ===========

# One column per event, indexed by elapsed time, with readable labels
node_scale_down = (
    node_scale_down
    .pivot(columns="event", values="count")
    .assign(time=node_scale_down_index)
    .set_index("time")
    .rename_axis(
        index="Time after first intentional pod deletion (seconds)",
        columns="Event Count",
    )
    .loc[:, ["pod:delete", "node:delete"]]
    .rename(
        columns={
            "pod:delete": "Last pod deleted",
            "node:delete": "Node deleted",
        }
    )
)

node_scale_down_fig = node_scale_down.plot(
    kind="scatter",
    title="Node Scaledown",
)

node_scale_down_fig.show()


In [None]:
# Display the elapsed-seconds series used as x axis of the node scale-down plot
node_scale_down_index

In [None]:
# Persist the per-node timings and their summary statistics next to the run data
dfc.to_csv(test_folder / "node_timing.csv")
dfc_desc.to_csv(test_folder / "node_timing_description.csv")

In [None]:
# Distribution of the delay between the first pod creation and each node
# creation. The column already holds float seconds (it was converted with
# total_seconds() when it was computed), so it is plotted as is.
dfc["first-pod-create->node:create"].hist(
    nbins=100
)

In [None]:
# For a single pod per node, extract pod associated to node and pivot table
# To index = pod, columns = events, values = time
# NOTE(review): this cell filters on "node:ready" while the node analysis above
# uses "node:ready_patch" - confirm which event name the logs actually contain.

dfc = df.copy()

# Extract pod name from pod:scheduled for node:ready: the first pod scheduled on
# each node is looked up once and mapped onto the node:ready rows. Nodes without
# any scheduled pod get a missing pod instead of raising an IndexError.
first_pod_by_node = dfc[dfc.event == "pod:scheduled"].groupby("node")["pod"].first()
is_node_ready = dfc.event == "node:ready"
dfc.loc[is_node_ready, "pod"] = dfc.loc[is_node_ready, "node"].map(first_pod_by_node)

# Pivot table to time
dfc = dfc.loc[
    dfc.event.isin(["pod:create", "container:started", "pod:scheduled", "node:ready"]),
    ["time", "event", "pod"]
].pivot_table(
    index="pod",
    columns=["event"],
    values=["time"],
    # Aggregate by first and last appearances
    aggfunc=["first", "last"]
)

# Use only the first time appearance; copy so the delta columns are added to an
# independent frame rather than to a slice of the pivot table
results = dfc["first"]["time"].copy()

results["create->start"] = results["container:started"] - results["pod:create"]
results["create->scheduled"] = results["pod:scheduled"] - results["pod:create"]
results["scheduled->started"] = results["container:started"] - results["pod:scheduled"]
results["create->node_ready"] = results["node:ready"] - results["pod:create"]
results["node_ready->started"] = results["container:started"] - results["node:ready"]

print(f"Run: {RUN_ID}")
results.describe()



In [None]:
# Apply multiple filters to better visualize data...
# This example requires the pre creation of grr shift 

# Rows worth looking at: a fixed set of events plus every grr call whose detail
# changed (requires the grr_shift_* columns computed when the logs were loaded).
# NOTE(review): "node:deleted" and "node:ready" differ from the "node:delete" /
# "node:ready_patch" names used in the node analysis - confirm the event names.
events_of_interest = ["node:preempted", "pod:delete","node:deleted","node:ready","cli:grs_in", "cli:grs_out","cli:rrm_in", "cli:rrm_out"]

df[
    df.event.isin(events_of_interest)
    | (df.grr_shift_out == True)
    | (df.grr_shift_in == True)
]

In [None]:
# Query specific pod timeline

POD = "gf813d2c6-cfd2-4aee-b035-4ffeb67a4af0-cg8tw-pod-1"
NODE = "gke-cluster-test-0-n2-node-pool-test--1487096a-d446"
ACN = "gf813d2c6-cfd2-4aee-b035-4ffeb67a4af0-cg8tw"

# Rows matching ANY of the three identifiers. `np.logical_or` only combines two
# inputs (a third positional argument is taken as the output array), so the
# conditions are combined with `reduce` over a list.
df[
    np.logical_or.reduce([
        df.node == NODE,
        df.pod == POD,
        df.acn == ACN
    ])
]

In [None]:
# Save as excel
# (requires removing TZ info)

# Excel cannot store timezone-aware timestamps: strip the tz info from a copy of
# the logs, then write them ordered by time
dfc = df.copy()
dfc["time"] = dfc["time"].apply(lambda stamp: stamp.replace(tzinfo=None))

dfc_by_time = dfc.set_index("time").sort_index()
dfc_by_time.to_excel("base-test-7.xlsx")

