In [None]:
# Import python packages
import pandas as pd
import numpy as np

# Grab the active Snowpark session; every check below issues its SQL through `session`.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Shared state for the whole notebook: each check stores its summary line in
# report_data and clears `success` when it fails; the last cell gates on `success`.
report_data = {}
success = True

# Set the run_ids for the evaluation: two comma-separated Gatling run ids.
# Run A is the reference; the performance check reports run B's duration
# relative to run A's.
raw_runids = "2025-11-13-DYA75F9xjI, 2025-11-13-YP0bpvPIXN"

# Drop empty entries (e.g. from a trailing comma) so a blank run id can never
# be interpolated into the SQL below.
runids = [item.strip() for item in raw_runids.split(',') if item.strip()]
if len(runids) < 2:
    # Fail fast with a clear message instead of an IndexError on runids[1].
    raise ValueError(
        f"Expected two comma-separated run ids in raw_runids, found {len(runids)}: {runids}"
    )
# Only the first two ids are compared; any further ids are ignored.
runid_a, runid_b = runids[:2]

# Set the performance threshold for the evaluation: the performance check flags
# queries whose absolute percentage change in duration is at or above this value.
perf_threshold_percent = 70


In [None]:
# Check: no header rows with status FAILED exist for either run.
statement = f"""
select *
from GATLING_SQL_HEADERS
where status = 'FAILED'
and gatling_run_id in ('{runid_a}','{runid_b}')
"""
df = session.sql(statement).to_pandas()
num_rows = df.shape[0]

# The summary line is the same for pass and fail; only the overall flag differs.
report_data['Failed queries'] = f"{num_rows} queries report as failed for run ids : {runid_a} and {runid_b}"
if num_rows != 0:
    success = False

# Expressions nested inside if/else are evaluated but never displayed by the
# notebook, so print the summary explicitly and make the failing rows the
# cell's last top-level expression (None renders nothing when there are no failures).
print(report_data['Failed queries'])
df if num_rows else None

In [None]:
# Check: per-query status in GATLING_SQL_DETAILS agrees between run A and run B.
# The query returns one row per joined pair whose status differs, or where the
# query hash is present in only one of the two runs.
# NOTE(review): the join key is QUERY_HASH alone. If GATLING_SQL_DETAILS holds
# several rows per query hash (another cell selects rownumber/row_hash from it),
# the join multiplies rows and num_rows overstates the number of differing
# queries - confirm against the table's grain.
statement = f"""
SELECT
    COALESCE(a.QUERY_HASH, b.QUERY_HASH) AS query_hash,
    a.QUERY_NAME AS query_name_a,
    a.STATUS AS status_a,
    b.QUERY_NAME AS query_name_b,
    b.STATUS AS status_b,
    CASE
        WHEN a.GATLING_RUN_ID IS NULL THEN 'Failed in B only (Query missing from A)'
        WHEN b.GATLING_RUN_ID IS NULL THEN 'Failed in A only (Query missing from B)'
        WHEN a.STATUS = 'FAILED' AND b.STATUS != 'FAILED' THEN 'Failed in A only'
        WHEN a.STATUS != 'FAILED' AND b.STATUS = 'FAILED' THEN 'Failed in B only'
        ELSE 'Status Mismatch'
    END AS failure_type
FROM
    (SELECT * FROM GATLING_SQL_DETAILS WHERE GATLING_RUN_ID = '{runid_a}') a
FULL OUTER JOIN
    (SELECT * FROM GATLING_SQL_DETAILS WHERE GATLING_RUN_ID = '{runid_b}') b
ON
    a.QUERY_HASH = b.QUERY_HASH
WHERE
    (a.STATUS IS DISTINCT FROM b.STATUS) -- Status is different (e.g., FAILED vs OK, or one is NULL)
    OR (a.GATLING_RUN_ID IS NULL AND b.GATLING_RUN_ID IS NOT NULL) -- Query only exists in B
    OR (b.GATLING_RUN_ID IS NULL AND a.GATLING_RUN_ID IS NOT NULL); -- Query only exists in A
"""

df = session.sql(statement).to_pandas()

num_rows = df.shape[0]

if num_rows == 0:
    report_data['Failed query details'] = f"If query failures exist they are consistent between runs for run ids : {runid_a} and {runid_b}"
else:
    success = False
    report_data['Failed query details'] = f"There are {num_rows} query failure differences between query runs for run ids : {runid_a} and {runid_b}"

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the differing rows the cell's last
# top-level expression (None renders nothing when the runs agree).
print(report_data['Failed query details'])
df if num_rows else None

In [None]:
# Check: both runs logged the same number of queries in gatling_sql_logs.
statement = f"""
Select gatling_run_id, model, count(query_name) as num_queries 
from gatling_sql_logs
where gatling_run_id in ('{runid_a}', '{runid_b}')
group by 1, 2
"""

df = session.sql(statement).to_pandas()

# The query groups by (run id, model), so exactly two rows means one group per run.
# NOTE(review): this presumes each run exercises a single model - confirm.
num_rows = df.shape[0]
assert num_rows == 2, f"Expected 2 rows found {num_rows}"

# Two rows could also be two models of the same run; make sure both runs are
# actually represented before comparing their counts.
found_runids = set(df['GATLING_RUN_ID'])
assert found_runids == {runid_a, runid_b}, f"Expected one row per run id, found rows for {sorted(found_runids)}"

column_name = 'NUM_QUERIES'

# Get the value from the first row to compare against
expected_value = df[column_name].iloc[0]

are_all_same = (df[column_name] == expected_value).all()

if are_all_same:
    report_data['Queries by model'] = f"✅ Validated number of query result set rows by model: {expected_value}"
else:
    success = False
    report_data['Queries by model'] = "Result sets contain inconsistent number of rows."

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the per-run counts the cell's last
# top-level expression (None renders nothing when the counts agree).
print(report_data['Queries by model'])
None if are_all_same else df

In [None]:
# Check: both runs recorded the same number of header rows in gatling_sql_headers.
statement = f"""
Select gatling_run_id, model, count(query_name) as num_queries 
from gatling_sql_headers
where gatling_run_id in ('{runid_a}', '{runid_b}')
group by 1, 2
"""

df = session.sql(statement).to_pandas()

# The query groups by (run id, model), so exactly two rows means one group per run.
# NOTE(review): this presumes each run exercises a single model - confirm.
num_rows = df.shape[0]
assert num_rows == 2, f"Expected 2 rows found {num_rows}"

# Two rows could also be two models of the same run; make sure both runs are
# actually represented before comparing their counts.
found_runids = set(df['GATLING_RUN_ID'])
assert found_runids == {runid_a, runid_b}, f"Expected one row per run id, found rows for {sorted(found_runids)}"

column_name = 'NUM_QUERIES'

# Get the value from the first row to compare against
expected_value = df[column_name].iloc[0]

are_all_same = (df[column_name] == expected_value).all()

if are_all_same:
    report_data['Headers by model'] = f"✅ Validated numer of headers by model: {expected_value}"
else:
    success = False
    report_data['Headers by model'] = f"Column '{column_name}' contains inconsistent values."

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the per-run counts the cell's last
# top-level expression (None renders nothing when the counts agree).
print(report_data['Headers by model'])
None if are_all_same else df

    

In [None]:
# Check: both runs recorded the same number of detail rows in gatling_sql_details.
statement = f"""
Select gatling_run_id, model, count(query_name) as num_queries 
from gatling_sql_details
where gatling_run_id in ('{runid_a}', '{runid_b}')
group by 1, 2
"""
df = session.sql(statement).to_pandas()

# The query groups by (run id, model), so exactly two rows means one group per run.
# NOTE(review): this presumes each run exercises a single model - confirm.
num_rows = df.shape[0]
assert num_rows == 2, f"Expected 2 rows found {num_rows}"

# Two rows could also be two models of the same run; make sure both runs are
# actually represented before comparing their counts.
found_runids = set(df['GATLING_RUN_ID'])
assert found_runids == {runid_a, runid_b}, f"Expected one row per run id, found rows for {sorted(found_runids)}"

column_name = 'NUM_QUERIES'

# Get the value from the first row to compare against
expected_value = df[column_name].iloc[0]

are_all_same = (df[column_name] == expected_value).all()

if are_all_same:
    report_data['Details by model'] = f"✅ Validated numer of detail records by model: {expected_value}"
else:
    success = False
    report_data['Details by model'] = f"Column '{column_name}' contains inconsistent values."

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the per-run counts the cell's last
# top-level expression (None renders nothing when the counts agree).
print(report_data['Details by model'])
None if are_all_same else df


In [None]:
# Check: every detail row of one run has a row with the same row_hash in the
# other run. The query returns the rows of A with no row_hash match in B, plus
# the rows of B with no row_hash match in A.
results_matching_query = f"""
with
    a as (
        select
            model as model_name,
            query_name,
            query_hash,
            status,
            gatling_session_id,
            rownumber,
            row_hash
        from gatling_sql_details
        where gatling_run_id = '{runid_a}'
        order by model_name, query_name, query_hash, gatling_session_id, rownumber
    ),
    b as (
        select
            model as model_name,
            query_name,
            query_hash,
            status,
            gatling_session_id,
            rownumber,
            row_hash
        from gatling_sql_details
        where gatling_run_id = '{runid_b}'
        order by model_name, query_name, query_hash, gatling_session_id, rownumber
    )
select * from a where not exists (select row_hash from b where a.row_hash = b.row_hash)
    UNION
    select * from b where not exists (select row_hash from a where a.row_hash = b.row_hash)
"""

# Kept as a Snowpark DataFrame (not converted to pandas); count() runs the query.
df = session.sql(results_matching_query)

# Get the row count
row_count = df.count()

if row_count == 0:
    report_data['Results Matching Test'] = f"✅ Validated results match for run ids : {runid_a} and {runid_b}"
else:
    success = False
    report_data['Results Matching Test'] = f"Failed there are {row_count} records where results do not match as expected for run ids: {runid_a} and {runid_b}"

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the mismatching rows the cell's last
# top-level expression (None renders nothing when the results match).
print(report_data['Results Matching Test'])
df if row_count else None

In [None]:
# Check: query durations of run B stay within perf_threshold_percent of run A.
# For each run the query keeps the latest row (by header_ts) per
# (test_name, model, query_hash, concurrent_users), joins A to B, and returns
# the pairs whose absolute percentage change is at or above the threshold.
# NOTE(review): the join matches on model_name and query_hash only, while each
# side keeps one row per concurrent_users (and test_name). If a run contains
# several concurrency levels, durations from different levels get compared -
# confirm whether concurrent_users belongs in the join condition.
# NOTE(review): pct_diff is NULL when duration_a is 0 (nullif), and such rows are
# dropped by the threshold filter, so they are never reported - confirm this is intended.
performance_evaluation_query = f"""
with
    a as (
        select
            run_key,
            model as model_name,
            query_name,
            query_hash,
            status,
            concurrent_users,
            header_duration_ms as duration_ms
        from v_gatling_joined
        where gatling_run_id = '{runid_a}'
    qualify row_number() over (partition by test_name, model, query_hash, concurrent_users order by header_ts desc) = 1
    ),
    b as (
        select
            run_key,
            model as model_name,
            query_name,
            query_hash,
            status,
            concurrent_users,
            header_duration_ms as duration_ms
        from v_gatling_joined
        where gatling_run_id = '{runid_b}'
    qualify row_number() over (partition by test_name, model, query_hash, concurrent_users order by header_ts desc) = 1
    ),
    joined as (
        select
            a.model_name,
            a.query_name,
            a.query_hash,
            a.status,
            a.concurrent_users,
            a.duration_ms as duration_a,
            b.duration_ms as duration_b,
            round(((b.duration_ms - a.duration_ms) / nullif(a.duration_ms, 0)) * 100, 2) as pct_diff
        from a
            join b
        on  a.model_name       = b.model_name
            and a.query_hash       = b.query_hash
    )
select
    model_name,
    query_name,
    query_hash,
    status,
    concurrent_users,
    duration_a,
    duration_b,
    pct_diff,
    case
        when pct_diff > 0 then 'SLOWER'
        when pct_diff < 0 then 'FASTER'
        else 'SAME'
        end as perf_change
from joined
where abs(pct_diff) >= {perf_threshold_percent}
order by abs(pct_diff) desc, query_hash
"""

# Kept as a Snowpark DataFrame (not converted to pandas); count() runs the query.
df = session.sql(performance_evaluation_query)

# Get the row count
row_count = df.count()

if row_count == 0:
    report_data['Performance Evaluation Test'] = f"✅ Validated performance results for run ids: {runid_a} and {runid_b} are within {perf_threshold_percent} percent"
else:
    success = False
    report_data['Performance Evaluation Test'] = f"Failed performance results for for run ids: {runid_a} and {runid_b} {row_count} records are not within {perf_threshold_percent} percent as expected"

# Expressions nested inside if/else are never displayed by the notebook, so
# print the summary explicitly and make the out-of-threshold rows the cell's
# last top-level expression (None renders nothing when all are within threshold).
print(report_data['Performance Evaluation Test'])
df if row_count else None

In [None]:
# Show the summary message recorded by each check (displayed as this cell's last expression).
report_data

In [None]:
# Final gate: fail the notebook run if any check above cleared `success`.
# An explicit raise replaces `assert(False)` so the gate still fires when Python
# runs with assertions disabled (-O), and so the error carries a message.
if not success:
    print("Check for test failures")
    raise AssertionError("One or more checks failed; see report_data for details")

print("All tests passed")