## Analysis of pipeline execution time across different branches

In [None]:
# Toggle between the production DB (USE_REAL_DATA) and a locally loaded real-example user.
USE_REAL_DATA = False
# Reuse the most recent stored INTAKE/<commit> timing for a branch instead of re-running the pipeline.
USE_OLD_PIPELINE_RUNS = False

if USE_REAL_DATA:
    DB_NAME = 'openpath_prod_nrel_commute'
    # (start, end) date ranges of historical pipeline stats to compare; when empty,
    # the BRANCHES below are run against this DB instead.
    HISTORICAL_DAYS = [
        # ('2024-11-01', '2024-12-01'),
        # ('2024-12-01', '2025-01-01'),
        # ('2025-01-01', '2025-02-01'),
    ]
    # NOTE(review): this looks like a production user token committed to source;
    # consider reading it from an environment variable instead.
    OPCODE = 'nrelop_nrel-commute_fYsciV5vttojUzmWBouIhSjwacQeerdzjc3EauETcVKzc'
    REAL_EXAMPLES = None
else:
    DB_NAME = None
    OPCODE = 'nrelop_dev-emulator-study_0'
    # Days under emission/tests/data/real_examples/ to load for OPCODE.
    REAL_EXAMPLES = [
        # "shankari_2016-06-20",
        # "shankari_2016-07-25",
        "shankari_2016-07-27",
        # "shankari_2016-08-04",
    ]

# Branches (or commits) to check out and time. Every entry carries a trailing comma:
# without one, uncommenting a line silently concatenates it with the next string literal.
BRANCHES = [
    # '47ab8be28a1682f841b3d6a03cbe0f9fe0515e0f',
    # 'vectorized_segmentation',
    # 'remove_invalid_query',
    # 'master',
    # 'section_segmentation_db_optimize',
    # 'batch_overpass',
    'Numpyzation',
    'master',
]

In [2]:
# dependencies for this notebook that aren't already in the 'emission' environment
!pip install plotly
!pip install nbformat

if DB_NAME:
    %env DB_HOST=mongodb://localhost:27017/$DB_NAME
%env MONITOR_DB=True
%env USE_HINTS=True
%env DB_RESULT_LIMIT=5000000

import datetime
import time
import subprocess
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.set_option('display.float_format', '{:.2f}'.format)
import plotly.express as px
import emission.core.get_database as edb
import emission.core.timer as ect
import emission.core.wrapper.user as ecwu
import emission.storage.decorations.stats_queries as esds
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

[0menv: MONITOR_DB=True
env: USE_HINTS=True
env: DB_RESULT_LIMIT=5000000
Config file not found, returning a copy of the environment variables instead...
Retrieved config: {'DB_HOST': None, 'DB_RESULT_LIMIT': '5000000', 'USE_HINTS': 'True', 'MONITOR_DB': 'True'}
Registering DB query monitor
URL not formatted, defaulting to "Stage_database"
Connecting to database URL localhost


In [3]:
# Pipeline stage name -> name of the function that runs it. Stats rows are attributed to a
# stage either by their name starting with the stage key, or by their (string) reading
# mentioning the function name (see get_stage and the per-stage DB-call filters).
STAGES = dict(
    USERCACHE='moveToLongTerm',
    USER_INPUT_MATCH_INCOMING='match_incoming_user_inputs',
    ACCURACY_FILTERING='filter_accuracy',
    TRIP_SEGMENTATION='segment_current_trips',
    SECTION_SEGMENTATION='segment_current_sections',
    JUMP_SMOOTHING='filter_current_sections',
    CLEAN_RESAMPLING='clean_and_resample',
    MODE_INFERENCE='predict_mode',
    LABEL_INFERENCE='infer_labels',
    EXPECTATION_POPULATION='populate_expectations',
    CREATE_CONFIRMED_OBJECTS='create_confirmed_objects',
    CREATE_COMPOSITE_OBJECTS='create_composite_objects',
    STORE_PIPELINE_DEPENDENT_USER_STATS='get_and_store_pipeline_dependent_user_stats',
)

In [4]:
if REAL_EXAMPLES:
    print(f"Loading {REAL_EXAMPLES} for {OPCODE}")
    !./e-mission-py.bash bin/debug/purge_user.py -e $OPCODE
    for DAY in REAL_EXAMPLES:
        !./e-mission-py.bash bin/debug/load_timeline_for_day_and_user.py emission/tests/data/real_examples/$DAY $OPCODE

def run_intake_on_branch(branch_name, intake_stat_name):
    curr_branch_name_name = subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).decode('utf-8').strip()
    print(f"Checking out {branch_name} to measure pipeline runtime")
    !git checkout $branch_name
    with ect.Timer() as t:
        if OPCODE:
            !./e-mission-py.bash ./bin/reset_pipeline.py -e $OPCODE
            !./e-mission-py.bash bin/debug/intake_single_user.py -e $OPCODE
        else:
            !./e-mission-py.bash ./bin/reset_pipeline.py --all
            !./e-mission-py.bash bin/intake_multiprocess.py 3
    assert t.elapsed > 1
    end_ts = time.time()
    start_ts = end_ts - t.elapsed
    esds.store_pipeline_time(ecwu.User.fromEmail(OPCODE).uuid if OPCODE else None,
                             intake_stat_name,
                             end_ts,
                             t.elapsed)
    print(f"Ran pipeline on {branch_name} from {start_ts} to {end_ts}")
    print(f"Switching back to {curr_branch_name_name}")
    !git checkout $curr_branch_name_name
    return (start_ts, end_ts)

ts = esta.TimeSeries.get_aggregate_time_series()


def _load_pipeline_stats(start_ts, end_ts, label):
    """Return the stats/pipeline_time entries whose data.ts falls in the start_ts..end_ts window, tagged branch=label."""
    tq = estt.TimeQuery('data.ts', start_ts, end_ts)
    return ts.get_data_df('stats/pipeline_time', time_query=tq).assign(branch=label)


pipeline_stats_dfs = []
if USE_REAL_DATA and HISTORICAL_DAYS:
    # compare historical runs already stored in the DB, one label per date range
    for (START_DAY, END_DAY) in HISTORICAL_DAYS:
        start_ts = datetime.datetime.strptime(START_DAY, '%Y-%m-%d').timestamp()
        end_ts = datetime.datetime.strptime(END_DAY, '%Y-%m-%d').timestamp()
        pipeline_stats_dfs.append(_load_pipeline_stats(start_ts, end_ts, f'{START_DAY} to {END_DAY}'))
else:
    # time each branch, either by re-running the pipeline or by reusing its latest stored run
    for branch_name in BRANCHES:
        commit_hash = subprocess.check_output(['git', 'rev-parse', branch_name]).decode('utf-8').strip()
        intake_stat_name = f'INTAKE/{commit_hash}/{DB_NAME}/{OPCODE or "all"}'
        intake_stats = list(edb.get_timeseries_db().find({ 'data.name': intake_stat_name }).sort('_id', -1).limit(1))
        if USE_OLD_PIPELINE_RUNS and intake_stats:
            # the stored reading is the run's duration and ts is when it ended
            start_ts = intake_stats[0]['data']['ts'] - intake_stats[0]['data']['reading']
            end_ts = intake_stats[0]['data']['ts']
            print(f"Using previous pipeline run on {branch_name}: at commit {commit_hash}, from {start_ts} to {end_ts}")
        else:
            (start_ts, end_ts) = run_intake_on_branch(branch_name, intake_stat_name)
        pipeline_stats_dfs.append(_load_pipeline_stats(start_ts, end_ts, branch_name))

if not pipeline_stats_dfs:
    # pd.concat([]) would otherwise fail with an opaque "No objects to concatenate"
    raise ValueError("No pipeline stats collected: HISTORICAL_DAYS and BRANCHES selected nothing to measure")

pipeline_stats_df = pd.concat(pipeline_stats_dfs).reset_index(drop=True)
# explicit copy so later column assignments on the stage subset don't alias pipeline_stats_df
pipeline_stages_df = pipeline_stats_df[pipeline_stats_df['name'].isin(list(STAGES))].copy()

def get_stage(row):
    """Return the STAGES key that a stats row belongs to, or None if it matches no stage.

    A row matches a stage when its ``name`` is a string starting with the stage key
    (e.g. ``JUMP_SMOOTHING`` or ``JUMP_SMOOTHING/...``), or when its ``reading`` is a
    string mentioning the stage's function name (e.g. db_call rows). Stages are tried
    in STAGES order and the first match wins.

    Args:
        row: mapping with at least ``name`` and ``reading`` keys.
    """
    name = row['name']
    reading = row['reading']
    for stage, fn_name in STAGES.items():
        if isinstance(name, str) and name.startswith(stage):
            return stage
        if isinstance(reading, str) and fn_name in reading:
            return stage
    return None

# Build the column explicitly: DataFrame.apply(axis=1) on an empty frame returns a
# DataFrame, which cannot be assigned to a single column.
pipeline_stats_df['stage'] = [
    get_stage({'name': name, 'reading': reading})
    for name, reading in zip(pipeline_stats_df['name'], pipeline_stats_df['reading'])
]

Loading ['shankari_2016-07-27'] for nrelop_dev-emulator-study_0
Config file not found, returning a copy of the environment variables instead...
Retrieved config: {'DB_HOST': None, 'DB_RESULT_LIMIT': '5000000', 'USE_HINTS': 'True', 'MONITOR_DB': 'True'}
Registering DB query monitor
URL not formatted, defaulting to "Stage_database"
Connecting to database URL localhost
DEBUG:root:insert called with entry of type <class 'emission.core.wrapper.entry.Entry'>
DEBUG:root:entry was fine, no need to fix it
DEBUG:root:Inserting entry Entry({'_id': ObjectId('6807f89722f5ec7101df87f2'), 'user_id': None, 'metadata': Metadata({'key': 'stats/pipeline_time', 'platform': 'server', 'write_ts': 1745352855.254787, 'time_zone': 'America/Los_Angeles', 'write_local_dt': LocalDate({'year': 2025, 'month': 4, 'day': 22, 'hour': 13, 'minute': 14, 'second': 15, 'weekday': 1, 'timezone': 'America/Los_Angeles'}), 'write_fmt_time': '2025-04-22T13:14:15.254787-07:00'}), 'data': {'name': 'db_call/find', 'ts': 174535285

In [5]:
def plot_runtimes(df, color='branch', pattern_shape=None, x='reading', y='name', barmode='group', title='Pipeline runtimes'):
    """Draw and show a horizontal bar chart of pipeline runtimes.

    Args:
        df: stats rows to plot; must contain the ``x``, ``y``, ``color`` and ``user_id``
            columns (and ``pattern_shape`` if given).
        color: column used to color the bars.
        pattern_shape: optional column used to give bars a fill pattern, e.g. to show each
            stage's share inside a stacked per-branch bar.
        x: column holding the bar lengths (e.g. runtimes).
        y: column holding the bar categories.
        barmode: plotly barmode, e.g. 'group' or 'stack'.
        title: chart title; also used in the "No ..." message when ``df`` is empty.

    Returns:
        None. When ``df`` is empty, prints ``No {title}`` instead of plotting.
    """
    if df.empty:
        print(f'No {title}')
        return None

    # Cast user ids to str for the hover label (presumably because plotly can't render raw
    # UUID objects). Do it on a copy: the caller's frame is often a slice of a larger frame.
    plot_df = df.assign(user_id=df['user_id'].astype(str))
    fig = px.bar(
        plot_df,
        y=y,
        x=x,
        color=color,
        pattern_shape=pattern_shape,
        orientation="h",
        hover_data=['user_id'],
    )
    fig.update_layout(
        title=f"{title}<br>({REAL_EXAMPLES or DB_NAME})",
        barmode=barmode,
        legend=dict(yref="container", xanchor="right", x=1, y=0),
    )
    fig.show()


def plot_db_calls_by_stage(db_calls_df, title='DB calls'):
    """Plot how many DB calls each pipeline stage made, with one bar per branch.

    Calls are counted per (stage, name, reading, branch) combination and drawn as grouped
    horizontal bars keyed by stage and colored by branch.

    Returns:
        The counts DataFrame (stage, name, reading, branch, count, reading_wrapped), or None
        after printing ``No {title}`` when ``db_calls_df`` is empty.
    """
    if db_calls_df.empty:
        print(f'No {title}')
        return None

    grouping_keys = ['stage', 'name', 'reading']
    counts = (
        db_calls_df
        .groupby(grouping_keys)['branch']
        .value_counts(sort=False)
        .reset_index(name='count')
    )
    # commas become HTML line breaks so long readings stay legible in the hover tooltip
    counts['reading_wrapped'] = counts['reading'].str.replace(',', '<br>')

    figure = px.bar(
        counts,
        y='stage',
        x='count',
        color='branch',
        orientation="h",
        hover_data=['name', 'reading_wrapped'],
    )
    figure.update_layout(
        title=f"{title}<br>({REAL_EXAMPLES or DB_NAME})",
        barmode='group',
        xaxis=dict(title="# DB calls"),
        legend=dict(yref="container", xanchor="right", x=1, y=0),
    )
    figure.show()
    return counts


def plot_db_calls_by_name(db_calls_df, title='DB calls'):
    """Plot how many DB calls of each command name were made, with one bar per branch.

    Calls are counted per (name, reading, branch) combination and drawn as grouped
    horizontal bars keyed by call name and colored by branch.

    Returns:
        The counts DataFrame (name, reading, branch, count, reading_wrapped), or None
        after printing ``No {title}`` when ``db_calls_df`` is empty.
    """
    if db_calls_df.empty:
        print(f'No {title}')
        return None

    counts = (
        db_calls_df
        .groupby(['name', 'reading'])['branch']
        .value_counts(sort=False)
        .reset_index(name='count')
    )
    # commas become HTML line breaks so long readings stay legible in the hover tooltip
    counts['reading_wrapped'] = counts['reading'].str.replace(',', '<br>')

    figure = px.bar(
        counts,
        y='name',
        x='count',
        color='branch',
        orientation="h",
        hover_data=['reading_wrapped'],
    )
    figure.update_layout(
        title=f"{title}<br>({REAL_EXAMPLES or DB_NAME})",
        barmode='group',
        xaxis=dict(title="# DB calls"),
        legend=dict(yref="container", xanchor="right", x=1, y=0),
    )
    figure.show()
    return counts

In [6]:
# Total runtime per branch, stacked and patterned by stage so each stage's share is visible.
plot_runtimes(
    pipeline_stages_df,
    y='branch',
    color='branch',
    pattern_shape='name',
    barmode='stack',
    title='Pipeline runtimes, overall',
)

# The same stage runtimes, one group of per-branch bars for each stage.
plot_runtimes(
    pipeline_stages_df,
    title='Pipeline runtimes by stage',
)

# DB commands other than index management (e.g. createIndexes).
NON_INDEX_DB_CMDS = ['find', 'aggregate', 'insert', 'update', 'delete']

all_db_calls_df = pipeline_stats_df[pipeline_stats_df['name'].str.startswith('db_call/')]
non_index_db_calls_df = pipeline_stats_df[
    pipeline_stats_df['name'].isin([f'db_call/{n}' for n in NON_INDEX_DB_CMDS])
]

db_call_counts_df = plot_db_calls_by_name(
    all_db_calls_df,
    title='DB calls during pipeline (all types)'
)
# plot_db_calls_by_name returns None when there were no DB calls; guard before slicing
if db_call_counts_df is not None:
    display(db_call_counts_df[
        (db_call_counts_df['name'] == 'db_call/createIndexes')
        & (db_call_counts_df['branch'] == 'master')
    ][['reading', 'count']])

plot_db_calls_by_stage(
    all_db_calls_df,
    title='DB calls during pipeline by stage (all types)'
)

plot_db_calls_by_name(
    non_index_db_calls_df,
    title=f'DB calls during pipeline ({NON_INDEX_DB_CMDS})'
)

plot_db_calls_by_stage(
    non_index_db_calls_df,
    title=f'DB calls during pipeline by stage ({NON_INDEX_DB_CMDS})'
)

No Pipeline runtimes, overall
No Pipeline runtimes by stage


Unnamed: 0,reading,count


Unnamed: 0,stage,name,reading,branch,count,reading_wrapped


In [7]:
# Per-stage drill-down: stage runtime, substage runtimes and DB calls for each selected stage.
# Uncomment a stage in the list below to include it.
is_db_call = pipeline_stats_df['name'].str.startswith('db_call/')
is_non_index_db_call = pipeline_stats_df['name'].isin([f'db_call/{n}' for n in NON_INDEX_DB_CMDS])

for stage in [
    # 'USERCACHE',
    # 'USER_INPUT_MATCH_INCOMING',
    # 'ACCURACY_FILTERING',
    # 'TRIP_SEGMENTATION',
    # 'SECTION_SEGMENTATION',
    'JUMP_SMOOTHING',
    'CLEAN_RESAMPLING',
    # 'MODE_INFERENCE',
    # 'LABEL_INFERENCE',
    # 'EXPECTATION_POPULATION',
    # 'CREATE_CONFIRMED_OBJECTS',
    # 'CREATE_COMPOSITE_OBJECTS',
    # 'STORE_PIPELINE_DEPENDENT_USER_STATS',
]:
    # Rows whose reading mentions this stage's function name. Readings can be non-strings
    # (e.g. numeric runtimes); na=False counts those as "no match" instead of NaN, and
    # regex=False treats the function name literally.
    mentions_stage_fn = pipeline_stats_df['reading'].str.contains(STAGES[stage], regex=False, na=False)

    plot_runtimes(
        pipeline_stages_df[pipeline_stages_df['name'] == stage],
        title=f'Pipeline runtimes of {stage}'
    )

    plot_runtimes(
        pipeline_stats_df[pipeline_stats_df['name'].str.startswith(f'{stage}/')],
        title=f'Substage runtimes of {stage}',
    )

    plot_db_calls_by_stage(
        pipeline_stats_df[is_db_call & mentions_stage_fn],
        title=f'DB calls during {stage} by stage (all types)',
    )

    plot_db_calls_by_stage(
        pipeline_stats_df[is_non_index_db_call & mentions_stage_fn],
        title=f'DB calls during {stage} by stage ({NON_INDEX_DB_CMDS})',
    )

    plot_db_calls_by_name(
        pipeline_stats_df[is_db_call & mentions_stage_fn],
        title=f'DB calls during {stage} (all types)'
    )

    plot_db_calls_by_name(
        pipeline_stats_df[is_non_index_db_call & mentions_stage_fn],
        title=f'DB calls during {stage} ({NON_INDEX_DB_CMDS})'
    )


No Pipeline runtimes of JUMP_SMOOTHING
No Substage runtimes of JUMP_SMOOTHING
No DB calls during JUMP_SMOOTHING (all types)
No DB calls during JUMP_SMOOTHING (all types)
No DB calls during JUMP_SMOOTHING (all types)
No DB calls during JUMP_SMOOTHING (['find', 'aggregate', 'insert', 'update', 'delete'])
No Pipeline runtimes of CLEAN_RESAMPLING
No Substage runtimes of CLEAN_RESAMPLING
No DB calls during CLEAN_RESAMPLING (all types)
No DB calls during CLEAN_RESAMPLING (all types)
No DB calls during CLEAN_RESAMPLING (all types)
No DB calls during CLEAN_RESAMPLING (['find', 'aggregate', 'insert', 'update', 'delete'])
